Showing posts with label concurrency. Show all posts
Showing posts with label concurrency. Show all posts

Tuesday, January 8, 2019

Multi-threaded Programming with AWS Lambda

Here's a simple multi-threaded program. Note that I use System.err to write messages rather than System.out: the latter is buffered, so that your output doesn't appear immediately; the former flushes output with every call.

public class MultithreadExperiment
{
    public static void main(String[] argv) throws Exception {
        new MultithreadExperiment().handler(new HashMap(), null);
    }

    public void handler(Map ignored, Context lambdaContext) {
        System.err.println("handler starting");
        new Thread(new BackgroundOperation()).start();
        System.err.println("handler finishing");
    }

    private static class BackgroundOperation implements Runnable {
        @Override
        public void run() {
            System.err.println("background thread starting: " + Thread.currentThread().getName());
            for (int ii = 0 ; ii < 10 ; ii++) {
                System.err.println("background thread running: " + Thread.currentThread().getName() + " at " + new Date());
                try {
                    Thread.sleep(1000);
                }
                catch (InterruptedException ex) {
                    System.err.println("background thread interrupted: " + Thread.currentThread().getName() + " at " + new Date());
                }
            }
        }
    }
}

When you run the program it spins up a background thread, which writes out a message every second. Since new threads are non-daemon by default, the program won't exit until this thread is finished. As a result, this is what you'll see for output:

handler starting
handler finishing
background thread starting: Thread-0
background thread running: Thread-0 at Mon Jan 07 18:18:59 EST 2019
background thread running: Thread-0 at Mon Jan 07 18:19:00 EST 2019
background thread running: Thread-0 at Mon Jan 07 18:19:01 EST 2019
background thread running: Thread-0 at Mon Jan 07 18:19:02 EST 2019
background thread running: Thread-0 at Mon Jan 07 18:19:03 EST 2019
background thread running: Thread-0 at Mon Jan 07 18:19:04 EST 2019
background thread running: Thread-0 at Mon Jan 07 18:19:05 EST 2019
background thread running: Thread-0 at Mon Jan 07 18:19:06 EST 2019
background thread running: Thread-0 at Mon Jan 07 18:19:07 EST 2019
background thread running: Thread-0 at Mon Jan 07 18:19:08 EST 2019

If we upload this program to AWS Lambda and run a test invocation, however, you see something completely different. Here's the output from CloudWatch Logs after the first invocation:

2019-01-08T00:15:39.957Z START RequestId: 87232aba-12da-11e9-8390-ed48849189e3 Version: $LATEST
2019-01-08T00:15:39.993Z handler starting
2019-01-08T00:15:39.993Z handler finishing
2019-01-08T00:15:39.994Z END RequestId: 87232aba-12da-11e9-8390-ed48849189e3
2019-01-08T00:15:39.994Z REPORT RequestId: 87232aba-12da-11e9-8390-ed48849189e3 Duration: 36.66 ms Billed Duration: 100 ms  Memory Size: 512 MB Max Memory Used: 42 MB 
No sign of the background thread. Let's invoke it again after a short wait:
2019-01-08T00:16:22.941Z START RequestId: a14450e0-12da-11e9-b18c-ed59274c4c08 Version: $LATEST
2019-01-08T00:16:22.944Z background thread starting: Thread-0
2019-01-08T00:16:22.949Z handler starting
2019-01-08T00:16:22.950Z background thread running: Thread-0 at Tue Jan 08 00:16:22 UTC 2019
2019-01-08T00:16:22.950Z handler finishing
2019-01-08T00:16:22.950Z background thread starting: Thread-1
2019-01-08T00:16:22.950Z background thread running: Thread-1 at Tue Jan 08 00:16:22 UTC 2019
2019-01-08T00:16:22.950Z END RequestId: a14450e0-12da-11e9-b18c-ed59274c4c08
2019-01-08T00:16:22.950Z REPORT RequestId: a14450e0-12da-11e9-b18c-ed59274c4c08 Duration: 2.92 ms Billed Duration: 100 ms  Memory Size: 512 MB Max Memory Used: 42 MB 

Here we see the output from two threads: “Thread-0” was presumably started by the first invocation, while ”Thread-1” was started by the current invocation. Invoking it again:

2019-01-08T00:17:18.950Z START RequestId: c296963d-12da-11e9-855a-d106c9ec09c7 Version: $LATEST
2019-01-08T00:17:18.950Z background thread running: Thread-0 at Tue Jan 08 00:17:18 UTC 2019
2019-01-08T00:17:18.950Z background thread running: Thread-1 at Tue Jan 08 00:17:18 UTC 2019
2019-01-08T00:17:18.968Z handler starting
2019-01-08T00:17:18.969Z handler finishing
2019-01-08T00:17:18.969Z background thread starting: Thread-2
2019-01-08T00:17:18.969Z background thread running: Thread-2 at Tue Jan 08 00:17:18 UTC 2019
2019-01-08T00:17:18.969Z END RequestId: c296963d-12da-11e9-855a-d106c9ec09c7
2019-01-08T00:17:18.969Z REPORT RequestId: c296963d-12da-11e9-855a-d106c9ec09c7 Duration: 15.47 ms Billed Duration: 100 ms  Memory Size: 512 MB Max Memory Used: 42 MB 

We've got another thread starting, and output from the threads started by the previous invocations. What's really interesting is that the message from those threads shows the current time, which is nearly a minute later than the previous incoation. Clearly we're not sleeping for only a single second. So what's happening?

The answer can be found in the documentation:

After a Lambda function is executed, AWS Lambda maintains the execution context for some time in anticipation of another Lambda function invocation. In effect, the service freezes the execution context after a Lambda function completes, and thaws the context for reuse, if AWS Lambda chooses to reuse the context [...]

While the documentation doesn't go into details, I think that it's safe to assume that Lambda uses the “cgroups freezer,” just like docker pause. The process still exists on the host system, but is completely suspended. When later unpaused, the background thread, like Rip Van Winkle, thinks that it's only been sleeping for a second.

But that doesn't completely explain why output from the first invocation appears in the logs for the second invocation. The answer for that is simple: it didn't have a chance to run.

Starting a thread is fast, but it's not instant. In my experiments with Lambda, it typically took between 0.1 and 0.150 milliseconds — although it occasionally took much longer. The handler function in this example only does one thing after starting the thread. True, it's a kernel call, with IO, but is probably less expensive than a thread start (indeed, in my experiments it took around 0.075 millis). Sometimes you'll get lucky, as shown in the second and third examples; sometimes (especially on cold start) you won't.

On the other hand, there's plenty of time for background threads to run when a lamba starts: as you can see from the log output, 10 to 30 milliseconds depending on whether it's a cold start or not. So in the second and third invocations, that's where you see the output from the threads started in previous invocations.

So what's the practical impact?

It means that you can't just “fire and forget” a background task in Lambda, like you can with a normal Java application. You have to use Thread.join() or some other coordination mechanism to ensure that the background task finishes before the handler function returns. If you don't, that task may never execute: there's no guarantee that Lambda container that was running it will ever be invoked again.

And if you're using a long-running background worker, such as a logging library that batches messages to reduce the number of web-service calls it makes? You (well, I) need to find a way to make those calls synchronous.

Friday, October 18, 2013

Immutability and Thread-Safety, Part 2

I've had a few offline conversations about my last post, and decided to dive a little deeper into concurrency and immutability. As an example, I'll use a shared map: the sort of thing you might use to hold session data in a web application. The first implementation simply wraps Scala's immutable.Map:

class SharedMap {

  var theMap : Map[String,String] = Map.empty;

  def size = theMap.size

  def get(key:String) = theMap.get(key)

  def put(key:String, value:String) {
    theMap = theMap + (key -> value)
  }
}

Because theMap is immutable, you have the following guarantee: put() will not interfere with get(). If they happen to be called at the same time, the get will use the old instance of the map, and not have to worry about partial changes due to the put. This is a powerful guarantee, but it does not mean that this code is thread-safe.

The problem is that this implementation allows an update race: if two threads call put() at the same time, they make their own immutable changes to the same immutable base map. But neither's changes include the other's, and only one change will be saved as the new base map.

Appending an item to an immutable map is a very fast operation. So fast that you might never hit the update race in testing. But in a production environment, with dozens or hundreds of concurrent updates, it is bound to happen, and you'll lose data. To see this happen rather spectacularly, run the following program.*

object ConcurrentUpdateRunner extends App {

  val map = new SharedMap

  val pool = java.util.concurrent.Executors.newFixedThreadPool(20)

  val futures = (1 to 100).map(ii => {
    val callable = new java.util.concurrent.Callable[String]() {
      def call() = {
        val x = Thread.currentThread.getName
        map.put(ii.toString, x)
        x
      }
    }
    pool.submit(callable)
  })

  val results = futures.map(_.get)
  println(s"number of executed Callables: ${results.size}");

  println(s"final map size: ${map.size}")
}

So what can you do?

One option is to introduce synchronization. For a simple data structure like this map, that's the route that I'd take, although some people see any use of synchronization as taboo.

def get(key:String) = theMap.get(key)

def put(key:String, value:String) {
  synchronized { theMap = theMap + (key -> value) }
}

Note that I just synchronized put(), something that I warned against in a previous post. But that warning applies only to mutable data structures. Since the underlying map in this example is immutable, we can get use half-synchronization without worry, and avoid contention between get and put.

A second alternative is to forego the Scala library, and use a ConcurrentHashMap.

At first glance, this appears to be a perfect solution: you don't have to think about concurrent access, just rely on the fact that Doug Lea understands it far better than you. However, a ConcurrentHashMap only guarantees thread safety of individual gets and puts. For our trivial map example this is sufficient. For most real-world applications it isn't, because you need atomic updates of data that's already in the map.

A third option, which I mentioned in my last post, is to introduce a message queue between the map and its consumers. This is not a particularly novel idea: Odersky suggests it in Programming in Scala (in the section “Good actors style”), and Armstrong uses a similar example in Programming Erlang. Here is one possible implementation, using Scala actors (which, although deprecated, are usable without the need for an additional library in the classpath).

class ActorMap1 {

  case class SizeMessage
  case class GetMessage(key:String)
  case class PutMessage(Key:String, value:String)

  val theActor = new DaemonActor {
    var theMap : Map[String,String] = Map.empty;

    def act() {
      loop {
        receive {
          case SizeMessage() => {
            sender ! theMap.size
          }
          case GetMessage(key) => {
            sender ! theMap.get(key)
          }
          case PutMessage(key,value) => {
            theMap = theMap + (key -> value)
          }
        }
      }
    }
  }

  theActor.start

  def size = {
    val fut = theActor !! new SizeMessage
    fut.apply
  }

  def get(key:String) = {
    val fut = theActor !! new GetMessage(key)
    fut.apply
  }

  def put(key:String, value:String) {
    theActor ! new PutMessage(key, value)
  }
}

This example adds quite a bit of complexity to the shared map: new classes to hold messages for the actor, and a facade object to handle the communications. And it still has a significant flaw: there's only one loop, handling both gets and puts. We've achieved thread safety, at the cost of introducing a bottleneck for concurrent operations.

In the case of an in-memory map, this bottleneck probably isn't significant. Unless you have dozens of threads constantly hitting the map, to the exclusion of other actions, the occasional waits will go unnoticed. But what if this map is actually a write-through cache for a database or socket? In that case, the put will take significant time, and contention becomes a real possibility. To resolve this problem, we can add another actor to the mix.

class ActorMap2 {

  case class SizeMessage
  case class GetMessage(key:String)
  case class PutMessage(Key:String, value:String)
  case class InternalPutMessage(Key:String, value:String)

  val getActor = new DaemonActor {
    var theMap : Map[String,String] = Map.empty;

    def act() {
      loop {
        receive {
          case SizeMessage() => {
            sender ! theMap.size
          }
          case GetMessage(key) => {
            sender ! theMap.get(key)
          }
          case InternalPutMessage(key,value) => {
            theMap = theMap + (key -> value)
          }
        }
      }
    }
  }

  val putActor = new DaemonActor {
    def act() {
      loop {
        receive {
          case PutMessage(key,value) => {
            // some long-running operation
            getActor ! InternalPutMessage(key, value)
          }
        }
      }
    }
  }

  getActor.start
  putActor.start

  def size = {
    val fut = getActor !! new SizeMessage
    fut.apply
  }

  def get(key:String) = {
    val fut = getActor !! new GetMessage(key)
    fut.apply
  }

  def put(key:String, value:String) {
    putActor ! new PutMessage(key, value)
  }
}

Yet more complexity, and it still isn't complete: put() doesn't have any way to signal the caller that an error occurred, and real-world actor systems need supervisors. Plus, there's still the possibility that parts of the application will hold onto stale data (a problem that can never truly be solved).

But that was the entire point of my last post. It isn't easy to correctly implement concurrent applications, and immutable data structures don't help you solve the bigger issues. If you rely on them to do so, you will be burned. You need to understand how the larger application mutates state, and choose an appropriate strategy for managing those mutations.

Finally, although in this example it appears to be a case of hunting mosquitoes with a shotgun, decomposition into independent actors does solve many of the bigger problems of concurrency. But that's simply because it forces you into the techniques of information hiding and limited interfaces that have been part of software engineering since the 1970s.


* I used a threadpool, rather than actors, to have better control over the number of threads. I'm also running on a multi-core processor, which means that I'm really hitting the map from multiple threads at the same time. Switching between single-core and multi-core machines tends to highlight different threading bugs.

Monday, October 7, 2013

The Big Myth of Functional Programming: Immutability Solves All Problems

I'm not against immutability. Indeed, I think that many concurrency problems can be eliminated by making data structures immutable. Most of those problems, however, are caused because their developers never really thought about concurrent access. And while switching to immutable data structures may solve some problems, it creates others — and in my opinion, those others are much more difficult to debug.

The underlying problem is that the whole point of a program is to modify state. A program that takes no inputs and produces no outputs is worthless, except as a way to turn an expensive CPU into a space heater.

But if the purpose of a program is to modify state, how does it hold the state that's being modified? A “pure” functional program must use the arguments and local variables of its functions. There is a top-level function, which creates the program's initial state, passes that initial state to functions, and gets back the final state.

And if your program has a single thread, or multiple independent threads, that's all you need to do (I also like the idea of decomposition into a tree of independent threads). But if your program consists of communicating threads, the purely functional, immutable data approach is not sufficient: different threads will hold different representations of what they consider the same data. Also known as a race condition.

The easiest way to solve such problems is to introduce a well-designed concurrent data structure (such as Java's ConcurrentHashMap) that is shared between threads. This data structure holds the canonical view of shared state: each thread reads the map whenever it needs to know the latest data. However, a concurrent map by itself isn't sufficient to solve race conditions: it guarantees atomic puts and gets, but not updates.

A better alternative, in my opinion, is to follow the Go mantra of “share by communicating, rather than communicate by sharing.” In other words, wrap your shared state in a data structure that has a message queue between it and the rest of the program. The rest of your program appears to be fully functional: mutations are just function invocations, and you can choose to implement your shared data using immutable objects.

This approach doesn't completely eliminate races: there is still a race between writes and reads (also known as “stale data”). However, there is no way to eliminate that particular race. No matter how hard you try, you can never guarantee that multiple independent threads have a consistent view of the world.

But stale data in one part of your program is far better than missing data due to an unrealistic expectation of what immutable data structures give you.

Friday, February 1, 2013

Concurrency and Interviewing

Yesterday I answered a question on Stack Overflow (something I don't do very often any more, for reasons I won't go into here).

You are given a paragraph, which contain n number of words, you are given m threads. What you need to do is, each thread should print one word and give the control to next thread, this way each thread will keep on printing one word, in case last thread come, it should invoke the first thread. Printing will repeat until all the words are printed in paragraph. Finally all threads should exit gracefully. What kind of synchronization will use?

There were the usual rants about how this was a lousy interview question because it didn't solve a real-world problem, several answers with “Teh Codez” and nothing more, a “change your lifestyle” response, and an answer with code and short explanation that was accepted. Standard PSE fare, and I'm not sure what drove me to write an answer — especially such a long answer. I think it started out as an alternative way to implement the problem. It ended up capturing a big part of my philosophy on both multi-threading and interviewing. So here it is, for those few people who left me in their RSS feeds over the last few months (that's another story).


In my opinion, this is a fabulous interview question -- at least assuming (1) the candidate is expected to have deep knowledge of threading, and (2) the interviewer also has deep knowledge and is using the question to probe the candidate. It's always possible that the interviewer was looking for a specific, narrow answer, but a competent interviewer should be looking for the following:

  • Ability to differentiate abstract concepts from concrete implementation. I throw this one in primarily as a meta-comment on some of the comments. No, it doesn't make sense to process a single list of words this way. However, the abstract concept of a pipeline of operations, which may span multiple machines of differing capabilities, is important.
  • In my experience (nearly 30 years of distributed, multi-process, and multi-threaded applications), distributing the work is not the hard part. Gathering the results and coordinating independent processes are where most threading bugs occur. By distilling the problem down to a simple chain, the interviewer can see how well the candidate thinks about coordination. Plus, the interviewer has the opportunity to ask all sorts of follow-on questions, such as "OK, what if each thread has to send its word to another thread for reconstruction."
  • Does the candidate think about how the processor's memory model might affect implementation? If the results of one operation never get flushed from L1 cache, that's a bug even if there's no apparent concurrency.
  • Does the candidate separate threading from application logic?

This last point is, in my opinion, the most important. Again, based on my experience, it becomes exponentially more difficult to debug threaded code if the threading is mixed with the application logic (just look at all the Swing questions over on SO for examples). I believe that the best multi-threaded code is written as self-contained single-threaded code, with clearly-defined handoffs.

With this in mind, my approach would be to give each thread two queues: one for input, one for output. The thread blocks while reading the input queue, takes the first word off of the string, and passes the remainder of the string to its output queue. Some of the features of this approach:

  • The application code is responsible for reading a queue, doing something to the data, and writing the queue. It doesn't care whether it is multi-threaded or not, or whether the queue is an in-memory queue on one machine or a TCP-based queue between machines that live on opposite sides of the world.
  • Because the application code is written as-if single-threaded, it's testable in a deterministic manner without the need for a lot of scaffolding.
  • During its phase of execution, the application code owns the string being processed. It doesn't have to care about synchronization with concurrently-executing threads.

That said, there are still a lot of grey areas that a competent interviewer can probe:

  • "OK, but we're looking to see your knowledge of concurrency primitives; can you implement a blocking queue?" Your first answer, of course, should be that you'd use a pre-built blocking queue from your platform of choice. However, if you do understand threads, you can create a queue implementation in under a dozen lines of code, using whatever synchronization primitives your platform supports.
  • "What if one step in the process takes a very long time?" You should think about whether you want a bounded or unbounded output queue, how you might handle errors, and effects on overall throughput if you have a delay.
  • How to efficiently enqueue the source string. Not necessarily a problem if you're dealing with in-memory queues, but could be an issue if you're moving between machines. You might also explore read-only wrappers on top of an underlying immutable byte array.

Finally, if you have experience in concurrent programming, you might talk about some frameworks (eg, Akka for Java/Scala) that already follow this model.

Monday, March 19, 2012

Synchronizing put() is Not Sufficient

Have you ever seen a class like this, which synchronizes put() but not get()? Have you ever written one?

public class MyConcurrentlyAccessedLookupTable
{
    private Map<String,MyDataObject> data = new HashMap<String,MyDataObject>();
    
    // ...
    
    public MyDataObject get(String key)
    {
        return data.get(key);
    }
    
    public synchronized void put(String key, MyDataObject value)
    {
        data.put(key, value);
    }
}

I've been working over a legacy codebase recently, and have seen a lot of them. And I want to track down the original developer, give him a good shake, and say “this only works bacause you never put anything into the map after initializing it!” Actually, this is one of the least egregious ways that he (yes, he, I know his name) managed to fail at concurrent programming. Don't get me started on his abuse of static initializers.

OK, it feels good to let that out. I can at least understand why he thought it would work. A HashMap, by its nature is resilient to incorrectly managed concurrent access. Under normal operation, new items get added to a linked list; a get() should either see or not see the new entry. With put(), because two new entries might get added to the same bucket list and the scheduler might decide to switch threads in the middle, one of the entries might disappear.

And then there's resize: doubling the size of the hash table to reduce the depth of the bucket chains. And, at least in the Sun implementation, there's no attempt to preserve the chains during this operation (I can't see why any implementation would, but there might be one). So put() could get a chain using the old table, but that chain could be assigned to a different bucket by the time the new entry is attached.

OK, so in both cases get() fails to find an entry that is actually in the map, is that really so bad? Well, is it bad if you spend days trying to figure out why a transaction failed when the data was obviously present? Or if that failed transaction caused your company to lose a sale?

The problem with concurrency bugs is that they aren't easily reproducible. Sometimes they appear once and never again. But they're still bugs, and they're not that difficult to avoid: when in doubt, synchronize all access to shared state. If you understand your access patterns (eg, you always initialize and then only read), don't synchronize at all.

But never go halfway.