Python Client

Installation

To add the Concord client library to your project, add our Python module to your requirements.txt file:

concord-py

and install it with pip (for example, pip install -r requirements.txt).

Python API

To write a computation, you must extend our abstract interface:

class Computation:
    """Abstract class for users to extend when making computations.
    """

    def init(self, ctx):
        """Called when the framework has registered the computation
            successfully. Gives users a first opportunity to schedule
            timer callbacks and produce records.
        :param ctx: The computation context object provided by the system.
        :type ctx: ComputationContext.
        """
        pass

    def process_record(self, ctx, record):
        """Process an incoming record on one of the computation's `istreams`.
        :param ctx: The computation context object provided by the system.
        :type ctx: ComputationContext.
        :param record: The incoming `Record` to process.
        :type record: Record.
        """
        raise Exception('process_record not implemented')

    def process_timer(self, ctx, key, time):
        """Process a timer callback previously set via `set_timer`.
        :param ctx: The computation context object provided by the system.
        :type ctx: ComputationContext.
        :param key: The name of the timer.
        :type key: str.
        :param time: The time (in ms) for which the callback was scheduled.
        :type time: int.
        """
        raise Exception('process_timer not implemented')

    def metadata(self):
        """The metadata defining this computation.
        :returns: Metadata.
        """
        raise Exception('metadata not implemented')

Note that the ctx argument passed to each of these methods is a context object. In Concord, the context object is how the client computation communicates with the framework: you can use it to produce records, set timers, and read and write local state. The computation context object looks like this:

class ComputationContext:
    """Wrapper class exposing a convenient API for computation to proxy 
       interactions.
    """
    def produce_record(self, stream, key, value):
        """Produce a record to be emitted down stream.

        :param stream: The stream to emit the record on.
        :type stream: str.
        :param key: The key to route this message by (only used when
            using GROUP_BY routing).
        :type key: str.
        :param value: The binary blob to emit downstream.
        :type value: str.
        """
        pass

    def set_timer(self, key, time):
        """Set a timer callback for some point in the future.
        :param key: The name of the timer.
        :type key: str.
        :param time: The time (in ms) at which the callback should trigger.
        :type time: int.
        """
        pass

    def set_state(self, key, value):
        """Set the local state stored at `key` to `value`.
        :param key: The key under which to store the state.
        :type key: str.
        :param value: The value to store.
        """
        pass

    def get_state(self, key):
        """Retrieve the local state stored at `key`.
        :param key: The key to look up.
        :type key: str.
        """
        pass
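
For example, inside process_record a computation might use the context to keep a per-key count in local state and forward each record downstream. The following is only a sketch of one method of a Computation subclass: the record's key and data attributes, the string encoding of the count, and the stream name are assumptions made for illustration, not part of the API above.

def process_record(self, ctx, record):
    # record.key and record.data are assumed attribute names for this sketch.
    current = ctx.get_state(record.key)
    count = int(current) + 1 if current else 1
    # Local state values are stored as strings here; the encoding is up to you.
    ctx.set_state(record.key, str(count))
    # Forward the payload on one of the streams declared in metadata().
    ctx.produce_record('stream2', record.key, record.data)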

Finally, the Metadata object returned by your metadata method should be initialized as follows:

def metadata(self):
    return Metadata(
        name='my-computation',
        istreams=[('stream1', StreamGrouping.GROUP_BY)],
        ostreams=['stream2'])

  • name: a string identifying the computation.
  • istreams: a list of (stream, grouping) pairs describing the streams to subscribe to; the first element of each pair is the name of the stream and the second is the StreamGrouping to use.
  • ostreams: a list of strings naming the streams this computation may produce on.
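
Putting these pieces together, a minimal computation might look like the sketch below. The import path (concord.computation) and the record's key and data attributes are assumptions made for the example; adjust them to match the concord-py package you installed. Everything else follows the interfaces described above.

import time

# Assumed import path; adjust it to match the installed concord-py package.
from concord.computation import Computation, Metadata, StreamGrouping

def now_ms():
    """Current wall-clock time in milliseconds, as expected by set_timer."""
    return int(time.time() * 1000)

class MyComputation(Computation):
    """Forwards records from stream1 to stream2 and emits a periodic heartbeat."""

    def init(self, ctx):
        # First opportunity to schedule timers: fire a heartbeat in 5 seconds.
        ctx.set_timer('heartbeat', now_ms() + 5000)

    def process_record(self, ctx, record):
        # record.key and record.data are assumed attribute names for this sketch.
        ctx.produce_record('stream2', record.key, record.data)

    def process_timer(self, ctx, key, time_ms):
        # Emit a heartbeat record, then re-arm the timer for 5 seconds later.
        ctx.produce_record('stream2', key, 'heartbeat at %d' % time_ms)
        ctx.set_timer(key, time_ms + 5000)

    def metadata(self):
        return Metadata(
            name='my-computation',
            istreams=[('stream1', StreamGrouping.GROUP_BY)],
            ostreams=['stream2'])

This sketch does not show how an instance of the class is handed to the framework proxy at startup; consult the concord-py package for the entry point it provides.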

Directory Structure

Here's a sample directory layout for a python project using Concord:

my-project/             # project root
  requirements.txt
  deploy.json           # computation manifest
  computation.py        # the computation logic
  run.sh                # a wrapper to install deps and run

Computation Manifest

The computation manifest for our sample project would appear as follows:

{
  // comma-separated list of ZooKeeper host:port pairs
  "zookeeper_hosts":"localhost:2181",
  // ZooKeeper base path for the Concord framework
  "zookeeper_path":"/foo",
  // name of the command that should be executed (in a shell environment)
  "executable_name": "run",
  // files (rooted in your CWD) to include in the package sent to the cluster
  "compress_files": ["computation.py", "requirements.txt", "run.sh"],
  // globally unique name of the computation (found in your implementation
  // of the metadata function)
  "computation_name": "my-computation"
}
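
Before deploying, it can be useful to sanity-check the manifest from Python. The snippet below is not part of Concord; it is a small helper, assuming deploy.json is plain JSON without the explanatory comments shown above, that verifies the keys from the sample manifest are present and that the files named in compress_files exist on disk.

import json
import os
import sys

# Keys used in the sample manifest above.
REQUIRED_KEYS = [
    'zookeeper_hosts',
    'zookeeper_path',
    'executable_name',
    'compress_files',
    'computation_name',
]

def check_manifest(path='deploy.json'):
    """Fail fast if the manifest is missing keys or references missing files."""
    with open(path) as f:
        manifest = json.load(f)

    missing = [k for k in REQUIRED_KEYS if k not in manifest]
    if missing:
        sys.exit('%s is missing keys: %s' % (path, ', '.join(missing)))

    # compress_files paths are rooted in the current working directory.
    absent = [p for p in manifest['compress_files'] if not os.path.exists(p)]
    if absent:
        sys.exit('compress_files not found on disk: %s' % ', '.join(absent))

    print('%s looks consistent for computation %s' % (path, manifest['computation_name']))

if __name__ == '__main__':
    check_manifest()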

Run Script

Then, to install dependencies and launch the computation, your run.sh would look like this:

#!/bin/bash
set -ex

SOURCE="${BASH_SOURCE[0]}"
while [ -h "$SOURCE" ]; do # resolve $SOURCE until the file is no longer a symlink
    exec_dir="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
    SOURCE="$(readlink "$SOURCE")"
    [[ $SOURCE != /* ]] && SOURCE="$exec_dir/$SOURCE"
    # if $SOURCE was a relative symlink, we need to resolve it
    # relative to the path where the symlink file was located
done
exec_dir="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
original=${PWD}
requirements=$(find $exec_dir -name requirements.txt)

echo "Mesos directory: ${PWD}"
echo "Exec directory: ${exec_dir}"

if [[ -f $requirements ]]; then
    # work around pip's 128-character path limit by running from a short symlinked directory
    work_dir=$(mktemp -d -p $original)
    symlink_dir=$(mktemp -d)
    ln -s $work_dir $symlink_dir/concord
    dir=$symlink_dir/concord
    cd $dir
    echo "Installing venv in $dir"
    virtualenv $dir/env
    $dir/env/bin/pip install -r $requirements
    cd $exec_dir
    exec $dir/env/bin/python "$original/$@"
else
    exec python "$original/$@"
fi

rc=$?
if [[ $rc != 0 ]]; then
    echo "Client exited with: $rc"
    exit $rc
fi