# Tutorials

## Walkthrough

This tutorial will help you create and deploy your first Concord computations. Our first topology will be a canonical word count:

*(Figure: the word count topology)*

The computations in this walkthrough are already implemented and can be found within the getting-started project. If you are just interested in running the example, you can skip to the quick start tutorial. If you'd like to glance at the entire source for this example, you can check it out on our GitHub page.

### Sentence Generator

This computation will serve as the source of all data for this example. It generates random sentences and emits them on a stream named 'sentences'.

First, let's include the Concord dependency and create the ``SentenceGenerator``, inheriting from ``Computation`` where necessary:

```
from concord.computation import Computation
class SentenceGenerator(Computation): ...
import io.concord.Computation;
import io.concord.swift.*;
public class SentenceGenerator extends Computation { ... }
import io.concord._
import io.concord.swift._
class SentenceGenerator extends Computation { ... }
#include <concord/Computation.hpp>
class SentenceGenerator final : public bolt::Computation { ... };
require 'concord'
class SentenceGenerator
...
end
import("github.com/concord/concord-go")
type SentenceGenerator struct { ... }
```

#### Metadata

The first function you’ll implement for a new computation is metadata. The Concord framework provides a pub-sub-like architecture in which you publish messages to and consume messages from named streams. These stream names must be globally unique.

In order to allow ad-hoc deployments, Concord computations must define their own metadata, which the Concord scheduler queries when the computation is launched in the cluster.

For example, sentence-generator publishes to the stream named “sentences” and doesn’t consume from any streams:

```
def metadata(self):
  return Metadata(
    name='sentence-generator',
    istreams=[],
    ostreams=['sentences'])
public Metadata metadata() {
    return new Metadata("sentence-generator", new HashSet(),
      new HashSet(Arrays.asList("sentences")));
}
override def metadata(): Metadata = {
  val ostreams = new MutableHashSet[String](java.util.Arrays.asList("sentences"))
  new Metadata("sentence-generator", new MutableHashSet[StreamTuple](), ostreams)
}
virtual bolt::Metadata metadata() override {
    bolt::Metadata m;
    m.name = "sentence-generator";
    m.ostreams.insert("sentences");
    return m;
}
def metadata
  Concord::Metadata.new(name: 'word-source', ostreams: ['sentences'])
end
func (p *SentenceGenerator) Metadata() *concord.Metadata {
	return &concord.Metadata{
		Name:    "sentence-generator",
		Outputs: []string{"sentences"},
	}
}
```

This may seem odd at first, but it's because ``sentence-generator`` is a data source, meaning it is a point at which data *enters* the Concord topology for processing. The ``Metadata`` object consists of the following fields:

| Field           | Description                                    |
| --------------- | ---------------------------------------------- |
| name (required) | Computation name                               |
| istreams        | List of (input stream, routing strategy) pairs |
| ostreams        | List of output streams                         |

If either ``istreams`` or ``ostreams`` are left undefined, they will default to an empty list. A more complete look at the metadata function can be found in our [client_docs](/docs/reference/python_client.html).

#### Init

The next step in defining your computation is writing your ``init`` function. The ``init`` function gets called when the framework is ready to start sending your computation messages. In this operator, we use it to kick off the infinite loop that emits sentences.

```
def init(self, ctx):
  self.concord_logger.info("Source initialized")
  ctx.set_timer('loop', time_millis())
override def init(ctx: ComputationContext): Unit = {
  println(s"${this.getClass.getSimpleName} initialized")
  ctx.setTimer("loop", System.currentTimeMillis())
}
public void init(ComputationContext ctx) {
    System.out.println("SentenceGenerator.java initialized");
    ctx.setTimer("loop", System.currentTimeMillis());
}
virtual void init(CtxPtr ctx) override {
    LOG(INFO) << "Initializing sentence generator [cpp]";
    ctx->setTimer("loop", bolt::timeNowMilli());
}
def init(context)
  Concord::Utils.log_to_stderr("Initialized Computation")
  context.set_timer('default', Concord::Utils.time_millis)
end
func (p *SentenceGenerator) Init(ctx *concord.Context) error {
	log.Println("[INFO] Initializing SentenceGenerator")
	rand.Seed(time.Now().UnixNano())
	
	// schedule next ProcessTimer call
	ctx.SetTimer(producerTimer, time.Now())

	return nil
}
```

We do this via the context's ``set_timer`` function, which schedules a timer callback to fire at some arbitrary time in the future. ``set_timer`` takes a timer name and the epoch time in milliseconds at which the callback should run; giving timers distinct names comes in handy when performing windowed aggregations. For more information on the ``init`` function, check out our [client docs](/docs/reference/python_client.html).
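
For instance, a computation could register two differently named timers and branch on the name inside ``process_timer``. The sketch below is illustrative and not part of the walkthrough: the timer names, the stream name, and the ``time_millis`` helper (the same helper the Python snippets here assume) are assumptions, as is importing ``Metadata`` alongside ``Computation``.

```
import time
from concord.computation import Computation, Metadata

def time_millis():
    # same helper the walkthrough's Python snippets assume
    return int(round(time.time() * 1000))

class TwoTimerExample(Computation):
    def metadata(self):
        return Metadata(name='two-timer-example', istreams=[], ostreams=['sentences'])

    def init(self, ctx):
        # two independent timers, told apart by name in process_timer
        ctx.set_timer('emit', time_millis() + 1000)
        ctx.set_timer('window_flush', time_millis() + 60000)

    def process_timer(self, ctx, key, time):
        if key == 'emit':
            ctx.produce_record('sentences', 'foo bar baz', '-')
            ctx.set_timer('emit', time_millis() + 1000)
        elif key == 'window_flush':
            # a natural place to flush windowed state once a minute
            ctx.set_timer('window_flush', time_millis() + 60000)
```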

#### The Context Object
The context object is what the framework hands to your computation so that your code can interact with the framework. There are two classes of functions framework users deal with:

  • Callback functions: Callbacks are defined by the user and are functions meant to process events being propagated by the framework. When the framework has a new record available, the framework calls your process_record callback, allowing your code to process the record.

  • Context functions: Context functions stand opposite to the callbacks. They are defined by the framework (their implementation is hidden from the user) and they are called by users whenever they need to interact with the framework. Emitting a record to downstream operators, referred to as producing a record, is accomplished by calling context.produce_record. Similarly, when users want to set up a timer to trigger their process_timer callback (explained later) at some point in the future, they call context.set_timer.
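
To make the two classes concrete, here is a minimal, illustrative pass-through computation (the stream names are made up for this sketch, not part of the word count topology): the methods it defines are callbacks the framework invokes, and the calls it makes on ``ctx`` are context functions.

```
from concord.computation import Computation, Metadata

class PassThrough(Computation):
    def metadata(self):
        # illustrative stream names
        return Metadata(name='pass-through',
                        istreams=['input'],
                        ostreams=['output'])

    def init(self, ctx):
        # callback: invoked once by the framework when it is ready
        self.concord_logger.info("PassThrough initialized")

    def process_record(self, ctx, record):
        # callback: the framework hands us each new record...
        # ...and we call a context function to forward it downstream
        ctx.produce_record('output', record.key, '-')
```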


#### Process timer

Timer callbacks are handled through the ``process_timer`` function. When the scheduled time has elapsed, Concord will call the ``process_timer`` function, passing it three arguments:

- ``context``: A ``ComputationContext`` object for interacting with the framework
- ``key``: The name of the timer (the name passed to ``set_timer``, e.g. ``'loop'`` in the Python snippet above)
- ``time``: The time you scheduled the callback for

In our ``process_timer`` callback, we'll emit a batch of sentences downstream and then schedule another callback for five seconds (5000 ms) in the future:

    def process_timer(self, ctx, key, time):
      # stream, key, value. empty value, no need for val
      for _ in range(0, 1024):
        ctx.produce_record("sentences", self.sample(), '-')
    
      # emit another batch of records in five seconds (5000 ms)
      ctx.set_timer(key, time_millis() + 5000)
    override def processTimer(ctx: ComputationContext, key: String, time: Long): Unit = {
      // Stream, key, value. Empty value, no need for val
      Range(0, 10000).foreach {
        i => ctx.produceRecord("sentences".getBytes, sample().getBytes, "-".getBytes)
      }
    
      ctx.setTimer(key, System.currentTimeMillis())
    }
    public void processTimer(ComputationContext ctx, String key, long time) {
      // Stream, key, value. Empty value, no need for val
      for (int i = 0; i < 1024; ++i) {
        ctx.produceRecord("sentences".getBytes(), this.sample().getBytes(),
                          "-".getBytes());
      }
    
      ctx.setTimer(key, System.currentTimeMillis() + 5000);
    }
    virtual void
    processTimer(CtxPtr ctx, const std::string &key, int64_t time) override {
      for(auto i = 0u; i < 10000; ++i) {
        std::string bin = kHowDoILoveTheeLines[i % kHowDoILoveTheeLines.size()];
        ctx->produceRecord("sentences", bin, "");
      }
      ctx->setTimer(key, bolt::timeNowMilli());
    }
    def process_timer(context, key, time)
      (0..1024).each do |i|
        context.produce_record('sentences', SENTENCES.sample, '')
      end
      context.set_timer(key, Concord::Utils.time_millis + 5000)
    end
    func (p *SentenceGenerator) ProcessTimer(ctx *concord.Context, ts int64, name string) error {
    	for i := 0; i < 10000; i++ {
    		var sentence = sentences[rand.Intn(len(sentences))]
    		ctx.ProduceRecord("sentences", sentence, "-")
    	}
    
    	// schedule next ProcessTimer invocation
    	ctx.SetTimer(producerTimer, time.Now())
    
    	return nil
    }

By calling the ``produce_record`` function on the context object, we emit a record on the 'sentences' stream, keyed by a random sentence from our sample data. For a deeper explanation of the ``process_timer`` callback, look at our [client documentation](/docs/reference/python_client.html).
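
The ``sample()`` helper used above isn't shown in the walkthrough; a minimal sketch of what it could look like in the Python ``SentenceGenerator`` (the sentence list here is illustrative, the getting-started project ships its own) is:

```
import random
from concord.computation import Computation

# illustrative sample data
SENTENCES = [
    "foo bar baz",
    "fiz buzz foo",
    "bar baz buzz",
]

class SentenceGenerator(Computation):
    # metadata(), init() and process_timer() as shown above

    def sample(self):
        # pick a random sentence to use as the record key
        return random.choice(SENTENCES)
```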

#### Serve the computation

Now that your ``SentenceGenerator`` class is fully defined, you can serve it by calling the respective serve computation method:

    serve_computation(SentenceGenerator())
    object SentenceGenerator {
      def main(args: Array[String]): Unit = {
        ServeComputation.serve(new SentenceGenerator())
      }
    }
    public static void main(String[] args) {
      ServeComputation.serve(new SentenceGenerator());
    }
    int main(int argc, char *argv[]) {
      bolt::client::serveComputation(std::make_shared<SentenceGenerator>(), argc,
                                     argv);
      return 0;
    }
    Concord::Computation.serve(SentenceGenerator.new)
    concord.Serve(generator)

### Sentence Splitter

Since we are counting words, we will need a computation that transforms a stream of sentences into a stream of words. This lets us aggregate by word, making our final word counter implementation trivial. The splitting computation is stateless and essentially performs a map over the sentences stream. Let's call this computation the ``SentenceSplitter``.

#### Metadata

Since this computation performs operations on an incoming stream and produces data downstream, it will have values for both ``istreams`` and ``ostreams``. The metadata for ``SentenceSplitter`` should look like this:

    def metadata(self):
       return Metadata(
          name='sentence-splitter',
          istreams=['sentences'],
          ostreams=['words'])
    override def metadata(): Metadata = {
      val ostreams = new MutableHashSet[String](java.util.Arrays.asList("words"))
      val istreams = new MutableHashSet[StreamTuple]()
      istreams.add(new StreamTuple("sentences", StreamGrouping.SHUFFLE))
      new Metadata("sentence-splitter", istreams, ostreams)
    }
    public Metadata metadata() {
      Set istreams = new HashSet();
      istreams.add(new StreamTuple("sentences", StreamGrouping.SHUFFLE));
      return new Metadata("sentence-splitter", istreams,
                          new HashSet(Arrays.asList("words")));
    }
    virtual bolt::Metadata metadata() override {
      bolt::Metadata m;
      m.name = "word-splitter";
      m.istreams.insert({"sentences", bolt::Grouping::SHUFFLE});
      m.ostreams.insert("words");
      return m;
    }
    def metadata
      Concord::Metadata.new(name: 'sentence-splitter',
                            istreams: ['sentences'],
                            ostreams: ['words'])
    end
    func (c *SentenceSplitter) Metadata() *concord.Metadata {
    	return &concord.Metadata{
    		Name:   "sentence-splitter",
    		Inputs: []*concord.Stream{concord.NewDefaultStream("sentences")},
    		Outputs: []string{"words"},
    	}
    }

#### Process Record

Now let's implement the actual logic that splits sentences into words. For each incoming sentence, we split the string into pieces using the space character as our delimiter:

    def process_record(self, ctx, record):
        for word in record.key.split(" "):
          ctx.produce_record('words', word, '-')
    
    override def processRecord(ctx: ComputationContext, record: Record): Unit = {
      val sentences = new String(record.getKey, "UTF-8")
      val words = sentences.split(" ")
      words.foreach((word) => {
        ctx.produceRecord("words".getBytes, word.getBytes, "-".getBytes)
      })
    }
    public void processRecord(ComputationContext ctx, Record record) {
        try {
          String sentences = new String(record.getKey(), "UTF-8");    
          String words[] = sentences.split(" ");
          for (int i = 0; i < words.length; ++i) {
    	    ctx.produceRecord("words".getBytes(), words[i].getBytes(), "-".getBytes());
          }
        } catch(Exception e) {
          throw new RuntimeException(e);
        }
    }
    virtual void processRecord(CtxPtr ctx, bolt::FrameworkRecord &&r) override {
        std::stringstream ss(r.key());
        std::string token;
        while(ss >> token) {
          ctx->produceRecord("words", token, "");
        }
    }
    def process_record(context, record)
        record.key.split(' ').each do |word|
          context.produce_record('words', word, '-')
        end
    end
    func (p *SentenceSplitter) ProcessRecords(ctx *concord.Context, record *concord.Record) error {
    	sentences := string(record.Key)
    	words := strings.Fields(sentences)
    	for _, word := range words {
    		ctx.ProduceRecord("words", word, "-")
    	}
    	return nil
    }

As before, we will need to serve the computation the same way we served the ``SentenceGenerator`` above.

### Word Counter

Finally, we can create a computation that counts the words generated by the ``SentenceSplitter``.

#### Metadata

This computation will *read* messages from the 'words' stream and will never produce data onto any streams, so 'words' belongs in ``istreams`` rather than ``ostreams``. For each ``istream`` you must define a "grouping strategy": this is the method Concord uses to propagate messages throughout the network. The options are as follows:

| Strategy              | Description                                                                |
| --------------------- | -------------------------------------------------------------------------- |
| ROUND_ROBIN (default) | Messages will be cyclically routed to all instances of a computation       |
| GROUP_BY              | Messages of a given key will ALWAYS be routed to the same downstream node  |
| SHUFFLE               | Messages will be randomly routed to instances of the downstream operators  |

    def metadata(self):
        return Metadata(
          name='word-counter',
          istreams=[('words', StreamGrouping.GROUP_BY)],
          ostreams=[])
    override def metadata(): Metadata = {
      val istreams = new MutableHashSet[StreamTuple]()
      istreams.add(new StreamTuple("words", StreamGrouping.GROUP_BY))
      new Metadata("word-counter", istreams, new MutableHashSet[String]())
    }
    public Metadata metadata() {
        Set istreams = new HashSet();
        istreams.add(new StreamTuple("words", StreamGrouping.GROUP_BY));
        return new Metadata("word-counter", istreams, new HashSet());
    }
    virtual bolt::Metadata metadata() override {
        bolt::Metadata m;
        m.name = "word-counter";
        m.istreams.insert({"words", bolt::Grouping::GROUP_BY});
        return m;
    }
    def metadata
        Concord::Utils.log_to_stderr("Metadata called")
        Concord::Metadata.new(name: 'word-counter',
                              istreams: [['words',
                               Concord::Thrift::StreamGrouping::GROUP_BY]])
    end
    func (c *WordCounter) Metadata() *concord.Metadata {
    	return &concord.Metadata{
    		Name:   "word-counter",
    		Inputs: []*concord.Stream{concord.NewStream("words", 2)},
    	}
    }

For stateless computations, you can use *ROUND_ROBIN* or *SHUFFLE*. For stateful computations like aggregations, you should use *GROUP_BY*. In the case of WordCounter, we're using GROUP_BY so we can keep track of a counter for each word.
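
Conceptually, GROUP_BY makes the routing decision a pure function of the record key, so every occurrence of a given word is delivered to the same WordCounter instance and a local in-memory map stays consistent. The snippet below only illustrates that idea; it is not Concord's actual routing code, and the function names are made up:

```
import random
import zlib

def route_group_by(key, num_instances):
    # the same key always hashes to the same downstream instance
    return zlib.crc32(key.encode('utf-8')) % num_instances

def route_shuffle(key, num_instances):
    # records are spread across instances with no key affinity
    return random.randrange(num_instances)
```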

#### Initializing

Let's define a variable called ``words`` as a hashmap to store word counts.

    self.words = {}
    private val wordFrequencyMap = scala.collection.mutable.HashMap[String, Int]()
    private final HashMap<String, Integer> histogram = new HashMap<>();
    std::unordered_map<std::string, uint64_t> map_{};
    self.words = {}
    type WordCounter struct {
    	stats map[string]int64
    	count int64
    }

#### Processing Records

Once more we define ``process_record``, the function that determines what the computation will do every time it receives a record. In this operator's implementation of ``process_record``, we increment the count for a given word (using our word -> count map) every time a word arrives.

    def process_record(self, ctx, record):
      if record.key in self.words:
         self.words[record.key] += 1
      else:
         self.words[record.key] = 1
    override def processRecord(ctx: ComputationContext, record: Record): Unit = {
      val key = new String(record.getKey, "UTF-8")
      val currentValue = wordFrequencyMap.get(key)
      currentValue match {
        case Some(x: Int) => wordFrequencyMap.put(key, x + 1)
        case _ => wordFrequencyMap.put(key, 1)
      }  
    }
    public void processRecord(ComputationContext ctx, Record record) {
      String key = new String(record.getKey(), "UTF-8");
      Integer currentValue = this.histogram.get(key);
      if (currentValue != null) {
         this.histogram.put(key, ++currentValue);
      } else {
         this.histogram.put(key, 1);
      }
    }
    virtual void processRecord(CtxPtr ctx, bolt::FrameworkRecord &&r) override {
        map_[r.key()]++;
    }
    def process_record(context, record)
      key = record.key
      self.words[key] ||= 0
      self.words[key] += 1
    end
    func (c *WordCounter) ProcessRecords(ctx *concord.Context, record *concord.Record) error {
          c.stats[string(record.Key)] += 1
          return nil
    }
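
The paragraph below mentions logging the word map every 10,000 records rather than on every record. A minimal sketch of that reporting logic for the Python ``WordCounter`` (the ``processed`` counter and the threshold are illustrative, not part of the client API) could look like this:

```
from concord.computation import Computation

class WordCounter(Computation):
    # metadata() as shown above

    def init(self, ctx):
        self.words = {}
        self.processed = 0
        self.concord_logger.info("Counter initialized")

    def process_record(self, ctx, record):
        self.words[record.key] = self.words.get(record.key, 0) + 1
        self.processed += 1
        # only flush the histogram to the logs every 10,000 records
        if self.processed % 10000 == 0:
            self.concord_logger.info("Word counts: %s", self.words)
```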

Finally, in order to report meaningful output without flushing too much unnecessary information to stdout, we log the contents of our word map every 10,000 records (as sketched above).

### JSON Deployment File

In order to deploy a computation you will need to construct a computation manifest file. This is a small JSON file that the Concord CLI uses to schedule a task within the framework. Here is an example of a manifest file:

```
{
  "zookeeper_hosts": "localhost:2181",
  "zookeeper_path": "/concord",
  "executable_arguments": ["word_counter.rb"],
  "executable_name": "runner.bash",
  "compress_files": ["word_counter.rb", "runner.bash"],
  "computation_name": "word-counter",
  "environment_variables": [
    "GLOG_v=2"
  ],
  "mem": 1024,
  "cpus": 1.0
}
```

Note that this manifest contains cluster metadata, such as the hosts comprising our Zookeeper quorum, as well as project-specific information, like the files to include in the tarball sent to the cluster and the file to be executed. ``runner.bash`` is the point of execution for most computations. Since the framework will simply attempt to fork and exec the given executable, computations written in any non-compiled language must be launched from a runner script. The runner script also gives the developer a chance to install computation dependencies before the computation is actually launched. Here is an example:

```
#!/bin/bash --login
echo "Directory: $(pwd)"
gem2.0 install concord-ruby
exec ruby2.0 "$@"
```

For more information on the Concord CLI, including a complete list of valid parameters for the JSON manifest, look at our CLI documentation.

### Deploy Your Computation

Now you're ready to deploy your computations to your cluster! Simply run the following commands to deploy them and see them running on the Mesos dashboard. **You must be in the root directory of the getting-started project.**

```
vagrant@vagrant-ubuntu-trusty-64:~$ cd /vagrant/rb
vagrant@vagrant-ubuntu-trusty-64:/vagrant/rb$ concord deploy word_counter.json
vagrant@vagrant-ubuntu-trusty-64:/vagrant/rb$ concord deploy sentence_splitter.json
vagrant@vagrant-ubuntu-trusty-64:/vagrant/rb$ concord deploy sentence_generator.json
```

You should now be able to navigate to the Mesos web UI at http://localhost:5050.

*(Figure: computations running in the Mesos UI)*

To verify that the computation is counting words as expected, click the link titled Sandbox on the WordCounter computation, then stderr to see the computation's output. You should see something like this:

    INFO:2015-10-12 17:22:34,143 computation.py:261] About to serve computation and service
    INFO:2015-10-12 17:22:34,143 computation.py:282] Starting python service port: 31003
    INFO:2015-10-12 17:22:34,143 computation.py:285] registering with framework at: 127.0.0.1:31002
    INFO:2015-10-12 17:22:34,143 computation.py:209] Getting client metadata
    INFO:2015-10-12 17:22:34,143 computation.py:220] Got metadata: ComputationMetadata(istreams=[StreamMetadata(name='words', grouping=1)], proxyEndpoint=None, ostreams=[], name='word-counter', taskId=None)
    INFO:2015-10-12 17:22:34,190 word_counter.py:15] Counter initialized
    INFO:2015-10-12 17:22:36,247 word_counter.py:28] {'fiz': 206, 'bar': 185, 'foo': 198, 'baz': 216, 'buzz': 219}
    INFO:2015-10-12 17:22:41,253 word_counter.py:28] {'fiz': 400, 'bar': 393, 'foo': 426, 'baz': 415, 'buzz': 414}
    INFO:2015-10-12 17:22:46,249 word_counter.py:28] {'fiz': 600, 'bar': 603, 'foo': 630, 'baz': 608, 'buzz': 631}
    INFO:2015-10-12 17:22:51,247 word_counter.py:28] {'fiz': 824, 'bar': 805, 'foo': 824, 'baz': 818, 'buzz': 825}
    INFO:2015-10-12 17:22:56,240 word_counter.py:28] {'fiz': 1022, 'bar': 1019, 'foo': 1038, 'baz': 1001, 'buzz': 1040}
    INFO:2015-10-12 17:23:01,231 word_counter.py:28] {'fiz': 1236, 'bar': 1233, 'foo': 1239, 'baz': 1204, 'buzz': 1232}

**Heads Up**

If you do not see computation output in the stderr logs, navigate to the folder next to the stderr link. Any computation logs should be in a file ending in '.log'.