blog.kdgregory.com

Thursday, September 12, 2019

Streaming CloudWatch logs to Elasticsearch, Part 2: Teh Codez

My last post described my different attempts at incorporating CloudWatch Logs events into an Elasticsearch-based logging pipeline. This post looks at the actual code. To avoid boring you, it just touches on the main points; if you want the whole thing go here.

Creating the subscription

The first step in streaming your CloudWatch Logs is to create a subscription. As I mentioned in my last post, you can't do this via the AWS Console. The Amazon documentation walks you through creating the subscription via the CLI, or you can use the CloudFormation template from my project.

One important note: the stream has to be active before you can create a subscription; if you create the stream and then immediately try to create the subscription, it will fail. This is why I have separate templates to create the stream and subscribe to it (it's also easier to replicate the subscription when it's a separate template).
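If you would rather script this than use templates, the ordering constraint above can be sketched as follows. This is a minimal sketch, not the project's code: the function name `subscribe_when_active`, its parameters, and the default filter name are made up for illustration, and the two clients are assumed to be boto3 `kinesis` and `logs` clients passed in by the caller.

```python
def subscribe_when_active(kinesis_client, logs_client, *, stream_name,
                          stream_arn, log_group, role_arn,
                          filter_name="example-subscription"):
    """Wait for the stream to become ACTIVE, then subscribe the log group.

    Hypothetical helper: the name and parameters are illustrative only.
    """
    # boto3's "stream_exists" waiter polls DescribeStream until the
    # stream reports ACTIVE, and raises if it gives up waiting.
    kinesis_client.get_waiter("stream_exists").wait(StreamName=stream_name)

    logs_client.put_subscription_filter(
        logGroupName=log_group,
        filterName=filter_name,
        filterPattern="",           # an empty pattern matches every event
        destinationArn=stream_arn,
        roleArn=role_arn,           # role that CloudWatch Logs assumes to write to the stream
    )
```

With boto3 installed, the clients would come from `boto3.client("kinesis")` and `boto3.client("logs")`. Passing them in keeps the function easy to exercise with stand-ins.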

The messages are written to the stream as JSON. Each message identifies the log group and stream, and contains a list of log events:

{
 "messageType": "DATA_MESSAGE",
 "owner": "012345678901",
 "logGroup": "AppenderExample",
 "logStream": "Example-20190825",
 "subscriptionFilters": ["CloudWatchSubscription-Subscription-184MSZXRD5MCP"],
 "logEvents": [{
  "id": "34939504216003552676187468284442890586332374987778818048",
  "timestamp": 1566741555269,
  "message": "some message\n"
 }, {
  "id": "34939504216092755656981590777009033459422968433802739713",
  "timestamp": 1566741555273,
  "message": "another message\n"
 }, {
  "id": "34939504239575440351034336945046144800521693099600117762",
  "timestamp": 1566741556326,
  "message": "still another\n"
 }]
}

Processing the input messages

When Kinesis invokes a Lambda, it packages multiple records into a single invocation event, with the original message GZipped and Base64-encoded:

{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "ea05911b7285b29393412d960053f81d",
                "sequenceNumber": "49598889268738564796327312231530438478076180289718059010",
                "data": "H4sIANoVbV0AA42QW2vCQBCFn5NfIfusMLs7e5m+BbU+CaURWtpKSXXRQExCsvaC+N+7sSn0QqUvC7PnMOebc4gjtnNtm23c4q127GLAJskieZxP0zSZTdkw6NVL6ZpOAS4kKm0sAT8pRbWZNdW+7sSkrl25ds30NdvVhfvUU9+4bNcZemEkgBNYoU6Odv/Urpq89nlVXuaFd00bvPdsXFT79U3mV9v0i2P0beAW5+nd7fVEzcdXbNkHTp9d6U9LDnEUsXzdZUskSQpQcA0glRLaaG4NaissIopwkbJaSiENkjXGWMstoO0YI+bzUJEP9GEVV1ob5KrbQSe1r6+LaaudG/TzQ8ni6Dgc/EFBwiillSbLFUFIBCCQoV5CIUhblNKCMJIMl2cpjPxJkZWV37rmPyDhNQoxVMJBhkhNGL41R7QAKnCSBKLQGefGaHEGREuhf9Xh86IY9DgfGMv4GL8DWgIaCnQCAAA=",
                "approximateArrivalTimestamp": 1566741561.188
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49598889268738564796327312231530438478076180289718059010",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::012345678901:role/DefaultLambdaProfile",
            "awsRegion": "us-east-1",
            "eventSourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/CloudWatchSubscriptionDestination"
        },
        ...
    ]
}
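For local testing it can help to build an invocation event in this shape by hand. The sketch below reverses the encoding just described (JSON, then GZip, then Base64). The helper names `encode_payload`, `decode_payload`, and `make_test_event` are hypothetical, and the generated event carries only the `data` field, not the other record metadata shown above.

```python
import base64
import gzip
import json


def encode_payload(message):
    """JSON-serialize, GZip, and Base64-encode a CloudWatch Logs message."""
    compressed = gzip.compress(json.dumps(message).encode("utf-8"))
    return base64.b64encode(compressed).decode("ascii")


def decode_payload(data):
    """Reverse encode_payload(): Base64-decode, gunzip, parse JSON."""
    return json.loads(gzip.decompress(base64.b64decode(data)))


def make_test_event(*messages):
    """Wrap messages in a stripped-down Kinesis invocation event."""
    return {"Records": [{"kinesis": {"data": encode_payload(m)}} for m in messages]}


sample = {
    "messageType": "DATA_MESSAGE",
    "logGroup": "AppenderExample",
    "logStream": "Example-20190825",
    "logEvents": [
        {"id": "1", "timestamp": 1566741555269, "message": "some message\n"},
    ],
}
event = make_test_event(sample)
print(decode_payload(event["Records"][0]["kinesis"]["data"])["logGroup"])
```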

To transform the input events, you need to iterate over each of the top-level records, extract its payload, and then iterate over each of the child events to transform them. In order to simplify the output code, my process_input_record() function returns just the transformed child events, so that the output code can work on a “flattened” array.

import base64
import gzip
import json
import logging


def lambda_handler(event, context):
    outputMessages = []
    for record in event['Records']:
        outputMessages = outputMessages + process_input_record(record)
    logging.info(f'total number of messages to output: {len(outputMessages)}')
    logging.debug(f'output messages: {json.dumps(outputMessages)}')
    write_to_kinesis(outputMessages)


def process_input_record(record):
    try:
        payload = record['kinesis']['data']
        decoded = gzip.decompress(base64.b64decode(payload))
        data = json.loads(decoded)
        message_type = data.get('messageType')
        if message_type == 'DATA_MESSAGE':
            logGroup = data['logGroup']
            logStream = data['logStream']
            events = data.get('logEvents', [])
            logging.info(f'processing {len(events)} events from group "{logGroup}" / stream "{logStream}"')
            logging.debug(f'input messages: {json.dumps(events)}')
            return [transform_log_event(logGroup, logStream, event) for event in events]
        elif message_type == 'CONTROL_MESSAGE':
            logging.info('skipping control message')
        elif message_type:
            logging.warning(f'unexpected message type: {message_type}')
    except Exception:
        logging.error(f'failed to process record; keys = {record.keys()}', exc_info=True)
    # fall-through for any unprocessed messages (exception or unhandled message type)
    return []

One thing to note about process_input_record() is the if statement at its core. CloudWatch Logs writes two different types of messages: most of the messages are data messages, which contain log events. However, when you first subscribe the log group to a stream, it writes a control message to verify that it has permission to do so. There shouldn't be any other message types, but if AWS adds one we'll ignore it. I also wrap the entire function in a try block, in case something other than a valid Logs message ends up on the stream.

Each event in the record gets passed through transform_log_event(), not shown here, which performs the following transformations:

  • If the message isn't already JSON, it's transformed into JSON with `timestamp`, `message`, and `level` fields. This makes the message compatible with those written by my logging adapter.
  • If the message looks like a Lambda status message, it's parsed, and relevant data (eg, request ID, execution duration, and memory used) is extracted and stored in a child object.
  • If the message looks like Lambda logging output, it is parsed and the components (level, timestamp, request ID, and message) are stored in separate fields. Note: my transform only supports Python log output; the Node.JS runtime outputs fields in a different order.
  • The source log group and stream are added, as a sub-object.

If you want to add your own transformations — such as support for Node.JS logs — this is the place to do it.
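As a starting point for such changes, here is a minimal sketch of the first and last transformations in the list above (wrapping non-JSON messages and attaching the source). It is not the project's transform_log_event(): the `source` field name, the ISO-8601 timestamp format, and the default `INFO` level are assumptions, and the Lambda-specific parsing is left out entirely.

```python
import json
from datetime import datetime, timezone


def transform_log_event(log_group, log_stream, event):
    """Simplified stand-in: wrap non-JSON messages and attach the source."""
    message = event.get("message", "").strip()

    result = None
    if message.startswith("{") and message.endswith("}"):
        try:
            result = json.loads(message)
        except json.JSONDecodeError:
            result = None   # looked like JSON but wasn't; treat as plain text

    if not isinstance(result, dict):
        millis = event["timestamp"]
        # integer arithmetic avoids float rounding in the milliseconds
        ts = datetime.fromtimestamp(millis // 1000, tz=timezone.utc)
        ts = ts.replace(microsecond=(millis % 1000) * 1000)
        result = {
            "timestamp": ts.isoformat(timespec="milliseconds"),
            "message": message,
            "level": "INFO",   # assumed default; plain text carries no level
        }

    # "source" is a hypothetical name for the sub-object
    result["source"] = {"logGroup": log_group, "logStream": log_stream}
    return result
```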

Writing messages to Kinesis

As I've written elsewhere, writing to Kinesis is easy to do, but difficult to do right:

  • For performance, you should use the PutRecords API rather than PutRecord.
  • Which means that you must create batches of records that fit into the limits of this API (at the time of writing, 500 records and 5 MB per call, with each record limited to 1 MB).
  • You have to be prepared for your entire call to be throttled, and resend after a delay.
  • And you have to be prepared for individual records within the batch to be throttled due to load on the destination shard, and resend just them.

The top-level write_to_kinesis() function repeatedly processes batches of records, with a sleep between batches to minimize throttling. The process_batch() function returns any messages that it couldn't send, for the next time through the loop. As I note, there shouldn't be more than one batch, unless you've configured the Lambda trigger to process lots of records or your destination is overloaded.

import time


def write_to_kinesis(listOfEvents):
    records = prepare_records(listOfEvents)
    while records:
        records = process_batch(records)
        if records:
            time.sleep(2) # an arbitrary sleep; we should rarely hit this
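The prepare_records() function isn't shown above. A minimal sketch of what it might look like, assuming each transformed event becomes one PutRecords entry with a JSON body: the random partition key is my assumption (it spreads records across shards), not necessarily what the project does.

```python
import json
import uuid


def prepare_records(events):
    """Turn transformed events into PutRecords request entries."""
    return [
        {
            "Data": json.dumps(event).encode("utf-8"),
            # assumption: a random key, since ordering across log
            # events from different sources isn't needed here
            "PartitionKey": uuid.uuid4().hex,
        }
        for event in events
    ]
```

`Data` and `PartitionKey` are the two entry fields that PutRecords requires; everything else about this helper is illustrative.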

The process_batch() function is small but needs the most explanation. It starts with a call to build_batch(), which splits the source list of records into two parts: the first part contains all of the records that will fit in a single PutRecords call, the second is everything left over. Next is the actual call. Finally, process_response() looks at the result of the call to find any records that were throttled; these are added to the front of the leftover records and returned to the caller, to be retried on the next pass through the loop.

def process_batch(records):
    toBeSent, toBeReturned = build_batch(records)
    logging.info(f'sending batch of {len(toBeSent)} records with {len(toBeReturned)} remaining')
    try:
        response = kinesisClient.put_records(
            StreamName=kinesisStream,
            Records=toBeSent
        )
        return process_response(response, toBeSent) + toBeReturned
    except kinesisClient.exceptions.ProvisionedThroughputExceededException:
        logging.warning(f'received throughput exceeded on stream {kinesisStream}; retrying all messages')
        return toBeSent + toBeReturned
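The build_batch() function called above isn't shown either. Here is one way the split could be written, as a sketch rather than the project's implementation. The limits are parameters; the 1 MB default byte budget is my own conservative assumption (a single shard accepts roughly 1 MB per second of writes), and `record_size` is a hypothetical helper.

```python
MAX_RECORDS = 500          # PutRecords accepts at most 500 records per call
MAX_BYTES = 1024 * 1024    # assumed byte budget per batch (conservative)


def record_size(record):
    """Bytes that count against the request: the data plus the partition key."""
    return len(record["Data"]) + len(record["PartitionKey"].encode("utf-8"))


def build_batch(records, max_records=MAX_RECORDS, max_bytes=MAX_BYTES):
    """Split records into (fits in one call, everything left over)."""
    batch_bytes = 0
    for idx, record in enumerate(records):
        batch_bytes += record_size(record)
        # The first record is always accepted, even if it is oversized,
        # so that the caller's loop cannot spin on an empty batch.
        if idx >= max_records or (idx > 0 and batch_bytes > max_bytes):
            return records[:idx], records[idx:]
    return records, []
```

Note that an oversized first record would still be rejected by Kinesis; a fuller version would filter such records out before batching.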

I'm going to wrap up by looking at the process_response() function in detail, because it's where the trickiness lies. The response for PutRecords contains an array of result objects, matched to the request records that you passed in. In the happy path, each of these result objects contains the sequence number that Kinesis assigned to the record (and FailedRecordCount is 0). In the unhappy path, some of the records fail but others succeed; the result object for the failed records will contain an error code.

Making things a little more tricky, this error code can either indicate that the shard throttled the record, in which case we re-send it, or an internal error happened. The API docs don't go into details on exactly what an internal error is, or whether it's recoverable, so I assume that it isn't and drop the record.

def process_response(response, records):
    if response['FailedRecordCount'] == 0:
        return []
    
    result = []
    droppedRecordCount = 0
    for ii in range(len(response['Records'])):
        entry = response['Records'][ii]
        errorCode = entry.get('ErrorCode')
        if errorCode == 'ProvisionedThroughputExceededException':
            result.append(records[ii])
        elif errorCode:
            droppedRecordCount += 1
    
    if droppedRecordCount > 0:
        logging.warning(f'dropped {droppedRecordCount} records due to Kinesis internal errors')
    if len(result) > 0:
        logging.info(f"requeueing {len(result)} records due to throughput-exceeded")
    
    return result
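To see the three outcomes side by side, the sketch below runs a hand-built response through a compact equivalent of the function above. The response values are made up for illustration, and `split_response` is a hypothetical name; unlike process_response(), it returns the dropped records as well, so that the example can show them.

```python
THROTTLED = "ProvisionedThroughputExceededException"


def split_response(response, records):
    """Pair each request record with its result; return (retry, dropped)."""
    retry, dropped = [], []
    for record, entry in zip(records, response["Records"]):
        code = entry.get("ErrorCode")
        if code == THROTTLED:
            retry.append(record)      # shard was busy: send again
        elif code:
            dropped.append(record)    # any other error: give up on it
    return retry, dropped


records = [
    {"Data": b"first", "PartitionKey": "a"},
    {"Data": b"second", "PartitionKey": "b"},
    {"Data": b"third", "PartitionKey": "c"},
]
# made-up response: one success, one throttled, one internal failure
response = {
    "FailedRecordCount": 2,
    "Records": [
        {"SequenceNumber": "1", "ShardId": "shardId-000000000000"},
        {"ErrorCode": THROTTLED, "ErrorMessage": "illustrative message"},
        {"ErrorCode": "InternalFailure", "ErrorMessage": "illustrative message"},
    ],
}
retry, dropped = split_response(response, records)
print(len(retry), len(dropped))
```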

So, that's it. With a few adjustments to match your logging format (and, really, if you can, just log in JSON), this will let you centralize your logs in Elasticsearch, even if AWS really, really wants you to use CloudWatch.

Wednesday, September 11, 2019

Streaming CloudWatch logs to Elasticsearch, Part 1: The Problem

My preferred approach to application logging on AWS is based around two ideas: first, that applications produce log messages in JSON, and second, that those messages are written directly to a Kinesis stream, which is then sent to Elasticsearch via Kinesis Firehose.

Amazon's preference, however, is CloudWatch Logs: it's the default log destination for AWS Batch and Lambda (indeed, you have to explicitly disable access to stop Lambda logging to CloudWatch); there's an agent for EC2; and there's the awslogs driver for Docker (ECS, EKS, or on-prem). Unfortunately, while CloudWatch Logs has come a long way from its original feature set, it still doesn't measure up to Elasticsearch. And in a multi-application environment, you don't want two centralized log services.

So what to do?

One answer is to use an alternative logging framework and bypass CloudWatch Logs. This can work, but you'll lose useful information such as the Lambda execution report. And as I discovered, any logging framework that uses a background thread to write log messages (which is usually a Good Thing) will run into problems when Lambda freezes the execution context.

A better answer is to use a CloudWatch Logs subscription, in which a log group sends regular batches of messages to a variety of destinations. However, there are a surprising number of pitfalls with subscriptions. This post looks at the different subscription types and the pros and cons associated with each. My next post details an implementation that copies CloudWatch Logs into an existing Kinesis-based pipeline, from which they end up in Elasticsearch.

This should be simple: if you use the AWS Console, you'll even see an option to subscribe a log group directly to Amazon Elasticsearch, which seems like the “no-brainer” choice. But after trying it out I consider it completely unsuitable: it sends updates multiple times per second, apparently with duplicates (because I wasn't writing that many messages in my tests). And Elasticsearch hates frequent small updates.

A second option, also available from the Console, is to invoke a Lambda. I first went down this path, with a Lambda that would write log messages to a Kinesis stream; from there they would be picked up by my existing pipeline. However, I abandoned this approach because, again, the Lambda was invoked a lot: it appears that CloudWatch invokes it as soon as events are logged, rather than attempting to batch them. With a moderately active log stream, this translates to over two million invocations per stream per month.
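As a back-of-envelope check on that figure (the arithmetic here is illustrative, not taken from the original measurements), two million invocations in a 30-day month works out to roughly one invocation every 1.3 seconds:

```python
SECONDS_PER_MONTH = 30 * 24 * 60 * 60   # 2,592,000 seconds in a 30-day month


def invocations_per_month(per_second):
    """Monthly invocation count for a steady invocation rate."""
    return per_second * SECONDS_PER_MONTH


def seconds_between(invocations):
    """Average gap between invocations for a given monthly count."""
    return SECONDS_PER_MONTH / invocations


print(round(seconds_between(2_000_000), 2))   # about 1.3 seconds apart
print(invocations_per_month(1))               # one per second: 2,592,000 per month
```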

There are two additional subscription options that aren't available from the Console: Kinesis Streams and Kinesis Firehose. Both use a similar format: each record written to the stream/firehose identifies the source log group and stream, with an array of zero or more log events. This rules out Firehose, which expects a 1:1 relationship between the records that it reads and the records that it writes.

That left me with subscribing a Kinesis Stream, then attaching a Lambda to that stream to process the records. While this approach had a few more moving parts than I would like, it let me reuse most of my existing firehose architecture: the Lambda could write individual messages into the same Kinesis stream used for direct application logging, where they would be picked up by Firehose. And because you can specify the number of messages in a Kinesis-Lambda trigger, it meant that the number of invocations could be tuned. So all-in-all, a win.
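The tuning knob mentioned above is the trigger's batch size. A minimal sketch of setting it when attaching the Lambda to the stream, assuming a boto3 `lambda` client is passed in: the function name `attach_stream_to_lambda`, its parameters, and the choice of `LATEST` as starting position are illustrative.

```python
def attach_stream_to_lambda(lambda_client, *, stream_arn, function_name,
                            batch_size=100):
    """Create a Kinesis trigger that delivers up to batch_size records per invocation.

    Hypothetical helper: the name and defaults are illustrative only.
    """
    return lambda_client.create_event_source_mapping(
        EventSourceArn=stream_arn,
        FunctionName=function_name,
        StartingPosition="LATEST",   # assumption: only process new records
        BatchSize=batch_size,        # larger batches mean fewer invocations
    )
```

With boto3 installed, the client would come from `boto3.client("lambda")`.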

Monday, July 22, 2019

Trying not to fertilize a content farm

I got an email this week from a person purporting to be the webmaster of a gardening website, asking if I would link to his page about growing buckwheat at home, from my page about buckwheat pancakes. The email seemed a little strange: it looked like a form letter with inserted fields. And there was a link at the bottom to unsubscribe. That's not something that I'd expect from a hobbyist looking to increase his connections.

But, rather than just ignore the email, I decided to do some investigation. If you'd like to play along at home, you'll find the site at “the daily gardener” — for obvious reasons, I don't want to insert a link or anything like a link, but I'm sure you can figure it out. Beware: while it has innocuous content today, it may not six months from now; I used a guest account on my home Linux box, and deleted the browser's config after visiting.

As I said, it has innocuous content. In fact, for a likely content farm, it has pretty good content: easily digestible chunks of information and lots of pictures. In other words, consistent with most of the gardening sites that I've seen.

There were, however, a few red flags. The first was that certain sentence phrasings were not those of a native English speaker. That, however, could just be the result of editing (I know that I've left some bizarre sentences online after deleting and re-arranging words). The second red flag was that the word “daily” in the title really did mean daily: there was a new long-form article published every day. That too could be explained: the profile picture showed someone who appeared to be post-retirement; he could craft content full-time.

A third red flag was that there were very few outbound links. This is another place where my experience is not necessarily relevant: while I put a lot of outbound links on my site to provide the reader with more information, not everybody does. But most of the gardening sites that I looked at do. And if he's asking me to link to his site, you'd think he'd be doing the same.

None of these red flags were convincing, so my next step was to take arbitrary sentences from his pages and put them into Google. In the past I've done this to find people who are pirating content from my site, and have issued several DMCA takedown requests as a result: people who put together pastiches tend not to change their source material. Surprisingly, it took a half-dozen attempts before I found almost-exact text from another site (which made me wonder whether most of the site's text was generated by a bot).

By now I was pretty much convinced this site was a content farm, so I explored its history. First stop was whois, which told me that the domain had been registered in 2005. Hmm, but all of the content looks new.

Next stop was the Wayback Machine, to look at how the site changed over the years. And what I discovered was that it was originally an online store, and remained so until the domain expired at the end of 2018. And then in April of 2019, the site was revived with a new look and new content.

Last up was a Google image search, using the picture from the “About Us” page. And, in addition to multiple links to “his” site, there was a link to Shutterstock. OK, that pretty much seals it.

After doing this research, I did some searches to find out if there was anything else that I should have looked for. And I was surprised that there were very few “how to spot a content farm” articles available (one of the better ones is from Austin Community College, apparently to keep students from using such sites as citations). Most of the articles were about how content farms don't work, and that Google actively penalizes them. But if you search for “growing buckwheat” you will find the daily gardener site in the first page of results; perhaps it's too new to have triggered alarms.

And there is one thing that I find really strange about this site: there weren't any ads. That is — or has been — the main reason for content farms: get people on the site and hope that they click an ad. But not here. Nor did the network tab on my browser show any activity after page load, so it wasn't (when I looked) some sort of click scam.

So what's the goal? Is this a “long con” approach to black-hat SEO? Are they building traffic before turning the site into something nasty?

I don't know, but I won't be linking to it.