Overview

There are few adequate examples of ReplyingKafkaTemplate, and the ones I found did not work properly even after a lot of searching. So I implemented it myself and am sharing the code that I confirmed to work. I followed the example at https://docs.spring.io/spring-kafka/reference/html/#replying-template
The implementation runs as a Spring Boot application.

 

Creating separate projects for the requester (sender) and the replier (receiver) makes it easy to follow the logs in each console.
A Kafka server must be installed in advance and be reachable at the address below.

kafka.bootstrap-servers=127.0.0.1:31090
For Kafka installation instructions, look for other posts on that topic.

 

Request Project Code Example

package com.example.demo.req;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;

/**
 * Kafka wiring for the requesting application: a plain {@link KafkaTemplate}, the
 * {@link ReplyingKafkaTemplate} used for request/reply, and the listener container
 * that consumes replies from the configured reply topic.
 */
@Configuration
@EnableKafka
public class KafkaConfiguration {

	/** Broker address list, read from {@code kafka.bootstrap-servers}. */
	@Value("${kafka.bootstrap-servers}")
	private String bootstrapServers;

	/** Topic the reply container subscribes to, read from {@code kafka.topic.requestreply-topic}. */
	@Value("${kafka.topic.requestreply-topic}")
	private String requestReplyTopic;

	/** Consumer group id for the reply consumer, read from {@code kafka.consumergroup}. */
	@Value("${kafka.consumergroup}")
	private String consumerGroup;

	/**
	 * Producer settings: broker address plus String serializers for key and value.
	 *
	 * @return a mutable map of Kafka producer properties
	 */
	@Bean
	public Map<String, Object> producerConfigs() {
		Map<String, Object> props = new HashMap<>();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		return props;
	}

	/**
	 * Consumer settings for the reply consumer.
	 *
	 * @return a mutable map of Kafka consumer properties
	 */
	@Bean
	public Map<String, Object> consumerConfigs() {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
		// Use the configured group id; the previous hard-coded "hello2" silently ignored
		// the kafka.consumergroup property that this class injects.
		props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
		return props;
	}

	/**
	 * @return producer factory built from {@link #producerConfigs()}
	 */
	@Bean
	public ProducerFactory<String, String> producerFactory() {
		return new DefaultKafkaProducerFactory<>(producerConfigs());
	}

	/**
	 * @return a plain template on top of {@link #producerFactory()}
	 */
	@Bean
	public KafkaTemplate<String, String> kafkaTemplate() {
		return new KafkaTemplate<>(producerFactory());
	}

	/**
	 * Request/reply template that sends with {@code pf} and receives replies through
	 * {@code replyContainer}.
	 *
	 * @param pf             producer factory used for outgoing requests
	 * @param replyContainer container subscribed to the reply topic
	 * @return the replying template
	 */
	@Bean
	public ReplyingKafkaTemplate<String, String, String> replyKafkaTemplate(ProducerFactory<String, String> pf,
			ConcurrentMessageListenerContainer<String, String> replyContainer) {
		return new ReplyingKafkaTemplate<>(pf, replyContainer);

	}

	/**
	 * Creates the container that listens on the reply topic.
	 *
	 * @param factory listener container factory supplied by the application context
	 * @return a container for {@code requestReplyTopic} with auto-startup disabled
	 */
	@Bean
	public ConcurrentMessageListenerContainer<String, String> replyContainer(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
		// NOTE(review): these setters mutate the injected factory, which may be shared with
		// other listeners in the context - confirm that is intended.
		factory.setConsumerFactory(consumerFactory());
		factory.setReplyTemplate(kafkaTemplate());
		ConcurrentMessageListenerContainer<String, String> repliesContainer = factory.createContainer(requestReplyTopic);

		// Not started by the context itself; presumably the ReplyingKafkaTemplate that wraps
		// this container controls its lifecycle (as in the Spring Kafka reference example).
		repliesContainer.setAutoStartup(false);

		return repliesContainer;
	}

	/**
	 * @return consumer factory using {@link #consumerConfigs()} and String deserializers
	 */
	@Bean
	public ConsumerFactory<String, String> consumerFactory() {
		return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new StringDeserializer());
	}
	
}
package com.example.demo.req;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;

/**
 * Requesting side of the request/reply demo: on startup it creates the request topic,
 * sends one message through a {@link ReplyingKafkaTemplate}, prints the send result and
 * the reply, and then the application context is closed.
 */
@SpringBootApplication
public class RequestingKafka {

	/** Upper bound, in seconds, for the reply timeout and for each blocking wait. */
	private static final long TIMEOUT_SECONDS = 30;

	/** Broker address list, read from {@code kafka.bootstrap-servers}. */
	@Value("${kafka.bootstrap-servers}")
	private String bootstrapServers;
	
	/** Topic name sent in the reply-topic header, read from {@code kafka.topic.requestreply-topic}. */
	@Value("${kafka.topic.requestreply-topic}")
	private String requestReplyTopic;

	/**
	 * Runs the application and closes the context as soon as startup (including the
	 * {@link ApplicationRunner}) has finished.
	 *
	 * @param args command-line arguments passed through to Spring Boot
	 */
	public static void main(String[] args) {
		SpringApplication.run(RequestingKafka.class, args).close();
	}

	/**
	 * @return a {@link KafkaAdmin} pointing at the configured brokers
	 */
	@Bean
	public KafkaAdmin admin() {
		Map<String, Object> configs = new HashMap<>();
		configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		return new KafkaAdmin(configs);
	}

	/**
	 * Startup task that performs a single request/reply round trip.
	 *
	 * <p>Failures (interruption, send/reply errors, timeouts) propagate out of the runner
	 * with their real cause instead of being printed and then followed by a
	 * {@link NullPointerException} on the missing result.
	 *
	 * @param kafkaAdmin         source of the admin client configuration
	 * @param replyKafkaTemplate template used to send the request and await the reply
	 * @return the runner executed by Spring Boot after the context has started
	 */
	@Bean
	public ApplicationRunner runner(KafkaAdmin kafkaAdmin, ReplyingKafkaTemplate<String, String, String> replyKafkaTemplate) {

		return args -> {

			String topic = "Perf-Topic-Test";

			// Create Topic. try-with-resources releases the admin client's resources;
			// per the Kafka Admin javadoc, close() lets in-flight operations complete first.
			try (AdminClient admin = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
				List<NewTopic> topics = new ArrayList<>();
				topics.add(new NewTopic(topic, 1, (short) 1));
				admin.createTopics(topics);
			}

			// Create Request and receive reply
			ProducerRecord<String, String> record = new ProducerRecord<>(topic, 0, "", "Message:" + new Date());
			record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes(StandardCharsets.UTF_8)));

			// The timeouts were previously "30 * 1000" with a seconds unit (~8.3 hours);
			// 30 seconds is what the millisecond-style arithmetic was aiming for.
			RequestReplyFuture<String, String, String> replyFuture = replyKafkaTemplate.sendAndReceive(record,
					Duration.ofSeconds(TIMEOUT_SECONDS));

			SendResult<String, String> sendResult = replyFuture.getSendFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
			System.out.println("Sent ok: " + sendResult.getRecordMetadata());

			ConsumerRecord<String, String> consumerRecord = replyFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
			System.out.println("Return value: " + consumerRecord.value());
		};
	}

	/**
	 * Declares the reply topic.
	 *
	 * <p>NOTE(review): the name is hard-coded while the reply container and reply header use
	 * {@code kafka.topic.requestreply-topic}; keep the two in sync. A replication factor of 2
	 * presumably requires at least two brokers - confirm against the target cluster.
	 *
	 * @return topic definition for {@code kReplies}
	 */
	@Bean
	public NewTopic kReplies() {
		return TopicBuilder.name("kReplies").partitions(10).replicas(2).build();
	}

}
// build.gradle

// Repository used to resolve build-script (plugin) dependencies.
buildscript {
	repositories {
		maven {
			// Placeholder: replace with the URL of your own Maven repository.
			url "{{input your repository}}"
			// NOTE(review): opts in to non-HTTPS repository URLs; prefer an HTTPS repository where possible.
			allowInsecureProtocol = true
		}
	}
}

plugins {
	id 'org.springframework.boot' version '2.6.6'
	id 'io.spring.dependency-management' version '1.0.11.RELEASE'
	id 'java'
}

// Project coordinates and Java source level.
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

// Repository used to resolve the project's own dependencies.
repositories {
	maven {
		// Placeholder: replace with the URL of your own Maven repository.
		url "{{input your repository}}"
		// NOTE(review): opts in to non-HTTPS repository URLs; prefer an HTTPS repository where possible.
		allowInsecureProtocol = true
	}
}

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-web'
	implementation group: 'org.springframework.kafka', name: 'spring-kafka'
	implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
	implementation group: 'com.fasterxml.jackson.core', name: 'jackson-annotations'
	implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core'

	// Lombok is only needed while compiling: compileOnly + annotationProcessor keeps it
	// off the runtime classpath (it was additionally declared as 'implementation').
	compileOnly 'org.projectlombok:lombok'
	annotationProcessor "org.springframework.boot:spring-boot-configuration-processor"
	annotationProcessor 'org.projectlombok:lombok'

	testImplementation('org.springframework.boot:spring-boot-starter-test')
	testImplementation 'org.springframework.kafka:spring-kafka-test'
}

dependencyManagement {
	imports {
		mavenBom "org.springframework.cloud:spring-cloud-dependencies:2021.0.0"
		// Import the Boot BOM that matches the applied Spring Boot plugin version instead of
		// pinning 2.6.1, which disagreed with the plugin version declared in the plugins block.
		mavenBom org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES
	}
}

// Run tests on the JUnit Platform (JUnit 5).
test {
	useJUnitPlatform()
}
# application.properties
# HTTP port of the requesting application.
server.port=8888
# Kafka broker address; injected into KafkaConfiguration and RequestingKafka.
kafka.bootstrap-servers=127.0.0.1:31090
# NOTE(review): not read by the requester classes shown in this post.
kafka.consumer.group-id=test
# NOTE(review): not read by the requester classes shown in this post.
kafka.topic.request-topic=request-topic
# Topic the reply container listens on; also sent as the reply-topic header of the request.
kafka.topic.requestreply-topic=kReplies
# Injected into KafkaConfiguration.consumerGroup.
# NOTE(review): "gorup" looks like a typo for "group"; it is a runtime value, so rename it deliberately.
kafka.consumergroup=requestreplygorup
# NOTE(review): not read by the requester classes shown in this post.
kafka.ack=1

 

Recipient Project Code Example

package com.example.demo.res;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

/**
 * Kafka infrastructure beans for the replying application: admin client settings,
 * String-based producer and consumer factories, a {@link KafkaTemplate} and the
 * listener container factory that uses that template for replies.
 */
@Configuration
@EnableKafka
public class KafkaConfiguration {

	/** Broker address list, read from {@code kafka.bootstrap-servers}. */
	@Value("${kafka.bootstrap-servers}")
	private String bootstrapServers;

	/** Value of {@code kafka.topic.requestreply-topic}; not referenced by the beans in this class. */
	@Value("${kafka.topic.requestreply-topic}")
	private String requestReplyTopic;

	/** Consumer group id, read from {@code kafka.consumer.group-id}. */
	@Value("${kafka.consumer.group-id}")
	private String consumerGroup;

	/** Value of {@code kafka.send.partition}; not referenced by the beans in this class. */
	@Value("${kafka.send.partition}")
	String sendPartitionValue;

	/**
	 * @return a {@link KafkaAdmin} configured with the broker address only
	 */
	@Bean
	public KafkaAdmin admin() {
		final Map<String, Object> adminSettings = new HashMap<>();
		adminSettings.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		final KafkaAdmin kafkaAdmin = new KafkaAdmin(adminSettings);
		return kafkaAdmin;
	}

	/**
	 * @return producer properties: broker address and String serializers for key and value
	 */
	@Bean
	public Map<String, Object> producerConfigs() {
		final Map<String, Object> producerSettings = new HashMap<>();
		producerSettings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		producerSettings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		producerSettings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		return producerSettings;
	}

	/**
	 * @return producer factory built from {@link #producerConfigs()}
	 */
	@Bean
	public ProducerFactory<String, String> producerFactory() {
		final Map<String, Object> producerSettings = producerConfigs();
		return new DefaultKafkaProducerFactory<>(producerSettings);
	}

	/**
	 * @return template on top of {@link #producerFactory()}
	 */
	@Bean
	public KafkaTemplate<String, String> kafkaTemplate() {
		final ProducerFactory<String, String> producers = producerFactory();
		return new KafkaTemplate<>(producers);
	}

	/**
	 * Builds the listener container factory: concurrency of 10, consumers from
	 * {@link #consumerFactory()} and {@link #kafkaTemplate()} as the reply template.
	 *
	 * @return the configured container factory
	 */
	@Bean
	public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
		final ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
				new ConcurrentKafkaListenerContainerFactory<>();
		containerFactory.setConcurrency(10);
		containerFactory.setConsumerFactory(consumerFactory());
		containerFactory.setReplyTemplate(kafkaTemplate());
		return containerFactory;
	}

	/**
	 * @return consumer properties: broker address, group id and "latest" offset reset
	 */
	@Bean
	public Map<String, Object> consumerConfigs() {
		final Map<String, Object> consumerSettings = new HashMap<>();
		consumerSettings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
		consumerSettings.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
		consumerSettings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		return consumerSettings;
	}

	/**
	 * @return consumer factory using {@link #consumerConfigs()} and String deserializers
	 */
	@Bean
	public ConsumerFactory<String, String> consumerFactory() {
		final Map<String, Object> consumerSettings = consumerConfigs();
		return new DefaultKafkaConsumerFactory<>(consumerSettings, new StringDeserializer(), new StringDeserializer());
	}

}
 package com.example.demo.res;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.support.MessageBuilder;

/**
 * Replying side of the request/reply demo: listens on every topic matching
 * {@code Perf-Topic-.*} and answers each record on the topic named in its reply header.
 */
@SpringBootApplication
public class ReplyingKafka {

	/**
	 * Starts the replying application.
	 *
	 * @param args command-line arguments passed through to Spring Boot
	 */
	public static void main(String[] args) {
		SpringApplication.run(ReplyingKafka.class, args);
	}

	/** Partition the reply is written to, read from {@code kafka.send.partition}. */
	@Value("${kafka.send.partition}")
	String sendPartitionValue;

	// NOTE(review): the two fields below are not referenced anywhere in this class.
	int sleep[] = { 9000, 2000 };
	static int cnt = 0;

	/**
	 * Handles one request and builds the reply message.
	 *
	 * <p>The reply payload is {@code "New " + value}, upper-cased with {@code Locale.ROOT} so
	 * the result does not depend on the JVM's default locale. The record is typed as
	 * {@code ConsumerRecord<String, String>} instead of the raw type; a {@code null} value
	 * therefore yields the text {@code "NEW NULL"} rather than a {@link NullPointerException}.
	 *
	 * @param in          the received request record
	 * @param replyTo     raw bytes of the reply-topic header, used as the reply's topic
	 * @param correlation raw bytes of the correlation-id header, echoed back on the reply
	 * @return the reply message with topic, key, correlation id and partition headers set
	 * @throws InterruptedException declared for compatibility; not thrown by the current body
	 * @throws NumberFormatException if {@code kafka.send.partition} is not an integer
	 */
	@KafkaListener(id = "server", topicPattern = "Perf-Topic-.*")
	public Message<?> listen(ConsumerRecord<String, String> in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
			@Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) throws InterruptedException {
			
		System.out.println(Thread.currentThread().getName() + ":Server received: " + in);
 
		String payload = ("New " + in.value()).toUpperCase(java.util.Locale.ROOT);

		return MessageBuilder.withPayload(payload).setHeader(KafkaHeaders.TOPIC, replyTo)
				.setHeader(KafkaHeaders.MESSAGE_KEY, "ReturnValue").setHeader(KafkaHeaders.CORRELATION_ID, correlation)
				.setHeader(KafkaHeaders.PARTITION_ID, Integer.parseInt(sendPartitionValue)).build();
	}

	/**
	 * @return topic definition for {@code kRequests} (10 partitions, 2 replicas)
	 */
	@Bean
	public NewTopic kRequests() {
		return TopicBuilder.name("kRequests").partitions(10).replicas(2).build();
	}

	/**
	 * @return topic definition for {@code kReplies} (10 partitions, 2 replicas)
	 */
	@Bean
	public NewTopic kReplies() {
		return TopicBuilder.name("kReplies").partitions(10).replicas(2).build();
	}
}
// build.gradle

// Repository used to resolve build-script (plugin) dependencies.
buildscript {
	repositories {
		maven {
			// Placeholder: replace with the URL of your own Maven repository.
			url "{{input your repository}}"
			// NOTE(review): opts in to non-HTTPS repository URLs; prefer an HTTPS repository where possible.
			allowInsecureProtocol = true
		}
	}
}

plugins {
	id 'org.springframework.boot' version '2.6.1'
	id 'io.spring.dependency-management' version '1.0.11.RELEASE'
	id 'java'
}

// Project coordinates and Java source level.
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

// Repository used to resolve the project's own dependencies.
repositories {
	maven {
		// Placeholder: replace with the URL of your own Maven repository.
		url "{{input your repository}}"
		// NOTE(review): opts in to non-HTTPS repository URLs; prefer an HTTPS repository where possible.
		allowInsecureProtocol = true
	}
}

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-web'
	implementation group: 'org.springframework.kafka', name: 'spring-kafka'
	implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
	implementation group: 'com.fasterxml.jackson.core', name: 'jackson-annotations'
	implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core'

	// Lombok is only needed while compiling: compileOnly + annotationProcessor keeps it
	// off the runtime classpath (it was additionally declared as 'implementation').
	compileOnly 'org.projectlombok:lombok'
	annotationProcessor "org.springframework.boot:spring-boot-configuration-processor"
	annotationProcessor 'org.projectlombok:lombok'

	testImplementation('org.springframework.boot:spring-boot-starter-test')
	testImplementation 'org.springframework.kafka:spring-kafka-test'
}

dependencyManagement {
	imports {
		mavenBom "org.springframework.cloud:spring-cloud-dependencies:2021.0.0"
		// Keep this version in step with the Spring Boot plugin version in the plugins block.
		mavenBom "org.springframework.boot:spring-boot-dependencies:2.6.1"
	}
}

// Run tests on the JUnit Platform (JUnit 5).
test {
	useJUnitPlatform()
}
# application.properties

# HTTP port of the replying application.
server.port=8889
# Kafka broker address; injected into KafkaConfiguration.
kafka.bootstrap-servers=127.0.0.1:31090
# Consumer group id used by the listener's consumer factory.
kafka.consumer.group-id=test
# NOTE(review): not read by the replier classes shown in this post.
kafka.topic.request-topic=request-topic
# Injected into KafkaConfiguration.requestReplyTopic.
kafka.topic.requestreply-topic=kReplies
# NOTE(review): not read by the replier classes shown in this post.
kafka.consumergroup=requestreplygorup
# NOTE(review): not read by the replier classes shown in this post.
kafka.receive.partition=2
# Partition that ReplyingKafka.listen sets on the reply message.
kafka.send.partition=5

 

Console Result

N/A

 

Conclusion

N/A

728x90
1
반응형

+ Recent posts