Kafka 消息幂等以及事务原理

Kafka

实习过程中接触到了 Kafka 分布式消息队列,对这个消息队列也查了很多资料,了解到 Kafka 之所以支持很高QPS的原因以及内部实现原理,但是今天不聊这个,主要来了解一下 Kafka 幂等性以及事务,主要从 Kafka 的幂等性、事务性以及 Kafka Exactly Once 语义的实现这几个方面进行总结。

Kafka Producer 幂等性

Kafka Producer 幂等性主要是规定无论 Producer 提交多少次消息,一条消息只会被持久化一次,不会出现消息重复消费或者消息丢失的情况。

消息重复消费或者消息丢失

消息重复消费和消息丢失是很严重的事情,比如在支付金融业务中,需要保证数据的精确性,如果在支付业务中出现了重复执行业务或者丢失消息的情况会造成很严重的后果。所以幂等性是很重要的。

Kafka Producer 的幂等性是有条件的:

  • 只能保证单会话的幂等性,如果出现了 Producer 意外挂掉了重启是不能保证幂等性
  • 幂等性不能跨多个 Topic-Partition,只能保证单个 partition 内的幂等性,当涉及多个 Topic-Partition 时,这中间的状态并没有同步。

Kafka Producer 幂等的原理

要实现幂等性,只需要在配置中加入

1
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

Kafka 在客户端做了很好的封装,在 Kafka 中是通过管理 PID 和 sequence numbers 来实现幂等的

  • PID:用来标识每个 producer client 的生产者ID
  • squence numbers,client 发送的每条消息都会带相应的序列号,Server 端就是根据这个值来判断数据是否重复。

Kafka Producer 幂等的作用范围是 topic-partition,squence number 从0开始自增,在作用范围内,producer生产消息时,每条消息会有一个对应的squence number,同时 broker 上也会维护 pid-seq 的映射,并且每次 commit 都会更新映射,当broker收到生产者的消息时,有三种情况:

  • 当消息的squence number等于broker维护的squence number + 1,表示消息有序且第一次消费
  • 当消息的squence number小于或等于broker维护的squence number,表示重复消费额
  • 当消息的squence number等于broker维护的squence number + n(n > 1),表示存在消息丢失

2.png

Kafka Producer 幂等的实现

1.png

这里重点关注幂等性相关的内容,首先,KafkaProducer启动时,会初始化一个 TransactionManager 实例,它的作用有以下几个部分:

  • 记录本地的事务状态(事务性时必须)
  • 记录一些状态信息以保证幂等性,比如:每个 topic-partition 对应的下一个 sequence numbers 和 last acked batch(最近一个已经确认的 batch)的最大的 sequence number 等;
  • 记录 ProducerIdAndEpoch 信息(PID 信息)。

幂等性时,Producer 的发送流程如下:

  1. 调用kafkaProducer的send方法将数据添加到 RecordAccumulator 中,添加时会判断是否需要新建一个 ProducerBatch,这时这个 ProducerBatch 还是没有 PID 和 sequence number 信息的;
  2. Producer 后台发送线程 Sender,在 run() 方法中,会先根据 TransactionManager 的 shouldResetProducerStateAfterResolvingSequences() 方法判断当前的 PID 是否需要重置,重置的原因是因为:如果有topic-partition的batch已经超时还没处理完,此时可能会造成sequence number 不连续。因为sequence number 有部分已经分配出去了,而Kafka服务端没有收到这部分sequence number 的序号,Kafka服务端为了保证幂等性,只会接受同一个pid的sequence number 等于服务端缓存sequence number +1的消息,所有这时候需要重置Pid来保证幂等性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
synchronized boolean shouldResetProducerStateAfterResolvingSequences() {
/**
* 是否是事务即配置了Tid
* 如果是事务则不需重置Pid
*/
if (isTransactional())
// We should not reset producer state if we are transactional. We will transition to a fatal error instead.
return false;
for (Iterator<TopicPartition> iter = partitionsWithUnresolvedSequences.iterator(); iter.hasNext(); ) {
TopicPartition topicPartition = iter.next();
if (!hasInflightBatches(topicPartition)) {//没有该分区的消息在发送中
// The partition has been fully drained. At this point, the last ack'd sequence should be once less than
// next sequence destined for the partition. If so, the partition is fully resolved. If not, we should
// reset the sequence number if necessary.
/**
* 判断SequenceNo是否连续
* 如果连续的,就不需要重置Pid
*/
if (isNextSequence(topicPartition, sequenceNumber(topicPartition))) {
// This would happen when a batch was expired, but subsequent batches succeeded.
iter.remove();
} else {
// We would enter this branch if all in flight batches were ultimately expired in the producer.
log.info("No inflight batches remaining for {}, last ack'd sequence for partition is {}, next sequence is {}. " +
"Going to reset producer state.", topicPartition, lastAckedSequence(topicPartition), sequenceNumber(topicPartition));
return true;
}
}
}
return false;
}
  1. Sender线程调用maybeWaitForProducerId()方法判断是否要申请Pid,如果需要,会阻塞直到成功申请到Pid

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    ProducerIdAndEpoch producerIdAndEpoch = null;       
    boolean isTransactional = false;
    if (transactionManager != null) {//有事务或者启用幂等
    //事务是否允许向此分区发送消息
    if (!transactionManager.isSendToPartitionAllowed(tp))
    break;

    producerIdAndEpoch = transactionManager.producerIdAndEpoch();
    if (!producerIdAndEpoch.isValid())
    // we cannot send the batch until we have refreshed the producer id
    break;
    //是否支持事务
    isTransactional = transactionManager.isTransactional();

    /**
    * 如果该分区的前面还有没发送完成的Batch,则需要跳过该分区的Batch,等待之前batch发送完成
    */
    if (!first.hasSequence() && transactionManager.hasUnresolvedSequence(first.topicPartition))

    break;

    /**
    * 该分区存在发送中的Batch,该Batch有Sequence,和first的不相等。则跳过、
    *
    * 也即first是个重试的Batch(因为它有Sequence),需要等待该分区发送中的Batch完成
    */
    int firstInFlightSequence = transactionManager.firstInFlightSequence(first.topicPartition);
    if (firstInFlightSequence != RecordBatch.NO_SEQUENCE && first.hasSequence()
    && first.baseSequence() != firstInFlightSequence)

    break;
    }

    ProducerBatch batch = deque.pollFirst();
    /**
    * 校验当前batch是否已经设置了Sequence
    * 如果没有,则需要设置batch的Sequence,增加对应分区的Next Sequence,将batch加入到inflightBatchesBySequence中
    */
    if (producerIdAndEpoch != null && !batch.hasSequence()) {

    //设置Batch的sequenceNumber 和isTransactional
    batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
    //增加该分区的sequenceNumber,增加值为Batch中消息的个数
    transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
    log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
    "{} being sent to partition {}", producerIdAndEpoch.producerId,
    producerIdAndEpoch.epoch, batch.baseSequence(), tp);
    //加入到发送队列中
    transactionManager.addInFlightBatch(batch);
    }
    batch.close();//关闭此batch,不可追加消息
    size += batch.records().sizeInBytes();//累计size
    ready.add(batch);//加到集合中,最后一起返回出去
    batch.drained(now);//更新drainedMs时间戳
  2. 最后调用sendProduceRequest方法将消息发送出去

Kafka 事务

Kafka 事务是Kafka 0.11.0.0引入的新特性,Kafka事务与数据库事务类似,在对事务进行一系列生产消费操作能保证原子性,同时成功或者同时回滚失败回滚。

Kafka 为什么引入事务

Kafka 需要将生产者生产以及消费者提交位点封装成一个原子操作,能够实现要么同时成功,要么同时失败回滚。如果没有原子性会导致消费者在处理消息和提交位点之间的某个时间挂掉了而出现重复消费的问题。

事务操作API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//producer提供的事务方法
/**
* 初始化事务。需要注意的有:
* 1、前提
* 需要保证transation.id属性被配置。
* 2、这个方法执行逻辑是:
* (1)Ensures any transactions initiated by previous instances of the producer with the same
* transactional.id are completed. If the previous instance had failed with a transaction in
* progress, it will be aborted. If the last transaction had begun completion,
* but not yet finished, this method awaits its completion.
* (2)Gets the internal producer id and epoch, used in all future transactional
* messages issued by the producer.
*
*/
public void initTransactions();

/**
* 开启事务
*/
public void beginTransaction() throws ProducerFencedException ;

/**
* 为消费者提供的在事务内提交偏移量的操作
*/
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException ;

/**
* 提交事务
*/
public void commitTransaction() throws ProducerFencedException;

/**
* 放弃事务,类似回滚事务的操作
*/
public void abortTransaction() throws ProducerFencedException ;

Kafka 幂等性和事务性的关系

事务属性实现前提是幂等性,即在配置事务属性transaction id时,必须还得配置幂等性;但是幂等性是可以独立使用的,不需要依赖事务属性。

幂等性引入了Porducer ID

事务属性引入了Transaction Id属性。

enable.idempotence = true,transactional.id不设置:只支持幂等性。

enable.idempotence = true,transactional.id设置:支持事务属性和幂等性

enable.idempotence = false,transactional.id不设置:没有事务属性和幂等性的kafka

enable.idempotence = false,transactional.id设置:无法获取到PID,此时会报错

参考资料