Streaming With Probabilistic Data Structures: Why & How

Eliav Lavi
Data
December 22, 2020

In recent years, stream processing libraries have evolved significantly. Akka Streams, KafkaStreams, Flink, Spark Streaming and others have all become increasingly popular. There might be numerous reasons for that. A common motivation for using stream processing in your systems is to avoid heavy computations on raw data at read time. Instead, we can move those computations to an earlier stage, around the time the raw data is produced. This architectural pattern allows us to obtain better response times in time-critical transactions, and it has surged in popularity along with the general growth of the data that organizations handle.

In this story, I will examine a rather complicated scenario that cannot be easily solved with the intuitive capabilities that streaming libraries usually offer. I will demonstrate how probabilistic data structures can help us mitigate a common anti-pattern often encountered in stream processing applications: carrying non-aggregated raw data deep into a streaming topology for calculations such as a distinct count of elements.

Before that, I will briefly review how streaming, in general, helps in maintaining aggregations of data, and why it might be a good idea to adopt it in some use-cases.

I will use KafkaStreams for demonstrations along the way, but the concepts explored here can be applied in virtually any streaming library.

Examples are written in Scala.

Aggregating Upon A Stream

Oftentimes, we want to aggregate raw data into some meaningful representation that will serve a business need later on. The simplest example for this, perhaps, is the WordCount program, which is kind of the HelloWorld of many streaming libraries. Here is an implementation of it using KafkaStreams. Basically, what it does is:

  • consume some source Kafka topic as a stream
  • split each value into single words
  • group that stream by each word
  • count the occurrences per word
  • produce the results to another Kafka topic
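
To make the shape of those steps concrete without any Kafka dependency, here is a minimal sketch of the middle three steps over an in-memory collection. It is only an analogue: the `WordCountSketch` object and its sample `lines` are made up for illustration, and a real KafkaStreams topology would consume from and produce to topics and update the counts continuously instead of computing them once.

```scala
object WordCountSketch {
  // Stand-in for the values consumed from the source topic (hypothetical data).
  val lines: List[String] = List("hello streams", "hello kafka streams")

  def wordCount(values: Seq[String]): Map[String, Long] =
    values
      .flatMap(_.toLowerCase.split("\\W+")) // split each value into single words
      .filter(_.nonEmpty)                   // drop empty tokens produced by leading separators
      .groupBy(identity)                    // group by each word
      .map { case (word, occurrences) => word -> occurrences.size.toLong } // count per word
}
```

Calling `WordCountSketch.wordCount(WordCountSketch.lines)` yields a count per word, which is the batch equivalent of what the streaming version keeps up to date per key.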

The basic idea behind using aggregations in your systems is planning ahead. If you figure out what you want to know about the raw data at a later stage, you can aggregate it and shape it into a form that represents the answers to those questions — right when you first know about the raw data. That means, it happens before those questions are being asked. In fact, some might never be asked, because practically, we are preparing answers for all possible questions we might need answers for!

Stream processing aggregation in a nutshell

This approach stands in complete opposition to the more conventional one: querying a database upon request and then crunching the results in order to produce the desired answer. That might work well in small apps maintained by small or medium-sized teams, but it becomes less practical as data grows and boundaries between domains and teams naturally emerge. In that scenario, maintaining aggregations is often an adequate solution to various business requirements.

Without stream processing, applications need to query and compute upon state in real time

The Problem At Hand: Distinct Hashtag Count

Alas, not all aggregations are achieved with the same degree of ease. It is no wonder that WordCount is so common as a beginner’s example — it is very easy to implement and understand. But let’s explore a different scenario. Take a social media ecosystem where we need to keep track of how many unique hashtags each user has mentioned in their posts.

At first glance, the streaming solution for this request seems like a direct continuation of what we’ve seen in WordCount. We could consume posts data, group it by user, and then extract & aggregate the hashtags used, perhaps in some Set, which would allow us to easily obtain our desired metric: the distinct count.

This is how our KafkaStreams topology might look:



import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, StreamsBuilder}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Materialized

object TopologyBuilder {
  type User = String
  type Post = String
  type Hashtag = String

  def build(implicit
      materialized: Materialized[User, Set[Hashtag], ByteArrayKeyValueStore]
  ): Topology = {
    val builder = new StreamsBuilder()
    builder
      .stream[User, Post]("posts")
      .groupByKey
      .aggregate(Set.empty[Hashtag])((_, post, hashtags) => hashtags ++ extractHashtags(post))
      .toStream
      .map((user, hashtags) => (user, hashtags.size))
      .to("user-hashtag-counts")

    builder.build()
  }

  private val HashtagPattern = """(#\w+)""".r
  private def extractHashtags(post: Post): Set[Hashtag] = {
    HashtagPattern.findAllIn(post).toSet
  }
}

A few things happen here. First, I’ve used type aliases in order to avoid the semantically-meaningless String flooding the code. Furthermore, I’ve extracted the logic of obtaining a Set[Hashtag] from a Post to a private function. Other than that, this is exactly what was just described.
One of the things I like about KafkaStreams is how intuitive the API is — I think it is pretty easy to grasp and understand this piece of code, even if you haven’t worked with KafkaStreams before.

One thing to remember is that this topology will continuously produce messages upon each change to the aggregation, so its output can be seen as a stream of updates. If you only need the latest state, you can define the output topic as log-compacted.
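
The "stream of updates" idea can be sketched on plain collections. The snippet below is an illustration only: `UpdateStreamSketch` and its sample posts are hypothetical, and it replays a single user's posts in order to show which counts would be emitted. In a real KafkaStreams application, record caching may coalesce some of these intermediate updates before they reach the output topic.

```scala
object UpdateStreamSketch {
  private val HashtagPattern = """(#\w+)""".r

  // One user's posts, in arrival order (hypothetical data).
  val posts: List[String] =
    List("loving #scala and #kafka", "more #scala", "trying #flink")

  // Aggregation state after each post: the running Set of hashtags seen so far.
  val states: List[Set[String]] =
    posts
      .scanLeft(Set.empty[String])((tags, post) => tags ++ HashtagPattern.findAllIn(post))
      .tail // drop the initial empty state, which is never emitted

  // What the output topic would see: one count per processed post.
  val emittedCounts: List[Int] = states.map(_.size)

  // What a log-compacted topic eventually retains for this user's key: the latest value.
  val latest: Int = emittedCounts.last
}
```

Note that the second post changes nothing in the Set, yet it still results in an update carrying the same count as before.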

There’s just one problem with this implementation: we have an unbounded data-structure in our topology, which means our streaming application can become more memory heavy than we might have predicted.

Remember we said that aggregations are about transforming raw data in a way that fits our read-time needs? Well, our current implementation seems to have violated that concept. We don’t need that Set[Hashtag] really, we just want to know its size. But how can we maintain that number in a streaming application without keeping the underlying Set available? Can we do better?

Probabilistic Data Structures To The Rescue

Well, of course we can! This is where probabilistic data structures come in. If you haven’t heard of them, don’t worry, we’re going to explore an example together. We will focus on HyperLogLog (aka HLL), a probabilistic data structure that is aimed at solving the very problem we’re facing:

… the count-distinct problem… [which] is the problem of finding the number of distinct elements in a data stream with repeated elements (Wikipedia)

While the initial, Set-based solution will always be 100% accurate, HyperLogLog offers a tradeoff: the allocated memory is of a fixed size, but the result might not be absolutely accurate at all times. By and large, the error rate is inversely related to the allocated memory: the more memory, the smaller the expected error. Moreover, in most cases the error will be relatively small; that is, the estimated count might be off by just a bit. This is why HyperLogLog is considered a probabilistic data structure. In many use cases, this is a reasonable deal. If you’re working on a scenario in which you cannot have any error at all, then this kind of data structure is probably not suitable for your needs.

A Scala Implementation

Algebird is a neat Scala library created by the folks at Twitter, which is aimed at providing “abstractions for abstract algebra”. A significant part of that library revolves around approximate data types, and includes a HyperLogLog implementation. We’ll try to adapt our KafkaStreams app to use it, but first, let’s examine how to work with Algebird’s HyperLogLog implementation.

The HLL type is the data structure itself. It responds to the #approximateSize method, allowing us to obtain the desired number — set-size, which is also known as cardinality. Similarly to working with the naïve Set, here we will also need to add elements to our data structure. Unlike Set, though, adding elements to an HLL is slightly more complex. The thing is, elements added aren’t kept within the HLL, as they are in a conventional Set. That’s the magic of HyperLogLog! If you’re curious about how it actually works, there are tons of videos or articles about it online.
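
For a rough feel of why the elements themselves never need to be stored, here is a toy HyperLogLog in plain Scala. This is a simplified sketch for illustration, not Algebird's implementation: the `ToyHLL` class, its use of a 32-bit MurmurHash3 hash, and the restriction to small cardinalities are all assumptions made to keep it short. The only state is a fixed array of small registers; each element is hashed, a few bits pick a register, and the register remembers the longest run of leading zeros seen in the remaining bits.

```scala
import scala.util.hashing.MurmurHash3

// Toy HyperLogLog for illustration only; suitable for small cardinalities.
final class ToyHLL(bits: Int) {
  require(bits >= 7 && bits <= 16, "this sketch only supports 7 to 16 bits")

  private val m: Int = 1 << bits                     // number of registers
  val registers: Array[Byte] = new Array[Byte](m)    // the entire state: m bytes

  def add(element: String): Unit = {
    val hash = MurmurHash3.stringHash(element)
    val index = hash >>> (32 - bits)                 // first `bits` bits pick a register
    val rest = hash << bits                          // remaining bits, left-aligned
    // Rank: 1-based position of the first 1-bit in the remaining bits.
    val rank = math.min(Integer.numberOfLeadingZeros(rest) + 1, 32 - bits + 1)
    if (rank > registers(index)) registers(index) = rank.toByte
  }

  def estimate: Double = {
    val alpha = 0.7213 / (1 + 1.079 / m)             // bias constant, valid for m >= 128
    val raw = alpha * m * m / registers.map(r => math.pow(2.0, -r.toDouble)).sum
    val zeros = registers.count(_ == 0)
    // Small-range correction (linear counting) while many registers are still empty.
    if (raw <= 2.5 * m && zeros > 0) m * math.log(m.toDouble / zeros) else raw
  }
}
```

Adding the same element twice hashes to the same register and rank, so duplicates never change the state. That is exactly the property a distinct count needs.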

Previously, we relied on Set’s direct API for adding entries to the set. Algebird’s support for HyperLogLog relies on a common abstraction to achieve the same goal of combining things. That abstraction is called Monoid. Generally speaking, a Monoid for some type A lets us get an empty A and combine any two A’s. And so, in order to add an element to an HLL, we need to obtain a HyperLogLogMonoid. This is achieved easily:


val hllMonoid: HyperLogLogMonoid = new HyperLogLogMonoid(bits = 8)

Note that you decide how many bits to allocate: more bits mean more registers and a lower error rate, at the cost of more memory.
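
As a back-of-the-envelope aid for picking that number, the standard HyperLogLog analysis gives a relative standard error of roughly 1.04 / sqrt(m) for m = 2^bits registers. The helper below is a hypothetical sketch built on that formula. It assumes that the `bits` parameter corresponds to the register-index width of the standard algorithm, and it does not call into Algebird.

```scala
object HllErrorSketch {
  // Number of registers for a given register-index width.
  def registers(bits: Int): Int = 1 << bits

  // Approximate relative standard error of the estimate: 1.04 / sqrt(m).
  def standardError(bits: Int): Double = 1.04 / math.sqrt(registers(bits).toDouble)
}
```

With `bits = 8` this gives 256 registers and a standard error of about 6.5%, and every two extra bits halve the error while quadrupling the number of registers.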

We can then get our empty, zero-state HLL:


val init: HLL = hllMonoid.zero


And simply add elements to it:


val newElementData: Array[Byte] = "foobar".toCharArray.map(_.toByte)
val newElement: HLL = hllMonoid.create(newElementData)
val updatedHLL: HLL = init + newElement

As you can see, we can use the HyperLogLogMonoid#create method in order to create a new HLL by passing an Array[Byte] to it. After that, we can add our new HLL to the existing one and get a new one with an updated state.
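
The zero / create / + pattern above is the Monoid contract at work. To see the abstraction on its own, here is a dependency-free sketch; the `Monoid` trait below is a hypothetical minimal version written for this illustration and is not Algebird's actual trait. It shows that the familiar Set union and integer addition fit the same "empty value plus a way to combine two values" shape.

```scala
object MonoidSketch {
  // Hypothetical minimal Monoid: an empty value and an associative combine.
  trait Monoid[A] {
    def zero: A
    def plus(left: A, right: A): A
    // Combining many values is just a fold starting from zero.
    def sum(items: Iterable[A]): A = items.foldLeft(zero)(plus)
  }

  val intAddition: Monoid[Int] = new Monoid[Int] {
    val zero: Int = 0
    def plus(left: Int, right: Int): Int = left + right
  }

  def setUnion[T]: Monoid[Set[T]] = new Monoid[Set[T]] {
    val zero: Set[T] = Set.empty[T]
    def plus(left: Set[T], right: Set[T]): Set[T] = left union right
  }
}
```

Seen this way, swapping the Set for an HLL means swapping one Monoid for another, while the surrounding aggregation logic keeps its shape.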

With this knowledge, we can prepare an aggregation function that will replace the previous one we’ve had. We will group all this goodness together under a helper object, Aggregation:


import com.eliavlavi.probablistic.streaming.optimized.TopologyBuilder.{Hashtag, Post, User}
import com.twitter.algebird.{HLL, HyperLogLogMonoid}

object Aggregation {
  val hllMonoid = new HyperLogLogMonoid(bits = 8)

  val init: HLL = hllMonoid.zero
  val aggregate: (User, Post, HLL) => HLL = (_, post, currentHLL) =>
    currentHLL + hllMonoid.sum(
      extractHashtags(post).map(hashtag => hllMonoid.create(hashtag.toCharArray.map(_.toByte)))
    )

  private val HashtagPattern = """(#\w+)""".r
  private def extractHashtags(post: Post): Set[Hashtag] = {
    HashtagPattern.findAllIn(post).toSet
  }
}

As you can see, we are using HyperLogLogMonoid#sum here, in addition to #create. It allows us to combine several HLLs into one, which suits our needs perfectly: we’ll extract the Hashtags from each Post, create an HLL for each, sum them into a single HLL, and add that to the existing, aggregative HLL. Exactly what we wanted to achieve!

Putting It All Together

With our aggregation function and initialization value ready, we can now go back to our KafkaStreams topology and use them there:


package com.eliavlavi.probablistic.streaming.optimized

import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Materialized
import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, StreamsBuilder}
import com.twitter.algebird.HLL

object TopologyBuilder {
  type User = String
  type Post = String
  type Hashtag = String

  def build(implicit
      materialized: Materialized[String, HLL, ByteArrayKeyValueStore]
  ): Topology = {
    val builder = new StreamsBuilder()
    builder
      .stream[User, Post]("posts")
      .groupByKey
      .aggregate(Aggregation.init)(Aggregation.aggregate)
      .toStream
      .map((user, hll) => (user, hll.approximateSize.estimate.toInt))
      .to("user-hashtag-counts")

    builder.build()
  }
}

I needed to adapt just two lines of the pipeline from the former implementation: the parameters passed to aggregate, and the way the (estimated) cardinality is obtained in the map function. The state type in the Materialized parameter changes accordingly, from Set[Hashtag] to HLL.

There’s just one thing left: we need to find a way to obtain a Serde[HLL]. If you are unfamiliar with KafkaStreams, this is Serde’s definition according to the official documentation:

Every Kafka Streams application must provide SerDes (Serializer/Deserializer) for the data types of record keys and record values (e.g. java.lang.String) to materialize the data when necessary.

Essentially, KafkaStreams might need a certain Serde for various operations. Our code would not compile without it.

Since aggregation is by nature a stateful operation (simply because we operate on information that is not limited to the message currently being processed), KafkaStreams needs to know how that information can be serialized and deserialized. Luckily, it is pretty easy to get a Serde[HLL], like this:


package com.eliavlavi.probablistic.streaming.optimized

import com.twitter.algebird.{HLL, HyperLogLog}
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}

object HLLSerde {
  implicit val hllSerde: Serde[HLL] = new Serde[HLL] {
    override def serializer(): Serializer[HLL] = (_: String, data: HLL) => HyperLogLog.toBytes(data)
    override def deserializer(): Deserializer[HLL] =
      (topic: String, data: Array[Byte]) => HyperLogLog.fromBytes(data)
  }
}

And with that we’re pretty much done! We’ve managed to incorporate HyperLogLog into our KafkaStreams topology, and honestly, we could have done that in any other Scala streaming library with the same effort, roughly. The main takeaway is how easy this change was and how elegant and concise the end result is.

The full code, which includes tests and a runnable app, is available here.

Beyond HyperLogLog

Perhaps by now you’re convinced that probabilistic data structures are really fascinating, and there is more to them than HyperLogLog! If you’re interested, don’t hesitate to check out other data structures implemented in Algebird:

  • Bloom Filters can help us know whether an element exists in a set without keeping the full set.
  • Count Min Sketch can help us get the frequency of an element in a collection without maintaining that collection.
  • QTree can help us keep track of how data is distributed within a collection by querying specific quantiles. For instance, this can help us calculate the median value in a stream.

I hope you found the concepts we’ve explored relevant and useful, and that you might even consider them for some of your upcoming projects! Feel free to hit me up with any questions in the comments below or on Twitter.

If you found this story interesting or helpful, please support it by clapping it 👏. Thank you for reading!

Thanks to Raviv Gurtensten and Nadav Wiener.

This article was originally posted on Medium

Eliav Lavi

Tech Lead @ Riskified. Appreciates elegant solutions to real problems.

Using Scala, Akka, Kafka & Kafka Streams, Ruby, K8S, and a lot more to achieve that.

Architecture enthusiast, refactoring addict, clean-code junkie.
