MQ消息堆积到系统崩溃?这5个排查技巧让你10秒定位问题!
MQ消息堆积到系统崩溃?这5个排查技巧让你10秒定位问题!
作为一名后端开发,经历过太多因为MQ消息堆积导致的"血案":
- 某电商大促期间,订单消息堆积了500万条,用户下单后2小时还没收到确认短信,客服电话被打爆
- 某支付系统因为消息堆积,导致用户支付成功但订单状态一直未更新,老板差点把我祭天
- 某社交平台的消息通知系统,堆积了1000万条消息,用户私信延迟了整整一天才收到
消息堆积,可以说是分布式系统中最让人闻风丧胆的问题之一。今天就结合自己踩过的坑,跟大家聊聊如何快速排查和解决MQ消息堆积问题,让你的系统稳如老狗。
一、消息堆积到底是个啥?为啥这么可怕?
简单来说,消息堆积就是生产者发送消息的速度远大于消费者处理消息的速度,导致消息在队列中越积越多。
堆积的恐怖后果:
- 内存暴涨:消息堆积会把MQ服务器的内存打满,最终导致OOM
- 磁盘爆满:持久化消息会把磁盘写满,影响整个服务器
- 系统雪崩:消息处理延迟越来越高,最终整个系统不可用
- 用户体验差:用户操作后迟迟得不到响应,直接投诉
二、消息堆积的3大元凶,你肯定遇到过
在讲排查技巧之前,我们先得搞清楚消息为什么会堆积。根据我多年的踩坑经验,主要有以下3个原因:
1. 消费者处理太慢:最常见的元凶
症状:消息处理时间越来越长,消费TPS持续下降
真实案例:我们有个订单系统,消费者需要调用3个外部接口:库存接口、支付接口、物流接口。有一次物流接口响应从100ms变成了5秒,直接导致消息处理速度从1000TPS降到了50TPS,消息疯狂堆积。
2. 消费者数量不足:小马拉大车
症状:单机处理能力达到上限,但集群资源利用率很低
真实案例:某次大促活动,我们预估QPS会增长10倍,但只增加了2台消费者服务器。结果消息处理能力只提升了2倍,剩下8倍的流量直接堆积在MQ中,差点把服务器撑爆。
3. 消息生产异常:洪水猛兽般的冲击
症状:某个时间段消息量突然暴增,远超正常水平
真实案例:某次系统上线bug,导致一个循环发送消息的逻辑失控,1分钟内发送了50万条消息,而消费者正常处理能力只有1000TPS,直接GG。
三、5个核心排查技巧,10秒定位问题根源
技巧1:三板斧快速定位堆积位置
第一板斧:看监控面板
# RocketMQ查看消息堆积数量
sh mqadmin consumerProgress -n localhost:9876 -g your_consumer_group
# RabbitMQ查看队列消息数量
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged
第二板斧:看消费TPS
# 查看消费者TPS
sh mqadmin statsAll -n localhost:9876 | grep your_topic
第三板斧:看消费者状态
# 查看消费者连接情况
sh mqadmin consumerConnection -n localhost:9876 -g your_consumer_group
技巧2:线程Dump分析,揪出慢处理元凶
当消费者处理慢时,第一时间打线程Dump:
# 快速获取线程Dump
jstack [消费者进程PID] > consumer_thread_dump.txt
# 或者直接使用Arthas
arthas> thread -n 10
实战案例:有次发现消费者TPS从1000降到了100,打线程Dump发现90%的线程都在等待一个Redis响应,原来是Redis连接池配置过小,导致大量线程阻塞。
技巧3:消息轨迹追踪,找到问题消息
RocketMQ的消息轨迹功能特别好用:
// 开启消息轨迹
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("your_group");
consumer.setEnableMsgTrace(true);
consumer.setTraceTopic("RMQ_SYS_TRACE_TOPIC");
// 查看消息轨迹
sh mqadmin queryMsgTraceById -n localhost:9876 -i your_message_id
真实场景:通过消息轨迹发现,有10%的消息处理时间特别长,进一步分析发现这些消息都来自同一个用户,该用户的数据存在异常,导致处理逻辑卡死。
技巧4:资源监控,发现系统瓶颈
CPU监控:
# 查看CPU使用率
top -p [消费者PID]
# 查看GC情况
jstat -gc [消费者PID] 1000
内存监控:
# 查看内存使用
free -h
# 查看JVM内存
jmap -heap [消费者PID]
网络监控:
# 查看网络连接
netstat -an | grep ESTABLISHED | wc -l
# 查看网络流量
iftop -i eth0
技巧5:日志分析,还原案发现场
关键日志位置:
// 在消费者中添加详细日志
@Slf4j
@Component
public class MessageConsumer {
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer"
)
public void onMessage(String message) {
long startTime = System.currentTimeMillis();
String messageId = MessageHelper.getMessageId(message);
try {
log.info("开始处理消息: {}, 时间: {}", messageId, startTime);
// 业务处理
processBusinessLogic(message);
long costTime = System.currentTimeMillis() - startTime;
log.info("消息处理完成: {}, 耗时: {}ms", messageId, costTime);
// 如果处理时间超过1秒,记录警告
if (costTime > 1000) {
log.warn("消息处理超时: {}, 耗时: {}ms", messageId, costTime);
}
} catch (Exception e) {
log.error("消息处理失败: {}, 错误: {}", messageId, e.getMessage(), e);
throw e;
}
}
}
四、实战案例:某电商平台大促期间消息堆积全记录
下面分享一个真实的电商平台大促期间消息堆积排查案例。
1. 事故发生
大促开始1小时后,监控系统疯狂报警:
- 订单消息堆积数量:从0暴涨到200万条
- 消费TPS:从正常的2000TPS降到了200TPS
- 消息延迟:从1秒增加到5分钟
2. 紧急排查过程
第一步:快速定位问题
# 1. 查看消费者状态
sh mqadmin consumerProgress -n localhost:9876 -g order-consumer-group
# 结果显示:未消费消息200万,延迟5分钟
# 2. 查看消费者连接
sh mqadmin consumerConnection -n localhost:9876 -g order-consumer-group
# 结果显示:只有5台消费者在线,应该有20台
第二步:分析消费者日志
发现大量如下异常:
2024-01-15 14:32:15 ERROR [OrderConsumer] - 调用库存服务超时: Read timed out after 3000ms
2024-01-15 14:32:16 ERROR [OrderConsumer] - 消息处理失败,准备重试
第三步:查看外部依赖
发现库存服务的响应时间从正常的100ms增加到了3秒以上,导致消费者处理时间大幅增加。
3. 紧急处理方案
临时方案(5分钟内实施):
- 扩容消费者:紧急启动15台消费者服务器,消费者数量从5台增加到20台
- 调整超时时间:将库存服务调用超时时间从3秒调整为1秒,快速失败
- 开启批量消费:将单条消费改为批量消费,一次处理10条消息
永久方案(2天内实施):
- 库存服务优化:增加库存服务服务器数量,优化数据库查询
- 缓存预热:大促前预热库存缓存,减少数据库查询
- 异步处理:将非核心的库存扣减改为异步处理
4. 效果验证
- 消息堆积数量:2小时内从200万降到0
- 消费TPS:提升到5000TPS,是原来的2.5倍
- 消息延迟:稳定在1秒以内
五、6个避坑小贴士,让你远离消息堆积
1. 监控告警要做全
# RocketMQ监控配置示例
rocketmq:
consumer:
metrics:
enabled: true
delay-level: 2 # 2分钟延迟就报警
accumulation: 10000 # 堆积1万条就报警
2. 消费者要支持水平扩展
// 使用容器化部署,支持快速扩容
@Component
public class MessageConsumer {
@Value("${consumer.thread-count:10}")
private int consumeThreadCount;
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer",
consumeThreadNumber = "#{${consumer.thread-count}}"
)
public void onMessage(String message) {
// 处理逻辑
}
}
3. 设置合理的消费超时时间
// RocketMQ配置
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-consumer");
consumer.setConsumeTimeout(15); // 15秒超时,避免长时间阻塞
// RabbitMQ配置
container.setDefaultRequeueRejected(false); // 失败后直接进入死信队列
4. 消息分级处理
// 高优先级消息优先处理
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer",
selectorExpression = "orderType='urgent'" // 紧急订单优先
)
public void processUrgentOrder(String message) {
// 紧急订单处理逻辑
}
5. 定期清理过期消息
# 定期清理死信队列中的过期消息
sh mqadmin deleteTopic -n localhost:9876 -t DLQ_order-consumer
6. 做好压力测试
// 压测脚本示例
@Test
public void testMessageAccumulation() {
// 模拟10倍流量冲击
for (int i = 0; i < 1000000; i++) {
rocketMQTemplate.asyncSend("order-topic", buildOrderMessage());
}
// 验证消费者处理能力
await().atMost(5, TimeUnit.MINUTES)
.until(() -> getAccumulatedMessageCount() == 0);
}
六、总结:消息堆积处理的黄金法则
- 监控先行:没有监控就没有发言权,实时掌握消息堆积情况
- 快速定位:三板斧(看监控、打Dump、查日志)快速找到问题
- 先治标再治本:先扩容缓解,再优化根治
- 预防为主:压测、限流、熔断,把问题消灭在萌芽状态
最后再提醒一句:消息堆积不可怕,可怕的是没有预案。建议每个季度都做一次消息堆积的演练,确保真正出问题的时候能够快速响应。
觉得有用的话,别忘了关注我的公众号【服务端技术精选】,点赞、在看、转发三连哦!咱们下期见~
标题:MQ消息堆积到系统崩溃?这5个排查技巧让你10秒定位问题!
作者:jiangyi
地址:http://www.jiangyi.space/articles/2025/12/21/1766304278157.html
- 一、消息堆积到底是个啥?为啥这么可怕?
- 二、消息堆积的3大元凶,你肯定遇到过
- 1. 消费者处理太慢:最常见的元凶
- 2. 消费者数量不足:小马拉大车
- 3. 消息生产异常:洪水猛兽般的冲击
- 三、5个核心排查技巧,10秒定位问题根源
- 技巧1:三板斧快速定位堆积位置
- 技巧2:线程Dump分析,揪出慢处理元凶
- 技巧3:消息轨迹追踪,找到问题消息
- 技巧4:资源监控,发现系统瓶颈
- 技巧5:日志分析,还原案发现场
- 四、实战案例:某电商平台大促期间消息堆积全记录
- 1. 事故发生
- 2. 紧急排查过程
- 3. 紧急处理方案
- 4. 效果验证
- 五、6个避坑小贴士,让你远离消息堆积
- 1. 监控告警要做全
- 2. 消费者要支持水平扩展
- 3. 设置合理的消费超时时间
- 4. 消息分级处理
- 5. 定期清理过期消息
- 6. 做好压力测试
- 六、总结:消息堆积处理的黄金法则