Kafka 的事务到底长啥样?

  • 该方案要求下游系统支持幂等操作,限制了 Kafka 的适用场景

  • 实现门槛相对较高,需要用户对 Kafka 的工作机制非常了解

  • 对于 Kafka Stream 而言,Kafka 本身即是自己的下游系统,但 Kafka 在 0.11.0.0 版本之前不具有幂等发送能力

因此,Kafka 本身对Exactly Once语义的支持就非常必要。

Kafka 幂等性

在说 Kafka 的事务之前,先要说一下 Kafka 中幂等( Idempotent )的实现。幂等和事务是 Kafka 0.11.0.0 版本引入的两个特性,以此来实现 EOS 语义。

Kafka 幂等性是 Producer 端的特性,为了实现生产端幂等性,Kafka 引入了 Producer ID(即PID)和 Sequence Number。

  • PID:每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个PID 对用户完全是透明的。

  • Sequence Numbler:对于每个 PID,该 Producer 发送到每个 Partition 的 数据 都有对应的序列号,这些序列号是从0开始单调递增的。

Broker 端在缓存中保存了这 Sequence Numbler ,对于接收的每条消息,如果其序号比 Broker 缓存中序号大 于1则接受它,否则将其丢弃。这样就可以实现了消息重复提交了。幂等涉及的参数是 enable.idempotence,默认为 false,开启需要设置为 ture。

但是,这种只能保证单个 Producer 对于 单会话单 Partition  的 Exactly Once 语义。不能保证同一个 Producer 一个 topic 不同的 Partition 幂等。

Kafka 事务性

Kafka 事务支持

正是因为 Kafka Idempotent 不提供跨多个 Partition 和跨会话场景下的保证,因此,我们是需要一种更强的事务保证,能够原子处理多个 Partition 的写入操作,数据要么全部写入成功,要么全部失败,这就是 Kafka Transactions,即Kafka 事务。

Kafka 事务 API

producer提供了initTransactions,beginTransaction,sendOffsetsToTransaction,commitTransaction,abortTransaction 五个事务方法。

    /**

* 初始化事务。需要注意的有:

* 1、前提

* 需要保证transation.id属性被配置。

* 2、这个方法执行逻辑是:

*   (1)Ensures any transactions initiated by previous instances of the producer with the same

*      transactional.id are completed. If the previous instance had failed with a transaction in

*      progress, it will be aborted. If the last transaction had begun completion,

*      but not yet finished, this method awaits its completion.

*    (2)Gets the internal producer id and epoch, used in all future transactional

*      messages issued by the producer.

*

*/

public void initTransactions();

/**

* 开启事务

*/

public void beginTransaction() throws ProducerFencedException ;

/**

* 为消费者提供的在事务内提交偏移量的操作

*/

public void sendOffsetsToTransaction(Map offsets,

String consumerGroupId) throws ProducerFencedException ;

/**

* 提交事务

*/

public void commitTransaction() throws ProducerFencedException;

/**

* 放弃事务,类似回滚事务的操作

*/

public void abortTransaction() throws ProducerFencedException ;

相关属性配置

使用 Kafka 的事务 API 时的一些注意事项:

  • 需要消费者的自动模式设置为 false,并且不能子再手动的进行执行consumer#commitSync或者consumer#commitAsyc。

  • 生产者配置  transactional.id  属性。

  • 生产者不需要再配置 enable.idempotence,因为如果配置了transaction.id,则此时 enable.idempotence 会被设置为true。

  • 消费者需要配置 isolation.level 属性,有两个可选值:”read_committed read_uncommitted “,默认” read_uncommitted “。