Conquering Streaming Analytics With Akka And Kafka

Introduction

I almost called this post “conquering the non-idempotent event materialization function,” in reference to my event sourcing wall o’ text from last year, but I didn’t want to lose 80% of my audience before the first paragraph. It would have been an appropriate title, however. Still here? In the previous event sourcing post, I talked about the challenges of working with events whose materialization functions are non-idempotent, meaning that if you replay the same event multiple times, your system does not come out the same.

A trivial example of this would be an event that says “$100 added to bank account.” If you play this event 4 times, you add $400 to the account. By contrast, an event that says “account balance is now $100” can be played as many times as you want, and the account balance will remain consistent. In the previous post, I concluded that the latter type of event is preferable, since it allows you to materialize events into system state in an idempotent fashion, and duplicate events are a non-issue. This is convenient, because it’s common for Kafka to replay duplicate events (and in general, “at least once” is a much easier delivery guarantee to enforce than “exactly once”).

In this post, I detail a case where idempotent materialization was impossible: namely a streaming analytics routine that counted incoming numerical data and aggregated a running sum of that data, very similar to aggregating “$100 added to account balance” events. We have several new use cases where this is critical. For instance, KUKA’s “ready2_fasten” screw fastening package uses our KUKA Connect cloud application to report metrics on screw fastening over time, such as the number of successful or unsuccessful attempts in a particular time window. In this case the events looked like “this many new screws have been fastened,” so we had to prevent duplicate event replay if the data were to be accurate. This had to be done in a way that was resilient to transient Kafka issues, microservice crashes/reboots/re-deployments, and network retries. Here is how we did that.

…And How We Didn’t Do That

We did not use Kafka Streams to solve this problem. Soon, I will write a post detailing why we abandoned Kafka Streams for the time being due to various challenges implementing it in production. For now, suffice it to say that we concluded it was not an option. This was a shame because, when it works, Kafka Streams has some powerful facilities for defining non-idempotent aggregations without having to worry about many of the complicated underlying details.

We did, however, use Akka Streams and Akka’s Kafka connector to accomplish this. That meant implementing a few things that Kafka Streams supposedly handles for you, such as guarding against duplicate event replay even when events are consumed and republished by what I referred to in the previous event sourcing post as “republish systems”: systems that consume Kafka messages, transform them, and publish the results to new topics. This had to hold true even if the republish system and the aggregation system were in different microservices.

The Details

The breakthrough in this approach is relatively obvious in retrospect: use the Kafka partition offset. Any time you receive a Kafka message, it has an “offset”, a 64-bit integer that increments by one for every message published to a Kafka partition. Within a partition, every message with offset n occurred after any message with offset <n and before any message with offset >n. Also, due to the way Kafka handles partitioning, we knew that every message for any particular “thing” (in this case a robot, represented by a Kafka partition key corresponding to the robot’s serial number) lands in the same partition, and is therefore ordered with respect to every other message from that robot. The procedure therefore became, simply:

  • When you aggregate events into a state for a robot, that state should include the Kafka offset of the latest message that was aggregated into that state.
  • If you encounter a message with an offset at or before the one stored in the current state, discard it, as sketched below.
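
To make this concrete, here is a minimal Scala sketch of such a state and its guarded update; the names and fields are illustrative rather than our actual classes:

final case class ScrewStats(fastened: Long, failed: Long, lastOffset: Long)

object ScrewStats {
  // Starting point before any message has been aggregated.
  val empty = ScrewStats(fastened = 0L, failed = 0L, lastOffset = -1L)
}

// Guarded update: a replayed message (offset at or before lastOffset) leaves
// the state untouched instead of double-counting.
def updated(state: ScrewStats, offset: Long, newlyFastened: Long, newlyFailed: Long): ScrewStats =
  if (offset <= state.lastOffset) state
  else ScrewStats(state.fastened + newlyFastened, state.failed + newlyFailed, offset)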

This sounds straightforward, but there was some complexity to it. We had several stages in our streaming app that were effectively “flat maps” meaning that one incoming event could produce several outgoing events. The outgoing events all corresponded to a single incoming Kafka message with a single offset. This meant 2 things:

  1. We needed to keep track of the source message offset through all stages of the Akka stream so we could update any aggregation state at any stage with the latest offset of the original Kafka message that produced that state (and discard earlier ones as described above).
  2. Events produced by a “flat map” stage (i.e. multiple events produced by a single Kafka message) must be processed as a batch! It was critical that we avoid processing part of such a batch and then crashing, because the single Kafka offset that produced the collection gives us no way to record how far into that collection we had gotten. This could cause us to either lose data (by skipping part of a partially-processed collection) or replay duplicate events (by double-processing part of a partially-processed collection), both of which were unacceptable. Therefore, we passed “flat mapped” events through the stream as a collection (usually a Scala immutable.Queue due to its good append/prepend/dequeue performance) with a single attached Kafka offset, and the aggregate states were updated and persisted only after processing the entire collection (a sketch follows this list).
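
Here is a rough Scala sketch of the batch shape we passed between stages, reusing the ScrewStats idea from the sketch above; RawMessage, ScrewEvent, and the helper functions are hypothetical stand-ins for our real types:

import scala.collection.immutable.Queue

// Hypothetical payload types and helpers (stand-ins for our real ones).
type RawMessage = Array[Byte]
final case class ScrewEvent(succeeded: Boolean)
def parseEvents(raw: RawMessage): Seq[ScrewEvent] = ???
def applyEvent(state: ScrewStats, event: ScrewEvent): ScrewStats = ???

// A "flat mapped" batch travels through the stream as one element, carrying
// the single offset of the Kafka message that produced it.
final case class EventBatch[A](events: Queue[A], sourceOffset: Long)

// The flat-map stage emits one EventBatch per Kafka message instead of
// expanding into individual elements (as mapConcat would).
def explode(raw: RawMessage, offset: Long): EventBatch[ScrewEvent] =
  EventBatch(Queue(parseEvents(raw): _*), offset)

// Downstream, the whole batch is folded into the state, and the source offset
// is recorded only once the entire collection has been applied.
def aggregateBatch(state: ScrewStats, batch: EventBatch[ScrewEvent]): ScrewStats =
  if (batch.sourceOffset <= state.lastOffset) state
  else batch.events.foldLeft(state)(applyEvent).copy(lastOffset = batch.sourceOffset)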

Persistence And Load Handling

The last remaining problem to solve was how to persist and load aggregated states. When a microservice boots up and starts receiving Kafka messages, we first need to load the current state of every aggregation stage in the Akka stream (which, remember, includes the latest-processed Kafka offset for each stage). Then we can start playing events on top of that state (or discarding events whose Kafka offset is too early).

For this, we used a mix of Redis and Cassandra. A full discussion of the trade-offs between the two is beyond the scope of this post, but the short version is that Redis is better for smaller, faster data while Cassandra is better for bigger, slower data. We had both flavors of data in our system.

Akka gave us a lot of flexibility in how to persist states. The basic approach we used was:

  • Pick a persistence interval. This is how often we write our aggregated states to Cassandra/Redis. It is a tunable parameter of the system: the longer it is, the less we stress out the database, but the more Kafka events we potentially have to replay after a reboot or crash, and our data can only be counted on to be as “fresh” as this interval. We used one minute. This was very easy to accomplish using Akka’s built-in throttle and conflate functions: we throttled the stream elements representing updated state to one per minute per device, and conflated them by simply keeping the most recent state. This is what went to Cassandra/Redis (a sketch follows this list). Kafka Streams has no equivalent of throttle/conflate, or really any time- or rate-aware streaming primitives, which is one of my chief gripes about it.
  • Only commit the message to Kafka after the states that include that Kafka event have been written to Cassandra/Redis. This means that if the system crashes, we will replay any lost Kafka events. We might also replay events that have already been persisted if the system crashes after persisting to Cassandra/Redis but before committing to Kafka. However, this is okay since we ignore Kafka messages with an offset earlier than the latest one that has already been aggregated into the state, as described above.
  • Persist states in the reverse order that the stages appear in the Akka stream. This required some thought. If a stream has 4 aggregation stages, what happens if you persist only 2 of them then crash? When you recover, if the ones you persisted before crashing are the earlier stages, then you might have earlier stages in the stream with later Kafka offsets. If an earlier Kafka message comes in, the earlier stream stage with the later offset will discard that message, and it will never make it to the later stages that still need that event since they were not persisted before the crash. This is trivially avoided by persisting the later (more downstream) stages before the earlier (upstream) ones.
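
Putting the first two bullets together, the persistence tail of one of these streams looked roughly like the Akka Streams sketch below. It reuses the ScrewStats sketch from earlier, elides the per-device fan-out, and assumes hypothetical persistState and commitOffset hooks (the real ones wrote to Cassandra/Redis and committed through the Akka Kafka connector):

import scala.concurrent.Future
import scala.concurrent.duration._
import akka.NotUsed
import akka.stream.ThrottleMode
import akka.stream.scaladsl.Flow

// Hypothetical persistence and commit hooks; the real ones wrote to
// Cassandra/Redis and committed offsets through the Akka Kafka connector.
def persistState(state: ScrewStats): Future[ScrewStats] = ???
def commitOffset(offset: Long): Future[Unit] = ???

// Persistence tail for a single device's state updates: keep only the newest
// state while the throttle holds back demand, write it out at most once per
// minute, and only then commit the corresponding Kafka offset. After a crash,
// uncommitted messages are replayed and the offset guard discards duplicates.
val persistTail: Flow[ScrewStats, Unit, NotUsed] =
  Flow[ScrewStats]
    .conflate((_, latest) => latest)
    .throttle(1, 1.minute, 1, ThrottleMode.Shaping)
    .mapAsync(1)(persistState)
    .mapAsync(1)(state => commitOffset(state.lastOffset))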

And Then, Oops

Once we had implemented the above system, we ran it in our test environment for several days. To our shock and dismay, we still observed analytics results indicating that duplicate events had been replayed. After some frustrating investigation, we identified the cause: republish systems. I discussed these in the previous event sourcing post: they are any system that takes in Kafka messages, does calculations, and publishes other Kafka messages representing the results of those calculations.

In the case of our streaming analytics, there was in fact a republish system upstream of our analytics stream. This system translates raw incoming data from robots into a standard data format that most of KUKA Connect uses. Obviously this system could crash just like our analytics system, and equally obviously, crashing could cause Kafka to replay recent messages that had not been committed. Since our analytics system paid attention only to the Kafka offset of the topic it was directly consuming, and not to the input topics of any republish systems upstream of it, we could still get duplicate events.

One possible fix would have been to add the same offset-tracking-and-discarding behavior to the republish system as well. The problem was that (in this case at least) the republish system was stateless. It had no aggregation state at all, and we would have had to add one for no purpose other than tracking Kafka offsets. This could have worked, but it would have meant writing to (and possibly stressing out) a database where we did not previously do so. Stateful streams are also more memory intensive and not as performant as stateless streams. Finally, several services downstream of this republish system were already idempotent and did not care about message replay; for those we would be paying to persist an aggregation state we did not even need. We wanted a general solution that would work just as well for stateless streams as for stateful ones.

Fortunately, we already had a standard protobuf message that we use to wrap anything we publish to Kafka event streams, which handles things like the timestamp and a unique ID for the event. To this we added what we termed a source topic chain: an ordered list of tuples containing the following data for each republish system a message passes through:

  • Topic name
  • Partition number
  • Partition offset

This collection of topic-information tuples is ordered according to the order in which messages pass through republish systems, so we can always tell the “history” of a message no matter how many times it was republished.
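
Modeled in Scala purely for illustration (the real wrapper is a protobuf message with different names, and giving the republished event a fresh ID and timestamp is an assumption of this sketch), the envelope looks roughly like:

import java.util.UUID

// Illustrative Scala model of our protobuf event wrapper.
final case class SourceTopicRef(topic: String, partition: Int, offset: Long)

final case class EventEnvelope[A](
    eventId: UUID,
    timestampMillis: Long,
    sourceTopicChain: Vector[SourceTopicRef], // oldest ("root") entry first
    payload: A
)

// A republish system appends the topic/partition/offset of the message it just
// consumed before publishing its transformed result.
def republish[A, B](consumed: EventEnvelope[A], consumedFrom: SourceTopicRef, transformed: B): EventEnvelope[B] =
  EventEnvelope(
    eventId = UUID.randomUUID(),
    timestampMillis = System.currentTimeMillis(),
    sourceTopicChain = consumed.sourceTopicChain :+ consumedFrom,
    payload = transformed
  )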

Using this information, we only had to change our existing logic for tracking the latest Kafka offset in each aggregation state a little bit:

  • Instead of looking at the offset of the current message, look at the offset of the very first message in the republish chain, i.e. the “root cause” of this event. Track that offset in the aggregation state and, as before, discard messages whose root offset is at or before it.
  • Also track the topic and partition. If either of those changes, assume we changed something about the topology of the republish chain, or possibly repartitioned a Kafka topic. In that case, start over: throw out the tracked offset for the old topic/partition and start fresh with the new topic/partition/offset, as sketched below.
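
Reusing the types from the sketch above, the discard check becomes something like:

// The "root" of a message is the first entry of its source topic chain, falling
// back to the message's own topic/partition/offset when no republish system was
// involved.
def rootOf[A](envelope: EventEnvelope[A], directlyConsumed: SourceTopicRef): SourceTopicRef =
  envelope.sourceTopicChain.headOption.getOrElse(directlyConsumed)

// The aggregation state now tracks the root topic/partition/offset it last saw.
def shouldAggregate(tracked: Option[SourceTopicRef], root: SourceTopicRef): Boolean =
  tracked match {
    case None => true // nothing aggregated yet
    case Some(t) if t.topic != root.topic || t.partition != root.partition =>
      true // topology changed or topic repartitioned: start over with the new source
    case Some(t) =>
      root.offset > t.offset // same source: discard replays at or before the tracked offset
  }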

Conclusion

Using these approaches together, we were able to implement a streaming analytics system that was resilient to replayed events even though it included non-idempotent event materialization functions. To summarize, the approach was:

  • Track the path of a message through republish systems by appending the topic/partition/offset of every input message to a republish system in the output message of that republish system.
  • In every non-idempotent aggregation state of a stream, also keep track of the latest offset processed, as well as its topic name and partition number.
  • If an incoming message has the same topic/partition and an offset at or before the latest one tracked in the aggregated state, discard it. If the topic or partition changes, discard the old offset information and start over.
  • Only commit an offset to Kafka after persisting all the aggregation states corresponding to that offset.
  • For a stream with multiple aggregation stages, persist aggregation states in downstream-first order.
  • Use multi-rate Akka primitives such as conflate and throttle to control how often you persist aggregation states. Tune this value to achieve the proper trade off between performance and data “freshness.”
  • If a streaming stage “flat maps” its inputs, i.e. produces multiple output elements for a single input, avoid Akka primitives that expand the output stream such as mapConcat. Instead, track the output elements as a collection (i.e. the output of the flow should be a collection of things, not a single thing). Process that collection as a single element and only persist the aggregated state after processing the entire collection.
  • Create a class you can use as a streaming element that provides access to the Kafka topic/partition/offset and source topic chain information at every stage in the Akka stream.

Until next time!

The Kafka Streams Experience Pt. 1

What And Why?

Kafka Streams is a relatively new technology available as part of the standard Kafka distribution. I’ve mentioned it before in previous blog posts, such as my massive wall of text on event sourcing. It’s attractive to us because it solves a lot of the difficult problems of event sourcing with Kafka, many of which I discussed in that post. We have several use cases for event sourcing, which makes certain features much easier to implement at scale than more traditional approaches.  One such application is discussed in this post.

We have now implemented several of our microservices using Kafka Streams, and have discovered and overcome a host of challenges along the way to becoming (relatively) fluent.  In this series of posts, I’ll discuss these challenges, how we addressed them, and what the outlook is for Kafka Streams in our products going forward.

We also make extensive use of the “interactive queries” feature of Kafka Streams, which has allowed us to address querying data that is both big and fast in ways that other storage technologies have challenges with.  This feature has its own set of specific implementation challenges, some of which I’ll discuss in this post, as well as later posts in this series.

This post assumes basic familiarity with Kafka and Kafka Streams concepts, such as the difference between a KStream and a KTable, as well as the basics of how Kafka topics and partitions work.  If you’re not there yet, take a look at this introductory post on Kafka Streams concepts.

Challenge: Topic/Partition Explosion

The Problem

Kafka Streams creates a lot of topics for you behind the scenes under a host of circumstances. For instance, you get a topic created:

  • For any stage that has state, such as an aggregate or KTable join.
  • For any interactive query store you create with logging support.
  • Any time you join two KStreams that are not already co-partitioned.

For even a medium-complexity streaming application, this can quickly reach tens of topics, each with as many partitions as the stream’s input topics.  This causes several issues, all of which we have experienced in one form or another:

  • More latency.  If you are publishing data back and forth to the brokers many times, you pay the round trip time at least, and possibly the commit interval time (a configuration parameter for the stream), for every stage that does this.
  • More disk usage.  Publishing stream state through many topics means that you’re using disk space for redundant or partially-redundant copies of data on your brokers.  In more than one case we ran our brokers out of disk space with disastrous results.  Details in a later post…
  • Drain on broker resources.  Brokers have to do work for each partition of each topic they host, even if those partitions are not particularly active.  For log-compacted topics (which many of the auto-created topics are), this amount of work can be large.  We observed several cases where our brokers would experience significant slow-down simply due to the load of hosting many partitions.
  • Drain on client resources.  A lot of the work of re-publishing and subscribing to all these intermediate topics is done client-side.  For large volumes of data, this load can be significant.  This manifests as additional latency, and in extreme cases, consumers falling behind.
  • Slow client startup. When clients with state stores start up, they have to replay all the partitions of all the KTables backing their state stores into their local storage.  They do this one partition at a time, and the time this takes scales roughly with the total number of partitions in all state stores in the application.

The Solution

We took several measures to control the topic/partition explosion problem.

Repartition Source Topics Down

In a few cases, we had source topics with large numbers of partitions because those topics are very active.  Downstream processing might reduce or aggregate the data so that it is much smaller or less active, requiring far fewer partitions.  Don’t let the needs of the input data dictate the partitioning of your entire stream, especially since complicated streams may create many topics.

For example, we have an application where a stream takes high-frequency robot operational data as input.  This is the biggest, fastest data in our entire application and flows through a topic with many partitions.  However, the output is quite small: we simply aggregate the input data to a single value per robot, namely the frequency of the data in Hz.  To accomplish this, we first transformed the input data into something much smaller (namely a timestamp and a count of data points) in a simple map step.  Then we repartitioned that down to a tenth as many partitions and used the result as the input to the aggregation stage.  This in turn ensured that the topic created behind the scenes to capture the state of the aggregation had a small number of partitions.  In pseudocode, this looks like:

stream("TopicWithLotsOfPartitions")
  .mapValues(transformToTimeStampAndPoints)
  .through("TopicWithFewPartitions")
  .groupByKey()
  .aggregate(calculateRunningFrequency, "frequencyStoreName")

Now we have a queryable store with the update frequency for each robot without having to host a topic with the same (large) number of partitions as the raw robot high-frequency data.
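
For reference, querying that store via interactive queries looks roughly like the sketch below.  It assumes a started KafkaStreams instance and a plain Double value type (ours is a richer message), and it ignores the case where the key lives on another replica of the service:

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.QueryableStoreTypes

// Read one robot's aggregated frequency from the local state store.
def frequencyFor(streams: KafkaStreams, serialNumber: String): Option[Double] = {
  val store =
    streams.store("frequencyStoreName", QueryableStoreTypes.keyValueStore[String, java.lang.Double]())
  Option(store.get(serialNumber)).map(_.doubleValue)
}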

Reduce Number Of Stateful Streaming Stages

This seems pretty obvious in retrospect, but it’s easy to do and pays big dividends.  In short, be mindful of all the streaming stages that cause topics to be created behind the scenes, and minimize those.

As an example, we had an application where we were joining a KStream of robot events with a KTable of meta-information about the robot, such as a user-assigned name.  We then aggregated the events into a notion of current state for the robot (the result of all the events happening to it over time).  Initially, the stream looked like this:

stream("RobotEventsTopic")
  .groupByKey()
  .aggregate(calculateRobotEventState)
  .leftJoin("RobotMetadataTable", joinRobotData, "queryableStoreName")

This looks reasonable enough, but it creates two topics behind the scenes: one for the state of the aggregate stage and one for the queryable store we specify in the left-join stage.  Each will have as many partitions as “RobotEventsTopic”.  The data between these two topics is almost completely redundant. With a simple rewrite however, we were able to eliminate one of these topics with no compromise in functionality:

stream("RobotEventsTopic")
  .leftJoin("RobotMetadataTable", joinRobotData)
  .groupByKey()
  .aggregate(calculateRobotEventState, "queryableStoreName")

The trick here was that we made the aggregate stage the same stage as the queryable store, so we only need one topic, which now does double duty: storing the state of the aggregation as well as serving queries.

With the above example and some others, we derived some rules of thumb that allow us to minimize auto-created topics when defining the stream itself:

  • Prefer to join KStreams to KTables over KTables to KTables.  Stream-to-table joins do not require backing topics.
  • Minimize the number of aggregate steps (this also applies to reduce, count, or transform), preferably to one per stream.  If you are aggregating many KStream events into a single state, first merge all the input KStreams into one, then do a single aggregate on the merged KStream (a sketch follows this list).  We often package disparate events into a single protobuf message with a oneof field for this purpose.
  • Design your topics ahead of time to minimize the need for repartitioning.  We are still in the process of perfecting this, and it is difficult because different topics have different partitioning needs.  Our current thinking is to define “tee shirt size” levels of topic partitioning (very busy, kinda busy, not busy, etc.).  Then repartition early, only when you are crossing from one tee shirt size to another, to the lowest tee shirt size possible.
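
As a rough sketch of the merge-then-aggregate rule against the 0.11-era API we were using: the topic names, the RobotEvent/RobotState types, and their serdes below are all illustrative assumptions, not our real code.

import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.kstream.{Aggregator, Initializer, KStream, KStreamBuilder, KTable}

// Illustrative stand-ins: our real event is a protobuf message whose oneof
// payload lets disparate event types share one stream, and the serdes are
// protobuf-backed.
final case class RobotEvent(payload: AnyRef)
final case class RobotState(eventCount: Long) {
  def update(event: RobotEvent): RobotState = copy(eventCount = eventCount + 1)
}
val robotEventSerde: Serde[RobotEvent] = ???
val robotStateSerde: Serde[RobotState] = ???

val builder = new KStreamBuilder()
val screwEvents: KStream[String, RobotEvent] =
  builder.stream(Serdes.String(), robotEventSerde, "ScrewEventsTopic")
val safetyEvents: KStream[String, RobotEvent] =
  builder.stream(Serdes.String(), robotEventSerde, "SafetyEventsTopic")

// Merge first, then aggregate once, so only a single backing changelog topic
// is created for the whole stream.
val robotStates: KTable[String, RobotState] = builder
  .merge(screwEvents, safetyEvents)
  .groupByKey(Serdes.String(), robotEventSerde)
  .aggregate(
    new Initializer[RobotState] { override def apply(): RobotState = RobotState(0L) },
    new Aggregator[String, RobotEvent, RobotState] {
      override def apply(key: String, event: RobotEvent, state: RobotState): RobotState = state.update(event)
    },
    robotStateSerde,
    "robotStateStore"
  )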

Use Change Log Topics Directly

One of the most common auto-created topics for Kafka Streams is the “change log” topic, which backs any stateful streaming stage or queryable store.  These topics always end with the string “-changelog.”  Initially, if we wanted a microservice to publish a KTable for other services to consume, we’d add a “to” step at the end of the stream, as in this example:

stream("InputStreamTopic")
  .groupByKey()
  .aggregate(someAggregationFunction)
  .to("OutputTableTopic")

This ends up creating two topics containing exact copies of the same information.  “OutputTableTopic” is one; the other is created behind the scenes for the aggregate stage.  Both have the exact same keys and values (i.e. the output of the aggregation stage).  We did this because we have our own naming conventions for topics, and using our custom topic in the “to” stage allowed us to adhere to them.  However, we have decided that adhering to naming conventions is not worth the cost of a redundant topic (disk space, plus CPU on the broker and client, plus some latency).  We now do this:

stream("InputStreamTopic")
 .groupByKey()
 .aggregate(someAggregationFunction, "queryableStoreName")

This causes the backing topic for the aggregate stage to have a deterministic name, namely “application-name-queryableStoreName-changelog” (where “application-name” is the name you give to the entire Kafka Streams application).  We then allow other microservices to subscribe directly to this topic and use it as an input KTable.

Note that this approach is not without challenges.  For instance, a common practice in Kafka Streams is to attach a version number to the name of a streaming app, which you increment any time you change the app in a way that is not compatible with the old version.  Doing this also changes the names of any change log topics, since they include the name of the streaming app.  This would in turn break any services that consume the old topic.  We don’t have a good solution for this yet.

Use Kafka 1.0 (Only Don’t Yet)

The Kafka 1.0 client libraries include many improvements that reduce the number of backing topics created for stateful stages.  For instance, in some cases, KTable-to-KTable joins can be performed without a backing topic at all.  There is one problem though: Kafka 1.0 has a critical bug in precisely this optimization, which renders it unusable for most streaming applications.  We are currently still using the 0.11.0.2 client libraries, with plans to switch as soon as we’re satisfied the issue is fixed.

Coming Soon…

Topic/partition explosion is but one of the many weird and wonderful issues we’ve encountered since implementing Kafka Streams into many of our microservices.  In upcoming entries, I’ll discuss other challenges, such as:

  • Revving streams.  Specific challenges apply if you ever change the topology of a streaming application, including changing the version of the Kafka client libraries, or even certain configuration parameters. Some of these are covered by published best practices, some we have had to figure out ourselves.
  • Interactive Query Startup.  For large stores, the interactive query feature of Kafka Streams can take up to several minutes to start up, during which time queries do not work. We had to use Kubernetes readiness probes in combination with some specific calls into Kafka Streams to cause this not to manifest as downtime to the end user.
  • Interactive Query Downtime.  If a replica of a microservice goes offline temporarily, any queryable Kafka Streams stores associated with that service also go offline, possibly for several minutes. This ended up being unacceptable for us since it causes several minutes of partial downtime every time we restart a replica of a service for any reason. An open issue with Kafka covers this but has no published ETA. We have also implemented a fairly involved system to allow interactive queries to remain online during restarts or crashes of service replicas.

See you next time!