Kafka-3:SpringforKafka-2

接收消息

我们可以通过配置一个 MessageListenerContainer 或者使用 @KafkaListener 注解来接收消息。

当你接收消息的时候,你需要提供一个监听者去接收消息,目前支持消息监听者接口有以下八种:

// 自动提交 接收一条消息
public interface MessageListener<K, V> { 

    void onMessage(ConsumerRecord<K, V> data);

}

// 手动提交 接收一条消息
public interface AcknowledgingMessageListener<K, V> { 

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);

}

// 自动提交 接收一条消息 并提供对消费者的访问
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 

    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);

}

// 手动提交 接收一条消息 并提供对消费者的访问
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}

// 下面这四个 与上面的类似 唯一的不同 便是 批量接收消息
public interface BatchMessageListener<K, V> { 

    void onMessage(List<ConsumerRecord<K, V>> data);

}

public interface BatchAcknowledgingMessageListener<K, V> { 

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);

}

public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 

    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);

}

public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}
复制代码

注意:

  1. Consumer 对象不是线程安全的,只能在监听器的线程上使用它的方法。
  2. 禁止在监听器线程上执行任何影响 consumer 位置或者提交 offset 的方法,因为容器需要管理这些信息。

消息监听容器

官方提供了 MessageListenerContainer 的两个实现:

  1. KafkaMessageListenerContainer :使用单线程接收 Topic 或者 Partition 的所有消息。
  2. ConcurrentMessageListenerContainer:提供一个或者多个 KafkaMessageListenerContainer 的实例去消费消息。

从 2.2.7 版本开始,你可以在监听容器中添加 RecordInterceptor ,它可以在监听器监听到消息之前,对消息执行检查或者修改操作,如果 RecordInterceptor 返回 null 监听器就不会被调用了。

从 2.7 版本开始,RecordInterceptor 还添加了一些其他的方法,可以在退出监听容器之后被调用,处理一些正常的逻辑或者通过抛出异常;在这个版本也提供了对应 batchListenerBatchInterceptor。除此之外在 ConsumerAwareRecordInterceptorBatchInterceptor 中提供了访问 Consumer<?, ?> 的方式,它可以被用于在拦截器中访问 consumer metrics

注意:

与监听器类似,在上述的拦截器中也不能执行任何影响 consumer 位置或者提交 offset 的方法,因为容器需要管理这些信息。

从 2.8 版本开始,拦截器执行的优先级是高于事务的,如果你想要执行事务的优先级高于拦截器的执行,可以将监听容器的 interceptBeforeTx 属性设置为 false

从 2.4.6 版本开始,当并发超过 1 时,ConcurrentMessageListenerContainer 支持静态成员,group.instance.id-n 结尾,n 从 1 开始。它与 session.timeout.ms 结合使用,可以减少重平衡时间的发生,例如:减少服务重启的时发生的重平衡的情况。

KafkaMessageListenerContainer 的使用

构造函数:

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)
复制代码

它在 ContainerProperties 对象中接收 ConsumerFactoryTopicPartition 和一些其它的配置信息,它的构造函数有以下几个:

// 接收一个 topicPartition 数组,明确的指明使用哪一个 partition 和可选的 offset (使用 consumer assign() 方法实现指定 partition的) 正值就是指定相应的 offset,负值就是使用当前分区默认的最后一个偏移量
// TopicPartitionOffset 还提供了一个额外的 boolean 参数,如果是 true,初始化的 offset 不论是正还是负都是相对于这个消费者的当前位置而言的
// 在容器启动的时候应用这个偏移量
public ContainerProperties(TopicPartitionOffset... topicPartitions)

// 接收 topics 的分组,根据 group.id 来进行分配 partition 的
public ContainerProperties(String... topics)

// 接收 topic 的正则表达是
public ContainerProperties(Pattern topicPattern)
复制代码

默认情况下 offset 的提交使用的日志级别是 debug 级别,从 2.1.2 版本开始,可以通过设置 ContainerPropertiescommitLogLevel 属性,修改日志的级别。例如:将其设置为 INFO 级别:containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);

从 2.2 版本,容器添加了一个新的属性 missingTopicsFatal (从 2.3.4 版本,它的默认值是 false)。

它的作用就是当容器启动的时候,如果 Topic 不存在,它就会阻止容器启动,它不适用于 regex 的模式。

在这个属性应用之前,容器会在 consumer.poll() 方法里面等待 Topic 的出现并打印很多日志,除了会打印许多日志,没有其他问题。

KafkaConsumer 中的 AuthenticationExceptionAuthorizationException 异常被认为是致命的错误,会导致容器停止。从 2.8 版本开始,可以设置 authExceptionRetryInterval 属性,每隔固定的时间去重新拉取权限信息,当权限被授予之后,是允许容器恢复正常运行的。

ConcurrentMessageListenerContainer 的使用

下面的整个构造函数与 KafkaListenerContainer 的类似:

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)
复制代码

它具有 concurrency 属性。container.setConcurrency(3) 表示创建 3 个 KafkaMessageListenerContainer 。对于这个构造函数, Kafka 使用它的组管理能力,来进行分配分区。

注意:当我们监听多个 Topic 的时候,Partition 的默认分配策略可能不是你想要的效果。

例如:
你创建三个都拥有 5 个分区的 Topic,这时候你就想我要创建 15 消费者每个消费者消费一个分区的数据,于是你就设置了 container.setConcurrency(15),但是实际运行起来你发现,只有 5 分消费者在消费数据,其他的 10 个都在闲置。这是因为 Kafka 默认的 PartitionAssignorRangeAssignor。你如果想达到上述的效果的话,需要设置 partition.assignment.strategy 属性,可以通过 ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG 或是通过 Spring Boot 的配置:

spring.kafka.consumer.properties.partition.assignment.strategy=
org.apache.kafka.clients.consumer.RoundRobinAssignor
复制代码

从 1.3 版本,MessageListenerContainer 提供对底层 kafkaConsumermetrics 信息的访问,通过 clinetId 进行分组。

offset 的提交

对于 offset 的提交提供了很多选项,如果 enable.auto.committruekafka 就会根据它自己的配置进行自动提交。如果它是 false,容器支持一些 AckMod,默认的 AckModBatch。2.3 版本之前默认的是 true ,2.3 版本及以后是 false

消费者的 poll() 方法返回一个或者多个 ConsumerRecords。每一条消息都会调用 MessageListener 。当不使用事务的情况下,AckMod 的具体描述为:

  1. RECORD:当监听器在处理消息之后提交偏移量
  2. BATCH:当 poll() 返回的所有消息都被处理后提交偏移量
  3. TIME:当 poll() 返回的所有消息都被处理后提交偏移量,只要超过上次提交以来的 ackTime
  4. COUNT:当 poll() 返回的所有消息都被处理后提交偏移量,只要从上次提交以来收到 ackCount
  5. COUNT_TIME:和 Time , COUNT 相同,只要满足二者的其中之一,就会提交偏移量。
  6. MANUAL:调用 Acknowledgment.acknowledge() 之后和 BATCH 进行相同的操作。
  7. MANUAL_IMMEDIATE:在 Acknowledgment.acknowledge() 立刻提交偏移量。

MANUALMANUAL_IMMEDIATE 要求监听器是 AcknowledgingMessageListener 或者 BatchAcknowledgingMessageListener

当使用事务的时候,偏移量被提交到事务中,语义相当于 RECORD 或者 BATCH 具体是哪一个取决于监听器的类型是 record or batch

容器有一个 syncCommit 的属性,要根据这个属性决定 consumer 使用 commitSync() 还是 commitAsync()syncCommit 属性默认是 true,设置超时时间参考 setSyncCommitTimeout,参考 setCommitCallback 获取异步提交的结果;默认回调使用的是 LoggingCommitCallback,它会打印错误日志,在 debug 级别下也会打印成功日志。

Acknowledgment 介绍:

package org.springframework.kafka.support;


public interface Acknowledgment {

    // 监听器控制何时提交 offset
	void acknowledge();

    // 2.3 版本新增的,用于 record listener
    // nack 是 negative acknowledgement 缩写 意思为 拒收
    // record listener 中调用这个方法后,会将挂起的 offset 进行提交,丢弃最后一次 poll 没有处理或者执行失败的消息,下一次 poll 会重新分发,sleep 参数可以在消息重新分发的时候暂停使消费者线程
	default void nack(long sleep) {
		throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment");
	}

    // 2.3 版本新增的,用于 batch listener,如果作用于其它的 listener  will throw an IllegalStateException
    // 如果想要提交 batch 的一部分,使用这个方法,当使用事务的话 set the AckMode to MANUAL 当调用这个方法的时候会将处理成功的记录的偏移量进行提交。
    // 相对于第一个 nack,这边多了一个 index 参数,这参数是指定批处理中错误发生的索引,这个索引前面的offset都会被正常提交,在这个索引后面寻找执行失败或被丢弃的消息以便下一次 poll 进行重新分发
	default void nack(int index, long sleep) {
		throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment");
	}

}
复制代码

注意:

要保证上一次的 poll 方法消耗的时间加上 sleep 的时间小于 max.poll.interval.ms

手动提交 offset

正常情况下,当使用 AckMode.MANUAL 或者 AckMode.MANUAL_IMMEDIATE 的时候,acknowledgment 只能被顺序的确认,因为 kafka 不能管理每一条记录的状态,只能为每一个 partition 或者 group 维护一个偏移量。

从 2.8 版本开始,可以设置容器的 asyncAcks 属性,它可以允许 acknowledgment 以任何的顺序进行确认。监听器将会推迟无序提交,指导丢失的消息被确认。消费者将会被暂停直到上一次 poll() 的偏移量都被提交。

注意:

虽然提供了异步处理消息机制,但是一般情况下它会增加消息处理失败重新分发的情况。

下面也展示了如何使用不同的 containerFactory

@KafkaListener(id = "cat", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}
复制代码

@KafkaListener 注解的使用

@KafkaLister 注解的作用是声明一个 bean 为监听容器的一个监听器。

被这个注解标记的 bean 将会被包装成一个 MessagingMessageListenerAdapter ,它提供了很多特性。例如:它可以自动适配你方法所需要的参数。

你可以通过 SpEL#{} 的形式或者使用属性占位符 (${}) 来配置大多数的属性。

消息监听

@KafkaListener 注解的简单示例:

public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }

}
复制代码

默认使用的监听容器是 kafkaListenerContainerFactory,如果要 ConcurrentMessageListenerContainer 需要编写配置类,并且添加 @EnableKafka 注解,以下是配置 ConcurrentMessageListenerContainer 的示例:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}
复制代码

在设置监听容器的属性的时候必须先调用 getContainerProperties() 方法,然后在执行 setXXX 来设置容器的具体属性。

从 2.2 版本,你可以在 @KafkaListener 注解覆盖容器工厂设置的 concurrencyautoStartup 属性,参考以下示例:

@KafkaListener(id = "myListener", topics = "myTopic",
        autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    ...
}
复制代码

autoStartup:是否自动启动,默认是 true

指定消费的分区

@KafkaListener(id = "myId", topicPartitions = {
    @TopicPartition(topic = "topic1",
                    partitions = "0-10",
                    partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")),
    @TopicPartition(topic = "topic2", 
                    partitions = "0",
             		partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(String message) {
    System.out.println(message);
}
复制代码

@TopicPartition 指定消费哪一个 Topic 的哪些分区,并且还可以指定分区从哪一个 offset 开始进行消费。

// 指定消费 topic1 的 0-10 分区,这些分区都从 0 开始消费。
@TopicPartition(topic = "topic1",
                    partitions = "0-10",
                    partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
复制代码

注意:

每一个 TopicPartition 只能有一个带有通配符的 partitionOffsets

指定 containerFactory

@KafkaListener(id = "cat", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}
复制代码

Record 的元数据介绍

  1. KafkaHeaders.OFFSET
  2. KafkaHeaders.RECEIVED_MESSAGE_KEY
  3. KafkaHeaders.RECEIVED_TOPIC
  4. KafkaHeaders.RECEIVED_PARTITION_ID
  5. KafkaHeaders.RECEIVED_TIMESTAMP
  6. KafkaHeaders.TIMESTAMP_TYPE

从 2.5 版本开始,当 RECEIVED_MESSAGE_KEY 为空的时候就不会展示,以前的版本是会展示 null,这种改变是为了与 spring-measssge 持久化规范保持一致。

使用这些消息头的示例:

@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}
复制代码

从 2.5 版本开始,可以使用一个 ConsumerRecordMetadata 对象来接收这些元数据信息,但是它不包含消息的 keyvalue 信息:

@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
    ...
}
复制代码

批量监听

从 1.1 版本开始,使用 @KafkaListener 可以接收消费者 poll 的整批数据。代码示例如下:

配置 batchFactory

/**
  * 配置批量监听容器 - 底层也是使用 ConcurrentKafkaListenerContainerFactory
  * @return batchFactory
  */
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    return factory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>(3);
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBootstrapServers());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaConfiguration.getConsumer().getKeyDeserializer());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaConfiguration.getConsumer().getValueDeserializer());
    return props;
}
复制代码

消费者示例:

/**
  * 批量消费 topic1 的数据
  * @param messageList 消息集合
  */
@KafkaListener(id = "listenBatch", topics = "topic2", containerFactory = "batchFactory", groupId = "listenBatch")
public void listenBatch(List<String> messageList) {
    log.info("========== messageList: {}, messageList - size: {}", messageList, messageList.size());
}
复制代码

批量监听时也可以使用 header 中数据,示例如下:

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}
复制代码

或者你直接使用 List<Message<?>> 或者 List<ConsumerRecord<Integer, String>> 来进行接收,示例如下:

@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
    ...
}

@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
    ...
}

@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
    ...
}

@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
    ...
}

@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
    ...
}
复制代码

@KafkaListener 注解属性说明

  1. group.id 相关:从 2.0 版本开始,id 属性如果存在,它就会被用作 group.id,会覆盖在配置 consumer factory 时指定的 group.id 。如果你也可以不使用 id 作为 group.id,这样你就需要在 @KafkaListener 中显式的指定 group.id;如果你不想让 id 覆盖 consumer factory 中指定的 group.id,这样你就需要将 idIsGroup 属性设置为 false
  2. 直接指定消费者属性: 从 2.4 版本开始我们可以直接在这个注解中指定消费者的属性,它会覆盖你在创建 consumer factory 指定的一些消费者配置。示例如下:
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
    "max.poll.interval.ms:60000",
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})

// 在讲解 RoutingKafkaTemplate 时也用到了
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
    System.out.println("1: " + in);
}

@KafkaListener(id = "two", topics = "two",
        properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
    System.out.println("2: " + new String(in));
}
复制代码

获取消费者的 group.id

当运行在不同的容器中运行了相同的监听器的时候,你要区分这条消息是哪一个监听器监听到的,这个时候可以通过 group.id 这个属性来区分。 获取 group.id 可以在监听线程中运行 KafkaUtils.getConsumerGroupId() 或者通过方法参数的方式,以下为代码示例:

@KafkaListener(id = "bar", topicPattern = "${topicTwo:annotated2}", exposeGroupId = "${always:true}")
public void listener(@Payload String foo,
        @Header(KafkaHeaders.GROUP_ID) String groupId) {
...
}
复制代码

注意:

上面这种形式适用于接收 List<?> 参数的 batchListener 但是不适用于接收 ConsumerRecords<?, ?>batchListener。接收 ConsumerRecords<?, ?>batchListener 可以使用 KafkaUtils.getConsumerGroupId() 的方式。

@KafkaListener 作为元注解使用

从 2.2 版本开始支持,参考以下示例:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@KafkaListener
public @interface MyThreeConsumersListener {

    @AliasFor(annotation = KafkaListener.class, attribute = "id")
    String id();

    @AliasFor(annotation = KafkaListener.class, attribute = "topics")
    String[] topics();

    @AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
    String concurrency() default "3";

}
复制代码

你必须指定 topics, topicPattern, topicPartitions 其中的一个,自定义注解的使用参考:

@MyThreeConsumersListener(id = "my.group", topics = "my.topic")
public void listen1(String in) {
    ...
}
复制代码

@KafkaListener 作用到类上

当使用 @KafkaListener 作用到类上时,你必须在方法上 @KafkaHander,如果你在多个方法上指定了 @KafkaHander,那么在分发消息的时候使用哪一个取决于 playload 的类型,参考以下代码示例:

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String foo) {
        ...
    }

    @KafkaHandler
    public void listen(Integer bar) {
        ...
    }

    @KafkaHandler(isDefault = true)
    public void listenDefault(Object object) {
        ...
    }

}
复制代码

从 2.1.3 版本开始,可以指定默认的 @KafkaHandler 方法,当其它的方法都不匹配的时候,默认就会走这个方法,最多只能指定一个默认的方法。因为 Spring 解析参数的一些限制,你要获取默认的 KafkaHandler 消息头中的元数据,只能使用 ConsumerRecordMetadata 来接收,反例:

@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    ...
}
复制代码

正例:

@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
    String topic = meta.topic();
    ...
}
复制代码

@KafkaListener 的生命周期

通过 @KafkaListener 创建的监听容器,不是以 Bean 的形式被 Spring 容器管理的,它是被注册到一个名为 KafkaListenerEndpointRegistry 基础设施 bean,这个 bean 通过框架自动声明并且管理生命周期。当 autoStartup 设置为 true 的监听容器都会在同一阶段被启动。但让你也可以手动的管理什么时候将监听容器注册到 register 中,参考下面的示例:

@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }

// ====

@Autowired
private KafkaListenerEndpointRegistry registry;

...

    this.registry.getListenerContainer("myContainer").start();

...
复制代码

校验 @KafkaListener 的 playload

从 2.2 版本开始,可以更加简单的校验 playload,从前你要校验 playload 需要自定义校验器,现在你可以使用 springBoot 自动注入的 LocalValidatorFactoryBean,参考一下代码:

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;
    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(this.validator);
    }
}

// =====

public static class ValidatedClass {

  @Max(10)
  private int bar;

  public int getBar() {
    return this.bar;
  }

  public void setBar(int bar) {
    this.bar = bar;
  }

}

// ====

@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
      containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
}
复制代码

从 2.5.11 版本开始这种方法也适用于 @KafkaListener 作用于类时,加在 @KafkaHandler 上。

@SendTo 注解的使用

从 2.0 版本开始,你可以将 @KafkaListener@SendTo 注解一起使用,如果被修饰的方法有返回值的话,这个返回值将会被发送给 @SendTo 指定的 Topic

@SendTo 主要有以几种使用方式:

  1. @SendTo("someTopic") 路由到指定的 Topic .
  2. @SendTo("#{someExpression}") 路由到表达式指定的 Topic ,表达式的取值是容器上下文初始化期间确定的。
  3. @SendTo("!{someExpression}") 路由到表达式指定的 Topic,取值于以下三个属性:
    1. request:入参中的 ConsumerRecord 或者批量监听 ConsumerRecords
    2. source :从 request 中转化而来的 org.springframework.messaging.Message<?> .
    3. result: 方法返回值
  4. @SendTo:不添加任何属性,从 2.1.3 版本出现的,直接取值于 !{source.headers['kafka_replyTopic']}
  5. 2.1.11 版本之后,属性占位符也可以被解析

使用表达式解析出来的值必须是字符串,下面是一些参考示例:

@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
    ...
}

@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
    ...
}

@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
    ...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {

    @KafkaHandler
    public String foo(String in) {
        ...
    }

    @KafkaHandler
    @SendTo("!{'annotated25reply2'}")
    public String bar(@Payload(required = false) KafkaNull nul,
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
        ...
    }

}
复制代码

注意:

为了支持 sendTo 注解,监听工厂必须在 replyTemplate 属性中设置一个 KafkaTemplate 来用于回复,而不是 ReplyingKafkaTemplate。当你使用 Spring Boot 的时候,是会自动装配的,没有使用的时候参考以下示例:

从 2.2 版本开始,你可以将 ReplyHeadersConfigurer 添加到侦听器容器工厂,参考以下示例:

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
    return factory;
}

// =====


@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {

      @Override
      public boolean shouldCopy(String headerName, Object headerValue) {
        return false;
      }

      @Override
      public Map<String, Object> additionalHeaders() {
        return Collections.singletonMap("qux", "fiz");
      }

    });
    return factory;
}
复制代码

使用 kafkaTemplate 接收消息

从 2.8 版本开始,可以使用 KafkaTemplate 来接收消息,它提供了 receive() 方法:

ConsumerRecord<K, V> receive(String topic, int partition, long offset);

ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout);

ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested);

ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout);
复制代码

根据方法的参数可以看出,你需要知道消息的具体的 partition 和 offset,在使用这个方法的时候,它会给每一步操作都创建一个消费者。

使用最后两种方法,每个记录都被单独检索,并将结果组装到一个 ConsumerRecords 对象中。为请求创建 TopicPartitionOffset 时,仅支持正的绝对偏移量。

附录1:Spring Boot yaml 文件参考

spring:
  kafka:
    ## 生产者配置,如果本实例只是消费者,可以不配置该部分
    producer:
      # client id,随意配置,不可重复
      client-id: boot-producer
      # kafka 服务地址,ip + 端口
      bootstrap-servers: 10.211.55.3:9092,10.211.55.4:9092,10.211.55.5:9092
      # 用于序列化的工具类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 消息发送失败情况下的重试次数
      retries: 1
      # 批量上传的 buffer size,可以是消息数量,也可以是内存容量
      batch-size: 10000
      buffer-memory: 300000
      # 等待副本同步之后才确认消息发送成功,可选的值有 0,1,-1,all; -1 和 all 是等价的
      # 设置为 0 的意思是不等待任何副本同步完成就直接返回
      # 设置为 1 的意思是只等待 leader 同步完成 并写入本地日志
      # all 的意思是全部的 ISR 副本同步完才确认,但是速度会比较慢
      acks: 1

    ## 消费者配置,如果本实例只是生产者,可以不配置该部分
    consumer:
      # client id,随意配置,不可重复
      client-id: boot-consumer
      # 消费者分组 id,同一组别的不同消费者共同消费一份数据
      group-id: consumer-group-1
      # kafka 服务地址,ip + 端口
      bootstrap-servers: 10.211.55.3:9092,10.211.55.4:9092,10.211.55.5:9092
      # 用于反序列化的工具类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 自动更新 offset
      enable-auto-commit: true
      # 如果 enable-auto-commit 设置为 true,则每隔一段时间提交一次 offset
      # 时间单位为毫秒,默认值 5000 (5s)
      auto-commit-interval: 1000
      # offset 偏移量
      # earliest 代表从头开始消费,lastest 代表从新产生的部分开始消费
      auto-offset-reset: earliest
复制代码