Overview
There are many incomplete examples of ReplyingKafkaTemplate, and none of the ones I found worked properly no matter how much I searched, so I implemented it myself and am sharing code that I have confirmed to work. The implementation follows the example at https://docs.spring.io/spring-kafka/reference/html/#replying-template and runs on Spring Boot.
The requester and the replier are built as two separate projects, which makes the exchange easy to follow in each console log.
A Kafka broker must already be running at the address below:
kafka.bootstrap-servers=127.0.0.1:31090
For Kafka installation instructions, see my other posts.
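To orient the reader before the full projects, here is a minimal sketch of the client-side call, assuming the replyKafkaTemplate bean and the Perf-Topic-Test topic configured in the request project below. The EchoClient class and its echo method are hypothetical names used only for illustration.

package com.example.demo.req;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.stereotype.Component;

@Component
public class EchoClient {

    private final ReplyingKafkaTemplate<String, String, String> template;

    public EchoClient(ReplyingKafkaTemplate<String, String, String> template) {
        this.template = template;
    }

    public String echo(String request) throws Exception {
        // The template adds a correlation-id header, publishes the record, and completes
        // the returned future when a matching record arrives on the reply topic.
        ProducerRecord<String, String> record = new ProducerRecord<>("Perf-Topic-Test", request);
        RequestReplyFuture<String, String, String> future = template.sendAndReceive(record, Duration.ofSeconds(30));
        ConsumerRecord<String, String> reply = future.get(30, TimeUnit.SECONDS);
        return reply.value();
    }
}

The two projects below implement exactly this exchange end to end: the request project configures the ReplyingKafkaTemplate and its reply container, and the recipient project hosts the @KafkaListener that produces the reply.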
Request Project Code Example
package com.example.demo.req;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
@Configuration
@EnableKafka
public class KafkaConfiguration {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.topic.requestreply-topic}")
private String requestReplyTopic;
@Value("${kafka.consumergroup}")
private String consumerGroup;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup); // use the injected consumer group instead of a hard-coded value
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
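// ReplyingKafkaTemplate pairs the producer with a listener container on the reply topic and matches each reply to its request via a correlation-id header.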
@Bean
public ReplyingKafkaTemplate<String, String, String> replyKafkaTemplate(ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> replyContainer) {
return new ReplyingKafkaTemplate<>(pf, replyContainer);
}
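// Listener container that consumes the reply topic for the template above; auto-startup is disabled because the ReplyingKafkaTemplate starts the container itself.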
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
factory.setConsumerFactory(consumerFactory());
factory.setReplyTemplate(kafkaTemplate());
ConcurrentMessageListenerContainer<String, String> repliesContainer = factory.createContainer(requestReplyTopic);
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new StringDeserializer());
}
}
package com.example.demo.req;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
@SpringBootApplication
public class RequestingKafka {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.topic.requestreply-topic}")
private String requestReplyTopic;
public static void main(String[] args) {
SpringApplication.run(RequestingKafka.class, args).close();
}
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new KafkaAdmin(configs);
}
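// Creates the request topic, sends a single request, and waits first for the send acknowledgement and then for the reply.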
@Bean
public ApplicationRunner runner(KafkaAdmin kafkaAdmin, ReplyingKafkaTemplate<String, String, String> replyKafkaTemplate) {
return args -> {
AdminClient admin = AdminClient.create(kafkaAdmin.getConfigurationProperties());
// Create the request topic
List<NewTopic> topics = new ArrayList<>();
String topic = "Perf-Topic-Test";
topics.add(new NewTopic(topic, 1, (short) 1));
admin.createTopics(topics);
admin.close();
// Create Request and receive reply
ProducerRecord<String, String> record = new ProducerRecord<>(topic, 0, "", "Message:" + new Date());
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes(StandardCharsets.UTF_8)));
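// sendAndReceive adds the KafkaHeaders.CORRELATION_ID header automatically and uses it to match the incoming reply to this request.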
RequestReplyFuture<String, String, String> replyFuture = replyKafkaTemplate.sendAndReceive(record, Duration.ofSeconds(30));
try {
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(30, TimeUnit.SECONDS); // wait for the broker acknowledgement
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(30, TimeUnit.SECONDS); // wait for the reply
System.out.println("Return value: " + consumerRecord.value());
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
};
}
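// Declare the reply topic as a bean so KafkaAdmin creates it on startup.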
@Bean
public NewTopic kReplies() {
return TopicBuilder.name("kReplies").partitions(10).replicas(2).build();
}
}
// build.gradle
buildscript {
repositories {
maven {
url "{{input your repository}}"
allowInsecureProtocol = true
}
}
}
plugins {
id 'org.springframework.boot' version '2.6.6'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
repositories {
maven {
url "{{input your repository}}"
allowInsecureProtocol = true
}
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation group: 'org.springframework.kafka', name: 'spring-kafka'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-annotations'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core'
implementation 'org.projectlombok:lombok'
compileOnly 'org.projectlombok:lombok'
annotationProcessor "org.springframework.boot:spring-boot-configuration-processor"
annotationProcessor 'org.projectlombok:lombok'
testImplementation('org.springframework.boot:spring-boot-starter-test')
testImplementation 'org.springframework.kafka:spring-kafka-test'
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:2021.0.0"
mavenBom "org.springframework.boot:spring-boot-dependencies:2.6.1"
}
}
test {
useJUnitPlatform()
}
# application.properties
server.port=8888
kafka.bootstrap-servers=127.0.0.1:31090
kafka.consumer.group-id=test
kafka.topic.request-topic=request-topic
kafka.topic.requestreply-topic=kReplies
kafka.consumergroup=requestreplygroup
kafka.ack=1
Recipient Project Code Example
package com.example.demo.res;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
@Configuration
@EnableKafka
public class KafkaConfiguration {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.topic.requestreply-topic}")
private String requestReplyTopic;
@Value("${kafka.consumer.group-id}")
private String consumerGroup;
@Value("${kafka.send.partition}")
String sendPartitionValue;
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new KafkaAdmin(configs);
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
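// The listener container factory is given a reply template so that values returned from @KafkaListener methods are sent back to the requester's reply topic.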
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setReplyTemplate(kafkaTemplate());
factory.setConcurrency(10);
return factory;
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new StringDeserializer());
}
}
package com.example.demo.res;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.support.MessageBuilder;
@SpringBootApplication
public class ReplyingKafka {
public static void main(String[] args) {
SpringApplication.run(ReplyingKafka.class, args);
}
@Value("${kafka.send.partition}")
String sendPartitionValue;
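// Listens on any topic matching Perf-Topic-.*, uppercases the payload, and sends it to the reply topic taken from the request's REPLY_TOPIC header, propagating the correlation id.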
@KafkaListener(id = "server", topicPattern = "Perf-Topic-.*")
public Message<?> listen(ConsumerRecord<String, String> in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
@Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
System.out.println(Thread.currentThread().getName() + ":Server received: " + in);
return MessageBuilder.withPayload(("New " + in.value()).toUpperCase()).setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.MESSAGE_KEY, "ReturnValue").setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.setHeader(KafkaHeaders.PARTITION_ID, Integer.parseInt(sendPartitionValue)).build();
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests").partitions(10).replicas(2).build();
}
@Bean
public NewTopic kReplies() {
return TopicBuilder.name("kReplies").partitions(10).replicas(2).build();
}
}
// build.gradle
buildscript {
repositories {
maven {
url "{{input your repository}}"
allowInsecureProtocol = true
}
}
}
plugins {
id 'org.springframework.boot' version '2.6.1'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
repositories {
maven {
url "{{input your repository}}"
allowInsecureProtocol = true
}
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation group: 'org.springframework.kafka', name: 'spring-kafka'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-annotations'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core'
implementation 'org.projectlombok:lombok'
compileOnly 'org.projectlombok:lombok'
annotationProcessor "org.springframework.boot:spring-boot-configuration-processor"
annotationProcessor 'org.projectlombok:lombok'
testImplementation('org.springframework.boot:spring-boot-starter-test')
testImplementation 'org.springframework.kafka:spring-kafka-test'
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:2021.0.0"
mavenBom "org.springframework.boot:spring-boot-dependencies:2.6.1"
}
}
test {
useJUnitPlatform()
}
# application.properties
server.port=8889
kafka.bootstrap-servers=127.0.0.1:31090
kafka.consumer.group-id=test
kafka.topic.request-topic=request-topic
kafka.topic.requestreply-topic=kReplies
kafka.consumergroup=requestreplygroup
kafka.receive.partition=2
kafka.send.partition=5
Console Result
N/A
Conclusion
N/A