blog.kdgregory.com

Monday, January 6, 2020

The Future of Open Source

The world of open source software seems to be going through a period of soul-searching. On the one hand, individual maintainers have retracted packages, causing disruption for the communities that depended on those packages. On the other, software-as-a-service providers are making more money from some applications than their creators.

This is all happening in a world where businesses depend on open-source to operate. It doesn't matter whether you're an individual launching a startup with PHP and MySQL, or a multi-national replacing your mainframe with a fleet of Linux boxes running Java. Your business depends on the work of people that have their own motivations, and those motivations may not align with yours. I think this is an untenable situation, one that will eventually resolve by changing the nature of open-source.

Before looking at how I think it will resolve, I want to give some historical perspective. This is one person's view; you may not agree with it.

I date the beginning of “professional” open source as March 1985: that was the month that Dr. Dobb's Journal published an article by Richard Stallman, an article that would turn into the GNU Manifesto. There was plenty of freely available software published prior to that time; my experience was with the Digital Equipment Corporation User Society (DECUS), which published an annual catalog of programs ranging in complexity from fast Fourier transform routines to complete language implementations. These came with source code and no copyright attached (or, at least, no registered copyright, which was an important distinction in the 1970s and early 1980s).

What was different about the GNU Manifesto, and why I refer to it as the start of “professional” open source, was that Stallman set out a vision of how programmers could make money when they gave away their software. In his view, companies would get software for free but then hire programmers to maintain and enhance it.

In 1989, Stallman backed up the ideas of the GNU Manifesto with the GNU General Public License (GPL), which was applied to the software produced by the GNU project. This license introduced the idea of “copyleft”: a requirement that any “derivative works” also be licensed under the GPL, meaning that software developers could not restrict access to their code. Even though that requirement was relaxed in 1991 with the “library” (now “lesser”) license, meaning that you could use the GNU C compiler to compile your programs without them becoming open source by that act, the GPL scared most corporations away from any use of the GNU tools (as late as 1999, I was met with a look of shock when I suggested that the GNU C compiler could make our multi-platform application easier to manage).

In my opinion, it was the Apache web server, introduced in 1995, that made open-source palatable (or at least acceptable) to the corporate world. In large part, this was due to the Apache license, which essentially said “do what you want, but don't blame us if anything goes wrong.” But also, I think it was because the corporate world was completely unprepared for the web. To give a sense of how quickly things moved: in 1989 I helped set up the DNS infrastructure for a major division of one of the world's largest corporations; I had only a few years of experience with TCP/IP networking, but that was more than the IT team had. NCSA Mosaic appeared four years later, and within a year or two after that companies were scrambling to create a web presence. Much like the introduction of PCs ten years earlier, this happened outside of corporate IT; while there were commercial web servers (including offerings from Microsoft and Netscape), “free as in beer” was a strong incentive.

Linux, of course, was a thing in the late 1990s, but in my experience it wasn't used outside of a hobbyist community; corporations that wanted UNIX used a commercial distribution. In my view, Linux became popular due to two things: first, Eric Raymond published The Cathedral and the Bazaar in 1997, which made the case that open source was actually better than commercial systems: it had to be good to survive. And second, after the dot-com crash, “free as in beer” became a selling point, especially to the startups that would create “Web 2.0”.

Jumping forward 20 years, open-source software is firmly embedded in the corporate world. While I'm an oddity for running Linux on the desktop, all of the companies I've worked with in the last ten or more years used it for their production deployments. And not just Linux; the most popular database systems are open source, as are the tools to provision and manage servers, and even productivity tools such as LibreOffice. And for most of the users of these tools, “free as in beer” is an important consideration.

But stability is (or should be) another important consideration, and I think that many open-source consumers have been lulled into a false sense of stability. The large projects, such as GNU and Apache, have their own repositories and aren't going anywhere. And the early “public” repositories, such as SourceForge and Maven Central, adopted a policy that “once you publish something here, it will never go away.” But newer repositories don't have such a policy, and as we saw with left-pad in 2016 and chef-sugar in 2019, authors are willing and able to pull their work down.

At the same time, companies such as MongoDB and Elastic N.V. found that releasing their core products as open-source might not have been such a great idea. Software-as-a-service companies such as AWS are able to take those products and host them as a paid service, often making more money from the hosting than the original companies do from the services they offer around the product. In response, the product companies have changed the licenses on their software, attempting to cut off that usage (or at least capture a share of it).

Looking at both behaviors, I can't help but think that one of the core tenets of the GNU manifesto has been forgotten: that the developers of open-source software do not have the right to control its use. Indeed, the Manifesto is quite clear on this point: “[programmers] deserve to be punished if they restrict the use of these programs.”

You may or may not agree with that idea. I personally believe that creators have the right to decide how their work is used. But I also believe that releasing your work under an open-source license is a commitment, one that can't be retracted.

Regardless of any philosophical view on the matter, I think there are two practical outcomes.

The first is that companies — or development teams — that depend on open-source software need to ensure their continued access to that software. Nearly eight years ago I wrote about using a local repository server when working with Maven and Java. At the time I was focused on coordination between different development teams at the same company. If I were to rewrite the post today, it would focus on using the local server to ensure that you always have access to your dependencies.

A second, and less happy, change is that I think open-source creators will lose the ability to control their work. One way this will happen is for companies whose products depend on open-source to provide their own public repositories — indeed, I'm rather amazed that Chef doesn't offer such a repository (although perhaps they're gun-shy after the reaction to their ham-fisted attempt to redistribute chef-sugar).

The other way this will happen is for service-provider companies to fork open-source projects and maintain their own versions. Amazon has already done this, for Elasticsearch and also OpenJDK; I don't expect them to be the only company to do so. While these actions may damage the companies' reputations within a small community of open-source enthusiasts, the much larger community of their clients will applaud those actions. I can't imagine there are many development teams that will say “we're going to self-host Elasticsearch as an act of solidarity”; convenience will always win out.

If you're like me, a person responsible for a few niche open-source projects, this probably won't matter: nobody's going to care about your library (although note that both left-pad and chef-sugar at least started out as single-maintainer niche projects). But if you're a company that is planning to release your core product as open-source, you should think long and hard about why you want to do this, and whether your plan to make money is viable. And remember these words from the GNU Manifesto: “programming will not be as lucrative on the new basis as it is now.”

Tuesday, October 8, 2019

Writing JSON Logs from a Python Lambda

My previous post looked at a Lambda function that would accept log messages from one Kinesis stream, transform them into JSON, and write them to another Kinesis stream, from which they would be inserted into an Elasticsearch cluster. In that post I skipped over the actual transformation code, other than saying that it would pass existing JSON objects unchanged, and recognize the Lambda Python log format.

However, I'd prefer not to perform this translation: I want to write log messages as JSON, which will allow me to easily add additional information to the message and use searches based on specific details. With the Python logging facility, it's easy to make this happen.

Unless you're running on Lambda.

The linked example assumes that you can configure logging at the start of your main module. However, Lambda configures logging for you, before your code gets loaded. If you print logging.root.handlers from inside a Lambda, you'll see that there's already a LambdaLoggerHandler installed. To use my formatter, you need to update this handler:

import logging

for handler in logging.root.handlers:
    handler.setFormatter(JSONFormatter())

While I could have replaced this handler entirely with one that writes to stderr, I was concerned that it performed some undocumented magic. One thing that I did discover, although it doesn't quite count as magic, is that it inserts the Lambda's request ID into the LogRecord before passing it to the formatter.

By not using that piece of information, I require that the formatter be initialized for every Lambda invocation, so that it can get the request ID out of the invocation context. This also allows me to pull out the Lambda name and version, or any of the other documented context attributes.
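
To make that concrete, here's a minimal sketch of such a formatter. It is not the code from the linked example, and the field names are my own choices; the context attributes it reads (function_name, function_version, aws_request_id) are part of the documented Lambda context object.

import json
import logging


class JSONFormatter(logging.Formatter):

    def __init__(self, lambda_context=None):
        super().__init__()
        self.lambda_context = lambda_context

    def format(self, record):
        result = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage()
        }
        if self.lambda_context:
            result['functionName'] = self.lambda_context.function_name
            result['functionVersion'] = self.lambda_context.function_version
            result['requestId'] = self.lambda_context.aws_request_id
        return json.dumps(result)


def lambda_handler(event, context):
    # re-install the formatter on every invocation so it picks up the current request ID
    for handler in logging.root.handlers:
        handler.setFormatter(JSONFormatter(context))
    # ... the rest of the handler's logic ...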

If you also want JSON-formatted logs from your Lambdas, I've included a documented example here.

Thursday, September 12, 2019

Streaming CloudWatch logs to Elasticsearch, Part 2: Teh Codez

My last post described my different attempts at incorporating CloudWatch Logs events into an Elasticsearch-based logging pipeline. This post looks at the actual code. To avoid boring you, it just touches on the main points; if you want the whole thing go here.

Creating the subscription

The first step in streaming your CloudWatch Logs is to create a subscription. As I mentioned in my last post, you can't do this via the AWS Console. The Amazon documentation walks you through creating the subscription via the CLI, or you can use the CloudFormation template from my project.

One important note: the stream has to be active before you can create a subscription; if you create the stream and then immediately try to create the subscription, it will fail. This is why I have separate templates to create the stream and subscribe to it (it's also easier to replicate the subscription when it's a separate template).
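
If you'd rather script the subscription than use the CLI or a template, here's a rough sketch using boto3. The names and ARNs are placeholders, and the role is whatever role you've created that allows CloudWatch Logs to write to the stream; the waiter takes care of the “stream must be active” requirement:

import boto3

logs_client = boto3.client('logs')
kinesis_client = boto3.client('kinesis')

# wait for the destination stream to become active before subscribing
kinesis_client.get_waiter('stream_exists').wait(StreamName='CloudWatchSubscriptionDestination')

logs_client.put_subscription_filter(
    logGroupName='AppenderExample',
    filterName='CloudWatchSubscription',
    filterPattern='',    # an empty pattern forwards every log event
    destinationArn='arn:aws:kinesis:us-east-1:012345678901:stream/CloudWatchSubscriptionDestination',
    roleArn='arn:aws:iam::012345678901:role/CloudWatchLogsKinesisRole')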

The messages are written to the stream as JSON. Each message identifies the log group and stream, and contains a list of log events:

{
 "messageType": "DATA_MESSAGE",
 "owner": "012345678901",
 "logGroup": "AppenderExample",
 "logStream": "Example-20190825",
 "subscriptionFilters": ["CloudWatchSubscription-Subscription-184MSZXRD5MCP"],
 "logEvents": [{
  "id": "34939504216003552676187468284442890586332374987778818048",
  "timestamp": 1566741555269,
  "message": "some message\n"
 }, {
  "id": "34939504216092755656981590777009033459422968433802739713",
  "timestamp": 1566741555273,
  "message": "another message\n"
 }, {
  "id": "34939504239575440351034336945046144800521693099600117762",
  "timestamp": 1566741556326,
  "message": "still another\n"
 }]
}

Processing the input messages

When Kinesis invokes a Lambda, it packages multiple records into a single invocation event, with the original message GZipped and Base64-encoded:

{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "ea05911b7285b29393412d960053f81d",
                "sequenceNumber": "49598889268738564796327312231530438478076180289718059010",
                "data": "H4sIANoVbV0AA42QW2vCQBCFn5NfIfusMLs7e5m+BbU+CaURWtpKSXXRQExCsvaC+N+7sSn0QqUvC7PnMOebc4gjtnNtm23c4q127GLAJskieZxP0zSZTdkw6NVL6ZpOAS4kKm0sAT8pRbWZNdW+7sSkrl25ds30NdvVhfvUU9+4bNcZemEkgBNYoU6Odv/Urpq89nlVXuaFd00bvPdsXFT79U3mV9v0i2P0beAW5+nd7fVEzcdXbNkHTp9d6U9LDnEUsXzdZUskSQpQcA0glRLaaG4NaissIopwkbJaSiENkjXGWMstoO0YI+bzUJEP9GEVV1ob5KrbQSe1r6+LaaudG/TzQ8ni6Dgc/EFBwiillSbLFUFIBCCQoV5CIUhblNKCMJIMl2cpjPxJkZWV37rmPyDhNQoxVMJBhkhNGL41R7QAKnCSBKLQGefGaHEGREuhf9Xh86IY9DgfGMv4GL8DWgIaCnQCAAA=",
                "approximateArrivalTimestamp": 1566741561.188
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49598889268738564796327312231530438478076180289718059010",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::012345678901:role/DefaultLambdaProfile",
            "awsRegion": "us-east-1",
            "eventSourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/CloudWatchSubscriptionDestination"
        },
        ...
    ]
}

To transform the input events, you need to iterate over each of the top-level records, extract its payload, and then iterate over each of the child events to transform them. In order to simplify the output code, my process_input_record() function returns just the transformed child events, so that the output code can work on a “flattened” array.

def lambda_handler(event, context):
    outputMessages = []
    for record in event['Records']:
        outputMessages = outputMessages + process_input_record(record)
    logging.info(f'total number of messages to output: {len(outputMessages)}')
    logging.debug(f'output messages: {json.dumps(outputMessages)}')
    write_to_kinesis(outputMessages)


def process_input_record(record):
    try:
        payload = record['kinesis']['data']
        decoded = gzip.decompress(base64.b64decode(payload))
        data = json.loads(decoded)
        message_type = data.get('messageType')
        if message_type == 'DATA_MESSAGE':
            logGroup = data['logGroup']
            logStream = data['logStream']
            events = data.get('logEvents', [])
            logging.info(f'processing {len(events)} events from group "{logGroup}" / stream "{logStream}"')
            logging.debug(f'input messages: {json.dumps(events)}')
            return [transform_log_event(logGroup, logStream, event) for event in events]
        elif message_type == 'CONTROL_MESSAGE':
            logging.info('skipping control message')
        elif message_type:
        logging.warning(f'unexpected message type: {message_type}')
    except Exception:
        logging.error(f'failed to process record; keys = {record.keys()}', exc_info=True)
    # fall-through for any unprocessed messages (exception or unhandled message type)
    return []

One thing to note about process_input_record() is the if statement at its core. CloudWatch Logs writes two different types of messages: most of the messages are data messages, which contain log events. However, when you first subscribe the log group to a stream, it writes a control message to verify that it has permission to do so. There shouldn't be any other message types, but if AWS adds one we'll ignore it. I also wrap the entire function in a try block, in case something other than a valid Logs message ends up on the stream.

Each event in the record gets passed through transform_log_event(), not shown here, which performs the following transformations:

  • If the message isn't already JSON, it's transformed into a JSON object with `timestamp`, `message`, and `level` fields. This makes the message compatible with those written by my logging adapter.
  • If the message looks like a Lambda status message, it's parsed, and relevant data (eg, request ID, execution duration, and memory used) is extracted and stored in a child object.
  • If the message looks like Lambda logging output, it is parsed and the components (level, timestamp, request ID, and message) are stored in separate fields. Note: my transform only supports Python log output; the Node.JS runtime outputs fields in a different order.
  • The source log group and stream are added, as a sub-object.

If you want to add your own transformations — such as support for Node.JS logs — this is the place to do it.
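
To make this concrete, here's a hypothetical sketch of what such a transform might look like. It is not the actual function from the project (in particular, it skips the Lambda-specific parsing described above), and the field names are my own:

import json

def transform_log_event(logGroup, logStream, event):
    message = event['message'].strip()
    try:
        # pass existing JSON objects through unchanged
        data = json.loads(message)
        if not isinstance(data, dict):
            raise ValueError('not a JSON object')
    except ValueError:
        # wrap plain-text messages in a minimal JSON object
        data = {
            'level': 'INFO',
            'timestamp': event['timestamp'],
            'message': message
        }
    # attach the source log group and stream as a sub-object
    data['cloudwatch'] = {'logGroup': logGroup, 'logStream': logStream}
    return data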

Writing messages to Kinesis

As I've written elsewhere, writing to Kinesis is easy to do, but difficult to do right:

  • For performance, you should use the PutRecords API rather than PutRecord.
  • Which means that you must create batches of records that fit into the limits of this API (500 records per call, with a 1 MB limit per record and 5 MB for the entire request).
  • You have to be prepared for your entire call to be throttled, and resend after a delay.
  • And you have to be prepared for individual records within the batch to be throttled due to load on the destination shard, and resend just them.

The top-level write_to_kinesis() function repeatedly processes batches of records, with a sleep between batches to minimize throttling. The process_batch() function returns any messages that it couldn't send, for the next time through the loop. As I note, there shouldn't be more than one batch, unless you've configured the Lambda trigger to process lots of records or your destination is overloaded.

def write_to_kinesis(listOfEvents):
    records = prepare_records(listOfEvents)
    while records:
        records = process_batch(records)
        if records:
            time.sleep(2) # an arbitrary sleep; we should rarely hit this
    return
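
The prepare_records() function isn't shown in the post; it converts each transformed event into the dict format that PutRecords expects. A hypothetical sketch (the partition-key choice here is mine, not necessarily what the project does):

def prepare_records(listOfEvents):
    records = []
    for event in listOfEvents:
        # partition by log stream so that events from a single stream stay in order
        partitionKey = event.get('cloudwatch', {}).get('logStream', 'DEFAULT')
        records.append({
            'PartitionKey': partitionKey,
            'Data': json.dumps(event).encode('utf-8')
        })
    return records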

The process_batch() function is small but needs the most explanation. It starts with a call to build_batch(), which splits the source list of records into two parts: the first part contains all of the records that will fit in a single PutRecords call, the second is everything left over. Next is the actual call. Finally, process_response() looks at the result of the call to find any records that were throttled; these are added to the leftover records and returned, to be retried on the next pass through the loop. (A sketch of build_batch() follows the code.)

def process_batch(records):
    toBeSent, toBeReturned = build_batch(records)
    logging.info(f'sending batch of {len(toBeSent)} records with {len(toBeReturned)} remaining')
    try:
        response = kinesisClient.put_records(
            StreamName=kinesisStream,
            Records=toBeSent
        )
        return process_response(response, toBeSent) + toBeReturned 
    except kinesisClient.exceptions.ProvisionedThroughputExceededException:
        logging.warning(f'received throughput exceeded on stream {kinesisStream}; retrying all messages')
        return toBeSent + toBeReturned
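
The build_batch() function itself lives in the linked project. A rough sketch of the idea, assuming the record format produced by prepare_records() above, might look like this:

def build_batch(records):
    # split the records into a batch that fits the PutRecords limits
    # (500 records, 5 MB total; individual log events are far below the 1 MB per-record limit)
    batch = []
    batchBytes = 0
    for idx, record in enumerate(records):
        recordBytes = len(record['Data']) + len(record['PartitionKey'])
        if len(batch) == 500 or batchBytes + recordBytes > 5 * 1024 * 1024:
            return batch, records[idx:]
        batch.append(record)
        batchBytes += recordBytes
    return batch, []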

I'm going to wrap up by looking at the process_response() function in detail, because it's where the trickiness lies. The response for PutRecords contains an array of result objects, matched to the request records that you passed in. In the happy path, each of these result objects contains the sequence number that Kinesis assigned to the record (and FailedRecordCount is 0). In the unhappy path, some of the records fail while others succeed; the result objects for the failed records will contain an error code.

Making things a little more tricky, this error code can indicate either that the shard throttled the record, in which case we re-send it, or that an internal error happened. The API docs don't go into detail on exactly what an internal error is, or whether it's recoverable, so I assume that it isn't and drop the record.

def process_response(response, records):
    if response['FailedRecordCount'] == 0:
        return []
    
    result = []
    droppedRecordCount = 0
    for ii in range(len(response['Records'])):
        entry = response['Records'][ii]
        errorCode = entry.get('ErrorCode')
        if errorCode == 'ProvisionedThroughputExceededException':
            result.append(records[ii])
        elif errorCode:
            droppedRecordCount += 1
    
    if droppedRecordCount > 0:
        logging.warning(f'dropped {droppedRecordCount} records due to Kinesis internal errors')
    if len(result) > 0:
        logging.info(f"requeueing {len(result)} records due to throughput-exceeded")
    
    return result

So, that's it. With a few adjustments to match your logging format (and, really, if you can, just log in JSON), this will let you centralize your logs in Elasticsearch, even if AWS really, really wants you to use CloudWatch.