这是我参与11月更文挑战的第16天,活动详情查看:2021最后一次更文挑战
kafka消费者
Kafka消费者相关的概念
- 消费者与消费组
- 一个分区,只能被一个组内的一个消费者消费。不能多个消费者共同消费一个分区。
- 消费组与分区重平衡
- 当新的消费者加入消费组,它会消费一个或多个分区,而这些分区之前是由其他消费者负责的,需要转移分担。
- 当消费者离开消费组时,它所消费的分区会分配给其他分区。
消费者添加和减少,kafka具有横向的伸缩性,但是消费者不是越多越好,如果消费者 > 分区数。那么会导致有些消费者分不到分区。(所以,消费者数量 一定要小于 分区数量)
对于一般消息中间件而言,一般有两种消息投递模式
- 点对点,P2P( Point-to-Point)模式
- 点对点模式是基于队列的
- 发 布/订阅( Pub/Sub )模式
- 发布订阅模式定义了如何向 一个内容节点发布和订阅消息。发布/订阅模式在消息的一对多广播时采用 。
kafka都支持这2种消息投递模式。
- 一个消费组:相当于点对点
- 不同的消费组:相当于发布订阅
消费者代码
- 配置相关属性,创建消费者对象
- 订阅topic(可以订阅多个topic) 还可以指定特定的分区。
- 循环调用poll去拉数据
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
//循环拉取数据
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.println(JSONUtil.toJsonString(record))
}
}
} finally {
consumer.close();
}
复制代码
Kafka 中的消费是基于拉模式的。消息的消费一般有两种模式 : 推模式和拉模式。
- 推模式是服务端主动将消息推送给消费者
- 拉模式是消费者主动向服务端发起请求来拉取消息
消费者很重要的概念-offset:表示消息位移,记录消费在消费者中的位置。
这个位置需要被持久化!
- 在旧消费者客户端:消费位移是存储在 ZooKeeper
- 新的消费者客户端:存储在Kafka 内部的主题 consumer_offsets中
消费者每次消费一条消息,需要将该位移提交。 提交的offset值为 当前的offset+1。
近期评论