Tuesday, January 3, 2023

Using EventBridge Pipes to route CloudWatch Logs events to Kinesis

Several years ago I wrote about connecting CloudWatch Logs into a logging pipeline built on Kinesis Streams and Kinesis Firehose. To make that work, I wrote a Lambda.

AWS announced EventBridge Pipes at re:Invent 2022, as an easy way to connect messaging services. Connecting CloudWatch Logs to Kinesis seemed like a good way to kick the tires on this service. It still requires a custom Lambda, but that Lambda is now much simpler, because it doesn't need to concern itself with the quirks of PutRecords.

The Problem

When you subscribe a Kinesis stream to a log group, CloudWatch writes messages that look like this:

H4sIAAAAAAAAA62QTU7DMBCF95zC8jqO7LHd2NkFNVRCdNWyggqF1gqG/MlxW6qqd2cK7QWA5bw382a+OdLWjWNVu+VhcDSn02JZvMzLxaKYlTSh/b5zAWUBUulJZiwXgHLT17PQbwd0imFw3caF8rNqh8b9mIsYXNWi+9DX6l6wi8mAAwgAxXx88413HZNaWDHBqXH7Oq6DH6LvuzvfRBdGmj/Ra+zqO7fcuS6e9SP1G4yXGRgltdLcGK41SKu0AQ2KC55hrYTNsJCWc5CCW5CgteEG90WP4BHDaS4mmTDGAOKBTa4PwfjzuUwAA0W4yUHl0iTYQqbl7eOMPLkLFV+Rdd+mH5s6uLoPhxQvVe9ptR/TS0s6r3xHGNlVzdYRPxINzx09JX/DsP+LIX6LsTrdfAHRwAP2RwIAAA==

That's a GZipped and Base64-encoded JSON object. When decoded, unzipped, and pretty-printed, it looks like this:

{
  "messageType": "DATA_MESSAGE",
  "owner": "123456789012",
  "logGroup": "AppenderExample",
  "logStream": "Log4J1-Example-20221224-ithilien-351916",
  "subscriptionFilters": [
    "Example"
  ],
  "logEvents": [
    {
      "id": "37284354508805523945825240107552419724039002310923255808",
      "timestamp": 1671888278929,
      "message": "2022-12-24 08:24:38,929 DEBUG [example-0] com.kdgregory.log4j.aws.example.Main - value is 52\n"
    },
    {
      "id": "37284354508805523945825240107552419724039002310923255809",
      "timestamp": 1671888278929,
      "message": "2022-12-24 08:24:38,929 DEBUG [example-1] com.kdgregory.log4j.aws.example.Main - value is 52\n"
    }
  ]
}

As you can see, CloudWatch combines multiple log events into a single message. However, most destinations — such as Elasticsearch — want individual events, not a nested array, so you need something that pulls apart the original message. The AWS-provided OpenSearch integration, for example, subscribes a Lambda to the log group; that Lambda decomposes the messages and writes the individual events to the cluster. My previous implementation did a similar transformation, but wrote the log messages to a Kinesis stream.
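
To make that concrete, here's a minimal sketch of the decomposition, assuming the subscription payload has already been decoded into a dict like the one shown above. The shape of the output records is my own choice for illustration, not something prescribed by any particular destination:

def explode_log_events(payload):
    # emit one flat record per log event, carrying along the log group
    # and stream so downstream consumers know where each event came from
    return [
        {
            'logGroup': payload['logGroup'],
            'logStream': payload['logStream'],
            'id': event['id'],
            'timestamp': event['timestamp'],
            'message': event['message'],
        }
        for event in payload['logEvents']
    ]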

EventBridge Pipes

EventBridge Pipes is modeled around a four-step process (image sourced from docs):

[Image: EventBridge Pipes data flow]

For simple pipelines, EventBridge gives you a “no code” solution that eliminates the middle two steps: you define an output format using path references to fields in the input messages. Unfortunately, this isn't usable for CloudWatch Logs messages: it can't (currently) handle GZipped data, and can't extract the elements in an array. So we need to add an “enrichment” step that performs those two operations.
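
For a sense of how the pieces fit together, here's roughly what creating such a pipe could look like with boto3. The Lambda and output stream names are placeholders, the role and source stream are taken from the examples below, and I'm working from my reading of the CreatePipe API, so treat this as an untested sketch rather than a recipe:

import boto3

pipes = boto3.client('pipes')

account = '123456789012'    # placeholder account ID, as in the examples
region = 'us-east-1'

pipes.create_pipe(
    Name='CloudWatchTransformPipeline',
    RoleArn=f'arn:aws:iam::{account}:role/CloudWatchTransformPipeline-PipeRole-{region}',
    # source: the Kinesis stream that the log group's subscription writes to
    Source=f'arn:aws:kinesis:{region}:{account}:stream/log-subscription',
    SourceParameters={
        'KinesisStreamParameters': {'StartingPosition': 'LATEST'}
    },
    # enrichment: the Lambda described in the next section (name is hypothetical)
    Enrichment=f'arn:aws:lambda:{region}:{account}:function:cloudwatch-log-transform',
    # target: the stream that downstream consumers read from (name is hypothetical)
    Target=f'arn:aws:kinesis:{region}:{account}:stream/log-output',
    TargetParameters={
        'KinesisStreamParameters': {'PartitionKey': 'log-transform'}
    },
)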

Implementation

You can implement the enrichment step in multiple ways, including an HTTP(S) API, but the easiest is a Lambda. Pipes invokes this Lambda with an event that contains one or more source records, and it returns zero or more result records.

The source event looks like this:

[
  {
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": "shardId-000000000000:49636526735590005452916707570422271019264002608211165186",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::123456789012:role/CloudWatchTransformPipeline-PipeRole-us-east-1",
    "awsRegion": "us-east-1",
    "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/log-subscription",
    "kinesisSchemaVersion": "1.0",
    "partitionKey": "87362ce96c65eb1b3125bf0129b99bee",
    "sequenceNumber": "49636526735590005452916707570422271019264002608211165186",
    "data": "...",
    "approximateArrivalTimestamp": 1672490859.613
  }
]

The first thing to call out is that this event is an array. EventBridge Pipes can batch messages together, so you must loop over the entire event and combine the results from each message (ie, a flat-map):

def lambda_handler(event, context):
    # each source record may produce multiple output records, so
    # flat-map: transform each message and concatenate the results
    result = []
    for message in event:
        result += transform_messages(message)
    return result

The data element is the GZipped and Base64-encoded message that I showed above. You process it with code that looks like this:

import base64
import gzip
import json
data = base64.b64decode(message['data'])
data = gzip.decompress(data)
payload = json.loads(data)

The last thing to be aware of is that there are two types of messages that you'll get from a CloudWatch Logs subscription, identified by the messageType field. When the subscription first starts, CloudWatch sends a control message to the stream to verify that it can write messages. After that, it sends data messages. Your Lambda should ignore everything but the latter.

if payload['messageType'] == 'DATA_MESSAGE':
    # extract log events
else:
    return []
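
Putting those pieces together, the transform_messages() function called from the handler above ends up looking something like this. It's a simplified sketch that reuses the explode_log_events() helper from earlier; my complete example, linked below, applies several more transforms to each event:

import base64
import gzip
import json


def transform_messages(message):
    # unwrap the Kinesis record to recover the CloudWatch Logs payload
    data = base64.b64decode(message['data'])
    data = gzip.decompress(data)
    payload = json.loads(data)

    # only data messages contain log events; drop everything else
    if payload['messageType'] != 'DATA_MESSAGE':
        return []

    # explode_log_events() is the helper sketched earlier in this post
    return explode_log_events(payload)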

Conclusion

This post has shown you the basics of an EventBridge Pipes transform. You can find my complete example, which performs several transforms on the log events, here.

While I don't think that EventBridge Pipes quite lives up to the hype of Werner Vogels' keynote (and it has nothing in common with EventBridge other than the name), it does allow you to focus on your logic rather than the quirks of your source and destination. As such, it's a useful addition to your data transformation toolbox.

Sunday, January 1, 2023

Does that i4i.metal instance really help your build times?

Here in Philadelphia, the last day of 2022 was rainy and foggy; a day to stay inside. That, combined with $100 in AWS credits that were due to expire with the new year, made me decide to add another entry to my series of posts evaluating build performance. This time, I'm comparing my desktop PC to the current AWS king of the hill, an i4i.metal instance.

If you don't make a habit of reading EC2 instance specs, the i4i.metal has 128 virtual CPUs (32 actual cores), 1 terabyte of RAM, 30 terabytes of locally-attached SSD storage spread over 8 drives, and 75,000 megabits per second of network bandwidth. It costs $10.982 per hour ($96,268.212 per year) on-demand in us-east-1. There are more expensive EC2 instance types; the p3dn.24xlarge costs $31.212 per hour, but has fewer vCPUs, less memory, and less disk.

To see if the local SSD offered a performance improvement over EBS, I configured a 1 terabyte gp3 drive as the machine's root volume, and formatted one of the attached SSDs as ext4. I created a separate user, whose home directory was on the SSD, so all of the reads and writes for the builds should have stayed there.

As a comparison, I have my ten-year-old desktop: an i7-3770k (not overclocked) with 32 GB of RAM and a 512 GB Samsung 860 Pro SSD. It cost around $750 to build (no graphics card). I also spun up an m5.large EC2 instance, for consistency with previous tests.

Both of the AWS instances were running Amazon Linux 2; my devbox runs Xubuntu 20.04. All machines ran OpenJDK 1.8, although the specific builds varied (it's whatever the package manager installed). And I manually installed Maven 3.5.4 on all of the machines (it's what I'm running at home).

For a test workload, I built the AWS V2 Java SDK (commit 0f7779ef, corresponding to version 2.19.9-SNAPSHOT). This is a much bigger build than most people will ever deal with: almost 400 sub-projects, and over 100,000 Java files (including tests and generated code). I skipped tests, because it was clear that some of the tests involved timing delays; I wanted to see pure CPU/disk performance.

I ran three builds with each configuration. The first was a “clean” build, forcing Maven to download all dependencies. The second was with empty buffer cache (echo 3 > /proc/sys/vm/drop_caches), and the third, run immediately afterward, should have benefited from any files that were in the cache. As it turned out, the numbers were so close that it didn't make sense to report them separately; all numbers in the table below are from the “empty cache” run (ie, no downloads involved). As with previous tests, I show the output from the Linux time command; the “real” column is the elapsed time of the build, and is what developers care about.

                Real          User          Sys
My DevBox       27m 24.406s   80m  3.772s   1m 16.792s
m5.large        54m 40.495s   94m  1.207s   1m 46.047s
i4i.metal EBS   17m 49.500s   97m 30.694s   2m 19.089s
i4i.metal SSD   17m 44.165s   97m  6.427s   2m 11.365s

I have to be honest, I was surprised: I expected a small improvement in performance, but not 35%! My guess at the reason for the difference, based on the higher “user” time, is that the i4i is able to make more effective use of those 32 cores. The 55 MB CPU cache (versus 8 MB on my PC) may have helped as well. I think the only way to summarize this test is with the words of Ferris Bueller:

It is so choice. If you have the means, I highly recommend picking one up.