
Distributed Runtimes: at-least-once is here

Introducing at-least-once processing with Apache Kafka!

If you are charging money for processing logs, you simply cannot afford to lose a single data item. At LinkedIn, where Apache Kafka was incubated, they used at-least-once guarantees to create connections, match jobs, and optimize ad display.

If they didn't use an at-least-once system, it would mean dropping a resume, missing a job-match, and ultimately losing money.

Until now, users coming to Concord were interested in low-latency processing. Concord gives them a set of tools to support microservice-like deployments for streaming applications. Users come because they want multi-language support, tracing, unified deployment, Mesos integration, a native runtime, C++ support, centralized scheduling and constraint solving, debugging hooks, etc.

The problem came when users needed guaranteed delivery of messages, for which we didn't have a solution. Today users no longer have to worry about losing their data.

First, our at-most-once delivery (think HTTP requests) was a good starting point since our beta users were in the financial services industry, evaluating idempotent market functions, i.e.:

    val x = ticker_symbol.price * 2

Our second use case for at-most-once was in ad tech: to be precise, the post-processing of real-time bids to update advertisers' budgets. The idea is that the faster the feedback loop, the more revenue the company generates.

In both cases you want low-latency, scalable processing as the top priority. To guarantee correctness, however, these businesses also run accounting systems that reprocess the logs, and for more reasons than just the accidental complexity of a lambda architecture.

What is a distributed runtime?

Runtimes run code on behalf of the user during program execution. This includes input, output, state management, etc. For Concord this means constraint solving, key-value storage, message passing, TCP connection pooling, monitoring, bootstrapping, packaging, deployment, debugging, tracing, etc.

Concord users are able to debug and deploy an operator just as they would a local process, something we already wrote a lot about in Why Concord. Concord worries about the difficult tasks of clustering, supervision, deployment, etc., while users worry about their business logic - at the end of the day, code is only useful when it runs. All of the tooling and support is already available for five officially supported languages: Go, Java (Clojure, Scala), C++, Ruby, and Python.

Introducing at-least-once with Apache Kafka

While expanding the capabilities of real-time processing systems, we need to maintain backward compatibility. We have new users asking us to support new and richer semantics while keeping our API the same. So today we introduce an at-least-once message delivery guarantee, our second swappable distributed runtime.

A swappable runtime gives you the ability to change your message delivery guarantees, state management, etc., without changing a single line of code.

Via a single command-line flag you can fine-tune for low-latency, at-most-once processing, or guarantee the delivery of every single data item with a Kafka-backed runtime. Think of your code as a Lego piece that can simply be moved around as your customers' needs change, without the engineering effort required to support it.

How do I use it?

First, you need to launch the Concord scheduler with an additional command-line flag, --kafka-distributed-runtime true, like this:

    java -DMESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so
         $JAVA_OPTS -cp /usr/local/share/concord/concord.jar
         ...
         --kafka-path zk://localhost:2181/kafka
         --kafka-distributed-runtime true                # only difference
         ...
         com.concord.scheduler.Scheduler

Second, ... there is no second; just make sure there actually is a Kafka cluster up and running.
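If you want a quick sanity check that the brokers are actually reachable before launching the scheduler, something like the following works with the kafka-python client (this is not part of Concord itself, and the broker address is a placeholder):

    from kafka import KafkaConsumer

    # Quick sanity check (not part of Concord) that the brokers are reachable;
    # the broker address is a placeholder.
    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    print('Kafka is up, topics visible:', consumer.topics())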

How does it work?

While interviewing users who were using Kafka as the main real-time component of their architecture, we realized that most users' backends look like this:

With Concord, the new architecture looks like this:

The way Concord interacts with Kafka is by using it for inter-node communication. That is, instead of nodes communicating directly, our proxy engine talks to Kafka on behalf of the user.

This means that all Concord operators send and receive their messages through Kafka rather than talking to each other directly.


The main difference between our at-least-once and our at-most-once systems is that there is no longer point-to-point, direct communication between nodes.

What happens if a Kafka broker fails?

Depending on your configuration, it means you can simply boot up another broker asynchronously and business should continue as usual.

The way at-least-once works with Kafka is by durably replicating messages across brokers. Kafka's replication protocol is built around in-sync replicas (ISR). Data is written durably to the leader for the (topic, partition) tuple, and at some point in the future the followers of that (topic, partition) tuple consume whatever data they have yet to replicate. You can fine-tune your producers and brokers so that writes are not acknowledged until all of the in-sync replicas have the message.
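As a rough illustration of the knobs involved, here is a hedged sketch using the kafka-python client directly rather than Concord's internals; the broker address and topic name are placeholders:

    # Illustrative only: the Kafka settings that trade latency for durability.
    # Concord's Kafka-backed runtime manages producers for you.
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',  # placeholder broker address
        acks='all',   # wait until every in-sync replica has the write
        retries=5)    # retry transient broker errors instead of dropping data

    # Pair acks='all' with min.insync.replicas >= 2 on the topic/broker so
    # that "all" means more than just the leader.
    producer.send('words', b'hello')
    producer.flush()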

Kyle Kingsbury wrote empirical tests that probe the internals of the ISR protocol with his popular tool Jepsen (effectively hand-crafted edge cases that test the promises and limits of distributed systems).

What happens if the Kafka cluster is disconnected?

This is a bigger issue from the perspective of a Concord operator. In the at-least-once system it means we can no longer do input or output. In other words, there is no reason for us to live =)

The thundering herd effect is a problem that can occur when multiple consumers are competing for the same resource and the accounting and bookkeeping of access to that resource nearly prevents forward progress. In our case, that resource is reading from Kafka.

We mitigate the herd effect by retrying 10 times (configurable) and sleeping for a random number of milliseconds, rand(0, 1000ms), between retries before declaring operator failure. If at the end of the timeout period the operator still cannot connect to the brokers, it exits with a failure message. Not randomizing the retries could mean an overwhelming number of simultaneous requests to the Kafka brokers, potentially bringing them down or exhausting other shared resources such as network bandwidth.
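Here is a minimal sketch of that retry-with-jitter pattern; the function name and defaults are illustrative, not Concord's actual implementation:

    import random
    import time

    # Sketch of retry-with-jitter; names and defaults are illustrative,
    # not Concord's actual internals.
    def connect_with_jitter(connect, max_retries=10, max_sleep_ms=1000):
        for _ in range(max_retries):
            try:
                return connect()
            except ConnectionError:
                # A random sleep spreads reconnection attempts out so that a
                # fleet of recovering operators does not hammer the brokers
                # all at the same instant.
                time.sleep(random.randint(0, max_sleep_ms) / 1000.0)
        raise RuntimeError('brokers unreachable after retries; failing the operator')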

We realize that people are running production-critical applications on Concord. To deal with the failure of an operator, users can configure the operator to be respawned on a different machine, all from a simple configuration file setting:

    {
      ...
      "retries": 3
    }

With this configuration, it would take 30 total attempts at recovering from failure: 3 respawns, each with 10 connection retries, possibly on different hardware and different racks. Especially if the problem is intermittent, we are likely to find a good home for the container trying to make forward progress... phew!

However, if a bomb hits the Kafka cluster that the operators are configured to connect to, at some point they will give up and percolate the failure up to our scheduler and the Mesos master UI.

Now that we have a distributed queue that keeps our messages safe in case of failure, respawning a new operator is just everyday life. In case of operator failure, the next operator redeployed will pick up the messages from the latest committed offset in Kafka.
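To make that concrete, here is roughly what resuming from the last committed offset looks like with a plain kafka-python consumer; Concord's proxy engine does the equivalent on the operator's behalf, and the topic, group, and broker names below are placeholders:

    from kafka import KafkaConsumer

    def process(value):
        print(value)  # stand-in for the operator's business logic

    # A replacement operator joining the same consumer group resumes from the
    # last committed offset rather than from the beginning of the topic.
    consumer = KafkaConsumer(
        'words',
        bootstrap_servers='localhost:9092',
        group_id='word-counter',
        enable_auto_commit=False,      # commit explicitly, only after processing
        auto_offset_reset='earliest')  # used only when no committed offset exists

    for message in consumer:
        process(message.value)
        consumer.commit()              # checkpoint progress for the next respawn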

Runtime scaling with Concord

The interesting questions, from both a technical and a business perspective, come when you want to use Concord to scale operators at runtime. The main attraction of an operator-by-operator style of deployment is that you empower your site reliability engineers to scale nodes to meet demand in the middle of the night without having to wake up the rest of the team.

Scaling up is no different from deploying a new operator in Concord. You simply run:


    $> concord deploy operator_spec.json

...et voilà! Now you have more operators handling live traffic.

What happens under the hood when you want to scale up or down?

Each Concord operator type, where type is defined as follows:

    # python example

    def metadata(self):
        return Metadata(
            name='word-counter',
            istreams=['words'],
            ostreams=[])

has an implicit Kafka consumer group. When a new operator instance connects, or an old instance of an operator dies (note: you can have many instances of the same operator), the Kafka driver kicks off a rebalance to redistribute partitions across the remaining Concord operators.

At its heart, you can only have as many tasks as there are (topic, partition) tuples. Each (topic, partition) can have at most one consumer per consumer group. To deal with this Kafka restriction, we recommend that users artificially increase the number of partitions, to something like 144, when creating the topic. This way you don't have to worry later about creating a new topic with more partitions, halting producers, and resuming work afterwards. This strategy of logically over-provisioning is well known and works in practice!
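For example, creating the over-partitioned topic up front might look like this with kafka-python's admin client (the topic name, partition count, and replication factor are just placeholders; the standard kafka-topics tool works equally well):

    from kafka.admin import KafkaAdminClient, NewTopic

    # Over-partition the topic up front; partition count caps the parallelism
    # of a consumer group and is painful to raise later. Names and sizes are
    # placeholders.
    admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
    admin.create_topics([
        NewTopic(name='words', num_partitions=144, replication_factor=3)])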

Why Apache Kafka?

Some savvy readers will quickly point out that durably replicating messages is only one approach to achieving at-least-once delivery. Another way to think about at-least-once is through the concept of upstream backup. To be precise, your parent is responsible for resending you data until you've acknowledged receipt. You do this all the way up to the root, at which point the root needs to be hooked up to a replayable source like HDFS, Cassandra, RabbitMQ, Kafka, etc.

The simplest way I can think of to explain upstream backup is through an asynchronous RPC mechanism. In pseudocode:

    // 2-step pipeline (pseudocode)
    var root = new rpc("127.0.0.1:1234"); // sink ip:port
    root.send(data, function() {
        Kafka.checkpoint(data.offset);
    });

In plain English, this means that the source of the tree won't move the offset in Kafka until the downstream subscriber has processed the data. You can then do the same at each step. The downside of this approach is that you need to track partial progress, deal with failed connections, etc., not to mention that progress is blocked at each step, all the way up to the source, until the last step acknowledges the write.

This is one of the main reasons other accounting systems were developed to achieve at-least-once delivery of messages. Apache Storm, and in turn Twitter Heron, use an XOR-hashing algorithm with an expiration timeout to reprocess and resend tuples downstream. The upside of this approach is that it allows partial progress in the middle of your pipeline while keeping a very small memory footprint for the tuple acknowledgement system. The downside of individual message acknowledgements (as in Storm/Heron) is that replayed messages arrive out of order, which makes idempotent state updates more expensive than when messages are totally ordered, complicates checkpointing, and introduces additional non-determinism.
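For a flavor of how that acking scheme works, here is a simplified sketch of the Storm-style XOR acker; it illustrates the idea only and is not Concord, Storm, or Heron source code:

    import random

    # Simplified sketch of Storm-style XOR acking: the acker keeps one value
    # per spout tuple, XORs in each tuple id when it is emitted (anchored) and
    # again when it is acked. The value returns to zero exactly when every
    # tuple in the tree has been acked, regardless of order.
    class Acker:
        def __init__(self):
            self.pending = {}  # root tuple id -> running XOR of anchored/acked ids

        def anchor(self, root_id, tuple_id):
            self.pending[root_id] = self.pending.get(root_id, 0) ^ tuple_id

        def ack(self, root_id, tuple_id):
            self.pending[root_id] ^= tuple_id
            if self.pending[root_id] == 0:
                del self.pending[root_id]
                return True   # whole tuple tree processed
            return False      # still waiting (or replayed on timeout)

    acker = Acker()
    root, child = random.getrandbits(64), random.getrandbits(64)
    acker.anchor(root, root)       # spout emits the root tuple
    acker.anchor(root, child)      # bolt emits a child anchored to the root
    acker.ack(root, root)          # root tuple processed
    assert acker.ack(root, child)  # last ack drives the XOR back to zero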

Choosing Kafka was the next logical step, as it is a system already deployed at nearly all the companies we've talked to, which means we get to leverage an ecosystem of tooling, forums, and knowledge bases while getting a performant at-least-once implementation out of the box!


So there you have it: at-least-once support from Concord with a single configuration change.

Check out our code samples, give the new runtime a spin, and stop by our user group to ask questions!


Special thanks to Sarah Rohrbach, Robert Blafford, Martin Kleppmann, Shinji Kim, An Nguyen, and Igor Vasilcovsky for reading drafts of this post.