In this post, I'll share a Kafka Streams Java app that listens on an input topic, uses a session window to aggregate and count messages grouped by key, and writes the result to an output topic. This working example could be helpful for finding the most frequent log entries over a certain time period.
I used Gradle as the build tool and for dependency management, and created a new project via gradle init.
I added the dependencies to the build file: kafka-log-aggregator/build.gradle
buildscript {
    dependencies {
        classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
    }
}

plugins {
    id 'java'
    id 'scala'
    id 'com.github.johnrengelman.shadow' version '2.0.4'
}

repositories {
    mavenLocal()
    jcenter()
    mavenCentral()
}

dependencies {
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '1.1.0'
    compile group: 'org.apache.kafka', name: 'kafka-streams', version: '1.1.0'
    compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25'
    compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.25'
    compile 'com.google.code.gson:gson:2.8.0'
    testCompile 'org.scala-lang:scala-library:2.11.8'
    testCompile 'org.scalatest:scalatest_2.11:3.0.0'
    testCompile 'org.apache.kafka:kafka-streams-test-utils:1.1.0'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}
I created a class to represent a log entry, consisting of a code, a message, and the aggregate count. It uses Gson to serialize to and deserialize from JSON. new file: kafka-log-aggregator/src/main/java/LogEntry.java
import com.google.gson.Gson;

public class LogEntry {

    public int code;
    public String message;
    public Long count;

    public LogEntry(int code, String message) {
        this.code = code;
        this.message = message;
    }

    public static LogEntry fromJson(String jsonString, Long count) {
        LogEntry logEntry = new Gson().fromJson(jsonString, LogEntry.class);
        logEntry.count = count;
        return logEntry;
    }

    public String asJsonString() {
        return new Gson().toJson(this);
    }
}
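As a quick sanity check, here is a usage sketch (not part of the project source; the LogEntryExample class name is just for illustration) showing a LogEntry round-tripping through Gson:

public class LogEntryExample {
    public static void main(String[] args) {
        LogEntry entry = new LogEntry(200, "OK");
        // count is still null here, so Gson omits it from the JSON
        String json = entry.asJsonString(); // {"code":200,"message":"OK"}
        // fromJson re-parses the JSON and attaches the aggregate count
        LogEntry counted = LogEntry.fromJson(json, 5L);
        System.out.println(counted.asJsonString()); // {"code":200,"message":"OK","count":5}
    }
}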
Next I created the LogAggregator class, which the Kafka Streams app uses to hold and aggregate all the log entries. file: kafka-log-aggregator/src/main/java/LogAggregator.java
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.function.Function;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LogAggregator {

    ArrayList<LogEntry> logs = new ArrayList<>();
    Gson gson = new Gson();

    public LogAggregator() {
    }

    public LogAggregator(LogAggregator logAgg1, LogAggregator logAgg2) {
        logs.addAll(logAgg1.logs);
        logs.addAll(logAgg2.logs);
    }

    public LogAggregator(String jsonString) {
        ArrayList<LogEntry> logEntries = gson.fromJson(jsonString, new TypeToken<List<LogEntry>>(){}.getType());
        logs.addAll(logEntries);
    }

    public LogAggregator(byte[] bytes) {
        // decode as UTF-8 to match asByteArray
        this(new String(bytes, StandardCharsets.UTF_8));
    }

    public LogAggregator add(String log) {
        LogEntry logEntry = gson.fromJson(log, LogEntry.class);
        logs.add(logEntry);
        return this;
    }

    public String asJsonString() {
        return gson.toJson(logs);
    }

    public byte[] asByteArray() {
        return asJsonString().getBytes(StandardCharsets.UTF_8);
    }

    public String groupedLimitedBy(Integer limitSize) {
        Map<String, Long> counted = logs.stream()
            .map(logEntry -> logEntry.asJsonString())
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

        ArrayList<LogEntry> listSubset = new ArrayList<>();
        counted.entrySet().stream()
            .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
            .limit(limitSize)
            .forEachOrdered(e -> {
                LogEntry logEntry = LogEntry.fromJson(e.getKey(), e.getValue());
                listSubset.add(logEntry);
            });
        return gson.toJson(listSubset);
    }
}
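To show what the aggregator produces, here is another small sketch (again illustration only, not project code; LogAggregatorExample is a hypothetical class name): adding a few raw JSON lines and asking for the top two entries yields a count-sorted JSON array.

public class LogAggregatorExample {
    public static void main(String[] args) {
        LogAggregator agg = new LogAggregator();
        agg.add("{\"code\":200,\"message\":\"OK\"}");
        agg.add("{\"code\":200,\"message\":\"OK\"}");
        agg.add("{\"code\":500,\"message\":\"Internal Server Error\"}");
        // groupedLimitedBy groups identical entries, sorts by count descending, and truncates:
        // [{"code":200,"message":"OK","count":2},{"code":500,"message":"Internal Server Error","count":1}]
        System.out.println(agg.groupedLimitedBy(2));
    }
}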
The LogAggregator needs a serializer class to convert it to a byte array. file: kafka-log-aggregator/src/main/java/LogAggregatorSerializer.java
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class LogAggregatorSerializer implements Serializer<LogAggregator> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public void close() {
    }

    @Override
    public byte[] serialize(String topic, LogAggregator logAgg) {
        if (logAgg == null) {
            return null;
        }
        try {
            return logAgg.asByteArray();
        } catch (RuntimeException e) {
            throw new SerializationException("Error serializing value", e);
        }
    }
}
And here is the class to deserialize from the byte array. file: kafka-log-aggregator/src/main/java/LogAggregatorDeserializer.java
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class LogAggregatorDeserializer implements Deserializer<LogAggregator> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public void close() {
    }

    @Override
    public LogAggregator deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        try {
            return new LogAggregator(bytes);
        } catch (RuntimeException e) {
            throw new SerializationException("Error deserializing value", e);
        }
    }
}
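Kafka Streams consumes the serializer and deserializer as a single Serde; the app class below pairs them with Serdes.serdeFrom, as in this excerpt from its configureStreamsBuilder method:

Serializer<LogAggregator> logAggSerializer = new LogAggregatorSerializer();
Deserializer<LogAggregator> logAggDeserializer = new LogAggregatorDeserializer();
// used as the value serde for the session-window state store
Serde<LogAggregator> logAggSerde = Serdes.serdeFrom(logAggSerializer, logAggDeserializer);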
Next I created the main class to build and run the log aggregator Kafka Streams app. file: kafka-log-aggregator/src/main/java/LogAggregatorApp.java
import java.util.concurrent.TimeUnit;
import java.util.Properties;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.internals.WindowedDeserializer;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class LogAggregatorApp {

    public static final String APPLICATION_ID = "log-aggregator";
    public static final String INPUT_TOPIC = "log-input-stream";
    public static final String OUTPUT_TOPIC = "log-output-stream";

    public String bootstrapServers;
    public Topology topology;
    public KafkaStreams streams;
    public Properties streamsConfig;

    public static void main(String[] args) throws Exception {
        String bootstrapServers = "localhost:9092";
        LogAggregatorApp logAggregatorApp = new LogAggregatorApp(bootstrapServers);
        logAggregatorApp.build();
        logAggregatorApp.run();
    }

    public LogAggregatorApp(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    protected void build() {
        streamsConfig = buildStreamsConfig(bootstrapServers);
        StreamsBuilder streamsBuilder = configureStreamsBuilder(new StreamsBuilder());
        this.topology = streamsBuilder.build();
        this.streams = new KafkaStreams(topology, streamsConfig);
    }

    protected void run() {
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    protected Properties buildStreamsConfig(String bootstrapServers) {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return properties;
    }

    protected StreamsBuilder configureStreamsBuilder(StreamsBuilder streamsBuilder) {
        // 1 minute session inactivity gap, go ahead and change this
        final Long inactivityGap = TimeUnit.MINUTES.toMillis(1);

        Serializer<LogAggregator> logAggSerializer = new LogAggregatorSerializer();
        Deserializer<LogAggregator> logAggDeserializer = new LogAggregatorDeserializer();
        Serde<LogAggregator> logAggSerde = Serdes.serdeFrom(logAggSerializer, logAggDeserializer);

        StringSerializer stringSerializer = new StringSerializer();
        StringDeserializer stringDeserializer = new StringDeserializer();
        WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(stringSerializer);
        WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(stringDeserializer);
        Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);

        KStream<String, String> inputStream = streamsBuilder.stream(INPUT_TOPIC);

        inputStream
            .groupByKey()
            .windowedBy(SessionWindows.with(inactivityGap))
            .aggregate(
                LogAggregator::new,
                (key, value, logAgg) -> logAgg.add(value),
                (key, logAgg1, logAgg2) -> new LogAggregator(logAgg1, logAgg2),
                Materialized.<String, LogAggregator, SessionStore<Bytes, byte[]>>as("log-input-stream-aggregated")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(logAggSerde)
            )
            .mapValues(logAgg -> logAgg.groupedLimitedBy(10))
            .toStream()
            .to(windowedSerde, Serdes.String(), OUTPUT_TOPIC);

        return streamsBuilder;
    }
}
I added a Scala unit test to ensure the aggregation of logs works as planned. file: kafka-log-aggregator/src/test/scala/LogAggregatorAppTest.scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.apache.kafka.streams.TopologyTestDriver
import org.scalatest._
import scala.collection.JavaConversions._

class LogAggregatorAppTest extends FunSpec with Matchers with GivenWhenThen {

  def jsonLogs(): collection.mutable.ListBuffer[String] = {
    val logs = new collection.mutable.ListBuffer[String]()
    val jsonString1 = """{"code":200,"message":"OK"}"""
    val jsonString2 = """{"code":301,"message":"Moved Permanently"}"""
    val jsonString3 = """{"code":302,"message":"Found"}"""
    val jsonString4 = """{"code":304,"message":"Not Modified"}"""
    val jsonString5 = """{"code":400,"message":"Bad Request"}"""
    val jsonString6 = """{"code":401,"message":"Unauthorized"}"""
    val jsonString7 = """{"code":403,"message":"Forbidden"}"""
    val jsonString8 = """{"code":418,"message":"Im a teapot"}"""
    val jsonString9 = """{"code":422,"message":"Unprocessable Entity"}"""
    val jsonString10 = """{"code":500,"message":"Internal Server Error"}"""
    val jsonString11 = """{"code":503,"message":"Service Unavailable"}"""
    1 to 11 foreach { _ => logs += jsonString1 }
    1 to 10 foreach { _ => logs += jsonString2 }
    1 to 9 foreach { _ => logs += jsonString3 }
    1 to 8 foreach { _ => logs += jsonString4 }
    1 to 7 foreach { _ => logs += jsonString5 }
    1 to 6 foreach { _ => logs += jsonString6 }
    1 to 5 foreach { _ => logs += jsonString7 }
    1 to 4 foreach { _ => logs += jsonString8 }
    1 to 3 foreach { _ => logs += jsonString9 }
    1 to 2 foreach { _ => logs += jsonString10 }
    1 to 1 foreach { _ => logs += jsonString11 }
    logs
  }

  def lastRecordFromStream(app: LogAggregatorApp, testDriver: TopologyTestDriver, outputTopic: String): String = {
    var recordValue: String = null
    val stringDeserializer = Serdes.String().deserializer()
    var keepLooking = true
    // keep reading until the driver runs out of output records
    while (keepLooking) {
      try {
        val record = testDriver.readOutput(outputTopic, stringDeserializer, stringDeserializer)
        recordValue = record.value()
      } catch {
        case e: Exception => keepLooking = false
      }
    }
    recordValue
  }

  describe("LogAggregatorApp") {
    it("aggregates json messages") {
      val bootstrapServers = "localhost:9092"
      val inputTopic = "log-input-stream"
      val outputTopic = "log-output-stream"

      val logAggregatorApp = new LogAggregatorApp(bootstrapServers)
      logAggregatorApp.build()
      val testDriver = new TopologyTestDriver(logAggregatorApp.topology, logAggregatorApp.streamsConfig)

      val stringSerializer = new StringSerializer
      val factory = new ConsumerRecordFactory(stringSerializer, stringSerializer)
      val key = "kafka-key"
      val logs = jsonLogs
      val records = logs.map(jsonString => factory.create(inputTopic, key, jsonString)).toList
      testDriver.pipeInput(records)

      val recordValue = lastRecordFromStream(logAggregatorApp, testDriver, outputTopic)
      testDriver.close()

      val expected =
        """[{"code":200,"message":"OK","count":11},
          |{"code":301,"message":"Moved Permanently","count":10},
          |{"code":302,"message":"Found","count":9},
          |{"code":304,"message":"Not Modified","count":8},
          |{"code":400,"message":"Bad Request","count":7},
          |{"code":401,"message":"Unauthorized","count":6},
          |{"code":403,"message":"Forbidden","count":5},
          |{"code":418,"message":"Im a teapot","count":4},
          |{"code":422,"message":"Unprocessable Entity","count":3},
          |{"code":500,"message":"Internal Server Error","count":2}]""".stripMargin.replaceAll("\n", "")
      recordValue shouldEqual expected
    }
  }
}
I created a simple Kafka producer Ruby script to pipe messages onto the input topic, wait a while (in this case a minute, so the next batch lands in a new session window), and then pipe some more. file: kafka-log-aggregator/ruby/producer.rb
#!/usr/bin/env ruby
require 'kafka'
require 'json'

kafka = Kafka.new(['localhost:9092'], client_id: "sample-producer")
kafka_key = 'logs'
kafka_topic = 'log-input-stream'

messages = [
  { code: 200, message: 'OK' }.to_json,
  { code: 301, message: 'Moved Permanently' }.to_json,
  { code: 302, message: 'Found' }.to_json,
  { code: 304, message: 'Not Modified' }.to_json,
  { code: 400, message: 'Bad Request' }.to_json,
  { code: 401, message: 'Unauthorized' }.to_json,
  { code: 403, message: 'Forbidden' }.to_json,
  { code: 418, message: "I'm a teapot" }.to_json,
  { code: 422, message: 'Unprocessable Entity' }.to_json,
  { code: 500, message: 'Internal Server Error' }.to_json,
  { code: 503, message: 'Service Unavailable' }.to_json,
]

50.times do
  kafka.deliver_message(messages.sample, topic: kafka_topic, key: kafka_key)
end

sleep(60)

50.times do
  kafka.deliver_message(messages.sample, topic: kafka_topic, key: kafka_key)
end
At this point I was ready to start Zookeeper and Kafka, then build and run the streams app:
# start zookeeper
zookeeper-server-start $KAFKA_CONF/zookeeper.properties

# start kafka
$KAFKA_HOME/bin/kafka-server-start $KAFKA_CONF/server.properties

# create topics
$KAFKA_HOME/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic log-input-stream
$KAFKA_HOME/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic log-output-stream

# start console consumers (to watch the topics)
$KAFKA_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic log-input-stream
$KAFKA_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic log-output-stream

# build jar
./gradlew clean shadowJar

# run app
java -cp "./build/libs/*" LogAggregatorApp

# execute producer
cd ruby && ./producer.rb
# verified output topic log aggregation
[{ "code" :401,"message" :"Unauthorized" ,"count" :8} ,{ "code" :500,"message" :"Internal Server Error" ,"count" :8} ,{ "code" :301,"message" :"Moved Permanently" ,"count" :6} ,{ "code" :403,"message" :"Forbidden" ,"count" :6} ,{ "code" :503,"message" :"Service Unavailable" ,"count" :4} ,{ "code" :400,"message" :"Bad Request" ,"count" :4} ,{ "code" :200,"message" :"OK" ,"count" :4} ,{ "code" :418,"message" :"I \u 0027m a teapot" ,"count" :3} ,{ "code" :422,"message" :"Unprocessable Entity" ,"count" :3} ,{ "code" :304,"message" :"Not Modified" ,"count" :2}]
[{ "code" :503,"message" :"Service Unavailable" ,"count" :7} ,{ "code" :302,"message" :"Found" ,"count" :7} ,{ "code" :301,"message" :"Moved Permanently" ,"count" :6} ,{ "code" :304,"message" :"Not Modified" ,"count" :5} ,{ "code" :418,"message" :"I \u 0027m a teapot" ,"count" :5} ,{ "code" :400,"message" :"Bad Request" ,"count" :5} ,{ "code" :422,"message" :"Unprocessable Entity" ,"count" :4} ,{ "code" :200,"message" :"OK" ,"count" :4} ,{ "code" :403,"message" :"Forbidden" ,"count" :3} ,{ "code" :401,"message" :"Unauthorized" ,"count" :2}]
I develop on a Mac using Brew or Docker; here is my environment for this post:
java -version
java version "1.8.0_121"
Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)

brew list --versions | egrep -i "(scala|zookeeper|gradle|kafka)"
gradle 4.9
kafka 1.1.0
scala@2.11 2.11.12
zookeeper 3.4.12

# in ~/.profile:
export JAVA_HOME="$(/usr/libexec/java_home)"
export ZOOKEEPER_HOME="/usr/local/Cellar/zookeeper/3.4.12"
export KAFKA_HOME=/usr/local/Cellar/kafka/1.1.0
export KAFKA_CONF=/usr/local/etc/kafka
export PATH="/usr/local/opt/scala@2.11/bin:$PATH"
# zookeeper conf
cat $KAFKA_CONF/zookeeper.properties | egrep -iv "^#"
dataDir=/usr/local/var/lib/zookeeper
clientPort=2181
maxClientCnxns=0

# kafka conf
cat $KAFKA_CONF/server.properties | egrep -iv "^(#|$)"
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/var/lib/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
Source code on GitHub