Kafka Producer & Consumer with a Batch Listener and a Custom Object Type in Spring Boot

Iroshan Aberathne
3 min read · Nov 4, 2019


In this article we discuss how to implement a Kafka producer and a consumer batch listener that use a custom data type instead of primitive types such as int or String.

Prerequisite

  1. Fundamental knowledge of Spring Boot and Java
  2. Kafka brokers already set up
  3. Familiarity with Kafka consumer and producer parameters

1.0 Spring Boot pom.xml File

Add the following dependencies to pom.xml, in addition to the default Spring Boot dependencies, to enable Kafka support and JSON mapping and serialization.

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

2.0 Custom Data Type

Our custom data type is a class called Container, since we are dealing with Kafka messages that carry a custom payload.

public class Container {
    private String id;
    private List<Object> data;

    // Jackson requires a no-arg constructor plus getters and setters
    // for (de)serialization; they are omitted here for brevity.
}

The Container class has a String id field and a List<Object> field in which any kind of data can be stored.
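For example, once the JsonSerializer has converted it, a Container holding a mix of values might look like this on the wire (the id value here is purely illustrative, and the field names assume conventional getters):

```json
{
  "id": "container-42",
  "data": ["a string", 17, {"nested": true}]
}
```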

3.0 Kafka Producer

3.1 Kafka Configuration Class

There should be a configuration class in which all producer and consumer parameters are configured. The following sample code illustrates the essential annotations of the Kafka configuration class.

@Configuration
@EnableKafka
public class KafkaConfig {}

3.2 Producer Configuration

To enable the Kafka producer, the kafkaTemplate() and producerFactory() methods should be implemented in the KafkaConfig class. kafkaTemplate() returns a new KafkaTemplate based on the configuration defined in producerFactory(). This KafkaTemplate sends messages to the Kafka broker with the custom data type Container defined above.

@Bean
public KafkaTemplate<String, Container> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory<String, Container> producerFactory() {
    Map<String, Object> config = new HashMap<>();

    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka_broker_ip:9092,kafka_broker_ip:9093,kafka_broker_ip:9094");
    config.put(ProducerConfig.ACKS_CONFIG, "all");
    config.put(ProducerConfig.RETRIES_CONFIG, 0);
    config.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000);
    config.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return new DefaultKafkaProducerFactory<>(config);
}

3.3 Producer

The following code segment shows how the Kafka producer is implemented in the producer service class (in my code). With the message builder we can set the payload, the topic and, if needed, the partition id for a given message.

@Autowired
private KafkaTemplate<String, Container> kafkaTemplate;

public boolean sendMessage(Container containerMsg) {
    Message<Container> message = MessageBuilder
            .withPayload(containerMsg)
            .setHeader(KafkaHeaders.TOPIC, "test_topic")
            .setHeader(KafkaHeaders.PARTITION_ID, 1)
            .build();
    this.kafkaTemplate.send(message);
    return true;
}

Payload : the message to be pushed to the Kafka broker.

Topic : the name of the topic in which the sent message will be stored on the Kafka broker.

Partition_ID : if the given topic has multiple partitions, the sender can specify which partition of the topic to write to.
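When no partition id header is set, Kafka's default partitioner derives one from the message key (using a murmur2 hash of the serialized key). The idea can be sketched in plain Java with a simplified hash function; this is not Kafka's actual algorithm, only an illustration that the same key always lands on the same partition:

```java
import java.util.List;

public class PartitionSketch {
    // Simplified stand-in for Kafka's default partitioner. Kafka really
    // hashes the serialized key bytes with murmur2; hashCode() is used here
    // purely to illustrate the key -> partition mapping being deterministic.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3; // e.g. a topic with partitions 0, 1, 2
        for (String key : List.of("order-1", "order-2", "order-1")) {
            // "order-1" prints the same partition both times
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

Setting KafkaHeaders.PARTITION_ID explicitly, as in the producer above, bypasses this key-based selection entirely.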

Note: if you are new to Kafka, please refer to the Apache Kafka documentation first.

4.0 Kafka Consumer

4.1 Batch Listener Consumer Configuration

Implement the consumerFactory() and kafkaListenerContainerFactory() methods in the KafkaConfig class; both are needed to enable the Kafka batch listener. The consumerFactory() method uses a special JSON deserializer to deserialize our custom Container data type. To enable batch listening, the following parameter must be set in the kafkaListenerContainerFactory() method.

factory.setBatchListener(true)

@Bean
public ConsumerFactory<String, Container> consumerFactory() {
    JsonDeserializer<Container> deserializer = new JsonDeserializer<>(Container.class);
    deserializer.setRemoveTypeHeaders(false);
    deserializer.addTrustedPackages("*");
    deserializer.setUseTypeMapperForKey(true);

    Map<String, Object> config = new HashMap<>();

    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka_broker_ip:9092,kafka_broker_ip:9093,kafka_broker_ip:9094");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_one");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    // The key and value deserializer instances are passed directly to the
    // factory, so they do not need to be set in the config map.
    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Container> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Container> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    return factory;
}

4.2 Consumer

In the consumer service class there is a Kafka consumer with the capability to consume messages in bulk for a given topic and partition, as follows.

@KafkaListener(
        topicPartitions = { @TopicPartition(topic = "test_topic", partitions = { "1" }) },
        groupId = "group_one",
        containerFactory = "kafkaListenerContainerFactory",
        autoStartup = "true")
public void consumeFromTestTopicPartitionONE(@Payload List<Container> containers) {
    LOGGER.info("\n/******* Consume TEST_TOPIC Partition ---->>>>>> ONE ********/\n" + containers);
}

You can implement your own logic once the container messages are received.
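Typical batch-handling logic iterates the received list and processes each message. As a minimal, broker-free sketch (with a stripped-down Container stand-in, since the real class lives in the application), the listener body might group the batch by container id before handing each group to downstream code:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchProcessingSketch {
    // Stripped-down stand-in for the Container class from the article.
    record Container(String id, List<Object> data) {}

    // Groups a received batch by container id, as a batch listener might do
    // before processing each group.
    static Map<String, List<Container>> groupById(List<Container> batch) {
        Map<String, List<Container>> byId = new LinkedHashMap<>();
        for (Container c : batch) {
            byId.computeIfAbsent(c.id(), k -> new ArrayList<>()).add(c);
        }
        return byId;
    }

    public static void main(String[] args) {
        List<Container> batch = List.of(
                new Container("a", List.of(1, 2)),
                new Container("b", List.of("x")),
                new Container("a", List.of(3)));
        System.out.println(groupById(batch).keySet()); // [a, b]
    }
}
```

Because the listener receives the whole poll as one List, per-batch work like this grouping happens once per poll rather than once per record.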

Thank you !!!!
