Kafka ISR and the Timestamp Index

Replicas, ISR, and Consistency

In Kafka, when we create a topic, we specify the number of replicas with the --replication-factor parameter.

Create the topic:
kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181/kafka --create --topic test007 --partitions 2 --replication-factor 3

After creation, kafka-topics.sh can also show the topic's partitions, replicas, ISR, and other details.

kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181/kafka --describe --topic test007
Topic:test007	PartitionCount:2	ReplicationFactor:3	Configs:
	Topic: test007	Partition: 0	Leader: 2	Replicas: 2,1,0	Isr: 2,1,0
	Topic: test007	Partition: 1	Leader: 0	Replicas: 0,2,1	Isr: 0,2,1

Topic test007 has two partitions. Taking Partition 0 as an example: the leader is broker 2, the replicas are 2,1,0, and the ISR is 2,1,0.
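
The same operations are also available programmatically through Kafka's AdminClient API. A minimal sketch (the broker address node01:9092 is an assumption, and error handling is omitted):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicAdmin {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092"); // assumed address
        try (AdminClient admin = AdminClient.create(props)) {
            // 2 partitions, replication factor 3, matching the CLI example above
            admin.createTopics(Collections.singletonList(new NewTopic("test007", 2, (short) 3)))
                 .all().get();
            // describeTopics returns leader/replicas/ISR for each partition
            System.out.println(admin.describeTopics(Collections.singletonList("test007"))
                 .all().get().get("test007"));
        }
    }
}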
Having replicas raises the issue of data synchronization between the leader and its follower replicas: all copies of a partition must be kept consistent.
Two replica-sync strategies are common:

  1. Send the ack once more than half of the replicas have finished syncing (a majority quorum). Latency is low, but tolerating the failure of n nodes requires 2n+1 replicas.
  2. Send the ack only after all replicas have finished syncing. Tolerating n failures needs only n+1 replicas, at the cost of higher latency.

Kafka chose the second strategy (send the ack only after all replicas have completed the sync), for two reasons:

  • To tolerate the failure of n nodes, the first strategy needs 2n+1 replicas while the second needs only n+1; for example, surviving 2 failed nodes takes 5 replicas under the first strategy but only 3 under the second. Since every Kafka partition holds a large amount of data, the first strategy would create heavy data redundancy.
  • Although the second strategy waits for every replica and thus has higher network latency, that latency has relatively little impact on Kafka.

To track which followers are keeping up, the leader maintains a dynamic in-sync replica set (ISR): the set of followers that are currently synchronized with the leader. A follower that falls too far behind is removed from the ISR, and by default only ISR members are eligible to be elected leader on failover. Kafka's ACK/ISR/HW (high watermark) mechanics are explained in more depth in other articles, for readers who want to dig further.
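
Two settings govern ISR membership and how it interacts with acks. The values below are illustrative, not authoritative defaults (they vary by Kafka version, so check the docs for your broker):

# A follower that has not caught up with the leader within this window (ms)
# is removed from the ISR.
replica.lag.time.max.ms=30000

# With acks=all, produce requests fail (NotEnoughReplicas) if the ISR
# shrinks below this size; can be set per topic or broker-wide.
min.insync.replicas=2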

Indexes

After a topic is created, Kafka creates a data directory per partition under the configured log.dir. We created Topic test007 with two partitions, Partition 0 and Partition 1, so the data directory contains test007-0 and test007-1. Taking test007-0 as an example, the directory holds the following files:

00000000000000000000.log is the data file; 00000000000000000000.index (the offset index) and 00000000000000000000.timeindex (the time index) are its index files.
Their contents can be inspected with the kafka-dump-log.sh tool that ships with Kafka.

kafka-dump-log.sh --files 00000000000000000000.log
Dumping 00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1625896634163 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 1367333611
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 81 CreateTime: 1625896634401 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 593410223
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 162 CreateTime: 1625896634433 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 1448478144
baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 243 CreateTime: 1625896634465 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 3001936851
baseOffset: 4 lastOffset: 4 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 324 CreateTime: 1625896634483 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 2645175270
baseOffset: 5 lastOffset: 5 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 405 CreateTime: 1625896634522 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 4146059855
baseOffset: 6 lastOffset: 6 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 486 CreateTime: 1625896634550 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 3625250854
baseOffset: 7 lastOffset: 7 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 567 CreateTime: 1625896634570 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 2732649947
baseOffset: 8 lastOffset: 8 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 648 CreateTime: 1625896634579 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 1803337061
baseOffset: 9 lastOffset: 9 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 729 CreateTime: 1625896634594 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 54002002
baseOffset: 10 lastOffset: 10 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 810 CreateTime: 1625896634617 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 839740742
baseOffset: 11 lastOffset: 11 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 891 CreateTime: 1625896634656 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 3114671675
baseOffset: 12 lastOffset: 12 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 972 CreateTime: 1625896634699 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 512924486
baseOffset: 13 lastOffset: 13 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 1053 CreateTime: 1625896634733 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 2870394865

As the dump shows, each entry records its offset range, size, CreateTime, and other batch metadata; note how position advances by exactly the previous batch's size (81 bytes here).
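
By default kafka-dump-log.sh prints only this batch metadata; to decode the message payloads as well, add the --print-data-log flag:

kafka-dump-log.sh --print-data-log --files 00000000000000000000.log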

kafka-dump-log.sh --files 00000000000000000000.index
Dumping 00000000000000000000.index
offset: 51 position: 4131
offset: 102 position: 8262
offset: 153 position: 12393
offset: 204 position: 16524
offset: 255 position: 20655
offset: 306 position: 24786
offset: 357 position: 28917
offset: 408 position: 33048
offset: 459 position: 37179
offset: 510 position: 41310
offset: 561 position: 45441

kafka-dump-log.sh --files 00000000000000000000.timeindex
Dumping 00000000000000000000.timeindex
timestamp: 1625896635654 offset: 51
timestamp: 1625896636752 offset: 102
timestamp: 1625896637830 offset: 153
timestamp: 1625896639150 offset: 204
timestamp: 1625896639729 offset: 255
timestamp: 1625897012620 offset: 306
timestamp: 1625897013413 offset: 357
timestamp: 1625897013954 offset: 408
timestamp: 1625897014458 offset: 459
timestamp: 1625897015002 offset: 510
timestamp: 1625897015462 offset: 561
timestamp: 1625897016150 offset: 612


Above are the corresponding index entries. Both indexes are sparse: Kafka appends an entry only after writing at least log.index.interval.bytes (4096 by default) of log data, which is why the offsets here advance in steps of 51 and the positions in steps of 4131 bytes (51 records × 81 bytes each).

When an application fetches data through the consumer API, lookups go through the offset index: given a target offset, Kafka finds the nearest index entry at or below it, takes that entry's position, seeks the log file to that position, and scans forward to the exact record. The time index adds one step in front: a timestamp is first mapped to an offset, and that offset is then resolved to a file position through the offset index as described above.
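
To make the two-step lookup concrete, here is a simplified sketch that models the sparse indexes with in-memory maps. Kafka itself binary-searches memory-mapped index files; the class below is illustrative only, seeded with entries from the dumps above:

import java.util.Map;
import java.util.TreeMap;

public class SparseIndexLookup {
    // offset index: offset -> byte position in the .log file
    private final TreeMap<Long, Long> offsetIndex = new TreeMap<>(Map.of(
            51L, 4131L, 102L, 8262L, 153L, 12393L));
    // time index: timestamp -> last indexed offset written by that time (simplified)
    private final TreeMap<Long, Long> timeIndex = new TreeMap<>(Map.of(
            1625896635654L, 51L, 1625896636752L, 102L, 1625896637830L, 153L));

    // Given a target offset, return the file position to start scanning from:
    // the position of the nearest index entry at or below the target.
    long positionFor(long targetOffset) {
        Map.Entry<Long, Long> e = offsetIndex.floorEntry(targetOffset);
        return e == null ? 0L : e.getValue(); // no entry: scan from segment start
    }

    // Given a timestamp, resolve it to an offset first, then to a position.
    long positionForTime(long timestampMs) {
        Map.Entry<Long, Long> e = timeIndex.floorEntry(timestampMs);
        long offset = e == null ? 0L : e.getValue();
        return positionFor(offset);
    }

    public static void main(String[] args) {
        SparseIndexLookup idx = new SparseIndexLookup();
        // offset 130 falls between entries 102 and 153: start scanning at position 8262
        System.out.println(idx.positionFor(130));                 // 8262
        System.out.println(idx.positionForTime(1625896637000L));  // 8262, via offset 102
    }
}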

Producer acks

The Kafka documentation describes the producer acks setting as follows:

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed:

  • acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1.
  • acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.
  • acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.
Kafka's default is acks=1, i.e. the request completes once the leader alone returns its ack. For a more detailed discussion see Kafka生产者ack机制剖析.
The effect of configuring acks through ProducerConfig.ACKS_CONFIG in producer code:

With pro.setProperty(ProducerConfig.ACKS_CONFIG, "0"); the broker never acknowledges, so every callback reports offset -1:

key:item-1, value:value-0, partition:1, offset:-1
key:item-2, value:value-0, partition:1, offset:-1
key:item-0, value:value-1, partition:0, offset:-1
key:item-1, value:value-1, partition:1, offset:-1
key:item-2, value:value-1, partition:1, offset:-1
key:item-0, value:value-2, partition:0, offset:-1
key:item-1, value:value-2, partition:1, offset:-1
key:item-2, value:value-2, partition:1, offset:-1
key:item-0, value:value-0, partition:0, offset:-1
key:item-1, value:value-0, partition:1, offset:-1

With pro.setProperty(ProducerConfig.ACKS_CONFIG, "-1"); (equivalent to acks=all), real offsets come back:

key:item-0, value:value-1, partition:0, offset:10009
key:item-1, value:value-1, partition:1, offset:20017
key:item-2, value:value-1, partition:1, offset:20018
key:item-0, value:value-2, partition:0, offset:10010
key:item-1, value:value-2, partition:1, offset:20019
key:item-2, value:value-2, partition:1, offset:20020
key:item-0, value:value-0, partition:0, offset:10011
key:item-1, value:value-0, partition:1, offset:20021
key:item-2, value:value-0, partition:1, offset:20022
key:item-0, value:value-1, partition:0, offset:10012
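
Output like the above can be produced by a callback-printing loop along these lines. This is a minimal sketch: the topic name and pro variable follow the snippets above, while the broker address and class name are assumptions:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AckDemo {
    public static void main(String[] args) {
        Properties pro = new Properties();
        pro.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092"); // assumed address
        pro.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pro.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pro.setProperty(ProducerConfig.ACKS_CONFIG, "0"); // switch between "0", "1", "-1"

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(pro)) {
            for (int v = 0; v < 3; v++) {
                for (int i = 0; i < 3; i++) {
                    String key = "item-" + i;
                    String value = "value-" + v;
                    // the callback prints metadata lines like those shown above;
                    // with acks=0 the returned offset is always -1
                    producer.send(new ProducerRecord<>("test007", key, value), (md, ex) -> {
                        if (ex == null) {
                            System.out.println("key:" + key + ", value:" + value
                                    + ", partition:" + md.partition() + ", offset:" + md.offset());
                        }
                    });
                }
            }
        } // close() flushes pending sends
    }
}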

Seeking the consumer offset by timestamp

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

// 'consumer' is a subscribed KafkaConsumer (see the setup sketch below)
HashMap<TopicPartition, Long> tts = new HashMap<>();
// poll until the group coordinator has assigned partitions to this consumer
Set<TopicPartition> tps = consumer.assignment();
while (tps.isEmpty()) {
    consumer.poll(Duration.ofMillis(100));
    tps = consumer.assignment();
}
for (TopicPartition partition : tps) {
    // pick the target time as needed; here: 24 hours ago
    tts.put(partition, System.currentTimeMillis() - 24 * 3600 * 1000);
}
// resolve each timestamp to the earliest offset whose timestamp is >= it
Map<TopicPartition, OffsetAndTimestamp> forTimes = consumer.offsetsForTimes(tts);

for (TopicPartition partition : tps) {
    // rewind the consumer to the resolved offset via seek
    OffsetAndTimestamp offsetAndTimestamp = forTimes.get(partition);
    if (offsetAndTimestamp != null) { // null if no message at or after the timestamp
        consumer.seek(partition, offsetAndTimestamp.offset());
    }
}
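
For completeness, a minimal consumer setup that the snippet above slots into. The group id "ts-seek-demo" and the broker address are assumed placeholders:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties cfg = new Properties();
cfg.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");  // assumed address
cfg.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "ts-seek-demo");          // assumed group id
cfg.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
cfg.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cfg);
consumer.subscribe(Collections.singletonList("test007"));
// run the timestamp-seek snippet above, then consume with consumer.poll(...) as usual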