The Kafka Streams Experience Pt. 1

What And Why?

Kafka Streams is a relatively new technology available as part of the standard Kafka distribution. I’ve mentioned it before in previous blog posts, such as my massive wall of text on event sourcing. It’s attractive to us because it solves a lot of the difficult problems of event sourcing with Kafka, many of which I discussed in that post. We have several use cases for event sourcing, where it makes certain features much easier to implement at scale than more traditional approaches.  One such application is discussed in this post.

We have now implemented several of our microservices using Kafka Streams, and have discovered and overcome a host of challenges along the way to becoming (relatively) fluent.  In this series of posts, I’ll discuss these challenges, how we addressed them, and what the outlook is for Kafka Streams in our products going forward.

We also make extensive use of the “interactive queries” feature of Kafka Streams, which has allowed us to query data that is both big and fast in ways that other storage technologies struggle to support.  This feature has its own set of implementation challenges, some of which I’ll discuss in this post and in later posts in this series.

This post assumes basic familiarity with Kafka and Kafka Streams concepts, such as the difference between a KStream and a KTable, as well as the basics of how Kafka topics and partitions work.  If you’re not there yet, take a look at this introductory post on Kafka Streams concepts.

Challenge: Topic/Partition Explosion

The Problem

Kafka Streams creates a lot of topics for you behind the scenes under a host of circumstances. For instance, you get a topic created:

  • For any stage that has state, such as an aggregate or KTable join.
  • For any interactive query store you create with logging support.
  • Any time you join two KStreams that are not already co-partitioned.

For even a medium-complexity streaming application, this can quickly reach tens of topics, each with as many partitions as the input topics to the stream.  This has several issues, all of which we have experienced in one form or another:

  • More latency.  If you are publishing data back and forth to the brokers many times, you pay the round trip time at least, and possibly the commit interval time (a configuration parameter for the stream), for every stage that does this.
  • More disk usage.  Publishing stream state through many topics means that you’re using disk space for redundant or partially-redundant copies of data on your brokers.  In more than one case we ran our brokers out of disk space with disastrous results.  Details in a later post…
  • Drain on broker resources.  Brokers have to do work for each partition of each topic they host, even if those partitions are not particularly active.  For log-compacted topics (which many of the auto-created topics are), this amount of work can be large.  We observed several cases where our brokers would experience significant slow-down simply due to the load of hosting many partitions.
  • Drain on client resources.  A lot of the work of re-publishing and subscribing to all these intermediate topics is done client-side.  For large volumes of data, this load can be significant.  This manifests as additional latency, and in extreme cases, consumers falling behind.
  • Slow client startup. When clients with state stores start up, they have to replay all the partitions of all the KTables backing their state stores into their local storage.  They do this one partition at a time, and the time this takes scales roughly with the total number of partitions in all state stores in the application.

The Solution

We took several measures to control the topic/partition explosion problem.

Repartition Source Topics Down

In a few cases, we had source topics with large numbers of partitions because those topics are very active.  Downstream processing might reduce or aggregate the data so that it is much smaller or less active, requiring far fewer partitions.  Don’t let the needs of the input data dictate the partitioning of your entire stream, especially because a complicated stream may create many topics.

For example, we have an application where a stream took high-frequency robot operational data as input.  This is the biggest, fastest data in our entire application and flows through a topic with many partitions.  However, the output was quite small: we simply aggregate the input data to a single value per robot, namely the frequency of the data in Hz.  To accomplish this, we first transformed the input data into something much smaller (a timestamp and a count of data points) in a simple map step.  Then we repartitioned that down to 1/10th as many partitions and used the result as the input to the aggregation stage.  This in turn ensured that the topic created behind the scenes to capture the state of the aggregation had a small number of partitions.  In pseudocode, this looks like:

stream("TopicWithLotsOfPartitions")
  .mapValues(transformToTimeStampAndPoints)
  .through("TopicWithFewPartitions")
  .groupByKey()
  .aggregate(calculateRunningFrequency, "frequencyStoreName")

Now we have a queryable store with the update frequency for each robot without having to host a topic with the same (large) number of partitions as the raw robot high-frequency data.
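
For reference, here is a slightly more concrete Java sketch of the same topology against the 0.11.x Streams DSL we were using at the time (imports omitted).  The value types, serdes, initializer, and aggregator shown here are illustrative stand-ins, not our real implementations.

// Illustrative Java version of the pseudocode above (0.11.x Streams DSL).
KStreamBuilder builder = new KStreamBuilder();

builder.stream(Serdes.String(), robotDataSerde, "TopicWithLotsOfPartitions")
  // Shrink each record to just what the aggregation needs before repartitioning.
  .mapValues(data -> new TimestampAndPoints(data.getTimestamp(), data.getPointCount()))
  // "TopicWithFewPartitions" is created ahead of time with ~1/10th the partitions;
  // through() writes to it and reads it back, repartitioning the stream.
  .through(Serdes.String(), timestampAndPointsSerde, "TopicWithFewPartitions")
  .groupByKey(Serdes.String(), timestampAndPointsSerde)
  // The changelog topic backing this store now also has the smaller partition count.
  .aggregate(
    RunningFrequency::new,                            // initializer
    (robotId, points, freq) -> freq.update(points),   // running frequency in Hz
    runningFrequencySerde,
    "frequencyStoreName");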

Reduce Number Of Stateful Streaming Stages

This seems pretty obvious in retrospect, but it’s easy to do and pays big dividends.  In short, be mindful of all the streaming stages that cause topics to be created behind the scenes, and minimize those.

As an example, we had an application where we were joining a KStream of robot events with a KTable of metadata about each robot, such as a user-assigned name.  We then aggregated the events into a notion of current state for each robot (the result of all the events that have happened to it over time).  Initially, the stream looked like this:

stream("RobotEventsTopic")
  .groupByKey()
  .aggregate(calculateRobotEventState)
  .leftJoin("RobotMetadataTable", joinRobotData, "queryableStoreName")

This looks reasonable enough, but it creates two topics behind the scenes: one for the state of the aggregate stage and one for the queryable store we specify in the left-join stage.  Each will have as many partitions as “RobotEventsTopic”, and the data in the two topics is almost completely redundant.  With a simple rewrite, however, we were able to eliminate one of these topics with no compromise in functionality:

stream("RobotEventsTopic")
  .leftJoin("RobotMetadataTable", joinRobotData)
  .groupByKey()
  .aggregate(calculateRobotEventState, "queryableStoreName")

The trick here was to make the aggregate stage itself the queryable store, so we only need one topic, which now does double duty: it stores the state of the aggregation and serves queries.

From the above example and some others, we derived some rules of thumb that help us minimize auto-created topics when defining the stream itself:

  • Prefer to join KStreams to KTables over KTables to KTables.  Stream-to-table joins do not require backing topics.
  • Minimize the number of aggregate steps (this also applies to reduce, count, and transform), preferably to one per stream.  If you are aggregating many KStream events into a single state, first merge all the input KStreams into one, then do a single aggregate on the merged KStream (see the sketch after this list).  We often package disparate events into a single protobuf message with a oneof field for this purpose.
  • Design your topics ahead of time to minimize the need for repartitioning.  We are still in the process of perfecting this, and it is difficult because different topics have different partitioning needs.  Our current thinking is to define “tee shirt size” levels of topic partitioning (very busy, kinda busy, not busy, etc.).  Then repartition early, only when you are crossing from one tee shirt size to another, to the lowest tee shirt size possible.
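
To make the second rule concrete, here is a minimal sketch of the “merge first, aggregate once” pattern in the 0.11.x DSL (imports omitted).  The event topic names, the RobotEvent wrapper type (for example, a protobuf message with a oneof for the different event kinds), the serdes, and the RobotState aggregate are all illustrative.

// Merge all input event streams, then aggregate exactly once.
KStreamBuilder builder = new KStreamBuilder();

KStream<String, RobotEvent> faults   = builder.stream(Serdes.String(), robotEventSerde, "RobotFaultEvents");
KStream<String, RobotEvent> statuses = builder.stream(Serdes.String(), robotEventSerde, "RobotStatusEvents");

// One merged stream and a single aggregate means a single backing changelog topic,
// instead of one per aggregate step.
builder.merge(faults, statuses)
  .groupByKey(Serdes.String(), robotEventSerde)
  .aggregate(
    RobotState::new,
    (robotId, event, state) -> state.apply(event),
    robotStateSerde,
    "robotStateStore");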

Use Change Log Topics Directly

One of the most common auto-created topics in Kafka Streams is the “change log” topic, which backs any stateful streaming stage or queryable store.  These topics always end with the suffix “-changelog”.  Initially, if we wanted a microservice to publish a KTable for other services to consume, we’d add a “to” step at the end of the stream, as in this example:

stream("InputStreamTopic")
  .groupByKey()
  .aggregate(someAggregationFunction)
  .to("OutputTableTopic")

This ends up creating two topics with exact copies of the same information.  “OutputTableTopic” is one; the other is created behind the scenes for the aggregate stage.  Both have exactly the same keys and values (i.e. the output of the aggregation stage).  We did this because we have our own naming conventions for topics, and using our custom topic in the “to” stage allowed us to adhere to them.  However, we have decided that adhering to naming conventions is not worth the cost of a redundant topic (disk space, CPU on the broker and client, plus some latency).  We now do this:

stream("InputStreamTopic")
  .groupByKey()
  .aggregate(someAggregationFunction, "queryableStoreName")

This causes the backing topic for the aggregate stage to have a deterministic name, namely “application-name-queryableStoreName-changelog” (where “application-name” is the name you give to the entire Kafka Streams application).  We then allow other microservices to subscribe directly to this topic and use it as an input KTable.
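
For example, a consuming microservice can build a KTable directly off the change log topic by name, along these lines (imports omitted).  The value type and serde are illustrative; the topic and store names simply follow the naming convention described above.

// Consume another application's changelog topic directly as an input KTable.
KStreamBuilder builder = new KStreamBuilder();

KTable<String, RobotState> robotStates = builder.table(
  Serdes.String(),
  robotStateSerde,
  "application-name-queryableStoreName-changelog",  // the producing app's changelog topic
  "localRobotStateStore");                          // this service's local materialized copy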

Note that this approach is not without challenges.  For instance, a common practice in Kafka Streams is to attach a version number to the name of a streaming app, which you increment any time you change the app in a way that is not compatible with the old version.  Doing this also changes the names of any change log topics, since they include the name of the streaming app.  This would in turn break any services that consume the old topic.  We don’t have a good solution for this yet.

Use Kafka 1.0 (Only Don’t Yet)

The Kafka 1.0 client libraries include many improvements that reduce the number of backing topics created for stateful stages.  For instance, in some cases, KTable-to-KTable joins can be performed without a backing topic at all.  There is one problem, though: Kafka 1.0 has a critical bug in precisely this optimization, which renders it unusable for most streaming applications.  We are currently still using the 0.11.0.2 client libraries, with plans to switch as soon as we’re satisfied the issue is fixed.

Coming Soon…

Topic/partition explosion is but one of the many weird and wonderful issues we’ve encountered since introducing Kafka Streams into many of our microservices.  In upcoming entries, I’ll discuss other challenges, such as:

  • Revving streams.  Specific challenges arise whenever you change the topology of a streaming application, upgrade the Kafka client libraries, or even change certain configuration parameters. Some of these are covered by published best practices; others we have had to figure out ourselves.
  • Interactive Query Startup.  For large stores, the interactive query feature of Kafka Streams can take several minutes to start up, during which time queries do not work. We had to use Kubernetes readiness probes in combination with some specific calls into Kafka Streams to keep this from manifesting as downtime for the end user.
  • Interactive Query Downtime.  If a replica of a microservice goes offline temporarily, any queryable Kafka Streams stores associated with that service also go offline, possibly for several minutes. This was unacceptable for us, since it caused several minutes of partial downtime every time we restarted a replica of a service for any reason. An open Kafka issue covers this but has no published ETA, so we implemented a fairly involved system that allows interactive queries to remain online during restarts or crashes of service replicas.

See you next time!

Combining Kafka Streams and Akka

In our KUKA Connect robot management web app, we have begun refactoring our code to use Kafka Streams as much as possible.

We were already using Kafka, publishing data to various topics, and consuming that data, either with simple consumers or by using Akka Streams.

But now we wanted to move to Kafka Streams: can we just replace our usages of Akka Streams with it? The short answer is no. Here’s a use case from our web app that shows how combining the two frameworks still makes sense for us.

Kafka Streams and Akka Streams, Each Great at Something

Use Case: When a KUKA robot hits a critical fault, notify the owner of that robot via text message.

Microservices Involved:

Notification Service: Main player, responsible for joining together various streams of data from other microservices and calling into Messaging Service to actually send the messages to the right users.

Device Access Service: Responsible for knowing which users have access to which robots at any given time; publishes a full state KTable with this information.

Device Licensing Service: Responsible for knowing whether a robot has a Plus license or not; publishes this information as a full state KTable.

Device Fault Service: Publishes robot faults to a KStream as they occur.

Messaging Service: Knows how to send text messages to users.

We build our main stream by constructing a KStream off the deviceFaults topic, joining those fault events to the deviceAccess KTable to find the users who have access to the device, and then joining that result to the deviceLicensing KTable to filter out robots that are not Plus licensed. We then do a flatMap to rekey the result from deviceId to userId, and finally we publish those events to a new topic we call userFaultStream.
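
A rough sketch of that topology in the Streams DSL looks like the following (imports omitted). The topic names come from the description above; the value types, serdes, and the exact join and filter logic are illustrative.

// Sketch of the notification topology (0.11.x DSL); helper types are illustrative.
KStreamBuilder builder = new KStreamBuilder();

KTable<String, DeviceAccess>  access    = builder.table(Serdes.String(), accessSerde, "deviceAccess", "deviceAccessStore");
KTable<String, DeviceLicense> licensing = builder.table(Serdes.String(), licenseSerde, "deviceLicensing", "deviceLicensingStore");

builder.stream(Serdes.String(), faultSerde, "deviceFaults")
  // Attach the set of users who can see this device to each fault event.
  .join(access, (fault, acc) -> new FaultWithUsers(fault, acc.getUserIds()))
  // Keep only faults from Plus-licensed robots.
  .join(licensing, (faultWithUsers, license) -> license.isPlus() ? faultWithUsers : null)
  .filter((deviceId, faultWithUsers) -> faultWithUsers != null)
  // Rekey from deviceId to userId: one output record per interested user.
  .flatMap((deviceId, faultWithUsers) -> {
    List<KeyValue<String, Fault>> out = new ArrayList<>();
    for (String userId : faultWithUsers.getUserIds()) {
      out.add(KeyValue.pair(userId, faultWithUsers.getFault()));
    }
    return out;
  })
  .to(Serdes.String(), faultSerde, "userFaultStream");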

Kafka Streams gives us the ability to easily combine these different flows of data, filter based on various criteria, then rekey it from device identifier to user identifier.

But there’s a problem.

Enter the Akka

People dislike getting spammed with hundreds of text messages.

Further, robots can sometimes hiccup and throw a spurious fault that is then immediately cleared by an internal system. The fault stream carries every single fault, however, so if we blindly call into Messaging Service with each fault, users will sometimes get flooded with fault notifications.

What we really want is a way to batch the faults to any given user every X seconds or so, to give spurious faults time to clear themselves and to group rapidly occurring faults into a single message.

Kafka Streams is not good at that.
But Akka Streams is.

Recall that we published our fault stream to a new topic, userFaultStream.

We then made an Akka Streams consumer of that topic, grouped it by userId, and merged it with another source that emits a “Send Message” signal every 5 seconds. We buffer up all text messages intended for a given user during those 5 seconds, until the Send Message signal arrives.
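
As a rough illustration, here is a simplified sketch of the per-user batching in Akka Streams’ Java DSL (imports omitted). Our real implementation merges in the separate 5-second “Send Message” tick source described above; this sketch uses groupedWithin instead, which gives a similar per-user, per-window batch. The fault source, Pair payload type, Messaging Service call, and the constants are assumptions for the example.

// Simplified per-user batching with Akka Streams (Java DSL).
// userFaultSource() is assumed to be an Akka Streams consumer of the
// userFaultStream topic, emitting (userId, fault) pairs.
Source<Pair<String, Fault>, ?> userFaults = userFaultSource();

userFaults
  .groupBy(MAX_ACTIVE_USERS, pair -> pair.first())         // one substream per userId
  .groupedWithin(MAX_BATCH_SIZE, Duration.ofSeconds(5))    // buffer up to 5 seconds of faults
  .mapAsync(1, batch -> messagingService.sendBatchedText(  // one text message per user per window
      batch.get(0).first(),
      batch.stream().map(Pair::second).collect(Collectors.toList())))
  .mergeSubstreams()
  .runWith(Sink.ignore(), materializer);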

This magic means we can be receiving faults from many robots at once, each handled by a different instance of our faults microservice; join them together to filter and discover which users need to know about the faults; rekey them by those userIds; buffer up and combine messages to the same user; and then send them out as a batch.

Streamification

Because programming with Kafka Streams is so powerful, we are adopting it whenever we can, “streamifying” existing microservices one at a time as it makes sense for us to do so.

That said, we still have many use cases for Akka Streams, as its broadcast and flow-processing capabilities provide rich abilities that Kafka Streams does not (yet?) offer.

Now that we are programming using this paradigm, we don’t want to go back to the “old” way of doing things, where we made lots of calls between microservices to piece together all the data we needed at any given time.