消息队列的消费语义和投递语义

OK,回到我们的正题。

所谓消费语义,指的就是如下三种情况

  • 如何保证消息最多消费一次

  • 如何保证消息至少消费一次

  • 如何保证消息恰好消费一次

其实类似还有一个投递语义

  • 如何保证消息最多投递一次

  • 如何保证消息至少投递一次

  • 如何保证消息恰好投递一次

说句实在话,其实还是老问题,只是换了一种问法!

OK,开始我们的正文

正文

我们先做如下约定

  • Producer 代表生产者

  • Consumer 代表消费者

  • Message Queue 代表消息队列

投递语义

我们先从投递语义开始讲起,因为要先把这个概念讲明白了,才能讲消费语义。恰巧, kafka 实现了这三种语义,我们以 kafka 来说明。

如何保证消息最多投递一次?

简单,就是我已经投出去了,收没收到不管了,会存在消息丢失。

我们在初始化 Producer 时可以通过配置 request.required.acks 不同的值,来实现不同的发送模式。

这里将 request.required.acks 设为0,意思就是 Producer 不等待Leader确认,只管发出即可;最可能丢失消息。如果丢了消息,就是投递0次。如果没丢,就是投递1次。符合最多投递一次的含义。

如何保证消息至少投递一次?

这里将 request.required.acks 设为-1。 ProducerkafkaLeader(主) 节点发送消息后,会等 follower(从) 节点同步完数据以后,再给 Producer 返回ACK确认消息。

但是这里是有几率出现重复消费的问题的。

例如, kafka 保存消息后,发送ACK前宕机, Producer 认为消息未发送成功并重试,造成数据重复!

那么,在这种情况下,就会出现大于1次的投递情况,符合至少投递一次的含义。

如何保证消息恰好投递一次?

kafka 在0.11.0.0版本之后支持恰好投递一次的语义。

我们将 enable.idempotence 设置为ture,此时就会默认把 request.required.acks 设为-1,可以达到恰好投递一次的语义。

如何做到的?

为了实现 Producer 的幂等语义, kafka 引入了Producer ID(即PID)和Sequence Number。

kafka 为每个 Producer 分配一个pid,作为该 Producer 的唯一标识。

Producer 会为每一个消息

维护一个单调递增的seq。

类似的,
Message Queue 也会为每个消息

记录下最新的seq。

当req_seq == message_seq+1时, Message Queue 才会接受该消息。因为:

  • (1)消息的seq比 Message Queue 的seq大一以上,说明中间有数据还没写入,即乱序了。

  • (2)消息的seq比 Message Queue 的seq小,那么说明该消息已被保存。

消费语义

这里我们还是做一个定义如下所示

  • consumer.poll() 表示消费者获取消息内容

  • processMsg(message) 表示下游系统进行消费消息

  • consumer.commit() 表示消费者往消息队列提交确认信息,消息队列接到确认消息,删除该消息。

注意了,我是以 processMsg 函数,即处理消息的过程,定义为消费消息。

如何保证消息最多消费一次?

Producer :满足最多投递一次的语义即可,即只管发消息,不需要等待消息队列返回确认消息。

Message Queue :接到消息后往内存中一放就行,不用持久化存储。

Consumer :拉取到消息以后,直接给消息队列返回确认消息即可。至于后续消费消息成功与否,无所谓的。即按照以下顺序执行

consumer.poll();
consumer.commit();
processMsg(message);

如何保证消息至少消费一次?

Producer :满足至少投递一次语义即可,即发送消息后,需要等待消息队列返回确认消息。如果超时没收到确认消息,则重发。

Message Queue :接到消息后,进行持久化存储,而后返回生产者确认消息。

Consumer :拉取到消息后,进行消费,消费成功后,再返回确认消息。即按照如下顺序执行

consumer.poll();
processMsg(message);
consumer.commit();

由于这里 Producer 满足的是至少投递一次语义,因此消息队列中是有重复消息的。所以我们的 Consumer 会出现重复消费的情形!

如何保证消息恰好消费一次?

在保证至少消费一次的基础上, processMsg 满足幂等性操作即可。

如何保证幂等性操作?

老问题了,比如有状态的消息啊。比如唯一表啊。大家搜一搜,一大堆答案,不想重复说了。

总结

本文讲的是消息队列的消费语义和投递语义的含义,希望大家有所收获。

最后,我就是带薪上厕所了,羡慕不!