This tutorial will help you create and deploy your first concord computations. Our first computational topology will be a canonical word count:


The computations in this walkthrough are already implemented and can be found in the within the getting-started project. If you are just interesting in running the example you can skip to the quick start tutorial. Others interested in glancing at the entire source for this example can check it out on our github page.

Sentence Generator

This computation will serve as the source of all data for this example. It generates random sentences and emits them on a stream named "Sentences".

First, let's include the concord dependency and create the 'SentenceGenerator', inheriting from "Computation" if neccessary:

from concord.computation import Computation
class SentenceGenerator(Computation): ...
import io.concord.Computation;
import io.concord.swift.*;
public class SentenceGenerator extends Computation { ... }
import io.concord._
import io.concord.swift._
class SentenceGenerator extends Computation {
#include <concord/Computation.hpp>
class SentenceGenerator final : public bolt::Computation { ... };
require 'concord'
class SentenceGenerator
type SentenceGenerator struct { ... }


The first function you'll implement for a new computation is metadata. The Concord framework provides a pub-sub-like architecture in which you publish messages to and consume messages from named streams. These stream names must be globally unique.

In order to allow ad-hoc deployments, Concord computations must define their own metadata which is queried by the Concord scheduler when it is launched in the cluster.

For example, sentence-generator publishes to the stream named "sentences" and doesn't consume from any streams:

def metadata(self):
  return Metadata(
public Metadata metadata() {
    return new Metadata("sentence-generator", new HashSet(),
      new HashSet(Arrays.asList("sentences")));
override def metadata(): Metadata = {
  val ostreams = new MutableHashSet[String](java.util.Arrays.asList("sentences"))
  new Metadata("sentence-generator", new MutableHashSet[StreamTuple](), ostreams)
virtual bolt::Metadata metadata() override {
    bolt::Metadata m;
    m.name = "sentence-generator";
    return m;
def metadata
  Concord::Metadata.new(name: 'word-source', ostreams: ['sentences'])
func (p *SentenceGenerator) Metadata() *concord.Metadata {
    return &concord.Metadata{
        Name:    "sentence-generator",
        Outputs: []string{"sentences"},

This may seem odd at first, but it's because sentence-generator is a data source, meaning it is a point at which data enters the Concord topology for processing.

The Metadata object consists of the following fields:

Name (required) Computation name
istreams List of (input stream, routing strategy) pairs
ostreams List of outputstreams

If either istreams or ostreams are left undefined, they will default to an empty list. A more complete look at the metadata function can be found in our client_docs.


The next step in defining your computation is writing your init function. The init function gets called when the framework is ready to start sending your computation messages. In this operator, we use it to start our infinite loop that emit words.

def init(self, ctx):
  self.concord_logger.info("Source initialized")
  ctx.set_timer('loop', time_millis())
override def init(ctx: ComputationContext): Unit = {
  println(s"${this.getClass.getSimpleName} initialized")
  ctx.setTimer("loop", System.currentTimeMillis())
public void init(ComputationContext ctx) {
    System.out.println("SentenceGenerator.java initialized");
    ctx.setTimer("loop", System.currentTimeMillis());
virtual void init(CtxPtr ctx) override {
    LOG(INFO) << "Initializing sentence generator [cpp]";
    ctx->setTimer("loop", bolt::timeNowMilli());
def init(context)
  Concord::Utils.log_to_stderr("Initialized Computation")
  context.set_timer('default', Concord::Utils.time_millis)
func (p *SentenceGenerator) Init(ctx *concord.Context) error {
    log.Println("[INFO] Initializing SentenceGenerator")
    // schedule next ProcessTimer call
    ctx.SetTimer(producerTimer, time.Now())

    return nil

We do this via the context’s set_timer function, which sets a timer callback to trigger at some arbitrary time in the future. set_timer ‘s only parameter is the epoch time in milliseconds. Timers can optionally have a unique name, which will come in handy when performing windowed aggregations.

For more information on the init function, check out our client docs.

The Context Object
The context object is an object the framework passes on to developers for interacting with the framework. There are two classes of function used by framework users:

  • Callback functions: Callbacks are defined by the user and are functions meant to process events being propagated by the framework. When the framework has a new record available, the framework calls your process_record callback, allowing your code to process the record.

  • Context functions: Context functions stand opposite to the callbacks. They are defined by the framework (their implementation is hidden from the user) and they are called by users whenever they need to interact with the framework. Emitting a record to downstream operators, referred to as producing a record, is accomplished by calling context.produce_record. Similarly, when users want to set up a timer to trigger their process_timer callback (explained later) at some point in the future, they call context.set_timer.

Process timer

Timer callbacks are handled through the process_timer function. When the scheduled time has elapsed, Concord will call the process_timer function, passing it three arguments:

  • context: A ComputationContext object for interacting with the framework
  • key: The name of the timer (in this case 'default')
  • time: The time you scheduled the callback for

In our process_timer callback, we'll emit a sentence downstream and then schedule another callback for five seconds (5000 ms) in the future:

def process_timer(self, ctx, key, time):
  # stream, key, value. empty value, no need for val
  for _ in range(0, 1024):
    ctx.produce_record("sentences", self.sample(), '-')

  # emit records every 500ms
  ctx.set_timer("main_loop", time_millis() + 5000)
override def processTimer(ctx: ComputationContext, key: String, time: Long): Unit = {
  // Stream, key, value. Empty value, no need for val
  Range(0, 10000).foreach {
    i => ctx.produceRecord("sentences".getBytes, sample().getBytes, "-".getBytes)

  ctx.setTimer(key, System.currentTimeMillis())
public void processTimer(ComputationContext ctx, String key, long time) {
  // Stream, key, value. Empty value, no need for val
  for (int i = 0; i < 1024; ++i) {
    ctx.produceRecord("sentences".getBytes(), this.sample().getBytes(),

  ctx.setTimer(key, System.currentTimeMillis() + 5000);
virtual void
processTimer(CtxPtr ctx, const std::string &key, int64_t time) override {
  for(auto i = 0u; i < 10000; ++i) {
    std::string bin = kHowDoILoveTheeLines[i % kHowDoILoveTheeLines.size()];
    ctx->produceRecord("sentences", bin, "");
  ctx->setTimer(key, bolt::timeNowMilli());
def process_timer(context, key, time)
  (0..1024).each do |i|
    context.produce_record('words', SENTENCES.sample, '')
  context.set_timer(key, Concord::Utils.time_millis + 5000)
func (p *SentenceGenerator) ProcessTimer(ctx *concord.Context, ts int64, name string) error {
    for i := 0; i < 10000; i++ {
        var sentence = sentences[rand.Intn(len(sentences))]
        ctx.ProduceRecord("sentences", sentence, "-")

    // schedule next ProcessTimer invocation
    ctx.SetTimer(producerTimer, time.Now())

    return nil

By calling the produce_record function on the context object, we emit a record on the stream sentences', with a random word from our dictionary.

For a deeper explanation of the process_timer callback, look at our client documentation

Serve the computation

Now that your 'SentenceGenerator' class is fully defined, you can serve it by calling the respective serve computation method:

object SentenceGenerator {
  def main(args: Array[String]): Unit = {
    ServeComputation.serve(new SentenceGenerator())
public static void main(String[] args) {
  ServeComputation.serve(new SentenceSplitter());
int main(int argc, char *argv[]) {
  bolt::client::serveComputation(std::make_shared(), argc,
  return 0;

Sentence Splitter

Since we are counting words we will need a computation that will transform a stream of sentences into a stream of words. This allows us to aggregate by word making our final word counter implementation trivial. This splitting computation is stateless and is essentially performing a map over the sentences stream. Let's call this computation the 'SentenceSplitter'.


Since this computation performs operations on an incoming stream and produces data downstream, this operator will have values for both istreams and ostreams. The metadata struct for 'SentenceSplitter' should look like this:

def metadata(self):
   return Metadata(
override def metadata(): Metadata = {
  val ostreams = new MutableHashSet[String](java.util.Arrays.asList("words"))
  val istreams = new MutableHashSet[StreamTuple]()
  istreams.add(new StreamTuple("sentences", StreamGrouping.SHUFFLE))
  new Metadata("sentence-splitter", istreams, ostreams)
public Metadata metadata() {
  Set istreams = new HashSet();
  istreams.add(new StreamTuple("sentences", StreamGrouping.SHUFFLE));
  return new Metadata("sentence-splitter", istreams, new HashSet("words"));
virtual bolt::Metadata metadata() override {
  bolt::Metadata m;
  m.name = "word-splitter";
  m.istreams.insert({"sentences", bolt::Grouping::SHUFFLE});
  return m;

def metadata
  Concord::Metadata.new(name: 'sentence-splittler',
                        istreams: ['sentences'],
                        ostreams: 'words')
func (c SentenceSplitter) Metadata() *concord.Metadata {
    return &concord.Metadata{
        Name:   "sentence-splitter",
        Inputs: []concord.Stream{concord.NewDefaultStream("sentences")},
        Outputs: []string{"words"},

Process Record

Now lets implement the actual logic that splits sentences into words. For each incoming sentence we will need to split the string into pieces using the space char as our delimiter:

def process_record(self, ctx, record):
    for word in record.key.split(" "):
      ctx.produce_record('words', word, '-')
override def processRecord(ctx: ComputationContext, record: Record): Unit = {
  val sentences = new String(record.getKey, "UTF-8")
  val words = sentences.split(" ")
  words.foreach((word) => {
    ctx.produceRecord("words".getBytes, word.getBytes, "-".getBytes)
public void processRecord(ComputationContext ctx, Record record) {
    try {
      String sentences = new String(record.getKey(), "UTF-8");    
      String words[] = sentences.split(" ");
      for (int i = 0; i < words.length; ++i) {
        ctx.produceRecord("words".getBytes(), words[i].getBytes(), "-".getBytes());
    } catch(Exception e) {
      throw new RuntimeException(e);
virtual void processRecord(CtxPtr ctx, bolt::FrameworkRecord &&r) override {
    std::stringstream ss(r.key());
    std::string token;
    do {
      ss >> token;
      ctx->produceRecord("words", token, "");
    } while(ss);
def process_record(context, record)
    record.key.split do |word|
      context.produce_record('words', word, '-')
func (p *SentenceSplitter) ProcessRecords(ctx *concord.Context, record *concord.Record) error {
    sentences := string(record.Key)
    words := strings.Fields(sentences)
    for _, word := range words {
        ctx.ProduceRecord("words", word, "-")
    return nil

As before, we will need to serve the computation the same way we served the WordSource above.

Word Counter

Finally, we can create a computation that counts the words generated by the 'SentenceSplitter'.


This computation will read messages from the 'words' stream and will never produce data onto any streams. Following that, we must add it to the istreams rather then the ostreams.

For each istream you must define a "grouping strategy" -- this is the method that Concord uses to propogate messages throughout the network. The options are as follows:

Strategy Description
ROUND_ROBIN (default) Messages will be cyclically routed to all instances of a computation
GROUP_BY Messages of a given key will ALWAYS be routed to the same downstream node
SHUFFLE Messages will be randomly routed to instances of the downstream operators

def metadata(self):
    return Metadata(
      istreams=[('words', StreamGrouping.GROUP_BY)],
override def metadata(): Metadata = {
  val istreams = new MutableHashSet[StreamTuple]()
  istreams.add(new StreamTuple("words", StreamGrouping.GROUP_BY))
  new Metadata("word-counter", istreams, new MutableHashSet[String]())
public Metadata metadata() {
    Set istreams = new HashSet();
    istreams.add(new StreamTuple("words", StreamGrouping.GROUP_BY));
    return new Metadata("word-counter", istreams, new HashSet());
virtual bolt::Metadata metadata() override {
    bolt::Metadata m;
    m.name = "word-counter";
    m.istreams.insert({"words", bolt::Grouping::GROUP_BY});
    return m;
def metadata
    Concord::Utils.log_to_stderr("Metadata called")
    Concord::Metadata.new(name: 'word-counter',
                          istreams: [['words',
func (c *WordCounter) Metadata() *concord.Metadata {
    return &concord.Metadata{
        Name:   "word-counter",
        Inputs: []*concord.Stream{concord.NewStream("words", 2)},

For stateless computations, you can use ROUND_ROBIN or SHUFFLE. For stateful computations like aggregations, you should use GROUP_BY.

In the case of WordCounter, we’re using GROUP_BY so we can keep track of a counter for each word.


Let's define a variable called words as a hashmap to store word counts.

self.words = {}
private val wordFrequencyMap = scala.collection.mutable.HashMap[String, Int]()
private final HashMap<String, Integer> histogram = new HashMap<>();
std::unordered_map<std::string, uint64_t> map_{};
self.words = {}
type WordCounter struct {
    stats map[string]int64
    count int64

Processing Records

Once more we define process_record, the function that determines what the computation will do every time it receives a record.

In this operators implementation of process_record, we will increment the count for a given word (using our word -> count map) every time a word arrives.

def process_record(self, ctx, record):
  if self.dict.has_key(record.key):
     self.dict[record.key] += 1
     self.dict[record.key] = 1
override def processRecord(ctx: ComputationContext, record: Record): Unit = {
  val key = new String(record.getKey, "UTF-8")
  val currentValue = wordFrequencyMap.get(key)
  currentValue match {
    case Some(x: Int) => wordFrequencyMap.put(key, x + 1)
    case _ => wordFrequencyMap.put(key, 1)
public void processRecord(ComputationContext ctx, Record record) {
  String key = new String(record.getKey(), "UTF-8");
  Integer currentValue = this.histogram.get(key);
  if (currentValue != null) {
     this.histogram.put(key, ++currentValue);
  } else {
     this.histogram.put(key, 1);
virtual void processRecord(CtxPtr ctx, bolt::FrameworkRecord &&r) override {
def process_record(context, record)
  key = record.key
  self.words[key] ||= 0
func (c *WordCounter) ProcessRecords(ctx *concord.Context, record *concord.Record) error {
      c.stats[string(record.Key)] += 1
      return nil

Finally, in order to report meaningful output without flushing too much unnessecary information to stdout, we will log the contents of our word map every 10,000 records.

JSON Deployment File

In order to deploy a computation you will need to construct a computation manifest file. This file is a small JSON file that the concord CLI will use to schedule a task within the framework. Here is an example of a manifest file:

  "zookeeper_hosts": "localhost:2181",
  "zookeeper_path": "/concord",
  "executable_arguments": ["word_counter.rb"],
  "executable_name": "runner.bash",
  "compress_files": ["word_counter.rb", "runner.bash"],
  "computation_name": "word-counter",
  "environment_variables": [
  "mem": 1024,
  "cpus": 1.0

Note that this manifest contains metadata such as the hosts comprising our Zookeeper quorum as well as project-specific information, like the files to include in the tarball sent to the cluster and the file to be executed.

runner.bash is the point of execution for most computations. Since the framework will simply attempt to fork and exec the given executable, it is necessary for computations written in any non compiled language to be launched from a runner script. The runner script also gives the developer a chance to install computation dependences before the computation is actually launched. Here is an example:

#!/bin/bash --login

echo "Directory: $(pwd)"
gem2.0 install concord-ruby
exec ruby2.0 "$@"

For more information on the Concord CLI, including a complete list of valid parameters for the JSON manifest, look at our CLI documentation.

Deploy Your Computation

Now you’re ready to deploy your computation to your cluster! Simply run the following command to deploy your computations and see it running on Mesos dashboard:

You must be in the root directory of the getting-started project

vagrant@vagrant-ubuntu-trusty-64:~$ cd /vagrant/rb
vagrant@vagrant-ubuntu-trusty-64:/vagrant/rb$ concord deploy word_counter.json
vagrant@vagrant-ubuntu-trusty-64:/vagrant/rb$ concord deploy sentence_splitter.json
vagrant@vagrant-ubuntu-trusty-64:/vagrant/rb$ concord deploy sentence_generator.json

You should now be able to navigate to the Mesos web UI by going to http://localhost:5050


To verify that the computation is counting words as expected, click the link titled Sandbox on the WordCounter computation, then stderr to see the computations output. You should see something like this:

INFO:2015-10-12 17:22:34,143 computation.py:261] About to serve computation and service INFO:2015-10-12 17:22:34,143 computation.py:282] Starting python service port: 31003 INFO:2015-10-12 17:22:34,143 computation.py:285] registering with framework at: INFO:2015-10-12 17:22:34,143 computation.py:209] Getting client metadata INFO:2015-10-12 17:22:34,143 computation.py:220] Got metadata: ComputationMetadata(istreams=[StreamMetadata(name='words', grouping=1)], proxyEndpoint=None, ostreams=[], name='word-counter', taskId=None) INFO:2015-10-12 17:22:34,190 word_counter.py:15] Counter initialized INFO:2015-10-12 17:22:36,247 word_counter.py:28] {'fiz': 206, 'bar': 185, 'foo': 198, 'baz': 216, 'buzz': 219} INFO:2015-10-12 17:22:41,253 word_counter.py:28] {'fiz': 400, 'bar': 393, 'foo': 426, 'baz': 415, 'buzz': 414} INFO:2015-10-12 17:22:46,249 word_counter.py:28] {'fiz': 600, 'bar': 603, 'foo': 630, 'baz': 608, 'buzz': 631} INFO:2015-10-12 17:22:51,247 word_counter.py:28] {'fiz': 824, 'bar': 805, 'foo': 824, 'baz': 818, 'buzz': 825} INFO:2015-10-12 17:22:56,240 word_counter.py:28] {'fiz': 1022, 'bar': 1019, 'foo': 1038, 'baz': 1001, 'buzz': 1040} INFO:2015-10-12 17:23:01,231 word_counter.py:28] {'fiz': 1236, 'bar': 1233, 'foo': 1239, 'baz': 1204, 'buzz': 1232}

Heads Up..
If you do not see computation output in the stderr logs, navigate to the folder next to the stderr link. Any computation logs should be in a file ending in '.log'.