Ruby Client

Installation

To add the Concord client library to your project, add the gem to your Gemfile:

gem 'concord'

and run bundle install.

Ruby API

To implement a computation, extend the abstract class Concord::Computation like this:

class MyComputation < Concord::Computation
  def init(context)
    # do some initialization
  end

  def process_record(context, record)
    # process a record
  end

  def process_timer(context, key, time)
    # process a timer callback
  end

  def metadata
    # return some metadata
  end
end

Each context argument above is a context object. In Concord, the context object is how the client computation communicates with the framework: you use it to produce records, set timers, and store or retrieve state. The computation context object looks like this:

class ComputationContext
  def produce_record(stream, key, data)
    # produce a record on `stream` with `key` and `data`
  end

  def set_timer(key, time)
    # trigger a timer named `key` to go off at `time` (ms)
  end

  def set_state(key, value)
    # set some arbitrary state to retrieve later
    # key => value
  end

  def get_state(key)
    # retrieve state at `key`
  end
end
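
Because the context is the only channel between a computation and the framework, a computation can be exercised locally by handing it a stand-in object with the same four methods. The sketch below is an illustration only and is not part of the Concord gem: FakeContext, FakeRecord, and KeyCounter are hypothetical names, KeyCounter is a plain class rather than a subclass of Concord::Computation so that it runs without the gem installed, and it assumes that a record responds to key and data and that timer times are absolute timestamps in milliseconds.

```ruby
# Hypothetical in-memory stand-in for the context object described above.
class FakeContext
  attr_reader :records, :timers

  def initialize
    @records = [] # [stream, key, data] triples passed to produce_record
    @timers = {}  # timer key => time (ms)
    @state = {}   # key => value store behind set_state/get_state
  end

  def produce_record(stream, key, data)
    @records << [stream, key, data]
  end

  def set_timer(key, time)
    @timers[key] = time
  end

  def set_state(key, value)
    @state[key] = value
  end

  def get_state(key)
    @state[key]
  end
end

# Assumed record shape: an object that responds to `key` and `data`.
FakeRecord = Struct.new(:key, :data)

# Hypothetical computation: counts how often each key is seen and
# emits the running count on 'stream2'.
class KeyCounter
  def init(context)
    # Assumption: `time` is an absolute timestamp in milliseconds.
    context.set_timer('flush', (Time.now.to_f * 1000).to_i + 1_000)
  end

  def process_record(context, record)
    count = context.get_state(record.key).to_i + 1 # nil.to_i is 0 on first sight
    context.set_state(record.key, count)
    context.produce_record('stream2', record.key, count.to_s)
  end

  def process_timer(context, key, time)
    # Re-arm the same timer one second after the time it fired.
    context.set_timer(key, time + 1_000)
  end
end

context = FakeContext.new
counter = KeyCounter.new
counter.init(context)
%w[a b a].each { |k| counter.process_record(context, FakeRecord.new(k, '')) }
counter.process_timer(context, 'flush', 5_000)
```

After this run, context.records holds one entry per processed record, context.get_state('a') is 2, and the 'flush' timer has been moved to 6000. The real context may behave differently (for example, in how state is serialized), so treat this as a testing aid rather than a model of the framework.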

Finally, the metadata method must return a Concord::Metadata object, initialized as follows:

def metadata
  Concord::Metadata.new(name: 'my-computation',
                        istreams: [
                          ['stream1',
                           Concord::Thrift::StreamGrouping::GROUP_BY]],
                        ostreams: ['stream2'])
end

The arguments are:

  • name: a string identifying the computation
  • istreams: a list of pairs (lists of length two) describing the streams to subscribe to. In each pair, the first item is the name of the stream and the second item is the Concord::Thrift::StreamGrouping you wish to group by.
  • ostreams: a list of strings representing the streams this computation may produce on.
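
The shape of istreams is easy to get wrong, since each entry must itself be a two-element list. A minimal sketch of a sanity check follows. The helper malformed_istreams is hypothetical and not part of the Concord gem, and the symbol :group_by merely stands in for a Concord::Thrift::StreamGrouping value so that the snippet runs without the gem.

```ruby
# Hypothetical helper: returns the entries of `istreams` that are not
# [stream_name, grouping] pairs with a String stream name.
def malformed_istreams(istreams)
  istreams.reject do |entry|
    entry.is_a?(Array) && entry.length == 2 && entry.first.is_a?(String)
  end
end

good = [['stream1', :group_by]]  # :group_by is a placeholder grouping value
bad  = ['stream1', ['stream2']]  # a bare string, and a pair missing its grouping

good_result = malformed_istreams(good) # => []
bad_result  = malformed_istreams(bad)  # => ["stream1", ["stream2"]]
```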

Directory Structure

Here's a sample directory layout for a Ruby project using Concord:

my-project/             # project root
  Gemfile
  deploy.json           # computation manifest
  computation.rb        # the computation logic
  run.sh                # a wrapper to install deps and run

Computation Manifest

The computation manifest for our sample project would appear as follows:

{
  // comma-separated list of host:port zookeepers
  "zookeeper_hosts":"localhost:2181",
  // zookeeper base path for concord framework
  "zookeeper_path":"/foo",
  // name of the command that should be executed (in a shell environment)
  "executable_name": "run",
  // files (rooted in your CWD) to include in the package sent to the cluster
  "compress_files": ["computation.rb", "Gemfile", "run.sh"],
  // globally unique name of the computation (found in your implementation
  // of the metadata function)
  "computation_name": "my-computation"
}
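
Since compress_files decides what ends up in the package, it can be worth checking locally that every listed file exists before deploying. The following is a minimal sketch under stated assumptions: missing_package_files is a hypothetical helper, not part of Concord, and the example manifest omits the // comments so that it is strictly valid JSON.

```ruby
require 'json'
require 'tmpdir'
require 'fileutils'

# Hypothetical helper: lists the entries of "compress_files" that do not
# exist under `root`.
def missing_package_files(manifest_json, root)
  manifest = JSON.parse(manifest_json)
  manifest.fetch('compress_files', []).reject do |name|
    File.exist?(File.join(root, name))
  end
end

manifest_json = <<~MANIFEST
  {
    "zookeeper_hosts": "localhost:2181",
    "zookeeper_path": "/foo",
    "executable_name": "run",
    "compress_files": ["computation.rb", "Gemfile", "run.sh"],
    "computation_name": "my-computation"
  }
MANIFEST

# Demonstrate against a throwaway directory that lacks run.sh.
missing = Dir.mktmpdir do |root|
  FileUtils.touch(File.join(root, 'computation.rb'))
  FileUtils.touch(File.join(root, 'Gemfile'))
  missing_package_files(manifest_json, root)
end
```

In a real project you would read deploy.json from disk and pass the project root instead of a temporary directory; here missing ends up containing only run.sh.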

Run Script

To install dependencies and start the computation, your run.sh would look like this:

#!/bin/bash --login

bundle install
bundle exec ruby computation.rb