Kafka消费者重启后重复处理了80万条消息,库存直接扣成负数

凌晨一点,库存服务报警:有 30 个商品库存变成了负数。扣成了 -500、-1200,离谱的是这些商品白天就卖完了。

查了半小时,线索指向凌晨零点的一次容器滚动发布。新 Pod 启动后重新消费,从上次提交的 offset 开始读。但问题出在——上次提交的 offset 是 3 小时前的。

日志里拉出一条关键记录:

2026-06-21 00:16:23.456 WARN  o.a.k.c.c.i.ConsumerCoordinator - 
Auto offset commit failed for group inventory-consumer: 
Connection to node -1 (broker/192.168.1.10:9092) failed due to network error

凌晨那次提交因为网络抖动失败了。Kafka 消费者没报错继续消费,下一个 commit 周期也没重试。直到滚动重启时,消费者重新从上一个成功提交的 offset 开始读——等于把这中间的 80 万条消息重新处理了一遍。

库存又扣了一次。


一、自动提交的陷阱

绝大多数 Kafka 消费者是这样配的:

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");

5 秒自动提交一次 offset。看着省心,但有两个致命问题:

问题一:提交失败不重试。 网络抖了一下,commitSync 失败了(CommitFailedException),自动提交机制不会重试——它只是打个 WARN 日志,然后继续消费。下次提交再过 5 秒,如果那时候网络好了,offset 跳到最新;如果网络一直不好,offset 永远滞后。

问题二:根本不知道 offset 滞后了多久。 没有监控告诉你"消费者已经消费了 80 万条消息但 offset 还在 3 小时前"。直到重启那一刻,才以"重复消费"的形式暴露。

一个可能让你惊出一身冷汗的事实——如果你从来没有在重启时对过 Kafka 和数据库的状态,你可能根本不知道 offset 已经滞后了。


二、先止血:关闭自动提交,手动同步提交

第一步:关掉自动提交,自己接管 offset 管理:

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

然后用同步提交 + 重试:

@Component
public class ReliableKafkaConsumer {
    
    private static final int MAX_COMMIT_RETRIES = 3;
    
    @KafkaListener(topics = "inventory-deduct", groupId = "inventory-consumer")
    public void consume(ConsumerRecord<String, String> record) {
        // 1. 处理业务
        processInventoryDeduct(record);
        
        // 2. 同步提交 offset,带重试
        commitWithRetry(record);
    }
    
    private void commitWithRetry(ConsumerRecord<?, ?> record) {
        for (int attempt = 1; attempt <= MAX_COMMIT_RETRIES; attempt++) {
            try {
                // 异步提交 + 等待确认 = 实际上的同步提交
                consumer.commitSync();
                return;
                
            } catch (CommitFailedException | TimeoutException e) {
                log.error("offset 提交失败(第{}次/共{}次): partition={}, offset={}", 
                    attempt, MAX_COMMIT_RETRIES, record.partition(), record.offset());
                
                if (attempt == MAX_COMMIT_RETRIES) {
                    // 三次失败:落盘兜底 + 告警
                    fallbackToLocalStore(record);
                    alertService.send("Kafka offset 提交连续失败", 
                        "partition=" + record.partition() + " offset=" + record.offset());
                    return;
                }
                
                try {
                    Thread.sleep(1000L * attempt);  // 退避重试
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}

关键变化:提交失败会重试,重试三次还失败就走兜底逻辑(本地存 offset + 告警),而不是默默放弃。


三、根除方案:幂等落盘

同步提交 + 重试解决了提交失败的问题。但如果消费者在"业务处理完、offset 还没提交"的缝隙中挂了,消息还是会重复消费——因为业务处理成功了但 offset 没记录。

所以需要业务处理本身是幂等的。 即使一条消息被处理两次,结果和处理一次一样。

@Component
public class IdempotentInventoryConsumer {
    
    @Autowired private JdbcTemplate jdbc;
    
    /**
     * 幂等的库存扣减
     * 
     * 核心:每个 Kafka 消息都有唯一的 key(订单号),
     *       用一张已处理表来去重
     */
    @Transactional
    public void processInventoryDeduct(ConsumerRecord<String, String> record) {
        String orderId = extractOrderId(record);
        
        // 1. 幂等性检查:这条消息是不是处理过了
        int exists = jdbc.queryForObject(
            "SELECT COUNT(1) FROM kafka_processed_record " +
            "WHERE topic = ? AND partition = ? AND offset = ?",
            Integer.class,
            record.topic(), record.partition(), record.offset()
        );
        
        if (exists > 0) {
            log.info("消息已处理,跳过: topic={}, offset={}", 
                record.topic(), record.offset());
            return;
        }
        
        // 2. 执行业务:扣库存 + 生成物流单
        String sql = "UPDATE inventory SET stock = stock - ? " +
                     "WHERE product_id = ? AND stock >= ?";
        int updated = jdbc.update(sql, 
            extractQuantity(record), 
            extractProductId(record),
            extractQuantity(record)
        );
        
        if (updated == 0) {
            throw new InsufficientStockException("库存不足");
        }
        
        // 3. 同一事务内记录"已处理"
        jdbc.update(
            "INSERT INTO kafka_processed_record " +
            "(topic, partition, offset, order_id, created_at) VALUES (?, ?, ?, ?, NOW())",
            record.topic(), record.partition(), record.offset(), orderId
        );
    }
}

数据库表:

CREATE TABLE kafka_processed_record (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    topic VARCHAR(128) NOT NULL,
    partition_id INT NOT NULL,
    offset_val BIGINT NOT NULL,
    order_id VARCHAR(64) NOT NULL,
    created_at DATETIME DEFAULT NOW(),
    UNIQUE KEY uk_topic_partition_offset (topic, partition_id, offset_val)
);

三个动作在一个事务里:

  1. kafka_processed_record——如果已经有这条 offset 的记录,说明处理过了,跳过
  2. 扣库存(带了 stock >= ? 防护,不会扣成负数)
  3. 插入已处理记录

重复消费时,第二步再次执行,发现已处理记录存在,直接跳过。业务数据不重复变更。


四、offset 本地兜底

当 Kafka broker 持续不可用时,把 offset 存到本地数据库:

private void fallbackToLocalStore(ConsumerRecord<?, ?> record) {
    jdbc.update(
        "INSERT INTO kafka_offset_fallback " +
        "(consumer_group, topic, partition_id, offset_val, updated_at) " +
        "VALUES (?, ?, ?, ?, NOW()) " +
        "ON DUPLICATE KEY UPDATE offset_val = ?, updated_at = NOW()",
        "inventory-consumer", record.topic(), 
        record.partition(), record.offset(),
        record.offset()
    );
}

消费者启动时,优先从 Kafka broker 获取 offset,如果拿不到(broker 挂了),从本地兜底读:

@PostConstruct
public void seekToLastOffset() {
    // 尝试从 Kafka 获取已提交 offset
    Map<TopicPartition, OffsetAndMetadata> committed = 
        consumer.committed(partitions);
    
    if (committed == null || committed.isEmpty()) {
        // Kafka 不可用,从本地兜底恢复
        Map<TopicPartition, Long> fallback = loadFallbackOffsets();
        fallback.forEach(consumer::seek);
        log.warn("从本地兜底恢复 offset: {}", fallback);
    }
}

五、监控 offset 滞后

offset 滞后没监控,等于裸奔。加一个定时检查:

@Scheduled(fixedRate = 30_000)
public void monitorOffsetLag() {
    consumer.assignment().forEach(tp -> {
        long committed = consumer.committed(tp) != null 
            ? consumer.committed(tp).offset() : -1;
        long current = consumer.position(tp);
        long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
        
        // 1. 消费滞后:已消费 vs 已提交
        long commitLag = current - committed;
        // 2. 提交滞后:当前 offset vs 最新 offset
        long produceLag = endOffset - current;
        
        gaugeService.report("kafka.commit_lag", commitLag, 
            "topic", tp.topic(), "partition", String.valueOf(tp.partition()));
        gaugeService.report("kafka.produce_lag", produceLag,
            "topic", tp.topic(), "partition", String.valueOf(tp.partition()));
        
        if (commitLag > 10000) {
            alertService.warn("Kafka offset 提交滞后超过 1 万条",
                "topic=" + tp.topic() + " lag=" + commitLag);
        }
        if (produceLag > 100000) {
            alertService.warn("Kafka 消费滞后超过 10 万条",
                "topic=" + tp.topic() + " lag=" + produceLag);
        }
    });
}

两个 lag 都要监控:

  • commitLag(已消费但未提交):越大说明提交越不可靠,重启后重复消费越多
  • produceLag(待消费):越大说明消费速度跟不上生产速度

六、完整流程

把所有的串起来:

┌─────────────────────────────────────────────────────┐
│                  Kafka 消费者                         │
├─────────────────────────────────────────────────────┤
│  1. poll 消息                                         │
│  2. 幂等检查(查 kafka_processed_record)              │
│     ├── 已处理 → 跳过                                  │
│     └── 未处理 → 执行业务(扣库存)                     │
│  3. 同一事务:插已处理记录                              │
│  4. 同步提交 offset(带 3 次重试)                     │
│     ├── 成功 → 结束                                    │
│     └── 3 次失败 → 本地兜底存 offset + 告警             │
│  5. 每 30 秒检查 commitLag 和 produceLag               │
└─────────────────────────────────────────────────────┘

七、效果对比

指标优化前优化后
重启重复消费80 万条0(幂等跳过)
库存扣负30 个商品0
offset 提交可靠性网络抖就丢3 次重试 + 本地兜底
offset 滞后可观测不知道commitLag 实时监控
断线恢复方式wait,等自动重连优先 Kafka,失败从本地恢复

八、注意事项

注意一:kafka_processed_record 表不能无限增长。 定期清理 7 天前的记录(消息重复只可能发生在重启或 rebalance 后,7 天内的 offset 够用了):

DELETE FROM kafka_processed_record WHERE created_at < DATE_SUB(NOW(), INTERVAL 7 DAY);

注意二:业务幂等不是"去重就行"。 有些业务(比如余额变动)需要精确知道处理了几次来实现补偿回滚。这种情况下要记"消息 x 被处理了 n 次"而不仅是"已处理/未处理"。

注意三:同步提交影响吞吐。 commitSync() 每次都会阻塞直到 broker 确认。如果消费 QPS 很高,可以改成批量提交——每消费 N 条或每隔 M 秒提交一次,而不是每条都提交。

if (processedCount % 100 == 0 || 
    System.currentTimeMillis() - lastCommitTime > 5000) {
    commitWithRetry(record);
}

注意四:分区重分配(rebalance)时的 offset。 同步提交保证的是"已提交 = 已处理"。如果在业务处理完但还没提交时发生了 rebalance,这条消息会被重新分配并重复消费——靠幂等检查兜底,不会扣两次库存。


Kafka 自动提交省了 3 行代码,但付出的代价是 3 小时后才发现 80 万条消息重复消费了。自动化是好事,但对可靠性有要求的场景,手动控制永远是更清醒的选择。

你们的 Kafka 消费用的是自动提交还是手动提交?有没有过重启后大量重复消费的经历?


标题:Kafka消费者重启后重复处理了80万条消息,库存直接扣成负数
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/06/25/1782025750985.html
公众号:服务端技术精选
    评论
    0 评论
avatar

取消