RocketMQ 事务消息半消息清理:Half Message 堆积导致 Broker 磁盘告警?自动补偿机制!
做过分布式事务开发的朋友肯定都遇到过这个问题:使用 RocketMQ 的事务消息时,由于网络抖动、服务宕机、消费者超时等原因,部分 Half Message(半消息)无法被正确处理,导致在 Broker 上不断堆积。这不仅占用磁盘空间,严重时还会触发磁盘告警,影响整个消息队列的稳定性。
我之前就遇到过这样一个案例:某天凌晨,监控告警显示 RocketMQ Broker 的磁盘使用率突然飙升至 85%,马上就要触达 90% 的告警阈值。排查后发现,是某个服务的数据库在凌晨进行大批量数据迁移时,部分事务消息的本地事务执行失败,但由于网络重试机制的问题,相关的 Half Message 没有被正确回滚,大批量堆积在了 Broker 上。
今天我们就来聊聊 RocketMQ 事务消息 Half Message 的自动清理方案,让您的系统远离磁盘告警的困扰。
Half Message 的产生与堆积原因
1. 事务消息的执行流程
首先,让我们回顾一下 RocketMQ 事务消息的完整流程:
┌─────────────────────────────────────────────────────────────────────┐
│ RocketMQ 事务消息完整流程 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 1. 生产者发送 Half Message(预消息) │
│ └──→ Broker 存储预消息,返回 halfOffset │
│ │
│ 2. Broker 执行本地事务 │
│ └──→ 生产者根据本地事务结果执行 commit/rollback │
│ │
│ 3. 事务提交/回滚 │
│ ├── commit:Half Message 转为正式消息,投递给消费者 │
│ └── rollback:Half Message 被标记删除,不投递给消费者 │
│ │
│ 4. 事务状态回查(如果长时间未收到事务结果) │
│ └──→ Broker 向生产者查询事务状态 │
│ │
└─────────────────────────────────────────────────────────────────────┘
2. Half Message 堆积的根本原因
堆积原因分析:
┌─────────────────────────────────────────────────────────────┐
│ 原因分类 │ 说明 │
├─────────────────────────────┼──────────────────────────────┤
│ 本地事务执行失败 │ 业务代码抛出异常未回滚 │
│ 网络通信问题 │ commit/rollback 消息发送失败 │
│ 服务宕机 │ 事务执行中断,未完成提交 │
│ 消费者超时 │ 消息处理超时,触发重试 │
│ 事务状态回查失败 │ 回查服务不可用 │
│ 事务超时未回查 │ 超时时间设置不合理 │
└─────────────────────────────┴──────────────────────────────┘
3. Half Message 堆积的后果
后果分析:
场景模拟:每小时产生 10000 条事务消息,其中 1% 堆积
1 天后:10000 × 24 × 1% = 2400 条堆积
1 周后:2400 × 7 = 16800 条堆积
1 月后:16800 × 4 = 67200 条堆积
磁盘占用:
- 每条 Half Message 大小:约 1-2KB
- 1 个月后磁盘占用:约 100-200MB(仅这一个 Topic)
多个 Topic 叠加,磁盘告警只是时间问题
解决方案:自动补偿清理机制
1. 核心设计思想
我们的方案核心是三个关键机制:
- 定时扫描:定期扫描堆积的 Half Message
- 状态回查:主动查询这些消息的原始事务状态
- 自动补偿:根据事务状态执行 commit 或 rollback
架构图如下:
┌─────────────────────────────────────────────────────────────────────┐
│ RocketMQ Half Message 自动清理系统 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐ │
│ │ Half Message │───→│ 定时扫描器 │───→│ 状态回查服务 │ │
│ │ (堆积) │ │ (定期检测) │ │ (查询原始事务状态) │ │
│ └──────────────┘ └──────────────┘ └──────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐ │
│ │ 补偿执行器 │←───│ 决策引擎 │←───│ 事务状态结果 │ │
│ │ (执行清理) │ │ (判断处理) │ │ │ │
│ └──────────────┘ └──────────────┘ └──────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
2. 定时扫描策略
扫描策略设计:
┌─────────────────────────────────────────────────────────────┐
│ 扫描参数配置 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 扫描间隔:每 5 分钟执行一次 │
│ 扫描范围:所有事务 Topic 的 Half Message │
│ 筛选条件:creationTime 早于(当前时间 - 等待时间) │
│ 批量大小:每次最多处理 100 条 │
│ 并发限制:最多 5 个线程并发处理 │
│ │
└─────────────────────────────────────────────────────────────┘
3. 状态回查机制
状态回查流程:
function queryHalfMessageStatus(halfMessage):
# 1. 解析 Half Message 获取事务 ID
transactionId = parseTransactionId(halfMessage)
# 2. 向生产者查询事务状态
status = transactionListener.getTransactionStatus(transactionId)
# 3. 根据状态决定处理方式
if status == COMMITTED:
return COMMIT
else if status == ROLLBACK:
return ROLLBACK
else:
return UNKNOWN
4. 自动补偿决策
补偿决策逻辑:
┌─────────────────────────────────────────────────────────────┐
│ 决策状态机 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────┐ │
│ │ START │ │
│ └─────┬─────┘ │
│ │ │
│ ▼ │
│ ┌───────────┐ ┌──────────────┐ │
│ │ QUERYING │───→│ SUCCESS │ │
│ └─────┬─────┘ └──────┬───────┘ │
│ │ │ │
│ │ ┌─────┴─────┐ │
│ │ │ │ │
│ │ ▼ ▼ │
│ │ ┌──────────┐ ┌──────────┐ │
│ │ │ COMMIT │ │ ROLLBACK │ │
│ │ └──────────┘ └──────────┘ │
│ │ │
│ ▼ │
│ ┌───────────┐ │
│ │ FAILED │──→ 重试(最多 3 次) │
│ └───────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
实战方案实现
1. Half Message 扫描器
// Half Message 扫描器
class HalfMessageScanner {
private ScheduledExecutorService scheduler;
private RocketMQTemplate rocketMQTemplate;
function init() {
scheduler.scheduleAtFixedRate(() -> {
scanAndProcess();
}, 0, 5 * 60 * 1000, TimeUnit.MILLISECONDS);
}
function scanAndProcess() {
# 1. 获取所有事务 Topic
topics = getTransactionTopics();
# 2. 遍历每个 Topic 扫描 Half Message
for topic in topics:
halfMessages = queryHalfMessages(
topic,
startTime = now - WAIT_TIME,
maxCount = 100
)
# 3. 并发处理每条消息
for msg in halfMessages:
processAsync(msg)
}
}
2. 事务状态回查服务
// 事务状态回查服务
class TransactionStatusQueryService {
function queryStatus(halfMessage):
transactionId = extractTransactionId(halfMessage)
# 最多重试 3 次
for attempt in 1..3:
try:
status = doQuery(transactionId)
return status
catch Exception as e:
if attempt == 3:
log.error("Query failed after 3 attempts: {}", transactionId)
return UNKNOWN
sleep(1000 * attempt)
private function doQuery(transactionId):
# 根据事务 ID 查询本地事务状态
return transactionListener.getTransactionState(transactionId)
}
3. 补偿执行器
// 补偿执行器
class CompensationExecutor {
function compensate(halfMessage, status):
switch status:
case COMMITTED:
commitMessage(halfMessage)
break
case ROLLBACK:
rollbackMessage(halfMessage)
break
case UNKNOWN:
# 未知状态,放置更长时间后再处理
scheduleRetry(halfMessage, delay = 1 hour)
break
function commitMessage(halfMessage):
rocketMQTemplate.sendMessage(
halfMessage.topic,
MessageBuilder.withPayload(halfMessage.body)
.setHeader("transactionId", halfMessage.transactionId)
.build()
)
function rollbackMessage(halfMessage):
rocketMQTemplate.rollback(halfMessage.transactionId)
}
4. 监控告警
监控指标:
┌────────────────────────┬─────────────────────────────────────┐
│ 指标名称 │ 说明 │
├────────────────────────┼─────────────────────────────────────┤
│ halfMessageCount │ 当前 Half Message 堆积数量 │
│ scanCount │ 每次扫描的消息数量 │
│ commitCount │ 执行 commit 的数量 │
│ rollbackCount │ 执行 rollback 的数量 │
│ queryFailedCount │ 查询失败的次数 │
│ diskUsage │ Broker 磁盘使用率 │
│ cleanupLatency │ 清理延迟(从 creation 到清理的时间) │
└────────────────────────┴─────────────────────────────────────┘
最佳实践与注意事项
1. 合理设置等待时间
等待时间设置建议:
┌─────────────────────────────────────────────────────────────┐
│ 等待时间 = 本地事务超时时间 × 2 + 网络重试时间 │
│ │
│ 例如: │
│ - 本地事务超时:30 秒 │
│ - 网络重试时间:10 秒 │
│ - 等待时间 = 30 × 2 + 10 = 70 秒 │
│ │
│ 建议实际设置:5-10 分钟(留足余量) │
└─────────────────────────────────────────────────────────────┘
2. 避免清理正在处理的消息
过滤条件:
# 只清理创建时间超过阈值的消息
creationTime < now - MAX_TRANSACTION_TIME
# 排除正在回查的消息
status != PROCESSING
3. 事务状态存储优化
状态存储建议:
# 使用 Redis 存储事务状态
transaction:status:{transactionId} = COMMITTED | ROLLBACK | PROCESSING
# 设置合理的过期时间
TTL = MAX_TRANSACTION_TIME × 2
4. 优雅关闭
关闭流程:
function shutdown():
# 1. 停止接收新任务
scheduler.shutdown()
# 2. 等待现有任务完成(最多 30 秒)
scheduler.awaitTermination(30, TimeUnit.SECONDS)
# 3. 保存当前处理状态
saveProcessingState()
# 4. 关闭数据库连接
closeConnections()
5. 集群部署注意事项
集群部署要点:
┌─────────────────────────────────────────────────────────────┐
│ 1. 只部署一个清理节点(避免重复清理) │
│ 2. 使用分布式锁控制清理任务归属 │
│ 3. 监控告警要联动自动扩容 │
│ 4. 定期检查清理任务的健康状态 │
└─────────────────────────────────────────────────────────────┘
效果对比
| 方案 | 磁盘占用 | 人工干预 | 可靠性 | 复杂度 |
|---|---|---|---|---|
| 人工清理 | 低 | 高 | 低 | 低 |
| 超时自动删除 | 中 | 中 | 中 | 低 |
| 状态回查+补偿 | 低 | 低 | 高 | 中 |
总结
RocketMQ 事务消息 Half Message 清理的核心原则:
- 定时扫描:定期检测堆积的 Half Message
- 状态回查:主动查询原始事务状态,避免误删
- 自动补偿:根据状态执行 commit 或 rollback
- 监控告警:实时监控堆积量和磁盘使用率
- 集群协调:使用分布式锁避免重复清理
记住:Half Message 不可怕,可怕的是不知道如何清理。一个完善的自动清理机制,是 RocketMQ 事务消息的安全保障。
源码获取
文章已同步至小程序博客栏目,需要源码的请关注小程序博客。
公众号:服务端技术精选
小程序码:
标题:RocketMQ 事务消息半消息清理:Half Message 堆积导致 Broker 磁盘告警?自动补偿机制!
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/05/24/1779201102484.html
公众号:服务端技术精选
评论
0 评论