My last post described my different attempts at incorporating CloudWatch Logs events into an Elasticsearch-based logging pipeline. This post looks at the actual code. To avoid boring you, it just touches on the main points; if you want the whole thing go here.
Creating the subscription
The first step in streaming your CloudWatch Logs is to create a subscription. As I mentioned in my last post, you can't do this via the AWS Console. The Amazon documentation walks you through creating the subscription via the CLI, or you can use the CloudFormation template from my project.
One important note: the stream has to be active before you can create a subscription; if you create the stream and then immediately try to create the subscription, it will fail. This is why I have a separate templates to create the stream and subscribe to it (it's also easier to replicate the subscription when it's a separate template).
The messages are written to the stream as JSON. Each message identifies the log group and stream, and contains a list of log events:
{ "messageType": "DATA_MESSAGE", "owner": "012345678901", "logGroup": "AppenderExample", "logStream": "Example-20190825", "subscriptionFilters": ["CloudWatchSubscription-Subscription-184MSZXRD5MCP"], "logEvents": [{ "id": "34939504216003552676187468284442890586332374987778818048", "timestamp": 1566741555269, "message": "some message\n" }, { "id": "34939504216092755656981590777009033459422968433802739713", "timestamp": 1566741555273, "message": "another message\n" }, { "id": "34939504239575440351034336945046144800521693099600117762", "timestamp": 1566741556326, "message": "still another\n" }] }
Processing the input messages
When Kinesis invokes a Lambda, it packages multiple records into a single invocation event, with the original message GZipped and Base64-encoded:
{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "ea05911b7285b29393412d960053f81d", "sequenceNumber": "49598889268738564796327312231530438478076180289718059010", "data": "H4sIANoVbV0AA42QW2vCQBCFn5NfIfusMLs7e5m+BbU+CaURWtpKSXXRQExCsvaC+N+7sSn0QqUvC7PnMOebc4gjtnNtm23c4q127GLAJskieZxP0zSZTdkw6NVL6ZpOAS4kKm0sAT8pRbWZNdW+7sSkrl25ds30NdvVhfvUU9+4bNcZemEkgBNYoU6Odv/Urpq89nlVXuaFd00bvPdsXFT79U3mV9v0i2P0beAW5+nd7fVEzcdXbNkHTp9d6U9LDnEUsXzdZUskSQpQcA0glRLaaG4NaissIopwkbJaSiENkjXGWMstoO0YI+bzUJEP9GEVV1ob5KrbQSe1r6+LaaudG/TzQ8ni6Dgc/EFBwiillSbLFUFIBCCQoV5CIUhblNKCMJIMl2cpjPxJkZWV37rmPyDhNQoxVMJBhkhNGL41R7QAKnCSBKLQGefGaHEGREuhf9Xh86IY9DgfGMv4GL8DWgIaCnQCAAA=", "approximateArrivalTimestamp": 1566741561.188 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000000:49598889268738564796327312231530438478076180289718059010", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::012345678901:role/DefaultLambdaProfile", "awsRegion": "us-east-1", "eventSourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/CloudWatchSubscriptionDestination" }, ... ] }
To transform the input events, you need to iterate over each of the top-level records, extract its payload, and then iterate over each of the child events to transform them. In order to simplify the output code, my process_input_record()
function returns just the transformed child events, so that the output code can work on a “flattened” array.
def lambda_handler(event, context): outputMessages = [] for record in event['Records']: outputMessages = outputMessages + process_input_record(record) logging.info(f'total number of messages to output: {len(outputMessages)}') logging.debug(f'output messages: {json.dumps(outputMessages)}') write_to_kinesis(outputMessages) def process_input_record(record): try: payload = record['kinesis']['data'] decoded = gzip.decompress(base64.b64decode(payload)) data = json.loads(decoded) message_type = data.get('messageType') if message_type == 'DATA_MESSAGE': logGroup = data['logGroup'] logStream = data['logStream'] events = data.get('logEvents', []) logging.info(f'processing {len(events)} events from group "{logGroup}" / stream "{logStream}"') logging.debug(f'input messages: {json.dumps(events)}') return [transform_log_event(logGroup, logStream, event) for event in events] elif message_type == 'CONTROL_MESSAGE': logging.info('skipping control message') elif message_type: logging.warn(f'unexpected message type: {message_type}') except: logging.error(f'failed to process record; keys = {record.keys()}', exc_info=True) # fall-through for any unprocessed messages (exception or unhandled message type) return []
One thing to note about process_input_record()
is the if
statement at its core. CloudWatch Logs writes two different types of messages: most of the messages are data messages, which contain log events. However, when you first subscribe the log group to a stream, it writes a control message to verify that it has permission to do so. There shouldn't be any other message types, but if AWS adds one we'll ignore it. I also wrap the entire function in a try block, in case something other than a valid Logs message ends up on the stream.
Each event in the record gets passed through transform_log_event()
, not shown here, which performs the following transformations:
- If the message isn't already JSON, it's transformed into JSON with a `timestamp`, `message`, and `level` fields. This makes the message compatible with those written by my logging adapter.
- If the message looks like a Lambda status message, it's parsed, and relevant data (eg, request ID, execution duration, and memory used) is extracted and stored in a child object.
- If the message looks like Lambda logging output, it is parsed and the components (level, timestamp, request ID, and message) are stored in separate fields. Note: my transform only supports Python log output; the Node.JS runtime outputs fields in a different order.
- The source log group and stream are added, as a sub-object.
If you want to add your own transformations — such as support for Node.JS logs — this is the place to do it.
Writing messages to Kinesis
As I've written elsewhere, writing to Kinesis is easy to do, but difficult to do right:
- For performance, you should use the
PutRecords
API rather thanPutRecord
. - Which means that you must create batches of records that fit into the limits of this API (1 MB or 500 messages).
- You have to be prepared for your entire call to be throttled, and resend after a delay.
- And you have to be prepared for individual records within the batch to be throttled due to load on the destination shard, and resend just them.
The top-level write_to_kinesis()
function repeatedly processes batches of records, with a sleep between batches to minimize throttling. The process_batch()
function returns any messages that it couldn't send, for the next time through the loop. As I note, there shouldn't be more than one batch, unless you've configured the Lambda trigger to process lots of records or your destination is overloaded.
def write_to_kinesis(listOfEvents): records = prepare_records(listOfEvents) while records: records = process_batch(records) if (records): time.sleep(2) # an arbitrary sleep; we should rarely hit this return
The process_batch()
function is small but needs the most explanation. It starts with a call to build_batch()
, which splits the source list of records into two parts: the first part contains all of the records that will fit in a single PutRecords
call, the second is everything left over. Next is the actual call. Finally, process_response()
looks at the result of the call to find any records that were throttled; these are added
def process_batch(records): toBeSent, toBeReturned = build_batch(records) logging.info(f'sending batch of {len(toBeSent)} records with {len(toBeReturned)} remaining') try: response = kinesisClient.put_records( StreamName=kinesisStream, Records=toBeSent ) return process_response(response, toBeSent) + toBeReturned except kinesisClient.exceptions.ProvisionedThroughputExceededException: logging.warn(f'received throughput exceeded on stream {kinesisStream}; retrying all messages') return toBeSent + toBeReturned
I'm going to wrap up by looking at the process_response()
, function in detail, because it's where the trickiness lies. The response for PutRecords
contains an array of result objects, matched to the request records that you passed in. In the happy path, each of these result objects contains the sequence number that Kinesis assigned to the record (and FailedRecordCount
is 0). In the unhappy path, some of the records fail but others succeed; the result object for the failed records will contain an error code.
Making things a little more tricky, this error code can either indicate that the shard throttled the record, in which case we re-send it, or an internal error happened. The API docs don't go into details on exactly what an internal error is, or whether it's recoverable, so I assume that it isn't and drop the record.
def process_response(response, records): if response['FailedRecordCount'] == 0: return [] result = [] droppedRecordCount = 0 for ii in range(len(response['Records'])): entry = response['Records'][ii] errorCode = entry.get('ErrorCode') if errorCode == 'ProvisionedThroughputExceededException': result.append(records[ii]) elif errorCode: droppedRecordCount += 1 if droppedRecordCount > 0: logging.warn(f'dropped {droppedRecordCount} records due to Kinesis internal errors') if len(result) > 0: logging.info(f"requeueing {len(result)} records due to throughput-exceeded") return result
So, that's it. With a few adjustments to match your logging format (and, really, if you can just log in JSON), this will let you centralize your logs in Elasticsearch, even if AWS really, really wants you to use CloudWatch.