RabbitMQ
约 4615 字大约 15 分钟
2025-07-07
1.RabbitMQ核心概念有哪些?
Broker:
表示消息队列服务器Exchange:
消息交换机,它指定消息按什么规则,路由到哪个队列Queue:
消息队列载体,每个消息都会被投入到一个或多个队列Binding:
绑定,它的作用就是把exchange和queue按照路由规则绑定起来Routing Key:
路由关键字,exchange根据这个关键字进行消息投递VHost:
vhost 可以理解为虚拟 broker ,包含:一批交换机,消息队列和相关的对象。Producer:
消息生产者,就是投递消息的程序Consumer:
消息消费者,就是接受消息的程序Channel:
消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
2.RabbitMQ有哪些使用场景?
RabbitMQ有哪些使用场景或者你们业务都什么地方使用MQ,其实都是一类问题。
RabbitMQ主要的应用场景如下:
异步任务
:将业务中属于非核心或不重要的流程部分,使用消息异步通知的方式发给目标系统,这样主业务流程无需同步等待其他系统的处理结果,从而达到系统快速响应的目的。服务间异步通信
:多个服务之间或者系统之间,通过消息互相通信,就大家平时跟朋友互发短信差不多。应用解耦
:基于消息订阅机制实现业务扩展,例如:电商下单场景,用户下单之后,产生一条订单消息,然后仓库模块订阅订单消息发货、积分模块可以订阅订单消息增加积分、短信模块可用订阅订单消息发送短信等等。顺序消费
:业务层面需要排队处理的场景,例如:活动报名,名额有限,先到先得。定时任务
:通过消息队列的延迟消息机制,可以实现定时执行任务的效果,例如:订单15分钟内未支付,则关闭订单,下单的时候发送一条延迟15分钟的消息,15分钟后消息才会投递给消费者处理。提示:详情可以参考延迟队列章节。
请求削峰
:因为上下游系统之间处理流量的能力存在差异,消息队列可以起到调节作用,让下游系统可以匀速处理流量,跟排队挤地铁是一个意思,因为地铁站的吞吐量有限,一下子无法容纳太多人,通过排队进站的方式,可以避免地铁站超负荷,维持在一个稳定运转的状态。
3.为什么使用MQ?MQ的优点有那些?
简答
异步处理
- 相比于传统的串行、并行方式,提高了系统吞吐量。应用解耦
- 系统间通过消息通信,不用关心其他系统的处理。流量削锋
- 可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求。日志处理
- 解决大量日志传输。消息通讯
- 消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。
详答
主要是:解耦、异步、削峰三个方面讲解。
解耦
A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃…A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。
就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦。
异步
A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求。如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms。
削峰
减少高峰时期对服务器压力。
4.系统引入RabbitMQ有什么缺点?
总体上项目引入RabbitMQ会带来下面三个方面的影响
系统可用性降低
因为项目增加了RabbitMQ这个服务组件,RabbitMQ自己本身不稳定或者挂了,我们系统也会受影响,所以系统可用性会降低,总体上引入的服务组件越多,要维护的东西越多,不稳定的因素越多,相对于什么都不用,或者少用的项目来说系统的可用性是有所下降的。
系统复杂度提高
加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,复杂性增大。
一致性问题
A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。
总体上引入RabbitMQ消息队列有好处,也带来了一些问题,需要权衡利弊
5.消息如何路由(有几种交换机类型)?
RabbitMQ消息如何路由、消息如何投递跟交换机(exchange)类型有关,不同的交换机类型消息路由策略不一样。
RabbitMQ常用的交换机如下:
- Direct类型
- Topic 类型
- Fanout 类型
下面是不同交换机的消息路由策略:
- direct:如果路由键完全匹配,消息就被投递到相应的队列
- topic:跟direct类似,区别是路由键支持通配符。
- fanout:如果交换器收到消息,将会广播到所有绑定的队列上
6.RabbitMQ有几种工作模式?
RabbitMQ有几种工作模式?
根据交换机类型和消费者数量的不同,RabbitMQ有下面几种工作模式:
简单队列
:一个消息生产者,一个消息消费者Work队列
:一个生产者,多个消费者发布订阅模式
:发布订阅模式,就是一个生产者发送的消息会被多个消费者获取,因为一条消息会被多个消费者分别消费处理,所以也叫广播模式、一对多模式。路由模式
:RabbitMQ路由模式大体上跟发布订阅模式一样,区别在于发布订阅模式将消息转发给所有绑定的队列,而路由模式将消息转发给那个队列是根据路由匹配情况决定的。主题模式
:RabbitMQ主题模式(Topic)跟路由模式类似,区别在于主题模式的路由匹配支持通配符模糊匹配,而路由模式仅支持完全匹配。
7.如何解决消息的顺序问题?
RabbitMQ的消息顺序问题,需要分三个环节看待,发送消息的顺序、队列中消息的顺序、消费消息的顺序。
发送消息的顺序
消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息。
队列中消息的顺序
RabbitMQ中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由Rabbitmq保证,不同队列中的消息顺序,是没有保证的,例如:进地铁站的时候,排了三个队伍,不同队伍之间的,不能确保谁先进站。
消费消息的顺序
在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的。
例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。
解决消费顺序的问题,通常就是一个队列只有一个消费者
这样就可以一个个消息按顺序处理,缺点就是并发能力下降了,无法并发消费消息,这是个取舍问题。
8.如何处理重复消息?(即消息幂等性保证)
处理RabbitMQ重复消息的问题,就是处理消息的业务逻辑保持幂等性,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。
这个问题需要灵活作答,考察的是综合应用能力和经验,因为消费的场景有很多,有数据库、有缓存、有第三方接口。
提示:总体上根据不同的业务增加去重逻辑,通常就是根据业务设置一个唯一标识,通过检测这个唯一标识是否已经处理过来处理去重问题。
例如:
- 比如针对数据库,插入数据,可以通过唯一键,重复插入会报错,又或者插入之前先检测一下是否存在,针对更新操作,可以根据数据的状态判断是否已经处理过。
- 再比如redis缓存,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。
- 再比如第三方接口,接口需要有去重能力。
9.如何确保消息正确地发送至RabbitMQ?
RabbitMQ支持Confirm模式
,确认消息有没有安全的投递给RabbitMQ。
下面是大致原理:
将信道设置成 confirm
模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的 ID
。
一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道
会发送一个确认给生产者(包含消息唯一 ID)。
如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack(notacknowledged,未确认)
消息。
发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。
10.如何确保消息接收方消费了消息?
消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ 才能安全地把消息从队列中删除。
提示:RabbitMQ消费者消息确认(ACK)有两种机制,自动确认或者手动确认,自动确认就是消费者收到消息RabbitMQ就删除消息,手动确认,需要我们在代码层面主动调用Ack方法通知RabbitMQ,消息已经处理。
这里并没有用到超时机制,RabbitMQ 仅通过 Consumer 的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ 给了 Consumer 足够长的时间来处理消息。保证数据的最终一致性;
下面罗列几种特殊情况
- 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重)
- 如果消费者接收到消息却没有确认消息,连接也未断开,则 RabbitMQ 认为该消费者繁忙,将不会给该消费者分发更多的消息。
11.如何确保消息不丢失?
RabbitMq如何确保消息不丢失?
RabbitMQ主要通过持久化机制,确保消息不丢,RabbitMQ持久化机制分为队列持久化
、消息持久化
、交换器持久化
。
下面从多个方面确保消息不丢:
消息持久化
RabbitMQ 的消息默认存放在内存上面
,如果不特别声明,消息不会持久化保存到硬盘上面,如果节点重启或者意外crash掉,消息就会丢失。
要想做到消息持久化,必须满足以下三个条件:
Exchange
设置持久化Queue
设置持久化 -Message
持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息
消息ACK机制
默认情况消费者收到消息,MQ就会从队列中删除消息,如果消费者没处理成功,消息就丢了,可以使用手动ACK机制,处理完成手动调用MQ的ACK方法通知MQ删除消息。
RabbitMQ集群模式
使用集群模式部署RabbitMQ,实现消息的高可用,避免单个MQ节点挂了,消息就没了。
消息补偿
有时候可能是因为消息过期(TTL)、或者消费者异常导致消息丢了,这个时候需要从业务数据角度,写个脚本重新生成消息,投递到消息队列中。
12.什么是延迟队列?
RabbitMQ延迟队列就是存储延迟消息的队列,延迟消息指的就是消息投递到队列后,消费者不能立刻消费,需要等待一段时间,消费者才能消费消息。
提示:延迟队列,可以用来做定时任务。
RabbitMQ原生不支持延迟消息,目前主要通过死信交换机 + 消息TTL方案或者rabbitmq-delayed-message-exchange插件实现。
13.什么是死信队列?
DLX,全称为 Dead-Letter-Exchange,死信交换机,死信邮箱。当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换机中,这个交换机就是 DLX,绑定 DLX 的队列就称之为死信队列。
导致死信的原因:
- 消息被拒(Basic.Reject /Basic.Nack) 且 requeue = false。
- 消息TTL过期。
- 队列满了,无法再添加。
14.什么是优先级队列?
优先级队列,顾名思义,优先级高的消息具备优先被消费的特权。
RabbitMQ优先级队列注意点:
- 只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效。
- RabbitMQ3.5版本以后才支持优先级队列。
15.如何解决消息积压?
如何解决RabbitMQ消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?数百万消息持续积压几个小时,说说怎么解决?
提示:出现消息积压,主要是因为消息的消费速度跟不上消息产生的速度。
消息堆积解决策略1
消费者临时扩容,例如:原先是10个消费者,扩容10倍,100个消费者,目的是加快消息消费速度。
提示:扩容数量根据实际情况确定。
消息堆积解决策略2
修复消费者问题,消费者本身的问题,主要体现在两个方面: 业务异常和消息处理速度慢。
- 如果是消费者自己异常了,导致无法正常消费消息,只要修复异常问题即可。
- 如果是消费者处理速度慢,可以分析下业务代码有没有进一步提升性能的空间,有就优化,没有就走策略1扩容方案。
消息堆积解决策略3
如果消息堆积的太多,短时间内消费不完(需要几个小时,甚至更长时间),可以做个取舍,反正前面的客户已经得罪了,新的客户不能得罪,我们可以确保新的消息可以正常消费,老的消息慢慢处理。
可以新开一个队列,让新的消息投递到这个队列,新开一批消费者,处理新的消息,老的队列里面堆积的消息,让一批消费者慢慢跑。
消息堆积解决策略4
如果堆积的消息不重要,直接干掉(删除)队列,创建新的队列,处理新的消息就行,不能在一棵树上吊死。
消息堆积解决策略5
如果消息设置了TTL,这种情况,消息可能因为已经到期,被丢弃了,丢了多少我们不知道,为确保业务消息都被正常消费,这里首先要决绝的是怎么找回丢失的消息,主要思路是根据业务数据,重新投递消息到MQ中,例如:根据订单记录,如果订单未处理,重新投递消息到消息队列。