博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RocketMQ高阶业务问题及解决方案
阅读量:4131 次
发布时间:2019-05-25

本文共 3350 字,大约阅读时间需要 11 分钟。

RocketMq全链路消息零丢失方案

  • 发送消息到mq零丢失: 事务消息
  • Broker 存储消息零丢失:同步刷盘+主从机制
  • Consumer 消费消息零丢失:手动提交offset + 自动故障转移

Broker消息零丢失方案:同步刷盘 + Raft协议主从同步

Broker 是负责存储消息的,怎么保证消息发送到Broker后,一定不会丢失呢?

刷盘失败

首先RocketMq一般情况下,为了保证高吞吐量,使用的是异步刷盘策略。但是这种策略会出现消息写入os cache成功,但是异步写入磁盘的时候失败了。那么这条消息就丢失了。 所以需要改成同步刷盘,使用这种策略后,只要Broker 告诉我们消息发送成功,那么消息就一定被写入磁盘了。

磁盘损坏

只要消息写入到磁盘,消息就一定不会丢失吗? 显然不是的,如果磁盘文件损坏的话,这些消息也就丢失了。 所以必须使用 Broker 主从架构,也就是说让一个 Master Broker 有一个 Slave Broker去同步主节点的数据,而一条消息写入成功,必须让Slave Broker也写入成功,保证数据有多个副本冗余。 这样的话,就算Master Broker的磁盘损坏了,也还有Slave Broker的数据

Consumer消息零丢失方案:手动提交offset + 自动故障转移

Consumer消费消息的时候,理论上将也是有可能丢失消息的。 比如当 Consumer 收到消息后,还没进行业务处理,就直接返回成功,提交offset,接着Consumer 就崩溃了,业务还没处理完,Broker 收到 offset 却以为 Consumer 消费成功了,那么这条消息就丢失了。 所以,在RocketMq中,是先处理业务,然后最后在返回CONSUME_SUCCESS,这样的话,即使处理业务的时候,消费者挂了,只要没返回CONSUME_SUCCESS,Broker都认为这个消息还没被消费,还会再次push。 当Broker 感知到消费者挂掉的时候,它会把该机器没处理完的消息,交给消费组里的另一台机器去消费,这种方式实现了故障转移。

顺序消息

通常情况下MQ的消息是无序的,因为MQ会根据算法把消息发送到不同的MessageQueue,Consumer也会开启多线程去进行消费,因此并不难保证消息有序。 但是,在一些场景,我们又需要消息有序。例如订单create update delete这几个状态要发送消息给其他业务方,业务方希望消息严格按照订单这几个状态的顺序来发送,否则可能会出现先更新订单,发下订单不存在的情况。

同一个MessageQueue

如果要想保证消息有序,首先要让同一个订单的消息都进入到同一个MessageQueue中,MessageQueue是先进先出的,可以保证订单的消息在该队列中有序。 那么如何让同一个订单的消息都进入到同一个MessageQueue中呢? RocketMq提供了MessageQueueSelector类,我们可以按照订单ID对MessageQueue的数量取模,然后发送消息的时候指定这个MessageQueue。

Consumer单线程消费

虽然我们保证了同一个订单ID下的消息都在同一个MessageQueue中,但是Consumer默认是开启多线程消费的,如果消费订单创建消息的时候超时了,那么还是不能保证消息有序。 RocketMq提供了MessageListenerOrderly监听器,该类可以保证对每一个MessageQueue都使用一个线程去消费。底层是通过ConcurrentHashMap来加锁,使得同一时间只有一个线程可以消费。

public class MessageQueueLock {    private ConcurrentMap
mqLockTable = new ConcurrentHashMap
(); public Object fetchLockObject(final MessageQueue mq) { Object objLock = this.mqLockTable.get(mq); if (null == objLock) { objLock = new Object(); Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock); if (prevLock != null) { objLock = prevLock; } } return objLock; }}复制代码
//ConsumeMessageOrderlyService.java    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);            synchronized (objLock) {            //...            }复制代码

Consumer消费失败怎么办

思考一个问题,如果consumer消费的时候失败了,我们返回重试,这时候消息的顺序就乱了,这种情况要如何处理呢? RocketMQ提供了

SUSPEND_CURRENT_QUEUE_A_MOMENT状态,当返回这个状态后,MQ会暂停一段时间再消息,不会把消息放入重试队列。

延迟消息

RocketMQ提供了延迟消息的功能,非常方便。其实它的实现原理就是给延迟的消息新开一个队列。

  • 消息生产者发送消息,如果发送的消息DelayTimeLevel大于0,则改变消息主题为SCHEDULE_TOPIC_XXXX,消息的队列为DelayTimeLevel-1。
  • 消息存储到SCHEDULE_TOPIC_XXXX上,把原有主题设置成属性。
  • 定时任务DeliverDelayedMessageTimerTask每隔1秒从SCHEDULE_TOPIC_XXXX获取消息。一个 task 处理一个级别的延时消息
  • 根据消息的属性重新创建消息,并恢复原主题TopicTest、原消息队列ID,清除DelayTimeLevel属性存入Commitlog中,供消费者消费。

MQ消息中有百万积压怎么处理

假设在订单场景中,我们的消费者挂掉了,而订单量是很巨大的,在短时间内就堆积了几百万条消息,这种情况该怎么处理呢?

  • 根据MessageQueue的数量,扩充消费者机器 需要注意的是,机器和线程数量增大后,可能会对数据库造成成倍的压力!
  • 增加消费者线程数
  • 如果不能增加机器,则修改代码,新增一个 Topic,把积压的消息写入新的Topic中,部署多台Consumer去消费新的Topic

消息队列崩溃怎么办

在一些金融级场景,由于涉及到金钱,因此服务一定要高可用,但是如果我们的消息队列崩溃了,服务却依赖消息队列发送消息,这时要怎么处理呢? 针对这种场景,通常要在生产者的系统中设计高可用的降级方案,比如在发送MQ的代码里try catch捕获异常,如果发现有异常,进行重试。 如果发现超过3次都是失败的,这时候可能MQ已经崩溃了,此时必须把这条消息进行持久化,可以存储到DB、nosql(如redis的list结构)、磁盘文件中等等。 然后开启一个后台定时任务,去尝试把失败持久化的消息重新发送到MQ。

这里必须按照顺序发送,存储时也要按照顺序存储

为什么要给RocketMQ增加消息限流功能保证其高可用性

限流功能其实是对MQ系统的一种保护。 如果某个程序员代码里写了个bug,死循环不停的往MQ里写消息,并且如果有10台机器的话,那可能没一会MQ系统就被打挂了,影响到其他业务系统的使用。 因此,一般可以先通过压测测一下你的MQ最多可以抗多少QPS,然后做好限流。

转载地址:http://eyfvi.baihongyu.com/

你可能感兴趣的文章
iOS菜鸟学习--如何避免两个按钮同时响应
查看>>
How to access the keys in dictionary in object-c
查看>>
iOS菜鸟学习—— NSSortDescriptor的使用
查看>>
hdu 3787 hdoj 3787
查看>>
hdu 3790 hdoj 3790
查看>>
hdu 3789 hdoj 3789
查看>>
hdu 3788 hdoj 3788
查看>>
zju 1003 zoj 1003
查看>>
zju 1004 zoj 1004
查看>>
zju 1005 zoj 1005
查看>>
zju 1006 zoj 1006
查看>>
【虚拟机】虚拟化架构与系统部署(Windows系统安装)
查看>>
字节跳动安卓开发实习生面试分享
查看>>
好书分享之——《能力陷进》
查看>>
阅读笔记《c++ primer》
查看>>
阅读笔记《C++标准程序库》
查看>>
基于mirror driver的windows屏幕录像
查看>>
C语言8
查看>>
Qt实现简单延时
查看>>
qml有关矩形说明
查看>>