SpringBoot + Kafka Strictly Ordered Consumption: No Reordering During Scale-Out/In, for Finance-Grade Transaction Chains

In financial trading, payment, and securities scenarios, message order is critical:

  • Order placed → paid → shipped → delivery confirmed: this sequence must never be violated
  • Account balance changes must be applied in time order, or accounts can be overdrawn
  • Securities matching engines need timing precise to the millisecond

Kafka guarantees ordering within a partition, but in real production systems, ordered consumption still runs into trouble:

  • Records from one partition get processed by multiple workers and finish out of order
  • During scale-out/in, partition rebalancing disrupts the processing order
  • Retries let later messages overtake a failed one
  • Partial failures in batch consumption break the order

This article walks through implementing strictly ordered Kafka consumption in SpringBoot, to keep a finance-grade transaction chain stable.

Why Is Ordered Consumption So Hard?

Let's break down the difficulties first:

1. Multiple workers processing one partition

Within a consumer group, Kafka assigns each partition to exactly one consumer, but the same problem appears as soon as the application fans records from a partition out to multiple worker threads:
┌─────────────────────────────────────────────────────────┐
│                    Kafka Partition 0                     │
│  [Msg1] → [Msg2] → [Msg3] → [Msg4] → [Msg5] → [Msg6]    │
└─────────────────────────────────────────────────────────┘
                            ↓
                    ┌───────────────┐
                    │ Worker 1      │ → handles Msg1, Msg3, Msg5
                    └───────────────┘
                    ┌───────────────┐
                    │ Worker 2      │ → handles Msg2, Msg4, Msg6
                    └───────────────┘

Result: Msg2 may finish before Msg1, so the effective order is broken

2. Partition rebalancing during scale-out/in

Before scaling out:
┌───────────┐   ┌───────────┐   ┌───────────┐
│ Consumer1 │──→│ Partition0│   │ Partition1│←──Consumer2
└───────────┘   └───────────┘   └───────────┘

After scaling out:
┌───────────┐   ┌───────────┐
│ Consumer1 │──→│ Partition0│
└───────────┘   └───────────┘
┌───────────┐   ┌───────────┐
│ Consumer2 │──→│ Partition1│
└───────────┘   └───────────┘
┌───────────┐   ┌───────────┐
│ Consumer3 │──→│ Partition2│
└───────────┘   └───────────┘

Problem: during the rebalance, in-flight messages may be consumed twice or processed out of order

3. Ordering broken by retries

Normal flow:
Msg1 (success) → Msg2 (success) → Msg3 (success)

With a retry:
Msg1 (success) → Msg2 (fails) → retry → Msg3 (processed) → Msg2 (succeeds)
Result: Msg3 finishes before Msg2

4. Partial failures in batch consumption

Batch consumption:
Batch: [Msg1, Msg2, Msg3, Msg4, Msg5]
Result: Msg1-3 succeed, Msg4-5 fail

Problem: retrying only the failed messages can break the order

Overall Architecture

Our strictly ordered consumption scheme is built from these core components:

  1. OrderProducer: a producer that partitions by business key
  2. OrderConsumer: a single-threaded, ordered consumer
  3. PartitionCoordinator: a partition coordinator that handles scale-out/in
  4. OrderStateManager: a state manager that tracks processing progress
  5. DeadLetterHandler: a dead-letter handler for messages that cannot be processed
  6. SequenceChecker: a sequence-number checker that enforces correct ordering

Let's see how to build this in SpringBoot:

1. A producer that partitions by business key

First, a producer that keys every message by its business key, so all messages for the same order always land on the same partition:

@Component
@Slf4j
public class OrderProducer {

    @Autowired
    private KafkaTemplate<String, OrderMessage> kafkaTemplate;

    @Value("${kafka.topic.order-events}")
    private String topic;

    /**
     * Send an order message, partitioned by business key
     */
    public SendResult<String, OrderMessage> sendOrderMessage(OrderMessage message) {
        Assert.hasText(message.getOrderNo(), "orderNo must not be empty");
        Assert.hasText(message.getEventType(), "eventType must not be empty");

        long startTime = System.currentTimeMillis();
        try {
            // Use the order number as the key, so all messages for one order
            // land on the same partition
            ProducerRecord<String, OrderMessage> record = new ProducerRecord<>(
                topic,
                message.getOrderNo(),
                message
            );

            // Block until the broker acknowledges, so sends leave in strict order
            ListenableFuture<SendResult<String, OrderMessage>> future = kafkaTemplate.send(record);
            SendResult<String, OrderMessage> result = future.get();

            long costTime = System.currentTimeMillis() - startTime;
            log.info("Order message sent: orderNo={}, eventType={}, partition={}, offset={}, cost={}ms",
                message.getOrderNo(), message.getEventType(),
                result.getRecordMetadata().partition(),
                result.getRecordMetadata().offset(),
                costTime);

            return result;
        } catch (Exception e) {
            log.error("Failed to send order message: orderNo={}, eventType={}",
                message.getOrderNo(), message.getEventType(), e);
            throw new OrderMessageSendException("Failed to send order message", e);
        }
    }

    /**
     * Send a batch of order messages; messages sharing a business key keep their order
     */
    public void sendOrderMessages(List<OrderMessage> messages) {
        // Group by order number
        Map<String, List<OrderMessage>> groupedMessages = messages.stream()
            .collect(Collectors.groupingBy(OrderMessage::getOrderNo));

        // Send each order's messages in sequence order
        for (Map.Entry<String, List<OrderMessage>> entry : groupedMessages.entrySet()) {
            List<OrderMessage> orderMessages = entry.getValue();
            orderMessages.sort(Comparator.comparing(OrderMessage::getSequenceNo));
            for (OrderMessage message : orderMessages) {
                sendOrderMessage(message);
            }
        }
    }
}
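
How does the key map to a partition? With Kafka's default partitioner, the serialized key is hashed (murmur2) and taken modulo the partition count, so every message with the same orderNo lands on the same partition. If you want that mapping under your own control, you can plug in a custom Partitioner via ProducerConfig.PARTITIONER_CLASS_CONFIG; the sketch below is ours (class name and hash choice are illustrative), and it assumes a non-null key, which the producer above already asserts. With either approach, adding partitions later changes the key-to-partition mapping, so fix the partition count up front.

public class OrderKeyPartitioner implements org.apache.kafka.clients.producer.Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes,
                         org.apache.kafka.common.Cluster cluster) {
        // Assumes the topic exists, so the partition count is known
        int numPartitions = cluster.partitionCountForTopic(topic);
        // Mask the sign bit rather than Math.abs (which overflows on Integer.MIN_VALUE)
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(java.util.Map<String, ?> configs) {}
}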

2. The ordered message entity and sequence-number generation

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderMessage implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Order number (used as the partition key)
     */
    private String orderNo;

    /**
     * Event type
     */
    private String eventType;

    /**
     * Sequence number (used to verify ordering)
     */
    private Long sequenceNo;

    /**
     * Business payload
     */
    private Map<String, Object> data;

    /**
     * Creation time
     */
    private Date createTime;

    /**
     * Retry count
     */
    private Integer retryCount;
}

@Component
public class SequenceGenerator {

    private final Map<String, AtomicLong> sequenceMap = new ConcurrentHashMap<>();

    /**
     * Generate the next sequence number for an order
     */
    public Long nextSequence(String orderNo) {
        return sequenceMap.computeIfAbsent(orderNo, k -> new AtomicLong(0))
            .incrementAndGet();
    }

    /**
     * Get the current sequence number
     */
    public Long getCurrentSequence(String orderNo) {
        AtomicLong sequence = sequenceMap.get(orderNo);
        return sequence == null ? 0L : sequence.get();
    }

    /**
     * Reset the sequence for an order
     */
    public void resetSequence(String orderNo) {
        sequenceMap.remove(orderNo);
    }
}
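
One caveat before moving on: SequenceGenerator keeps its counters in memory, so they reset on restart and diverge if several producer instances run in parallel. In production you would back the counter with shared storage. A minimal sketch using Redis INCR through Spring's StringRedisTemplate (the key prefix is our assumption):

@Component
public class RedisSequenceGenerator {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * INCR is atomic in Redis, so concurrent producer instances still
     * obtain distinct, monotonically increasing sequence numbers.
     */
    public Long nextSequence(String orderNo) {
        // The "order:seq:" key prefix is illustrative
        return redisTemplate.opsForValue().increment("order:seq:" + orderNo);
    }

    /**
     * Drop the counter once the order reaches a terminal state.
     */
    public void resetSequence(String orderNo) {
        redisTemplate.delete("order:seq:" + orderNo);
    }
}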

3. The single-threaded ordered consumer

The core of the scheme: a consumer that processes each partition's messages strictly in order:

@Component
@Slf4j
public class OrderConsumer {

    @Autowired
    private OrderStateManager stateManager;

    @Autowired
    private OrderProcessor orderProcessor;

    @Autowired
    private DeadLetterHandler deadLetterHandler;

    @Autowired
    private SequenceChecker sequenceChecker;

    /**
     * Consume on a single thread to preserve ordering
     */
    @KafkaListener(
        topics = "${kafka.topic.order-events}",
        groupId = "${kafka.group.order-consumer}",
        concurrency = "1"
    )
    public void consume(
        ConsumerRecord<String, OrderMessage> record,
        Acknowledgment ack,
        Consumer<?, ?> consumer
    ) {
        String orderNo = record.key();
        OrderMessage message = record.value();
        long offset = record.offset();
        int partition = record.partition();

        log.debug("收到订单消息: orderNo={}, sequenceNo={}, eventType={}, partition={}, offset={}",
            orderNo, message.getSequenceNo(), message.getEventType(), partition, offset);

        try {
            // 1. Verify the sequence number
            if (!sequenceChecker.checkAndUpdate(orderNo, message.getSequenceNo())) {
                log.warn("Sequence number mismatch: orderNo={}, expected={}, actual={}",
                    orderNo, sequenceChecker.getExpected(orderNo), message.getSequenceNo());
                handleSequenceMismatch(record, message);
                return;
            }

            // 2. Idempotency check: skip messages that were already processed
            if (stateManager.isProcessed(orderNo, offset)) {
                log.info("Message already processed, skipping: orderNo={}, offset={}", orderNo, offset);
                if (ack != null) {
                    ack.acknowledge();
                }
                return;
            }

            // 3. Mark as processing
            stateManager.markProcessing(orderNo, offset, message.getSequenceNo());

            // 4. Run the business logic
            boolean success = orderProcessor.process(message);

            if (success) {
                // 5. Success: mark completed and commit the offset
                stateManager.markCompleted(orderNo, offset);
                // ack is null when this call replays a stashed message
                if (ack != null) {
                    ack.acknowledge();
                }
                log.info("Order message processed: orderNo={}, sequenceNo={}, eventType={}, offset={}",
                    orderNo, message.getSequenceNo(), message.getEventType(), offset);
                // Drain any stashed messages that are now next in line
                processStashedMessages(orderNo);
            } else {
                // 6. Failure: enter the retry flow
                handleProcessFailure(record, message, partition, offset);
            }

        } catch (Exception e) {
            log.error("Exception while consuming order message: orderNo={}, offset={}", orderNo, offset, e);
            handleConsumeException(record, message, e);
        }
    }

    private void handleSequenceMismatch(
        ConsumerRecord<String, OrderMessage> record,
        OrderMessage message
    ) {
        String orderNo = record.key();
        long expectedSeq = sequenceChecker.getExpected(orderNo);
        long actualSeq = message.getSequenceNo();

        if (actualSeq < expectedSeq) {
            // Stale (already seen) message: discard it
            log.info("Stale message, discarding: orderNo={}, expected={}, actual={}",
                orderNo, expectedSeq, actualSeq);
            return;
        }

        // The message arrived ahead of its turn: stash it until the gap is filled
        stateManager.stashMessage(record);
        log.info("Message ahead of expected sequence, stashed: orderNo={}, expected={}, actual={}",
            orderNo, expectedSeq, actualSeq);

        // Try to drain any stashed messages that are now in order
        processStashedMessages(orderNo);
    }

    private void processStashedMessages(String orderNo) {
        // Work on a snapshot: removeStashedMessage mutates the underlying list,
        // and iterating it directly would throw ConcurrentModificationException
        List<ConsumerRecord<String, OrderMessage>> stashed =
            new ArrayList<>(stateManager.getStashedMessages(orderNo));
        stashed.sort(Comparator.comparing(r -> r.value().getSequenceNo()));

        for (ConsumerRecord<String, OrderMessage> record : stashed) {
            OrderMessage message = record.value();
            // Peek only: consume() runs checkAndUpdate itself, so advancing the
            // expected sequence here would make the replay look stale
            if (message.getSequenceNo().equals(sequenceChecker.getExpected(orderNo))) {
                stateManager.removeStashedMessage(record);
                // Replay through the normal path; ack and consumer are null here,
                // which consume() tolerates
                consume(record, null, null);
            } else {
                break;
            }
        }
    }

    private void handleProcessFailure(
        ConsumerRecord<String, OrderMessage> record,
        OrderMessage message,
        int partition,
        long offset
    ) {
        String orderNo = record.key();
        int retryCount = message.getRetryCount() == null ? 0 : message.getRetryCount();

        if (retryCount < 5) {
            // Roll back the expected sequence so that messages arriving after
            // this one are stashed until the retry succeeds
            sequenceChecker.rollback(orderNo);
            message.setRetryCount(retryCount + 1);
            log.warn("Order message processing failed, scheduling retry: orderNo={}, retryCount={}",
                orderNo, retryCount + 1);
            // Delayed retry
            scheduleRetry(record, message, retryCount + 1);
        } else {
            // Retries exhausted: route to the dead-letter topic
            log.error("Order message retries exhausted, sending to dead-letter topic: orderNo={}", orderNo);
            deadLetterHandler.sendToDeadLetter(record, "retries exhausted");
            stateManager.markFailed(orderNo, offset);
        }
    }

    private void handleConsumeException(
        ConsumerRecord<String, OrderMessage> record,
        OrderMessage message,
        Exception e
    ) {
        String orderNo = record.key();
        log.error("Consume exception: orderNo={}", orderNo, e);
        deadLetterHandler.sendToDeadLetter(record, e.getMessage());
        stateManager.markFailed(orderNo, record.offset());
    }

    private void scheduleRetry(
        ConsumerRecord<String, OrderMessage> record,
        OrderMessage message,
        int retryCount
    ) {
        long delayMs = getRetryDelayMs(retryCount);
        // Simplified: only logs here; a real implementation re-delivers the
        // message after the delay (see the RetryScheduler sketch below)
        log.info("Message will be retried in {}ms", delayMs);
    }

    private long getRetryDelayMs(int retryCount) {
        switch (retryCount) {
            case 1: return 1000L;
            case 2: return 5000L;
            case 3: return 10000L;
            case 4: return 30000L;
            case 5: return 60000L;
            default: return 60000L;
        }
    }
}
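
scheduleRetry above is deliberately a stub. One way to fill it in, sketched below, is a single-threaded ScheduledExecutorService that re-publishes the failed message to its original topic after the delay: the key is unchanged, so the retry returns to the same partition, and because the expected sequence was rolled back on failure, the interim messages sit in the stash until the retry succeeds. This class and its wiring are our sketch, not part of the original code.

@Component
@Slf4j
public class RetryScheduler {

    @Autowired
    private KafkaTemplate<String, OrderMessage> kafkaTemplate;

    // A single thread suffices: it only hands messages back to Kafka
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    /**
     * Re-publish the message to its original topic after delayMs,
     * without blocking the listener thread.
     */
    public void scheduleRetry(String topic, OrderMessage message, long delayMs) {
        scheduler.schedule(
            () -> kafkaTemplate.send(topic, message.getOrderNo(), message),
            delayMs, TimeUnit.MILLISECONDS);
    }

    @PreDestroy
    public void shutdown() {
        scheduler.shutdown();
    }
}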

4. The state manager

Tracks message processing state and stashes out-of-order messages:

@Component
@Slf4j
public class OrderStateManager {

    // In-flight message per order: orderNo -> (offset, sequenceNo)
    private final Map<String, ProcessingState> processingMap = new ConcurrentHashMap<>();

    // Processed offsets per order: orderNo -> Set<offset>
    private final Map<String, Set<Long>> processedMap = new ConcurrentHashMap<>();

    // Stashed out-of-order messages: orderNo -> List<Record>
    // (the next expected sequence number is owned by SequenceChecker)
    private final Map<String, List<ConsumerRecord<String, OrderMessage>>> stashedMap = new ConcurrentHashMap<>();

    /**
     * Mark a message as processing
     */
    public void markProcessing(String orderNo, long offset, long sequenceNo) {
        processingMap.put(orderNo, new ProcessingState(offset, sequenceNo));
        log.debug("Marked processing: orderNo={}, offset={}, sequenceNo={}", orderNo, offset, sequenceNo);
    }

    /**
     * Mark a message as completed
     */
    public void markCompleted(String orderNo, long offset) {
        processingMap.remove(orderNo);
        processedMap.computeIfAbsent(orderNo, k -> ConcurrentHashMap.newKeySet()).add(offset);
        log.debug("Marked completed: orderNo={}, offset={}", orderNo, offset);
    }

    /**
     * Mark a message as failed
     */
    public void markFailed(String orderNo, long offset) {
        processingMap.remove(orderNo);
        log.warn("Marked failed: orderNo={}, offset={}", orderNo, offset);
    }

    /**
     * Check whether an offset has already been processed (idempotency)
     */
    public boolean isProcessed(String orderNo, long offset) {
        Set<Long> processedOffsets = processedMap.get(orderNo);
        return processedOffsets != null && processedOffsets.contains(offset);
    }

    /**
     * Stash an out-of-order message
     */
    public void stashMessage(ConsumerRecord<String, OrderMessage> record) {
        String orderNo = record.key();
        stashedMap.computeIfAbsent(orderNo, k -> Collections.synchronizedList(new ArrayList<>()))
            .add(record);
    }

    /**
     * Get the stashed messages for an order
     */
    public List<ConsumerRecord<String, OrderMessage>> getStashedMessages(String orderNo) {
        return stashedMap.getOrDefault(orderNo, Collections.emptyList());
    }

    /**
     * Remove a stashed message
     */
    public void removeStashedMessage(ConsumerRecord<String, OrderMessage> record) {
        String orderNo = record.key();
        List<ConsumerRecord<String, OrderMessage>> list = stashedMap.get(orderNo);
        if (list != null) {
            list.remove(record);
        }
    }

    @Data
    @AllArgsConstructor
    private static class ProcessingState {
        private long offset;
        private long sequenceNo;
    }
}
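
A caveat on this class: processingMap, processedMap, and stashedMap all grow for as long as the process lives. Once an order reaches a terminal state (delivered or cancelled), its entries can be dropped. A sketch of a cleanup hook you might add to OrderStateManager (the method name is ours; call it from the processor on terminal events):

    /**
     * Release all in-memory tracking for a finished order.
     */
    public void clearOrderState(String orderNo) {
        processingMap.remove(orderNo);
        processedMap.remove(orderNo);
        stashedMap.remove(orderNo);
        log.info("Cleared state for finished order: orderNo={}", orderNo);
    }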

5. The sequence-number checker

Verifies that messages arrive in the expected order:

@Component
@Slf4j
public class SequenceChecker {

    // Next expected sequence number per order. Kept in memory here; in
    // production, bootstrap this from a database or cache so it survives
    // restarts and rebalances.
    private final Map<String, AtomicLong> expectedSequences = new ConcurrentHashMap<>();

    /**
     * Check the sequence number and, if it is the expected one, advance it.
     * The get-then-increment below is not atomic, but the listener runs on
     * a single thread, so no extra locking is needed.
     * @return true if the message is in order, false otherwise
     */
    public boolean checkAndUpdate(String orderNo, Long sequenceNo) {
        if (sequenceNo == null) {
            log.warn("Message has no sequence number: orderNo={}", orderNo);
            return false;
        }

        AtomicLong expectedSequence = getOrCreateExpectedSequence(orderNo);
        long expected = expectedSequence.get();

        if (sequenceNo.equals(expected)) {
            // In order: advance the expected sequence
            expectedSequence.incrementAndGet();
            log.debug("Sequence number verified: orderNo={}, sequenceNo={}", orderNo, sequenceNo);
            return true;
        }

        if (sequenceNo < expected) {
            // Stale (already seen) message
            log.debug("Stale message: orderNo={}, expected={}, actual={}",
                orderNo, expected, sequenceNo);
            return false;
        }

        // Message arrived ahead of its turn
        log.debug("Message ahead of expected sequence: orderNo={}, expected={}, actual={}",
            orderNo, expected, sequenceNo);
        return false;
    }

    /**
     * Peek at the next expected sequence number without advancing it
     */
    public Long getExpected(String orderNo) {
        return getOrCreateExpectedSequence(orderNo).get();
    }

    /**
     * Roll the expected sequence back by one after a processing failure,
     * so later messages are stashed until the retry succeeds
     */
    public void rollback(String orderNo) {
        AtomicLong seq = expectedSequences.get(orderNo);
        if (seq != null) {
            seq.decrementAndGet();
        }
    }

    private AtomicLong getOrCreateExpectedSequence(String orderNo) {
        return expectedSequences.computeIfAbsent(orderNo, k -> new AtomicLong(1L));
    }

    /**
     * Reset the sequence (for example after the order completes)
     */
    public void resetSequence(String orderNo) {
        expectedSequences.remove(orderNo);
    }
}

6. The dead-letter handler

Handles messages that cannot be consumed normally:

@Component
@Slf4j
public class DeadLetterHandler {

    @Autowired
    private KafkaTemplate<String, OrderMessage> kafkaTemplate;

    @Value("${kafka.topic.order-dead-letter}")
    private String deadLetterTopic;

    /**
     * Send a message to the dead-letter topic
     */
    public void sendToDeadLetter(ConsumerRecord<String, OrderMessage> record, String reason) {
        OrderMessage message = record.value();
        String orderNo = record.key();

        try {
            // Record why the message was dead-lettered
            Map<String, Object> deadLetterInfo = new HashMap<>();
            deadLetterInfo.put("originalTopic", record.topic());
            deadLetterInfo.put("partition", record.partition());
            deadLetterInfo.put("offset", record.offset());
            deadLetterInfo.put("reason", reason);
            deadLetterInfo.put("deadLetterTime", new Date());

            // Attach the dead-letter context without overwriting the business payload
            Map<String, Object> data = message.getData() == null
                ? new HashMap<>() : new HashMap<>(message.getData());
            data.put("deadLetter", deadLetterInfo);
            message.setData(data);

            ProducerRecord<String, OrderMessage> deadLetterRecord =
                new ProducerRecord<>(deadLetterTopic, orderNo, message);

            kafkaTemplate.send(deadLetterRecord).get();

            log.error("消息已发送到死信队列: orderNo={}, reason={}, partition={}, offset={}",
                orderNo, reason, record.partition(), record.offset());

        } catch (Exception e) {
            log.error("发送死信消息失败: orderNo={}", orderNo, e);
            // 告警通知
            sendAlert(orderNo, record.partition(), record.offset(), reason, e);
        }
    }

    private void sendAlert(String orderNo, int partition, long offset, String reason, Exception e) {
        // Send an alert; in a real project, hook into your monitoring/alerting system
        log.error("Dead-letter alert: orderNo={}, partition={}, offset={}, reason={}",
            orderNo, partition, offset, reason);
    }
}
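
Dead-lettered messages still need a consumer of their own, otherwise the safety net is just a second place for messages to rot. A minimal listener on the dead-letter topic (the group id is illustrative; persistence for manual replay is left as a comment):

@Component
@Slf4j
public class DeadLetterConsumer {

    @KafkaListener(
        topics = "${kafka.topic.order-dead-letter}",
        groupId = "order-dead-letter-group"
    )
    public void consume(ConsumerRecord<String, OrderMessage> record, Acknowledgment ack) {
        // Only logged here; a real system would persist the record to a
        // database table for inspection and manual replay
        log.error("Dead letter received: orderNo={}, data={}",
            record.key(), record.value().getData());
        ack.acknowledge();
    }
}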

7. The partition coordinator

Handles partition reassignment during scale-out/in:

@Component
@Slf4j
public class PartitionCoordinator implements ConsumerAwareRebalanceListener {

    @Autowired
    private OrderStateManager stateManager;

    private final Map<Integer, Long> partitionLastOffsets = new ConcurrentHashMap<>();

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        log.info("分区回收前(提交前): partitions={}", partitions);
        // 记录当前处理状态
        saveCurrentState(partitions);
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        log.info("分区回收后(提交后): partitions={}", partitions);
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        log.info("分区分配: partitions={}", partitions);

        // 恢复处理状态
        restoreState(partitions);

        // 从上次提交的位置开始消费
        for (TopicPartition partition : partitions) {
            Long lastOffset = partitionLastOffsets.get(partition.partition());
            if (lastOffset != null) {
                consumer.seek(partition, lastOffset);
                log.info("分区消费位置重置: partition={}, offset={}",
                    partition.partition(), lastOffset);
            }
        }
    }

    private void saveCurrentState(Collection<TopicPartition> partitions) {
        log.info("Saving partition state: partitions={}", partitions);
        // In a real project, persist to Redis or a database (see the sketch below)
    }

    private void restoreState(Collection<TopicPartition> partitions) {
        log.info("Restoring partition state: partitions={}", partitions);
        // In a real project, restore from Redis or a database (see the sketch below)
    }

    /**
     * Record the latest processed offset for a partition
     */
    public void updatePartitionOffset(int partition, long offset) {
        partitionLastOffsets.put(partition, offset);
    }
}
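
saveCurrentState and restoreState are stubs above. A sketch of what they might look like backed by Redis, assuming a StringRedisTemplate field named redisTemplate and our own key scheme: the point is that whichever instance receives a partition after the rebalance can read the last offset that the previous owner recorded.

    private void saveCurrentState(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            Long offset = partitionLastOffsets.get(tp.partition());
            if (offset != null) {
                // "kafka:offset:<topic>:<partition>" is an illustrative key scheme
                redisTemplate.opsForValue().set(
                    "kafka:offset:" + tp.topic() + ":" + tp.partition(),
                    String.valueOf(offset));
            }
        }
    }

    private void restoreState(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            String value = redisTemplate.opsForValue().get(
                "kafka:offset:" + tp.topic() + ":" + tp.partition());
            if (value != null) {
                partitionLastOffsets.put(tp.partition(), Long.parseLong(value));
            }
        }
    }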

8. The order business processor

@Service
@Slf4j
public class OrderProcessor {

    public boolean process(OrderMessage message) {
        String orderNo = message.getOrderNo();
        String eventType = message.getEventType();

        log.info("处理订单事件: orderNo={}, eventType={}", orderNo, eventType);

        try {
            switch (eventType) {
                case "ORDER_CREATED":
                    return handleOrderCreated(message);
                case "PAYMENT_SUCCESS":
                    return handlePaymentSuccess(message);
                case "PAYMENT_FAILED":
                    return handlePaymentFailed(message);
                case "ORDER_SHIPPED":
                    return handleOrderShipped(message);
                case "ORDER_DELIVERED":
                    return handleOrderDelivered(message);
                case "ORDER_CANCELLED":
                    return handleOrderCancelled(message);
                default:
                    log.warn("未知事件类型: orderNo={}, eventType={}", orderNo, eventType);
                    return true;
            }
        } catch (Exception e) {
            log.error("处理订单事件失败: orderNo={}, eventType={}", orderNo, eventType, e);
            return false;
        }
    }

    private boolean handleOrderCreated(OrderMessage message) {
        log.info("Order created: orderNo={}, data={}", message.getOrderNo(), message.getData());
        return true;
    }

    private boolean handlePaymentSuccess(OrderMessage message) {
        log.info("Payment succeeded: orderNo={}, data={}", message.getOrderNo(), message.getData());
        return true;
    }

    private boolean handlePaymentFailed(OrderMessage message) {
        log.info("Payment failed: orderNo={}, data={}", message.getOrderNo(), message.getData());
        return true;
    }

    private boolean handleOrderShipped(OrderMessage message) {
        log.info("Order shipped: orderNo={}, data={}", message.getOrderNo(), message.getData());
        return true;
    }

    private boolean handleOrderDelivered(OrderMessage message) {
        log.info("Order delivered: orderNo={}, data={}", message.getOrderNo(), message.getData());
        return true;
    }

    private boolean handleOrderCancelled(OrderMessage message) {
        log.info("Order cancelled: orderNo={}, data={}", message.getOrderNo(), message.getData());
        return true;
    }
}
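
Sequence numbers catch transport-level reordering; the processor can add a second, business-level line of defence by checking that each event is legal given the previous one. A minimal transition table you might add to OrderProcessor (the life cycle below is our assumption about this domain; track the last event per order wherever order state is stored):

    // Allowed predecessor events per event type; an out-of-order event
    // such as ORDER_SHIPPED before PAYMENT_SUCCESS is rejected
    private static final Map<String, Set<String>> ALLOWED_PREVIOUS = Map.of(
        "ORDER_CREATED",   Set.of(),                                   // must be first
        "PAYMENT_SUCCESS", Set.of("ORDER_CREATED"),
        "PAYMENT_FAILED",  Set.of("ORDER_CREATED"),
        "ORDER_SHIPPED",   Set.of("PAYMENT_SUCCESS"),
        "ORDER_DELIVERED", Set.of("ORDER_SHIPPED"),
        "ORDER_CANCELLED", Set.of("ORDER_CREATED", "PAYMENT_FAILED")
    );

    private boolean isTransitionAllowed(String previousEvent, String eventType) {
        Set<String> allowed = ALLOWED_PREVIOUS.get(eventType);
        if (allowed == null) {
            return false; // unknown event type
        }
        // A null previousEvent means this is the order's first event
        return previousEvent == null ? allowed.isEmpty() : allowed.contains(previousEvent);
    }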

9. Configuration

Note that the KafkaConfig class in the next section builds its own consumer and producer property maps; apart from spring.kafka.bootstrap-servers and spring.kafka.consumer.group-id (read via @Value), most of the spring.kafka block below is informational, so keep the two in sync:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: order-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: com.example.order.message
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 3
      properties:
        # Spring Boot has no dedicated key for idempotence; set it via properties
        enable.idempotence: true

kafka:
  topic:
    order-events: order-events-topic
    order-dead-letter: order-dead-letter-topic
  group:
    order-consumer: order-consumer-group

server:
  port: 8080
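
One thing the YAML does not pin down is the topic itself. The partition count matters here: ordering is only guaranteed per key, so more partitions raise throughput across different orders without weakening per-order ordering, but adding partitions later changes the key-to-partition mapping. Declaring the topic in code keeps the count explicit; a sketch using spring-kafka's TopicBuilder (partition and replica counts are illustrative):

@Configuration
public class TopicConfig {

    @Bean
    public NewTopic orderEventsTopic(@Value("${kafka.topic.order-events}") String topic) {
        // 6 partitions / 3 replicas are example values; size for your throughput
        return TopicBuilder.name(topic)
            .partitions(6)
            .replicas(3)
            .build();
    }
}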

10. The Kafka configuration class

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Autowired
    private PartitionCoordinator partitionCoordinator;

    @Bean
    public ConsumerFactory<String, OrderMessage> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.order.message");

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderMessage> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, OrderMessage> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Single-threaded consumption to preserve ordering
        factory.setConcurrency(1);
        // Manual, immediate offset commits
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // Register the rebalance listener for scale-out/in handling
        factory.getContainerProperties().setConsumerRebalanceListener(partitionCoordinator);

        return factory;
    }

    @Bean
    public ProducerFactory<String, OrderMessage> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // At most one in-flight request per connection, so a retried batch can
        // never overtake a later one within a partition (with idempotence enabled,
        // values up to 5 also preserve ordering, at higher throughput)
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, OrderMessage> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
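
As an aside: on spring-kafka 2.8 or later, much of the hand-rolled retry logic can be delegated to the container. A DefaultErrorHandler retries a failed record in place, which blocks later records from the same partition and therefore preserves ordering by construction, then hands the record to a DeadLetterPublishingRecoverer once retries are exhausted (publishing to <topic>.DLT by default). A sketch with illustrative backoff values:

    @Bean
    public DefaultErrorHandler orderErrorHandler(KafkaTemplate<String, OrderMessage> template) {
        // After retries are exhausted, publish to <topic>.DLT on the same partition
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
        // Retry up to 5 times, pausing 1s between attempts (illustrative values)
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 5L));
    }
    // Wire it in with: factory.setCommonErrorHandler(orderErrorHandler);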

11. A test endpoint

@RestController
@RequestMapping("/api/order")
public class OrderController {

    @Autowired
    private OrderProducer orderProducer;

    @Autowired
    private SequenceGenerator sequenceGenerator;

    /**
     * Simulate an order's life cycle
     */
    @PostMapping("/simulate")
    public String simulateOrderFlow(@RequestParam String orderNo) {
        // Simulate the order life-cycle events
        List<OrderMessage> events = new ArrayList<>();

        // 1. Order created
        events.add(buildMessage(orderNo, "ORDER_CREATED", Map.of(
            "userId", "user001",
            "amount", 100.00
        )));

        // 2. Payment succeeded
        events.add(buildMessage(orderNo, "PAYMENT_SUCCESS", Map.of(
            "paymentNo", "PAY" + System.currentTimeMillis(),
            "amount", 100.00
        )));

        // 3. Order shipped
        events.add(buildMessage(orderNo, "ORDER_SHIPPED", Map.of(
            "logisticsNo", "LOG" + System.currentTimeMillis(),
            "company", "SF Express"
        )));

        // 4. Order delivered
        events.add(buildMessage(orderNo, "ORDER_DELIVERED", Map.of(
            "signTime", new Date()
        )));

        // Send in sequence order
        orderProducer.sendOrderMessages(events);

        return "订单流程模拟完成: " + orderNo;
    }

    private OrderMessage buildMessage(String orderNo, String eventType, Map<String, Object> data) {
        return OrderMessage.builder()
            .orderNo(orderNo)
            .eventType(eventType)
            .sequenceNo(sequenceGenerator.nextSequence(orderNo))
            .data(data)
            .createTime(new Date())
            .retryCount(0)
            .build();
    }
}

What You Get in Practice

With this scheme in place, we achieve:

1. Strictly ordered consumption within a partition

All messages for one order stay on one partition:
Partition 0:
[Msg1(order=123)] → [Msg2(order=123)] → [Msg3(order=123)]
         ↓                ↓                ↓
     processed        processed        processed
         ↓                ↓                ↓
    ack commit       ack commit       ack commit

2. State preserved across scale-out/in

Scaling flow:
1. Save the current state before the rebalance starts
2. Commit the offsets of processed messages
3. Reassign the partitions
4. Restore the processing state
5. Resume consumption from the correct position

3. Retries that do not break ordering

Failure scenario:
Msg1 (success) → Msg2 (fails) → later messages stashed → retry succeeds → stash drained in order

4. A dead-letter safety net

Retries exhausted:
Msg1 (success) → Msg2 (fails ×5) → dead-letter topic → alert and manual intervention

Summary

This strictly ordered Kafka consumption scheme keeps a finance-grade transaction chain in order:

  1. Business-key partitioning: all messages for one business entity stay on one partition
  2. Single-threaded consumption: each partition is processed strictly in order
  3. Sequence-number checks: out-of-order messages are detected and handled
  4. State management: processing state is tracked, enabling idempotency and retries
  5. Partition coordination: state stays consistent across scale-out/in
  6. Dead-letter fallback: failed messages stay traceable for manual handling

I hope this article helps. If you found it useful, follow "服务端技术精选" for more practical engineering content.


Author: jiangyi
URL: http://www.jiangyi.space/articles/2026/05/08/1777866923247.html
WeChat Official Account: 服务端技术精选