深度剖析RabbitMQ可靠性消息投递以及实践方案

一般而言,如果你选择RabbitMQ,那肯定就是把可靠性放在第一位。毕竟,RabbitMQ可是金融行业消息队列的标配。如果把性能放在第一位,那毫无疑问,必须是Kafka。但是,可靠性毕竟是相对的,就拿大火的阿里云,AWS云,或者传统的IBM小型机,Oracle数据库,没有谁敢说自己可靠性100%,都是说几个9。所以,本文的目的很明确,就是尽可能的提高我们RabbitMQ的可靠性,从发送、存储、消费、集群、监控、告警等多个维度给出可行性方案,指导开发者以及运维人员获取更加可靠的消息投递,保障我们的业务系统安全、可靠、稳定的运行。
数据可靠性是和RabbitMQ节点、生产者、消费者以及服务器等息息相关的。本文比较长,大概分为如下几个段落:

  1. 确认机制
  2. 生产者
  3. 消费者
  4. 队列镜像
  5. 告警
  6. 监控和Metrics
  7. 健康检查

如下是一张RabbitMQ架构图,本文对可靠性的分析,会涉及到架构图中的方方面面:

1. 确认机制

当连接出现问题的时候,在客户端和服务端之间的消息可能正在投递中,还没有被Broker接收,它们可能正在被编码或者解码,或者一些其他的情况。在这种场景下,消息并没有被投递,那么它们是需要被重新投递以保障业务稳定性。确认机制让服务端和客户端知道什么时候需要做这些事情,它对于生产者和消费者保障数据安全是非常重要的。
确认机制能被用在两个方向:允许消费者告诉服务器(Broker)它已经收到了消息,也允许服务器告诉生产者它接收到了消息。前者就是我们常说的消费者Ack,后者就是我们常说的生产者Confirm。

1.1 生产者/消费者确认

生产者确认以及消费者确认接下来会单独的段落进行详细的介绍,并且有示例代码,这里就不过多的讲解了。

1.2 确认机制总结

确认机制的使用,能够保证最少一次(at least once)投递。如果没有确认机制,消息就非常可能会丢失,这时候只能保证最多一次(at most once )。至于恰好一次投递,目前还没有哪个中间件可以保证,毕竟分布式系统非常复杂,尤其是网络的不可控,不确定的因素太多太多。

2. 生产者

当使用确认机制的时候,生产者从连接或者channel故障中恢复过来时,会重发没有被Broker确认签收的消息。如此一来,消息就可能被重复发送,因为可能是由于网络故障等原因,Broker发送了确认,但是生产者没有收到而已。亦或者,消息压根就没有发送到Broker那里去。正因为生产者为了可靠性可能会重发消息,所以在消费者消费消息处理业务时,还需要去重,或者对接受到的消息做幂等处理(推荐幂等处理)。
生产者增加确认机制非常简单,channel开启confirm模式,然后增加监听即可:

// 选择确认机制

channel.confirmSelect();

// 确认消息监听

channel.addConfirmListener(new ConfirmListener() {

    @Override

    public void handleAck(long deliveryTag, boolean multiple) throws IOException {

        System.out.println("消息已经ack,tag: " + deliveryTag);

    }


@Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { // 对于消费者没有ack的消息,可以做一些特殊处理 System.out.println("消息被拒签,tag: " + deliveryTag); } });

说明:RabbitMQ还有事务机制(txSelect、txCommit、txRollback),也能保障消息的发送。不过事务机制是 「同步阻塞」
的,所以不推荐使用。而confirm模式是 「异步」
机制。如下图所示(图片摘自《Rabbit实战指南》,皮皮厮的书籍,非常值得一看),是事务机制以及confirm方式TPS性能对比,我们可以很明显的看到,事务机制是性能最差的:
在一些很重要的业务场景,我们还需要确保消息被正确路由到了队列。为了确保消息被正确路由到一个已知的队列,我们需要确保消息被正确的从交换器传递到了队列中,并且还需要确保目标队列至少有一个消费者。我们试想,如果一个队列都没有任何消费者,那发送的消息相当于石沉大海了。
前者,我们可以通过给交换器绑定一个备份交换器解决这个问题。比如一个direct类型的交换器,如果发送消息时路由KEY不匹配,那么这条消息就会进入备份交换器中,而不会被丢失:

Map argsMap = new HashMap();

argsMap.put("alternate-exchange", ALTER_EXCHANGE_NAME);

// map参数中alternate-exchange的值,就是申明绑定的备份交换器名称

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, argsMap);


RabbitMQ支持的4种交换器类型中,只有fanout是不存在路由不到队列的情况,因为它会自动路由到所有队列中,跟绑定KEY没有任何关系。所以,在满足你业务的前提下,笔者建议,尽可能使用fanout类型交换器。

后者,我们可以通过死信交换器(DLX,也被称为私信队列)来解决这个问题,假设一些消息没有被消费,那么它就会被转移到绑定的死信交换器上,对于这类消息,我们消费并处理死信队列即可:

Map argsMap = new HashMap();

// 死信交换器/死信队列

argsMap.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);

// 设置队列过期时间(第一次设置一个值后,以后不能设置一个更大的值)

argsMap.put("x-message-ttl", 60000);


channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true); // 死信的关系一定要在queue申明时指定,而不能在exchange申明时指定 channel.queueDeclare(QUEUE_NAME, true, false, false, argsMap);

3. 消费者

只有消费者确认的消息,RabbitMQ才会删除它,不确认就不会被删除。所以,在消费端,建议关闭自动确认机制。应该在收到消息处理完业务以后,手动确认消息。消费者手动确认实现代码如下:

DefaultConsumer consumer = new DefaultConsumer(channel) {

    @Override

    public void handleDelivery(String consumerTag, Envelope envelope,

                               AMQP.BasicProperties props, byte[] body){

        System.out.println("死信队列接受到的消息:" + new String(body));

        // 手动确认消息接受成功

        channel.basicAck(envelope.getDeliveryTag(), false);

        // channel.basicNack(envelope.getDeliveryTag(), false, false);

    }


}; // 推模式,并且关闭自动确认机制,即autoAck=false channel.basicConsume(QUEUE_NAME, false, consumer);

注意上面的basicAck方法:void basicAck(long deliveryTag, boolean multiple) 第二个参数multiple。要说明这个参数的含义,首先需要讲清楚一个概念 「deliveryTag」
,即投递消息唯一标识符,它是一个 「单调递增」
的Long类型正整数。假设此次basicAck的tag为123130,如果multiple=false,那么表示只确认签收这一条消息。如果multiple=true,那么表示确认签收tag小于或等于123130的所有消息。

「最大Tag值」:Delivery tag是一个64位长整型值,也就是说它的最大值是9223372036854775807(2^64-1),不过这个tag值是按channel划分的。也就是说,理论上每一个channel都可以发送2^64-1条消息。因此,我们认为在实际情况中,tag基本上不可能达到它的上限值。

前面提到,由于网络等一些原因引起了故障,就会导致消息被重发。因此,消费端一定要做好处理重复消息的准备,强烈建议在消费端实现 「幂等」
的业务逻辑。比如以支付送积分为例,那么,消息体中肯定有订单号这个业务属性唯一的ID。那么在消费这个消息送积分的业务代码中,需要根据这个订单号做幂等处理,即 「同一个订单号只能送一次积分」

在消费端还有一种情况,就是当前消费者认为它不能处理当前消息。因此,它就拒绝签收(basic.reject或者basic.nack)这个消息。那么生产者也需要监听这些消息并做特殊的业务处理。
RabbitMQ中消费者有3种签收消息方式:1、channel.basicAck(long deliveryTag, boolean multiple):确认签收消息;2、channel.basicNack(long deliveryTag, boolean multiple, boolean requeue):不确认签收消息,并设置是否重入队列中,并且可以批量不确定签收tag之前的所有还未ack的消息;3、channel.basicReject(long deliveryTag, boolean requeue):直接拒绝签收这一条消息,并设置是否重入队列中。

4. 队列镜像

为了防止在Broker中丢失消息,交换器、队列和消息都应该设置为持久化。除此以外,队列和消息还应该被复制,为了应对操作系统未及时fsync刷盘、Broker重启、Broker服务器硬件故障、或者Broker crash故障等问题。

集群节点提供了冗余能力,能容忍单节点故障。在RabbitMQ集群中,所有的定义都可以被复制,例如交换器、绑定关系、用户等。但是队列有所不同,默认只存在一个节点上(这一点和kafka很不一样,kafka的Topic可以设置多个副本)。不过,可以通过配置把队列 「镜像」
到多个节点上,从而让队列不会有单点故障的问题:
当节点发生故障时,如果这时候有镜像队列,还会自动发生新的选举,就能选举出新的Master。从而不会因为某个节点故障而导致队列不可用,而且整个过程对业务完全无感知。

不过需要注意的是,镜像并不能保证所有队列的高可用, 「排他性队列」
(Exclusive Queues)就不行。因为排他性队列生命周期与它的连接绑定在一起,当Broker节点发生故障重启后,排他性队列是会自动删除的,因此不能被镜像(Mirrored)!

5. 告警

这里我们主要说一下磁盘告警以及内存告警。因为出现下面两种情况时,RabbitMQ会停止从客户端网络socket中读数据:
1、内存使用达到配置上限;2、磁盘使用达到配置上限;
内存使用上限有3种设置方式:

## 设置节点可使用RAM百分比,超过这个百分比就会告警

vm_memory_high_watermark.relative = 0.4

## 设置节点可使用RAM的上限,单位为byte即字节

vm_memory_high_watermark.absolute = 1073741824

## RabbitMQ 3.6.0+,设置节点可使用RAM的绝对大小,它的优先级低于relative

vm_memory_high_watermark.absolute = 2GB

磁盘使用上限也有3种设置方式:

## 设置磁盘剩余空间阈值,当可用空间低于这个值就会触发告警

disk_free_limit.absolute = 51200


## with RabbitMQ 3.6.0+. disk_free_limit.absolute = 500KB disk_free_limit.absolute = 50MB disk_free_limit.absolute = 5GB
## 设置磁盘剩余空间为有效RAM的多少倍,比如当前可用RAM为2G,且设定这个参数为2。那么当磁盘可用空间低于2*2=4G时就会触发告警 disk_free_limit.relative = 2.0


由上可知:开启磁盘告警以及内存告警非常简单,需要说明的是,设置绝对值大小时,MB和M是不一样的,M即MiB表示 mebibytes (2^20 ,1,048,576 bytes),而MB表示 megabytes (10^6 ,1,000,000 bytes)。

只要满足这两种情况其中的一种情况,服务器就会临时阻塞连接,并且连接心跳也会断开。这时候我们通过rabbitmqctl或者RabbitMQ管理后台查看,所有网络连接都是 「blocking」
。如果我们是RabbitMQ集群,那么内存和磁盘告警就会影响整个集群。只要其中一个节点内存或者磁盘达到上限,整个集群的所有节点都会阻塞连接。
一些客户端包是支持连接阻塞提醒的,如果你使用的客户端包恰好也支持,建议基于此增加监控告警。此外,运维需要针对RabbitMQ集群服务器增加内存和磁盘使用率告警。假设RabbitMQ磁盘告警是只剩10%,那么运维设置的告警应该要大于10%,比如15%,从而能够提前发现隐患,提前介入处理问题,避免磁盘使用达到上限而引起线上问题,背个P0故障可不好看。

6. 监控和Metrics

生产环境我们应该有完善且合理的监控机制,从而做到防患于未然。监控能在问题还未暴露出来时,就能提前发现问题。监控最重要的事情就是对Metrics的采集和分析。我们把Metrics分为两大类: 「RabbitMQ的Metrics」
「基础设施的Metrics」

6.1 基础设施的Metrics

这块的监控,我们需要收集运行RabbitMQ节点的所有服务器,以及应用的一些metrics,包括但不限于:

  1. CPU统计情况;
  2. 内存使用率;
  3. 虚拟内存统计;
  4. RabbitMQ节点数据目录下磁盘剩余可用空间;
  5. 磁盘IO情况;
  6. 网络吞吐量(接收量、发送量、最大网络吞吐量等);
  7. 网络延迟情况(RabbitMQ集群所有节点以及客户端之间的网络延迟);
  8. 文件描述符;

对基础设施以及系统内核Metrics的监控工具非常多,而且都很成熟。比如:Prometheus、Datadog、Zabbix等。它们都能很好的收集Metrics信息,然后存储并可视化展示,并且可以自定义告警规则。

6.2 RabbitMQ的Metrics

接下来我们说一下对RabbitMQ集群本身的监控。RabbitMQ的管理后台UI暴露了节点很多metrcis信息,并且RabbitMQ还通过HTTP API把这些信息暴露出来方便我们二次开发,自定义监控系统就非常依赖它的HTTP API。HTTP API访问参考:curl -i -u root:root123 ‘http://localhost:15672/api/overview’。
我们先说RabbitMQ管理后台,这个后台做的还是很不错的。但是如果用于监控的话,就显得有些不足了:

  1. 强依赖被监控的系统(应该要解耦被监控的系统和监控系统);
  2. 只存储最近一天的数据(超过一天的数据就没有了);
  3. 用户接口不够强大;
  4. 它的权限系统是依赖RabbitMQ权限系统的;

可喜的是,RabbitMQ从3.8版本开始,已经支持Prometheus和Grafana了,也推荐应用在生产环境上。接下来说一下RabbitMQ监控系统需要关注RabbitMQ的哪些metrics。

6.2.1 集群Metrics

我们可以通过 ‘http://localhost:15672/api/overview’ 得到RabbitMQ集群相关信息,。结果(部分字段)如下:

{

  ... ...

  "message_stats": {

      "ack": 212,

      "ack_details": {

          "rate": 0.0

      },

      "confirm": 143,

      "confirm_details": {

          "rate": 0.0

      },

      "publish": 323,

      ... ...

  },

  "queue_totals": {

      "messages": 197,

      "messages_details": {

          "rate": 0.0

      },

      ...

  },

  "object_totals": {

      "channels": 2,

      "connections": 2,

      "consumers": 1,

      "exchanges": 24,

      "queues": 10

  },

  ... ...

}

这里我们需要关注的一些主要指标有:

  • message_stats.ack:消费者确认接受消息的数量;
  • message_stats.confirm:生产者得到Broker已经确认的消息(ConfirmListener中的handleAck);
  • message_stats.publish:最近发布的消息总量(它只有最近的数据,没有RabbitMQ集群整个生命周期的数据。前面两个指标ack和confirm一样,也都是最近的数据。如果RabbitMQ闲置一段时间,这几个值都会归零);
  • object_totals.channels:channel数量,等价于RabbitMQ管理后台 「Channels」
    页面中交换器数量;

  • object_totals.connections:连接数量,等价于RabbitMQ管理后台 「Connections」
    页面中交换器数量;

  • object_totals.consumers:消费者数量;
  • object_totals.exchanges:交换器数量,等价于RabbitMQ管理后台 「Exchanges」
    页面中交换器数量;

  • object_totals.queues:队列数量,等价于RabbitMQ管理后台 「Queues」
    页面中队列数量;

说明:object_totals这个节点下几个字段的值,我们也可以在RabbitMQ管理后台 「Overview」
页面的 「Global counts」
中得到,如下图所示:

6.2.2. 节点Metrics

首先,我们通过 ‘http://localhost:15672/api/nodes/’ 可以获取到RabbitMQ集群中所有节点的详细信息,它返回的是一个JSON数组。也可以通过 ‘http://localhost:15672/api/nodes/{nodeName}’ 获取某个节点信息(这里的nodeName就是第一个URL结果中字段name的值,例如:”name”: “rabbit@afeideMacBook-Pro”),所以这个URL返回的是一个JSON对象。结果(部分字段)如下:

{

  "partitions": [],

  "os_pid": "36637",

  "fd_total": 4864,

  "sockets_total": 4285,

  "mem_limit": 3435973836,

  "mem_alarm": false,

  "disk_free_limit": 50000000,

  "disk_free_alarm": false,

  "proc_total": 1048576,

  "rates_mode": "basic",

  "uptime": 115368753,

  "run_queue": 1,

  "processors": 4,

  ... ...

}

这里我们需要关注的一些主要指标有:

  • mem_used:已经使用的内存;
  • mem_limit:限制最大允许使用的内存;
  • mem_alarm:bool类型值,是否开启了内存告警;
  • disk_free_limit:磁盘还剩下多少空间告警阈值,也就是说当磁盘可用空间小于这个值时就会告警;
  • disk_free_alarm:bool类型值,是否开启了磁盘告警;
  • fd_total:总计可用文件描述符数量;
  • fd_used:已经使用的文件描述符数量;
  • sockets_total:总计可用sockets数量;
  • sockets_used:已经使用sockets数量;

6.2.3 队列Metrics

队列的metrics都可以通过’http://localhost:15672/api/queues/afei/queue-normal-afei’获取。得到的json结果如下:

{

  "consumer_details": [],

  "arguments": {

      "x-dead-letter-exchange": "exchange-dlx-afei",

      "x-message-ttl": 3000

  },

  "auto_delete": false,

  ... ...

  "idle_since": "2020-05-05 7:45:58",

  "incoming": [],

  "memory": 19900,

  "message_bytes": 0,

  ... ...

  "state": "running",

  "vhost": "/afei"

}

我们需要关注的一些主要指标有:

  • memory:使用的内存;
  • messages:未被确认的消息总数 + 准备投递的消息总数;
  • messages_ready:准备投递的消息总数;
  • messages_unacknowledged:未被确认的消息总数;
  • state:当前队列的状态,running状态表示正常;
  • idle_since:表示当前队列有多久没有被消费者消费了,如果这个时间相比当前时间是很久以前,很有可能说明当前队列没有生产者已经废弃。

6.2.4 应用Metrics

现在大部分系统都是分布式的,在这样的系统中,当某个组件出问题时,通常很难立即定位问题。所以,系统的每个部分,包括应用本身,都应该被监控起来。
一些基础设施级别和RabbitMQ的Metrics能看出系统当前有异常行为和问题,但是,还不能定位根据原因。例如,通过监控我们很容易知道某个节点磁盘空间不足,但是很难麻烦就知道为什么不足。所以,我们需要应用Metris信息来协助我们排查这样的问题:区分哪个消息生产者速度异常、哪个消息者出现重复性的失败、消费者速度赶不上消息产生的速度等。
应用程序跟踪的Metrics可能是特定系统的,但是也有一些Metrics与大多数系统都是有关联的,例如:Connection opening rate、Channel opening rate、Connection failure (recovery) rate、Publishing rate、Delivery rate、Positive delivery acknowledgement rate、Negative delivery acknowledgement rate、Mean/95th percentile delivery processing latency… …

7. 健康检查

下面推荐一些RabbitMQ监控检查的命令:

  • rabbitmq-diagnostics -q ping:如果节点没有任何问题,那么返回Ping succeeded。
  • rabbitmq-diagnostics -q status:会显示当前RabbitMQ节点即broker的很多信息,例如内存信息、磁盘信息、虚拟内存信息、告警信息、文件描述符等。
  • rabbitmq-diagnostics -q alarms:该命令可以检查RabbitMQ本地以及集群节点是否有告警信息,如果没有,那么返回这样的信息:Node rabbit@afeideMacBook-Pro reported no alarms, local or clusterwide;

说明: 「rabbitmq-diagnostics」
这个命令还有很多其他的用法,我们可以通过rabbitmq-diagnostics –help查看它还能监控和检查哪些Metrics。
最后推荐一些第三方监控工具,如下表格所示,按照字母排序,排名不分先后。这些工具能力上各有千秋,不过一般都是既能收集基础设施级别的Metrics,也能收集RabbitMQ的Metrics。当然,笔者能力有限,可能还有其他一些好用的工具并没有列举出来:

名称 简介 地址
AppDynamics RabbitMQ监控扩展,从RaabitMQ管理API上收集RabbitMQ的metrics并上传到AppDynamics Controller上 https://github.com/Appdynamics/rabbitmq-monitoring-extension
AWS CloudWatch 把RabbitMQ集群的Metrics信息推送到AWS CloudWatch上的RabbitMQ插件 https://github.com/noxdafox/rabbitmq-cloudwatch-exporter
DataDog 通过Datadog代理监控RabbitMQ https://github.com/DataDog/integrations-core/tree/master/rabbitmq
Prometheus 通过Prometheus暴露RabbitMQ一些核心Metrics https://github.com/rabbitmq/rabbitmq-prometheus
Zabbix 通过Zabbix监控RabbitMQ https://git.zabbix.com/projects/ZBX/repos/zabbix/browse/templates/app/rabbitmq_http