The following introduces the basics of the Concord API. It is relatively low level API that simply expresses the features of the Concord realtime streaming engine. There are minor differences between each language specific implementation. Checkout our references docs for more information.
The basic API we expose includes the following functions:
- Metadata (required) defines the names of the operator, input and output streams
- Process Record gets called every time your operator receives a message
- Produce Record called by your code to emit messages
- Set Timer call this to start a timer
- Timer Callback gets called when a timer is done
When writing computations to deploy on Concord, users can simply extend the client libraries in the language of their choosing and implement the necessary functions.
Each computation is responsible for defining a metadata object, which helps the Concord scheduler determine how a new computation fits into the existing topology. Upon submitting a new computation, the scheduler will call this metadata function to determine how to connect it to other computations running in the system.
This function should return a
Metadata object, as defined in our client
libraries, including the following fields:
Name: The globally unique identifier of the computation.
The list of streams this computation should be subscribed to. These streams can
be optionally overloaded with a grouping strategy. This grouping strategy
determines how the framework routes messages to instances of a given
computation. For example, if a computation is performing an aggregation,
it's necessary to ensure that all messages with a given key are routed to
the same instance to ensure you don't get a partial sum. Concord's
GROUP_BY routing method allows for this.
Output Streams: The list of streams this computation may produce on. Since it is up to a computation to define how messages are routed to it, the programmer needn't specify a routing strategy for the output streams.
Your record processing function will be called for every record that passes through your computation, giving you an opportunity to compute aggregates, manipulate records and filter records from a stream.
Almost every computation will implement Process Record.
def process_record(self, ctx, record): self.concord_logger("Key processed: %s" % record.key) self.inspect_record(record.data)
"Producing" a record refers to emitting records downstream. By producing a record on a stream with a specific key and value, you can emit data onto any computation subscribed to that stream.
def process_record(self, ctx, record): title, info = enrich_data(record.key, record.data) ctx.produce_record("stories", title, info)
Set Timer & Timer Callback
Concord allows users to set “timers” for some arbitrary point in the future. To set a timer, use the set_timer method available on the computation context object. This method accepts a timer name (String) and a timestamp representing the absolute time from epoch for sometime in the future (in milliseconds). When the scheduled time passes, Concord will call a function on your computation - referred to as the timer callback.
Timers are useful for aggregation - schedule a timer to trigger at some interval (e.g. every second/minute/hour), aggregate events received until the timer triggers, then emit the result downstream. Timers are also useful for debouncing, joining/filtering records within a time window.
def process_timer(self, ctx, key, time): minute_agg = reduce(lambda a, b: a + b, self.data, 0) self.data =  ctx.produce_record("threshold_limiter", minute_agg, "") ctx.set_timer("loop", self.minuteFromNow())