消息发送
1. Producer 消息发送的流程
新版本的kafka发送消息是通过异步的方式. 在消息发送的过程中,涉及到两个线程.一个主线程main线程,一个守护线程send线程.还有一个线程共享变量(缓存) RecordAccumulator . 主线程的作用是负责创建消息并放到缓存RecordAccumulator中. 守护线程从缓存中将消息发送到kafka broker中
2. 消息发送流程细化
2.1 主线程
- 将消息封装为
ProducerRecord对象格式,并调用send方法发送消息 - 进入
producer拦截器 (消息格式检测等) - 更新
kafka集群数据 - 进行序列化,将消息对象序列化成
byte数组 - 使用分区器进行分区计算
- 将消息追加到共享线程变量
RecordAccumulator中
2.2 当KafkaProducer实例化完成 send开始工作
send线程从RecordAccumulator取出消息并处理消息格式- 构建消息发送请求对象
request - 将请求交给
Selector,并将消息放在队列上 - 收到响应就移除请求队列的请求,调用每个消息上的回调函数
3. 消息发送实战代码案例
3.1 项目依赖引用
注意你引用的maven依赖版本需和你服务器上安装的kafka版本一致
<!--kafka 消息服务 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
复制代码
3.2 相关类了解
KafkaProducer创建生产者对象,用来发送数据ProducerConfig获取连接kafka的一些基础配置(魔法值记录)ProducerRecord每条数据必须封装为ProducerRecord对象才能发送
3.3 简单的发送消息demo
public class ProductTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties prop = new Properties();
# 每个参数配置可以点进去 查看每个属性代表什么意思
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.10.126:9092");
//确认机制
prop.put(ProducerConfig.ACKS_CONFIG,"all");
//重试次数
prop.put(ProducerConfig.RETRIES_CONFIG,1);
//批次大小
prop.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
//等待时间
prop.put(ProducerConfig.LINGER_MS_CONFIG,1);
//缓冲区大小
prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33356444);
//key序列化方式
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//value 序列化方式
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String,String> producer = new KafkaProducer<String, String>(props);
/*// 发送消息
for (int i = 0; i < 100; i++) {
// send方法有回调方法,如果异常则发送失败,没有异常就发送成功,返回的是RecordMetadata数据对象,可以取到你发送的值
producer.send(new ProducerRecord<>("test",i+"", i+"") ,(metadata,exception) -> {
if(exception == null) {
System.out.println("消息发送成功->" + metadata.offset());
}else {
exception.printStackTrace();
}
});
}*/
for (int i = 0; i < 100; i++) {
RecordMetadata metadata = producer.send(
new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i))
).get();
System.out.println(metadata.offset());
}
// 关闭生产者
producer.close();
}
}
复制代码
3.4 消息查看
当执行完上面代码之后,可以去服务器通过指令查看刚刚创建的消息
./kafka-console-consumer.sh --bootstrap-server 192.168.10.126:9092 --from-beginning --topic test
复制代码
4. 发送失败重试机制
消息接收
1. 消费者相关的类
KafkaConsumer创建一个对象去消费消息ConsumerConfig消费者对象实例化相关的配置项ConsumerRecords每条数据封装成该对象才能进行消费
需要注意的是, 消费者对象创建之后不需要close操作的,线程需要一直在后台进行,一直在回去消息服务器中的消息
2. 消费服务器消息Demo
public class ConsumerTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.10.126:9092");
//序列化key
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
//序列化value
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
// 自动offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
// 分组
props.put(ConsumerConfig.GROUP_ID_CONFIG,"1205");
//自动提交间隔时间
props.put("auto.commit.interval.ms", "1000");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);
//订阅主题
consumer.subscribe(Arrays.asList("test"));
while (true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(100));
for(ConsumerRecord<String,String> str : records) {
System.out.println(String.format("topic = %s, offset= %s ,value= %s",str.topic(),str.offset(),str.value()));
}
}
}
}
复制代码
3. offset提交方式
3.1 自动提交
通过properties参数配置
# 开启自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
# 设置自动提交间隔时间爱
props.put("auto.commit.interval.ms", "1000");
复制代码
3.2 手动提交
手动提交,需要在代码中调用指定方法
while (true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(100));
for(ConsumerRecord<String,String> str : records) {
System.out.println(String.format("topic = %s, offset= %s ,value= %s",str.topic(),str.offset(),str.value()));
}
//手动提交offset
consumer.commitAsync();
}
复制代码
3.3 手动提交与自动提交的区域
- 在实际生产环境是提供手动提交的,这个消息消费是可控的.每次消费完一个消息会有一个偏移量,就算消息服务中途崩了,重启消息服务会从zk中读取到上次消费的
offset,确保消息继续从当前消费不会出现重复消费或者丢失的情况 - 手动提交
offset是有序的,确保数据的有序性




近期评论