Kafka消费者重启后重复处理了80万条消息,库存直接扣成负数
凌晨一点,库存服务报警:有 30 个商品库存变成了负数。扣成了 -500、-1200,离谱的是这些商品白天就卖完了。
查了半小时,线索指向凌晨零点的一次容器滚动发布。新 Pod 启动后重新消费,从上次提交的 offset 开始读。但问题出在——上次提交的 offset 是 3 小时前的。
日志里拉出一条关键记录:
2026-06-21 00:16:23.456 WARN o.a.k.c.c.i.ConsumerCoordinator -
Auto offset commit failed for group inventory-consumer:
Connection to node -1 (broker/192.168.1.10:9092) failed due to network error
凌晨那次提交因为网络抖动失败了。Kafka 消费者没报错继续消费,下一个 commit 周期也没重试。直到滚动重启时,消费者重新从上一个成功提交的 offset 开始读——等于把这中间的 80 万条消息重新处理了一遍。
库存又扣了一次。
一、自动提交的陷阱
绝大多数 Kafka 消费者是这样配的:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
5 秒自动提交一次 offset。看着省心,但有两个致命问题:
问题一:提交失败不重试。 网络抖了一下,commitSync 失败了(CommitFailedException),自动提交机制不会重试——它只是打个 WARN 日志,然后继续消费。下次提交再过 5 秒,如果那时候网络好了,offset 跳到最新;如果网络一直不好,offset 永远滞后。
问题二:根本不知道 offset 滞后了多久。 没有监控告诉你"消费者已经消费了 80 万条消息但 offset 还在 3 小时前"。直到重启那一刻,才以"重复消费"的形式暴露。
一个可能让你惊出一身冷汗的事实——如果你从来没有在重启时对过 Kafka 和数据库的状态,你可能根本不知道 offset 已经滞后了。
二、先止血:关闭自动提交,手动同步提交
第一步:关掉自动提交,自己接管 offset 管理:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
然后用同步提交 + 重试:
@Component
public class ReliableKafkaConsumer {
private static final int MAX_COMMIT_RETRIES = 3;
@KafkaListener(topics = "inventory-deduct", groupId = "inventory-consumer")
public void consume(ConsumerRecord<String, String> record) {
// 1. 处理业务
processInventoryDeduct(record);
// 2. 同步提交 offset,带重试
commitWithRetry(record);
}
private void commitWithRetry(ConsumerRecord<?, ?> record) {
for (int attempt = 1; attempt <= MAX_COMMIT_RETRIES; attempt++) {
try {
// 异步提交 + 等待确认 = 实际上的同步提交
consumer.commitSync();
return;
} catch (CommitFailedException | TimeoutException e) {
log.error("offset 提交失败(第{}次/共{}次): partition={}, offset={}",
attempt, MAX_COMMIT_RETRIES, record.partition(), record.offset());
if (attempt == MAX_COMMIT_RETRIES) {
// 三次失败:落盘兜底 + 告警
fallbackToLocalStore(record);
alertService.send("Kafka offset 提交连续失败",
"partition=" + record.partition() + " offset=" + record.offset());
return;
}
try {
Thread.sleep(1000L * attempt); // 退避重试
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return;
}
}
}
}
}
关键变化:提交失败会重试,重试三次还失败就走兜底逻辑(本地存 offset + 告警),而不是默默放弃。
三、根除方案:幂等落盘
同步提交 + 重试解决了提交失败的问题。但如果消费者在"业务处理完、offset 还没提交"的缝隙中挂了,消息还是会重复消费——因为业务处理成功了但 offset 没记录。
所以需要业务处理本身是幂等的。 即使一条消息被处理两次,结果和处理一次一样。
@Component
public class IdempotentInventoryConsumer {
@Autowired private JdbcTemplate jdbc;
/**
* 幂等的库存扣减
*
* 核心:每个 Kafka 消息都有唯一的 key(订单号),
* 用一张已处理表来去重
*/
@Transactional
public void processInventoryDeduct(ConsumerRecord<String, String> record) {
String orderId = extractOrderId(record);
// 1. 幂等性检查:这条消息是不是处理过了
int exists = jdbc.queryForObject(
"SELECT COUNT(1) FROM kafka_processed_record " +
"WHERE topic = ? AND partition = ? AND offset = ?",
Integer.class,
record.topic(), record.partition(), record.offset()
);
if (exists > 0) {
log.info("消息已处理,跳过: topic={}, offset={}",
record.topic(), record.offset());
return;
}
// 2. 执行业务:扣库存 + 生成物流单
String sql = "UPDATE inventory SET stock = stock - ? " +
"WHERE product_id = ? AND stock >= ?";
int updated = jdbc.update(sql,
extractQuantity(record),
extractProductId(record),
extractQuantity(record)
);
if (updated == 0) {
throw new InsufficientStockException("库存不足");
}
// 3. 同一事务内记录"已处理"
jdbc.update(
"INSERT INTO kafka_processed_record " +
"(topic, partition, offset, order_id, created_at) VALUES (?, ?, ?, ?, NOW())",
record.topic(), record.partition(), record.offset(), orderId
);
}
}
数据库表:
CREATE TABLE kafka_processed_record (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
topic VARCHAR(128) NOT NULL,
partition_id INT NOT NULL,
offset_val BIGINT NOT NULL,
order_id VARCHAR(64) NOT NULL,
created_at DATETIME DEFAULT NOW(),
UNIQUE KEY uk_topic_partition_offset (topic, partition_id, offset_val)
);
三个动作在一个事务里:
- 查
kafka_processed_record——如果已经有这条 offset 的记录,说明处理过了,跳过 - 扣库存(带了
stock >= ?防护,不会扣成负数) - 插入已处理记录
重复消费时,第二步再次执行,发现已处理记录存在,直接跳过。业务数据不重复变更。
四、offset 本地兜底
当 Kafka broker 持续不可用时,把 offset 存到本地数据库:
private void fallbackToLocalStore(ConsumerRecord<?, ?> record) {
jdbc.update(
"INSERT INTO kafka_offset_fallback " +
"(consumer_group, topic, partition_id, offset_val, updated_at) " +
"VALUES (?, ?, ?, ?, NOW()) " +
"ON DUPLICATE KEY UPDATE offset_val = ?, updated_at = NOW()",
"inventory-consumer", record.topic(),
record.partition(), record.offset(),
record.offset()
);
}
消费者启动时,优先从 Kafka broker 获取 offset,如果拿不到(broker 挂了),从本地兜底读:
@PostConstruct
public void seekToLastOffset() {
// 尝试从 Kafka 获取已提交 offset
Map<TopicPartition, OffsetAndMetadata> committed =
consumer.committed(partitions);
if (committed == null || committed.isEmpty()) {
// Kafka 不可用,从本地兜底恢复
Map<TopicPartition, Long> fallback = loadFallbackOffsets();
fallback.forEach(consumer::seek);
log.warn("从本地兜底恢复 offset: {}", fallback);
}
}
五、监控 offset 滞后
offset 滞后没监控,等于裸奔。加一个定时检查:
@Scheduled(fixedRate = 30_000)
public void monitorOffsetLag() {
consumer.assignment().forEach(tp -> {
long committed = consumer.committed(tp) != null
? consumer.committed(tp).offset() : -1;
long current = consumer.position(tp);
long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
// 1. 消费滞后:已消费 vs 已提交
long commitLag = current - committed;
// 2. 提交滞后:当前 offset vs 最新 offset
long produceLag = endOffset - current;
gaugeService.report("kafka.commit_lag", commitLag,
"topic", tp.topic(), "partition", String.valueOf(tp.partition()));
gaugeService.report("kafka.produce_lag", produceLag,
"topic", tp.topic(), "partition", String.valueOf(tp.partition()));
if (commitLag > 10000) {
alertService.warn("Kafka offset 提交滞后超过 1 万条",
"topic=" + tp.topic() + " lag=" + commitLag);
}
if (produceLag > 100000) {
alertService.warn("Kafka 消费滞后超过 10 万条",
"topic=" + tp.topic() + " lag=" + produceLag);
}
});
}
两个 lag 都要监控:
- commitLag(已消费但未提交):越大说明提交越不可靠,重启后重复消费越多
- produceLag(待消费):越大说明消费速度跟不上生产速度
六、完整流程
把所有的串起来:
┌─────────────────────────────────────────────────────┐
│ Kafka 消费者 │
├─────────────────────────────────────────────────────┤
│ 1. poll 消息 │
│ 2. 幂等检查(查 kafka_processed_record) │
│ ├── 已处理 → 跳过 │
│ └── 未处理 → 执行业务(扣库存) │
│ 3. 同一事务:插已处理记录 │
│ 4. 同步提交 offset(带 3 次重试) │
│ ├── 成功 → 结束 │
│ └── 3 次失败 → 本地兜底存 offset + 告警 │
│ 5. 每 30 秒检查 commitLag 和 produceLag │
└─────────────────────────────────────────────────────┘
七、效果对比
| 指标 | 优化前 | 优化后 |
|---|---|---|
| 重启重复消费 | 80 万条 | 0(幂等跳过) |
| 库存扣负 | 30 个商品 | 0 |
| offset 提交可靠性 | 网络抖就丢 | 3 次重试 + 本地兜底 |
| offset 滞后可观测 | 不知道 | commitLag 实时监控 |
| 断线恢复方式 | wait,等自动重连 | 优先 Kafka,失败从本地恢复 |
八、注意事项
注意一:kafka_processed_record 表不能无限增长。 定期清理 7 天前的记录(消息重复只可能发生在重启或 rebalance 后,7 天内的 offset 够用了):
DELETE FROM kafka_processed_record WHERE created_at < DATE_SUB(NOW(), INTERVAL 7 DAY);
注意二:业务幂等不是"去重就行"。 有些业务(比如余额变动)需要精确知道处理了几次来实现补偿回滚。这种情况下要记"消息 x 被处理了 n 次"而不仅是"已处理/未处理"。
注意三:同步提交影响吞吐。 commitSync() 每次都会阻塞直到 broker 确认。如果消费 QPS 很高,可以改成批量提交——每消费 N 条或每隔 M 秒提交一次,而不是每条都提交。
if (processedCount % 100 == 0 ||
System.currentTimeMillis() - lastCommitTime > 5000) {
commitWithRetry(record);
}
注意四:分区重分配(rebalance)时的 offset。 同步提交保证的是"已提交 = 已处理"。如果在业务处理完但还没提交时发生了 rebalance,这条消息会被重新分配并重复消费——靠幂等检查兜底,不会扣两次库存。
Kafka 自动提交省了 3 行代码,但付出的代价是 3 小时后才发现 80 万条消息重复消费了。自动化是好事,但对可靠性有要求的场景,手动控制永远是更清醒的选择。
你们的 Kafka 消费用的是自动提交还是手动提交?有没有过重启后大量重复消费的经历?
标题:Kafka消费者重启后重复处理了80万条消息,库存直接扣成负数
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/06/25/1782025750985.html
公众号:服务端技术精选
评论