接收消息
我们可以通过配置一个 MessageListenerContainer 或者使用 @KafkaListener 注解来接收消息。
当你接收消息的时候,你需要提供一个监听者去接收消息,目前支持消息监听者接口有以下八种:
// 自动提交 接收一条消息
public interface MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data);
}
// 手动提交 接收一条消息
public interface AcknowledgingMessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
// 自动提交 接收一条消息 并提供对消费者的访问
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
// 手动提交 接收一条消息 并提供对消费者的访问
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
// 下面这四个 与上面的类似 唯一的不同 便是 批量接收消息
public interface BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data);
}
public interface BatchAcknowledgingMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
复制代码
注意:
Consumer对象不是线程安全的,只能在监听器的线程上使用它的方法。- 禁止在监听器线程上执行任何影响
consumer位置或者提交offset的方法,因为容器需要管理这些信息。
消息监听容器
官方提供了 MessageListenerContainer 的两个实现:
KafkaMessageListenerContainer:使用单线程接收Topic或者Partition的所有消息。ConcurrentMessageListenerContainer:提供一个或者多个KafkaMessageListenerContainer的实例去消费消息。
从 2.2.7 版本开始,你可以在监听容器中添加 RecordInterceptor ,它可以在监听器监听到消息之前,对消息执行检查或者修改操作,如果 RecordInterceptor 返回 null 监听器就不会被调用了。
从 2.7 版本开始,RecordInterceptor 还添加了一些其他的方法,可以在退出监听容器之后被调用,处理一些正常的逻辑或者通过抛出异常;在这个版本也提供了对应 batchListener 的 BatchInterceptor。除此之外在 ConsumerAwareRecordInterceptor 和 BatchInterceptor 中提供了访问 Consumer<?, ?> 的方式,它可以被用于在拦截器中访问 consumer metrics。
注意:
与监听器类似,在上述的拦截器中也不能执行任何影响
consumer位置或者提交offset的方法,因为容器需要管理这些信息。
从 2.8 版本开始,拦截器执行的优先级是高于事务的,如果你想要执行事务的优先级高于拦截器的执行,可以将监听容器的 interceptBeforeTx 属性设置为 false。
从 2.4.6 版本开始,当并发超过 1 时,ConcurrentMessageListenerContainer 支持静态成员,group.instance.id 以 -n 结尾,n 从 1 开始。它与 session.timeout.ms 结合使用,可以减少重平衡时间的发生,例如:减少服务重启的时发生的重平衡的情况。
KafkaMessageListenerContainer 的使用
构造函数:
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
复制代码
它在 ContainerProperties 对象中接收 ConsumerFactory 的 Topic、Partition 和一些其它的配置信息,它的构造函数有以下几个:
// 接收一个 topicPartition 数组,明确的指明使用哪一个 partition 和可选的 offset (使用 consumer assign() 方法实现指定 partition的) 正值就是指定相应的 offset,负值就是使用当前分区默认的最后一个偏移量
// TopicPartitionOffset 还提供了一个额外的 boolean 参数,如果是 true,初始化的 offset 不论是正还是负都是相对于这个消费者的当前位置而言的
// 在容器启动的时候应用这个偏移量
public ContainerProperties(TopicPartitionOffset... topicPartitions)
// 接收 topics 的分组,根据 group.id 来进行分配 partition 的
public ContainerProperties(String... topics)
// 接收 topic 的正则表达是
public ContainerProperties(Pattern topicPattern)
复制代码
默认情况下 offset 的提交使用的日志级别是 debug 级别,从 2.1.2 版本开始,可以通过设置 ContainerProperties 的 commitLogLevel 属性,修改日志的级别。例如:将其设置为 INFO 级别:containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
从 2.2 版本,容器添加了一个新的属性 missingTopicsFatal (从 2.3.4 版本,它的默认值是 false)。
它的作用就是当容器启动的时候,如果 Topic 不存在,它就会阻止容器启动,它不适用于 regex 的模式。
在这个属性应用之前,容器会在 consumer.poll() 方法里面等待 Topic 的出现并打印很多日志,除了会打印许多日志,没有其他问题。
KafkaConsumer 中的 AuthenticationException 和 AuthorizationException 异常被认为是致命的错误,会导致容器停止。从 2.8 版本开始,可以设置 authExceptionRetryInterval 属性,每隔固定的时间去重新拉取权限信息,当权限被授予之后,是允许容器恢复正常运行的。
ConcurrentMessageListenerContainer 的使用
下面的整个构造函数与 KafkaListenerContainer 的类似:
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
复制代码
它具有 concurrency 属性。container.setConcurrency(3) 表示创建 3 个 KafkaMessageListenerContainer 。对于这个构造函数, Kafka 使用它的组管理能力,来进行分配分区。
注意:当我们监听多个
Topic的时候,Partition的默认分配策略可能不是你想要的效果。例如:
你创建三个都拥有 5 个分区的Topic,这时候你就想我要创建 15 消费者每个消费者消费一个分区的数据,于是你就设置了container.setConcurrency(15),但是实际运行起来你发现,只有 5 分消费者在消费数据,其他的 10 个都在闲置。这是因为Kafka默认的PartitionAssignor是RangeAssignor。你如果想达到上述的效果的话,需要设置partition.assignment.strategy属性,可以通过ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG或是通过Spring Boot的配置:
spring.kafka.consumer.properties.partition.assignment.strategy=
org.apache.kafka.clients.consumer.RoundRobinAssignor
复制代码
从 1.3 版本,MessageListenerContainer 提供对底层 kafkaConsumer 的 metrics 信息的访问,通过 clinetId 进行分组。
offset 的提交
对于 offset 的提交提供了很多选项,如果 enable.auto.commit 是 true,kafka 就会根据它自己的配置进行自动提交。如果它是 false,容器支持一些 AckMod,默认的 AckMod 是 Batch。2.3 版本之前默认的是 true ,2.3 版本及以后是 false。
消费者的 poll() 方法返回一个或者多个 ConsumerRecords。每一条消息都会调用 MessageListener 。当不使用事务的情况下,AckMod 的具体描述为:
RECORD:当监听器在处理消息之后提交偏移量BATCH:当poll()返回的所有消息都被处理后提交偏移量TIME:当poll()返回的所有消息都被处理后提交偏移量,只要超过上次提交以来的ackTime。COUNT:当poll()返回的所有消息都被处理后提交偏移量,只要从上次提交以来收到ackCount。COUNT_TIME:和Time,COUNT相同,只要满足二者的其中之一,就会提交偏移量。MANUAL:调用Acknowledgment.acknowledge()之后和BATCH进行相同的操作。MANUAL_IMMEDIATE:在Acknowledgment.acknowledge()立刻提交偏移量。
MANUAL和MANUAL_IMMEDIATE要求监听器是AcknowledgingMessageListener或者BatchAcknowledgingMessageListener。
当使用事务的时候,偏移量被提交到事务中,语义相当于 RECORD 或者 BATCH 具体是哪一个取决于监听器的类型是 record or batch。
容器有一个 syncCommit 的属性,要根据这个属性决定 consumer 使用 commitSync() 还是 commitAsync()。syncCommit 属性默认是 true,设置超时时间参考 setSyncCommitTimeout,参考 setCommitCallback 获取异步提交的结果;默认回调使用的是 LoggingCommitCallback,它会打印错误日志,在 debug 级别下也会打印成功日志。
Acknowledgment 介绍:
package org.springframework.kafka.support;
public interface Acknowledgment {
// 监听器控制何时提交 offset
void acknowledge();
// 2.3 版本新增的,用于 record listener
// nack 是 negative acknowledgement 缩写 意思为 拒收
// record listener 中调用这个方法后,会将挂起的 offset 进行提交,丢弃最后一次 poll 没有处理或者执行失败的消息,下一次 poll 会重新分发,sleep 参数可以在消息重新分发的时候暂停使消费者线程
default void nack(long sleep) {
throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment");
}
// 2.3 版本新增的,用于 batch listener,如果作用于其它的 listener will throw an IllegalStateException
// 如果想要提交 batch 的一部分,使用这个方法,当使用事务的话 set the AckMode to MANUAL 当调用这个方法的时候会将处理成功的记录的偏移量进行提交。
// 相对于第一个 nack,这边多了一个 index 参数,这参数是指定批处理中错误发生的索引,这个索引前面的offset都会被正常提交,在这个索引后面寻找执行失败或被丢弃的消息以便下一次 poll 进行重新分发
default void nack(int index, long sleep) {
throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment");
}
}
复制代码
注意:
要保证上一次的
poll方法消耗的时间加上sleep的时间小于max.poll.interval.ms
手动提交 offset
正常情况下,当使用 AckMode.MANUAL 或者 AckMode.MANUAL_IMMEDIATE 的时候,acknowledgment 只能被顺序的确认,因为 kafka 不能管理每一条记录的状态,只能为每一个 partition 或者 group 维护一个偏移量。
从 2.8 版本开始,可以设置容器的 asyncAcks 属性,它可以允许 acknowledgment 以任何的顺序进行确认。监听器将会推迟无序提交,指导丢失的消息被确认。消费者将会被暂停直到上一次 poll() 的偏移量都被提交。
注意:
虽然提供了异步处理消息机制,但是一般情况下它会增加消息处理失败重新分发的情况。
下面也展示了如何使用不同的 containerFactory:
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
复制代码
@KafkaListener 注解的使用
@KafkaLister 注解的作用是声明一个 bean 为监听容器的一个监听器。
被这个注解标记的 bean 将会被包装成一个 MessagingMessageListenerAdapter ,它提供了很多特性。例如:它可以自动适配你方法所需要的参数。
你可以通过 SpEL 以 #{} 的形式或者使用属性占位符 (${}) 来配置大多数的属性。
消息监听
@KafkaListener 注解的简单示例:
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
复制代码
默认使用的监听容器是 kafkaListenerContainerFactory,如果要 ConcurrentMessageListenerContainer 需要编写配置类,并且添加 @EnableKafka 注解,以下是配置 ConcurrentMessageListenerContainer 的示例:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
复制代码
在设置监听容器的属性的时候必须先调用 getContainerProperties() 方法,然后在执行 setXXX 来设置容器的具体属性。
从 2.2 版本,你可以在 @KafkaListener 注解覆盖容器工厂设置的 concurrency 和 autoStartup 属性,参考以下示例:
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}
复制代码
autoStartup:是否自动启动,默认是 true。
指定消费的分区
@KafkaListener(id = "myId", topicPartitions = {
@TopicPartition(topic = "topic1",
partitions = "0-10",
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")),
@TopicPartition(topic = "topic2",
partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(String message) {
System.out.println(message);
}
复制代码
@TopicPartition 指定消费哪一个 Topic 的哪些分区,并且还可以指定分区从哪一个 offset 开始进行消费。
// 指定消费 topic1 的 0-10 分区,这些分区都从 0 开始消费。
@TopicPartition(topic = "topic1",
partitions = "0-10",
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
复制代码
注意:
每一个
TopicPartition只能有一个带有通配符的partitionOffsets。
指定 containerFactory
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
复制代码
Record 的元数据介绍
KafkaHeaders.OFFSETKafkaHeaders.RECEIVED_MESSAGE_KEYKafkaHeaders.RECEIVED_TOPICKafkaHeaders.RECEIVED_PARTITION_IDKafkaHeaders.RECEIVED_TIMESTAMPKafkaHeaders.TIMESTAMP_TYPE
从 2.5 版本开始,当
RECEIVED_MESSAGE_KEY为空的时候就不会展示,以前的版本是会展示null,这种改变是为了与spring-measssge持久化规范保持一致。
使用这些消息头的示例:
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
复制代码
从 2.5 版本开始,可以使用一个 ConsumerRecordMetadata 对象来接收这些元数据信息,但是它不包含消息的 key 和 value 信息:
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
...
}
复制代码
批量监听
从 1.1 版本开始,使用 @KafkaListener 可以接收消费者 poll 的整批数据。代码示例如下:
配置 batchFactory:
/**
* 配置批量监听容器 - 底层也是使用 ConcurrentKafkaListenerContainerFactory
* @return batchFactory
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>(3);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaConfiguration.getConsumer().getKeyDeserializer());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaConfiguration.getConsumer().getValueDeserializer());
return props;
}
复制代码
消费者示例:
/**
* 批量消费 topic1 的数据
* @param messageList 消息集合
*/
@KafkaListener(id = "listenBatch", topics = "topic2", containerFactory = "batchFactory", groupId = "listenBatch")
public void listenBatch(List<String> messageList) {
log.info("========== messageList: {}, messageList - size: {}", messageList, messageList.size());
}
复制代码
批量监听时也可以使用 header 中数据,示例如下:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
复制代码
或者你直接使用 List<Message<?>> 或者 List<ConsumerRecord<Integer, String>> 来进行接收,示例如下:
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
...
}
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
...
}
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
...
}
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
...
}
复制代码
@KafkaListener 注解属性说明
group.id相关:从 2.0 版本开始,id属性如果存在,它就会被用作group.id,会覆盖在配置consumer factory时指定的group.id。如果你也可以不使用id作为group.id,这样你就需要在@KafkaListener中显式的指定group.id;如果你不想让id覆盖consumer factory中指定的group.id,这样你就需要将idIsGroup属性设置为false。- 直接指定消费者属性: 从 2.4 版本开始我们可以直接在这个注解中指定消费者的属性,它会覆盖你在创建
consumer factory指定的一些消费者配置。示例如下:
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
"max.poll.interval.ms:60000",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
// 在讲解 RoutingKafkaTemplate 时也用到了
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
System.out.println("1: " + in);
}
@KafkaListener(id = "two", topics = "two",
properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
System.out.println("2: " + new String(in));
}
复制代码
获取消费者的 group.id
当运行在不同的容器中运行了相同的监听器的时候,你要区分这条消息是哪一个监听器监听到的,这个时候可以通过 group.id 这个属性来区分。 获取 group.id 可以在监听线程中运行 KafkaUtils.getConsumerGroupId() 或者通过方法参数的方式,以下为代码示例:
@KafkaListener(id = "bar", topicPattern = "${topicTwo:annotated2}", exposeGroupId = "${always:true}")
public void listener(@Payload String foo,
@Header(KafkaHeaders.GROUP_ID) String groupId) {
...
}
复制代码
注意:
上面这种形式适用于接收
List<?>参数的batchListener但是不适用于接收ConsumerRecords<?, ?>的batchListener。接收ConsumerRecords<?, ?>的batchListener可以使用KafkaUtils.getConsumerGroupId()的方式。
@KafkaListener 作为元注解使用
从 2.2 版本开始支持,参考以下示例:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@KafkaListener
public @interface MyThreeConsumersListener {
@AliasFor(annotation = KafkaListener.class, attribute = "id")
String id();
@AliasFor(annotation = KafkaListener.class, attribute = "topics")
String[] topics();
@AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
String concurrency() default "3";
}
复制代码
你必须指定 topics, topicPattern, topicPartitions 其中的一个,自定义注解的使用参考:
@MyThreeConsumersListener(id = "my.group", topics = "my.topic")
public void listen1(String in) {
...
}
复制代码
@KafkaListener 作用到类上
当使用 @KafkaListener 作用到类上时,你必须在方法上 @KafkaHander,如果你在多个方法上指定了 @KafkaHander,那么在分发消息的时候使用哪一个取决于 playload 的类型,参考以下代码示例:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
}
复制代码
从 2.1.3 版本开始,可以指定默认的 @KafkaHandler 方法,当其它的方法都不匹配的时候,默认就会走这个方法,最多只能指定一个默认的方法。因为 Spring 解析参数的一些限制,你要获取默认的 KafkaHandler 消息头中的元数据,只能使用 ConsumerRecordMetadata 来接收,反例:
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
复制代码
正例:
@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
}
复制代码
@KafkaListener 的生命周期
通过 @KafkaListener 创建的监听容器,不是以 Bean 的形式被 Spring 容器管理的,它是被注册到一个名为 KafkaListenerEndpointRegistry 基础设施 bean,这个 bean 通过框架自动声明并且管理生命周期。当 autoStartup 设置为 true 的监听容器都会在同一阶段被启动。但让你也可以手动的管理什么时候将监听容器注册到 register 中,参考下面的示例:
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
// ====
@Autowired
private KafkaListenerEndpointRegistry registry;
...
this.registry.getListenerContainer("myContainer").start();
...
复制代码
校验 @KafkaListener 的 playload
从 2.2 版本开始,可以更加简单的校验 playload,从前你要校验 playload 需要自定义校验器,现在你可以使用 springBoot 自动注入的 LocalValidatorFactoryBean,参考一下代码:
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {
@Autowired
private LocalValidatorFactoryBean validator;
...
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
}
}
// =====
public static class ValidatedClass {
@Max(10)
private int bar;
public int getBar() {
return this.bar;
}
public void setBar(int bar) {
this.bar = bar;
}
}
// ====
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
...
}
@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
return (m, e) -> {
...
};
}
复制代码
从 2.5.11 版本开始这种方法也适用于 @KafkaListener 作用于类时,加在 @KafkaHandler 上。
@SendTo 注解的使用
从 2.0 版本开始,你可以将 @KafkaListener 与 @SendTo 注解一起使用,如果被修饰的方法有返回值的话,这个返回值将会被发送给 @SendTo 指定的 Topic 。
@SendTo 主要有以几种使用方式:
@SendTo("someTopic")路由到指定的Topic.@SendTo("#{someExpression}")路由到表达式指定的Topic,表达式的取值是容器上下文初始化期间确定的。@SendTo("!{someExpression}")路由到表达式指定的Topic,取值于以下三个属性:request:入参中的ConsumerRecord或者批量监听ConsumerRecordssource:从request中转化而来的org.springframework.messaging.Message<?>.result: 方法返回值
@SendTo:不添加任何属性,从 2.1.3 版本出现的,直接取值于!{source.headers['kafka_replyTopic']}。- 从
2.1.11版本之后,属性占位符也可以被解析
使用表达式解析出来的值必须是字符串,下面是一些参考示例:
@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
...
}
@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
...
}
@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {
@KafkaHandler
public String foo(String in) {
...
}
@KafkaHandler
@SendTo("!{'annotated25reply2'}")
public String bar(@Payload(required = false) KafkaNull nul,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
...
}
}
复制代码
注意:
为了支持
sendTo注解,监听工厂必须在replyTemplate属性中设置一个KafkaTemplate来用于回复,而不是ReplyingKafkaTemplate。当你使用Spring Boot的时候,是会自动装配的,没有使用的时候参考以下示例:
从 2.2 版本开始,你可以将 ReplyHeadersConfigurer 添加到侦听器容器工厂,参考以下示例:
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
return factory;
}
// =====
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {
@Override
public boolean shouldCopy(String headerName, Object headerValue) {
return false;
}
@Override
public Map<String, Object> additionalHeaders() {
return Collections.singletonMap("qux", "fiz");
}
});
return factory;
}
复制代码
使用 kafkaTemplate 接收消息
从 2.8 版本开始,可以使用 KafkaTemplate 来接收消息,它提供了 receive() 方法:
ConsumerRecord<K, V> receive(String topic, int partition, long offset);
ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout);
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested);
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout);
复制代码
根据方法的参数可以看出,你需要知道消息的具体的 partition 和 offset,在使用这个方法的时候,它会给每一步操作都创建一个消费者。
使用最后两种方法,每个记录都被单独检索,并将结果组装到一个 ConsumerRecords 对象中。为请求创建 TopicPartitionOffset 时,仅支持正的绝对偏移量。
附录1:Spring Boot yaml 文件参考
spring:
kafka:
## 生产者配置,如果本实例只是消费者,可以不配置该部分
producer:
# client id,随意配置,不可重复
client-id: boot-producer
# kafka 服务地址,ip + 端口
bootstrap-servers: 10.211.55.3:9092,10.211.55.4:9092,10.211.55.5:9092
# 用于序列化的工具类
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 消息发送失败情况下的重试次数
retries: 1
# 批量上传的 buffer size,可以是消息数量,也可以是内存容量
batch-size: 10000
buffer-memory: 300000
# 等待副本同步之后才确认消息发送成功,可选的值有 0,1,-1,all; -1 和 all 是等价的
# 设置为 0 的意思是不等待任何副本同步完成就直接返回
# 设置为 1 的意思是只等待 leader 同步完成 并写入本地日志
# all 的意思是全部的 ISR 副本同步完才确认,但是速度会比较慢
acks: 1
## 消费者配置,如果本实例只是生产者,可以不配置该部分
consumer:
# client id,随意配置,不可重复
client-id: boot-consumer
# 消费者分组 id,同一组别的不同消费者共同消费一份数据
group-id: consumer-group-1
# kafka 服务地址,ip + 端口
bootstrap-servers: 10.211.55.3:9092,10.211.55.4:9092,10.211.55.5:9092
# 用于反序列化的工具类
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 自动更新 offset
enable-auto-commit: true
# 如果 enable-auto-commit 设置为 true,则每隔一段时间提交一次 offset
# 时间单位为毫秒,默认值 5000 (5s)
auto-commit-interval: 1000
# offset 偏移量
# earliest 代表从头开始消费,lastest 代表从新产生的部分开始消费
auto-offset-reset: earliest
复制代码




近期评论