Friday, October 18, 2013

Immutability and Thread-Safety, Part 2

I've had a few offline conversations about my last post, and decided to dive a little deeper into concurrency and immutability. As an example, I'll use a shared map: the sort of thing you might use to hold session data in a web application. The first implementation simply wraps Scala's immutable.Map:

class SharedMap {

  var theMap : Map[String,String] = Map.empty;

  def size = theMap.size

  def get(key:String) = theMap.get(key)

  def put(key:String, value:String) {
    theMap = theMap + (key -> value)
  }
}

Because theMap is immutable, you have the following guarantee: put() will not interfere with get(). If they happen to be called at the same time, the get will use the old instance of the map, and not have to worry about partial changes due to the put. This is a powerful guarantee, but it does not mean that this code is thread-safe.

The problem is that this implementation allows an update race: if two threads call put() at the same time, they make their own immutable changes to the same immutable base map. But neither's changes include the other's, and only one change will be saved as the new base map.
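The race can be replayed deterministically on a single thread, which makes the lost update easier to see. This is only a sketch: the object name and the keys are made up for illustration, and the two "threads" are simulated by ordinary sequential statements.

```scala
// Hypothetical single-threaded replay of the update race described above.
object LostUpdateSketch {
  val base: Map[String, String] = Map("a" -> "1")

  // Both "threads" read the same base map and build their own new map from it
  val fromThread1: Map[String, String] = base + ("b" -> "2")
  val fromThread2: Map[String, String] = base + ("c" -> "3")

  // Each then assigns its result to the shared variable; the last assignment wins
  var theMap: Map[String, String] = base
  theMap = fromThread1
  theMap = fromThread2 // the entry for "b" is silently discarded here
}
```

Neither intermediate map is ever corrupted; the data is lost purely because the second assignment replaces the first.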

Adding an item to an immutable map is a very fast operation. So fast that you might never hit the update race in testing. But in a production environment, with dozens or hundreds of concurrent updates, it is bound to happen, and you'll lose data. To see this happen rather spectacularly, run the following program.*

object ConcurrentUpdateRunner extends App {

  val map = new SharedMap

  val pool = java.util.concurrent.Executors.newFixedThreadPool(20)

  val futures = (1 to 100).map(ii => {
    val callable = new java.util.concurrent.Callable[String]() {
      def call() = {
        val x = Thread.currentThread.getName
        map.put(ii.toString, x)
        x
      }
    }
    pool.submit(callable)
  })

  val results = futures.map(_.get)
  println(s"number of executed Callables: ${results.size}")

  println(s"final map size: ${map.size}")

  // the pool's threads are not daemons: shut it down so the JVM can exit
  pool.shutdown()
}

So what can you do?

One option is to introduce synchronization. For a simple data structure like this map, that's the route that I'd take, although some people see any use of synchronization as taboo.

def get(key:String) = theMap.get(key)

def put(key:String, value:String) {
  synchronized { theMap = theMap + (key -> value) }
}

Note that I just synchronized put(), something that I warned against in a previous post. But that warning applies only to mutable data structures. Since the underlying map in this example is immutable, we can use half-synchronization without worry, and avoid contention between get and put.
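Put together as a complete class, the half-synchronized version might look like the sketch below. The class name is hypothetical, and the @volatile annotation is an addition that is not in the original snippet: the Java memory model does not otherwise guarantee that an unsynchronized get() sees the most recently assigned reference.

```scala
// Hypothetical complete version of the half-synchronized map.
class SynchronizedSharedMap {

  // @volatile is an assumption added here so that unsynchronized readers
  // are guaranteed to see the latest reference written by put()
  @volatile private var theMap: Map[String, String] = Map.empty

  // Readers take no lock: they work with whatever immutable instance is current
  def size: Int = theMap.size

  def get(key: String): Option[String] = theMap.get(key)

  // Writers are serialized, so each read-modify-write of the reference is atomic
  def put(key: String, value: String): Unit = synchronized {
    theMap = theMap + (key -> value)
  }
}
```

Swapping this class into the test program should report a final map size of 100, because no two puts can interleave.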

A second alternative is to forgo the Scala library, and use a ConcurrentHashMap.

At first glance, this appears to be a perfect solution: you don't have to think about concurrent access, just rely on the fact that Doug Lea understands it far better than you. However, a ConcurrentHashMap only guarantees thread safety of individual gets and puts. For our trivial map example this is sufficient. For most real-world applications it isn't, because you need atomic updates of data that's already in the map.
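A minimal sketch of that alternative follows. The class name and the appendUnsafe() method are hypothetical, added only to show where the guarantee stops: every individual call is thread-safe, but a get followed by a put is two separate operations.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical wrapper around java.util.concurrent.ConcurrentHashMap.
class ConcurrentSharedMap {

  private val theMap = new ConcurrentHashMap[String, String]()

  def size: Int = theMap.size

  // ConcurrentHashMap.get returns null for a missing key; Option(...) maps that to None
  def get(key: String): Option[String] = Option(theMap.get(key))

  def put(key: String, value: String): Unit = {
    theMap.put(key, value)
    ()
  }

  // Illustration only: another thread can change the value between the get
  // and the put, so concurrent callers can overwrite each other's updates
  def appendUnsafe(key: String, suffix: String): Unit = {
    val current = get(key).getOrElse("")
    put(key, current + suffix)
  }
}
```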

A third option, which I mentioned in my last post, is to introduce a message queue between the map and its consumers. This is not a particularly novel idea: Odersky suggests it in Programming in Scala (in the section “Good actors style”), and Armstrong uses a similar example in Programming Erlang. Here is one possible implementation, using Scala actors (which, although deprecated, are usable without the need for an additional library in the classpath).

import scala.actors.DaemonActor

class ActorMap1 {

  case class SizeMessage()
  case class GetMessage(key:String)
  case class PutMessage(key:String, value:String)

  val theActor = new DaemonActor {
    var theMap : Map[String,String] = Map.empty;

    def act() {
      loop {
        receive {
          case SizeMessage() => {
            sender ! theMap.size
          }
          case GetMessage(key) => {
            sender ! theMap.get(key)
          }
          case PutMessage(key,value) => {
            theMap = theMap + (key -> value)
          }
        }
      }
    }
  }

  theActor.start

  def size = {
    val fut = theActor !! new SizeMessage
    fut.apply
  }

  def get(key:String) = {
    val fut = theActor !! new GetMessage(key)
    fut.apply
  }

  def put(key:String, value:String) {
    theActor ! new PutMessage(key, value)
  }
}

This example adds quite a bit of complexity to the shared map: new classes to hold messages for the actor, and a facade object to handle the communications. And it still has a significant flaw: there's only one loop, handling both gets and puts. We've achieved thread safety, at the cost of introducing a bottleneck for concurrent operations.

In the case of an in-memory map, this bottleneck probably isn't significant. Unless you have dozens of threads constantly hitting the map, to the exclusion of other actions, the occasional waits will go unnoticed. But what if this map is actually a write-through cache for a database or socket? In that case, the put will take significant time, and contention becomes a real possibility. To resolve this problem, we can add another actor to the mix.

import scala.actors.DaemonActor

class ActorMap2 {

  case class SizeMessage()
  case class GetMessage(key:String)
  case class PutMessage(key:String, value:String)
  case class InternalPutMessage(key:String, value:String)

  val getActor = new DaemonActor {
    var theMap : Map[String,String] = Map.empty;

    def act() {
      loop {
        receive {
          case SizeMessage() => {
            sender ! theMap.size
          }
          case GetMessage(key) => {
            sender ! theMap.get(key)
          }
          case InternalPutMessage(key,value) => {
            theMap = theMap + (key -> value)
          }
        }
      }
    }
  }

  val putActor = new DaemonActor {
    def act() {
      loop {
        receive {
          case PutMessage(key,value) => {
            // some long-running operation
            getActor ! InternalPutMessage(key, value)
          }
        }
      }
    }
  }

  getActor.start
  putActor.start

  def size = {
    val fut = getActor !! new SizeMessage
    fut.apply
  }

  def get(key:String) = {
    val fut = getActor !! new GetMessage(key)
    fut.apply
  }

  def put(key:String, value:String) {
    putActor ! new PutMessage(key, value)
  }
}

Yet more complexity, and it still isn't complete: put() doesn't have any way to signal the caller that an error occurred, and real-world actor systems need supervisors. Plus, there's still the possibility that parts of the application will hold onto stale data (a problem that can never truly be solved).
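One way to give put() an error channel is to have it return a future. The sketch below is not the actor implementation above: it swaps the deprecated actor for a single-threaded executor, which plays the role of the mailbox loop, and uses scala.concurrent.Future. The class name, the five-second timeout, and the empty-key check are all assumptions made for illustration.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical queue-backed map: one worker thread processes every request in order.
class QueuedMap {

  private val executor = Executors.newSingleThreadExecutor()
  private implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(executor)

  // Only ever read or written on the single worker thread
  private var theMap: Map[String, String] = Map.empty

  // Reads are queued behind pending writes, then the caller blocks for the answer
  def size: Int = Await.result(Future(theMap.size), 5.seconds)

  def get(key: String): Option[String] = Await.result(Future(theMap.get(key)), 5.seconds)

  // The returned future fails if the update throws, so the caller can react
  def put(key: String, value: String): Future[Unit] = Future {
    require(key.nonEmpty, "key must not be empty") // stand-in for a real failure
    theMap = theMap + (key -> value)
  }

  // The worker is not a daemon thread, so it must be stopped explicitly
  def shutdown(): Unit = executor.shutdown()
}
```

A caller that does not care about the outcome can ignore the returned future, which keeps the fire-and-forget behavior of the actor version. This still does nothing about supervision or stale data.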

But that was the entire point of my last post. It isn't easy to correctly implement concurrent applications, and immutable data structures don't help you solve the bigger issues. If you rely on them to do so, you will be burned. You need to understand how the larger application mutates state, and choose an appropriate strategy for managing those mutations.

Finally, although in this example it appears to be a case of hunting mosquitoes with a shotgun, decomposition into independent actors does solve many of the bigger problems of concurrency. But that's simply because it forces you into the techniques of information hiding and limited interfaces that have been part of software engineering since the 1970s.


* I used a threadpool, rather than actors, to have better control over the number of threads. I'm also running on a multi-core processor, which means that I'm really hitting the map from multiple threads at the same time. Switching between single-core and multi-core machines tends to highlight different threading bugs.

2 comments:

lcn said...

One comment about your words on ConcurrentHashMap: "For most real-world applications it isn't, because you need atomic updates of data that's already in the map." ConcurrentHashMap does have atomic update for existing data: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ConcurrentMap.html#replace%28K,%20V%29

Keith Gregory said...

replace() updates the map itself, not the data that it contains.

To understand what I mean by that, consider a map that's used to store bank balances. It will be updated by transactions, and two transactions may attempt to update the same account concurrently. You need to ensure that both transactions succeed, and that requires an atomic operation that retrieves the current value, updates it, and stores the result.

With JDK 8, you can use computeIfPresent(), which performs the update using a lambda. But with earlier versions, you need to manage synchronization yourself.
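A minimal sketch of the JDK 8 approach, using the bank-balance example. The object name, the account keys, and the deposit() method are hypothetical; the explicit BiFunction is used instead of a lambda so that the code does not depend on the Scala version's support for Java functional interfaces.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction

// Hypothetical balance store; requires JDK 8 or later for computeIfPresent.
object BalanceSketch {

  val balances = new ConcurrentHashMap[String, java.lang.Long]()

  // Reads the current balance, adds the amount, and stores the result as one
  // atomic step; accounts that are not in the map are left untouched
  def deposit(account: String, amount: Long): Unit = {
    balances.computeIfPresent(account, new BiFunction[String, java.lang.Long, java.lang.Long] {
      def apply(key: String, current: java.lang.Long): java.lang.Long =
        Long.box(current.longValue + amount)
    })
    ()
  }
}
```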