Kafka, Streams and Avro serialization
In this post I will show how to easily run a Kafka broker on the local host and use it to exchange data between a producer and a consumer.
We will see how to serialize the data in the JSON format and the efficient Avro format.
Requirements
- Java 8 or higher
- Docker and docker-compose. Instructions can be found in this quickstart from Confluent.
- Gradle
The Kafka broker
Kafka is a distributed streaming platform and the Kafka broker is the channel through which the messages are passed.
The easiest way to start a single Kafka broker locally is probably to run the pre-packaged Docker images with this docker-compose.yml file.
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:3.3.1
    network_mode: host
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:3.3.1
    network_mode: host
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: localhost:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
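With the default broker settings the items topic is auto-created on first use, but it can also be created explicitly. Below is a minimal sketch using the Kafka AdminClient; the topic name, partition count and replication factor are assumptions for this local single-broker setup.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateItemsTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // one partition and replication factor 1 are enough for a single local broker
            NewTopic items = new NewTopic("items", 1, (short) 1);
            admin.createTopics(Collections.singleton(items)).all().get();
        }
    }
}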
JSON Serialization
The producer
The producer creates the objects, converts (serializes) them to JSON and publishes them to Kafka.
The basic properties of the producer are the address of the broker and the serializers of the key and the value. The key serializer is set to StringSerializer here and should be chosen according to the key's type. The value is sent as a JSON string, so StringSerializer is selected for it as well.
// the config keys are statically imported from org.apache.kafka.clients.producer.ProducerConfig
private static Producer<String, String> createProducer() {
    Properties props = new Properties();
    props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    return new KafkaProducer<>(props);
}
The object that is sent (Item) is a simple Java object (POJO) and should be serializable by the serialization library (Jackson in our case).
private void send(Item item) throws IOException {
    String message = mapper.writeValueAsString(item);
    ProducerRecord<String, String> record = new ProducerRecord<>("items", message);
    producer.send(record);
}
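For reference, the Item class can be a plain POJO and the mapper above a regular Jackson ObjectMapper; here is a minimal sketch (the field names follow the Avro schema used later in this post):

import com.fasterxml.jackson.databind.ObjectMapper;

// the Jackson mapper used by send() and processRecord()
private final ObjectMapper mapper = new ObjectMapper();

// minimal POJO sketch of the Item object
public class Item {
    private String name;
    private String description;
    private long sku;
    private double price;

    public Item() {
    }

    public Item(String name, String description, long sku, double price) {
        this.name = name;
        this.description = description;
        this.sku = sku;
        this.price = price;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getDescription() { return description; }
    public void setDescription(String description) { this.description = description; }
    public long getSku() { return sku; }
    public void setSku(long sku) { this.sku = sku; }
    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }
}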
The consumer
The consumer reads the objects as JSON from the Kafka topic and converts (deserializes) them back to the original objects.
The basic properties of the consumer are similar to those of the producer (note that the serializers are replaced with deserializers). In addition, the consumer group must be specified.
// the config keys are statically imported from org.apache.kafka.clients.consumer.ConsumerConfig
private Consumer<String, String> createConsumer() {
    Properties props = new Properties();
    props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(GROUP_ID_CONFIG, "example");
    props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    return new KafkaConsumer<>(props);
}
The consumer reads the objects from the subscribed topics and then converts and processes them.
public void consume() {
    consumer.subscribe(Arrays.asList("items"));
    while (true) {
        try {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            records.forEach(this::processRecord);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

private void processRecord(ConsumerRecord<String, String> record) {
    try {
        Item item = mapper.readValue(record.value(), Item.class);
        processItem(item);
    } catch (IOException e) {
        // handle error
    }
}
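One caveat: poll(long) was deprecated in kafka-clients 2.0 in favour of poll(Duration), so with a newer client the poll call above would be written as:

// requires java.time.Duration
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));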
Avro Serialization
Apache Avro is a data serialization system that provides a compact and fast binary data format.
We will use it to send serialized objects and read them from Kafka.
The Schema Registry
Schema Registry is a service that manages the Avro schemas so that the producer and the consumer speak the same language. Running the registry locally is as simple as adding its settings to the docker-compose.yml file:
  schema-registry:
    image: confluentinc/cp-schema-registry:3.3.1
    network_mode: host
    depends_on:
      - zookeeper
      - kafka
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: localhost:32181
      SCHEMA_REGISTRY_HOST_NAME: localhost
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
GenericRecord vs. SpecificRecord
There are basically two ways to exchange Avro objects: GenericRecord and SpecificRecord.
GenericRecord is a record that contains the object data in the form of a map structure.
An Item object, for example, can be represented as:
Item:
  UTF8: name
  UTF8: description
  long: sku
  double: price
SpecificRecord, on the other hand, contains a modified version of the object that knows how to serialize / deserialize itself, so the consumer doesn't have to deserialize it explicitly. First, let's examine the generic way.
The generic way
The producer
The producer has to be modified to create and send the serialized objects instead of JSON Strings, so we have to tell it to serialize the values with the KafkaAvroSerializer and to use the schema registry for exchanging the schema with the consumer.
private static Producer<String, Object> createProducer() {
    Properties props = new Properties();
    ...
    props.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    props.put(SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    return new KafkaProducer<>(props);
}
At this stage, the schema has to be specified inline and the object has to be explicitly converted to a GenericRecord before sending. Let's first create a schema for the Item object:
private Schema createSchema() {
    //language=JSON
    String userSchema = "{\"type\":\"record\",\n" +
            " \"name\":\"item\",\n" +
            " \"fields\":[\n" +
            " {\"name\":\"name\",\"type\":\"string\"},\n" +
            " {\"name\":\"description\",\"type\":\"string\"},\n" +
            " {\"name\":\"sku\",\"type\":\"long\"},\n" +
            " {\"name\":\"price\",\"type\":\"double\"}\n" +
            " ]}";
    Schema.Parser parser = new Schema.Parser();
    return parser.parse(userSchema);
}
And the send() method should be modified to:
private void send(Item item) {
    GenericRecord avroRecord = createAvroRecord(item);
    ProducerRecord<String, Object> record = new ProducerRecord<>("items", avroRecord);
    try {
        producer.send(record);
    } catch (SerializationException e) {
        // handle error
    }
}
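The createAvroRecord() helper is not listed above; a possible sketch, assuming the Item getters and the createSchema() method shown earlier, is:

// uses org.apache.avro.generic.GenericData and org.apache.avro.generic.GenericRecord
private GenericRecord createAvroRecord(Item item) {
    // populate a GenericRecord according to the inline schema
    GenericRecord avroRecord = new GenericData.Record(createSchema());
    avroRecord.put("name", item.getName());
    avroRecord.put("description", item.getDescription());
    avroRecord.put("sku", item.getSku());
    avroRecord.put("price", item.getPrice());
    return avroRecord;
}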
The consumer
The consumer has to be modified to expect serialized objects and deserialize them with the KafkaAvroDeserializer:
// the values arrive as Avro GenericRecord instances, so the value type is Object here
private static Consumer<String, Object> createConsumer() {
    Properties props = new Properties();
    ...
    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
    props.put(SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    return new KafkaConsumer<>(props);
}
The consumer then has to convert the Avro record back to the Java Item POJO.
// Record is org.apache.avro.generic.GenericData.Record, Utf8 is org.apache.avro.util.Utf8
private void processRecord(ConsumerRecord<String, Object> record) {
    Record value = (Record) record.value();
    Item item = parseItem(value);
    processItem(item);
}

private Item parseItem(Record record) {
    return new Item(
            ((Utf8) record.get("name")).toString(),
            ((Utf8) record.get("description")).toString(),
            (Long) record.get("sku"),
            (Double) record.get("price"));
}
The specific way
As noted above, a SpecificRecord contains a modified version of the object that knows how to serialize / deserialize itself.
The class of that object can be generated automatically from an Avro schema file.
Let’s create the following schema and place it under src/main/avro/Item.avsc
{
  "namespace": "io.github.msayag.kafka.avro",
  "type": "record",
  "name": "Item",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "description",
      "type": "string"
    },
    {
      "name": "sku",
      "type": "long"
    },
    {
      "name": "price",
      "type": "double"
    }
  ]
}
After adding the following to the build.gradle file, the class Item will be created every time the build task is invoked.
buildscript {
    repositories {
        jcenter()
    }
    dependencies {
        classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.11.0"
    }
}

apply plugin: "com.commercehub.gradle.plugin.avro"
The producer
Using the generated class lets us simplify both the producer and the consumer.
The producer no longer has to create the schema explicitly, nor build a GenericRecord before sending the message.
The createSchema() method is no longer used and can be removed, and the send() method is simplified to:
private void send(Item item) {
    ProducerRecord<String, Item> record = new ProducerRecord<>("items", item);
    try {
        producer.send(record);
    } catch (SerializationException e) {
        // handle error
    }
}
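Since the generated Item is an Avro SpecificRecord, instances can be created with the generated builder; a quick usage sketch (the field values are just examples):

// build an Item with the builder generated from Item.avsc
Item item = Item.newBuilder()
        .setName("widget")
        .setDescription("a sample item")
        .setSku(12345L)
        .setPrice(9.99)
        .build();
send(item);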
The consumer
We need to tell the consumer to deserialize the objects as specific records:
private static Consumer<String, Item> createConsumer() {
    Properties props = new Properties();
    ...
    props.put(SPECIFIC_AVRO_READER_CONFIG, "true");
    return new KafkaConsumer<>(props);
}
It also doesn’t have to parse the item anymore and the processRecord() is simplified to:
private void processRecord(ConsumerRecord<String, Item> record) {
    Item item = record.value();
    processItem(item);
}
Compression
To achieve even smaller messages, an additional compression can be added on top of the Avro serialization.
The messages can be kept uncompressed ('none') or be compressed by the producer with gzip, snappy, or lz4.
The compression codec can be set by adding “compression.type” to the producer’s properties, for example:
props.put(COMPRESSION_TYPE_CONFIG, "snappy");
For more information, check here and here.
Kafka Streams
Kafka Streams is a client library for building applications and microservices.
It lets us stream messages from one service to another and process, aggregate and group them without the need to explicitly poll, parse and send them back to other Kafka topics.
The consumer has to be rewritten as:
private static Properties getProperties() {
    Properties props = new Properties();
    props.put(APPLICATION_ID_CONFIG, "example-application");
    props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
    return props;
}

public void consume() {
    StreamsBuilder builder = new StreamsBuilder();
    builder.<String, Item>stream("items")
            .mapValues(item -> processItem(item))
            .to("items2", Produced.with(Serdes.String(), Serdes.String()));
    Properties config = getProperties();
    KafkaStreams streams = new KafkaStreams(builder.build(), config);
    streams.start();
}

private String processItem(Item item) {
    return item.getName() + ": " + item.getPrice();
}
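Optionally (not part of the original code), a shutdown hook can be registered so the streams instance is closed cleanly when the application exits:

// close the Kafka Streams instance on JVM shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));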
The producer, on the other hand, doesn’t have to be modified at all.
Execution Instructions
All the code for this tutorial can be downloaded from the GitHub repository using the links below. To run the examples:
- Enter the folder of the specific section
- Run the Docker containers: docker-compose up -d
- Run the producer: ./gradlew run -Pmode=producer
- Run the consumer: ./gradlew run -Pmode=consumer
- When done, stop the containers: docker-compose down
Tags
kafka
avro
docker