Java Client

Installation

To add the Concord client library to your project, add our jars to your project's dependencies. They are hosted at https://conjars.org/repo. Make sure to add this repository to your list of resolvers, since it is not included by default. Here is an example using sbt:

libraryDependencies ++= Seq(
  "io.concord" % "concord" % "0.1.2",
  "io.concord" % "rawapi" % "0.2.5"
)

resolvers += Resolver.sonatypeRepo("public")
resolvers += "conjars" at "http://conjars.org/repo"

Java API

To implement a computation, extend the abstract class Computation from our client API:

public abstract class Computation {

  /**
   * Called when the framework has registered the computation successfully.
   * Gives users a first opportunity to schedule timer callbacks and
   * produce records.
   *
   * @param ctx: The computation context object provided by the system.
   */
  public abstract void init(ComputationContext ctx);

  /**
   * Process incoming records on one of the computation's `istreams`.
   *
   * @param ctx: The computation context object provided by the system.
   * @param record: The incoming `Record` to process.
   */
  public abstract void processRecord(ComputationContext ctx, Record record);

  /**
   * Process a timer callback previously set via `setTimer`.
   *
   * @param ctx: The computation context object provided by the system.
   * @param key: The name of the timer.
   * @param time: The time (in ms) for which the callback was scheduled.
   */
  public abstract void processTimer(ComputationContext ctx, String key,
                                    long time);

  /**
   * This function is called by the Concord proxy to identify the user
   * computation.
   *
   * @return Metadata object representing the user computation.
   */
  public abstract Metadata metadata();
}
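
A computation that needs a recurring timer could re-arm it from inside processTimer, using the time argument it receives. The following is a minimal sketch of that arithmetic only, assuming that a timer fires once per setTimer call (the text above does not say so explicitly). TimerMath and nextFireTime are hypothetical names introduced for illustration; they are not part of the Concord API.

```java
// Hypothetical helper for illustration; not part of the Concord API.
final class TimerMath {
  private TimerMath() {}

  /**
   * Returns the first time on the grid scheduledMs + k * intervalMs
   * (with k >= 1) that is strictly later than nowMs. Anchoring on the
   * scheduled time rather than the current time avoids accumulating
   * drift, and ticks that were missed entirely are skipped.
   *
   * @param scheduledMs time (ms since epoch) the current callback was set for
   * @param nowMs       current time (ms since epoch)
   * @param intervalMs  desired period in ms, must be positive
   */
  static long nextFireTime(long scheduledMs, long nowMs, long intervalMs) {
    if (intervalMs <= 0) {
      throw new IllegalArgumentException("intervalMs must be positive");
    }
    // Whole intervals that have already elapsed since the scheduled time;
    // clamped to zero in case the callback ran slightly early.
    long elapsedTicks = Math.max(0L, (nowMs - scheduledMs) / intervalMs);
    return scheduledMs + (elapsedTicks + 1) * intervalMs;
  }
}
```

Inside processTimer, the result could then be handed back to the framework with a call such as ctx.setTimer(key, TimerMath.nextFireTime(time, System.currentTimeMillis(), 1000L)), where the one-second interval is an arbitrary choice.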

Each ctx argument in the methods of Computation is a computation context object. In Concord, the context object is how the client computation communicates with the framework: you can use it to produce records, set timers, and read or write local state. The computation context class looks like this:

public abstract class ComputationContext {

 /**
  * Set local state for a given key
  *
  * @param key:         Key to set in local store.
  * @param binaryValue: Value to store at key.
  */
 public abstract void setState(final String key, final byte[] binaryValue);

 /**
  * Get local state for a given key
  *
  * @param key: Key to retrieve from local store.
  * @return The binary value stored at key.
  */
 public abstract byte[] getState(final String key);

 /**
  * Register a timer callback for some point in the future
  *
  * @param key:  Name of the timer callback.
  * @param time: The time (ms since epoch) at which the callback should
  *              be fired
  */
 public abstract void setTimer(final String key, final long time);

 /**
  * Emit a record downstream
  *
  * @param streamName: The name of the stream on which the record should
  *                    be emitted.
  * @param binaryKey:  The key associated with the record. Only relevant
  *                    when routing method is `GROUP_BY`.
  * @param binaryData: The binary blob to send downstream.
  */
 public abstract void produceRecord(final byte[] streamName,
                                    final byte[] binaryKey,
                                    final byte[] binaryData);
}
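
Because every method on the context exchanges raw byte arrays, a computation has to pick its own encoding for stream names, keys, and state values. The sketch below shows one possible approach using only the Java standard library. StateCodec and its methods are hypothetical helpers, not part of the Concord API, and treating null as "no state yet" is an assumption, since the text above does not say what getState returns for a missing key.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the Concord API.
final class StateCodec {
  private StateCodec() {}

  /** Encodes a string as UTF-8 bytes, e.g. for a stream name or record key. */
  static byte[] utf8(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }

  /** Encodes a long as 8 big-endian bytes, e.g. a counter passed to setState. */
  static byte[] encodeLong(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  /**
   * Decodes a value produced by encodeLong. A null or wrong-sized array is
   * treated as "no state yet" and yields defaultValue (assumption: the
   * behavior of getState for a missing key is not documented here).
   */
  static long decodeLong(byte[] bytes, long defaultValue) {
    if (bytes == null || bytes.length != Long.BYTES) {
      return defaultValue;
    }
    return ByteBuffer.wrap(bytes).getLong();
  }
}
```

With such a helper, a counter kept in local state could be read with StateCodec.decodeLong(ctx.getState("count"), 0L), incremented, and written back with ctx.setState("count", StateCodec.encodeLong(n)). A fixed-width big-endian encoding is used so that stored values have a predictable size and are independent of platform byte order.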

Finally, the Metadata object you need to prepare in the metadata method should be initialized as follows:

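// Requires java.util.HashSet and java.util.Set.
public Metadata metadata() {
  // Streams this computation subscribes to, each with its routing strategy.
  Set<StreamTuple> istreams = new HashSet<StreamTuple>();
  // Streams this computation may produce on.
  Set<String> ostreams = new HashSet<String>();
  istreams.add(new StreamTuple("stream1", StreamGrouping.GROUP_BY));
  ostreams.add("stream2");
  return new Metadata("my-computation", istreams, ostreams);
}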

The Metadata constructor takes three arguments:
  • name: a string identifying the computation.
  • istreams: the set of streams to subscribe to. Each element is a unique StreamTuple object containing the stream name and routing strategy.
  • ostreams: a set of strings naming the streams this computation may produce on.

Directory Structure

Here’s a sample directory layout for a simple Java project using Concord:

my-project/                # project root
  build.sbt
  deploy.json              # computation manifest
  src/
    main/
      java/
        Computation.java   # the computation logic
  run.sh                   # a wrapper to run jar

Computation Manifest

The computation manifest for our sample project would appear as follows:

{
  // comma-separated list of host:port zookeepers
  "zookeeper_hosts": "localhost:2181",
  // zookeeper base path for concord framework
  "zookeeper_path": "/foo",
  // name of the command that should be executed (in a shell environment)
  "executable_name": "run.sh",
  // files (rooted in your CWD) to include in the package sent to the cluster
  "compress_files": ["getting_started-assembly-1.jar"],
  // globally unique name of the computation (found in your implementation
  // of the metadata function)
  "computation_name": "my-computation"
}

Run Script

After building your project and listing all required resources in the manifest file, your run.sh could look like this:

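#!/bin/bash --login

dir=$(pwd)
# Pick the first matching assembly jar under the current directory.
jar_file=$(find "$dir" -iname "getting_started-assembly-*.jar" | head -n 1)
if [[ ! -f "$jar_file" ]]; then
  echo "Cannot find getting_started assembly file"
  exit 1
fi
echo "Found jar file at: $jar_file"
# Pass all script arguments through to java (e.g. the main class name).
exec java -cp "$jar_file" "$@"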