blog.kdgregory.com

Tuesday, January 3, 2023

Using EventBridge Pipes to route CloudWatch Logs events to Kinesis

Several years ago I wrote about connecting CloudWatch Logs into a logging pipeline built on Kinesis Streams and Kinesis Firehose. To make that work, I wrote a Lambda.

AWS announced EventBridge Pipes at re:Invent 2022, as an easy way to connect messaging services. Connecting CloudWatch Logs to Kinesis seemed like a good way to kick the tires on this service. It still requires a custom Lambda, but that Lambda is now much simpler, because it doesn't need to concern itself with the quirks of PutRecords.

The Problem

When you subscribe a Kinesis stream to a log group, CloudWatch writes messages that look like this:

H4sIAAAAAAAAA62QTU7DMBCF95zC8jqO7LHd2NkFNVRCdNWyggqF1gqG/MlxW6qqd2cK7QWA5bw382a+OdLWjWNVu+VhcDSn02JZvMzLxaKYlTSh/b5zAWUBUulJZiwXgHLT17PQbwd0imFw3caF8rNqh8b9mIsYXNWi+9DX6l6wi8mAAwgAxXx88413HZNaWDHBqXH7Oq6DH6LvuzvfRBdGmj/Ra+zqO7fcuS6e9SP1G4yXGRgltdLcGK41SKu0AQ2KC55hrYTNsJCWc5CCW5CgteEG90WP4BHDaS4mmTDGAOKBTa4PwfjzuUwAA0W4yUHl0iTYQqbl7eOMPLkLFV+Rdd+mH5s6uLoPhxQvVe9ptR/TS0s6r3xHGNlVzdYRPxINzx09JX/DsP+LIX6LsTrdfAHRwAP2RwIAAA==

That's a GZipped and Base64-encoded JSON object. When decoded, unzipped, and pretty-printed, it looks like this:

{
  "messageType": "DATA_MESSAGE",
  "owner": "123456789012",
  "logGroup": "AppenderExample",
  "logStream": "Log4J1-Example-20221224-ithilien-351916",
  "subscriptionFilters": [
    "Example"
  ],
  "logEvents": [
    {
      "id": "37284354508805523945825240107552419724039002310923255808",
      "timestamp": 1671888278929,
      "message": "2022-12-24 08:24:38,929 DEBUG [example-0] com.kdgregory.log4j.aws.example.Main - value is 52\n"
    },
    {
      "id": "37284354508805523945825240107552419724039002310923255809",
      "timestamp": 1671888278929,
      "message": "2022-12-24 08:24:38,929 DEBUG [example-1] com.kdgregory.log4j.aws.example.Main - value is 52\n"
    }
  ]
}

As you can see, CloudWatch combines multiple log events into a single message. However, most destinations, such as Elasticsearch, want individual events, not a nested array, so you need something that pulls the original message apart (an example of what an individual event might look like is shown below). The AWS-provided OpenSearch integration does this by subscribing a Lambda to the log group; that Lambda decomposes the messages and writes the individual events to the cluster. My previous implementation did a similar transformation, but wrote the log messages to a Kinesis stream.
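Here's what one of the log events from the message above might look like once it's been broken out, with the log group and stream copied in so that the record remains self-describing. This shape is just an illustration; different destinations have different requirements.

{
  "logGroup": "AppenderExample",
  "logStream": "Log4J1-Example-20221224-ithilien-351916",
  "timestamp": 1671888278929,
  "message": "2022-12-24 08:24:38,929 DEBUG [example-0] com.kdgregory.log4j.aws.example.Main - value is 52\n"
}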

EventBridge Pipes

EventBridge Pipes is modeled around a four-step process (image sourced from docs):

EventBridge Pipes data flow

For simple pipelines, EventBridge gives you a “no code” solution that eliminates the middle two steps: you define an output format using path references to fields in the input messages. Unfortunately, this isn't usable for CloudWatch Logs messages: it can't (currently) handle Gzipped data, and can't extract the elements in an array. So we need to add an “enrichment” step that performs this operation.

Implementation

You can implement the enrichment step in multiple ways, including an HTTP(S) API, but the easiest is a Lambda. Pipes invokes this Lambda with an event that contains one or more source records, and it returns zero or more result records.

The source event looks like this:

[
  {
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": "shardId-000000000000:49636526735590005452916707570422271019264002608211165186",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::123456789012:role/CloudWatchTransformPipeline-PipeRole-us-east-1",
    "awsRegion": "us-east-1",
    "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/log-subscription",
    "kinesisSchemaVersion": "1.0",
    "partitionKey": "87362ce96c65eb1b3125bf0129b99bee",
    "sequenceNumber": "49636526735590005452916707570422271019264002608211165186",
    "data": "...",
    "approximateArrivalTimestamp": 1672490859.613
  }
]

The first thing to call out is that this event is an array. EventBridge Pipes can batch messages together, so you must loop over the entire event and combine the results from each message (ie, a flat-map):

def lambda_handler(event, context):
    result = []
    for message in event:
        result += transform_messages(message)
    return result

The data element is the GZipped and Base64-encoded message that I showed above. You process it with code that looks like this:

# base64, gzip, and json are all in the Python standard library
data = base64.b64decode(message['data'])
data = gzip.decompress(data)
payload = json.loads(data)

The last thing to be aware of is that there are two types of messages that you'll get from a CloudWatch Logs subscription, identified by the messageType field. When the subscription first starts, CloudWatch sends a control message to the stream to verify that it can write messages. After that, it sends data messages. Your Lambda should ignore everything but the latter.

if payload['messageType'] == 'DATA_MESSAGE':
    # extract log events
else:
    return []
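Putting those pieces together, here's a minimal sketch of the enrichment Lambda, assuming that each log event becomes one output record tagged with its origin (the output field names are my own choice for illustration; my complete example performs additional transformations):

import base64
import gzip
import json

def lambda_handler(event, context):
    result = []
    for message in event:
        result += transform_messages(message)
    return result

def transform_messages(message):
    # decode and unzip the payload that CloudWatch Logs wrote to the stream
    data = base64.b64decode(message['data'])
    data = gzip.decompress(data)
    payload = json.loads(data)
    # ignore control messages; only data messages contain log events
    if payload['messageType'] != 'DATA_MESSAGE':
        return []
    # emit one record per log event, tagged with the group and stream it came from
    return [
        {
            'logGroup':  payload['logGroup'],
            'logStream': payload['logStream'],
            'timestamp': log_event['timestamp'],
            'message':   log_event['message'],
        }
        for log_event in payload['logEvents']
    ]

Pipes takes each record returned by the enrichment step and delivers it to the pipe's target; in this case, the destination Kinesis stream.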

Conclusion

This post has shown you the basics of an EventBridge Pipes transform. You can find my complete example, which performs several transforms on the log events, here.

While I don't think that EventBridge Pipes quite lives up to the hype of Werner Vogels' keynote (and it has nothing in common with EventBridge other than the name), it does allow you to focus on your logic rather than the quirks of your source and destination. As such, it's a useful addition to your data transformation toolbox.

Sunday, January 1, 2023

Does that i4i.metal instance really help your build times?

Here in Philadelphia, the last day of 2022 was rainy and foggy; a day to stay inside. That, combined with $100 in AWS credits that were due to expire with the new year, made me decide to add another entry to my series of posts evaluating build performance. This time, I'm comparing my desktop PC to the current AWS king of the hill, an i4i.metal instance.

If you don't make a habit of reading EC2 instance specs, the i4i.metal has 128 virtual CPUs (64 actual cores), 1 terabyte of RAM, 30 terabytes of locally-attached SSD storage spread over 8 drives, and 75,000 megabits per second of network bandwidth. It costs $10.982 per hour ($96,268.212 per year) on-demand in us-east-1. There are more expensive EC2 instance types; the p3dn.24xlarge costs $31.212 per hour, but has fewer vCPUs, less memory, and less disk.

To see if the local SSD offered a performance improvement over EBS, I configured a 1 terabyte gp3 drive as the machine's root volume, and formatted one of the attached SSDs as ext4. I created a separate user, whose home directory was on the SSD, so all of the reads and writes for the builds should have stayed there.

As a comparison, I have my ten-year-old desktop: an i7-3770k (not overclocked) with 32 GB of RAM and a 512 GB Samsung 860 Pro SSD. It cost around $750 to build (no graphics card). I also spun up an m5.large EC2 instance, for consistency with previous tests.

Both of the AWS instances were running Amazon Linux 2; my devbox runs Xubuntu 20.04. All machines ran OpenJDK 1.8, although the specific builds varied (it's whatever the package manager installed). And I manually installed Maven 3.5.4 on all of the machines (it's what I'm running at home).

For a test workload, I built the AWS V2 Java SDK (commit 0f7779ef, corresponding to version 2.19.9-SNAPSHOT). This is a much bigger build than most people will ever deal with: almost 400 sub-projects, and over 100,000 Java files (including tests and generated code). I skipped tests, because it was clear that some of the tests involved timing delays; I wanted to see pure CPU/disk performance.

I ran three builds with each configuration. The first was a “clean” build, forcing Maven to download all dependencies. The second was with empty buffer cache (echo 3 > /proc/sys/vm/drop_caches), and the third, run immediately afterward, should have benefited from any files that were in the cache. As it turned out, the numbers were so close that it didn't make sense to report them separately; all numbers in the table below are from the “empty cache” run (ie, no downloads involved). As with previous tests, I show the output from the Linux time command; the “real” column is the elapsed time of the build, and is what developers care about.

                 Real          User          Sys
My DevBox        27m 24.406s   80m 3.772s    1m 16.792s
m5.large         54m 40.495s   94m 1.207s    1m 46.047s
i4i.metal EBS    17m 49.500s   97m 30.694s   2m 19.089s
i4i.metal SSD    17m 44.165s   97m 6.427s    2m 11.365s
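If you want to reproduce these numbers, the procedure is easy to script. Here's a rough sketch in Python (I actually ran the builds by hand under time; the sketch assumes that you run it as root from the SDK checkout, with mvn on the path):

#!/usr/bin/env python3
import resource
import subprocess
import time

def drop_caches():
    # flush dirty pages, then ask the kernel to drop the page cache
    subprocess.run("sync; echo 3 > /proc/sys/vm/drop_caches", shell=True, check=True)

def timed_build():
    # measure wall-clock time ourselves; pull user/sys from the child processes' rusage
    before = resource.getrusage(resource.RUSAGE_CHILDREN)
    start = time.monotonic()
    subprocess.run(["mvn", "clean", "install", "-DskipTests"], check=True)
    after = resource.getrusage(resource.RUSAGE_CHILDREN)
    return (time.monotonic() - start,
            after.ru_utime - before.ru_utime,
            after.ru_stime - before.ru_stime)

drop_caches()
for label in ("empty cache", "warm cache"):
    real, user, sys_time = timed_build()
    print(f"{label}: real {real:.1f}s  user {user:.1f}s  sys {sys_time:.1f}s")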

I have to be honest, I was surprised: I expected a small improvement in performance, but not 35%! My guess at the reason for the difference, based on the higher “user” time, is that the i4i.metal is able to make more effective use of its 64 cores. The 55 MB CPU cache (versus 8 MB on my PC) may have helped as well. I think the only way to summarize this test is with the words of Ferris Bueller:

It is so choice. If you have the means, I highly recommend picking one up.

Wednesday, December 14, 2022

Deprecating log4j-aws-appenders

I just made what I hope is the last release of my log4j-aws-appenders library. It started almost six years ago as a "weekend project" to produce a Log4J 1.x appender that wrote output to CloudWatch Logs. It has expanded over the years: I added Kinesis Streams and SNS as destinations, added support for Log4J 2.x and Logback, added support for the AWS 2.x Java SDK, and maintained the code as something I could be proud of.

And other people found it useful: it's received a handful of GitHub stars, and sees around 30,000 downloads a month from Maven Central (overwhelmingly for Log4J 1.x!).

But I think it's outlived its usefulness, and there are better options.

I'll start with an explanation of why I wrote it. At the time, I was working on a project that had around a hundred EC2 virtual machines running pre-baked AMIs, and using the AWS-provided logging agent to write to CloudWatch. The pre-baked AMI meant that everything went to the same log stream: we didn't configure separate streams for separate instances. The agent wasn't (at that time) very good at recognizing the boundaries between log messages, and the multi-threaded nature of our servers meant that messages would be interleaved anyway. Throw in CloudWatch's limited ability to filter messages, and the result was a mess.

As a result, I had two explicit goals for the first release: ensure that a Java LogEvent was written as a single CloudWatch Logs event (including stack traces), and allow different instances to write to different log streams. And this was better, but not great, because it still meant interleaved messages from different threads. So I implemented the Kinesis appender, to send log messages to Elasticsearch. I don't think my boss liked the monthly cost of a six-node Elasticsearch cluster, but the other developers liked the ability to quickly isolate a single web request and all of its related operations.

So why deprecate it? The answer is that the AWS world has moved forward in the past six years. CloudWatch is the default logging destination for Lambda and ECS, and the CloudWatch agent for EC2 has gotten much better. And Insights has made searching CloudWatch much easier, especially if you write log messages in JSON.

So rather than swim against the flow, I recommend that you use CloudWatch Logs rather than my library. For Lambda (and ECS or EKS with the awslogs driver), just write to Standard Error; it will be picked up by the runtime and sent to CloudWatch. For EC2, use the logging agent, configured to parse individual events. If you want to use Elasticsearch, you can use a subscription to get the data out of CloudWatch Logs (something that's become easier with EventBridge Pipes, the topic of an upcoming post). Or you can use an HTTP appender (available in all the major logging frameworks), and wire up your own pipeline using API Gateway's Kinesis integration (something else that wasn't available six years ago).

With all of these options, my library is superfluous. And while I think it's mature, stable, and bug-free, using it exposes your project to the risk of having a single volunteer maintainer responsible for a key part of your infrastructure.