
Tuesday, January 3, 2023

Using EventBridge Pipes to route CloudWatch Logs events to Kinesis

Several years ago I wrote about connecting CloudWatch Logs into a logging pipeline built on Kinesis Streams and Kinesis Firehose. To make that work, I wrote a Lambda.

AWS announced EventBridge Pipes at re:Invent 2022, as an easy way to connect messaging services. Connecting CloudWatch Logs to Kinesis seemed like a good way to kick the tires on this service. It still requires a custom Lambda, but that Lambda is now much simpler, because it doesn't need to concern itself with the quirks of PutRecords.

The Problem

When you subscribe a Kinesis stream to a log group, CloudWatch writes messages that look like this:

H4sIAAAAAAAAA62QTU7DMBCF95zC8jqO7LHd2NkFNVRCdNWyggqF1gqG/MlxW6qqd2cK7QWA5bw382a+OdLWjWNVu+VhcDSn02JZvMzLxaKYlTSh/b5zAWUBUulJZiwXgHLT17PQbwd0imFw3caF8rNqh8b9mIsYXNWi+9DX6l6wi8mAAwgAxXx88413HZNaWDHBqXH7Oq6DH6LvuzvfRBdGmj/Ra+zqO7fcuS6e9SP1G4yXGRgltdLcGK41SKu0AQ2KC55hrYTNsJCWc5CCW5CgteEG90WP4BHDaS4mmTDGAOKBTa4PwfjzuUwAA0W4yUHl0iTYQqbl7eOMPLkLFV+Rdd+mH5s6uLoPhxQvVe9ptR/TS0s6r3xHGNlVzdYRPxINzx09JX/DsP+LIX6LsTrdfAHRwAP2RwIAAA==

That's a GZipped and Base64-encoded JSON object. When decoded, unzipped, and pretty-printed, it looks like this:

{
  "messageType": "DATA_MESSAGE",
  "owner": "123456789012",
  "logGroup": "AppenderExample",
  "logStream": "Log4J1-Example-20221224-ithilien-351916",
  "subscriptionFilters": [
    "Example"
  ],
  "logEvents": [
    {
      "id": "37284354508805523945825240107552419724039002310923255808",
      "timestamp": 1671888278929,
      "message": "2022-12-24 08:24:38,929 DEBUG [example-0] com.kdgregory.log4j.aws.example.Main - value is 52\n"
    },
    {
      "id": "37284354508805523945825240107552419724039002310923255809",
      "timestamp": 1671888278929,
      "message": "2022-12-24 08:24:38,929 DEBUG [example-1] com.kdgregory.log4j.aws.example.Main - value is 52\n"
    }
  ]
}

As you can see, CloudWatch combines multiple log events into a single message. However, most destinations — such as Elasticsearch — want individual events, not a nested array. So you need something that pulls apart the original event. For example, the AWS-provided OpenSearch integration subscribes a Lambda to the log group; that Lambda decomposes the messages and writes the individual events to the cluster. My previous implementation did a similar transformation, but wrote the log messages to a Kinesis stream.

EventBridge Pipes

EventBridge Pipes is modeled around a four-step process (image sourced from docs):

EventBridge Pipes data flow

For simple pipelines, EventBridge gives you a “no code” solution that eliminates the middle two steps: you define an output format using path references to fields in the input messages. Unfortunately, this isn't usable for CloudWatch Logs messages: it can't (currently) handle Gzipped data, and can't extract the elements in an array. So we need to add an “enrichment” step that performs this operation.

Implementation

You can implement the enrichment step in multiple ways, including an HTTP(S) API, but the easiest is a Lambda. Pipes invokes this Lambda with an event that contains one or more source records, and it returns zero or more result records.

The source event looks like this:

[
  {
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": "shardId-000000000000:49636526735590005452916707570422271019264002608211165186",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::123456789012:role/CloudWatchTransformPipeline-PipeRole-us-east-1",
    "awsRegion": "us-east-1",
    "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/log-subscription",
    "kinesisSchemaVersion": "1.0",
    "partitionKey": "87362ce96c65eb1b3125bf0129b99bee",
    "sequenceNumber": "49636526735590005452916707570422271019264002608211165186",
    "data": "...",
    "approximateArrivalTimestamp": 1672490859.613
  }
]

The first thing to call out is that this event is an array. EventBridge Pipes can batch messages together, so you must loop over the entire event and combine the results from each message (ie, a flat-map):

def lambda_handler(event, context):
    result = []
    for message in event:
        result += transform_messages(message)
    return result

The data element is the GZipped and Base64-encoded message that I showed above. You process it with code that looks like this:

data = base64.b64decode(message['data'])
data = gzip.decompress(data)
payload = json.loads(data)

The last thing to be aware of is that there are two types of messages that you'll get from a CloudWatch Logs subscription, identified by the messageType field. When the subscription first starts, CloudWatch sends a control message to the stream to verify that it can write messages. After that, it sends data messages. Your Lambda should ignore everything but the latter.

if payload['messageType'] == 'DATA_MESSAGE':
    # extract log events
else:
    return []
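Putting the pieces together, a minimal version of the enrichment Lambda might look like this. It's only a sketch: the output shape (one flat object per log event, tagged with its source log group and stream) is my choice, and the complete example linked in the conclusion does quite a bit more.

import base64
import gzip
import json

def lambda_handler(event, context):
    result = []
    for message in event:
        result += transform_messages(message)
    return result

def transform_messages(message):
    # the Kinesis record payload is GZipped and Base64-encoded JSON
    data = base64.b64decode(message['data'])
    data = gzip.decompress(data)
    payload = json.loads(data)
    # skip the control message that CloudWatch sends when the subscription is created
    if payload['messageType'] != 'DATA_MESSAGE':
        return []
    # emit one record per log event, tagged with its source
    return [
        {
            'logGroup':  payload['logGroup'],
            'logStream': payload['logStream'],
            'timestamp': log_event['timestamp'],
            'message':   log_event['message'].strip(),
        }
        for log_event in payload['logEvents']
    ]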

Conclusion

This post has shown you the basics of an EventBridge Pipes transform. You can find my complete example, which performs several transforms on the log events, here.

While I don't think that EventBridge Pipes quite lives up to the hype of Werner Vogels' keynote (and it has nothing in common with EventBridge other than the name), it does allow you to focus on your logic rather than the quirks of your source and destination. As such, it's a useful addition to your data transformation toolbox.

Wednesday, December 14, 2022

Deprecating log4j-aws-appenders

I just made what I hope is the last release of my log4j-aws-appenders library. It started almost six years ago as a "weekend project" to produce a Log4J 1.x appender that wrote output to CloudWatch Logs. It has expanded over the years: I added Kinesis Streams and SNS as destinations, added support for Log4J 2.x and Logback, added support for the AWS 2.x Java SDK, and maintained the code as something I could be proud of.

And other people found it useful: it's received a handful of GitHub stars, and sees around 30,000 downloads a month from Maven Central (overwhelmingly for Log4J 1.x!).

But I think it's outlived its usefulness, and there are better options.

I'll start with an explanation of why I wrote it. At the time, I was working on a project that had around a hundred EC2 virtual machines running pre-baked AMIs, and using the AWS-provided logging agent to write to CloudWatch. The pre-baked AMI meant that everything went to the same log stream: we didn't configure separate streams for separate instances. The agent wasn't (at that time) very good at recognizing the boundaries between log messages, and the multi-threaded nature of our servers meant that messages would be interleaved anyway. Throw in CloudWatch's limited ability to filter messages, and the result was a mess.

As a result, I had two explicit goals for the first release: ensure that a Java LogEvent was written as a single CloudWatch Logs event (including stack traces), and allow different instances to write to different log streams. And this was better, but not great, because it still meant interleaved messages from different threads. So I implemented the Kinesis appender, to send log messages to Elasticsearch. I don't think my boss liked the monthly cost of a six-node Elasticsearch cluster, but the other developers liked the ability to quickly isolate a single web request and all of its related operations.

So why deprecate it? The answer is that the AWS world has moved forward in the past six years. CloudWatch is the default logging destination for Lambda and ECS, and the CloudWatch agent for EC2 has gotten much better. And Insights has made searching CloudWatch much easier, especially if you write log messages in JSON.

So rather than swim against the flow, I recommend that you use CloudWatch Logs rather than my library. For Lambda (and ECS or EKS with the awslogs driver), just write to Standard Error; it will be picked up by the runtime and sent to CloudWatch. For EC2, use the logging agent, configured to parse individual events. If you want to use Elasticsearch, you can use a subscription to get the data out of CloudWatch Logs (something that's become easier with EventBridge Pipes, the topic of an upcoming post). Or you can use an HTTP appender (available in all the major logging frameworks), and wire up your own pipeline using API Gateway's Kinesis integration (something else that wasn't available six years ago).
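For the EC2 case, the key piece of agent configuration is multi_line_start_pattern, which tells the agent where each log event begins (so that stack traces stay attached to their message). Here's a minimal sketch of the relevant fragment, with placeholder file path and log group name, assuming each event starts with a Log4J-style timestamp:

{
  "logs": {
    "logs_collected": {
      "files": {
        "collect_list": [
          {
            "file_path": "/var/log/example/application.log",
            "log_group_name": "ExampleApplication",
            "log_stream_name": "{instance_id}",
            "timestamp_format": "%Y-%m-%d %H:%M:%S",
            "multi_line_start_pattern": "{timestamp_format}"
          }
        ]
      }
    }
  }
}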

With all of these options, my library is superfluous. And while I think it's mature, stable, and bug-free, using it exposes your project to the risk of having a single volunteer maintainer responsible for a key part of your infrastructure.

Thursday, February 24, 2022

Leveraging try-with-resources to manage the mapped diagnostic context

The mapped diagnostic context is an under-appreciated feature of logging libraries. It's essentially a thread-local Map, to which you add key-value pairs that provide context about what your program is doing. Here's a trivial example:

logger.info("program starting");
File file = new File(argv[0]);

MDC.put("outputFile", file.getAbsolutePath());

try (FileOutputStream fos = new FileOutputStream(file);
        OutputStreamWriter osr = new OutputStreamWriter(fos, StandardCharsets.UTF_8);
        BufferedWriter out = new BufferedWriter(osr))
{
    logger.info("starting to write");
    out.write("hello, world");
    out.newLine();
    logger.info("finished writing");
}

MDC.remove("outputFile");
logger.info("program finished");

When you run this program, using a logging config that outputs the MDC, you'll see something like this:

2022-02-23 07:28:01,673 INFO  [main] c.kdgregory.example.Main -  - program starting
2022-02-23 07:28:01,677 INFO  [main] c.kdgregory.example.Main - outputFile=/tmp/test.txt - starting to write
2022-02-23 07:28:01,677 INFO  [main] c.kdgregory.example.Main - outputFile=/tmp/test.txt - finished writing
2022-02-23 07:28:01,677 INFO  [main] c.kdgregory.example.Main -  - program finished

Web-apps can benefit enormously from the MDC: before processing the request, you extract things like the invoking user, request path, session token, or a unique request ID. This makes it easier to track down problems, because you can filter the log by any of those parameters.
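As an illustration, a servlet filter can populate the MDC on the way in and clear it on the way out. This is a minimal sketch using SLF4J's MDC; the specific keys are just examples.

import java.io.IOException;
import java.util.UUID;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;

import org.slf4j.MDC;

public class RequestContextFilter implements Filter
{
    @Override
    public void init(FilterConfig config) throws ServletException
    { /* no-op */ }

    @Override
    public void destroy()
    { /* no-op */ }

    @Override
    public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain)
    throws IOException, ServletException
    {
        HttpServletRequest request = (HttpServletRequest)req;
        MDC.put("requestPath", request.getRequestURI());
        MDC.put("remoteUser", String.valueOf(request.getRemoteUser()));
        MDC.put("requestId", UUID.randomUUID().toString());
        try
        {
            chain.doFilter(req, resp);
        }
        finally
        {
            // always clear, so the entries don't leak into the next request on this thread
            MDC.clear();
        }
    }
}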

But I've recently been doing a lot of data engineering work: reading files or database tables, performing some transformation on them, and sending them to some destination. These tasks tend to be complex, with multiple steps, and they tend to be performed for multiple files or tables at a time. As with a web-app, it's really nice to add contextual information such as table name and selection criteria to the MDC, and let the logging framework write it with every message.

The problem with the MDC is that every put() has to be paired with a remove(). If you forget to remove a key it will “pollute” every subsequent log message from that thread, giving you potentially misleading information.

However, one thing that I noticed is that I tend to set the MDC just before a try-with-resources block, or inside a loop that calls one or more functions that could be wrapped in such a block. This led me to the AutoMDC class:

logger.info("program starting");
File file = new File(argv[0]);

try (AutoMDC mdc = new AutoMDC("outputFile", file.getAbsolutePath());
        FileOutputStream fos = new FileOutputStream(file);
        OutputStreamWriter osr = new OutputStreamWriter(fos, StandardCharsets.UTF_8);
        BufferedWriter out = new BufferedWriter(osr))
{
    logger.info("starting to write");
    out.write("hello, world");
    out.newLine();
    logger.info("finished writing");
}

logger.info("program finished");

Try-with-resources works with classes that implement java.lang.AutoCloseable: an interface that declares the close() method. My AutoMDC class implements this interface, and uses a Set<String> to keep track of the keys. It provides two methods (and the “convenience” constructor shown above):

public AutoMDC put(String key, Object value)
{
    keys.add(key);
    MDC.put(key, String.valueOf(value));
    return this;
}

@Override
public void close() throws Exception
{
    for (String key : keys)
    {
        MDC.remove(key);
    }
}

That's it: you can add items to the MDC in the try-with-resources initializer, or anywhere inside the try block itself, and they'll be removed at the end. One downside is that the close() method is called before any catch or finally blocks; if you need to log contextual information in those places, you'll need to do it manually.
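For reference, here's the whole class in one place. It's a minimal sketch using SLF4J's MDC (the varargs convenience constructor is my guess at a reasonable signature); the Logback gist linked below is the maintained version.

import java.util.HashSet;
import java.util.Set;

import org.slf4j.MDC;

public class AutoMDC implements AutoCloseable
{
    private Set<String> keys = new HashSet<>();

    /**
     *  Convenience constructor: takes alternating keys and values.
     */
    public AutoMDC(Object... keysAndValues)
    {
        for (int ii = 0; ii + 1 < keysAndValues.length; ii += 2)
        {
            put(String.valueOf(keysAndValues[ii]), keysAndValues[ii + 1]);
        }
    }

    public AutoMDC put(String key, Object value)
    {
        keys.add(key);
        MDC.put(key, String.valueOf(value));
        return this;
    }

    @Override
    public void close() throws Exception
    {
        for (String key : keys)
        {
            MDC.remove(key);
        }
    }
}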

If you'd like to use this in your own programs, I've provided an implementation for Logback as a GitHub gist; the same basic code can be adapted to whichever logging framework you use.

I think the idea of leveraging try-with-resources is a useful one for other situations, especially those where you might otherwise turn to aspect-oriented programming. Plus, it encourages the good habit of wrapping your code in try/catch blocks.

Saturday, December 18, 2021

My take on the Log4J 2.x vulnerability

A week ago, a lot of Java application developers learned that their applications harbored a severe vulnerability, courtesy of the Log4J 2.x logging framework. The vulnerability, CVE-2021-44228, allowed execution of arbitrary code on any system that logged unsanitized user input using the Log4J 2.x library. This prompted a lot of people to scramble to protect their systems, and a lot of people to take to Twitter with opinions of What It All Means.

As the maintainer of an open-source library that integrates with Log4J, I spent my Saturday morning understanding the bug and updating my examples to use versions of the library that mitigated the problem. Fortunately, my library is not directly affected, as long as its consumers don't use unsanitized data to configure the logging framework.

Having done that, and reading as much as I could on the issue (the LunaSec writeups are excellent), I've decided to voice my own thoughts on the matter. Some of which I haven't seen other people say, so they may be similarly interesting to you.

First, I think that it's important to recognize that this vulnerability — like most things that end up in the news — is not the result of a single issue. Instead, it's a combination of features, some of which make perfect sense:

  1. Log4J 2.x provides string interpolation for configuration (aka “lookups”)

    This is a great idea; I implemented it for my own logging library. The idea is that you don't want your configuration files to contain hardcoded values, such as the address of your central syslog daemon (or, in my case, the name of a CloudWatch log group). Log4J 2.x provides a long list of lookups, ranging from environment variables to information from the Docker container running your application.

  2. One of the lookups retrieves data from the Java Naming and Directory Interface (JNDI) API

    In large deployments, it's nice to be able to centralize your configuration. There are lots of tools to do this, but Java Enterprise Edition settled on JNDI, also known as the javax.naming package. JNDI is an umbrella API for retrieving data from different sources, such as LDAP.

  3. javax.naming.spi.ObjectFactory supports loading code from a remote server

    This is a somewhat dubious idea, but the JNDI SPI (service provider interface) spec justifies it: it lets you define resources, such as printer drivers, that can be retrieved directly from JNDI. That example should be familiar to anyone who has installed a printer on their home computer (do you know that the driver you installed is trustworthy?).

    Note: this condition is necessary for remote-code execution, but not for data exfiltration.

  4. The Log4J 2.x PatternLayout also supports string interpolation, with the same set of lookups

    Here we're getting into questionable features in the library. I had no idea that this behavior existed, even though I dug deeply into the documentation and source code when implementing my appenders. The documentation for this layout class is quite long, and prior to the vulnerability, you had to infer the behavior based on the presence of the nolookups pattern option.

  5. Users log unsanitized data

    If users passed all of their logging messages through a sanitizer that looked for and removed the sequence ${, then the vulnerability wouldn't exist. Except nobody does that, because why would you have to? I call it out because I think it's important to consider what you're logging, and whether it might contain information that you don't want to log. As I say in my “effective logging” talk, passwords can hide in the darndest places.

These are the things that need to happen to make this vulnerability affect you. If you can prevent one of them from happening, you're not vulnerable … to this specific issue. And there are ways to do this, although they aren't things that you can easily implement once the vulnerability is discovered. I'm writing a separate post on how Cloud deployments can mitigate similar vulnerabilities.

For now, however, I want to focus on two of the responses that I saw online: pay for open-source maintainers, and inspecting code before using it. In my opinion, neither of these are valid, and they distract from preparing for the next serious vulnerability.

Response #1: inspect code before you use it.

For any non-trivial application, this is simply impossible.

The current “trunk” revision of log4j-core has 89,778 lines of code, excluding test classes (measured using find and wc). That doesn't count any add-on libraries that you might use to write to your preferred destination, such as my log4j-aws-appenders (which has over 10,000 lines of mainline code). And logging is a tiny part of a modern application, which is typically built using a framework such as Spring, and runs on an application server such as Tomcat or Jetty.

And even if your company is willing to pay for the staff to read all of that source code, what is the chance that they will discover a vulnerability? What is the chance that they will even learn all of its behaviors? I've spent quite a bit of time with the Log4J 2.x code and documentation, both to write my library and on a post describing how to write Log4J plugins, and I never realized that it applied string interpolation to the raw log messages. After I knew about the vulnerability, of course, it was easy to find the code responsible.

Response #2: we — or at least large companies — should be paying the maintainers of open-source projects.

I don't know the situation of the core Log4J developers, and whether or not they get paid for their work on the project.¹ But I don't believe that this vulnerability was the result of an overworked, unpaid, solo developer. True, there was no discussion on the commit that introduced JNDI lookups (ignoring the tacky post-hoc commenters). But the code that applies string substitution to logging events has been in the library since the 2.0 release, and has been rewritten multiple times since then.

In fact, I think direct corporate sponsorship would lead to more unexpected behavior, because the sponsoring corporations will all have their own desires, and an expectation that those desires will be met. And if my experience in the corporate world is any guide, a developer that feels their paycheck is in jeopardy is much more likely to do something without giving it full thought.

So where does that leave us?

Unfortunately, I haven't seen very many practical responses (although, as I said, I'm writing a separate post to this effect). And I think that's because the real answer puts the onus for vulnerabilities on us, the consumers of open-source software.

Linus's Law, coined by Eric S Raymond, is that “given enough eyeballs, all bugs are shallow.” And this played out in full view with this vulnerability: the Log4J team quickly found and patched the problem, and has been releasing additional patches since.² There have also been multiple third-parties that have written detailed evaluations of the vulnerability.³

But still, we, the consumers of this package, needed to update our builds to use the latest version. And then deploy those updated builds. If you didn't already have a process that allows you to quickly build and deploy a hotfix, then your weekend was shot. Even if you did get a hotfix out, you needed to spend time evaluating your systems to ensure that they weren't compromised (and beware that the most likely compromise was exfiltration of secrets!).

It's natural, in such circumstances, to feel resentment, and to look for external change to make such problems go away. Or maybe even to think about reducing your reliance on open-source projects.

But in my opinion, this is the wrong outcome. Instead, look at this event as a wake-up call to make your systems more robust. Be prepared to do a hotfix at a moment's notice. Utilize tools such as web application firewalls, which can be quickly updated to block malicious traffic. And improve your forensic logging, so that you can identify the effects of vulnerabilities after they appear (just don't log unsanitized input!).

Because this is not the last vulnerability that you will see.


1: You might be interested to learn that the Apache Software Foundation receives approximately $3 million a year (annual reports here). However, to the best of my knowledge, they do not pay stipends to core maintainers. I do not know how the core Log4J 2.x maintainers earn their living.

2: Log4J 2.x change report.

3: I particularly like the CloudFlare posts, especially the one that describes how attackers changed their payloads to avoid simple firewall rules. This post informed my belief that the goal of most attacks was to exfiltrate secrets rather than take over systems.

Sunday, June 21, 2020

A History of Java Logging Frameworks, or, Why Commons-Logging is Still So Common

In the beginning there was System.out.println(). Or, for those of us who were purists (or wanted our output immediately), System.err.println(). And every developer had his or her own preferences for how their log messages would appear, leading to some very messy output as the number of developers on a team increased.

To bring order to this chaos, many projects implemented their own logging framework. I worked on one such project in the late 90s: the framework consisted of a single class that implemented a static log() method. I can't remember what the logging output looked like, but I suspect that it included a consistently-formatted timestamp.

According to this article by Ceki Gülcü, a project that he was working on in 1996 also implemented their own logging framework. But unlike the project I worked on, its framework was released to the public in 1999 as Log4J.

Something else that became public in 1999 was the Jakarta project, a collaboration between Sun and the Apache Software Foundation to produce the Tomcat application server. And of course Tomcat, being a large application with contributions by many people, had its own logging framework (and it still does, although the implementation and purpose have changed over time).

And lastly, 1999 was also the year that JSR 047, the Java Logging API Specification, was released. It turned into the java.util.logging (JUL) package, released as part of JDK 1.4 in 2002.

A plethora of logging frameworks isn't a problem if you're an application developer: you pick one and stick with it. If you're developing a library that might be used by those applications, however, it's a nightmare. If your library uses Log4J and the application uses JUL, then the output becomes a mess and the developers using your library complain.

At the time, the Jakarta project was arguably the largest single producer of libraries for Java application developers, so they added another: Jakarta Commons Logging (since renamed to Apache Commons Logging, but you'll still see the initials "JCL" in the documentation). The idea of Commons Logging was that you would write your code against the JCL API, add the JCL JAR to your dependencies, and it would figure out what actual logging framework you were using.

Although Commons Logging was intended for libraries, application developers adopted it as well. I can't speak for anyone else, but I looked at it as “won't hurt, and means I don't need to keep track of multiple APIs.” Unfortunately, some developers discovered that it could hurt: they were using Tomcat, regularly redeploying their applications, and had memory leaks that would eventually cause Tomcat to stop working.

Looking back, it appears that these leaks were due to missing transitive dependencies in the deployment bundle.* This took place in the days before Maven 2, when developers were responsible for identifying every JAR that went into their application, and ensuring that it somehow got there (which often meant lots of JARs checked into source control). It wouldn't be obvious that a library used Commons Logging, so the application developer wouldn't bother to add it to the deployed WAR. Unfortunately, Tomcat made it available on the system classpath (because it used Commons Logging internally), so the developers never knew they were missing the JAR. And since Commons Logging needed to know about the actual deployed logging framework, it would establish a strong reference to the Log4J implementation that was in the WAR, preventing the classloader from unloading the classes belonging to the WAR.

That problem was rather quickly resolved: Commons Logging version 1.1 was released in 2006, Tomcat 6 moved it off the public classpath (although Tomcat 5 remained “in the wild” for quite some time), and Maven 2 ensured that a WAR would contain all of the dependencies that it needed. But developers have very long memories for things that go wrong, especially things that happened to someone else who blogged about it.**

At the same time, several popular Java frameworks appeared; Hibernate in 2001 and Spring in 2002 are two of the most familiar. These frameworks were complex enough to need logging, but for obvious reasons wouldn't want to be tied to a specific implementation. Commons Logging provided that capability (and thus became an untracked dependency for many builds).

Moving forward, the new millennium saw continued change in the logging world. Log4J became an Apache project. Ceki Gülcü left Apache and developed Logback and SLF4J. And in the early 2010s, the remaining Apache Log4J committers decided that the Log4J 1.x implementation couldn't be extended and completely rewrote it as Log4J 2.x.

Of these, SLF4J is particularly interesting because it was a logging facade, in direct competition with Commons Logging. Unlike Commons Logging, which tried to infer what underlying framework you were using, SLF4J required you to explicitly include “bridge” JARs for your actual implementation. SLF4J also provided additional features, such as formatted log messages, that were very attractive to application developers.

However, adopting SLF4J had its own pain point: if you used Spring (or Hibernate, or any other library that depended on Commons Logging), Maven would add it to your build as a transitive dependency, where it might take precedence over the “slf4j-jcl” bridge from SLF4J (it all depended on the order in which JARs were given to the classloader). A key feature of Maven POMs from this era is multiple <exclusions> to prevent such transitive dependencies.

So here we are in 2020, and the logging frameworks scene is more complex than ever:

  • Log4J 1.x is still used by many projects, even though it was officially end-of-lifed in 2015. One of its most useful features doesn't work under Java 9 (and, I presume, later versions), so its popularity may fade (although it seems that many people, particularly those using OpenJDK, are quite happy with Java 8).
  • SLF4J/Logback is still used by many developers (including myself), even though new releases seem to have stalled at the 1.3.0-alpha stage (after 25 years of writing logging frameworks, I'm guessing Ceki is in need of a break).
  • Log4J 2.x provides “bridge” JARs that let people use Commons Logging and SLF4J as their API, with Log4J2 as the back-end.
  • Commons Logging still exists, but hasn't seen a release since 2014. Nor has its list of supported frameworks changed: Log4J 1.x, JUL, and Avalon LogKit.

Perhaps counter-intuitively, even with all of these changes, Commons Logging is still used by many libraries. However, it's become less visible. Spring Framework, for example, implements the API internally; as an application developer, you no longer need to explicitly exclude the JAR. And if you use Spring Boot, its 3,000+ line dependency-management POM will explicitly exclude Commons Logging from the libraries that use it.

If you're developing a library, I think that Commons Logging is still the best choice for internal logging. It provides a consistent interface, and it's reasonable to expect that the consumers of your library already have the bridge JARs that they need (which might mean the internal implementation in Spring Framework). But there are a few best practices to keep your users from cursing you:

  • Mark your dependency as provided. This tells Maven (or Gradle, or any of the other tools that follow the Maven standard) not to resolve the transitive dependency; it will rely instead on an explicitly-referenced JAR to provide the necessary classes.
  • Ensure that you don't establish a transitive dependency via a package that you depend on, like HTTP Components. Take the time to look at your entire dependency tree (using mvn dependency:tree or equivalent), and add an exclusion if anything tries to pull in Commons Logging.
  • Don't implement your own logging facade. It's tempting, I know: you want to protect the people that haven't configured logging into their application. And it seems easy: two classes (a log factory and a logger), with some reflection to pick an appropriate back end. But there's a lot of room for error. If you get it wrong, you'll introduce subtle bugs and performance issues, and your experienced users will curse you (and look for an alternative library). And if you get it right, you'll find that you've re-implemented Commons Logging. And good or bad, it won't actually help inexperienced users: they should learn to use logging rather than deploy a black box and cross their fingers.

Bottom line: if you're writing an application, use whatever logging framework you want. I have a strong preference for SLF4J/Logback, but Log4J 2.x does have some features that it's missing. However, if you're implementing a library, stick with Commons Logging. In almost every case it will Just Work™.


* Open Source Archaeology can be a fascinating experience. It's next to impossible to find a source-code repository that gives a full history for older projects: they typically started on CVS, then moved to SVN, and are now on GIT. In some cases, moving between hosting providers (although it looks like Hibernate is still actively maintained on SourceForge, which makes me feel better about a couple of my older projects). Each move lost information such as version tags (assuming they ever existed).

Maven Central is also less helpful than expected, because many projects changed their group or artifact IDs over their lifetime (plus, who wants to dig through 55 pages of org.springframework JAR listings). And a lot of older versions are “retconned”: they were imported into the repository long after release, with a made-up, minimal POM.

Fortunately, most of the larger projects maintain their own archives, so you can download a particular version and unpack it to see what's there. And if you're looking for dependencies, you can pick a likely class and run javap -c to disassemble it and then look at variable declarations. It's a lot of work, and some might call it obsessive. But that's how I learned that Spring Framework 1.0 didn't log at all, while 1.2 used Commons Logging.

** They're also very vocal about it. I wrote an article on logging techniques in 2008 that referenced Commons Logging (because in my world it wasn't an issue), and constantly received comments telling me that it was horrible and should never be used. I eventually updated the article to reference SLF4J, primarily because it was focused at application logging, but also to keep the critics quiet.

Tuesday, October 8, 2019

Writing JSON Logs from a Python Lambda

My previous post looked at a Lambda function that would accept log messages from one Kinesis stream, transform them into JSON, and write them to another Kinesis stream, from which they would be inserted into an Elasticsearch cluster. In that post I skipped over the actual transformation code, other than saying that it would pass existing JSON objects unchanged, and recognize the Lambda Python log format.

However, I'd prefer not to perform this translation: I want to write log messages as JSON, which will allow me to easily add additional information to the message and use searches based on specific details. With the Python logging facility, it's easy to make this happen.

Unless you're running on Lambda.

The linked example assumes that you can configure logging at the start of your main module. However, Lambda configures logging for you, before your code gets loaded. If you print logging.root.handlers from a Lambda, you'll see that there's already a LambdaLoggerHandler installed. To use my formatter, you need to update this handler:

for handler in logging.root.handlers:
    handler.setFormatter(JSONFormatter())

While I could have replaced this handler entirely, with one that wrote to StdErr, I was concerned that it performed some undocumented magic. One thing that I did discover, although it doesn't quite count as magic, is that it inserts the Lambda's request ID in the LogRecord before passing it to the formatter.

By not using that piece of information, I require that the formatter be initialized for every Lambda invocation, so that it can get the request ID out of the invocation context. This also allows me to pull out the Lambda name and version, or any of the other documented context attributes.
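To make this concrete, here's a bare-bones formatter along those lines; the field names and the constructor signature are my own choices, and the documented example linked at the end of this post covers more cases.

import json
import logging


class JSONFormatter(logging.Formatter):

    def __init__(self, lambda_context=None):
        super().__init__()
        self.lambda_context = lambda_context

    def format(self, record):
        result = {
            'timestamp': self.formatTime(record),
            'level':     record.levelname,
            'logger':    record.name,
            'message':   record.getMessage(),
        }
        if self.lambda_context:
            result['functionName']    = self.lambda_context.function_name
            result['functionVersion'] = self.lambda_context.function_version
            result['requestId']       = self.lambda_context.aws_request_id
        if record.exc_info:
            result['exception'] = self.formatException(record.exc_info)
        return json.dumps(result)


def lambda_handler(event, context):
    # re-install the formatter on every invocation, so that it picks up the
    # current request ID from the invocation context
    for handler in logging.root.handlers:
        handler.setFormatter(JSONFormatter(lambda_context=context))
    # ... rest of the handler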

If you also want JSON-formatted logs from your Lambdas, I've included a documented example here.

Thursday, September 12, 2019

Streaming CloudWatch logs to Elasticsearch, Part 2: Teh Codez

My last post described my different attempts at incorporating CloudWatch Logs events into an Elasticsearch-based logging pipeline. This post looks at the actual code. To avoid boring you, it just touches on the main points; if you want the whole thing go here.

Creating the subscription

The first step in streaming your CloudWatch Logs is to create a subscription. As I mentioned in my last post, you can't do this via the AWS Console. The Amazon documentation walks you through creating the subscription via the CLI, or you can use the CloudFormation template from my project.

One important note: the stream has to be active before you can create a subscription; if you create the stream and then immediately try to create the subscription, it will fail. This is why I have separate templates to create the stream and subscribe to it (it's also easier to replicate the subscription when it's a separate template).

The messages are written to the stream as JSON. Each message identifies the log group and stream, and contains a list of log events:

{
 "messageType": "DATA_MESSAGE",
 "owner": "012345678901",
 "logGroup": "AppenderExample",
 "logStream": "Example-20190825",
 "subscriptionFilters": ["CloudWatchSubscription-Subscription-184MSZXRD5MCP"],
 "logEvents": [{
  "id": "34939504216003552676187468284442890586332374987778818048",
  "timestamp": 1566741555269,
  "message": "some message\n"
 }, {
  "id": "34939504216092755656981590777009033459422968433802739713",
  "timestamp": 1566741555273,
  "message": "another message\n"
 }, {
  "id": "34939504239575440351034336945046144800521693099600117762",
  "timestamp": 1566741556326,
  "message": "still another\n"
 }]
}

Processing the input messages

When Kinesis invokes a Lambda, it packages multiple records into a single invocation event, with the original message GZipped and Base64-encoded:

{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "ea05911b7285b29393412d960053f81d",
                "sequenceNumber": "49598889268738564796327312231530438478076180289718059010",
                "data": "H4sIANoVbV0AA42QW2vCQBCFn5NfIfusMLs7e5m+BbU+CaURWtpKSXXRQExCsvaC+N+7sSn0QqUvC7PnMOebc4gjtnNtm23c4q127GLAJskieZxP0zSZTdkw6NVL6ZpOAS4kKm0sAT8pRbWZNdW+7sSkrl25ds30NdvVhfvUU9+4bNcZemEkgBNYoU6Odv/Urpq89nlVXuaFd00bvPdsXFT79U3mV9v0i2P0beAW5+nd7fVEzcdXbNkHTp9d6U9LDnEUsXzdZUskSQpQcA0glRLaaG4NaissIopwkbJaSiENkjXGWMstoO0YI+bzUJEP9GEVV1ob5KrbQSe1r6+LaaudG/TzQ8ni6Dgc/EFBwiillSbLFUFIBCCQoV5CIUhblNKCMJIMl2cpjPxJkZWV37rmPyDhNQoxVMJBhkhNGL41R7QAKnCSBKLQGefGaHEGREuhf9Xh86IY9DgfGMv4GL8DWgIaCnQCAAA=",
                "approximateArrivalTimestamp": 1566741561.188
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49598889268738564796327312231530438478076180289718059010",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::012345678901:role/DefaultLambdaProfile",
            "awsRegion": "us-east-1",
            "eventSourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/CloudWatchSubscriptionDestination"
        },
        ...
    ]
}

To transform the input events, you need to iterate over each of the top-level records, extract its payload, and then iterate over each of the child events to transform them. In order to simplify the output code, my process_input_record() function returns just the transformed child events, so that the output code can work on a “flattened” array.

def lambda_handler(event, context):
    outputMessages = []
    for record in event['Records']:
        outputMessages = outputMessages + process_input_record(record)
    logging.info(f'total number of messages to output: {len(outputMessages)}')
    logging.debug(f'output messages: {json.dumps(outputMessages)}')
    write_to_kinesis(outputMessages)


def process_input_record(record):
    try:
        payload = record['kinesis']['data']
        decoded = gzip.decompress(base64.b64decode(payload))
        data = json.loads(decoded)
        message_type = data.get('messageType')
        if message_type == 'DATA_MESSAGE':
            logGroup = data['logGroup']
            logStream = data['logStream']
            events = data.get('logEvents', [])
            logging.info(f'processing {len(events)} events from group "{logGroup}" / stream "{logStream}"')
            logging.debug(f'input messages: {json.dumps(events)}')
            return [transform_log_event(logGroup, logStream, event) for event in events]
        elif message_type == 'CONTROL_MESSAGE':
            logging.info('skipping control message')
        elif message_type:
            logging.warn(f'unexpected message type: {message_type}')
    except:
        logging.error(f'failed to process record; keys = {record.keys()}', exc_info=True)
    # fall-through for any unprocessed messages (exception or unhandled message type)
    return []

One thing to note about process_input_record() is the if statement at its core. CloudWatch Logs writes two different types of messages: most of the messages are data messages, which contain log events. However, when you first subscribe the log group to a stream, it writes a control message to verify that it has permission to do so. There shouldn't be any other message types, but if AWS adds one we'll ignore it. I also wrap the entire function in a try block, in case something other than a valid Logs message ends up on the stream.

Each event in the record gets passed through transform_log_event(), not shown here, which performs the following transformations:

  • If the message isn't already JSON, it's transformed into JSON with `timestamp`, `message`, and `level` fields. This makes the message compatible with those written by my logging adapter.
  • If the message looks like a Lambda status message, it's parsed, and relevant data (eg, request ID, execution duration, and memory used) is extracted and stored in a child object.
  • If the message looks like Lambda logging output, it is parsed and the components (level, timestamp, request ID, and message) are stored in separate fields. Note: my transform only supports Python log output; the Node.JS runtime outputs fields in a different order.
  • The source log group and stream are added, as a sub-object.

If you want to add your own transformations — such as support for Node.JS logs — this is the place to do it.
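To give an idea of what that looks like, here's a stripped-down sketch of such a function. It handles only the JSON-passthrough and plain-text cases, and the name of the source sub-object is my own choice; see the linked project for the full version, including the Lambda-specific parsing.

import json
from datetime import datetime, timezone


def transform_log_event(logGroup, logStream, event):
    message = event['message'].strip()
    try:
        # pass existing JSON objects through mostly unchanged
        data = json.loads(message)
        if not isinstance(data, dict):
            raise ValueError('not a JSON object')
    except ValueError:
        # otherwise wrap the raw text in a minimal JSON object
        data = {
            'level':   'INFO',
            'message': message,
        }
    # CloudWatch provides the event timestamp as milliseconds since the epoch
    data.setdefault('timestamp',
        datetime.fromtimestamp(event['timestamp'] / 1000.0, tz=timezone.utc).isoformat())
    # record the source log group and stream as a sub-object
    data['cloudwatch'] = {
        'logGroup':  logGroup,
        'logStream': logStream,
    }
    return data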

Writing messages to Kinesis

As I've written elsewhere, writing to Kinesis is easy to do, but difficult to do right:

  • For performance, you should use the PutRecords API rather than PutRecord.
  • Which means that you must create batches of records that fit into the limits of this API (1 MB or 500 messages).
  • You have to be prepared for your entire call to be throttled, and resend after a delay.
  • And you have to be prepared for individual records within the batch to be throttled due to load on the destination shard, and resend just them.

The top-level write_to_kinesis() function repeatedly processes batches of records, with a sleep between batches to minimize throttling. The process_batch() function returns any messages that it couldn't send, for the next time through the loop. As I note, there shouldn't be more than one batch, unless you've configured the Lambda trigger to process lots of records or your destination is overloaded.

def write_to_kinesis(listOfEvents):
    records = prepare_records(listOfEvents)
    while records:
        records = process_batch(records)
        if (records):
            time.sleep(2) # an arbitrary sleep; we should rarely hit this
    return

The process_batch() function is small but needs the most explanation. It starts with a call to build_batch(), which splits the source list of records into two parts: the first part contains all of the records that will fit in a single PutRecords call, the second is everything left over. Next is the actual call. Finally, process_response() looks at the result of the call to find any records that were throttled; these are added to the leftover records and returned to the caller, to be retried on the next pass through the loop.

def process_batch(records):
    toBeSent, toBeReturned = build_batch(records)
    logging.info(f'sending batch of {len(toBeSent)} records with {len(toBeReturned)} remaining')
    try:
        response = kinesisClient.put_records(
            StreamName=kinesisStream,
            Records=toBeSent
        )
        return process_response(response, toBeSent) + toBeReturned 
    except kinesisClient.exceptions.ProvisionedThroughputExceededException:
        logging.warn(f'received throughput exceeded on stream {kinesisStream}; retrying all messages')
        return toBeSent + toBeReturned
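The build_batch() function isn't shown here; this sketch captures the idea, assuming that the prepared records are dicts with Data and PartitionKey keys (the shape that put_records() expects), and using the 1 MB / 500-record batch limit mentioned above.

def build_batch(records):
    # splits the list of records into a batch that fits in a single PutRecords
    # call, plus everything left over
    batch = []
    batch_size = 0
    for ii, record in enumerate(records):
        record_size = len(record['Data']) + len(record['PartitionKey'])
        # always take at least one record, so that the caller can't loop forever
        if batch and (len(batch) >= 500 or batch_size + record_size > 1024 * 1024):
            return batch, records[ii:]
        batch.append(record)
        batch_size += record_size
    return batch, []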

I'm going to wrap up by looking at the process_response() function in detail, because it's where the trickiness lies. The response for PutRecords contains an array of result objects, matched to the request records that you passed in. In the happy path, each of these result objects contains the sequence number that Kinesis assigned to the record (and FailedRecordCount is 0). In the unhappy path, some of the records fail but others succeed; the result object for the failed records will contain an error code.

Making things a little more tricky, this error code can indicate either that the shard throttled the record, in which case we re-send it, or that an internal error happened. The API docs don't go into details on exactly what an internal error is, or whether it's recoverable, so I assume that it isn't and drop the record.

def process_response(response, records):
    if response['FailedRecordCount'] == 0:
        return []
    
    result = []
    droppedRecordCount = 0
    for ii in range(len(response['Records'])):
        entry = response['Records'][ii]
        errorCode = entry.get('ErrorCode')
        if errorCode == 'ProvisionedThroughputExceededException':
            result.append(records[ii])
        elif errorCode:
            droppedRecordCount += 1
    
    if droppedRecordCount > 0:
        logging.warn(f'dropped {droppedRecordCount} records due to Kinesis internal errors')
    if len(result) > 0:
        logging.info(f"requeueing {len(result)} records due to throughput-exceeded")
    
    return result

So, that's it. With a few adjustments to match your logging format (and really, if you can, just log in JSON), this will let you centralize your logs in Elasticsearch, even if AWS really, really wants you to use CloudWatch.

Wednesday, September 11, 2019

Streaming CloudWatch logs to Elasticsearch, Part 1: The Problem

My preferred approach to application logging on AWS is based around two ideas: first, that applications produce log messages in JSON, and second, that those messages are written directly to a Kinesis stream, which is then sent to Elasticsearch via Kinesis Firehose.

Amazon's preference, however, is CloudWatch Logs: it's the default log destination for AWS Batch and Lambda — indeed, you have to explicitly disable access to stop Lambda logging to CloudWatch; there's an agent for EC2; and the awslogs driver for Docker (ECS, EKS, or on-prem). Unfortunately, while CloudWatch Logs has come a long way from its original feature set, it still doesn't measure up to Elasticsearch. And in a multi-application environment, you don't want two centralized log services.

So what to do?

One answer is to use an alternative logging framework and bypass CloudWatch Logs. This can work, but you'll lose useful information such as the Lambda execution report. And as I discovered, any logging framework that uses a background thread to write log messages (which is usually a Good Thing) will run into problems when Lambda freezes the execution context.

A better answer is to use a CloudWatch Logs subscription, in which a log group sends regular batches of messages to a variety of destinations. However, there are a surprising number of pitfalls with subscriptions. This post looks at the different subscription types and the pros and cons associated with each. My next post details an implementation that copies CloudWatch Logs into an existing Kinesis-based pipeline, from which they end up in Elasticsearch.

This should be simple: if you use the AWS Console, you'll even see an option to subscribe a log group directly to Amazon Elasticsearch, which seems like the “no-brainer” choice. But after trying it out I consider it completely unsuitable: it sends updates multiple times per second, apparently with duplicates (because I wasn't writing that many messages in my tests). And Elasticsearch hates frequent small updates.

A second option, also available from the Console, is to invoke a Lambda. I first went down this path, with a Lambda that would write log messages to a Kinesis stream; from there they would be picked up by my existing pipeline. However, I abandoned this approach because, again, the Lambda was invoked a lot: it appears that CloudWatch invokes it as soon as events are logged, rather than attempting to batch them. With a moderately active log stream, this translates to over two million invocations per stream per month.

There are two additional subscription options that aren't available from the Console: Kinesis Streams and Kinesis Firehose. Both use a similar format: each record written to the stream/firehose identifies the source log group and stream, with an array of zero or more log events. This rules out Firehose, which expects a 1:1 relationship between the records that it reads and the records that it writes.

That left me with subscribing a Kinesis Stream, then attaching a Lambda to that stream to process the records. While this approach had a few more moving parts than I would like, it let me reuse most of my existing firehose architecture: the Lambda could write individual messages into the same Kinesis stream used for direct application logging, where they would be picked up by Firehose. And because you can specify the number of messages in a Kinesis-Lambda trigger, it meant that the number of invocations could be tuned. So all-in-all, a win.

Thursday, December 27, 2018

log4j-aws-appenders now supports Logback

I started my Log4J appenders project because I wasn't happy with how the AWS CloudWatch agent broke apart logfiles. It seemed, as I said in the FAQ, like it would be an easy weekend project. That was nearly two years ago.

In the interim, I added support for Kinesis Streams (a year ago) to support search-engine-based centralized logging using AWS managed Elasticsearch. Surprisingly, it wasn't until after that implementation effort that I truly “bought into” the benefits of using a search engine to examine logs. Now I can't imagine going back to grep.

After giving a talk on centralized logging to the local Java users' group, some of the feedback that I got was “it's nice, but we're not using Log4J 1.x.” So in the early fall I started to break the library into pieces: a front-end that's tied to a particular logging framework, and a back-end that handles communication with AWS. This turned out to be quite easy, which I think means that I had a good design to start with.

Then it was a matter of picking another logging framework, and learning enough about it to be able to implement appenders. I picked Logback because it's the default logging framework for Spring, and because it's the native back-end for SLF4J (which I've been using with Log4J for around five years now).

One of the interesting things that came out of this work is that I now see a good use case for multiple inheritance. There's an enormous amount of duplicated code because each appender has two is-a relationships: one to the logging framework and another to the back end. It would be nice if Java had something like Scala traits, where each trait would encapsulate one of the is-a relationships, and the appender would just be a combination of traits. On the other hand, I've seen enough ugly code using traits that I still think Gosling et al made the right decision.

Log4J 2.x is up next, although I think I'm going to take a break for a few months. I have several other projects that have taken a back seat, including a library of AWS utility classes that I started a year ago and hasn't seen its first release.

Happy Holidays!

Saturday, December 8, 2018

Using CloudWatch Insights With JSON Log Messages

I've never been a fan of CloudWatch Logs. On the positive side, it's available by default, is relatively cheap, and there are a variety of ways to send data to it (including my own log4j-aws-appenders). But the features it supported were extremely limited: you could view log messages by date/time range or do a simple text search. As a result, I felt that “serious” log analysis required a tool like Kibana, fed by a Kinesis-based pipeline. Or, if your volumes were low enough, a service provider such as Loggly or DataDog.

That opinion changed with the introduction of CloudWatch Logs Insights. This feature adds a query tab to the CloudWatch Logs console, allowing you to filter and summarize messages. For example, this query will show you the number of messages that contain the word “ERROR” (from your selected time range, which is specified outside of the query):

filter @message like /ERROR/
| stats count()

To actually see those errors, you'd replace the stats directive with fields, along with a sort to put them in reverse chronological order:

filter @message like /ERROR/
| fields @timestamp, @message
| sort @timestamp desc

That's nice, but it's still full-text search: if you're using Log4J it will pick up the messages that were logged at the ERROR level, but it will also pick up any other entries that happen to contain the word “ERROR” anywhere in the message.

But Insights isn't limited to simple text searches on the entire message: it will parse JSON messages and let you reference individual fields from those messages. And, as it happens, my appenders library provides a JSON layout class (originally intended for the Kinesis to Kibana pipeline linked above). To configure the CloudWatch appender with the JSON layout, you'd add these lines to your log4j.properties file (changing logGroup, logStream, and tags as appropriate):

log4j.appender.cloudwatch=com.kdgregory.log4j.aws.CloudWatchAppender
log4j.appender.cloudwatch.logGroup=AppenderExample
log4j.appender.cloudwatch.logStream={date}-{hostname}-{pid}
log4j.appender.cloudwatch.layout=com.kdgregory.log4j.aws.JsonLayout
log4j.appender.cloudwatch.layout.tags=applicationName=Example,runDate={date}
log4j.appender.cloudwatch.layout.enableHostname=true
log4j.appender.cloudwatch.layout.enableLocation=true

As I said, you can use individual fields from the JSON message in your queries. So to see only the Log4J errors, you would use the following (note: this uses the parsed “message” field, which is different from the unparsed “@message” field):

filter level == "ERROR"
| fields @timestamp, logger, message
| sort @timestamp desc
| limit 25

You can also aggregate the data. For example, this query counts the number of errors for each 15 minute period within your time range:

filter level == "ERROR"
| stats count() by bin(15m)

There are, of course, some caveats to this feature. While the documentation doesn't go into detail, it appears to work by a brute-force scan of all your log messages (unlike Kibana, which uses a search engine to index the messages). This means that queries take longer to run, and that time increases with the amount of data that you're examining.

You'll also pay (as of the time of writing) $0.005 per gigabyte of data scanned. This is cheap if you are careful to restrict your time periods and/or don't have a lot of logging output. But I suspect it will add up quickly.

With those caveats, however, I think Insights is a great solution for organizations that don't have large logging volumes, don't want to ship their logs offsite, and don't want to pay for an Elasticsearch/Kibana cluster.