Stream Windowing Performance Analysis: Concord and Spark Streaming
One question we get asked often is “How does Concord compare to [name your stream processing framework here]”? It came up recently when we were talking about real-time alerting with a Principal Architect at a major CDN and a cloud services provider. Our conversation is what prompted us to write this post.
Benchmarking stream processor performance isn’t as straightforward as it might seem, and can leave many questions unanswered. The purpose of this post is to introduce the topic of performance benchmarking and to compare Concord to Apache Spark Streaming at a high level for windowing computations. In future posts, we’ll take a deeper look at other traits of both systems in order to make direct comparisons between their common features.
When you’re operating at the scale of a major CDN, delivering several trillion internet interactions daily, it’s critical to detect anomalies in real-time and resolve issues as soon as they happen. For stream processing use cases like real-time detection of fraudulent behavior and real-time alerting, it’s key that data is evaluated quickly and continuously over multiple time ranges. So in this blog post, we’ll primarily look at the performance of windowing operations: tumbling (non-overlapping) window and sliding (overlapping) window computations.
To evaluate Concord and Spark Streaming on performance, we focused on three characteristics: latency, throughput, and predictability. High throughput and low latency are important for real-time use cases, but predictability of the system at scale is also very important for capacity planning and when you need to meet a high SLA.
Benchmarking stream processing frameworks is not a trivial task. Architectural differences between frameworks are wide-reaching: they have different message queueing strategies, wire protocols, routing implementations, message delivery guarantees, resource management, and so on, making direct comparisons and succinct conclusions difficult.
The goal of this benchmark is to measure cluster-wide throughput, end-to-end latency (wire to node where applicable), and hardware utilization. The benchmarks we ran did not attempt to test failure and recovery performance or multi-data-center use. We’ll cover those topics in future posts, as they can be important aspects to consider depending on your use case.
Micro-batching vs. Event-based
One of the reasons it’s difficult to make direct comparisons between Spark Streaming and Concord is that Spark uses micro-batching while Concord uses an event-based, message-at-a-time processing approach.
In the micro-batching model that Spark follows, records are collected over a predetermined interval, called the ‘batch interval’, before they are processed. Because records are batched and processed as a group, the system doesn’t need to make frequent network requests within small intervals of time. Message-at-a-time systems can still batch, buffer, and pipeline messages internally to amortize round-trip times.
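To make the model concrete, here is a minimal sketch of micro-batch grouping - an illustration of the idea, not Spark’s actual implementation (the function name and record format are ours):

```python
import math

def to_microbatches(records, batch_interval, t0=0.0):
    """Group (timestamp, payload) records into fixed-width batch intervals.

    Records with timestamps in [t0 + k*interval, t0 + (k+1)*interval)
    land in batch k, so downstream work (and network I/O) happens once
    per batch instead of once per record.
    """
    batches = {}
    for ts, payload in records:
        k = math.floor((ts - t0) / batch_interval)
        batches.setdefault(k, []).append(payload)
    return [batches[k] for k in sorted(batches)]
```

With a 1-second batch interval, records arriving over three seconds collapse into three batches, each handed to the engine as one unit of work.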
Execution model of Spark Streaming: Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine (Source: Apache Spark)
Micro-batching comes at the cost of latency - processing latencies faster than about one second are difficult to achieve with Spark Streaming at this time. Moreover, micro-batching frameworks require tuning to ensure that processing times don’t exceed the batch interval. If they do, batches are queued in memory faster than they are processed, starving the system of memory and eventually leading to dropped records and cascading failures.
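A toy simulation makes this failure mode easy to see - this isn’t Spark code, just illustrative arithmetic showing how scheduling delay grows without bound once per-batch processing time exceeds the batch interval:

```python
def last_batch_delay(batch_interval, processing_time, n_batches):
    """Simulate scheduling delay when micro-batches are processed FIFO
    by a single engine. A new batch arrives every `batch_interval`
    seconds and each takes `processing_time` seconds to process.
    Returns the scheduling delay of the last batch."""
    finish = 0.0  # time at which the engine becomes free
    for i in range(n_batches):
        arrival = i * batch_interval
        start = max(arrival, finish)  # batch waits if engine is busy
        finish = start + processing_time
        delay = start - arrival
    return delay
```

If processing fits in the interval, delay stays at zero; if each batch takes 20% longer than the interval, the delay of the Nth batch grows linearly with N - exactly the queueing behavior described above.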
Execution model of event-based stream processing systems (Source: Databricks)
In event-based systems such as Concord, streams are first-class citizens. This lends itself to a more natural API and programming paradigm based on “operators”. Developers can perform transformations on streams without navigating layers of abstraction, and programming stateful computations is straightforward because there’s no need to ship state around the topology. Essentially, each operator can be seen as a separate program that you can develop to your liking.
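As an illustration (not Concord’s actual API - the class and method names here are hypothetical), a message-at-a-time operator boils down to a small program that holds its own state and forwards results downstream:

```python
class FilterIrqOperator:
    """Illustrative message-at-a-time operator: receives one record at a
    time, transforms it, and forwards matches downstream. State lives
    inside the operator instance, so no state shipping is needed."""

    def __init__(self, downstream):
        self.downstream = downstream  # callable taking one record
        self.seen = 0                 # local operator state

    def process_record(self, record):
        self.seen += 1
        if "IRQ" in record:
            self.downstream(record)
```

Feeding records to `process_record` one at a time is the whole contract; the framework takes care of routing records between operators in the topology.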
On the other hand, micro-batching has a simpler fault-tolerance mechanism: if the system needs to recover from a crash, it can restart processing at a micro-batch boundary. Message-at-a-time systems need some other recovery mechanism (snapshots, checkpointing, punctuation markers, etc.).
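A toy sketch of why recovery is simpler under micro-batching - on restart, replay simply resumes at the first uncommitted batch boundary, with no fine-grained tracking inside a batch (function and argument names are ours):

```python
def recover_and_process(batches, committed, process):
    """Sketch of micro-batch recovery: `committed` is the index of the
    last fully processed batch, durably recorded before a crash. On
    restart, processing resumes at the next batch boundary."""
    for i in range(committed + 1, len(batches)):
        process(batches[i])
        committed = i  # record progress at each batch boundary
    return committed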
Because of these differences, this benchmark focuses on high-level, system-wide performance: overall throughput and end-to-end time to process the whole dataset.
Test setup & configuration
We used the supercomputer event logs of “Liberty”, which comprise 265MM records totalling about 29GB. The raw dataset can be found on the USENIX archive site.
The dataset looks like this:
- 1102911148 2004.12.12 ladmin2 Dec 12 20:12:28 src@ladmin2 ....... : LTS : 0
- 1102911148 2004.12.12 ladmin2 Dec 12 20:12:28 src@ladmin2 ....... : LTS : 0
- 1102911148 2004.12.12 ladmin2 Dec 12 20:12:28 src@ladmin2 ....... : physical APIC id: 02
- 1102911148 2004.12.12 ladmin2 Dec 12 20:12:28 src@ladmin2 ....... : physical APIC id: 03
- 1102911148 2004.12.12 ladmin2 Dec 12 20:12:28 src@ladmin2 ....... : physical APIC id: 04
- 1102911148 2004.12.12 ladmin2 Dec 12 20:12:28 src@ladmin2 ....... : physical APIC id: 05
- 1102911148 2004.12.12 ladmin2 Dec 12 20:12:28 src@ladmin2 .................................... done.
- 1102911148 2004.12.12 ladmin2 Dec 12 20:12:28 src@ladmin2 ...changing IO-APIC physical APIC ID to 2 ... ok.
The dataset was pre-loaded into a single Kafka topic with 144 partitions. Then, a stream processor consumed the data from Kafka and wrote the output to Cassandra using its preferred API. All processing was performed within the stream processor.
Each test ran on an 18-node cluster on GCE (Google Compute Engine), where each node was an n1-standard-8 with 8 vCPUs, 30GB of RAM, and a 250GB SSD. Of the 18 nodes, only 5 were dedicated to Concord or Spark execution; the rest were dedicated to Kafka, Cassandra, and the Mesos masters.
Each stream processing component was set up as follows:
Kafka: All the data was loaded into 1 Kafka topic, split across 144 partitions.
Concord: All operators ran with identical settings. The cluster could have accommodated more operators, but further parallelization wouldn’t have increased the throughput of the tests.
- Operators were set as: 4 Kafka sources → 3 Counting sinks
- 3 CPUs and 4GB of memory were assigned to each operator, for a total of 21 CPUs and 28GB of memory.
- Kafka sources were written using Kafka’s C++ client, librdkafka.
- Note: librdkafka has a bug on load balancing evenly across Kafka partitions, which has affected some of Concord’s cluster throughput numbers. Special thanks to Magnus Edenhill, the author of librdkafka, for answering our questions right away.
Spark Streaming:
- Was given the same amount of resources - 21 CPUs and 28GB of memory
- Kafka integration: the native direct source rather than the receiver-based approach (Approach 2 from the Spark - Kafka integration guide)
- Micro-batches at a 1s interval
- spark.streaming.kafka.maxRatePerPartition = 700
- spark.default.parallelism = 3
- Everything else was left at Spark’s default settings
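Put together, the Spark settings above would look roughly like the following in a PySpark driver. This is a hypothetical sketch for orientation - the topic name and broker address are made up, and the real benchmark clients are linked at the end of the post:

```python
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

conf = (SparkConf()
        .setAppName("windowing-benchmark")
        # throttle each of the 144 Kafka partitions
        .set("spark.streaming.kafka.maxRatePerPartition", "700")
        .set("spark.default.parallelism", "3"))
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)  # 1-second micro-batch interval

# Direct (receiver-less) Kafka source, i.e. "Approach 2"
stream = KafkaUtils.createDirectStream(
    ssc, ["liberty-logs"], {"metadata.broker.list": "kafka:9092"})
```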
We ran a set of 7 different tests in total, running each test 3 times. In this blog post, we’ll discuss 2 types of window tests - one where the windows don’t overlap (tumbling window) and one where they do (sliding window).
Test A. Tumbling Window Test: 10-sec window sliding every 10 sec
This test consisted of consuming the Kafka topic and writing the unique log messages containing the string ‘IRQ’ in each window to Cassandra. This pattern-matching test ran with a window width of 10 seconds and a window slide of 10 seconds: we accumulate the filtered records in memory and, every 10 seconds, prune duplicates from the window and write the result to Cassandra.
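The per-window logic can be sketched as follows - a simplified, single-process illustration of the computation, not the actual Concord or Spark client code:

```python
def tumbling_irq_windows(records, width=10):
    """Sketch of Test A: `records` is a sequence of (timestamp_sec,
    message) pairs. Messages containing 'IRQ' are collected into
    non-overlapping `width`-second windows; when a window closes, its
    duplicates are pruned and the unique messages are emitted (in the
    benchmark, written to Cassandra)."""
    windows = {}
    for ts, msg in records:
        if "IRQ" in msg:
            windows.setdefault(ts // width, []).append(msg)
    # emit one deduplicated batch per window, in time order
    return [sorted(set(msgs)) for _, msgs in sorted(windows.items())]
```

Because the windows don’t overlap, each record is filtered and deduplicated exactly once, which is the friendliest case for a micro-batching engine.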
Tumbling window computation for Test A
- Run time: Concord’s runs finished in around 6 min for the whole cluster, whereas Spark’s tests took about 2 min and 20 sec.
- Throughput: Concord’s throughput was about 1.4M messages/sec across the cluster, whereas Spark’s throughput was consistent at 3.5-3.7M messages/sec.
- Note that the throughput of each source (S0-S3) is uneven. This is due to the librdkafka consumer’s uneven partitioning issue; Concord’s overall throughput would have been higher without it.
Concord's throughput over time
- Latency: Concord’s P999 latency climbs after the 2:30 mark as the Cassandra driver becomes saturated. Concord’s P50 latency is stable throughout the run.
Concord's P999 vs. P50 latency over time
Test B. Overlapping Window Test: 10sec window sliding every 1 sec
This test consisted of consuming the Kafka topic and counting all unique space-delimited strings, grouped by month and year. The test ran with a window width of 10 seconds and a window slide of 1 second: we aggregate the records in memory with month-year as the key, prune duplicates from the window, and write the result to Cassandra every second.
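Again as a simplified, single-process sketch (not the actual benchmark code), the overlapping window can be modeled as a ring of per-second buckets, with the oldest bucket falling out as the window slides:

```python
from collections import deque

def sliding_unique_counts(buckets, width=10):
    """Sketch of Test B: `buckets` is a list of per-second inputs, each
    a list of (month_year, message) pairs. Every second the window
    slides by one bucket; we count the unique space-delimited tokens per
    month-year key over the last `width` seconds and emit that snapshot
    (in the benchmark, written to Cassandra every second)."""
    window = deque(maxlen=width)  # oldest bucket is evicted automatically
    snapshots = []
    for bucket in buckets:
        window.append(bucket)
        tokens = {}
        for second in window:
            for key, msg in second:
                tokens.setdefault(key, set()).update(msg.split())
        snapshots.append({k: len(v) for k, v in tokens.items()})
    return snapshots
```

Note that this naive version recomputes the counts over all 10 buckets on every slide - the same record is examined many times, which is exactly the cost the overlapping test exposes.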
Overlapping window computation for Test B
When running this test in Spark, we ran into JVM out-of-memory exceptions with the default hardware allocation (21 CPUs and 28GB of memory). We therefore increased Spark’s allocation to 40 CPUs and 48GB of memory (12GB per executor * 4 executors) - about 70% more memory and 90% more CPUs than Concord had.
- Run time: Concord’s test finished in under 10 minutes, whereas all Spark tests took 45 minutes or longer.
- Throughput: Spark’s throughput was 183-195K messages/sec across all the tests.
- Concord’s throughput varied per run - from 965K up to 1.1M messages/sec, about 5x higher than Spark’s.
- Comparing these results with the tumbling window test, Spark can process records at roughly 10x the rate when there’s no overlap between windows. Managing internal state in Spark requires reprocessing the overlapping data, hence the lower throughput in this case.
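The rough arithmetic behind this: with a 10-second window sliding every second, each record belongs to width/slide consecutive windows, so a framework that recomputes every window from its raw records touches each record that many times:

```python
def reprocessing_factor(width_sec, slide_sec):
    """Number of overlapping windows each record participates in when
    `width_sec`-second windows slide every `slide_sec` seconds (width
    divisible by slide). A framework that recomputes each window from
    scratch examines every record this many times."""
    return width_sec // slide_sec
```

With Test B’s settings this factor is 10, which lines up with the roughly 10x throughput drop observed for Spark relative to the non-overlapping case.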
- Concord’s throughput varies over time. This is because Concord’s API is single-threaded: when the window horizon arrives, Concord counts the events in the window once, and it doesn’t need to redo any work until the next horizon.
Concord’s latency vs. throughput over time
- Latency: During operator initialization, Concord establishes TCP connections with all interested downstream subscribers, which contributes to the higher P999 latency before it stabilizes at the 3:00 mark.
Screenshot of Spark UI during the test
This screenshot was taken after we added resources to the Spark cluster, as Spark initially ran into OOM issues with the same resource allocation as Concord. As seen in the screenshot, the average processing time is below the dashed line marked “stable”, yet there are multiple spikes that exceed it. Each such spike in processing time means future work is queued in memory, which in turn increases scheduling delay. We believe this unpredictable behavior can be attributed to memory pressure on the JVM.
Conclusions and Future Work
It’s interesting to see how Concord and Spark Streaming perform under given constraints due to differences in their designs. Throughout Concord's benchmarks we can see predictable and consistent performance and hardware utilization. Spark’s results, on the other hand, are dependent on the nature of the job (the degree of parallelism, window slide/duration lengths, etc). This difference is important to note for real-time use cases of stream processing that require capacity planning, such as fraud detection and real-time ad serving.
Over the coming months we look forward to testing Spark 2.0, as well as Concord once the librdkafka consumer issue is fixed. If you’re interested in the computations we ran, you can check out all the client code for both Concord and Spark here.