C++ Client

Installation

To get started with the Concord C++ library, include our header:

#include <concord/Computation.hpp>

C++ API

To implement a computation, extend our abstract Computation class:

/** Abstract class for user-defined computations.
 *  All computations should be default constructible.
 */
class Computation {
  public:
  Computation() {}
  virtual ~Computation() {}

  using CtxPtr = std::shared_ptr<ComputationContext>;

  /** The callback that is called when the computation is registered.
   *  Once the computation has successfully registered with the framework,
   *  this callback is triggered. This presents a window for the programmer
   *  to set up an initial timer callback or produce a record.
   *
   * @param[in] ctx A `ComputationContext`.
   */
  virtual void init(CtxPtr ctx) = 0;

  /** The callback that is called for each incoming record.
   *  This function is analogous to the mapper in the map/reduce paradigm.
   *
   * @param[in] ctx A `ComputationContext`.
   * @param[in] r   The `FrameworkRecord` sent from upstream.
   */
  virtual void processRecord(CtxPtr ctx, FrameworkRecord &&r) = 0;

  /** The callback that is called for timers set via `setTimer`.
   *
   * @param[in] ctx  A `ComputationContext`.
   * @param[in] key  The name of the timer.
   * @param[in] time The time the timer was scheduled for.
   */
  virtual void
  processTimer(CtxPtr ctx, const std::string &key, int64_t time) = 0;

  /** Getter for the computation's `Metadata` object.
   *  This function is called by the concord proxy to identify the user
   *  computation.
   *
   * @returns A `Metadata` object representing the user computation.
   */
  virtual Metadata metadata() = 0;
};
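As an illustration, here is a minimal sketch of a computation that forwards every incoming record unchanged to an output stream. So that it compiles on its own, the Concord types are replaced by simplified stand-ins that include only what this example needs. The FrameworkRecord fields key and value, the stream names "words" and "words-out", and the class name EchoComputation are hypothetical. Check the real header for FrameworkRecord's actual layout.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <set>
#include <string>
#include <utility>

// --- Simplified stand-ins for Concord types (assumptions, not the real header) ---
struct FrameworkRecord {
  std::string key;    // hypothetical field
  std::string value;  // hypothetical field
};

struct Metadata {
  std::string name;
  std::set<std::string> istreams;
  std::set<std::string> ostreams;
};

class ComputationContext {
 public:
  virtual void produceRecord(const std::string &streamName,
                             const std::string &binaryKey,
                             std::string &&binaryValue) = 0;
  virtual ~ComputationContext() {}
};

class Computation {
 public:
  using CtxPtr = std::shared_ptr<ComputationContext>;
  virtual ~Computation() {}
  virtual void init(CtxPtr ctx) = 0;
  virtual void processRecord(CtxPtr ctx, FrameworkRecord &&r) = 0;
  virtual void processTimer(CtxPtr ctx, const std::string &key,
                            int64_t time) = 0;
  virtual Metadata metadata() = 0;
};
// --- End of stand-ins ---

// Hypothetical computation: forwards each record to "words-out".
class EchoComputation : public Computation {
 public:
  void init(CtxPtr) override {}  // nothing to set up

  void processRecord(CtxPtr ctx, FrameworkRecord &&r) override {
    // Move the payload into the context instead of copying it.
    ctx->produceRecord("words-out", r.key, std::move(r.value));
  }

  void processTimer(CtxPtr, const std::string &, int64_t) override {}  // no timers

  Metadata metadata() override {
    return Metadata{"echo-computation", {"words"}, {"words-out"}};
  }
};
```

Every pure virtual method must be overridden, even when a computation does not use timers, because Computation cannot be instantiated otherwise.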

Note that each ctx argument is a shared pointer to a context object. In Concord, the context object is how the client computation communicates with the framework: you use it to read and write local state, set timers, and produce records. The computation context object looks like this:
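For example, a computation can use ctx to emit a periodic record from a timer. The sketch below is hypothetical: the class Heartbeat, the timer name "heartbeat", the stream "heartbeats", and the helper nowMillis are illustrative, and the context is a simplified stand-in. It also assumes that each timer fires once, so processTimer re-arms it. In a real computation, init and processTimer would override the corresponding methods of Computation.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <memory>
#include <string>

// Simplified stand-in for ComputationContext (only the calls used here).
class ComputationContext {
 public:
  virtual void setTimer(const std::string &binaryKey, int64_t time) = 0;
  virtual void produceRecord(const std::string &streamName,
                             const std::string &binaryKey,
                             std::string &&binaryValue) = 0;
  virtual ~ComputationContext() {}
};

// Wall-clock time in milliseconds since the Unix epoch, the unit setTimer
// expects.
int64_t nowMillis() {
  using namespace std::chrono;
  return duration_cast<milliseconds>(system_clock::now().time_since_epoch())
      .count();
}

// Hypothetical computation that emits one heartbeat record per second.
class Heartbeat {
 public:
  using CtxPtr = std::shared_ptr<ComputationContext>;
  static constexpr int64_t kIntervalMs = 1000;

  void init(CtxPtr ctx) {
    // init is the natural place to schedule the first timer.
    ctx->setTimer("heartbeat", nowMillis() + kIntervalMs);
  }

  void processTimer(CtxPtr ctx, const std::string &key, int64_t time) {
    if (key != "heartbeat") return;  // ignore timers this class did not set
    // The key is empty because it only matters for GROUP_BY routing.
    ctx->produceRecord("heartbeats", "", std::to_string(time));
    // Assumption: timers fire once, so re-arm relative to the scheduled time.
    ctx->setTimer("heartbeat", time + kIntervalMs);
  }
};
```

Re-arming from the scheduled time rather than from nowMillis() keeps the period from drifting when a callback is delivered late.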

/** Context object used by users to manipulate state, set timers and
 *  emit records downstream.
 */
class ComputationContext {
  public:
  /** Set local state for a given key
   *
   * @param[in] binaryKey   Key to set in local store.
   * @param[in] binaryValue Value to store at key.
   */
  virtual void setState(const std::string &binaryKey,
                        const std::string &binaryValue) = 0;

  /** Get local state for a given key
   *
   * @param[in] binaryKey Key to retrieve from local store.
   * @param[in] callback  Callback executed upon data retrieval.
   */
  virtual void getState(const std::string &binaryKey,
                          std::function<void(std::string &&)> callback) = 0;

  /** Register a timer callback for some point in the future.
   *
   * @param[in] binaryKey Name of the timer callback.
   * @param[in] time      Time (ms since epoch) at which the callback should
   *                      fire.
   */
  virtual void setTimer(const std::string &binaryKey, int64_t time) = 0;

  /** Emit a record downstream.
   *
   * @param[in] streamName  The name of the stream on which the record should
   *                        be emitted.
   * @param[in] binaryKey   The key associated with the record. Only relevant
   *                        when routing method is `GROUP_BY`.
   * @param[in] binaryValue The binary blob to send downstream.
   */
  virtual void produceRecord(const std::string &streamName,
                              const std::string &binaryKey,
                              std::string &&binaryValue) = 0;
  virtual ~ComputationContext() {}
};
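Because getState delivers its result through a callback, a read-modify-write on local state is written inside that callback. The sketch below keeps a per-key counter. It is a sketch under stated assumptions: InMemoryContext and incrementCounter are hypothetical names, the stand-in context runs the callback synchronously, and a missing key is reported as an empty string. Concord may behave differently on both points. The lambda captures ctx and key by value so that it would remain valid if the framework ran the callback after incrementCounter returns.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <string>

// Simplified stand-in for the state calls of ComputationContext.
class ComputationContext {
 public:
  virtual void setState(const std::string &binaryKey,
                        const std::string &binaryValue) = 0;
  virtual void getState(const std::string &binaryKey,
                        std::function<void(std::string &&)> callback) = 0;
  virtual ~ComputationContext() {}
};

// Hypothetical in-memory context for unit tests: getState invokes the
// callback immediately and reports a missing key as an empty string.
class InMemoryContext : public ComputationContext {
 public:
  void setState(const std::string &k, const std::string &v) override {
    store_[k] = v;
  }
  void getState(const std::string &k,
                std::function<void(std::string &&)> callback) override {
    auto it = store_.find(k);
    callback(it == store_.end() ? std::string() : std::string(it->second));
  }

 private:
  std::map<std::string, std::string> store_;
};

// Increment a counter stored under `key`, encoded as a decimal string.
void incrementCounter(const std::shared_ptr<ComputationContext> &ctx,
                      const std::string &key) {
  ctx->getState(key, [ctx, key](std::string &&stored) {
    int64_t count = stored.empty() ? 0 : std::stoll(stored);
    ctx->setState(key, std::to_string(count + 1));
  });
}
```

A fake context like InMemoryContext also lets you unit-test computation logic without a running cluster.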

Finally, the Metadata object returned by your metadata method has the following structure:

/** User computation metadata data structure.
 */
struct Metadata {
  /** Globally unique name of the computation. */
  std::string name;

  /** List of streams to subscribe to. */
  std::set<std::string> istreams;

  /** List of streams this computation will produce on. */
  std::set<std::string> ostreams;
};
  • name: a globally unique string identifying the computation.
  • istreams: the streams to subscribe to.
  • ostreams: the streams this computation may produce on.
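A metadata implementation typically just fills in these three fields. In the hypothetical sketch below, the stream names "words" and "word-counts" are illustrative. The name "my-computation" is chosen to match the computation_name in the sample manifest later on this page. The struct is copied from the definition above so the example compiles on its own.

```cpp
#include <cassert>
#include <set>
#include <string>

// Copy of the Metadata struct shown above.
struct Metadata {
  std::string name;
  std::set<std::string> istreams;
  std::set<std::string> ostreams;
};

// Hypothetical metadata for a computation that reads "words" and
// writes to "word-counts".
Metadata myComputationMetadata() {
  Metadata m;
  m.name = "my-computation";         // globally unique; matches the manifest
  m.istreams.insert("words");        // upstream stream to subscribe to
  m.ostreams.insert("word-counts");  // every stream passed to produceRecord
  return m;
}
```

In a real computation, this body would live in your override of Computation::metadata().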

Directory Structure

Here's a sample directory layout for a simple C++ project using Concord:

my-project/             # project root
  Makefile
  deploy.json           # computation manifest
  src/
    computation.cc      # computation source
  build/
    computation         # computation binary

Computation Manifest

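The computation manifest (deploy.json) for our sample project would look like this. The comments are explanatory only: standard JSON does not allow comments, so remove them from your actual manifest.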

{
  // comma-separated list of host:port ZooKeeper servers
  "zookeeper_hosts":"localhost:2181",
  // ZooKeeper base path for the Concord framework
  "zookeeper_path":"/foo",
  // name of the command that should be executed (in a shell environment)
  "executable_name": "computation",
  // files (rooted in your CWD) to include in the package sent to the cluster
  "compress_files": ["build/computation"],
  // globally unique name of the computation; must match the name returned
  // by your implementation of the metadata function
  "computation_name": "my-computation"
}