Kafka Streams Java application to aggregate messages using a session window
In this post, I’ll share a Kafka Streams Java app that listens on an input topic, aggregates the messages in a session window, counts how often each distinct message occurs, and writes the results to an output topic. This working example could be helpful for finding the most frequent log entries over a certain time period.
I used Gradle as the build tool and for dependency management. I created a new project via gradle init.
I added the dependencies to the build file: kafka-log-aggregator/build.gradle
buildscript {
    dependencies {
        classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
    }
}

plugins {
    id 'java'
    id 'scala'
    id 'com.github.johnrengelman.shadow' version '2.0.4'
}

repositories {
    mavenLocal()
    jcenter()
    mavenCentral()
}

dependencies {
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '1.1.0'
    compile group: 'org.apache.kafka', name: 'kafka-streams', version: '1.1.0'
    compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25'
    compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.25'
    compile 'com.google.code.gson:gson:2.8.0'
    testCompile 'org.scala-lang:scala-library:2.11.8'
    testCompile 'org.scalatest:scalatest_2.11:3.0.0'
    testCompile 'org.apache.kafka:kafka-streams-test-utils:1.1.0'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}
I created a class to represent a log entry consisting of a code, a message, and an aggregate count. It uses Gson to serialize to and deserialize from JSON. file: kafka-log-aggregator/src/main/java/LogEntry.java
import com.google.gson.Gson;

public class LogEntry {
    public int code;
    public String message;
    public Long count;

    public LogEntry(int code, String message) {
        this.code = code;
        this.message = message;
    }

    public static LogEntry fromJson(String jsonString, Long count) {
        LogEntry logEntry = new Gson().fromJson(jsonString, LogEntry.class);
        logEntry.count = count;
        return logEntry;
    }

    public String asJsonString() {
        return new Gson().toJson(this);
    }
}
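As a quick sanity check outside of Kafka, the JSON round trip can be exercised directly. This is just an illustrative sketch, not part of the project:

// Hypothetical snippet: parse a log entry, attach a count, and serialize it back.
LogEntry entry = LogEntry.fromJson("{\"code\":200,\"message\":\"OK\"}", 3L);
System.out.println(entry.asJsonString());
// prints {"code":200,"message":"OK","count":3}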
Next I created the LogAggregator class, which the Kafka Streams app uses to collect and aggregate the log entries. file: kafka-log-aggregator/src/main/java/LogAggregator.java
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.function.Function;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LogAggregator {
    ArrayList<LogEntry> logs = new ArrayList<>();
    Gson gson = new Gson();

    public LogAggregator() {
    }

    // Merges two aggregators; used as the session merger in the streams app.
    public LogAggregator(LogAggregator logAgg1, LogAggregator logAgg2) {
        logs.addAll(logAgg1.logs);
        logs.addAll(logAgg2.logs);
    }

    public LogAggregator(String jsonString) {
        ArrayList<LogEntry> logEntries = gson.fromJson(jsonString, new TypeToken<List<LogEntry>>(){}.getType());
        logs.addAll(logEntries);
    }

    public LogAggregator(byte[] bytes) {
        this(new String(bytes, StandardCharsets.UTF_8));
    }

    public LogAggregator add(String log) {
        LogEntry logEntry = gson.fromJson(log, LogEntry.class);
        logs.add(logEntry);
        return this;
    }

    public String asJsonString() {
        return gson.toJson(logs);
    }

    public byte[] asByteArray() {
        return asJsonString().getBytes(StandardCharsets.UTF_8);
    }

    // Groups identical log entries, counts them, and returns the top `limitSize` entries as a JSON array.
    public String groupedLimitedBy(Integer limitSize) {
        Map<String, Long> counted = logs.stream()
                .map(logEntry -> logEntry.asJsonString())
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
        ArrayList<LogEntry> listSubset = new ArrayList<>();
        counted.entrySet().stream()
                .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
                .limit(limitSize)
                .forEachOrdered(e -> {
                    LogEntry logEntry = LogEntry.fromJson(e.getKey(), e.getValue());
                    listSubset.add(logEntry);
                });
        return gson.toJson(listSubset);
    }
}
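To make the grouping behavior concrete, here is a small hypothetical usage sketch (not part of the app): adding the same entry twice should report a count of 2 for it.

// Hypothetical usage of LogAggregator outside the streams topology.
LogAggregator agg = new LogAggregator()
        .add("{\"code\":200,\"message\":\"OK\"}")
        .add("{\"code\":200,\"message\":\"OK\"}")
        .add("{\"code\":500,\"message\":\"Internal Server Error\"}");
System.out.println(agg.groupedLimitedBy(10));
// prints something like:
// [{"code":200,"message":"OK","count":2},{"code":500,"message":"Internal Server Error","count":1}]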
The LogAggregator needs a serializer class to convert it to a byte array for Kafka. file: kafka-log-aggregator/src/main/java/LogAggregatorSerializer.java
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class LogAggregatorSerializer implements Serializer<LogAggregator> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public void close() {
    }

    @Override
    public byte[] serialize(String topic, LogAggregator logAgg) {
        if (logAgg == null) {
            return null;
        }
        try {
            return logAgg.asByteArray();
        } catch (RuntimeException e) {
            throw new SerializationException("Error serializing value", e);
        }
    }
}
And here is the corresponding deserializer to rebuild a LogAggregator from a byte array. file: kafka-log-aggregator/src/main/java/LogAggregatorDeserializer.java
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class LogAggregatorDeserializer implements Deserializer<LogAggregator> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public void close() {
    }

    @Override
    public LogAggregator deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        try {
            return new LogAggregator(bytes);
        } catch (RuntimeException e) {
            throw new SerializationException("Error deserializing value", e);
        }
    }
}
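To sanity-check the pair in isolation, a minimal round-trip sketch could look like this (the topic name here is arbitrary and only illustrative):

// Hypothetical round trip through the custom serializer and deserializer.
LogAggregator original = new LogAggregator().add("{\"code\":200,\"message\":\"OK\"}");
byte[] bytes = new LogAggregatorSerializer().serialize("any-topic", original);
LogAggregator restored = new LogAggregatorDeserializer().deserialize("any-topic", bytes);
System.out.println(restored.asJsonString());
// prints [{"code":200,"message":"OK"}]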
Next I created the main class that builds and runs the log aggregator Kafka Streams app. file: kafka-log-aggregator/src/main/java/LogAggregatorApp.java
import java.util.concurrent.TimeUnit;
import java.util.Properties;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.internals.WindowedDeserializer;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class LogAggregatorApp {
    public static final String APPLICATION_ID = "log-aggregator";
    public static final String INPUT_TOPIC = "log-input-stream";
    public static final String OUTPUT_TOPIC = "log-output-stream";

    public String bootstrapServers;
    public Topology topology;
    public KafkaStreams streams;
    public Properties streamsConfig;

    public static void main(String[] args) throws Exception {
        String bootstrapServers = "localhost:9092";
        LogAggregatorApp logAggregatorApp = new LogAggregatorApp(bootstrapServers);
        logAggregatorApp.build();
        logAggregatorApp.run();
    }

    public LogAggregatorApp(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    protected void build() {
        streamsConfig = buildStreamsConfig(bootstrapServers);
        StreamsBuilder streamsBuilder = configureStreamsBuilder(new StreamsBuilder());
        this.topology = streamsBuilder.build();
        this.streams = new KafkaStreams(topology, streamsConfig);
    }

    protected void run() {
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    protected Properties buildStreamsConfig(String bootstrapServers) {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return properties;
    }

    protected StreamsBuilder configureStreamsBuilder(StreamsBuilder streamsBuilder) {
        // Sessions close after 1 minute of inactivity; adjust to suit your data.
        final Long inactivityGap = TimeUnit.MINUTES.toMillis(1);

        Serializer<LogAggregator> logAggSerializer = new LogAggregatorSerializer();
        Deserializer<LogAggregator> logAggDeserializer = new LogAggregatorDeserializer();
        Serde<LogAggregator> logAggSerde = Serdes.serdeFrom(logAggSerializer, logAggDeserializer);

        StringSerializer stringSerializer = new StringSerializer();
        StringDeserializer stringDeserializer = new StringDeserializer();
        WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(stringSerializer);
        WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(stringDeserializer);
        Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);

        KStream<String, String> inputStream = streamsBuilder.stream(INPUT_TOPIC);
        inputStream
                .groupByKey()
                .windowedBy(SessionWindows.with(inactivityGap))
                .aggregate(
                        LogAggregator::new,
                        (key, value, logAgg) -> logAgg.add(value),
                        (key, logAgg1, logAgg2) -> new LogAggregator(logAgg1, logAgg2),
                        Materialized.<String, LogAggregator, SessionStore<Bytes, byte[]>>
                                as("log-input-stream-aggregated")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(logAggSerde)
                )
                .mapValues(logAgg -> logAgg.groupedLimitedBy(10))
                .toStream()
                .to(windowedSerde, Serdes.String(), OUTPUT_TOPIC);
        return streamsBuilder;
    }
}
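Before pointing the app at a real broker, the wiring can be inspected with Topology#describe(), which prints the sources, processors, sinks, and state stores. A quick sketch:

// Hypothetical snippet: print the topology instead of starting the streams app.
LogAggregatorApp app = new LogAggregatorApp("localhost:9092");
app.build();
System.out.println(app.topology.describe());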
I added a Scala unit test to ensure the aggregation of logs works as planned. file: kafka-log-aggregator/src/test/scala/LogAggregatorAppTest.scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.apache.kafka.streams.TopologyTestDriver
import org.scalatest._
import scala.collection.JavaConversions._

class LogAggregatorAppTest extends FunSpec with Matchers with GivenWhenThen {

  // Builds a list of JSON log messages with a known frequency per status code.
  def jsonLogs(): collection.mutable.ListBuffer[String] = {
    val logs = new collection.mutable.ListBuffer[String]()
    val jsonString1 = """{"code":200,"message":"OK"}"""
    val jsonString2 = """{"code":301,"message":"Moved Permanently"}"""
    val jsonString3 = """{"code":302,"message":"Found"}"""
    val jsonString4 = """{"code":304,"message":"Not Modified"}"""
    val jsonString5 = """{"code":400,"message":"Bad Request"}"""
    val jsonString6 = """{"code":401,"message":"Unauthorized"}"""
    val jsonString7 = """{"code":403,"message":"Forbidden"}"""
    val jsonString8 = """{"code":418,"message":"Im a teapot"}"""
    val jsonString9 = """{"code":422,"message":"Unprocessable Entity"}"""
    val jsonString10 = """{"code":500,"message":"Internal Server Error"}"""
    val jsonString11 = """{"code":503,"message":"Service Unavailable"}"""
    1 to 11 foreach { _ => logs += jsonString1 }
    1 to 10 foreach { _ => logs += jsonString2 }
    1 to 9 foreach { _ => logs += jsonString3 }
    1 to 8 foreach { _ => logs += jsonString4 }
    1 to 7 foreach { _ => logs += jsonString5 }
    1 to 6 foreach { _ => logs += jsonString6 }
    1 to 5 foreach { _ => logs += jsonString7 }
    1 to 4 foreach { _ => logs += jsonString8 }
    1 to 3 foreach { _ => logs += jsonString9 }
    1 to 2 foreach { _ => logs += jsonString10 }
    1 to 1 foreach { _ => logs += jsonString11 }
    logs
  }

  // Drains the output topic and returns the value of the last record seen.
  def lastRecordFromStream(testDriver: TopologyTestDriver, outputTopic: String): String = {
    val stringDeserializer = Serdes.String().deserializer()
    var recordValue: String = null
    var record = testDriver.readOutput(outputTopic, stringDeserializer, stringDeserializer)
    while (record != null) {
      recordValue = record.value()
      record = testDriver.readOutput(outputTopic, stringDeserializer, stringDeserializer)
    }
    recordValue
  }

  describe("LogAggregatorApp") {
    it("aggregates json messages") {
      val bootstrapServers = "localhost:9092"
      val inputTopic = "log-input-stream"
      val outputTopic = "log-output-stream"
      val logAggregatorApp = new LogAggregatorApp(bootstrapServers)
      logAggregatorApp.build()

      val testDriver = new TopologyTestDriver(logAggregatorApp.topology, logAggregatorApp.streamsConfig)
      val stringSerializer = new StringSerializer
      val factory = new ConsumerRecordFactory(stringSerializer, stringSerializer)
      val key = "kafka-key"
      val logs = jsonLogs()
      val records = logs.map(jsonString => factory.create(inputTopic, key, jsonString)).toList
      testDriver.pipeInput(records)
      val recordValue = lastRecordFromStream(testDriver, outputTopic)
      testDriver.close()

      val expected =
        """[{"code":200,"message":"OK","count":11},
          |{"code":301,"message":"Moved Permanently","count":10},
          |{"code":302,"message":"Found","count":9},
          |{"code":304,"message":"Not Modified","count":8},
          |{"code":400,"message":"Bad Request","count":7},
          |{"code":401,"message":"Unauthorized","count":6},
          |{"code":403,"message":"Forbidden","count":5},
          |{"code":418,"message":"Im a teapot","count":4},
          |{"code":422,"message":"Unprocessable Entity","count":3},
          |{"code":500,"message":"Internal Server Error","count":2}]""".stripMargin.replaceAll("\n", "")
      recordValue shouldEqual expected
    }
  }
}
I created a simple Kafka producer Ruby script to pipe messages onto the input topic, wait a minute (so the next batch falls into a new session window), and pipe some more. file: kafka-log-aggregator/ruby/producer.rb
#!/usr/bin/env ruby
require 'kafka'
require 'json'

kafka = Kafka.new(['localhost:9092'], client_id: "sample-producer")
kafka_key = 'logs'
kafka_topic = 'log-input-stream'

messages = [
  { code: 200, message: 'OK' }.to_json,
  { code: 301, message: 'Moved Permanently' }.to_json,
  { code: 302, message: 'Found' }.to_json,
  { code: 304, message: 'Not Modified' }.to_json,
  { code: 400, message: 'Bad Request' }.to_json,
  { code: 401, message: 'Unauthorized' }.to_json,
  { code: 403, message: 'Forbidden' }.to_json,
  { code: 418, message: "I'm a teapot" }.to_json,
  { code: 422, message: 'Unprocessable Entity' }.to_json,
  { code: 500, message: 'Internal Server Error' }.to_json,
  { code: 503, message: 'Service Unavailable' }.to_json,
]

50.times do
  kafka.deliver_message(messages.sample, topic: kafka_topic, key: kafka_key)
end

sleep(60)

50.times do
  kafka.deliver_message(messages.sample, topic: kafka_topic, key: kafka_key)
end
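If you would rather stay in Java, a rough equivalent of the Ruby producer might look like the sketch below. The class name and the reduced message set are mine, and it sends messages round-robin instead of sampling randomly:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical Java alternative to the Ruby producer above.
public class SampleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        String[] messages = {
            "{\"code\":200,\"message\":\"OK\"}",
            "{\"code\":500,\"message\":\"Internal Server Error\"}",
            "{\"code\":503,\"message\":\"Service Unavailable\"}"
        };

        // Same topic and key as the Ruby script so all records share one session per window.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 50; i++) {
                producer.send(new ProducerRecord<>("log-input-stream", "logs", messages[i % messages.length]));
            }
        }
    }
}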
At this point I was ready to start ZooKeeper and Kafka, create the topics, and build and run the streams app:
# start zookeeper
$KAFKA_HOME/bin/zookeeper-server-start $KAFKA_CONF/zookeeper.properties
# start kafka
$KAFKA_HOME/bin/kafka-server-start $KAFKA_CONF/server.properties
# create topics
$KAFKA_HOME/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic log-input-stream
$KAFKA_HOME/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic log-output-stream
# start console consumers (to watch the topics)
$KAFKA_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic log-input-stream
$KAFKA_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic log-output-stream
# build jar
./gradlew clean shadowJar
# run app
java -cp "./build/libs/*" LogAggregatorApp
# execute producer
cd ruby && ./producer.rb
# verified output topic log aggregation
[{"code":401,"message":"Unauthorized","count":8},{"code":500,"message":"Internal Server Error","count":8},{"code":301,"message":"Moved Permanently","count":6},{"code":403,"message":"Forbidden","count":6},{"code":503,"message":"Service Unavailable","count":4},{"code":400,"message":"Bad Request","count":4},{"code":200,"message":"OK","count":4},{"code":418,"message":"I\u0027m a teapot","count":3},{"code":422,"message":"Unprocessable Entity","count":3},{"code":304,"message":"Not Modified","count":2}]
[{"code":503,"message":"Service Unavailable","count":7},{"code":302,"message":"Found","count":7},{"code":301,"message":"Moved Permanently","count":6},{"code":304,"message":"Not Modified","count":5},{"code":418,"message":"I\u0027m a teapot","count":5},{"code":400,"message":"Bad Request","count":5},{"code":422,"message":"Unprocessable Entity","count":4},{"code":200,"message":"OK","count":4},{"code":403,"message":"Forbidden","count":3},{"code":401,"message":"Unauthorized","count":2}]
I develop on a Mac using Homebrew or Docker; here is my environment for this post:
java -version
java version "1.8.0_121"
Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)
brew list --versions | egrep -i "(scala|zookeeper|gradle|kafka)"
gradle 4.9
kafka 1.1.0
scala@2.11 2.11.12
zookeeper 3.4.12
# in ~/.profile:
export JAVA_HOME="$(/usr/libexec/java_home)"
export ZOOKEEPER_HOME="/usr/local/Cellar/zookeeper/3.4.12"
export KAFKA_HOME=/usr/local/Cellar/kafka/1.1.0
export KAFKA_CONF=/usr/local/etc/kafka
export PATH="/usr/local/opt/scala@2.11/bin:$PATH"
# zookeeper conf
cat $KAFKA_CONF/zookeeper.properties | egrep -iv "^#"
dataDir=/usr/local/var/lib/zookeeper
clientPort=2181
maxClientCnxns=0
# kafka conf
cat $KAFKA_CONF/server.properties | egrep -iv "^(#|$)"
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/var/lib/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0