版本说明
版本对应的问题请参考:Spring for Apahe Kafka
本文基于 Spring for Apache Kafka 的 2.8.0 版本,Kafka 的 3.0.0 版本,如果还不了解 kafka 怎么搭建和它的核心概念请参考:Kafka-1: 安装与关键概念介绍,关于 Spring for Apache Kafka 问题请参考 官方文档。
不使用 Spring Boot
在不使用 SpringBoot 的时,所有的 kafka 配置都不能使用自动装配来实现,需要自己写配置类,并且添加 @EnableKafka 来启动 AbstractListenerContainerFactory 子类的监听,如果不添加 @EnableKafka 的话 @KafkaListener 这些注解是没有办法使用的。
演示实例
public class Sender {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
context.getBean(Sender.class).send("test", 42);
}
private final KafkaTemplate<Integer, String> template;
public Sender(KafkaTemplate<Integer, String> template) {
this.template = template;
}
public void send(String toSend, int key) {
this.template.send("topic1", key, toSend);
}
}
public class Listener {
@KafkaListener(id = "listen1", topics = "topic1")
public void listen1(String in) {
System.out.println(in);
}
}
@Configuration
@EnableKafka
public class Config {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String>
kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProps());
}
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// ...
return props;
}
@Bean
public Sender sender(KafkaTemplate<Integer, String> template) {
return new Sender(template);
}
@Bean
public Listener listener() {
return new Listener();
}
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(senderProps());
}
private Map<String, Object> senderProps() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//...
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
return new KafkaTemplate<Integer, String>(producerFactory);
}
}
复制代码
使用 Spring Boot
使用 Spring Boot 我们便可以省去很多配置类的编写,很多配置都是可以通过 yaml 的方式来注入。
@EnableKafka 在 Spring Boot 中也不是必须的。如果你不想使用 Kafka 自动配置,比如测试中不使用 Kafka 的自定配置,那么你只需要将 KafkaAutoConfiguration 配置类给移除就行了,这种方式也适用于 Spring Boot 中移除其他的自动装配:@SpringBootTest("spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration") 。因为 Spring for Apache Kafka 的官方文档大多是以 Spring 为例进行介绍的,要了解 Spring boot 和它的集成可以参考 Spring Boot 的官方文档的 Messaging 部分。
如果想要了解 Spring Boot 帮我们自动装配了哪些 bean 的话可以查看 org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration 类,如果想要了解我们在 yaml 文件中可以配置那些关于 spring.kafka 的内容的话,可以查看 org.springframework.boot.autoconfigure.kafka.KafkaProperties 类,或者查看 Spring Boot 官网提供的 Application Properties,并且要善用 command(ctrl) + f;如果想要了解 Spring Boot 自动装配的原理可以查看:自定义开发 Spring Boot Starter
采坑指南:
在我们编写
Spring Boot的配置文件不知道都有什么配置项的时候,都可以去找对应autoConfiuration的Properties类。
最小化方式演示 Spring for kafka
导入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
复制代码
代码示例
package com.aha.kafka;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}
/**
* 没有对应的 topic 就创建,有就不做任何处理
* @return
*/
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
/**
* 消费者 - 监听 topic1
* @param in
*/
@KafkaListener(id = "myId", topics = "topic1")
public void listen(String in) {
System.out.println(in);
}
/**
* 生产者 - 向 topic1 中发送消息
* ApplicationRunner 的作用就是在项目启动的时候 会执行 run 方法
*
* @param template
* @return
*/
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
// 使用 lambda 表达式的方式 - 使用匿名内部类实现 ApplicationRunner 并重写 run 方法
return args -> {
template.send("topic1", "test");
};
}
}
复制代码
yaml 文件
server:
port: 8088
spring:
kafka:
bootstrap-servers: 10.211.55.3:9092,10.211.55.4:9092,10.211.55.5:9092
consumer:
auto-offset-reset: earliest
复制代码
启动工程便可以看到控制台中打印 test,这说明最小化演示就成功了。
创建 Topic
在启动时就创建 Topic,如果该 Topic 存在则忽略。
编写 yaml 文件
server:
port: 8088
spring:
kafka:
template:
# 当使用 kafkaTemplate 的 sendDefault 方法的时候,使用的是这里配置的 topic
defaultTopic: topic-1
# partition-num 和 replication-num KafkaProperties 没有提供配置的地方
partition-num: 3
replication-num: 3
bootstrap-servers: 10.211.55.3:9092,10.211.55.4:9092,10.211.55.5:9092
producer:
retries: 3
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
# @KafkaListener 中不指定 group-id 使用这边 groupId,这边也不指定项目就报错起不来,指定了就覆盖这边的
# group-id: test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: latest
# auto-offset-reset: earliest
enable-auto-commit: true
复制代码
在 yaml 中配置完这个以后,kafkaAdmin 便会自动去连接 Kafka 的地址。
编写 Topic 的配置配置
package com.aha.kafka.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;
import java.util.Arrays;
/**
* 在启动时就自动创建 Topic,如果该 Topic 存在则忽略。
*
* @author: WT
* @date: 2021/12/5 15:38
*/
@Configuration
@Slf4j
public class KafkaTopicConfig {
private final KafkaProperties kafkaProperties;
public KafkaTopicConfig (KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
/**
* When using Spring Boot,
* a KafkaAdmin bean is automatically registered
* so you only need the NewTopic (and/or NewTopics) @Bean s.
* 当使用 springBoot 的时候 kafkaAdmin 会自动注入。
* 可以参考 org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
* @return
*/
// @Bean
// public KafkaAdmin kafkaAdmin() {
// Map<String, Object> configs = new HashMap<>();
// // AdminClientConfig 这里面的属性 都是可以配置的,如果想要配置属性不知道名字是什么可以点这个类进去看一下
// configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
// return new KafkaAdmin(configs);
// }
/**
* 老版本创建 topic 的方式
*
* 创建 Topic
*/
@Bean
public NewTopic topicinfo() {
// 创建topic,需要指定创建的topic的"名称"、"分区数"、"副本数量(副本数数目的值要小于等于Broker数量)"
return new NewTopic(
kafkaProperties.getTemplate().getDefaultTopic(),
kafkaProperties.getPartitionNum(),
kafkaProperties.getReplicationNum()
);
}
/**
* 新版本建议使用 TopicBuilder 来创建 topic
* @return
*/
@Bean
public NewTopic topic1() {
return TopicBuilder.name(kafkaProperties.getTemplate().getDefaultTopic())
.partitions(kafkaProperties.getPartitionNum())
.replicas(kafkaProperties.getReplicationNum())
.compact()
.build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
public NewTopic topic3() {
return TopicBuilder.name("thing3")
.assignReplicas(0, Arrays.asList(0, 1))
.assignReplicas(1, Arrays.asList(1, 2))
.assignReplicas(2, Arrays.asList(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
/**
* Starting with version 2.7,
* you can declare multiple NewTopic s in a single KafkaAdmin.NewTopics bean definition
* @return
*/
@Bean
public KafkaAdmin.NewTopics topics456() {
return new KafkaAdmin.NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build());
}
}
复制代码
注意:
@KafkaListener(topics = {"topic-2"}),当topic-2不存在的时候是会自动创建的,不过自动创建的Topic的Partition和Replication都是 1 ,可以通过修改kafka的默认配置来改变。- 通过
yaml文件不会自动创建Topic。- 通过
NewTopic的bean可以指定分区数和副本数量。
使用 KafkaTemplate 发送消息
方法列表如下
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
// kafkaTemplate 使用的 Producer 的 metrics 信息
Map<MetricName, ? extends Metric> metrics();
// 可以查询指定 topic 的 分区信息
List<PartitionInfo> partitionsFor(String topic);
// 在生产者上执行一些任意操作并返回结果
<T> T execute(ProducerCallback<K, V, T> callback);
// Flush the producer.
void flush();
// 用于执行生产者方法后异步回调
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}
复制代码
sendDefault和send的区别就是Topic是不是默认的。默认取值取的是spring.kafka.template.defaultTopic。消息在发送的时候,会有一个
timestamp的字段记录到record中,这个属性用户也可以自定义传入。用户能不能自定义这个属性是取决于Topic的配置的,如果Topic配置的是CREATE_TIME,用户指定的timestamp可以生效,用户不指定会默认生成;如果配置的是LOG_APPEND_TIME用户指定的timestamp会被忽略掉,broker会它本地的时间。
Topic的配置为:TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG
同步发送消息 - 阻塞
/**
* 同步发送消息
*
* @param topicName
* @param message
*/
public void sendMessageSync (String topicName, String message) throws ExecutionException, InterruptedException, TimeoutException {
// 同步发送消息 - 建议设置超时时间 - 等待 10 s - 同步返回之后后面在添加同步处理逻辑
kafkaTemplate.send(topicName, "key", message).get(10, TimeUnit.SECONDS);
}
复制代码
异步发送消息 - 非阻塞
/**
* 异步发送消息 - 利用 future 实现发送成功或失败之后的回调
*
* @param topicName
* @param msg
*/
public void sendMessageAsync(String topicName, String msg) {
log.info("===Producing message[{}]: {}", topicName, msg);
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, msg);
// future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
// @Override
// public void onSuccess(SendResult<String, String> result) {
// log.info(result.toString());
// log.info("===Producing message success");
// }
//
// @Override
// public void onFailure(Throwable ex) {
// log.info("===Producing message failed");
// }
// });
/**
* Starting with version 2.5, you can use a KafkaSendCallback instead of a
* ListenableFutureCallback, making it easier to extract the failed ProducerRecord,
* avoiding the need to cast the Throwable
*
* 2.5 版本或者以后的建议使用这种方式来进行异步发送结果的监听,主要的改进便是 onFailure 发生异常的
* 时候不需要进行异常转化了
*/
future.addCallback(
new KafkaSendCallback<String, String>() {
@Override
public void onSuccess(SendResult<String, String> result) {
log.info(result.toString());
log.info("===Producing message success");
}
@Override
public void onFailure(KafkaProducerException ex) {
log.info("===Producing message failed");
}
});
}
/**
* 其实也是异步发送消息 - 只不过没有利用返回值 - 向指定的 topic 中发送消息
*
* @param topicName
* @param message
*/
public void sendMessage (String topicName, String message) {
// kafkaTemplate 的 metrics 信息 和 指定的 topic 的 分区信息
log.info("=== kafkaTemplate.metrics(): {}, kafkaTemplate.partitionsFor: {}",
kafkaTemplate.metrics(), kafkaTemplate.partitionsFor(topicName));
kafkaTemplate.send(topicName, "key", message);
}
复制代码
其实只要不使用
get()方法进行阻塞的话,都是异步进行发送的。当然在使用匿名内部类实现
KafkaSendCallback的时候也可以使用lambda的写法,这时候你可能会有函数式编程只能有一个方法的疑问,这是因为KafkaSendCallback分别继承两个接口一个用于成功回调方法的定义,一个用于失败方法的定义,这样就可以使用lambda来进行实现了具体代码参考:
ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
future.addCallback(result -> {
...
}, (KafkaFailureCallback<Integer, String>) ex -> {
ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
...
});
复制代码
使用 RoutingKafkaTemplate 发送消息
从 2.5 版本开始,可以使用 RoutingKafkaTemplate 在运行时根据目标的 Topic 名称来选择不同 producer 。
注意: RoutingKafkaTemplate 不支持事务,execute, flush, metrics 操作,因为不知道去操作哪一个 Topic 。RoutingKafkaTemplate 尽量不要与 KafkaTemplate 共用,RoutingKafkaTemplate 是 KafkaTemplate 的子类,可以使用 KafkaTemplate 的方法,所以没有必要共用。
构建 RoutingKafkaTemplate 并发送消息
package com.aha.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.RoutingKafkaTemplate;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;
@SpringBootApplication
@Slf4j
public class KafkaRoutingTemplateApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaRoutingTemplateApplication.class, args);
}
// ========== RoutingKafkaTemplate =========
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
ProducerFactory<Object, Object> pf) {
// Clone the PF with a different Serializer, register with Spring for shutdown
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
// 使用原有的 ProducerFactory 配置,覆盖 value 序列化方式,创建 字节序列化的 ProducerFactory
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
// 将创建的 字节序列化 ProducerFactory 注册到 spring 容器中
context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);
// 使用 map 需要是有序的 map
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
// 字节序列化方式的 ProducerFactory,如果是发送到 topic name 为 two 的使用这个工厂
map.put(Pattern.compile("two"), bytesPF);
// Default PF with StringSerializer ,匹配任何的 topic name 使用 这个工厂
map.put(Pattern.compile(".+"), pf);
return new RoutingKafkaTemplate(map);
}
@Bean
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
return args -> {
routingTemplate.send("one", "thing1");
routingTemplate.send("two", "thing2".getBytes());
};
}
}
复制代码
在构建
RoutingTemplate的时候需要一个有序的Map,例如:LinkedHashMap,map的key应该是java.util.regex.Pattern类型用来匹配Topic的名称,value是ProducerFactory<Object, Object>跟正则匹配上的使用的Producer。
监听 RoutingKafkaTemplate 发送的消息
package com.aha.kafka.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* kafka 消费者
*
* @author: WT
* @date: 2021/12/4 20:42
*/
@Component
@Slf4j
public class KafkaConsumerService {
@KafkaListener(topics = "${spring.kafka.routingTemplate.topicOne}", groupId = "group1")
public void routingTopicOne (ConsumerRecord<String, String> record) {
Optional<String> message = Optional.ofNullable(record.value());
log.info("record:{}", record);
if (message.isPresent()) {
log.info("topicOne 接收到的消息 message:{}", message.get());
}
}
@KafkaListener(topics = "${spring.kafka.routingTemplate.topicTwo}", groupId = "group1")
public void routingTopicTwo (ConsumerRecord<String, String> record) {
Optional<String> message = Optional.ofNullable(record.value());
log.info("record:{}", record);
if (message.isPresent()) {
log.info("topicTwo 接收到的消息 message:{}", message.get());
}
}
}
复制代码
控制台打印日志内容
2021-12-08 15:40:41.158 INFO 73269 --- [ntainer#1-0-C-1] c.a.kafka.service.KafkaConsumerService : topicTwo 接收到的消息 message:thing2
2021-12-08 15:40:41.158 INFO 73269 --- [ntainer#0-0-C-1] c.a.kafka.service.KafkaConsumerService : topicOne 接收到的消息 message:thing1
复制代码
接收到了消息,并且消息都没有乱码。
DefaultKafkaProducerFactory 相关问题说明
flush 方法阻塞问题
ProducerFactory 的作用是创建生产者,当不使用事务的时候 DefaultKafkaProducerFactory 默认是创建一个单例的生产者给所有的客户端使用,这个时候就有一个问题,当其中的一个客户端调用 flush() 方法的时候,它就会将其它使用此生产者的客户端阻塞。为了解决这个问题,从 2.3 版本开始 DefaultKafkaProducerFactory 提供了一个新的属性 producerPerThread,当它被设置成 true 的时候,它就会为每一个客户端(线程)创建一个单独的生产者。
注意:
当 producerPerThread 被设置成 true 的时候,当客户端不在需要这个producer 的时候,必须调用 closeThreadBoundProducer() 来物理上的关闭 producer 和移除 ThreadLocal 中的变量。调用 reset() 或者 destroy() 是没有办法关闭的。
自定义序列化
从 2.3 版本开始,用户可以自定义序列化方式,不单单使用官方提供的序列化方式,自定义序列化参考如下代码:
@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}
@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}
复制代码
Producer 创建之后更新配置
从 2.5.10 版本开是,用户可以在创建完 producerFactory 之后,修改其配置,但是它不会影响已经存在的 Producer 如果想要更新已经存在 produer 的配置,需要调用 reset() 方法。修改配置参考如下代码:
void updateConfigs(Map<String, Object> updates);
void removeConfig(String configKey);
复制代码
注意:你不能将事务的生产者工厂修改非事务的,反之亦然。
ReplyingKafkaTemplate 的使用
ReplyingKafkaTemplate 是 KafkaTemplate 的子类,它主要是为了是实现 请求/回复 的场景的。它有两个额外的方法:
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
Duration replyTimeout);
复制代码
第一个方法和第二个方法的区别便是可不可以自定义相应时间,第一个不能自定义默认是 5s,第二个方法传入 replyTimeout 为 null 的话也默认是 5s。
Request 实例:
package com.aha.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.SimpleKafkaHeaderMapper;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import java.util.concurrent.TimeUnit;
@SpringBootApplication
@Slf4j
public class KRequestingApplication {
public static void main(String[] args) {
SpringApplication.run(KRequestingApplication.class, args).close();
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
log.info("=== Sent ok: {}", sendResult.getRecordMetadata().toString());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
log.info("=== Return value: {}", consumerRecord.value());
};
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
// 创建 ReplyingKafkaTemplate 并设置了 reply 的 topic
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer =
containerFactory.createContainer("kReplies");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean
public NewTopic kReplies() {
return TopicBuilder.name("kReplies")
.partitions(10)
.replicas(2)
.build();
}
}
复制代码
reply 实例:
package com.aha.kafka;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.support.SimpleKafkaHeaderMapper;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.handler.annotation.SendTo;
/**
* @author: WT
* @date: 2021/12/8 17:24
*/
@SpringBootApplication
public class KReplyingApplication {
public static void main(String[] args) {
SpringApplication.run(KReplyingApplication.class, args);
}
@KafkaListener(id="server", topics = "kRequests")
// @SendTo (no properties): This is treated as !{source.headers['kafka_replyTopic']} (since version 2.1.3).
@SendTo // use default replyTo expression
public String listen(ConsumerRecord<String, String> record) {
// 在 headers 里面会记录 kafka_replyTopic 的值 , sendTo 不设置属性的时候 默认会去找个值进行转发
System.out.println(record.headers());
String message = record.value();
System.out.println("Server received: " + message);
return message.toUpperCase();
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean // not required if Jackson is on the classpath
public MessagingMessageConverter simpleMapperConverter() {
MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
return messagingMessageConverter;
}
}
复制代码
注意
@SendTo注解:
- 需要依赖
KafkaTemplate不能是它的子类@SendTo (no properties): This is treated as !{source.headers['kafka_replyTopic']} (since version 2.1.3).ConsumerRecord中记录kafka_replyTopic的值关于
ReplyingKafkaTemplate这边不在详细介绍了,因为在实际开发中没有遇到相应的场景,如果想要了解更多的话参考本文开头提供的官网网址。
采坑指南
每次消息都会偶尔失败
仔细排查日志发现每次到指定 partition 的时候才会出现问题,猜测应该是某个节点出现了问题,在 CMAK 中,看节点信息都是正常的,在 zk 中拉取 /broker/ids ,发现三个节点也都在;然后 telnet 对应的节点的 9092 端口,发现是不同的,排查了很多地方发现最终还是节点的问题,所以有些时候不要过于相信 CMAK 这些工具。




近期评论