1. Introduction
Like Spring Data Redis, Spring Data MongoDB, Spring Data JPA, and similar projects, Spring Kafka gives a Spring application access to a Kafka cluster with only a small amount of configuration.
In this article I will show how, in a Spring application, a message producer sends messages to a Kafka cluster, how a consumer consumes them, how to consume messages in batches, and how multiple consumer groups consume the same message.
Note that, in order to use the latest Spring Kafka features, the test code below is built with Spring Boot 2.0.0. The full working source is available at: gitee.com/zifangsky/K…
2. Basic usage of Spring Kafka
(1) Add the dependency to pom.xml:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Note: Because the project is built with Spring Boot, no explicit Spring Kafka version is specified in the dependency (the jar version actually resolved is 2.1.x). The full pom.xml is at: gitee.com/zifangsky/K…
(2) Basic configuration:
i) If the project is built with Spring Boot, you can simply add the following to the properties file:
#Kafka settings; for more options see org.springframework.boot.autoconfigure.kafka.KafkaProperties
#Kafka broker addresses (one or more)
spring.kafka.bootstrap-servers=192.168.1.159:9092,192.168.1.159:9093,192.168.1.159:9094
#default topic
spring.kafka.template.default-topic=topic-test
#number of threads in the listener container, to increase concurrency
spring.kafka.listener.concurrency=3
#upper bound, in bytes, of a producer batch
spring.kafka.producer.batch-size=1000
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#default consumer group id
spring.kafka.consumer.group-id=myGroup1
#earliest reads each partition from the beginning; latest starts from the newest offset
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
ii) If the project is built with plain Maven, or you want to customize more settings, use JavaConfig instead:
package cn.zifangsky.kafkademo.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;

/**
 * Kafka configuration
 *
 * @author zifangsky
 */
@Configuration
@EnableKafka
public class KafkaConfig {
    @Value("${kafka.producer.bootstrapServers}")
    private String producerBootstrapServers; // broker addresses for the producer

    @Value("${kafka.producer.retries}")
    private String producerRetries; // producer retry count

    @Value("${kafka.producer.batchSize}")
    private String producerBatchSize;

    @Value("${kafka.producer.lingerMs}")
    private String producerLingerMs;

    @Value("${kafka.producer.bufferMemory}")
    private String producerBufferMemory;

    @Value("${kafka.consumer.bootstrapServers}")
    private String consumerBootstrapServers;

    @Value("${kafka.consumer.groupId}")
    private String consumerGroupId;

    @Value("${kafka.consumer.enableAutoCommit}")
    private String consumerEnableAutoCommit;

    @Value("${kafka.consumer.autoCommitIntervalMs}")
    private String consumerAutoCommitIntervalMs;

    @Value("${kafka.consumer.sessionTimeoutMs}")
    private String consumerSessionTimeoutMs;

    @Value("${kafka.consumer.maxPollRecords}")
    private String consumerMaxPollRecords;

    @Value("${kafka.consumer.autoOffsetReset}")
    private String consumerAutoOffsetReset;

    /**
     * ProducerFactory
     */
    @Bean
    public ProducerFactory<Object, Object> producerFactory() {
        Map<String, Object> configs = new HashMap<>(); // producer settings
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
        configs.put(ProducerConfig.RETRIES_CONFIG, producerRetries);
        configs.put(ProducerConfig.BATCH_SIZE_CONFIG, producerBatchSize);
        configs.put(ProducerConfig.LINGER_MS_CONFIG, producerLingerMs);
        configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferMemory);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configs);
    }

    /**
     * KafkaTemplate (with autoFlush enabled)
     */
    @Bean
    public KafkaTemplate<Object, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory(), true);
    }

    /**
     * ConsumerFactory
     */
    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        Map<String, Object> configs = new HashMap<>(); // consumer settings
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerEnableAutoCommit);
        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumerAutoCommitIntervalMs);
        configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumerSessionTimeoutMs);
        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumerMaxPollRecords); // max records per poll (batch size)
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerAutoOffsetReset);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(configs);
    }

    /**
     * A KafkaListenerContainerFactory used for batch consumption
     */
    @Bean
    public KafkaListenerContainerFactory<?> batchContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setConcurrency(4);
        containerFactory.setBatchListener(true); // enable batch consumption
        containerFactory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        return containerFactory;
    }
}
Note: The properties referenced above are:
kafka.producer.bootstrapServers=192.168.1.159:9092,192.168.1.159:9093,192.168.1.159:9094
kafka.producer.retries=3
#16K (bytes)
kafka.producer.batchSize=16384
kafka.producer.lingerMs=1
#32M (bytes)
kafka.producer.bufferMemory=33554432
kafka.consumer.bootstrapServers=192.168.1.159:9092,192.168.1.159:9093,192.168.1.159:9094
kafka.consumer.groupId=0
kafka.consumer.enableAutoCommit=false
kafka.consumer.autoCommitIntervalMs=1000
kafka.consumer.sessionTimeoutMs=30000
kafka.consumer.maxPollRecords=100
#one of: earliest, latest
kafka.consumer.autoOffsetReset=earliest
Note: The detailed meaning of these properties is covered in the official documentation: kafka.apache.org/documentati…
(3) A first producer/consumer example:
i) The message producer:
package cn.zifangsky.kafkademo.producer;

import java.text.MessageFormat;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * A first message-producer example
 *
 * @author zifangsky
 */
@Component("simpleProducer")
public class SimpleProducer {
    private static final Logger logger = LoggerFactory.getLogger(SimpleProducer.class);

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    /**
     * Push data to Kafka with KafkaTemplate
     *
     * @param topicName the target topic
     * @param data the message payload
     */
    public void sendMessage(String topicName, String data) {
        logger.info(MessageFormat.format("Start pushing data to Kafka: {0}", data));
        try {
            kafkaTemplate.send(topicName, data);
            logger.info("Data pushed successfully!");
        } catch (Exception e) {
            logger.error(MessageFormat.format("Failed to push data, topic: {0}, data: {1}",
                    topicName, data), e);
        }
    }
}
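One caveat worth noting: kafkaTemplate.send() is asynchronous, so the try/catch above only catches errors raised while handing the record to the producer; actual delivery failures are reported through the returned future. A sketch of handling both outcomes inside SimpleProducer (spring-kafka 2.1 returns a Spring ListenableFuture; the method name and log messages here are illustrative):

```java
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

// Hypothetical variant of sendMessage that reacts to the async send result
public void sendMessageWithCallback(String topicName, String data) {
    ListenableFuture<SendResult<Object, Object>> future = kafkaTemplate.send(topicName, data);
    future.addCallback(
            result -> logger.info("Data pushed successfully, offset: "
                    + result.getRecordMetadata().offset()),
            ex -> logger.error("Failed to push data, topic: " + topicName
                    + ", data: " + data, ex));
}
```

This way a broker-side failure (for example, an unreachable cluster) is actually logged instead of silently ignored.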
ii) Call it from a controller:
package cn.zifangsky.kafkademo.controller;

import javax.annotation.Resource;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import cn.zifangsky.kafkademo.producer.SimpleProducer;

@RestController
@RequestMapping("/kafka")
public class TestKafkaController {
    @Resource(name = "simpleProducer")
    private SimpleProducer producer;

    private final String TOPIC = "topic-test"; // topic used for testing

    @RequestMapping("/send")
    public String send(String data) {
        producer.sendMessage(TOPIC, data);
        return "Sent data [" + data + "] successfully!";
    }
}
iii) The message consumer:
package cn.zifangsky.kafkademo.consumer;

import java.text.MessageFormat;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * A first message-consumer example
 *
 * @author zifangsky
 */
@Component("simpleConsumer")
public class SimpleConsumer {
    private static final Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);

    @KafkaListener(id = "test", topics = {"topic-test"})
    public void listen(String data) {
        System.out.println("SimpleConsumer received message: " + data);
        logger.info(MessageFormat.format("SimpleConsumer received message: {0}", data));
    }
}
iv) Test:
After starting the project, visit http://127.0.0.1:9090/kafka/send?data=哈哈哈1111 in a browser and check the console: the message is logged by SimpleConsumer, confirming that this minimal example already works.
(4) Sending/receiving messages of a custom type:
In the examples above, the messages we send and receive are plain strings, encoded and decoded with StringSerializer and StringDeserializer. In practice, however, we may want to send more complex messages (for example, a weather report for a region that includes temperature, humidity, pollution index, and other dimensions). There are usually two ways to achieve this:

- Approach 1: convert the Java object to a JSON string before sending it to the Kafka cluster
- Approach 2: define a custom serializer and deserializer, and send the Java object directly
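Approach 1 can be sketched like this (a hypothetical helper that hand-rolls the JSON for illustration only; a real project would more likely use a library such as Jackson's ObjectMapper, which also handles escaping correctly):

```java
// Hypothetical helper illustrating approach 1: serialize to JSON, send as a String.
public class JsonMessageHelper {

    // Hand-rolled JSON for a two-field object; for illustration only.
    public static String toJson(long id, String data) {
        return String.format("{\"id\":%d,\"data\":\"%s\"}", id, data);
    }

    public static void main(String[] args) {
        String json = toJson(8L, "test9");
        System.out.println(json); // {"id":8,"data":"test9"}
        // With the String serializers already configured, the JSON string can be
        // sent as-is: kafkaTemplate.send(topicName, json);
    }
}
```

The advantage of approach 1 is that the topic stays String-encoded, so no serializer configuration needs to change. The rest of this section implements approach 2.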
i) A custom serializer and deserializer:
package cn.zifangsky.kafkademo.common;

import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;
import org.springframework.util.SerializationUtils;

public class ObjectSerializer implements Serializer<Object> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    /**
     * Serialize
     */
    @Override
    public byte[] serialize(String topic, Object data) {
        return SerializationUtils.serialize(data);
    }

    @Override
    public void close() {
    }
}
package cn.zifangsky.kafkademo.common;

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.springframework.util.SerializationUtils;

public class ObjectDeserializer implements Deserializer<Object> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    /**
     * Deserialize
     */
    @Override
    public Object deserialize(String topic, byte[] data) {
        return SerializationUtils.deserialize(data);
    }

    @Override
    public void close() {
    }
}
ii) Update the relevant settings in KafkaConfig:
/**
 * ProducerFactory
 */
@Bean
public ProducerFactory<Object, Object> producerFactory() {
    Map<String, Object> configs = new HashMap<>(); // producer settings
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
    configs.put(ProducerConfig.RETRIES_CONFIG, producerRetries);
    configs.put(ProducerConfig.BATCH_SIZE_CONFIG, producerBatchSize);
    configs.put(ProducerConfig.LINGER_MS_CONFIG, producerLingerMs);
    configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferMemory);
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class);
    return new DefaultKafkaProducerFactory<>(configs);
}

...

/**
 * ConsumerFactory
 */
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
    Map<String, Object> configs = new HashMap<>(); // consumer settings
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
    configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerEnableAutoCommit);
    configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumerAutoCommitIntervalMs);
    configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumerSessionTimeoutMs);
    configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumerMaxPollRecords); // max records per poll (batch size)
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerAutoOffsetReset);
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class); // old messages must be cleared first, or deserialization will fail
    return new DefaultKafkaConsumerFactory<>(configs);
}
Note carefully: after changing the serializer and deserializer, you must clear the old messages from the topic (or use a new topic); otherwise the original string messages will throw an exception during deserialization.
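One way to start clean is to delete and recreate the topic. This is a sketch assuming the standard Kafka CLI scripts of this Kafka generation, a hypothetical ZooKeeper at 192.168.1.159:2181, and delete.topic.enable=true on the brokers (the partition and replication counts are illustrative):

```shell
# Delete the topic that still holds string-encoded messages
bin/kafka-topics.sh --zookeeper 192.168.1.159:2181 --delete --topic topic-test2

# Recreate it empty
bin/kafka-topics.sh --zookeeper 192.168.1.159:2181 --create --topic topic-test2 \
    --partitions 3 --replication-factor 2
```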
iii) Add a new method to SimpleProducer.java:
/**
 * Push data to Kafka with KafkaTemplate
 *
 * @param topicName the target topic
 * @param data the message payload
 */
public void sendObjectMessage(String topicName, Object data) {
    logger.info(MessageFormat.format("Start pushing data to Kafka: {0}", data));
    try {
        kafkaTemplate.send(topicName, data);
        logger.info("Data pushed successfully!");
    } catch (Exception e) {
        logger.error(MessageFormat.format("Failed to push data, topic: {0}, data: {1}",
                topicName, data), e);
    }
}
iv) Test:
First add an entity class for testing, DemoObj.java:
package cn.zifangsky.kafkademo.model;

import java.io.Serializable;

public class DemoObj implements Serializable {
    private static final long serialVersionUID = -8094247978023094250L;

    private Long id;
    private String data;

    public DemoObj() {
    }

    public DemoObj(Long id, String data) {
        this.id = id;
        this.data = data;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    @Override
    public String toString() {
        return "DemoObj [id=" + id + ", data=" + data + "]";
    }
}
Then add a new method to TestKafkaController.java:
private final String TOPIC2 = "topic-test2"; // second test topic, defined alongside TOPIC

@RequestMapping("/send2")
public String send2(DemoObj demoObj) {
    producer.sendObjectMessage(TOPIC2, demoObj);
    return "Sent data [" + demoObj + "] successfully!";
}
Finally, add the corresponding message consumer:
package cn.zifangsky.kafkademo.consumer;

import java.text.MessageFormat;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import cn.zifangsky.kafkademo.model.DemoObj;

/**
 * Message consumer (group1)
 *
 * @author zifangsky
 */
@Component("groupListener1")
public class GroupListener1 {
    private static final Logger logger = LoggerFactory.getLogger(GroupListener1.class);

    @KafkaListener(topics = {"topic-test2"}, groupId = "group1")
    public void listenTopic2(DemoObj data) {
        System.out.println("Group1 received message: " + data);
        logger.info(MessageFormat.format("Group1 received message: {0}", data));
    }
}
Visit http://127.0.0.1:9090/kafka/send2?id=8&data=测试9 in a browser: the console output shows the consumer receiving the DemoObj.
(5) Multiple consumer groups consuming the same message:
Starting with Spring Kafka 2.x, the @KafkaListener annotation has a new groupId attribute that specifies which consumer group the listener belongs to. From Kafka's design we know that if two consumers belong to two different consumer groups, they can both consume the same message (a message the producer sent to some partition of some topic).
So, on top of the code above, add a consumer in a new group:
package cn.zifangsky.kafkademo.consumer;

import java.text.MessageFormat;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import cn.zifangsky.kafkademo.model.DemoObj;

/**
 * Message consumer (group2)
 *
 * @author zifangsky
 */
@Component("groupListener2")
public class GroupListener2 {
    private static final Logger logger = LoggerFactory.getLogger(GroupListener2.class);

    @KafkaListener(topics = {"topic-test2"}, groupId = "group2")
    public void listenTopic2(DemoObj data) {
        System.out.println("Group2 received message: " + data);
        logger.info(MessageFormat.format("Group2 received message: {0}", data));
    }

    // A second listener in the same group: two consumers in group2 would then
    // split the partitions between them instead of both receiving every message.
    // @KafkaListener(topics = {"topic-test2"}, groupId = "group2")
    // public void listenTopic2_2(DemoObj data) {
    //     System.out.println("Group2_2 received message: " + data);
    //     logger.info(MessageFormat.format("Group2_2 received message: {0}", data));
    // }
}
Testing again, the console shows both Group1 and Group2 logging the same message, which matches our expectation.
(6) Batch consumption with @KafkaListener:
The Spring Kafka documentation states that @KafkaListener has supported batch consumption since version 1.1.
The relevant configuration is already present in the KafkaConfig above, namely:
/**
 * ConsumerFactory
 */
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
    Map<String, Object> configs = new HashMap<>(); // consumer settings
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
    configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerEnableAutoCommit);
    configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumerAutoCommitIntervalMs);
    configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumerSessionTimeoutMs);
    configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumerMaxPollRecords); // max records per poll (batch size)
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerAutoOffsetReset);
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class); // old messages must be cleared first, or deserialization will fail
    return new DefaultKafkaConsumerFactory<>(configs);
}

/**
 * A KafkaListenerContainerFactory used for batch consumption
 */
@Bean
public KafkaListenerContainerFactory<?> batchContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory());
    containerFactory.setConcurrency(4);
    containerFactory.setBatchListener(true); // enable batch consumption
    containerFactory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    return containerFactory;
}
Then add a new method to the GroupListener1.java above for testing:
// requires: import java.util.List;
@KafkaListener(topics = {"topic-test"}, groupId = "group1", containerFactory = "batchContainerFactory")
public void listenTopic1(List<String> data) {
    System.out.println("Group1 received messages: " + data);
    logger.info(MessageFormat.format("Group1 received messages: {0}", data));
}
Finally, run the test again: the console output shows the listener receiving the messages as a single List batch.