Several years ago I wrote about connecting CloudWatch Logs into into a logging pipeline built on Kinesis Streams and Kinesis Firehose. To make that work, I wrote a Lambda.
AWS announced EventBridge Pipes at re:Invent 2022, as an easy way to connect messaging services. Connecting CloudWatch Logs to Kinesis seemed like a good way to kick the tires on this service. It still requires a custom Lambda, but that Lambda is now much simpler, because it doesn't need to concern itself with the quirks of PutRecords
.
The Problem
When you subscribe a Kinesis stream to a log group, CloudWatch writes messages that look like this:
H4sIAAAAAAAAA62QTU7DMBCF95zC8jqO7LHd2NkFNVRCdNWyggqF1gqG/MlxW6qqd2cK7QWA5bw382a+OdLWjWNVu+VhcDSn02JZvMzLxaKYlTSh/b5zAWUBUulJZiwXgHLT17PQbwd0imFw3caF8rNqh8b9mIsYXNWi+9DX6l6wi8mAAwgAxXx88413HZNaWDHBqXH7Oq6DH6LvuzvfRBdGmj/Ra+zqO7fcuS6e9SP1G4yXGRgltdLcGK41SKu0AQ2KC55hrYTNsJCWc5CCW5CgteEG90WP4BHDaS4mmTDGAOKBTa4PwfjzuUwAA0W4yUHl0iTYQqbl7eOMPLkLFV+Rdd+mH5s6uLoPhxQvVe9ptR/TS0s6r3xHGNlVzdYRPxINzx09JX/DsP+LIX6LsTrdfAHRwAP2RwIAAA==
That's a GZipped and Base64-encoded JSON object. When decoded, unzipped, and pretty-printed, it looks like this:
{ "messageType": "DATA_MESSAGE", "owner": "123456789012", "logGroup": "AppenderExample", "logStream": "Log4J1-Example-20221224-ithilien-351916", "subscriptionFilters": [ "Example" ], "logEvents": [ { "id": "37284354508805523945825240107552419724039002310923255808", "timestamp": 1671888278929, "message": "2022-12-24 08:24:38,929 DEBUG [example-0] com.kdgregory.log4j.aws.example.Main - value is 52\n" }, { "id": "37284354508805523945825240107552419724039002310923255809", "timestamp": 1671888278929, "message": "2022-12-24 08:24:38,929 DEBUG [example-1] com.kdgregory.log4j.aws.example.Main - value is 52\n" } ] }
As you can see, CloudWatch combines multiple log events into a single message. However, most destinations — such as Elasticsearch — want individual events, not a nested array. So you need something that pulls apart the original event. For example, the AWS-provided OpenSearch integration, which subscribes a Lambda to the log group, decomposes the messages, and writes them to the cluster. My previous implementation did a similar transformation, but wrote the log messages to a Kinesis stream.
EventBridge Pipes
EventBridge Pipes is modeled around a four-step process (image sourced from docs):
For simple pipelines, EventBridge gives you a “no code” solution that eliminates the middle two steps: you define an output format using path references to fields in the input messages. Unfortunately, this isn't usable for CloudWatch Logs messages: it can't (currently) handle Gzipped data, and can't extract the elements in an array. So we need to add an “enrichment” step that performs this operation.
Implementation
You can implement the enrichment step in multiple ways, including an HTTP(S) API, but the easiest is a Lambda. Pipes invokes this Lambda with an event that contains one or more source records, and it returns returns zero or more result records.
The source event looks like this:
[ { "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000000:49636526735590005452916707570422271019264002608211165186", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/CloudWatchTransformPipeline-PipeRole-us-east-1", "awsRegion": "us-east-1", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/log-subscription", "kinesisSchemaVersion": "1.0", "partitionKey": "87362ce96c65eb1b3125bf0129b99bee", "sequenceNumber": "49636526735590005452916707570422271019264002608211165186", "data": "...", "approximateArrivalTimestamp": 1672490859.613 } ]
The first thing to call out is that this event is an array. EventBridge Pipes can batch messages together, so you must loop over the entire event and combine the results from each message (ie, a flat-map):
def lambda_handler(event, context): result = [] for message in event: result += transform_messages(message) return result
The data
element is the GZipped and Base64-encoded message that I showed above. You process it with code that looks like this:
data = base64.b64decode(message['data']) data = gzip.decompress(data) payload = json.loads(data)
The last thing to be aware of is that there are two types of messages that you'll get from a CloudWatch Logs subscription, identified by the messageType
field. When the subscription first starts, CloudWatch sends a control message to the stream to verify that it can write messages. After that, it sends data messages. Your Lambda should ignore everything but the latter.
if payload['messageType'] == 'DATA_MESSAGE': # extract log events else: return []
Conclusion
This post has shown you the basics of an EventBridge Pipes transform. You can find my complete example, which performs several transforms on the log events, here.
While I don't think that EventBridge Pipes quite lives up to the hype of Werner Vogels' keynote (and that it has nothing in common with EventBridge other than a name), it does allow you to focus on your logic rather than the quirks of your source and destination. As such, it's a useful addition to your data transformation toolbox.