Crawling, Parsing, and Indexing Healthcare data with Concord and Elasticsearch

Concord is a distributed stream processing engine written in C++ atop Apache Mesos. The Concord framework makes stream processing accessible to developers from all walks of life (Go, C++, Scala, Ruby, Python), abstracting away the details of distributed systems so that users focus on their business logic rather than cluster operations.

Our friends at Innovatively built and deployed a patent crawling application from zero to production in 4 hours using Concord's Python API. By the end of this tutorial you should be able, like Innovatively, to collect, parse and store data from hundreds of websites, with fast retrieval and correlation across all of them via Elasticsearch.

Use case

Innovatively uses Concord to index documents (clinical trials, grants, research articles), products (medical devices, drugs, diagnostic tests, research equipment, procedures, diseases, proteins), and entities (businesses, doctors, nurses, researchers) from thousands of different data sources, providing intelligent answers to hard questions in real time.

When you use Innovatively's product to search documents, products or entities, you aren't actually searching the web. You are searching Innovatively's index of those web pages, which you can think of as a snapshot of the web at a point in time. Before Innovatively returns results, it tries to correlate similar results, disambiguate entities and, finally, filter the records so you get the top, cleanest results in about 100 milliseconds.

Innovatively is a NYC-based startup providing an easy way to access, normalize, and integrate all sources of healthcare data, whether you are using it to cure cancer or to make your next strategic investment. Whether you want to track Illumina (ILMN)'s market penetration or monitor the medical marijuana industry, Innovatively's got your back.

What's Indexing?

Data on disk is stored in blocks. You can think of them as boxes, each containing data items. Your hard drive is like a truck that delivers these boxes to your house so you can look for that particular Christmas sweater Uncle Larry gave you when you were 16.

If you had no idea which of the thousands or millions of boxes on your hard drive held that sweater, you'd have to open each and every one of them. This is called linear search, or O(n) in Big-O notation.

Indexes are like labels for your boxes. They let you skip boxes by forcing you to organize data in specific ways. That's the power of indexes: by imposing an organizational scheme on your data, they allow fast lookup. Next time, you can just ask the truck driver to bring your ugly Christmas sweater collection from box 3271.
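To make the box analogy concrete, here is a small Python sketch contrasting a linear scan with an index, where the index is simply a dict (a hash table) from contents to box number. The data is made up; Elasticsearch's inverted indexes are far more elaborate, but the trade-off is the same: pay once to organize the data, then look things up quickly.

```python
# Made-up data: 100,000 "boxes", each holding one labelled item.
boxes = ["item-%d" % i for i in range(100000)]
boxes[3271] = "ugly christmas sweaters"

def linear_search(boxes, wanted):
    """O(n): open every box until we find the item."""
    for box_number, contents in enumerate(boxes):
        if contents == wanted:
            return box_number
    return None

def build_index(boxes):
    """One O(n) pass labels every box; later lookups are O(1) on average."""
    return {contents: box_number for box_number, contents in enumerate(boxes)}

index = build_index(boxes)
print(linear_search(boxes, "ugly christmas sweaters"))  # 3271, after opening 3,272 boxes
print(index["ugly christmas sweaters"])                 # 3271, via a single hash lookup
```

The index only pays off when you look things up more than once, which is exactly Innovatively's situation: build it as data arrives, then serve many queries from it.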

Innovatively needs to do that at machine scale, with low latency, several times a second - a perfect fit for Concord.


We are going to implement a scalable scraping, parsing and indexing pipeline in 3 steps:

  1. HTML Parsing: Reads the HTML contents of the FDA's website and generates a list of URLs to parse.

  2. CSV Parsing: Downloads and unzips each archive, reads its contents and emits JSON objects downstream.

  3. Indexing: Saves the JSON objects into an Elasticsearch index.

Note: an 'operator' is a computation that a user deploys. On a cluster, there can be many instances of each operator.

HTML Parsing

A web scraper is a program that uses the rendered HTML (what users see on a website) to generate structured data for a program. It looks for pre-registered or learned patterns and emits normalized data.

In our case, we'll scan the FDA's website for all .zip archives so we can detect and parse approved medical devices.

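# google's gumbo only works w/ BeautifulSoup3
import BeautifulSoup
import requests

# google HTML5 parser. It is fast and fully compliant
import gumbo

# used to extract all the zip archives
import re

# URL of the FDA page that lists the downloadable .zip archives (fill in)
FDA_DOWNLOADS_PAGE = ""

def raw_urls():
    # BeautifulSoup3 exposes a tag's attributes as a list of (name, value) tuples
    def link_extractor(attr_array):
        for t in attr_array:
            if len(t) == 2:
                (href, link) = t
                if href == "href" and len(link) > 0:
                    return link
        return None

    urls = []
    try:
        req = requests.get(FDA_DOWNLOADS_PAGE)
        soup = gumbo.soup_parse(req.text)
        links = soup.findAll('a', href=re.compile(r'.*\.zip'))
        attrs = map(lambda x: x.attrs, links)
        # drop links without a usable href
        urls = [u for u in map(link_extractor, attrs) if u is not None]
    except Exception as e:
        print "Error extracting urls: ", e
        urls = []
    return urls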

Now that we can extract the URLs, let's look at how the extraction works before we move on to parsing the CSVs.

HTML Parsing Summary

First, we hand the response body (a bunch of HTML) to the gumbo parser. We then ask the parser for all the links, which in HTML are <a> tags. If a link's href points at a .zip file, we have a hit!

        soup = gumbo.soup_parse(req.text)
        links = soup.findAll('a', href=re.compile('.*\.zip'))
        attrs = map(lambda x: x.attrs, links)
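gumbo has to be compiled from source, so if you only want to experiment with link extraction, the standard library's HTMLParser can do the same job on small pages. This is a swapped-in alternative, not what Innovatively runs: HTMLParser does not implement the full HTML5 parsing algorithm that gumbo does. ZipLinkExtractor and zip_links are hypothetical names, and the sample HTML and file name are made up.

```python
import re
try:
    from html.parser import HTMLParser  # Python 3
except ImportError:
    from HTMLParser import HTMLParser   # Python 2

ZIP_RE = re.compile(r'.*\.zip')  # same pattern as the gumbo version

class ZipLinkExtractor(HTMLParser):
    """Collects the href of every <a> tag whose link looks like a .zip archive."""
    def __init__(self):
        HTMLParser.__init__(self)
        self.zip_links = []

    def handle_starttag(self, tag, attrs):
        # attrs is a list of (name, value) pairs, like BeautifulSoup3's .attrs
        if tag != 'a':
            return
        for name, value in attrs:
            if name == 'href' and value and ZIP_RE.search(value):
                self.zip_links.append(value)

def zip_links(html):
    parser = ZipLinkExtractor()
    parser.feed(html)
    parser.close()
    return parser.zip_links

sample = '<p><a href="/downloads/devices-2016.zip">2016</a> <a href="/about.html">About</a></p>'
print(zip_links(sample))  # ['/downloads/devices-2016.zip']
```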

Creating an HTML Parsing Operator

We need to wire the URL extraction into a Concord computation so we can run it on a cluster.

import sys
import time
import concord
from concord.computation import Computation, Metadata, serve_computation
import hashlib

def time_millis():
    return int(round(time.time() * 1000))

# returns the MD5 digest of the url as a byte string
def url_hash(x):
    m = hashlib.md5()
    m.update(str(x))
    return m.digest()

class MedicalDevicesUrlGenerator(Computation):
    def init(self, ctx):
        print "MedicalDevicesUrlGenerator init"
        ctx.set_timer('loop', time_millis())
    def destroy(self):
        print "MedicalDevicesUrlGenerator destroyed"
    def process_timer(self, ctx, key, time):
        urls = raw_urls()
        for url in urls:
            # check in the cache if we have already processed this url
            h = url_hash(url)
            if len(ctx.get_state(h)) == 0:
                url_b = bytes(url)
                ctx.set_state(h, url_b)
                ctx.produce_record("m-device-urls", h, url_b)

        delay_ms = 1000 * 60 * 10  # 10 minutes
        ctx.set_timer(key, time_millis() + delay_ms)
    def process_record(self, ctx, record):
        raise Exception('process_record not implemented')
    def metadata(self):
        # the computation name is hypothetical; it must match your deploy file
        return Metadata(name='m-device-url-generator', istreams=[],
                        ostreams=['m-device-urls'])

serve_computation(MedicalDevicesUrlGenerator())


At this point we have a scalable scraper, but no way yet to parse the CSV contents behind the URLs, nor to index the data in Elasticsearch.

Concord's ctx.set_timer() Abstraction

This is the easiest way to punctuate your streams. The most common use case is emitting values to your database periodically, say every 200ms. The second use case this API covers is sources that poll an external resource, like the FDA's website.

    def process_timer(self, ctx, key, time):
        urls = raw_urls()

        delay_ms = 1000 * 60 * 10  # 10 minutes
        ctx.set_timer(key, time_millis() + delay_ms)

There are two ideas here. First, process_timer() is a callback that fires once your timeout expires, and while it runs it blocks any process_record() calls; the same holds for every callback in our API, since each operator is single-threaded from the user's point of view. Second, the time you pass to ctx.set_timer(key, time_millis() + delay_ms) is a lower bound: the callback will not fire before then, but it may fire later. You can think of the timer API as you would a recursive function call, since each invocation schedules the next one.
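To make those two ideas concrete, the sketch below simulates them with a virtual clock. SimulatedCtx is a hypothetical stand-in written for this illustration, not Concord's ctx, and the 2-minute scrape time is an assumption.

```python
import heapq

class SimulatedCtx(object):
    """Hypothetical stand-in for Concord's ctx, driven by a virtual clock (ms)."""
    def __init__(self):
        self.now_ms = 0
        self._timers = []  # min-heap of (deadline_ms, key)

    def set_timer(self, key, deadline_ms):
        heapq.heappush(self._timers, (deadline_ms, key))

    def run(self, on_timer, until_ms):
        """Fires due timers one at a time (single-threaded) up to until_ms."""
        fired = []
        while self._timers and self._timers[0][0] <= until_ms:
            deadline_ms, key = heapq.heappop(self._timers)
            # The deadline is a lower bound: never fire early, possibly fire late.
            self.now_ms = max(self.now_ms, deadline_ms)
            fired.append((key, self.now_ms))
            on_timer(self, key)
        return fired

DELAY_MS = 1000 * 60 * 10  # 10 minutes, as in the operator above
WORK_MS = 1000 * 60 * 2    # assumption: each scrape of the site takes 2 minutes

def process_timer(ctx, key):
    ctx.now_ms += WORK_MS                      # simulate scraping the site
    ctx.set_timer(key, ctx.now_ms + DELAY_MS)  # re-arm, like a recursive call

ctx = SimulatedCtx()
ctx.set_timer('loop', 0)
fired = ctx.run(process_timer, until_ms=1000 * 60 * 40)
print(fired)  # [('loop', 0), ('loop', 720000), ('loop', 1440000), ('loop', 2160000)]
```

Because the callback re-arms its timer after the scrape finishes, consecutive polls land 12 minutes apart rather than 10. The deadline only promises the callback won't run early.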

Concord's Key=Value Storage Abstraction

This operator makes use of our key=value storage API, which is currently backed by RocksDB. That matters here because the number of URLs can exceed the memory available to the operator; to relieve the memory pressure, we simply keep the URLs on disk.

Using it is as simple as calling ctx.get_state and ctx.set_state:

            if len(ctx.get_state(h)) == 0:
                url_b = bytes(url)
                ctx.set_state(h, url_b)
                ctx.produce_record("m-device-urls", h, url_b)

The store is available in every callback through the ctx object.
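To see why a disk-backed store helps, here is a rough local stand-in for ctx.get_state and ctx.set_state built on Python 3's sqlite3 module. DiskBackedState and new_urls are hypothetical names, and sqlite3 is swapped in for the RocksDB store Concord actually uses; the dedup logic mirrors the operator above, where an empty value means the URL hash has never been seen.

```python
import hashlib
import sqlite3

class DiskBackedState(object):
    """Hypothetical stand-in for Concord's ctx.get_state/ctx.set_state.
    Keys and values are bytes; sqlite3 keeps them on disk, not in memory."""
    def __init__(self, path):
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS state (k BLOB PRIMARY KEY, v BLOB)")

    def get_state(self, key):
        row = self.db.execute(
            "SELECT v FROM state WHERE k = ?", (key,)).fetchone()
        return row[0] if row else b''  # empty value: key never seen

    def set_state(self, key, value):
        self.db.execute(
            "INSERT OR REPLACE INTO state (k, v) VALUES (?, ?)", (key, value))
        self.db.commit()

def new_urls(state, urls):
    """Yields only URLs whose MD5 has not been recorded yet, recording each one."""
    for url in urls:
        url_b = url.encode('utf-8')
        h = hashlib.md5(url_b).digest()
        if len(state.get_state(h)) == 0:
            state.set_state(h, url_b)
            yield url

state = DiskBackedState(':memory:')  # pass a file path to survive restarts
print(list(new_urls(state, ['a.zip', 'b.zip'])))  # ['a.zip', 'b.zip']
print(list(new_urls(state, ['a.zip', 'c.zip'])))  # ['c.zip']
```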

Note that you can extend this scheme to checksum the contents behind each URL, to make sure they have not changed. For this demo, and for most use cases, hashing the URL itself is good enough. We use Python's hashlib.md5() for this.

import hashlib

# returns the MD5 digest of the url as a byte string
def url_hash(x):
    m = hashlib.md5()
    m.update(str(x))
    return m.digest()

In many production environments, these URLs (or the configuration files that list them) come from MySQL, PostgreSQL, or another globally visible data store, so you can record a few properties about each URL without hard-coding them into each deployment. The MD5 of each URL's contents is a useful property for a production system to keep, since the FDA website might update the file behind the same URL with corrections.
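Here is a minimal sketch of that refinement: key the store by the URL's hash, as the operator does, but store an MD5 of the downloaded bytes as the value, so a corrected file at the same URL gets reprocessed. content_changed is a hypothetical helper, a plain dict stands in for the state store (or for a MySQL/PostgreSQL table), and the URL is made up. MD5 is fine for detecting changes, though not for anything security-sensitive.

```python
import hashlib

def content_changed(state, url, content):
    """Returns True (and records the new checksum) if the bytes behind `url`
    differ from what we saw last time. `state` is any dict-like store."""
    url_key = hashlib.md5(url.encode('utf-8')).hexdigest()
    checksum = hashlib.md5(content).hexdigest()
    if state.get(url_key) == checksum:
        return False
    state[url_key] = checksum
    return True

state = {}
url = 'https://example.com/devices.zip'  # hypothetical URL
print(content_changed(state, url, b'v1 contents'))   # True: first time we see it
print(content_changed(state, url, b'v1 contents'))   # False: unchanged
print(content_changed(state, url, b'v2 corrected'))  # True: same URL, new contents
```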

CSV Parser

All of the medical device records on the FDA's site are stored in a giant pipe-delimited CSV file with 22 columns:

# Column index -> name (columns 13, 16 and 20 are not listed here)
#  0  KNUMBER
#  1  APPLICANT
#  2  CONTACT
#  3  STREET1
#  4  STREET2
#  5  CITY
#  6  STATE
#  7  COUNTRY_CODE
#  8  ZIP
#  9  POSTAL_CODE
# 10  DATERECEIVED
# 11  DECISIONDATE
# 12  DECISION
# 14  PRODUCTCODE
# 15  STATEORSUMM
# 17  SSPINDICATOR
# 18  TYPE
# 19  THIRDPARTY
# 21  DEVICENAME

However, we are only going to pluck a few fields for indexing. Namely, we want a mapping from the CSV fields to a JSON object we can store in Elasticsearch:

    KNUMBER -> id
    APPLICANT -> company
    CONTACT -> person
    DATERECEIVED -> device_index_date
    DECISIONDATE -> status_date
    DECISION -> status
    DEVICENAME -> product
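Before wrapping it in a class, the same mapping can be expressed as data: a list of (column index, field name) pairs. This is a hypothetical alternative formulation, not the code used in this post; the indices follow the column listing above, where DEVICENAME is column 21, and the sample row is made up.

```python
import json

# Column indices from the FDA layout above -> JSON field names we index.
FIELD_MAP = [
    (0, 'id'),                  # KNUMBER
    (1, 'company'),             # APPLICANT
    (2, 'person'),              # CONTACT
    (10, 'device_index_date'),  # DATERECEIVED
    (11, 'status_date'),        # DECISIONDATE
    (12, 'status'),             # DECISION
    (21, 'product'),            # DEVICENAME
]
EXPECTED_COLUMNS = 22

def row_to_json(row):
    """Maps one pipe-split CSV row to a JSON string, or None if malformed."""
    if len(row) != EXPECTED_COLUMNS:
        return None
    return json.dumps(dict((name, row[i]) for i, name in FIELD_MAP), sort_keys=True)

# A made-up row: only a few columns carry values.
row = [''] * EXPECTED_COLUMNS
row[0], row[1], row[21] = 'K000001', 'ACME MEDICAL', 'EXAMPLE CATHETER'
print(row_to_json(row))
```

Keeping the mapping in one table makes it easy to add a field later without touching the parsing code.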

Now that we have a clean mapping of the most important fields, we can create a Python object with a to_json(self) method, and we're done modeling.

import json
import time

def time_millis():
    return int(round(time.time() * 1000))

class MDevice:
    def __init__(self, id, company, person, index_date,
                 device_index_date, status_date, status, product):
        self.id = id
        self.company = company
        self.person = person
        self.index_date = index_date
        self.device_index_date = device_index_date
        self.status_date = status_date
        self.status = status
        self.product = product
    def to_json(self):
        try:
            return json.dumps(self, default=lambda o: o.__dict__, indent=2)
        except Exception:
            return None

# maps one pipe-split CSV line (22 columns) to an MDevice, or None if malformed
def line_to_mdevice(line_arr):
    if len(line_arr) != 22:
        return None
    a = line_arr  # alias
    return MDevice(id=a[0], company=a[1], person=a[2], index_date=time_millis(),
                   device_index_date=a[10], status_date=a[11], status=a[12],
                   product=a[21])

Since we've built a clean mapping from a CSV line to a JSON object, let's download, unzip, and parse the files. We are going to do this with the help of Python iterators.

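A Python iterator is nothing more than an object with an __iter__(self) method that returns the object itself, plus a next(self) method (__next__ in Python 3) that returns the next item or raises StopIteration once the sequence is exhausted. Iterators are a useful abstraction around state that advances one item at a time, keeping the position and all other implementation details hidden.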
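Here is the smallest useful example of that protocol, written so it runs on both Python 2 (which calls next) and Python 3 (which calls __next__). Countdown is a toy class for illustration only.

```python
class Countdown(object):
    """A minimal iterator: __iter__ returns self, next()/__next__ yields items."""
    def __init__(self, start):
        self.current = start

    def __iter__(self):
        return self

    def __next__(self):          # Python 3 protocol
        if self.current <= 0:
            raise StopIteration  # signals the end of the sequence
        self.current -= 1
        return self.current + 1

    next = __next__              # Python 2 protocol, used by the code in this post

print(list(Countdown(3)))  # [3, 2, 1]
```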

import os
import time
import json
import csv
import requests
from zipfile import ZipFile

# returns a tuple (name.zip, name.txt) derived from the url
def download_zip_url(url):
    zip_name = os.path.basename(url)
    if not zip_name:
        return (None, None)
    req = requests.get(url, stream=True)
    if not req.ok:
        print "Something went wrong downloading the zip archive ", zip_name
        return (None, None)
    # stream the archive to disk in 1KB blocks
    with open(zip_name, 'wb') as handle:
        for block in req.iter_content(1024):
            handle.write(block)

    # the archive foo.zip contains foo.txt
    def zip_name_to_txt_name(n):
        return os.path.splitext(n)[0] + ".txt"

    return (zip_name, zip_name_to_txt_name(zip_name))

class MedicalDeviceIterator:
    def __init__(self, url):
        self.url = url
        (zip_name, text_name) = download_zip_url(self.url)
        if zip_name is None or text_name is None:
            raise Exception("Errors downloading url")
        self.zip_name = zip_name
        self.text_name = text_name
        self.zip_handle = ZipFile(self.zip_name, 'r')
        self.handle = self.zip_handle.open(self.text_name, mode='r')
        self.reader = csv.reader(self.handle, delimiter='|')
        next(self.reader) # skip the header
        self.finished_parsing = False
        self.bad_records_parsed = 0
        self.records_parsed = 0

    def __iter__(self):
        return self

    def lines_read(self):
        return self.records_parsed

    # Returns a MDevice obj, and skips over bad records
    def next(self):
        if self.finished_parsing:
            raise StopIteration
        while True:
            try:
                line = next(self.reader)
                self.records_parsed = self.records_parsed + 1
                mdevice = line_to_mdevice(line)
                if mdevice is None:  # wrong number of columns
                    self.bad_records_parsed = self.bad_records_parsed + 1
                    continue
                return mdevice
            except StopIteration:
                print "Records parsed: ", self.records_parsed
                self.finished_parsing = True
                # close the file inside the archive, then the archive itself
                for h in (self.handle, self.zip_handle):
                    try:
                        h.close()
                    except Exception as e:
                        print "Exception closing readers ", e
                raise StopIteration
            except Exception as e:
                self.bad_records_parsed = self.bad_records_parsed + 1
                print "Unhandled error in url parsing, skipping record: ", e

CSV Parser Summary

In our case, the gist of our iterator can be summed up in three lines:

  1. First, we open the .zip archive.
  2. Second, we open the CSV file we are interested in inside the archive. From manual inspection of the FDA's site, we know it has the same name as the archive, with .txt in place of .zip.
  3. Last, we wrap the unzipped file in a CSV reader.

        self.zip_handle = ZipFile(self.zip_name, 'r')
        self.handle = self.zip_handle.open(self.text_name, mode='r')
        self.reader = csv.reader(self.handle, delimiter='|')

Python uses exceptions extensively as sentinels: an iterator signals that it has reached the end of its sequence by raising a StopIteration exception.

Since our MedicalDeviceIterator wraps several resources (the zip archive, the CSV file inside it, and the CSV reader), we must not forget to close them once we have finished parsing the CSV file:

    def next(self):
        # ...
            except StopIteration:
                print "Records parsed: ", self.records_parsed
                self.finished_parsing = True
                # close the file inside the archive, then the archive itself
                for h in (self.handle, self.zip_handle):
                    try:
                        h.close()
                    except Exception as e:
                        print "Exception closing readers ", e
                raise StopIteration
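On Python 3, a generator function with with-blocks can get the same cleanup with less bookkeeping, because the archive and the member file are closed as soon as the loop finishes, with no StopIteration handling by hand. This is an alternative design rather than the iterator used in this post; iter_pipe_rows is a hypothetical name, the latin-1 encoding is an assumption about the FDA files, and the sample archive is made up.

```python
import csv
import io
import os
import tempfile
import zipfile

def iter_pipe_rows(zip_path, member_name):
    """Yields the rows of a pipe-delimited member of a zip archive, skipping the
    header. The with-blocks close the member and the archive when iteration ends,
    or when the generator is closed or garbage-collected early."""
    with zipfile.ZipFile(zip_path) as archive:
        with archive.open(member_name) as raw:
            # assumption: latin-1 text; adjust if the files use another encoding
            text = io.TextIOWrapper(raw, encoding='latin-1', newline='')
            reader = csv.reader(text, delimiter='|')
            next(reader, None)  # skip the header row
            for row in reader:
                yield row

# Build a tiny made-up archive to try it out.
tmp_dir = tempfile.mkdtemp()
zip_path = os.path.join(tmp_dir, 'devices.zip')
with zipfile.ZipFile(zip_path, 'w') as archive:
    archive.writestr('devices.txt', 'KNUMBER|DEVICENAME\nK000001|EXAMPLE CATHETER\n')

print(list(iter_pipe_rows(zip_path, 'devices.txt')))  # [['K000001', 'EXAMPLE CATHETER']]
```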

Creating a CSV Parser Operator

With all the plumbing in place, our Concord operator is reduced to emitting the JSON records read from the CSV file:

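import concord
from concord.computation import Computation, Metadata, serve_computation

class MedicalDevicesParser(Computation):
    def init(self, ctx): pass
    def destroy(self): pass
    def process_timer(self, ctx, key, time): pass
    def process_record(self, ctx, record):
        # record.data holds a URL from 'm-device-urls'; keying by obj.id is our choice
        for obj in MedicalDeviceIterator(str(record.data)):
            ctx.produce_record("m-devices-json", obj.id, obj.to_json())
    def metadata(self):
        # 'm-devices-json' is a hypothetical output stream name
        return Metadata(name='m-device-parser', istreams=['m-device-urls'],
                        ostreams=['m-devices-json'])

serve_computation(MedicalDevicesParser())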


In fact, all we need to do is take what the MedicalDeviceIterator produces and emit it downstream to the Elasticsearch operator for indexing:

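        for obj in MedicalDeviceIterator(str(record.data)):
            ctx.produce_record("m-devices-json", obj.id, obj.to_json())  # hypothetical stream name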

Parsing a zipped CSV archive is now about as easy as iterating through a list of numbers:

    for i in range(0,10):
        print "Index: ", i

Elasticsearch indexer

Elasticsearch aims to help you take data from any source, any format and search, analyze, and visualize it in real time.

In practice, Innovatively has had operational difficulties when dumping data into Elasticsearch at full Concord speed :) - this is why we want a queue to buffer writes to Elasticsearch. If you don't know when, or how much, data a single step is going to generate, or if your traffic is bursty, we strongly advise adding a buffering mechanism between your sources and your databases. Stream processing engines like Concord can easily overwhelm the databases their operators write to.
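Within a single process, the idea behind that advice can be sketched with a bounded queue: when the buffer fills up, the producer blocks until the writer catches up (backpressure). This in-memory sketch illustrates the principle only and is no substitute for a durable buffer such as Kafka; BUFFER_SIZE, start_writer, and the list standing in for the database are assumptions.

```python
import threading
try:
    import queue           # Python 3
except ImportError:
    import Queue as queue  # Python 2

BUFFER_SIZE = 1000  # hypothetical: max records waiting for the database

def start_writer(buffer, write):
    """Drains the buffer on a background thread; None is the shutdown signal."""
    def drain():
        while True:
            doc = buffer.get()
            if doc is None:
                return
            write(doc)  # e.g. one Elasticsearch index call
    t = threading.Thread(target=drain)
    t.start()
    return t

written = []
buffer = queue.Queue(maxsize=BUFFER_SIZE)
writer = start_writer(buffer, written.append)  # a list stands in for the database

for i in range(5000):
    # put() blocks while the buffer is full, so a fast producer is slowed
    # to the writer's pace instead of flooding the database.
    buffer.put({'id': i})
buffer.put(None)
writer.join()
print(len(written))  # 5000
```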

Assuming your database can take 100K requests per second and, for simplicity, that a single Concord operator can consume and generate 50K requests per second, you can have at most 2 operators pumping data into your indexing system. Any more would simply overwhelm, and often crash, the downstream systems. This is true for HDFS, Cassandra, Elasticsearch, etc. Since Concord now has first-class support for Kafka, we suggest using the at-least-once runtime for these kinds of workloads.
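The back-of-the-envelope rule is a floor division of the database's capacity by one operator's output rate. The headroom parameter below is our addition, not part of the guidance above, for when you would rather not run the database at 100%.

```python
def max_writers(db_capacity_qps, per_operator_qps, headroom=1.0):
    """How many operators can write at full speed before the database saturates.
    headroom < 1.0 reserves spare capacity (e.g. 0.8 keeps 20% free)."""
    return int(db_capacity_qps * headroom) // per_operator_qps

print(max_writers(100000, 50000))                # 2, the figure used above
print(max_writers(100000, 50000, headroom=0.8))  # 1
```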

To start the indexing operation, we need to establish a connection to our Elasticsearch cluster:

import sys
import concord

from optparse import OptionParser
from elasticsearch import Elasticsearch

def get_opts():
    parser = OptionParser()
    parser.add_option("-i", "--elastic-ip", dest="elastic_ip",
                      help="elastic cluster ip")
    parser.add_option("-p", "--elastic-port", dest="elastic_port",
                      help="elastic cluster port")
    (options, args) = parser.parse_args()
    return (options, args)

def get_ip_port_tuple_from_args():
    (options, args) = get_opts()
    return (options.elastic_ip, options.elastic_port)

def get_elastic_search_connection():
    (host, p) = get_ip_port_tuple_from_args()
    if host is None or p is None:
        raise Exception("Could not get ip:port for elastic search")

    print "Elasticsearch: host ", host, " and port ", p
    es = Elasticsearch(
        [{'host': host, 'port': int(p)}],
        # sniff before doing anything
        sniff_on_start=True,
        # refresh nodes after a node fails to respond
        sniff_on_connection_fail=True,
        # and also every 60 seconds
        sniffer_timeout=60)
    return es

Elasticsearch Indexing Summary

The gist of this operator is to write the JSON version of each medical device produced by the CSV parsing step into an Elasticsearch 'index'.

    res = self.es.index(index="concord", doc_type="mdevice", body=record.data)  # doc_type name is hypothetical

Creating an Elasticsearch Indexing Operator

We need to send an index update to Elasticsearch for every record that comes in.

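from concord.computation import Computation, Metadata, serve_computation

class MDeviceIndexer(Computation):
    def init(self, ctx):
        self.es = get_elastic_search_connection()
        # ignore the 400 caused by IndexAlreadyExistsException when creating an index
        self.es.indices.create(index='concord', ignore=400)
    def destroy(self): pass
    def process_timer(self, ctx, key, time): pass
    def process_record(self, ctx, record):
        try:
            # doc_type name is hypothetical
            res = self.es.index(index="concord", doc_type="mdevice", body=record.data)
            if not res['created']:
                print "Error saving to elastic search: ", res
        except Exception as e:
            print "Couldn't index record: ", e
    def metadata(self):
        # name and input stream are hypothetical; match your deploy file and parser
        return Metadata(name='m-device-indexer', istreams=['m-devices-json'],
                        ostreams=[])

serve_computation(MDeviceIndexer())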

For a high-throughput system (>500K QPS), we recommend batching your updates to Elasticsearch. Even with a Kafka queue buffering writes to your indexing cluster, you don't want to spend an eternity saving your data. In fact, on a large system you want your consumers to keep up with, or outpace, your producers most of the time.
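One way to batch, sketched below, is to group documents and build a request body for Elasticsearch's bulk API, whose format is newline-delimited JSON: an action line followed by the document, for each record. The batch size, the use of each document's id as _id, and the sample documents are assumptions; Elasticsearch versions before 7.0 also expect a _type in each action line, and elasticsearch-py's helpers.bulk can build and send these requests for you.

```python
import json

def batches(docs, size):
    """Groups an iterable of documents into lists of at most `size` items."""
    batch = []
    for doc in docs:
        batch.append(doc)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch

def bulk_body(batch, index):
    """Builds a newline-delimited body for Elasticsearch's _bulk endpoint:
    one action line followed by one source line per document."""
    lines = []
    for doc in batch:
        lines.append(json.dumps({'index': {'_index': index, '_id': doc['id']}}))
        lines.append(json.dumps(doc))
    return '\n'.join(lines) + '\n'  # the bulk API requires a trailing newline

docs = [{'id': 'K%06d' % i, 'product': 'EXAMPLE DEVICE'} for i in range(5)]  # made-up
for batch in batches(docs, 2):
    print(len(batch), len(bulk_body(batch, 'concord').splitlines()))
```

Using the record's id as _id also makes replays under an at-least-once runtime idempotent: a re-delivered record overwrites the same document instead of creating a duplicate.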

Try it!

It's all open source, give it a shot!


In this example, we assume you are trying this on your Linux box or development environment. To productionize, just package the operators in a Docker image.

Apart from the pip-installable dependencies, there is only one non-standard install: the gumbo HTML parser.

Gumbo is an implementation of the HTML5 parsing algorithm implemented as a pure C99 library with no outside dependencies.

Deploying the demo

Let's run a couple of scripts to get the dependencies installed. First, let's clone the repo:

    $> git clone
    $> cd demo/innovatively
    $> source environment

    vagrant@vagrant-ubuntu-trusty-64:/workspace/demo/innovatively$ source environment
    New python executable in /workspace/demo/innovatively/demo_venv/bin/python
    Installing setuptools, pip, wheel...done.

Next, let's install the requirements that we need to run the Innovatively pipeline.

    # Notice the (demo_venv)
    (demo_venv) vagrant@vagrant-ubuntu-trusty-64:~/demo/innovatively$

    pip install -r requirements.txt


    (demo_venv) agallego@agbuild:~/workspace/demo/innovatively$ pip install -r requirements.txt
    Collecting concord-py==0.3.6 (from -r requirements.txt (line 2))
      Downloading concord-py-0.3.6.tar.gz
    Collecting requests==2.10.0 (from -r requirements.txt (line 3))
      Downloading requests-2.10.0-py2.py3-none-any.whl (506kB)
        100% |████████████████████████████████| 512kB 2.6MB/s
    Collecting six==1.9.0 (from -r requirements.txt (line 4))
      Downloading six-1.9.0-py2.py3-none-any.whl
    Collecting thrift==0.9.2 (from -r requirements.txt (line 5))
      Downloading thrift-0.9.2.tar.gz
    Collecting zope.interface==4.1.2 (from -r requirements.txt (line 6))
      Downloading zope.interface-4.1.2.tar.gz (919kB)
        100% |████████████████████████████████| 921kB 1.4MB/s
    Collecting wheel==0.24.0 (from concord-py==0.3.6->-r requirements.txt (line 2))
      Downloading wheel-0.24.0-py2.py3-none-any.whl (63kB)
        100% |████████████████████████████████| 71kB 11.5MB/s

Next, let's build the gumbo HTML parser.

    (demo_venv) agallego@agbuild:~/workspace/demo/innovatively$ ./
    Cloning into 'gumbo-parser'...
    remote: Counting objects: 1900, done.
    remote: Total 1900 (delta 0), reused 0 (delta 0), pack-reused 1899
    Receiving objects: 100% (1900/1900), 3.85 MiB | 0 bytes/s, done.
    Resolving deltas: 100% (1186/1186), done.
    Checking connectivity... done.
    + libtoolize
    libtoolize: putting auxiliary files in `.'.
    libtoolize: linking file `./'
    libtoolize: putting macros in AC_CONFIG_MACRO_DIR, `m4'.
    libtoolize: linking file `m4/libtool.m4'
    libtoolize: linking file `m4/ltoptions.m4'
    libtoolize: linking file `m4/ltsugar.m4'
    libtoolize: linking file `m4/ltversion.m4'
    libtoolize: linking file `m4/lt~obsolete.m4'
    + aclocal -I m4
    + autoconf
    + automake --add-missing installing './compile' installing './config.guess'


    Libraries have been installed in:

    If you ever happen to want to link against installed libraries
    in a given directory, LIBDIR, you must either use libtool, and
    specify the full pathname of the library, or use the `-LLIBDIR'
    flag during linking and do at least one of the following:
      - add LIBDIR to the `LD_LIBRARY_PATH' environment variable
        during execution
      - add LIBDIR to the `LD_RUN_PATH' environment variable
        during linking
      - use the `-Wl,-rpath -Wl,LIBDIR' linker flag
      - have your system administrator add LIBDIR to `/etc/'

    See any operating system documentation about shared libraries for
    more information, such as the ld(1) and manual pages.


    /bin/mkdir -p '/home/agallego/workspace/demo/innovatively/demo_venv/include'


    Installed /home/agallego/workspace/demo/innovatively/demo_venv/lib/python2.7/site-packages/gumbo-0.10.1-py2.7.egg
    Processing dependencies for gumbo==0.10.1
    Finished processing dependencies for gumbo==0.10.1

Last, since we aren't building a Docker image, we have to relocate our virtualenv so that all of our runtime dependencies are met.

    $> ./
    ++ which virtualenv
    + venv=/usr/local/bin/virtualenv
    +++ git rev-parse --show-toplevel
    ++ basename /home/agallego/workspace/demo
    + venv_name=demo_venv
    ++ git rev-parse --show-toplevel
    + venv_path=/home/agallego/workspace/demo/innovatively/demo_venv
    + pip install -r requirements.txt
    Requirement already satisfied (use --upgrade to upgrade): BeautifulSoup==3.2.1 in ./demo_venv/lib/python2.7/site-packages (from -r requirements.txt (line 1))
    Requirement already satisfied (use --upgrade to upgrade): concord-py==0.3.6 in ./demo_venv/lib/python2.7/site-packages (from -r requirements.txt (line 2))
    Requirement already satisfied (use --upgrade to upgrade): requests==2.10.0 in ./demo_venv/lib/python2.7/site-packages (from -r requirements.txt (line 3))
    Requirement already satisfied (use --upgrade to upgrade): six==1.9.0 in ./demo_venv/lib/python2.7/site-packages (from -r requirements.txt (line 4))
    Requirement already satisfied (use --upgrade to upgrade): thrift==0.9.2 in ./demo_venv/lib/python2.7/site-packages (from -r requirements.txt (line 5))
    Requirement already satisfied (use --upgrade to upgrade): zope.interface==4.1.2 in ./demo_venv/lib/python2.7/site-packages (from -r requirements.txt (line 6))
    Requirement already satisfied (use --upgrade to upgrade): wheel==0.24.0 in ./demo_venv/lib/python2.7/site-packages (from concord-py==0.3.6->-r requirements.txt (line 2))
    Requirement already satisfied (use --upgrade to upgrade): setuptools in ./demo_venv/lib/python2.7/site-packages (from zope.interface==4.1.2->-r requirements.txt (line 6))
    + bash -c 'virtualenv --relocatable /home/agallego/workspace/demo/innovatively/demo_venv'
    Making script /home/agallego/workspace/demo/innovatively/demo_venv/bin/wheel relative
    Making script /home/agallego/workspace/demo/innovatively/demo_venv/bin/easy_install-2.7 relative
    Making script /home/agallego/workspace/demo/innovatively/demo_venv/bin/easy_install relative
    Making script /home/agallego/workspace/demo/innovatively/demo_venv/bin/pip2 relative
    Making script /home/agallego/workspace/demo/innovatively/demo_venv/bin/pip relative
    Making script /home/agallego/workspace/demo/innovatively/demo_venv/bin/pip2.7 relative
    Making script /home/agallego/workspace/demo/innovatively/demo_venv/bin/python-config relative

Dependencies Summary

In summary, you run three commands to get your dependencies in place:

    $> source environment
    $> ./
    $> ./

Now that your environment is good to go, simply deploy the operators with concord deploy.

Let's deploy the Elasticsearch indexing operator first:

    $> concord deploy mdevice_es.json

Let's deploy the CSV parser next:

    $> concord deploy mdevice_parser.json

    # ... Example output

    INFO:cmd.deploy:Adding tarfile:
    INFO:cmd.deploy:Size of tar file is: 15.9MB
    INFO:cmd.deploy:Thrift Request: {
        "cpus": 1.0,
        "disk": 2048,
        "executorArgs": [],
        "forcePullContainer": true,
        "forceUpdateBinary": true,
        "instances": 1,
        "mem": 512,
        "name": "m-device-parser",
        "slug": null,
        "taskHelper": {
            "client": null,
            "clientArguments": [
            "computationAliasName": null,
            "dockerContainer": "",
            "environmentExtra": [],
            "execName": "runner.bash",
            "folder": "",
            "frameworkLoggingLevel": 0,
            "frameworkVModule": "",
            "proxy": null,
            "retries": 0,
            "router": null,
            "scheduler": null,
            "user": ""
    INFO:cmd.thrift_utils:Connecting to:localhost:2181
    INFO:cmd.deploy:Sending computation to:
    INFO:cmd.deploy:Verify with the mesos host: that the service is running

Let's kick off the entire pipeline with the HTML parser:

    $> concord deploy mdevices.json

Deployment Summary

To run the full pipeline, you just need to deploy three operators:

    $> concord deploy mdevice_es.json
    $> concord deploy mdevice_parser.json
    $> concord deploy mdevices.json

At this point you should be up and running, indexing the web.

We'd love to hear from you when you give this a shot, so stop by our user group to ask questions:!forum/concord-user

Special thanks to An Nguyen, Shinji Kim, Tobias Müller & Sarah Rohrbach for reviewing drafts of this post