Thursday, September 12, 2019

Streaming CloudWatch logs to Elasticsearch, Part 2: Teh Codez

My last post described my different attempts at incorporating CloudWatch Logs events into an Elasticsearch-based logging pipeline. This post looks at the actual code. To avoid boring you, it just touches on the main points; if you want the whole thing, go here.

Creating the subscription

The first step in streaming your CloudWatch Logs is to create a subscription. As I mentioned in my last post, you can't do this via the AWS Console. The Amazon documentation walks you through creating the subscription via the CLI, or you can use the CloudFormation template from my project.

One important note: the stream has to be active before you can create a subscription; if you create the stream and then immediately try to create the subscription, it will fail. This is why I use separate templates to create the stream and to subscribe to it (it's also easier to replicate the subscription when it's a separate template).
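
If you'd rather script this than use the CLI or CloudFormation, a minimal boto3 sketch looks something like the following. The role name is a placeholder (the group and stream names match the examples below); it waits for the stream to become active before creating the subscription filter:

import boto3

logs = boto3.client('logs')
kinesis = boto3.client('kinesis')

log_group = 'AppenderExample'                           # placeholder names; substitute your own
stream_name = 'CloudWatchSubscriptionDestination'
role_arn = 'arn:aws:iam::012345678901:role/CloudWatchSubscriptionRole'

# block until the stream is ACTIVE; subscribing too early fails
kinesis.get_waiter('stream_exists').wait(StreamName=stream_name)
stream_arn = kinesis.describe_stream(StreamName=stream_name)['StreamDescription']['StreamARN']

logs.put_subscription_filter(
    logGroupName=log_group,
    filterName='KinesisSubscription',
    filterPattern='',                                   # empty pattern forwards every event
    destinationArn=stream_arn,
    roleArn=role_arn)                                   # role that lets CloudWatch Logs write to the stream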

The messages are written to the stream as JSON. Each message identifies the log group and stream, and contains a list of log events:

{
 "messageType": "DATA_MESSAGE",
 "owner": "012345678901",
 "logGroup": "AppenderExample",
 "logStream": "Example-20190825",
 "subscriptionFilters": ["CloudWatchSubscription-Subscription-184MSZXRD5MCP"],
 "logEvents": [{
  "id": "34939504216003552676187468284442890586332374987778818048",
  "timestamp": 1566741555269,
  "message": "some message\n"
 }, {
  "id": "34939504216092755656981590777009033459422968433802739713",
  "timestamp": 1566741555273,
  "message": "another message\n"
 }, {
  "id": "34939504239575440351034336945046144800521693099600117762",
  "timestamp": 1566741556326,
  "message": "still another\n"
 }]
}

Processing the input messages

When Kinesis invokes a Lambda, it packages multiple records into a single invocation event, with the original message GZipped and Base64-encoded:

{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "ea05911b7285b29393412d960053f81d",
                "sequenceNumber": "49598889268738564796327312231530438478076180289718059010",
                "data": "H4sIANoVbV0AA42QW2vCQBCFn5NfIfusMLs7e5m+BbU+CaURWtpKSXXRQExCsvaC+N+7sSn0QqUvC7PnMOebc4gjtnNtm23c4q127GLAJskieZxP0zSZTdkw6NVL6ZpOAS4kKm0sAT8pRbWZNdW+7sSkrl25ds30NdvVhfvUU9+4bNcZemEkgBNYoU6Odv/Urpq89nlVXuaFd00bvPdsXFT79U3mV9v0i2P0beAW5+nd7fVEzcdXbNkHTp9d6U9LDnEUsXzdZUskSQpQcA0glRLaaG4NaissIopwkbJaSiENkjXGWMstoO0YI+bzUJEP9GEVV1ob5KrbQSe1r6+LaaudG/TzQ8ni6Dgc/EFBwiillSbLFUFIBCCQoV5CIUhblNKCMJIMl2cpjPxJkZWV37rmPyDhNQoxVMJBhkhNGL41R7QAKnCSBKLQGefGaHEGREuhf9Xh86IY9DgfGMv4GL8DWgIaCnQCAAA=",
                "approximateArrivalTimestamp": 1566741561.188
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49598889268738564796327312231530438478076180289718059010",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::012345678901:role/DefaultLambdaProfile",
            "awsRegion": "us-east-1",
            "eventSourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/CloudWatchSubscriptionDestination"
        },
        ...
    ]
}

To transform the input events, you need to iterate over each of the top-level records, extract its payload, and then iterate over each of the child events to transform them. In order to simplify the output code, my process_input_record() function returns just the transformed child events, so that the output code can work on a “flattened” array.

import base64
import gzip
import json
import logging
import time

def lambda_handler(event, context):
    outputMessages = []
    for record in event['Records']:
        outputMessages = outputMessages + process_input_record(record)
    logging.info(f'total number of messages to output: {len(outputMessages)}')
    logging.debug(f'output messages: {json.dumps(outputMessages)}')
    write_to_kinesis(outputMessages)


def process_input_record(record):
    try:
        payload = record['kinesis']['data']
        decoded = gzip.decompress(base64.b64decode(payload))
        data = json.loads(decoded)
        message_type = data.get('messageType')
        if message_type == 'DATA_MESSAGE':
            logGroup = data['logGroup']
            logStream = data['logStream']
            events = data.get('logEvents', [])
            logging.info(f'processing {len(events)} events from group "{logGroup}" / stream "{logStream}"')
            logging.debug(f'input messages: {json.dumps(events)}')
            return [transform_log_event(logGroup, logStream, event) for event in events]
        elif message_type == 'CONTROL_MESSAGE':
            logging.info('skipping control message')
        elif message_type:
            logging.warning(f'unexpected message type: {message_type}')
    except Exception:
        logging.error(f'failed to process record; keys = {record.keys()}', exc_info=True)
    # fall-through for any unprocessed messages (exception or unhandled message type)
    return []

One thing to note about process_input_record() is the if statement at its core. CloudWatch Logs writes two different types of messages: most of the messages are data messages, which contain log events. However, when you first subscribe the log group to a stream, it writes a control message to verify that it has permission to do so. There shouldn't be any other message types, but if AWS adds one we'll ignore it. I also wrap the entire function in a try block, in case something other than a valid Logs message ends up on the stream.

Each event in the record gets passed through transform_log_event(), not shown here, which performs the following transformations:

  • If the message isn't already JSON, it's converted into a JSON object with `timestamp`, `message`, and `level` fields. This makes the message compatible with those written by my logging adapter.
  • If the message looks like a Lambda status message, it's parsed, and relevant data (eg, request ID, execution duration, and memory used) is extracted and stored in a child object.
  • If the message looks like Lambda logging output, it is parsed and the components (level, timestamp, request ID, and message) are stored in separate fields. Note: my transform only supports Python log output; the Node.JS runtime outputs fields in a different order.
  • The source log group and stream are added, as a sub-object.

If you want to add your own transformations — such as support for Node.JS logs — this is the place to do it.
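
To make that concrete, here's a stripped-down, hypothetical sketch of such a transform. It handles only the first and last bullets above; the default level and the name of the source sub-object are my own choices, not necessarily what the project uses:

from datetime import datetime, timezone

def transform_log_event(logGroup, logStream, event):
    message = event['message'].strip()
    try:
        data = json.loads(message)
        if not isinstance(data, dict):
            raise ValueError('not a JSON object')
    except ValueError:
        # not already JSON: wrap the raw text so it matches the appender's format
        # (defaulting the level to INFO is an assumption)
        data = {
            'level': 'INFO',
            'timestamp': datetime.fromtimestamp(event['timestamp'] / 1000, tz=timezone.utc).isoformat(),
            'message': message
        }
    # identify where the event came from (key name is illustrative)
    data['source'] = {
        'logGroup': logGroup,
        'logStream': logStream
    }
    return data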

Writing messages to Kinesis

As I've written elsewhere, writing to Kinesis is easy to do, but difficult to do right:

  • For performance, you should use the PutRecords API rather than PutRecord.
  • Which means that you must create batches of records that fit within this API's limits: at most 500 records per call, each record no larger than 1 MB, and no more than 5 MB per request in total.
  • You have to be prepared for your entire call to be throttled, and resend after a delay.
  • And you have to be prepared for individual records within the batch to be throttled due to load on the destination shard, and resend just them.

The top-level write_to_kinesis() function repeatedly processes batches of records, sleeping between batches to minimize throttling. The process_batch() function returns any messages that it couldn't send, for the next time through the loop. As the comment in the code notes, there shouldn't be more than one batch unless you've configured the Lambda trigger to process a large number of records or your destination is overloaded.

def write_to_kinesis(listOfEvents):
    records = prepare_records(listOfEvents)
    while records:
        records = process_batch(records)
        if records:
            time.sleep(2) # an arbitrary sleep; we should rarely hit this
    return
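
prepare_records() isn't shown in this excerpt; conceptually it just turns each transformed event into an entry for the PutRecords call. A minimal sketch, in which the random partition key is my assumption rather than what the project actually does:

import uuid

def prepare_records(listOfEvents):
    records = []
    for message in listOfEvents:
        records.append({
            'Data': json.dumps(message).encode('utf-8'),
            # a random key spreads records across shards; keying on the source
            # log stream would instead preserve per-stream ordering
            'PartitionKey': str(uuid.uuid4())
        })
    return records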

The process_batch() function is small but needs the most explanation. It starts with a call to build_batch(), which splits the source list of records into two parts: the first part contains all of the records that will fit in a single PutRecords call, the second is everything left over. Next is the actual call. Finally, process_response() looks at the result of the call to find any records that were throttled; these are added to the leftover records and returned to the caller, to be retried on the next pass through the loop.

def process_batch(records):
    toBeSent, toBeReturned = build_batch(records)
    logging.info(f'sending batch of {len(toBeSent)} records with {len(toBeReturned)} remaining')
    try:
        response = kinesisClient.put_records(
            StreamName=kinesisStream,
            Records=toBeSent
        )
        return process_response(response, toBeSent) + toBeReturned 
    except kinesisClient.exceptions.ProvisionedThroughputExceededException:
        logging.warning(f'received throughput exceeded on stream {kinesisStream}; retrying all messages')
        return toBeSent + toBeReturned
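
build_batch() isn't shown either: it walks the list of prepared records, accumulating them until it hits one of the PutRecords limits mentioned earlier. A rough sketch, with approximate size accounting (data payload plus partition key):

def build_batch(records):
    batch = []
    batchSize = 0
    for idx, record in enumerate(records):
        recordSize = len(record['Data']) + len(record['PartitionKey'])
        # PutRecords limits: 500 records per call, 5 MB total per request
        # (individual log events are far below the 1 MB per-record limit, so it isn't checked here)
        if len(batch) == 500 or batchSize + recordSize > 5 * 1024 * 1024:
            return batch, records[idx:]
        batch.append(record)
        batchSize += recordSize
    return batch, []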

I'm going to wrap up by looking at the process_response() function in detail, because it's where the trickiness lies. The response from PutRecords contains an array of result objects, matched by position to the request records that you passed in. In the happy path, each of these result objects contains the sequence number that Kinesis assigned to the record (and FailedRecordCount is 0). In the unhappy path, some of the records fail while others succeed; the result object for each failed record contains an error code.

Making things a little more tricky, this error code can indicate either that the shard throttled the record, in which case we re-send it, or that an internal error happened. The API docs don't go into detail on exactly what an internal error is, or whether it's recoverable, so I assume that it isn't and drop the record.

def process_response(response, records):
    if response['FailedRecordCount'] == 0:
        return []
    
    result = []
    droppedRecordCount = 0
    for ii in range(len(response['Records'])):
        entry = response['Records'][ii]
        errorCode = entry.get('ErrorCode')
        if errorCode == 'ProvisionedThroughputExceededException':
            result.append(records[ii])
        elif errorCode:
            droppedRecordCount += 1
    
    if droppedRecordCount > 0:
        logging.warning(f'dropped {droppedRecordCount} records due to Kinesis internal errors')
    if len(result) > 0:
        logging.info(f"requeueing {len(result)} records due to throughput-exceeded")
    
    return result

So, that's it. With a few adjustments to match your logging format (and, really, just log in JSON if you can), this will let you centralize your logs in Elasticsearch, even if AWS really, really wants you to use CloudWatch.

Wednesday, September 11, 2019

Streaming CloudWatch logs to Elasticsearch, Part 1: The Problem

My preferred approach to application logging on AWS is based around two ideas: first, that applications produce log messages in JSON, and second, that those messages are written directly to a Kinesis stream, which is then sent to Elasticsearch via Kinesis Firehose.

Amazon's preference, however, is CloudWatch Logs: it's the default log destination for AWS Batch and Lambda (indeed, you have to explicitly remove permissions to stop Lambda from logging to CloudWatch); there's an agent for EC2, and the awslogs driver for Docker (ECS, EKS, or on-prem). Unfortunately, while CloudWatch Logs has come a long way from its original feature set, it still doesn't measure up to Elasticsearch. And in a multi-application environment, you don't want two centralized log services.

So what to do?

One answer is to use an alternative logging framework and bypass CloudWatch Logs. This can work, but you'll lose useful information such as the Lambda execution report. And as I discovered, any logging framework that uses a background thread to write log messages (which is usually a Good Thing) will run into problems when Lambda freezes the execution context.

A better answer is to use a CloudWatch Logs subscription, in which a log group sends regular batches of messages to a variety of destinations. However, there are a surprising number of pitfalls with subscriptions. This post looks at the different subscription types and the pros and cons associated with each. My next post details an implementation that copies CloudWatch Logs into an existing Kinesis-based pipeline, from which they end up in Elasticsearch.

This should be simple: if you use the AWS Console, you'll even see an option to subscribe a log group directly to Amazon Elasticsearch, which seems like the “no-brainer” choice. But after trying it out I consider it completely unsuitable: it sent updates multiple times per second, apparently containing duplicates (I wasn't writing anywhere near that many messages in my tests). And Elasticsearch hates frequent small updates.

A second option, also available from the Console, is to invoke a Lambda. I first went down this path, with a Lambda that would write log messages to a Kinesis stream; from there they would be picked up by my existing pipeline. However, I abandoned this approach because, again, the Lambda was invoked a lot: it appears that CloudWatch invokes it as soon as events are logged, rather than attempting to batch them. With a moderately active log stream, this translates to over two million invocations per stream per month.

There are two additional subscription options that aren't available from the Console: Kinesis Streams and Kinesis Firehose. Both use a similar format: each record written to the stream/firehose identifies the source log group and stream, and contains an array of zero or more log events. This rules out Firehose: because it maintains a 1:1 relationship between the records it reads and the records it writes, it can't split that array into individual log events.

That left me with subscribing a Kinesis Stream, then attaching a Lambda to that stream to process the records. While this approach had a few more moving parts than I would like, it let me reuse most of my existing firehose architecture: the Lambda could write individual messages into the same Kinesis stream used for direct application logging, where they would be picked up by Firehose. And because you can specify the number of messages in a Kinesis-Lambda trigger, it meant that the number of invocations could be tuned. So all-in-all, a win.