Kafka Streams Java application to aggregate messages using a session window

In this post, I’ll share a Kafka Streams Java app that listens on an input topic, aggregates messages within a session window, and writes the grouped counts to an output topic. This working example could be helpful for finding the most frequent log entries over a certain time period.

I used Gradle as the build tool and for dependency management, and created a new project via: gradle init.

I added the dependencies to the build file: kafka-log-aggregator/build.gradle

buildscript {
  dependencies {
    classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
  }
}

plugins {
  id 'java'
  id 'scala'
  id 'com.github.johnrengelman.shadow' version '2.0.4'
}

repositories {
  mavenLocal()
  jcenter()
  mavenCentral()
}

dependencies {
  compile group: 'org.apache.kafka', name: 'kafka-clients', version: '1.1.0'
  compile group: 'org.apache.kafka', name: 'kafka-streams', version: '1.1.0'

  compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25'
  compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.25'

  compile 'com.google.code.gson:gson:2.8.0'

  testCompile 'org.scala-lang:scala-library:2.11.8'
  testCompile 'org.scalatest:scalatest_2.11:3.0.0'
  testCompile 'org.apache.kafka:kafka-streams-test-utils:1.1.0'
  testCompile group: 'junit', name: 'junit', version: '4.12'
}

I created a class to represent a log entry consisting of a code, a message, and the aggregate count. It uses Gson to serialize to and deserialize from JSON. new file: kafka-log-aggregator/src/main/java/LogEntry.java

import com.google.gson.Gson;

public class LogEntry {
  public int code;
  public String message;
  public Long count;

  public LogEntry(int code, String message) {
    this.code = code;
    this.message = message;
  }

  public static LogEntry fromJson(String jsonString, Long count) {
    LogEntry logEntry = new Gson().fromJson(jsonString, LogEntry.class);
    logEntry.count = count;
    return logEntry;
  }

  public String asJsonString() {
    return new Gson().toJson(this);
  }
}
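
A quick round-trip sketch (not part of the project; hypothetical values chosen only to show the JSON shape Gson produces):

public class LogEntryExample {
  public static void main(String[] args) {
    LogEntry entry = new LogEntry(200, "OK");
    String json = entry.asJsonString();             // {"code":200,"message":"OK"} -- the null count is omitted
    LogEntry counted = LogEntry.fromJson(json, 3L); // the count is supplied later by the aggregator
    System.out.println(counted.asJsonString());     // {"code":200,"message":"OK","count":3}
  }
}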

Next I created the LogAggregator class, which the Kafka Streams app uses to hold and aggregate the log entries. file: kafka-log-aggregator/src/main/java/LogAggregator.java

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.function.Function;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LogAggregator {
  ArrayList<LogEntry> logs = new ArrayList<>();
  Gson gson = new Gson();

  public LogAggregator() {
  }

  public LogAggregator(LogAggregator logAgg1, LogAggregator logAgg2) {
    logs.addAll(logAgg1.logs);
    logs.addAll(logAgg2.logs);
  }

  public LogAggregator(String jsonString) {
    ArrayList<LogEntry> logEntries = gson.fromJson(jsonString, new TypeToken<List<LogEntry>>(){}.getType());
    logs.addAll(logEntries);
  }

  public LogAggregator(byte[] bytes) {
    this(new String(bytes, StandardCharsets.UTF_8));
  }

  public LogAggregator add(String log) {
    LogEntry logEntry = gson.fromJson(log, LogEntry.class);
    logs.add(logEntry);
    return this;
  }

  public String asJsonString() {
    return gson.toJson(logs);
  }

  public byte[] asByteArray() {
    return asJsonString().getBytes(StandardCharsets.UTF_8);
  }

  public String groupedLimitedBy(Integer limitSize) {
    Map<String, Long> counted = logs.stream()
      .map(logEntry -> logEntry.asJsonString())
      .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

    ArrayList<LogEntry> listSubset = new ArrayList<>();

    counted.entrySet().stream()
      .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
      .limit(limitSize)
      .forEachOrdered(e -> {
        LogEntry logEntry = LogEntry.fromJson(e.getKey(), e.getValue());
        listSubset.add(logEntry);
      });

    return gson.toJson(listSubset);
  }

}
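
To show the aggregation logic outside of Kafka, here's a small standalone sketch with made-up inputs; groupedLimitedBy returns the most frequent entries with their counts, up to the given limit:

public class LogAggregatorExample {
  public static void main(String[] args) {
    LogAggregator agg = new LogAggregator();
    agg.add("{\"code\":200,\"message\":\"OK\"}");
    agg.add("{\"code\":200,\"message\":\"OK\"}");
    agg.add("{\"code\":500,\"message\":\"Internal Server Error\"}");

    // prints the entries sorted by frequency, most frequent first:
    // [{"code":200,"message":"OK","count":2},{"code":500,"message":"Internal Server Error","count":1}]
    System.out.println(agg.groupedLimitedBy(10));
  }
}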

The LogAggregator class needs a serializer to convert it to a byte array. file: kafka-log-aggregator/src/main/java/LogAggregatorSerializer.java

import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class LogAggregatorSerializer implements Serializer<LogAggregator> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
  }

  @Override
  public void close() {
  }

  @Override
  public byte[] serialize(String topic, LogAggregator logAgg) {
    if (logAgg == null) {
      return null;
    }

    try {
      return logAgg.asByteArray();
    } catch (RuntimeException e) {
      throw new SerializationException("Error serializing value", e);
    }

  }

}

And here is the class to deserialize it back from a byte array. file: kafka-log-aggregator/src/main/java/LogAggregatorDeserializer.java

import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class LogAggregatorDeserializer implements Deserializer<LogAggregator> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
  }

  @Override
  public void close() {
  }

  @Override
  public LogAggregator deserialize(String topic, byte[] bytes) {
    if (bytes == null) {
      return null;
    }

    try {
      return new LogAggregator(bytes);
    } catch (RuntimeException e) {
      throw new SerializationException("Error deserializing value", e);
    }

  }

}
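
The Kafka Streams app later combines this serializer/deserializer pair into a Serde. Here is a quick standalone round-trip sketch (not part of the app) to confirm the two are symmetric:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

public class LogAggregatorSerdeExample {
  public static void main(String[] args) {
    Serde<LogAggregator> logAggSerde =
      Serdes.serdeFrom(new LogAggregatorSerializer(), new LogAggregatorDeserializer());

    LogAggregator original = new LogAggregator().add("{\"code\":200,\"message\":\"OK\"}");
    byte[] bytes = logAggSerde.serializer().serialize("any-topic", original);
    LogAggregator restored = logAggSerde.deserializer().deserialize("any-topic", bytes);

    // both lines print [{"code":200,"message":"OK"}]
    System.out.println(original.asJsonString());
    System.out.println(restored.asJsonString());
  }
}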

Next I created the main class to build and run the log aggregator Kafka Streams app. file: kafka-log-aggregator/src/main/java/LogAggregatorApp.java

import java.util.concurrent.TimeUnit;
import java.util.Properties;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.internals.WindowedDeserializer;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class LogAggregatorApp {
  public static final String APPLICATION_ID = "log-aggregator";
  public static final String INPUT_TOPIC = "log-input-stream";
  public static final String OUTPUT_TOPIC = "log-output-stream";

  public String bootstrapServers;
  public Topology topology;
  public KafkaStreams streams;
  public Properties streamsConfig;

  public static void main(String[] args) throws Exception {
    String bootstrapServers = "localhost:9092";
    LogAggregatorApp logAggregatorApp = new LogAggregatorApp(bootstrapServers);
    logAggregatorApp.build();
    logAggregatorApp.run();
  }

  public LogAggregatorApp(String bootstrapServers) {
    this.bootstrapServers = bootstrapServers;
  }

  protected void build() {
    streamsConfig = buildStreamsConfig(bootstrapServers);
    StreamsBuilder streamsBuilder = configureStreamsBuilder(new StreamsBuilder());

    this.topology = streamsBuilder.build();
    this.streams = new KafkaStreams(topology, streamsConfig);
  }

  protected void run() {
    streams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }

  protected Properties buildStreamsConfig(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    return properties;
  }

  protected StreamsBuilder configureStreamsBuilder(StreamsBuilder streamsBuilder) {

    // 1 minute session, go ahead and change this
    final Long inactivityGap = TimeUnit.MINUTES.toMillis(1);

    Serializer<LogAggregator> logAggSerializer = new LogAggregatorSerializer();
    Deserializer<LogAggregator> logAggDeserializer = new LogAggregatorDeserializer();
    Serde<LogAggregator> logAggSerde = Serdes.serdeFrom(logAggSerializer, logAggDeserializer);

    StringSerializer stringSerializer = new StringSerializer();
    StringDeserializer stringDeserializer = new StringDeserializer();

    WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(stringSerializer);
    WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(stringDeserializer);
    Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);

    KStream<String, String> inputStream = streamsBuilder.stream(INPUT_TOPIC);

    inputStream
      .groupByKey()
      .windowedBy(SessionWindows.with(inactivityGap))
      .aggregate(
        LogAggregator::new,                                              // initializer: start each session with an empty aggregate
        (key, value, logAgg) -> logAgg.add(value),                       // aggregator: add the incoming log entry to the session aggregate
        (key, logAgg1, logAgg2) -> new LogAggregator(logAgg1, logAgg2),  // session merger: combine aggregates when sessions merge
        Materialized.<String, LogAggregator, SessionStore<Bytes, byte[]>>
          as("log-input-stream-aggregated")
            .withKeySerde(Serdes.String())
            .withValueSerde(logAggSerde)
      )
      .mapValues(logAgg -> logAgg.groupedLimitedBy(10))
      .toStream()
      .to(windowedSerde, Serdes.String(), OUTPUT_TOPIC);

    return streamsBuilder;
  }
}

I added a Scala unit test to ensure the log aggregation works as planned. file: kafka-log-aggregator/src/test/scala/LogAggregatorAppTest.scala

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.apache.kafka.streams.TopologyTestDriver
import org.scalatest._
import scala.collection.JavaConversions._

class LogAggregatorAppTest extends FunSpec with Matchers with GivenWhenThen {

  def jsonLogs(): collection.mutable.ListBuffer[String] = {
    val logs = new collection.mutable.ListBuffer[String]()

    val jsonString1 = """{"code":200,"message":"OK"}"""
    val jsonString2 = """{"code":301,"message":"Moved Permanently"}"""
    val jsonString3 = """{"code":302,"message":"Found"}"""
    val jsonString4 = """{"code":304,"message":"Not Modified"}"""
    val jsonString5 = """{"code":400,"message":"Bad Request"}"""
    val jsonString6 = """{"code":401,"message":"Unauthorized"}"""
    val jsonString7 = """{"code":403,"message":"Forbidden"}"""
    val jsonString8 = """{"code":418,"message":"Im a teapot"}"""
    val jsonString9 = """{"code":422,"message":"Unprocessable Entity"}"""
    val jsonString10 = """{"code":500,"message":"Internal Server Error"}"""
    val jsonString11 = """{"code":503,"message":"Service Unavailable"}"""

    1 to 11 foreach { _ => logs += jsonString1 }
    1 to 10 foreach { _ => logs += jsonString2 }
    1 to 9 foreach { _ => logs += jsonString3 }
    1 to 8 foreach { _ => logs += jsonString4 }
    1 to 7 foreach { _ => logs += jsonString5 }
    1 to 6 foreach { _ => logs += jsonString6 }
    1 to 5 foreach { _ => logs += jsonString7 }
    1 to 4 foreach { _ => logs += jsonString8 }
    1 to 3 foreach { _ => logs += jsonString9 }
    1 to 2 foreach { _ => logs += jsonString10 }
    1 to 1 foreach { _ => logs += jsonString11 }

    logs
  }

  def lastRecordFromStream(app:LogAggregatorApp, testDriver:TopologyTestDriver, outputTopic:String): String = {
    var recordValue: String = null
    val stringDeserializer = Serdes.String().deserializer()
    var keepLooking = true
    while(keepLooking) {
      try {
        val record = testDriver.readOutput(outputTopic, stringDeserializer, stringDeserializer)
        recordValue = record.value()
      } catch {
        case e: Exception => {
          keepLooking = false
        }
      }
    }
    recordValue
  }

  describe("LogAggregatorApp") {
    it("aggregates json messages") {

      val bootstrapServers = "localhost:9092"
      val inputTopic = "log-input-stream"
      val outputTopic = "log-output-stream"

      val logAggregatorApp = new LogAggregatorApp(bootstrapServers)
      logAggregatorApp.build()

      val testDriver = new TopologyTestDriver(logAggregatorApp.topology, logAggregatorApp.streamsConfig)

      val stringSerializer = new StringSerializer
      val factory = new ConsumerRecordFactory(stringSerializer, stringSerializer)
      val key = "kafka-key"

      val logs = jsonLogs

      val records = logs.map(jsonString => factory.create(inputTopic, key, jsonString)).toList
      testDriver.pipeInput(records)

      val recordValue = lastRecordFromStream(logAggregatorApp, testDriver, outputTopic)
      testDriver.close()

      val expected =
        """[{"code":200,"message":"OK","count":11},
          |{"code":301,"message":"Moved Permanently","count":10},
          |{"code":302,"message":"Found","count":9},
          |{"code":304,"message":"Not Modified","count":8},
          |{"code":400,"message":"Bad Request","count":7},
          |{"code":401,"message":"Unauthorized","count":6},
          |{"code":403,"message":"Forbidden","count":5},
          |{"code":418,"message":"Im a teapot","count":4},
          |{"code":422,"message":"Unprocessable Entity","count":3},
          |{"code":500,"message":"Internal Server Error","count":2}]""".stripMargin.replaceAll("\n", "")

      recordValue shouldEqual expected
    }
  }
}

I created a simple Kafka producer Ruby script to pipe messages onto the topic, wait a while (in this case a minute, so the next batch lands in a new session window), and pipe some more. file: kafka-log-aggregator/ruby/producer.rb

#!/usr/bin/env ruby

require 'kafka'
require 'json'

kafka = Kafka.new(['localhost:9092'], client_id: "sample-producer")

kafka_key = 'logs'
kafka_topic = 'log-input-stream'

messages = [
  { code: 200, message: 'OK' }.to_json,
  { code: 301, message: 'Moved Permanently' }.to_json,
  { code: 302, message: 'Found' }.to_json,
  { code: 304, message: 'Not Modified' }.to_json,
  { code: 400, message: 'Bad Request' }.to_json,
  { code: 401, message: 'Unauthorized' }.to_json,
  { code: 403, message: 'Forbidden' }.to_json,
  { code: 418, message: "I'm a teapot" }.to_json,
  { code: 422, message: 'Unprocessable Entity' }.to_json,
  { code: 500, message: 'Internal Server Error' }.to_json,
  { code: 503, message: 'Service Unavailable' }.to_json,
]

50.times do
  kafka.deliver_message(messages.sample, topic: kafka_topic, key: kafka_key)
end

sleep(60)

50.times do
  kafka.deliver_message(messages.sample, topic: kafka_topic, key: kafka_key)
end
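
If you prefer to stay in Java, here is a roughly equivalent producer sketch using the kafka-clients dependency already in the build (hypothetical class name, and only a few of the messages for brevity):

import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SampleProducer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    String[] messages = {
      "{\"code\":200,\"message\":\"OK\"}",
      "{\"code\":500,\"message\":\"Internal Server Error\"}",
      "{\"code\":503,\"message\":\"Service Unavailable\"}"
    };

    Random random = new Random();
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      for (int i = 0; i < 50; i++) {
        String value = messages[random.nextInt(messages.length)];
        producer.send(new ProducerRecord<>("log-input-stream", "logs", value));
      }
      producer.flush();
    }
  }
}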

At this point I was ready to start ZooKeeper and Kafka, then build and run the streams app:

# start zookeeper
zookeeper-server-start $KAFKA_CONF/zookeeper.properties

# start kafka
$KAFKA_HOME/bin/kafka-server-start $KAFKA_CONF/server.properties

# create topics
$KAFKA_HOME/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic log-input-stream
$KAFKA_HOME/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic log-output-stream

# start console consumers (to watch the topics)
$KAFKA_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic log-input-stream
$KAFKA_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic log-output-stream

# build jar
./gradlew clean shadowJar

# run app
java -cp "./build/libs/*" LogAggregatorApp

# execute producer
cd ruby && ./producer.rb

# verified output topic log aggregation
[{"code":401,"message":"Unauthorized","count":8},{"code":500,"message":"Internal Server Error","count":8},{"code":301,"message":"Moved Permanently","count":6},{"code":403,"message":"Forbidden","count":6},{"code":503,"message":"Service Unavailable","count":4},{"code":400,"message":"Bad Request","count":4},{"code":200,"message":"OK","count":4},{"code":418,"message":"I\u0027m a teapot","count":3},{"code":422,"message":"Unprocessable Entity","count":3},{"code":304,"message":"Not Modified","count":2}]
[{"code":503,"message":"Service Unavailable","count":7},{"code":302,"message":"Found","count":7},{"code":301,"message":"Moved Permanently","count":6},{"code":304,"message":"Not Modified","count":5},{"code":418,"message":"I\u0027m a teapot","count":5},{"code":400,"message":"Bad Request","count":5},{"code":422,"message":"Unprocessable Entity","count":4},{"code":200,"message":"OK","count":4},{"code":403,"message":"Forbidden","count":3},{"code":401,"message":"Unauthorized","count":2}]

I develop on a Mac using Homebrew or Docker; here is my environment for this post:

java -version
java version "1.8.0_121"
Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)

brew list --versions | egrep -i "(scala|zookeeper|gradle|kafka)"
gradle 4.9
kafka 1.1.0
scala@2.11 2.11.12
zookeeper 3.4.12

# in ~/.profile:
export JAVA_HOME="$(/usr/libexec/java_home)"
export ZOOKEEPER_HOME="/usr/local/Cellar/zookeeper/3.4.12"
export KAFKA_HOME=/usr/local/Cellar/kafka/1.1.0
export KAFKA_CONF=/usr/local/etc/kafka
export PATH="/usr/local/opt/scala@2.11/bin:$PATH"

# zookeeper conf
cat $KAFKA_CONF/zookeeper.properties | egrep -iv "^#"
dataDir=/usr/local/var/lib/zookeeper
clientPort=2181
maxClientCnxns=0

# kafka conf
cat $KAFKA_CONF/server.properties | egrep -iv "^(#|$)"
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/var/lib/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

Source code on GitHub
