Kafka-2:SpringforKafka-1

版本说明

版本对应的问题请参考:Spring for Apahe Kafka

本文基于 Spring for Apache Kafka2.8.0 版本,Kafka3.0.0 版本,如果还不了解 kafka 怎么搭建和它的核心概念请参考:Kafka-1: 安装与关键概念介绍,关于 Spring for Apache Kafka 问题请参考 官方文档

不使用 Spring Boot

在不使用 SpringBoot 的时,所有的 kafka 配置都不能使用自动装配来实现,需要自己写配置类,并且添加 @EnableKafka 来启动 AbstractListenerContainerFactory 子类的监听,如果不添加 @EnableKafka 的话 @KafkaListener 这些注解是没有办法使用的。

演示实例

public class Sender {

	public static void main(String[] args) {
		AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
		context.getBean(Sender.class).send("test", 42);
	}

	private final KafkaTemplate<Integer, String> template;

	public Sender(KafkaTemplate<Integer, String> template) {
		this.template = template;
	}

	public void send(String toSend, int key) {
		this.template.send("topic1", key, toSend);
	}

}

public class Listener {

    @KafkaListener(id = "listen1", topics = "topic1")
    public void listen1(String in) {
        System.out.println(in);
    }

}

@Configuration
@EnableKafka
public class Config {

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());
    }

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // ...
        return props;
    }

    @Bean
    public Sender sender(KafkaTemplate<Integer, String> template) {
        return new Sender(template);
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(senderProps());
    }

    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //...
        return props;
    }

    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
        return new KafkaTemplate<Integer, String>(producerFactory);
    }

}
复制代码

使用 Spring Boot

使用 Spring Boot 我们便可以省去很多配置类的编写,很多配置都是可以通过 yaml 的方式来注入。

@EnableKafkaSpring Boot 中也不是必须的。如果你不想使用 Kafka 自动配置,比如测试中不使用 Kafka 的自定配置,那么你只需要将 KafkaAutoConfiguration 配置类给移除就行了,这种方式也适用于 Spring Boot 中移除其他的自动装配:@SpringBootTest("spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration") 。因为 Spring for Apache Kafka 的官方文档大多是以 Spring 为例进行介绍的,要了解 Spring boot 和它的集成可以参考 Spring Boot 的官方文档的 Messaging 部分。

如果想要了解 Spring Boot 帮我们自动装配了哪些 bean 的话可以查看 org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration 类,如果想要了解我们在 yaml 文件中可以配置那些关于 spring.kafka 的内容的话,可以查看 org.springframework.boot.autoconfigure.kafka.KafkaProperties 类,或者查看 Spring Boot 官网提供的 Application Properties,并且要善用 command(ctrl) + f;如果想要了解 Spring Boot 自动装配的原理可以查看:自定义开发 Spring Boot Starter

采坑指南:

在我们编写 Spring Boot 的配置文件不知道都有什么配置项的时候,都可以去找对应 autoConfiurationProperties 类。

最小化方式演示 Spring for kafka

导入依赖

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
复制代码

代码示例

package com.aha.kafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }

    /**
     * 没有对应的 topic 就创建,有就不做任何处理
     * @return
     */
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }

    /**
     * 消费者 - 监听 topic1
     * @param in
     */
    @KafkaListener(id = "myId", topics = "topic1")
    public void listen(String in) {
        System.out.println(in);
    }

    /**
     * 生产者 - 向 topic1 中发送消息
     * ApplicationRunner 的作用就是在项目启动的时候 会执行 run 方法
     *
     * @param template
     * @return
     */
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        // 使用 lambda 表达式的方式 - 使用匿名内部类实现 ApplicationRunner 并重写 run 方法
        return args -> {
            template.send("topic1", "test");
        };
    }

}
复制代码

yaml 文件

server:
  port: 8088
spring:
  kafka:
    bootstrap-servers: 10.211.55.3:9092,10.211.55.4:9092,10.211.55.5:9092
    consumer: 
      auto-offset-reset: earliest
复制代码

启动工程便可以看到控制台中打印 test,这说明最小化演示就成功了。

创建 Topic

在启动时就创建 Topic,如果该 Topic 存在则忽略。

编写 yaml 文件

server:
  port: 8088
spring:
  kafka:
    template:
      # 当使用 kafkaTemplate 的 sendDefault 方法的时候,使用的是这里配置的 topic
      defaultTopic: topic-1
    # partition-num 和 replication-num KafkaProperties 没有提供配置的地方
    partition-num: 3
    replication-num: 3
    bootstrap-servers: 10.211.55.3:9092,10.211.55.4:9092,10.211.55.5:9092
    producer:
      retries: 3
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # @KafkaListener 中不指定 group-id 使用这边 groupId,这边也不指定项目就报错起不来,指定了就覆盖这边的
      # group-id: test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: latest
      # auto-offset-reset: earliest
      enable-auto-commit: true
复制代码

yaml 中配置完这个以后,kafkaAdmin 便会自动去连接 Kafka 的地址。

编写 Topic 的配置配置

package com.aha.kafka.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;

import java.util.Arrays;

/**
 * 在启动时就自动创建 Topic,如果该 Topic 存在则忽略。
 *
 * @author: WT
 * @date: 2021/12/5 15:38
 */
@Configuration
@Slf4j
public class KafkaTopicConfig {

    private final KafkaProperties kafkaProperties;

    public KafkaTopicConfig (KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }


    /**
     * When using Spring Boot,
     * a KafkaAdmin bean is automatically registered
     * so you only need the NewTopic (and/or NewTopics) @Bean s.
     * 当使用 springBoot 的时候 kafkaAdmin 会自动注入。
     * 可以参考 org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
     * @return
     */
//    @Bean
//    public KafkaAdmin kafkaAdmin() {
//        Map<String, Object> configs = new HashMap<>();
//        // AdminClientConfig 这里面的属性 都是可以配置的,如果想要配置属性不知道名字是什么可以点这个类进去看一下
//        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
//        return new KafkaAdmin(configs);
//    }

    /**
     * 老版本创建 topic 的方式
     *
     * 创建 Topic
     */
    @Bean
    public NewTopic topicinfo() {
        // 创建topic,需要指定创建的topic的"名称"、"分区数"、"副本数量(副本数数目的值要小于等于Broker数量)"
        return new NewTopic(
                kafkaProperties.getTemplate().getDefaultTopic(),
                kafkaProperties.getPartitionNum(),
                kafkaProperties.getReplicationNum()
        );
    }

    /**
     * 新版本建议使用 TopicBuilder 来创建 topic
     * @return
     */
    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name(kafkaProperties.getTemplate().getDefaultTopic())
                .partitions(kafkaProperties.getPartitionNum())
                .replicas(kafkaProperties.getReplicationNum())
                .compact()
                .build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("thing2")
                .partitions(10)
                .replicas(3)
                .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
                .build();
    }

    @Bean
    public NewTopic topic3() {
        return TopicBuilder.name("thing3")
                .assignReplicas(0, Arrays.asList(0, 1))
                .assignReplicas(1, Arrays.asList(1, 2))
                .assignReplicas(2, Arrays.asList(2, 0))
                .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
                .build();
    }

    /**
     * Starting with version 2.7,
     * you can declare multiple NewTopic s in a single KafkaAdmin.NewTopics bean definition
     * @return
     */
    @Bean
    public KafkaAdmin.NewTopics topics456() {
        return new KafkaAdmin.NewTopics(
                TopicBuilder.name("defaultBoth")
                        .build(),
                TopicBuilder.name("defaultPart")
                        .replicas(1)
                        .build(),
                TopicBuilder.name("defaultRepl")
                        .partitions(3)
                        .build());
    }

}
复制代码

注意:

  1. @KafkaListener(topics = {"topic-2"}),当 topic-2 不存在的时候是会自动创建的,不过自动创建的 TopicPartitionReplication 都是 1 ,可以通过修改 kafka 的默认配置来改变。
  2. 通过 yaml 文件不会自动创建 Topic
  3. 通过 NewTopicbean 可以指定分区数和副本数量。

使用 KafkaTemplate 发送消息

方法列表如下

ListenableFuture<SendResult<K, V>> sendDefault(V data);

ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);

ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, V data);

ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

ListenableFuture<SendResult<K, V>> send(Message<?> message);

// kafkaTemplate 使用的 Producer 的 metrics 信息
Map<MetricName, ? extends Metric> metrics();

// 可以查询指定 topic 的 分区信息
List<PartitionInfo> partitionsFor(String topic);

// 在生产者上执行一些任意操作并返回结果
<T> T execute(ProducerCallback<K, V, T> callback);

// Flush the producer.
void flush();

// 用于执行生产者方法后异步回调
interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}
复制代码

sendDefaultsend 的区别就是 Topic 是不是默认的。默认取值取的是 spring.kafka.template.defaultTopic

消息在发送的时候,会有一个 timestamp 的字段记录到 record 中,这个属性用户也可以自定义传入。用户能不能自定义这个属性是取决于 Topic 的配置的,如果 Topic 配置的是 CREATE_TIME,用户指定的 timestamp 可以生效,用户不指定会默认生成;如果配置的是 LOG_APPEND_TIME 用户指定的 timestamp 会被忽略掉,broker 会它本地的时间。

Topic 的配置为:TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG

同步发送消息 - 阻塞

   /**
     * 同步发送消息
     *
     * @param topicName
     * @param message
     */
public void sendMessageSync (String topicName, String message) throws ExecutionException, InterruptedException, TimeoutException {
    // 同步发送消息 - 建议设置超时时间 - 等待 10 s - 同步返回之后后面在添加同步处理逻辑
    kafkaTemplate.send(topicName, "key", message).get(10, TimeUnit.SECONDS);
}
复制代码

异步发送消息 - 非阻塞

/**
     * 异步发送消息 - 利用 future 实现发送成功或失败之后的回调
     *
     * @param topicName
     * @param msg
     */
public void sendMessageAsync(String topicName, String msg) {

    log.info("===Producing message[{}]: {}", topicName, msg);
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, msg);
    //        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    //            @Override
    //            public void onSuccess(SendResult<String, String> result) {
    //                log.info(result.toString());
    //                log.info("===Producing message success");
    //            }
    //
    //            @Override
    //            public void onFailure(Throwable ex) {
    //                log.info("===Producing message failed");
    //            }
    //        });

    /**
         * Starting with version 2.5, you can use a KafkaSendCallback instead of a
         * ListenableFutureCallback, making it easier to extract the failed ProducerRecord,
         * avoiding the need to cast the Throwable
         *
         * 2.5  版本或者以后的建议使用这种方式来进行异步发送结果的监听,主要的改进便是 onFailure 发生异常的
         * 时候不需要进行异常转化了
         */
    future.addCallback(
        new KafkaSendCallback<String, String>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info(result.toString());
                log.info("===Producing message success");
            }

            @Override
            public void onFailure(KafkaProducerException ex) {
                log.info("===Producing message failed");
            }
        });

}

/**
     * 其实也是异步发送消息 - 只不过没有利用返回值 - 向指定的 topic 中发送消息
     *
     * @param topicName
     * @param message
     */
public void sendMessage (String topicName, String message) {
    // kafkaTemplate 的 metrics 信息 和 指定的 topic 的 分区信息
    log.info("=== kafkaTemplate.metrics(): {},  kafkaTemplate.partitionsFor: {}",
             kafkaTemplate.metrics(), kafkaTemplate.partitionsFor(topicName));
    kafkaTemplate.send(topicName, "key", message);
}
复制代码

其实只要不使用 get() 方法进行阻塞的话,都是异步进行发送的。

当然在使用匿名内部类实现 KafkaSendCallback 的时候也可以使用 lambda 的写法,这时候你可能会有函数式编程只能有一个方法的疑问,这是因为 KafkaSendCallback 分别继承两个接口一个用于成功回调方法的定义,一个用于失败方法的定义,这样就可以使用 lambda 来进行实现了具体代码参考:

ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
future.addCallback(result -> {
        ...
    }, (KafkaFailureCallback<Integer, String>) ex -> {
            ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
            ...
    });
复制代码

使用 RoutingKafkaTemplate 发送消息

从 2.5 版本开始,可以使用 RoutingKafkaTemplate 在运行时根据目标的 Topic 名称来选择不同 producer

注意: RoutingKafkaTemplate 不支持事务,execute, flush, metrics 操作,因为不知道去操作哪一个 TopicRoutingKafkaTemplate 尽量不要与 KafkaTemplate 共用,RoutingKafkaTemplateKafkaTemplate 的子类,可以使用 KafkaTemplate 的方法,所以没有必要共用。

构建 RoutingKafkaTemplate 并发送消息

package com.aha.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.RoutingKafkaTemplate;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

@SpringBootApplication
@Slf4j
public class KafkaRoutingTemplateApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaRoutingTemplateApplication.class, args);
    }

    // ========== RoutingKafkaTemplate =========

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
                                                ProducerFactory<Object, Object> pf) {

        // Clone the PF with a different Serializer, register with Spring for shutdown
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        // 使用原有的 ProducerFactory 配置,覆盖 value 序列化方式,创建 字节序列化的 ProducerFactory
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        // 将创建的 字节序列化 ProducerFactory 注册到 spring 容器中
        context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);

        // 使用 map 需要是有序的 map
        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        // 字节序列化方式的 ProducerFactory,如果是发送到 topic name 为 two 的使用这个工厂
        map.put(Pattern.compile("two"), bytesPF);
        // Default PF with StringSerializer ,匹配任何的 topic name 使用 这个工厂
        map.put(Pattern.compile(".+"), pf);

        return new RoutingKafkaTemplate(map);
    }

    @Bean
    public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
        return args -> {
            routingTemplate.send("one", "thing1");
            routingTemplate.send("two", "thing2".getBytes());
        };
    }

}
复制代码

在构建 RoutingTemplate 的时候需要一个有序的 Map ,例如:LinkedHashMapmapkey 应该是 java.util.regex.Pattern 类型用来匹配 Topic 的名称,valueProducerFactory<Object, Object> 跟正则匹配上的使用的 Producer

监听 RoutingKafkaTemplate 发送的消息

package com.aha.kafka.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * kafka 消费者
 *
 * @author: WT
 * @date: 2021/12/4 20:42
 */
@Component
@Slf4j
public class KafkaConsumerService {

    @KafkaListener(topics = "${spring.kafka.routingTemplate.topicOne}", groupId = "group1")
    public void routingTopicOne (ConsumerRecord<String, String> record) {
        Optional<String> message = Optional.ofNullable(record.value());
        log.info("record:{}", record);
        if (message.isPresent()) {
            log.info("topicOne 接收到的消息 message:{}", message.get());
        }
    }

    @KafkaListener(topics = "${spring.kafka.routingTemplate.topicTwo}", groupId = "group1")
    public void routingTopicTwo (ConsumerRecord<String, String> record) {
        Optional<String> message = Optional.ofNullable(record.value());
        log.info("record:{}", record);
        if (message.isPresent()) {
            log.info("topicTwo 接收到的消息 message:{}", message.get());
        }
    }

}
复制代码

控制台打印日志内容

2021-12-08 15:40:41.158  INFO 73269 --- [ntainer#1-0-C-1] c.a.kafka.service.KafkaConsumerService   : topicTwo 接收到的消息 message:thing2
2021-12-08 15:40:41.158  INFO 73269 --- [ntainer#0-0-C-1] c.a.kafka.service.KafkaConsumerService   : topicOne 接收到的消息 message:thing1
复制代码

接收到了消息,并且消息都没有乱码。

DefaultKafkaProducerFactory 相关问题说明

flush 方法阻塞问题

ProducerFactory 的作用是创建生产者,当不使用事务的时候 DefaultKafkaProducerFactory 默认是创建一个单例的生产者给所有的客户端使用,这个时候就有一个问题,当其中的一个客户端调用 flush() 方法的时候,它就会将其它使用此生产者的客户端阻塞。为了解决这个问题,从 2.3 版本开始 DefaultKafkaProducerFactory 提供了一个新的属性 producerPerThread,当它被设置成 true 的时候,它就会为每一个客户端(线程)创建一个单独的生产者。

注意:

producerPerThread 被设置成 true 的时候,当客户端不在需要这个producer 的时候,必须调用 closeThreadBoundProducer() 来物理上的关闭 producer 和移除 ThreadLocal 中的变量。调用 reset() 或者 destroy() 是没有办法关闭的。

自定义序列化

从 2.3 版本开始,用户可以自定义序列化方式,不单单使用官方提供的序列化方式,自定义序列化参考如下代码:

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}
复制代码

Producer 创建之后更新配置

从 2.5.10 版本开是,用户可以在创建完 producerFactory 之后,修改其配置,但是它不会影响已经存在的 Producer 如果想要更新已经存在 produer 的配置,需要调用 reset() 方法。修改配置参考如下代码:

void updateConfigs(Map<String, Object> updates);

void removeConfig(String configKey);
复制代码

注意:你不能将事务的生产者工厂修改非事务的,反之亦然。

ReplyingKafkaTemplate 的使用

ReplyingKafkaTemplateKafkaTemplate 的子类,它主要是为了是实现 请求/回复 的场景的。它有两个额外的方法:

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);
复制代码

第一个方法和第二个方法的区别便是可不可以自定义相应时间,第一个不能自定义默认是 5s,第二个方法传入 replyTimeoutnull 的话也默认是 5s。

Request 实例:

package com.aha.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.SimpleKafkaHeaderMapper;
import org.springframework.kafka.support.converter.MessagingMessageConverter;

import java.util.concurrent.TimeUnit;

@SpringBootApplication
@Slf4j
public class KRequestingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KRequestingApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);

            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            log.info("=== Sent ok: {}", sendResult.getRecordMetadata().toString());

            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
            log.info("=== Return value: {}", consumerRecord.value());
        };
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        // 创建 ReplyingKafkaTemplate 并设置了 reply 的 topic
        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("kReplies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
                .partitions(10)
                .replicas(2)
                .build();
    }

    @Bean
    public NewTopic kReplies() {
        return TopicBuilder.name("kReplies")
                .partitions(10)
                .replicas(2)
                .build();
    }

}
复制代码

reply 实例:

package com.aha.kafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.support.SimpleKafkaHeaderMapper;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.handler.annotation.SendTo;

/**
 * @author: WT
 * @date: 2021/12/8 17:24
 */
@SpringBootApplication
public class KReplyingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KReplyingApplication.class, args);
    }

    @KafkaListener(id="server", topics = "kRequests")
    // @SendTo (no properties): This is treated as !{source.headers['kafka_replyTopic']} (since version 2.1.3).
    @SendTo // use default replyTo expression
    public String listen(ConsumerRecord<String, String> record) {
        // 在 headers 里面会记录 kafka_replyTopic 的值 , sendTo 不设置属性的时候 默认会去找个值进行转发
        System.out.println(record.headers());
        String message = record.value();
        System.out.println("Server received: " + message);
        return message.toUpperCase();
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
                .partitions(10)
                .replicas(2)
                .build();
    }

    @Bean // not required if Jackson is on the classpath
    public MessagingMessageConverter simpleMapperConverter() {
        MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
        messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
        return messagingMessageConverter;
    }

}
复制代码

注意 @SendTo 注解:

  1. 需要依赖 KafkaTemplate 不能是它的子类
  2. @SendTo (no properties): This is treated as !{source.headers['kafka_replyTopic']} (since version 2.1.3).
  3. ConsumerRecord 中记录 kafka_replyTopic 的值

关于 ReplyingKafkaTemplate 这边不在详细介绍了,因为在实际开发中没有遇到相应的场景,如果想要了解更多的话参考本文开头提供的官网网址。

采坑指南

每次消息都会偶尔失败

仔细排查日志发现每次到指定 partition 的时候才会出现问题,猜测应该是某个节点出现了问题,在 CMAK 中,看节点信息都是正常的,在 zk 中拉取 /broker/ids ,发现三个节点也都在;然后 telnet 对应的节点的 9092 端口,发现是不同的,排查了很多地方发现最终还是节点的问题,所以有些时候不要过于相信 CMAK 这些工具。