Data is a first-class asset for us. We also firmly believe that data becomes even more valuable the sooner it is available. Quality data analytics requires more than consolidating data in a central location; it demands up-to-the-minute capture. In this blog post, we describe how we built a real-time data capture system.

By Sam Fang and Steven Deutscher

High-Level Design

Currently, we use an SDK and a database capture system to gather real-time data. Both systems publish to Kafka, the messaging system that backs our real-time pipelines. We also use DC/OS, Marathon, Mesos, and Docker to improve the application's reliability and maintainability.


Single Source of Truth


Our first approach was to develop an SDK that lets application developers integrate with our API: they modify their applications to publish data to a message queue system such as Kafka. We use the SDK to collect several kinds of events, such as website browser events and audit logs. The main advantage of this method is that it is simple and fast to adopt, but it also has drawbacks. Although the APIs are easy to use, they still require additional development effort (often mundane, repetitive work). Consistency is another complication. It arises for systems that write to both the SDK and another persistent layer (e.g., a database) at the same time: keeping the two in agreement requires a coordination protocol such as Paxos or two-phase commit, which adds a lot of complexity.
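To make the consistency problem concrete, here is a minimal sketch of a dual write, in which an application saves a row to its database and then publishes the same change through the SDK. `FakeDatabase`, `FlakyPublisher`, and `save_order` are hypothetical stand-ins, not our SDK's API; the point is only that a failure between the two writes leaves the database and the event stream disagreeing.

```python
class FakeDatabase:
    """In-memory stand-in for the application's persistent store."""
    def __init__(self):
        self.rows = {}

    def insert(self, key, value):
        self.rows[key] = value


class FlakyPublisher:
    """Hypothetical SDK publisher whose send can fail (e.g. broker unreachable)."""
    def __init__(self, fail=False):
        self.fail = fail
        self.events = []

    def publish(self, topic, key, value):
        if self.fail:
            raise ConnectionError("broker unavailable")
        self.events.append((topic, key, value))


def save_order(db, publisher, order_id, status):
    """Hypothetical dual write: commit to the database, then publish the event."""
    db.insert(order_id, status)                    # write 1 succeeds
    publisher.publish("orders", order_id, status)  # write 2 may fail afterwards


db = FakeDatabase()
publisher = FlakyPublisher(fail=True)
try:
    save_order(db, publisher, 42, "SUCCESS")
except ConnectionError:
    pass

# The row exists in the database, but no event was ever published.
print(db.rows, publisher.events)  # {42: 'SUCCESS'} []
```

Retrying the publish narrows the window but does not close it, because the process can still crash between the two writes; closing it properly is what the coordination protocol is for.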

Database Change Data Capture

Our second approach takes advantage of the prevalence of databases in most enterprise systems. More specifically, our capture system translates the database binary log into SQL statements and publishes them to a messaging system such as Kafka. This approach is powerful because it can capture all existing data in real time.

Many commercial capture systems are available, such as Oracle GoldenGate and VMware Tungsten, as well as many open-source projects, such as MySQL Binlog Connector, Open Replicator, Maxwell, and Alibaba Canal. Since MySQL is our primary database, we compared several MySQL capture applications.

Table 1 - MySQL Log Mining Comparison


Based on our research, we chose Alibaba Canal for several reasons: it provides high availability (HA) through ZooKeeper, it has the highest adoption of the candidates, and it supports partitioning, a feature our scenario requires.

Event Process for Database Capture and Publish System

While running, Canal acts as a MySQL slave: it captures the binary log in real time and converts log entries into SQL statements. To complete the pipeline, we needed a client that connects to Canal, retrieves the translated results, and publishes them to the messaging system. For this task we chose Kafka Connect, a framework developed by Confluent that makes it easy to integrate new systems into stream data pipelines built on Kafka.

Connect moves data between Kafka and other systems. Users instantiate Kafka connectors for the systems they want to import data from (source connectors, e.g., a relational database) or export data to (sink connectors, e.g., writing the contents of a Kafka topic to an HDFS file). In our case, we built a source connector that connects to Canal.
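The client described above boils down to a fetch, publish, acknowledge loop. The sketch below shows that loop in Python for brevity; our connector actually runs inside Kafka Connect (Java). `FakeCanal` and `FakeProducer` are hypothetical stand-ins: the `get_without_ack` / `ack` / `rollback` names are modeled on the batch-oriented methods of Canal's Java client (`getWithoutAck`, `ack`, `rollback`), but this is not its API. The idea being illustrated is that a batch is acknowledged only after every record in it has been written to Kafka, and rolled back otherwise so that Canal redelivers it.

```python
from collections import deque


class FakeCanal:
    """Hypothetical stand-in for a Canal client: hands out batches until acked."""
    def __init__(self, batches):
        self.pending = deque(enumerate(batches, start=1))  # (batch_id, entries)
        self.in_flight = {}
        self.acked = []

    def get_without_ack(self):
        if not self.pending:
            return None, []
        batch_id, entries = self.pending.popleft()
        self.in_flight[batch_id] = entries
        return batch_id, entries

    def ack(self, batch_id):
        del self.in_flight[batch_id]
        self.acked.append(batch_id)

    def rollback(self, batch_id):
        # Put the batch back at the front so it is redelivered next.
        self.pending.appendleft((batch_id, self.in_flight.pop(batch_id)))


class FakeProducer:
    """Hypothetical stand-in for a Kafka producer that can fail once."""
    def __init__(self, fail_once_on=None):
        self.fail_once_on = fail_once_on
        self.topic = []

    def send(self, record):
        if record == self.fail_once_on:
            self.fail_once_on = None
            raise IOError("send failed")
        self.topic.append(record)


def run_once(canal, producer):
    """Fetch one batch, publish every entry, then ack (or roll back on failure)."""
    batch_id, entries = canal.get_without_ack()
    if batch_id is None:
        return False
    try:
        for entry in entries:
            producer.send(entry)
    except IOError:
        canal.rollback(batch_id)  # nothing acked; the whole batch is re-sent
        return True
    canal.ack(batch_id)           # every entry reached Kafka
    return True


canal = FakeCanal([["a", "b"], ["c"]])
producer = FakeProducer(fail_once_on="b")
while run_once(canal, producer):
    pass
print(producer.topic)  # ['a', 'a', 'b', 'c']
print(canal.acked)     # [1, 2]
```

The duplicate `'a'` is the expected consequence of rolling back a partially published batch: the loop gives at-least-once delivery, so downstream consumers should tolerate duplicates.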

Connector Implementation

During development, we ran into several classic problems that we want to share in this post.

Offset Management

In most Connect installations, offset management is the responsibility of the framework: distributed setups store offsets in Kafka topics, while standalone setups persist them to the local file system. We chose a standalone setup for several reasons. The first has to do with the nature of our input source. When implementing a new connector, you must define how your data is partitioned. For example, the JDBC connector partitions data by table, and the framework distributes the workload by partition across Connect workers. In our case, we read from a single log file, so coordinating reads across multiple workers would be difficult.
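The partitioning argument can be illustrated with a toy round-robin assignment of source partitions to workers. This is a simplification (Kafka Connect actually distributes connector tasks, and each connector decides how to split its input among its tasks), and `assign` is a hypothetical helper. With a table-partitioned source the work spreads out; with a single binlog stream there is only one partition, so extra workers sit idle.

```python
from typing import Dict, List


def assign(partitions: List[str], workers: List[str]) -> Dict[str, List[str]]:
    """Round-robin partitions over workers (toy model of work distribution)."""
    assignment = {w: [] for w in workers}
    for i, partition in enumerate(partitions):
        assignment[workers[i % len(workers)]].append(partition)
    return assignment


workers = ["worker-1", "worker-2", "worker-3"]

# JDBC-style source: one partition per table, so work spreads across workers.
print(assign(["orders", "users", "payments", "audit_log"], workers))
# {'worker-1': ['orders', 'audit_log'], 'worker-2': ['users'], 'worker-3': ['payments']}

# Binlog source: one ordered log, one partition, so only one worker has work.
print(assign(["mysql-binlog"], workers))
# {'worker-1': ['mysql-binlog'], 'worker-2': [], 'worker-3': []}
```

Splitting the binlog itself across workers would mean coordinating read positions while preserving event order, which is the difficulty described above.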

Lastly, the Connect framework is still relatively new and has a few issues that are more evident in distributed mode (for example, a single bad worker config can permanently bring down your cluster). Many of these issues have been resolved in newer versions, but those require an upgrade to Kafka 0.10.

That said, storing offsets in local files is not a sustainable approach to offset management. Fortunately, Canal handles most of this complexity using ZooKeeper. Canal sends data in batches, and it is the connector's responsibility to determine which batches have made it to Kafka and to notify Canal. The Connect framework provides hooks that let implementations supply their own offset management.
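One way to decide when a batch has made it to Kafka is to count the outstanding records per batch and acknowledge batches in fetch order once all of their records are confirmed. The sketch below assumes that Canal batches should be acknowledged in the order they were fetched, and that the connector is told once per record that Kafka has accepted it (in Kafka Connect, `SourceTask.commitRecord` is a hook of this kind). `BatchTracker` and its methods are hypothetical names, not part of Canal or Connect.

```python
from collections import OrderedDict
from typing import Callable


class BatchTracker:
    """Hypothetical helper: acks batches in fetch order once fully confirmed."""

    def __init__(self, ack: Callable[[int], None]):
        self.ack = ack                    # e.g. a call that notifies Canal
        self.outstanding = OrderedDict()  # batch_id -> unconfirmed record count

    def batch_fetched(self, batch_id: int, record_count: int) -> None:
        self.outstanding[batch_id] = record_count
        self._flush()                     # an empty batch can be acked at once

    def record_committed(self, batch_id: int) -> None:
        self.outstanding[batch_id] -= 1
        self._flush()

    def _flush(self) -> None:
        # Ack from the oldest batch forward, stopping at the first incomplete
        # one, so a later batch is never acked before an earlier one.
        while self.outstanding:
            oldest_id, remaining = next(iter(self.outstanding.items()))
            if remaining > 0:
                break
            self.outstanding.popitem(last=False)
            self.ack(oldest_id)


acked = []
tracker = BatchTracker(acked.append)
tracker.batch_fetched(1, 2)
tracker.batch_fetched(2, 1)
tracker.record_committed(2)  # batch 2 is complete, but batch 1 is not yet
print(acked)                 # []
tracker.record_committed(1)
tracker.record_committed(1)  # batch 1 completes, so batches 1 and 2 are acked
print(acked)                 # [1, 2]
```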

One thing to note: if you plan to provide your own offset management with Confluent 2.0.0, make sure you grab this fix.

High Availability – Active / Standby

As mentioned, our Connect setup is a standalone installation. To add some basic HA, we used Apache Curator to implement the ZooKeeper leader election recipe. This setup is easy and recovers some of the ground lost by not using a Connect cluster. Combined with Marathon, Mesos, and DC/OS, it ensures that we always have an active/standby connector pair running for each database.
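Curator's leader election recipes build on ZooKeeper's standard pattern: each candidate creates an ephemeral sequential znode under a shared path, the candidate with the lowest sequence number is the leader, and when the leader's session ends its ephemeral node disappears and the next candidate takes over. The sketch below models that pattern in memory, with `FakeZooKeeper` standing in for a real ensemble and a hypothetical election path; it is not Curator's API, and it omits the watches a real implementation uses to learn about changes.

```python
import itertools


class FakeZooKeeper:
    """In-memory stand-in for ZooKeeper's ephemeral sequential znodes."""

    def __init__(self):
        self._seq = itertools.count()
        self.nodes = {}  # znode name -> owning session

    def create_ephemeral_sequential(self, prefix, session):
        name = f"{prefix}{next(self._seq):010d}"  # 10-digit sequence suffix
        self.nodes[name] = session
        return name

    def expire_session(self, session):
        # Ephemeral nodes vanish when their owner's session ends.
        self.nodes = {n: s for n, s in self.nodes.items() if s != session}

    def children(self, prefix):
        return sorted(n for n in self.nodes if n.startswith(prefix))


ELECTION = "/connectors/orders-db/leader-"  # hypothetical election path


def current_leader(zk):
    """The candidate owning the lowest sequence number is the leader."""
    candidates = zk.children(ELECTION)
    return zk.nodes[candidates[0]] if candidates else None


zk = FakeZooKeeper()
for instance in ["connector-a", "connector-b"]:
    zk.create_ephemeral_sequential(ELECTION, instance)

print(current_leader(zk))         # connector-a
zk.expire_session("connector-a")  # e.g. the active container is killed
print(current_leader(zk))         # connector-b
```

Marathon's part in this picture is keeping the desired number of instances running, so a replacement instance can rejoin the election as the new standby.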

Data Type Conversion

Our connector is responsible for translating SQL statements into the Kafka Connect format. Once data is in the Connect format, you can plug in different converter implementations for persisting it into Kafka. In our case, we chose Avro.
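To give a sense of what translating into the Kafka Connect format involves, here is one plausible mapping from MySQL column types to Kafka Connect schema types (core types such as `INT32`, `INT64`, `STRING`, and `BYTES`, plus logical types such as `Decimal`, `Date`, and `Timestamp`). The tables and the `connect_type` helper are illustrative assumptions, not our connector's exact rules. Unsigned integers are mapped to the next wider type so that large values do not overflow.

```python
import re

# Hypothetical MySQL -> Kafka Connect schema type mapping (illustrative only).
SIGNED = {
    "tinyint": "INT8",
    "smallint": "INT16",
    "mediumint": "INT32",
    "int": "INT32",
    "bigint": "INT64",
    "float": "FLOAT32",
    "double": "FLOAT64",
    "decimal": "Decimal",     # logical type carried as BYTES
    "char": "STRING",
    "varchar": "STRING",
    "text": "STRING",
    "enum": "STRING",         # emit the label, not the index
    "blob": "BYTES",
    "date": "Date",           # logical type carried as INT32 days since epoch
    "datetime": "Timestamp",  # logical type carried as INT64 millis since epoch
    "timestamp": "Timestamp",
}

# Unsigned integers need the next wider type to avoid overflow.
UNSIGNED = {"tinyint": "INT16", "smallint": "INT32", "mediumint": "INT32",
            "int": "INT64", "bigint": "Decimal"}


def connect_type(column_type: str) -> str:
    """Map a MySQL column definition such as 'int(10) unsigned' to a Connect type."""
    lowered = column_type.lower()
    match = re.match(r"\s*(\w+)", lowered)
    if match is None:
        raise ValueError(f"cannot parse column type: {column_type!r}")
    base = match.group(1)
    if "unsigned" in lowered and base in UNSIGNED:
        return UNSIGNED[base]
    try:
        return SIGNED[base]
    except KeyError:
        raise ValueError(f"unmapped MySQL type: {base}") from None


print(connect_type("int(10) unsigned"))  # INT64
print(connect_type("varchar(255)"))      # STRING
print(connect_type("datetime(6)"))       # Timestamp
```

Because Connect's `Timestamp` logical type stores milliseconds, a `DATETIME(6)` column mapped this way loses its microseconds, one concrete form of the precision gotcha listed next.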

When translating SQL row events into the Kafka Connect format, we encountered a few gotchas:

  1. Enums are represented as indexes and use one-based indexing. It was slightly shocking to see every ‘SUCCESS’ shifted to ‘FAILURE’ in a status enum.
  2. Keep timestamp/datetime precision levels in mind if you do any kind of date string formatting.
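Both gotchas are easy to reproduce. MySQL stores an `ENUM` value as a 1-based index into its list of permitted values (index 0 is reserved for the empty-string error value), so looking the index up in a 0-based array shifts every value by one. `DATETIME` and `TIMESTAMP` columns carry a fractional-seconds precision of 0 to 6 digits, so a fixed formatting pattern can silently add or drop digits. The status enum and the helpers below are hypothetical sketches, not code from our connector.

```python
from datetime import datetime

STATUS_VALUES = ["SUCCESS", "FAILURE"]  # as declared: ENUM('SUCCESS', 'FAILURE')


def enum_label_buggy(index: int) -> str:
    return STATUS_VALUES[index]  # wrongly treats the 1-based index as 0-based


def enum_label(index: int) -> str:
    """Map MySQL's 1-based ENUM index to its label (0 is the error value '')."""
    if index == 0:
        return ""
    return STATUS_VALUES[index - 1]


def format_datetime(value: datetime, fsp: int) -> str:
    """Format with exactly fsp fractional-second digits (0 to 6, truncating)."""
    if not 0 <= fsp <= 6:
        raise ValueError("fsp must be between 0 and 6")
    text = value.strftime("%Y-%m-%d %H:%M:%S")
    if fsp:
        text += "." + f"{value.microsecond:06d}"[:fsp]
    return text


print(enum_label_buggy(1))  # FAILURE  (a stored 'SUCCESS' comes out wrong)
print(enum_label(1))        # SUCCESS
ts = datetime(2016, 5, 4, 12, 30, 15, 123456)
print(format_datetime(ts, 0))  # 2016-05-04 12:30:15
print(format_datetime(ts, 3))  # 2016-05-04 12:30:15.123
```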