本文正在参加「Java主题月 - Java 开发实战」,详情查看 活动链接
这是我参与更文挑战的第1天,活动详情查看: 更文挑战
Spring项目里引入Kafka非常方便,使用kafkaTemplate
(Producer的模版)+@KafkaListener
(Consumer的监听器)即可完成生产者-消费者的代码开发,相信这些,用过的同学都很清楚了,这里我不对spring-Kafka做过多讲解。我们今天主要来探讨一下如何提升kafka的消费能力。
1.简单的消费者
1.1 配置consumerFactory
首先要配置consumer的属性
@Bean(BeanNameConstant.CONSUMER_FACTORY)
public ConsumerFactory<String, String> consumerFactory() {
final StringDeserializer stringDeserializer = new StringDeserializer();
Map<String, Object> props = new HashMap<>(10);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
//设置是否自动提交offset 2.3 版本以后默认为false
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory(props, stringDeserializer, stringDeserializer);
return consumerFactory;
}
复制代码
1.2 配置KafkaListenerContainerFactory
关于consumer的主要的封装在ConcurrentKafkaListenerContainerFactory这个里头,本身的KafkaConsumer是线程不安全的,无法并发操作,这里spring又在包装了一层,根据配置的spring.kafka.listener.concurrency来生成多个并发的KafkaMessageListenerContainer实例
@Bean(BeanNameConstant.KAFKA_LISTENER_CONTAINER_FACTORY)
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory
(@Qualifier(BeanNameConstant.CONSUMER_FACTORY) ConsumerFactory<String, String> consumerFactory) {
//构建kafka并行消费监听类工厂类 此类通过topic名称创建该topic消费监听
ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory =
new ConcurrentKafkaListenerContainerFactory<>();
//可通过注解的方式进行设置
concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory);
//手动ack
concurrentKafkaListenerContainerFactory.getContainerProperties().setAckOnError(false);
//设置ack模型机制 当发生error时 不同处理机制针对与offset有不同处理机制
concurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return concurrentKafkaListenerContainerFactory;
}
复制代码
1.3 消费者
@KafkaListener(
topics = "${kafka-topic.demo}",
containerFactory = BeanNameConstant.KAFKA_LISTENER_CONTAINER_FACTORY,
concurrency = "1"
)
public void loadListener(ConsumerRecord<?, ?> record, Acknowledgment ack){
try{
//业务方法
dealMessage(JsonUtil.readValue(String.valueOf(record.value()),Demo.class));
}catch (Exception e){
log.error("消费失败");
}finally {
//手动提交ack
ack.acknowledge();
}
}
复制代码
ConsumerRecord
类里面包含分区信息、消息头、消息体等内容,如果业务需要获取这些参数时,使用ConsumerRecord
会是个不错的选择。如果使用具体的类型接收消息体则更加方便,比如说用String类型去接收消息体。通常我们会使用ConsumerRecord
进行消费。
这里说下,如果ack未提交,consumer重启,consumer在rebalance后会从partition中重新拉去上一次的offset。可能会存在重复消费的情况。
2.多个消费者消费
上面可以看到在@KafkaListener有一个属性concurrency
它是ConcurrentKafkaListenerContainerFactory
的成员变量,我们在可以在配置KafkaListenerContainerFactory设置,也可以在每一个KafkaListener中对该配置进行覆盖。
/**
* Specify the container concurrency.
* @param concurrency the number of consumers to create.
* @see ConcurrentMessageListenerContainer#setConcurrency(int)
*/
public void setConcurrency(Integer concurrency) {
this.concurrency = concurrency;
}
复制代码
它的作用是创建n个KafkaMessageListenerContainer
实例,也就是n个kafkaconumser。是实现多个消费者消费的关键。
@KafkaListener(
topics = "${kafka-topic.demo}",
containerFactory = BeanNameConstant.KAFKA_LISTENER_CONTAINER_FACTORY,
concurrency = "12"
)
复制代码
所以我们只要更改concurrency的数量就可以实现多线程消费了?这么简单?答案是否定的。
2.1 concurrency 如何设置
concurrency的设置,取决于kafka的分区数据,也就是partition的数据量。因为一个KafkaMessageListenerContainer
只会对一个分区进行消费。
如果你的topic分区数只有8,那么你的concurrency最多只有8个可以正常工作。注意,如果是分布式系统,这里还要*节点数量。
如果你有两个节点,那么每个节点的concurrency应该设置为4
3.多线程批量消费
通过对concurrency的设置,我们确实实现了多线程的消费,速度比之前快了。如果想在快点,我们不能无休止的增加partition的数量吧。partition扩上去可就缩不下来了。所以要另辟蹊径。
官方提供的批量接口是这样的。也就是我们之前是用ConsumerRecord
类接收消息,现在换成List<ConsumerRecord>
就可以了。
public interface BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data);
}
public interface BatchAcknowledgingMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
复制代码
在此之前我们需要调整下consumer的配置
3.1 配置调整
3.1.1 配置consumerFactory
//最大拉取条数2000 最大拉取时间1200s
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,10000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,1200000);
复制代码
-
max.poll.records 控制每次拉取的条数。
-
max.poll.interval.ms 每次poll的最大时间间隔。
max.poll.interval.ms 这个参数对于批量消费很重要
。如果设置过短,消费者在未完成业务处理提交offset时,会再次poll一批数据,触发conusmer的rebalance。导致此前已经在消费的消息,分配给其他消费者再消费一次。然后走进死循环。所有消费者都一直在消费这段offset的数据。造成数据挤压和重复消费。
目前就本人实践而言,设置一个特别大的值没有什么影响。这里不是说过了max.poll.interval.ms的时间才去poll数据,还是说如果超过了这个时间都没poll,consumer回去触发一次。
3.1.2 配置KafkaListenerContainerFactory
//构建kafka并行消费监听类工厂类 此类通过topic名称创建该topic消费监听
ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory =
new ConcurrentKafkaListenerContainerFactory<>();
……
//是否并发消费
concurrentKafkaListenerContainerFactory.setBatchListener(true);
复制代码
3.2 代码实现
// 每个线程处理的最大数量
private static final int MAX_NUM = 100;
@KafkaListener(groupId = "${spring.kafka.consumer.group-id}",
topics = "${kafka-topic}",concurrency="3")
public void loadListener(List<ConsumerRecord<?, ?>> record, Acknowledgment ack) {
final long startTime = System.currentTimeMillis();
int batchSize = record.size() / MAX_NUM;
//该消费者每次消费时都积压了大量消息,提交offset时要保证所有异步线程处理完毕
if (batchSize == 0) {
// 消息小于100
this.dealMessage(record);
} else {
LinkedList<Future> futures = new LinkedList<>();
for (int i = 0; i < batchSize; i++) {
List<ConsumerRecord<?, ?>> records;
if (i == batchSize - 1) {
//需要把余数加上
records = record.subList(i * MAX_NUM, record.size());
} else {
records = record.subList(i * MAX_NUM, (i + 1) * MAX_NUM);
}
final Future<?> submit = executorService.submit(() -> {
this.dealMessage(records);
});
futures.add(submit);
}
//等待线程全部执行完
for (Future future : futures) {
try {
future.get();
} catch (InterruptedException e) {
// ignore
} catch (ExecutionException e) {
// ignore
}
}
}
log.info("批量处理完成,处理数量={},耗时={}ms", record.size(), System.currentTimeMillis() - startTime);
//手动确定 提交offset
ack.acknowledge();
}
/**
* 批量处理
* @param record
*/
public void dealMessage(List<ConsumerRecord<?, ?>> record) {
for (ConsumerRecord<?, ?> consumerRecord : record) {
//业务逻辑
this.dealMessage(consumerRecord);
}
}
复制代码
我的代码思路是,批量把消息拉取下来,多线程消费。 如果比较关心数据安全性和准确性,我们可以等这批数据全部处理完在提交offset。如果不是很在意,也可以扔到异步线程池里慢慢处理,offset直接提交。
public void loadListener(List<ConsumerRecord<?, ?>> record, Acknowledgment ack) {
final long startTime = System.currentTimeMillis();
//不用考虑消息丢失
for (ConsumerRecord<?, ?> consumerRecord : record) {
executorService.execute(()->{
dealMessage(consumerRecord);
});
}
log.info("批量处理完成,处理数量={},耗时={}ms", record.size(), System.currentTimeMillis() - startTime);
//手动确定 提交offset
ack.acknowledge();
}
复制代码
无论是哪种,性能提升都会很明显。但是要注意每次拉取的数量和线程的设置要根据实际情况。测试时,注意cpu/内存 以及 DB 的监控情况及时进行调整。不要盲目追求数据处理能力,把别的业务都搞死了。性能优化的前提,一定是要保证系统的稳定性。 祝你好运~
近期评论