Versatile streaming data processing using Kafka Streams

Rob Vadai · Published in Codelook
Oct 22, 2017 · 5 min read

In my previous article, I discussed the challenges of real-time data processing. So far, Kafka Streams seems to be the solution that satisfies the requirements for a streaming data processing framework.

The code for the example Kafka Streams application discussed in this article can be found here.

Minimalist streaming library

The intention behind Kafka Streams was to create a library that can consume messages from an upstream Kafka topic and produce messages into a downstream topic, applying transformations to the messages along the way. It does not support data sources other than Kafka out of the box (although custom sinks can be implemented using the low-level API). If other data sources are needed, messages can be piped into Kafka (or written from Kafka to other stores) using Kafka Connect.

Kafka Streams is a Java library that takes advantage of the standard Kafka consumer and producer APIs:

  • It uses the high-level Kafka consumer to fetch messages from a topic; that is, it relies on consumer groups. This makes clustered deployment really simple: all we have to do is configure the same consumer group (identified by a simple string) for all Kafka Streams instances. Instances in the same consumer group consume messages from the same topic in parallel, and messages are distributed evenly across all nodes, provided appropriate keys are set (see the configuration sketch after this list).
  • It uses the standard Kafka producer to emit the result of the transformation into a topic.
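A minimal configuration sketch, with illustrative values: in Kafka Streams the application.id doubles as the consumer group id, so starting several instances with the same value puts them all into the same group.

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ClusterConfig {
    // The application.id is used as the consumer group id, so every instance
    // started with this configuration joins the same group and shares the
    // partitions of the source topic.
    static Properties sharedGroupConfig() {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "temperature-converter");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092,broker-2:9092");
        return config;
    }
}
```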

In Kafka Streams, a data pipeline can be defined that connects the source topic to the destination topic. Transformations (such as mapping and filtering) can also be defined and the topology will be applied onto each consumed message.

Defining data pipelines

The topology of a data pipeline (that is, transformations and actions on the messages) is the backbone of a Kafka Streams application. Kafka Streams offers two ways to define a pipeline:

  • Processor API: a more conventional Java API, where each pipeline step is defined individually. The chain of steps is declared by name: each step references the named step(s) it consumes from (a minimal sketch follows this list).
    This is the low-level API, and it can also be used to route messages to destinations other than Kafka. It is not as intuitive to use as the DSL. Example here
  • Kafka Streams DSL: a typical, modern API for declaring pipelines. The API is very similar to the Scala collections, Akka Streams and Apache Spark APIs, so it provides a convenient way to set up message transformations. It is simple and intuitive to use. It is really just a wrapper around the Processor API, but hides a lot of its complexity. Example here
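To give a feel for the named-step wiring of the Processor API, here is a minimal, hypothetical sketch; the topic names, step names and conversion logic are illustrative, not taken from the example project:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class CelsiusTopology {

    // A custom processing step: parse the Fahrenheit value and forward it as Celsius.
    static class ToCelsius implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            try {
                double fahrenheit = Double.parseDouble(record.value());
                double celsius = (fahrenheit - 32) * 5 / 9;
                context.forward(record.withValue(Double.toString(celsius)));
            } catch (NumberFormatException ignored) {
                // drop values that are not valid numbers
            }
        }
    }

    static Topology build() {
        Topology topology = new Topology();
        // Every step gets a name; downstream steps reference their parent step by that name.
        topology.addSource("fahrenheit-source",
            Serdes.String().deserializer(), Serdes.String().deserializer(), "fahrenheit-readings");
        topology.addProcessor("to-celsius", ToCelsius::new, "fahrenheit-source");
        topology.addSink("celsius-sink", "celsius-readings",
            Serdes.String().serializer(), Serdes.String().serializer(), "to-celsius");
        return topology;
    }
}
```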

In this tutorial we’ll use the Kafka Streams DSL. Our streaming pipeline takes simple messages and transforms them into another format. The data pipeline consists of the following steps:

  1. Consume plain text messages from an upstream topic. The messages represent a temperature value in Fahrenheit, although they arrive as plain Strings.
  2. Keep only the messages we need. Basically, we make sure that the messages can be interpreted as valid numbers, and we throw away anything that cannot be converted into a valid decimal number.
  3. Transform them into Avro message format. Avro provides a structured message format with a set schema that helps a consumer and a producer to adhere to the same message format. It is more reliable to use messages with a schema because we can avoid unnecessary validation all across our application.
    The Avro message will have the timestamp when the message was processed, the original Fahrenheit temperature and the corresponding Celsius temperature value.
  4. Produce messages into a downstream Kafka topic.
    Also publish our Avro schema definition to the Schema Registry. The Schema Registry is a standard way to store Avro schemas so that other applications can query it to retrieve the schema for any specific message. It also provides automatic versioning for schemas.

The complete pipeline looks like this:

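Below is a minimal sketch of such a pipeline, written against the current Kafka Streams DSL; the topic names, Avro field names and Schema Registry URL are assumptions for illustration rather than the exact code from the example project. The numbered comments correspond to the steps explained afterwards.

```java
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Map;

public class TemperaturePipeline {

    // Avro schema for the transformed message: processing timestamp plus both readings.
    private static final Schema TEMPERATURE_SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Temperature\",\"fields\":["
            + "{\"name\":\"timestamp\",\"type\":\"long\"},"
            + "{\"name\":\"fahrenheit\",\"type\":\"double\"},"
            + "{\"name\":\"celsius\",\"type\":\"double\"}]}");

    public static StreamsBuilder buildPipeline(String schemaRegistryUrl) {
        StreamsBuilder builder = new StreamsBuilder();

        // Avro Serde backed by the Schema Registry; it registers the schema on first use.
        Serde<GenericRecord> avroSerde = new GenericAvroSerde();
        avroSerde.configure(Map.of("schema.registry.url", schemaRegistryUrl), false);

        // 1. Source: plain-text Fahrenheit readings, keyed by a String id.
        KStream<String, String> raw = builder.stream(
            "fahrenheit-readings", Consumed.with(Serdes.String(), Serdes.String()));

        // 2. Filter: keep only values that parse as a valid double.
        KStream<String, String> numeric = raw.filter((key, value) -> isDouble(value));

        // 3. Map: convert each reading into an Avro GenericRecord.
        KStream<String, GenericRecord> converted = numeric.mapValues(value -> {
            double fahrenheit = Double.parseDouble(value);
            GenericRecord record = new GenericData.Record(TEMPERATURE_SCHEMA);
            record.put("timestamp", System.currentTimeMillis());
            record.put("fahrenheit", fahrenheit);
            record.put("celsius", (fahrenheit - 32) * 5 / 9);
            return record;
        });

        // 4. Sink: produce the Avro records into the downstream topic.
        converted.to("celsius-readings", Produced.with(Serdes.String(), avroSerde));

        return builder;
    }

    private static boolean isDouble(String value) {
        try {
            Double.parseDouble(value);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }
}
```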
  1. Setting up the source is really simple: we just define the source topic name and the serializer/deserializer for the key and message types.
    Serializers and deserializers are essentially converters that tell Kafka Streams how to read and write the messages it handles. In Kafka Streams terminology they are called “Serdes” (a contraction of serializer and deserializer).
  2. A simple filter keeps only the messages we need.
    In our example, we only let through values that can be converted into a double-precision number.
  3. The next step converts the plain numeric value into an Avro message. This is a simple mapping step from the plain text message into Avro format.
  4. Finally, we produce the messages into a downstream topic.
    Just like for the source topic, we have to set the Serdes for the message key and value types.

Once we have declared the above steps, we have a full-featured data pipeline that is ready to handle messages flowing through in real time.

Starting a data pipeline as a standalone application

Once we’ve defined our data pipeline we have to add some bootstrapping to actually start up our Kafka Streams application.

First, the entire pipeline has to be passed to an instance of the KafkaStreams class. Given our pipeline is defined in a variable called streamBuilder, we can create it with new KafkaStreams(streamBuilder, kafkaStreamsConfig) (see the sketch below).

Second, the created KafkaStreams instance has to be started.
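A minimal bootstrap sketch, reusing the hypothetical TemperaturePipeline from the earlier sketch; with the current StreamsBuilder API, the Topology built from the builder (streamBuilder.build()) is passed in together with a Properties configuration:

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class TemperatureApp {
    public static void main(String[] args) {
        Properties kafkaStreamsConfig = new Properties();
        // The application.id also acts as the consumer group shared by all instances.
        kafkaStreamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "temperature-converter");
        kafkaStreamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder streamBuilder =
            TemperaturePipeline.buildPipeline("http://localhost:8081");

        // First, pass the pipeline to a KafkaStreams instance...
        KafkaStreams streams = new KafkaStreams(streamBuilder.build(), kafkaStreamsConfig);
        // ...then start it.
        streams.start();

        // Close the streams application cleanly when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```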

Test run of the data pipeline

The bootstrap project includes an integration test for the pipeline. It can be found here. The integration test runs a pseudo-cluster consisting of Zookeeper, Kafka and Schema Registry, all provisioned as Docker containers. We need Zookeeper to coordinate the Kafka brokers (even if we only have one), while the Schema Registry is used to store the Avro schemas the application produces. The tests can be started from the run-integration-tests.sh script.

The integration test case verifies that only valid numeric values are processed, that the valid messages are transformed into Avro messages, and that they are then sent to the downstream topic.
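The same behaviour can also be checked without Docker, using Kafka’s TopologyTestDriver and the mock Schema Registry support in Confluent’s Avro Serde. The sketch below is only illustrative and reuses the hypothetical TemperaturePipeline and topic names from the earlier sketches, rather than the Docker-based test in the repository:

```java
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

import java.util.Map;
import java.util.Properties;

public class TemperaturePipelineSketchTest {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "pipeline-test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        // A "mock://" URL tells Confluent's Avro Serde to use an in-memory schema registry.
        String registryUrl = "mock://pipeline-test";

        try (TopologyTestDriver driver = new TopologyTestDriver(
                TemperaturePipeline.buildPipeline(registryUrl).build(), config)) {

            TestInputTopic<String, String> input = driver.createInputTopic(
                "fahrenheit-readings", new StringSerializer(), new StringSerializer());

            Serde<GenericRecord> avroSerde = new GenericAvroSerde();
            avroSerde.configure(Map.of("schema.registry.url", registryUrl), false);
            TestOutputTopic<String, GenericRecord> output = driver.createOutputTopic(
                "celsius-readings", new StringDeserializer(), avroSerde.deserializer());

            input.pipeInput("sensor-1", "212");          // valid Fahrenheit reading
            input.pipeInput("sensor-1", "not-a-number"); // should be filtered out

            GenericRecord converted = output.readValue();
            System.out.println("celsius = " + converted.get("celsius"));   // expect 100.0
            System.out.println("no further output: " + output.isEmpty());  // expect true
        }
    }
}
```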

What functionality is supported?

In this article I highlighted a simple use case for Kafka Streams. The API is not as feature-rich as the Spark API, for example; however, it supports filtering, mapping, flat-mapping, reducing by key and a few other transformations. These operations cover most real-life use cases.
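As a taste of the stateful operations, a reduce-by-key step takes only a few lines; the topic name below is illustrative:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;

public class MaxTemperature {
    // Sketch: keep the highest Celsius reading seen so far for each sensor key.
    static KTable<String, Double> maxPerSensor(StreamsBuilder builder) {
        return builder
            .stream("celsius-values", Consumed.with(Serdes.String(), Serdes.Double()))
            .groupByKey()
            .reduce(Math::max);
    }
}
```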

Kafka Streams excels at stateful processing compared to other frameworks. One of its key features is stream processing with recoverable state, and it also supports querying that state as if it were a database. I’ll discuss these features in my next article.
