Kafka从入门到实战(二)

消息发送

1. Producer 消息发送的流程

新版本的kafka发送消息是通过异步的方式. 在消息发送的过程中,涉及到两个线程.一个主线程main线程,一个守护线程send线程.还有一个线程共享变量(缓存) RecordAccumulator . 主线程的作用是负责创建消息并放到缓存RecordAccumulator中. 守护线程从缓存中将消息发送到kafka broker

2. 消息发送流程细化

2.1 主线程

  1. 将消息封装为ProducerRecord对象格式,并调用send方法发送消息
  2. 进入 producer 拦截器 (消息格式检测等)
  3. 更新kafka集群数据
  4. 进行序列化,将消息对象序列化成byte数组
  5. 使用分区器进行分区计算
  6. 将消息追加到共享线程变量RecordAccumulator

2.2 当KafkaProducer实例化完成 send开始工作

  1. send线程从RecordAccumulator取出消息并处理消息格式
  2. 构建消息发送请求对象request
  3. 将请求交给Selector,并将消息放在队列上
  4. 收到响应就移除请求队列的请求,调用每个消息上的回调函数

3. 消息发送实战代码案例

3.1 项目依赖引用

注意你引用的maven依赖版本需和你服务器上安装的kafka版本一致

<!--kafka 消息服务 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>

复制代码

3.2 相关类了解

  1. KafkaProducer 创建生产者对象,用来发送数据
  2. ProducerConfig 获取连接kafka的一些基础配置(魔法值记录)
  3. ProducerRecord 每条数据必须封装为ProducerRecord对象才能发送

3.3 简单的发送消息demo

public class ProductTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        
        Properties prop = new Properties();
        # 每个参数配置可以点进去 查看每个属性代表什么意思
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.10.126:9092");
        //确认机制
        prop.put(ProducerConfig.ACKS_CONFIG,"all");
        //重试次数
        prop.put(ProducerConfig.RETRIES_CONFIG,1);
        //批次大小
        prop.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        //等待时间
        prop.put(ProducerConfig.LINGER_MS_CONFIG,1);
        //缓冲区大小
        prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33356444);
        //key序列化方式
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //value 序列化方式
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String,String> producer = new KafkaProducer<String, String>(props);

        /*// 发送消息
        for (int i = 0; i < 100; i++) {
            // send方法有回调方法,如果异常则发送失败,没有异常就发送成功,返回的是RecordMetadata数据对象,可以取到你发送的值
            producer.send(new ProducerRecord<>("test",i+"", i+"") ,(metadata,exception) -> {
                if(exception == null) {
                    System.out.println("消息发送成功->" + metadata.offset());
                }else {
                    exception.printStackTrace();
                }
            });
        }*/

        for (int i = 0; i < 100; i++) {
            RecordMetadata metadata = producer.send(
                    new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i))
            ).get();
            System.out.println(metadata.offset());
        }
        // 关闭生产者 
        producer.close();
    }
}

复制代码

3.4 消息查看

当执行完上面代码之后,可以去服务器通过指令查看刚刚创建的消息

./kafka-console-consumer.sh --bootstrap-server 192.168.10.126:9092 --from-beginning --topic test

复制代码

image.png

4. 发送失败重试机制

image.png

消息接收

1. 消费者相关的类

  1. KafkaConsumer 创建一个对象去消费消息
  2. ConsumerConfig 消费者对象实例化相关的配置项
  3. ConsumerRecords 每条数据封装成该对象才能进行消费

需要注意的是, 消费者对象创建之后不需要close操作的,线程需要一直在后台进行,一直在回去消息服务器中的消息

2. 消费服务器消息Demo

public class ConsumerTest {

    public static void main(String[] args) {

        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.10.126:9092");
        //序列化key
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //序列化value
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        // 自动offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        // 分组
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"1205");
        //自动提交间隔时间
        props.put("auto.commit.interval.ms", "1000");

        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);
        //订阅主题
        consumer.subscribe(Arrays.asList("test"));

        while (true) {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(100));
            for(ConsumerRecord<String,String> str : records) {
                System.out.println(String.format("topic = %s, offset= %s ,value= %s",str.topic(),str.offset(),str.value()));
            }
            
        }


    }
}

复制代码

3. offset提交方式

3.1 自动提交

通过properties参数配置

# 开启自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
# 设置自动提交间隔时间爱
props.put("auto.commit.interval.ms", "1000");
复制代码

3.2 手动提交

手动提交,需要在代码中调用指定方法

while (true) {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(100));
            for(ConsumerRecord<String,String> str : records) {
                System.out.println(String.format("topic = %s, offset= %s ,value= %s",str.topic(),str.offset(),str.value()));
            }
            //手动提交offset
            consumer.commitAsync();
        }

复制代码

3.3 手动提交与自动提交的区域

  1. 在实际生产环境是提供手动提交的,这个消息消费是可控的.每次消费完一个消息会有一个偏移量,就算消息服务中途崩了,重启消息服务会从zk中读取到上次消费的offset,确保消息继续从当前消费不会出现重复消费或者丢失的情况
  2. 手动提交offset是有序的,确保数据的有序性

springboot整合kafka请参阅下一期