14 Feb 2017 · Software Engineering

    Testing Topologies in Kafka Streams

    Post originally published on https://www.madewithtea.com. Republished with author’s permission.

    TLDR: Kafka Streams is a deployment-agnostic stream processing library written in Java. Even though Kafka itself has great test coverage, there is no helper code for writing unit-tests for your own Kafka Streams topologies. I wrote a little helper library in Scala, Mocked Streams, which allows you to create lightweight, parallelizable unit-tests for your topologies without running a full Kafka cluster or even an embedded one. See this code example.

    Kafka Streams

    The latest version 0.10 of Kafka introduces Kafka Streams. It is a deployment-agnostic stream processing library with event-at-a-time (not micro-batch) semantics written in Java. It scales via partitioning and tasks, is fault-tolerant and has an at-least-once guarantee when it comes to processing records. If you have not already read it, I highly recommend reading the introduction article to Kafka Streams.

    Unit-Testing Topologies

    If you have already had your first hands-on experience with Kafka Streams, you might have noticed that there is no easy way to unit-test your topologies. Even though Kafka is documented quite well, it lacks, for now, good documentation on how to test. In general, you can choose between three approaches: running a full Kafka cluster including Zookeeper in your environment; running an embedded Kafka broker and Zookeeper, as in this Kafka Streams integration test; or using mock producers and consumers for lightweight unit-tests of your topologies. While the first two approaches are the way to go for integration tests, we should also be able to unit-test our topologies in a simple way:

    val input = Seq(("x", "v1"), ("y", "v2"))
    val exp = Seq(("x", "V1"), ("y", "V2"))
    val strings = Serdes.String()

    MockedStreams()
      .topology { builder => builder.stream(...) [...] }
      .input("topic-in", strings, strings, input)
      .output("topic-out", strings, strings, exp.size) shouldEqual exp

    Mocked Streams

    Mocked Streams is a library which allows you to do the latter without much boilerplate code and in your favourite Scala testing framework, e.g. ScalaTest or Specs2. It wraps the org.apache.kafka.test.ProcessorTopologyTestDriver class, but adds syntactic sugar to keep your test code simple. The example above tests the following topology:

    builder.stream(strings, strings, "topic-in")
      .map((k, v) => new KeyValue(k, v.toUpperCase))
      .to(strings, strings, "topic-out")
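
    To see why the expected output in the test is what it is, the map step can be mirrored on plain Scala collections. This is only a sketch: it uses no Kafka Streams or Mocked Streams API, and the names UppercaseSketch and upperCaseValues are made up for illustration.

```scala
object UppercaseSketch {
  // Same sample data as in the unit-test example above.
  val input: Seq[(String, String)] = Seq(("x", "v1"), ("y", "v2"))
  val exp: Seq[(String, String)] = Seq(("x", "V1"), ("y", "V2"))

  // Plain-collections stand-in (hypothetical helper) for the topology's
  // .map step: keep the key, upper-case the value.
  def upperCaseValues(records: Seq[(String, String)]): Seq[(String, String)] =
    records.map { case (k, v) => (k, v.toUpperCase) }

  def main(args: Array[String]): Unit = {
    // The transformed input should match the expected output records.
    assert(upperCaseValues(input) == exp)
    println(upperCaseValues(input))
  }
}
```

    The real topology does the same thing record by record, reading from "topic-in" and writing to "topic-out".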

    In the example, we have only one input and one output topic. However, we are able to define multiple inputs and multiple outputs. Assuming we have a topology MultiInputOutputTopology which consumes two input streams, does an aggregation with a local state, and sends records to two output topics, we can test it like this:

    val mstreams = MockedStreams()
      .topology { builder => builder.stream(...) [...] }
      .input("in-a", strings, ints, inputA)
      .input("in-b", strings, ints, inputB)
    mstreams.output("out-a", strings, ints, expA.size) shouldEqual expA
    mstreams.output("out-b", strings, ints, expB.size) shouldEqual expB

    Note that the order of the .input(…) calls is very important: when calling .output(…), Mocked Streams produces to the Kafka input topics in the same order as specified. In the example, it would produce all messages to topic “in-a” first, then to “in-b”. Each output call will start an isolated run, fetching from the specified output topic. For a better understanding, I would like to refer you to the tests of Mocked Streams itself.
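
    Why the input order matters for a stateful topology can be illustrated with a toy model on plain Scala collections. It is a sketch under loud assumptions: the processing rule (records from "in-a" update a per-key store, records from "in-b" emit their value plus the stored one) is invented for this illustration, and nothing here is Mocked Streams or Kafka Streams API.

```scala
import scala.collection.mutable

object InputOrderSketch {
  // Hypothetical stateful processor, for illustration only.
  // Inputs are consumed topic by topic, in the order they are given.
  // Every call starts with a fresh store, i.e. it is an isolated run.
  def run(inputs: Seq[(String, Seq[(String, Int)])]): Seq[(String, Int)] = {
    val store = mutable.Map.empty[String, Int]
    val out = Seq.newBuilder[(String, Int)]
    inputs.foreach { case (topic, records) =>
      records.foreach { case (key, value) =>
        topic match {
          // "in-a" records only update the local state.
          case "in-a" => store(key) = value
          // "in-b" records emit their value plus whatever is stored so far.
          case "in-b" => out += (key -> (value + store.getOrElse(key, 0)))
          case _      => ()
        }
      }
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val inputA = Seq("x" -> 1)
    val inputB = Seq("x" -> 10)
    // "in-a" first: the state is already there when "in-b" is processed.
    assert(run(Seq("in-a" -> inputA, "in-b" -> inputB)) == Seq("x" -> 11))
    // "in-b" first: the store is still empty, so the result differs.
    assert(run(Seq("in-b" -> inputB, "in-a" -> inputA)) == Seq("x" -> 10))
  }
}
```

    The same records produce different output depending on which topic is fed first, which is why the .input(…) calls should be written in the order you want the topics to be produced to.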

    Usage in the Next Release of Kafka

    Personally, I work with the next, still unstable version 0.10.1 of Kafka. I experienced some issues back-porting Mocked Streams to the stable release. Therefore, I decided to support and distribute JARs only for the next stable release. However, if you are interested in scratching your own itch, contributions and collaboration would be great! Unfortunately, for now, if you want to use it, you need to add Mocked Streams manually. But I will add it to the Maven repositories when the next Kafka version is released!

    UPDATE: I released the first version of Mocked Streams. See Released Mocked Streams for Apache Kafka.

    Written by:
    Jendrik is a Backend and Data Engineer specialized in Scala with a strong focus on data processing. He occasionally blogs about his experiences. Follow him on @madewithtea.