The new Consumer-Producer

We all know the classic producer-consumer programming exercise; there is even an entire Wikipedia page devoted to it. In this article, I start with the classic implementation used in the Java world. Although there is nothing wrong with this approach, issues can arise when using it. What happens if we do not control the producer and want it to wait before sending new data (backpressure)? What about errors during processing (fault tolerance)?

To address these issues, I describe an implementation built with the Akka library, which provides backpressure and additional error handling.

With the Akka library, I implemented a solution (running in production) that handles a load of 3000 transactions per second, with an average size of 5 KB, on a single six-core system.

Classic approach

In classical implementations (for example in C/C++), semaphores and monitors are used to synchronize the consumer and the producer. When doing this in Java 8, most solutions use a blocking queue and threaded consumers.

In Java 8 the producer will look similar to this:

import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;
 
import static java.lang.System.out;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
class MyProducer<T> {
 
    private final BlockingQueue<T> queue;
 
    public MyProducer(BlockingQueue<T> queue) {
        this.queue = queue;
    }
 
    /**
     * Inserts the supplied object into the queue.
     * 
     * @param supplier
     *            Is responsible for supplying the object that will be put
     *            in the queue
     */
    public void produce(Supplier<T> supplier) {
        final T msg = supplier.get();
        try {
            queue.put(msg);
            out.println("Added message: " + msg);
             
            // Simulate a long-running process
            MILLISECONDS.sleep(900);
             
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}

And the consumer looks something like this:

import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
 
import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
class MyConsumer<T> {
 
    private final BlockingQueue<T> queue;
 
    public MyConsumer(BlockingQueue<T> queue) {
        this.queue = queue;
    }
 
    /**
     * Retrieves an object from the head of the queue and passes it to the
     * consumer.
     * 
     * @param consumer
     *            Contains the logic on what to do with the retrieved object
     */
    public void consume(Consumer<T> consumer) {
        try {
            consumer.accept(queue.take());
                 
            // Simulate a long-running process
            MILLISECONDS.sleep(1250);
             
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
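The closest the classic approach gets to backpressure is choosing a *bounded* queue such as `ArrayBlockingQueue`: once the queue is full, `put` blocks the producer until a consumer drains an element. A minimal, self-contained sketch of that behavior (the demo class and method names are my own, not part of the code above):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Demonstrates why a bounded queue matters: once it is full, the
// producer is forced to wait, which is a crude form of backpressure.
class BoundedQueueDemo {

    // Fill a capacity-1 queue, then try to add a second element with a
    // timeout. offer(...) is the polite cousin of put(...): instead of
    // blocking forever, it gives up and returns false.
    static boolean secondPutAccepted() {
        try {
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
            queue.put("first");  // fills the queue to capacity
            return queue.offer("second", 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // With no consumer draining the queue, the second put is rejected.
        System.out.println("second put accepted: " + secondPutAccepted());
    }
}
```

With an unbounded queue (for example a plain `LinkedBlockingQueue` with no capacity), `put` never blocks, so a slow consumer simply lets the queue, and the heap, grow.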

As mentioned, this approach has several drawbacks. Various BlockingQueue implementations are available, and each has its limitations, one of them being the amount of memory used: some implementations have a maximum capacity, but not all. In case of errors, all kinds of exceptions can be thrown; these errors need handling, and that can become quite a hassle throughout the system.

Akka

Akka offers an alternative way to implement the producer-consumer pattern.

The introduction found on the Akka website is the following:

> “resilient elastic distributed real-time transaction processing”
>
> We believe that writing correct distributed, concurrent, fault-tolerant and scalable applications is too hard. Most of the time it’s because we are using the wrong tools and the wrong level of abstraction. Akka is here to change that. Using the Actor Model we raise the abstraction level and provide a better platform to build scalable, resilient and responsive applications—see the Reactive Manifesto for more details. For fault-tolerance, we adopt the “let it crash” model which the telecom industry has used with great success to build applications that self-heal and systems that never stop. Actors also provide the abstraction for transparent distribution and the basis for truly scalable and fault-tolerant applications.

The keywords in this piece of text are distributed, fault-tolerant, resilient, elastic and, last but not least, “let it crash”.

Although the introduction suggests a one-size-fits-all system, you still have to decide whether you want to go this way. Apache Spark (Streaming) and Kafka are systems that can probably do many things similar to an Akka system.

Akka vs. Spark

There are more systems and libraries with which you can implement a distributed, fault-tolerant system, for example Spark and Kafka, even in a streaming manner. The main difference is that Akka does not use micro-batches, so delays and lag can be eliminated.

The Akka programming model

The Akka implementation is based on the Actor model.

Actor model

In the actor model, each actor runs in isolation and has, if required, an isolated state. Communication between actors happens by means of mailboxes: actors are standalone units that perform some kind of computation on the messages in their mailbox. The system scales by spawning multiple actors on top of a threading model. The mailboxes limit the performance of the system; by default, Akka gives each actor its own mailbox, but other options are available.
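The mailbox idea itself is easy to sketch in plain Java. The following toy actor is my own illustration, not Akka's API: it pairs a private mailbox with a single worker thread that processes one message at a time, which is the core of the model stripped of supervision and distribution.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// A toy "actor": isolated state, a private mailbox, and one worker
// thread that handles a single message at a time.
class TinyActor<T> {

    private final BlockingQueue<T> mailbox = new LinkedBlockingQueue<>();
    private final Thread worker;

    TinyActor(Consumer<T> behavior) {
        worker = new Thread(() -> {
            try {
                while (true) {
                    behavior.accept(mailbox.take()); // one message at a time
                }
            } catch (InterruptedException e) {
                // interruption is treated as the shutdown signal
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    void tell(T message) { mailbox.add(message); } // never blocks the sender

    void stop() { worker.interrupt(); }

    // Small demo: send n messages and count how many were processed.
    static int runDemo(int n) {
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(n);
        TinyActor<String> actor = new TinyActor<>(msg -> {
            processed.incrementAndGet();
            done.countDown();
        });
        for (int i = 0; i < n; i++) {
            actor.tell("msg-" + i);
        }
        try {
            done.await(); // wait until all n messages are handled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        actor.stop();
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed: " + runDemo(5));
    }
}
```

Because only the worker thread touches the actor's state, no locks are needed; senders and the actor interact solely through the mailbox, exactly the property that lets Akka scale actors across threads.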

Akka streams

Implementing a system that processes a continuous stream of data is a tedious process: think about resource management and programming the flow of data through the system. For this, Akka provides the Streams API, a DSL on top of the actor system. With this DSL you can easily write down a flow of message-processing stages without being burdened by resource management.

With the DSL (full name: Graph DSL), writing a flow of events becomes a simple exercise. The code snippet below takes received tweets and broadcasts them to two sinks that write the authors and the hashtags used in each tweet.

import akka.NotUsed
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}
 
val tweets: Source[Tweet, NotUsed] = ???
val writeAuthors: Sink[Author, NotUsed] = ???
val writeHashtags: Sink[Hashtag, NotUsed] = ???
 
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
 
  val bcast = b.add(Broadcast[Tweet](2))
  tweets ~> bcast.in
  bcast.out(0) ~> Flow[Tweet].map(_.author) ~> writeAuthors
  bcast.out(1) ~> Flow[Tweet].mapConcat(_.hashtags.toList) ~> writeHashtags
  ClosedShape
})
g.run()

I took the Scala version of the sample because it shows the power of the DSL best.

Summary

The classical producer-consumer problem can now be solved using, for example, Akka. Using the library, and especially the DSL, gives you great power to write large systems in a scalable and resilient way.