RocketMQ最佳实践,就看这一篇!

点击上方蓝色字关注我们~
RocketMQ是阿里开源的消息队列框架,如今也已成为Apache顶级项目,RockerMQ是一个非常优秀的框架,现在大部分互联网公司使用的消息队列也是RocketMQ,在我们使用的过程中,如果能一开始就给你最佳实践,可以避免走一些弯路,甚至你看完之后可以自身检查下你们是不是这样使用,没有的话可以进行适当的调整,这篇文章应该能够帮助你更好的使用RockerMQ。
1
〓Producer最佳实践
1、Topic
一个应用尽可能用一个 Topic,消息子类型用 tags 来标识,tags 可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤:message.setTags(“TagA”)。
2、Key
每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。
每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。
由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。
// 订单Id
String orderId = “20034568923546”;
message.setKeys(orderId);
3、日志
消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。
send消息方法只要不抛异常,就代表发送成功。
发送成功会有多个状态,在sendResult里定义。
4、重发
对于消息不可丢失应用,务必要有消息重发机制。
例如:消息发送失败,存储到数据库,能有定时程序尝试重发或者人工触发重发。
5、sendOneWay
某些应用如果不关注消息是否发送成功,请直接使用sendOneWay方法发送消息。
对可靠性要求并不高,例如日志收集类应用,此类应用可以采用oneway形式调用。
1
〓 Consumer最佳实践
1、幂等
消费过程要做到幂等(即消费端去重)。
RocketMQ目前无法避免消息重复,所以如果业务对消费重复非常敏感,务必要在业务层面去重,有以下几种去重方式:
a).将消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等,消费之前判断是否在Db或Tair(全局KV存储)中存在,如果不存在则插入,并消费,否则跳过。
b). 用业务层面的状态机去重。
2、批量消费
尽量使用批量方式消费方式,可以很大程度上提高消费吞吐量。
某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 返个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。
3、 跳过非重要消息
发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。
例如,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。
示例代码如下:
public ConsumeConcurrentlyStatus consumeMessage(
List msgs,
ConsumeConcurrentlyContext context) {
long offset = msgs.get(0).getQueueOffset();
String maxOffset =
msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
long diff = Long.parseLong(maxOffset) - offset;
if (diff > 100000) {
// TODO 消息堆积情况的特殊处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// TODO 正常消费过程
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
4、提高消费并行度
a). 同一个ConsumerGroup下,通过增加Consumer实例数量来提高并行度,超过订阅队列数的Consumer实例无效。
可以通过加机器,或者在已有机器启动多个进程的方式。
b). 提高单个Consumer的消费并行线程,通过修改以下参数:
consumeThreadMin consumeThreadMax
5、优化每条消息消费过程
举例如下,某条消息的消费过程如下:
-
根据消息从 DB 查询【数据 1】
-
根据消息从 DB 查询【数据 2】
-
复杂的业务计算
-
向 DB 插入【数据 3】
-
向 DB 插入【数据 4】
这条消息的消费过程中有4次与 DB的 交互,如果按照每次 5ms 计算,那么总共耗时 20ms,假设业务计算耗时 5ms,那么总过耗时 25ms,所以如果能把 4 次 DB 交互优化为 2 次,那么总耗时就可以优化到 15ms,即总体性能提高了 40%。
所以应用如果对时延敏感的话,可以把DB部署在SSD硬盘,相比于SCSI磁盘,前者的RT会小很多。
6、日志
消费时记录日志,以便后续定位问题。
如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题。
public ConsumeConcurrentlyStatus consumeMessage(
List msgs,
ConsumeConcurrentlyContext context) {
log.info("RECEIVE_MSG_BEGIN: " + msgs.toString());
// TODO 正常消费过程
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
如果能打印每条消息消费耗时,那么在排查消费慢等线上问题时,会更方便。
〓其他配置
1、线上应该关闭autoCreateTopicEnable,即在配置文件中将其设置为false。
RocketMQ在发送消息时,会首先获取路由信息。如果是新的消息,由于MQServer上面还没有创建对应的Topic,这个时候,如果上面的配置打开的话,会返回默认TOPIC的(RocketMQ会在每台broker上面创建名为TBW102的TOPIC)路由信息,然后Producer会选择一台Broker发送消息,选中的broker在存储消息时,发现消息的topic还没有创建,就会自动创建topic。
后果 就是:以后所有该TOPIC的消息,都将发送到这台broker上,达不到负载均衡的目的。
所以基于目前RocketMQ的设计, 建议 关闭自动创建TOPIC的功能,然后根据消息量的大小,手动创建TOPIC。
2、JVM选项
推荐使用最新发布的JDK 1.8版本。通过设置相同的Xms和Xmx值来防止JVM调整堆大小以获得更好的性能。
简单的JVM配置如下所示:
-server -Xms8g -Xmx8g -Xmn4g
如果您不关心RocketMQ Broker的启动时间,还有一种更好的选择,就是通过“预触摸”Java堆以确保在JVM初始化期间每个页面都将被分配。
那些不关心启动时间的人可以启用它: -XX:+AlwaysPreTouch
禁用偏置锁定可能会减少JVM暂停, -XX:-UseBiasedLocking
至于垃圾回收,建议使用带JDK 1.8的G1收集器。