SpringBoot + Kafka 严格顺序消费方案:扩缩容不乱序,金融级交易链路保障!
在金融交易、支付、证券等场景下,消息顺序至关重要:
- 用户下单 → 支付 → 发货 → 确认收货,这个顺序绝不能乱
- 账户余额变更必须按时间顺序处理,否则会出现透支
- 证券撮合交易对时序要求精确到毫秒
Kafka 虽然通过分区机制保证了分区内的顺序,但在实际生产中,顺序消费往往会遇到各种问题:
- 多消费者消费同一个分区,消息乱序
- 扩容缩容时,分区重分配导致消息处理顺序被打乱
- 消息重试导致的顺序错乱
- 批量消费时部分失败导致的顺序问题
今天我们来聊一聊如何在 SpringBoot 中实现 Kafka 严格顺序消费,保证金融级交易链路的稳定性。
为什么顺序消费这么难?
先分析一下 Kafka 顺序消费的难点:
1. 多消费者消费同一个分区
问题:
┌─────────────────────────────────────────────────────────┐
│ Kafka Partition 0 │
│ [Msg1] → [Msg2] → [Msg3] → [Msg4] → [Msg5] → [Msg6] │
└─────────────────────────────────────────────────────────┘
↓
┌───────────────┐
│ Consumer 1 │ → 处理 Msg1, Msg3, Msg5
└───────────────┘
┌───────────────┐
│ Consumer 2 │ → 处理 Msg2, Msg4, Msg6
└───────────────┘
结果:Msg2 可能比 Msg1 先处理完,造成乱序
2. 扩缩容时的分区重分配
扩容前:
┌───────────┐ ┌───────────┐ ┌───────────┐
│ Consumer1 │──→│ Partition0│ │ Partition1│←──Consumer2
└───────────┘ └───────────┘ └───────────┘
扩容后:
┌───────────┐ ┌───────────┐
│ Consumer1 │──→│ Partition0│
└───────────┘ └───────────┘
┌───────────┐ ┌───────────┐
│ Consumer2 │──→│ Partition1│
└───────────┘ └───────────┘
┌───────────┐ ┌───────────┐
│ Consumer3 │──→│ Partition2│
└───────────┘ └───────────┘
问题:重分配期间,正在处理的消息可能被重复消费或乱序
3. 消息重试导致的顺序问题
正常流程:
Msg1 (成功) → Msg2 (成功) → Msg3 (成功)
异常重试:
Msg1 (成功) → Msg2 (失败) → 重试 → Msg3 (处理) → Msg2 (成功)
结果:Msg3 比 Msg2 先处理完
4. 批量消费的部分失败
批量消费:
批次:[Msg1, Msg2, Msg3, Msg4, Msg5]
结果:Msg1-3 成功,Msg4-5 失败
问题:如果只重试失败的消息,顺序可能被打乱
整体架构设计
我们的严格顺序消费方案由以下核心组件构成:
- OrderProducer:按业务键分区的顺序生产者
- OrderConsumer:单线程顺序消费者
- PartitionCoordinator:分区协调器,处理扩缩容
- OrderStateManager:状态管理器,追踪处理顺序
- DeadLetterHandler:死信处理器,处理异常消息
- SequenceChecker:序列号检查器,确保顺序正确
让我们看看如何在 SpringBoot 中实现这套系统:
1. 按业务键分区的生产者
首先实现按业务键分区的生产者,确保同一业务的消息始终在同一个分区:
@Component
@Slf4j
public class OrderProducer {
@Autowired
private KafkaTemplate<String, OrderMessage> kafkaTemplate;
@Value("${kafka.topic.order-events}")
private String topic;
/**
* 发送订单消息,按业务键分区
*/
public SendResult sendOrderMessage(OrderMessage message) {
Assert.hasText(message.getOrderNo(), "订单号不能为空");
Assert.hasText(message.getEventType(), "事件类型不能为空");
long startTime = System.currentTimeMillis();
try {
// 使用订单号作为 key,确保同一订单的消息在同一分区
ProducerRecord<String, OrderMessage> record = new ProducerRecord<>(
topic,
message.getOrderNo(),
message
);
ListenableFuture<SendResult<String, OrderMessage>> future = kafkaTemplate.send(record);
SendResult result = future.get();
long costTime = System.currentTimeMillis() - startTime;
log.info("发送订单消息成功: orderNo={}, eventType={}, partition={}, offset={}, cost={}ms",
message.getOrderNo(), message.getEventType(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset(),
costTime);
return result;
} catch (Exception e) {
log.error("发送订单消息失败: orderNo={}, eventType={}",
message.getOrderNo(), message.getEventType(), e);
throw new OrderMessageSendException("发送订单消息失败", e);
}
}
/**
* 批量发送订单消息,同一业务键的消息保持顺序
*/
public void sendOrderMessages(List<OrderMessage> messages) {
// 按订单号分组
Map<String, List<OrderMessage>> groupedMessages = messages.stream()
.collect(Collectors.groupingBy(OrderMessage::getOrderNo));
// 同一订单的消息按顺序发送
for (Map.Entry<String, List<OrderMessage>> entry : groupedMessages.entrySet()) {
List<OrderMessage> orderMessages = entry.getValue();
orderMessages.sort(Comparator.comparing(OrderMessage::getSequenceNo));
for (OrderMessage message : orderMessages) {
sendOrderMessage(message);
}
}
}
}
2. 顺序消息实体和序列号生成
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderMessage implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 订单号(作为分区键)
*/
private String orderNo;
/**
* 事件类型
*/
private String eventType;
/**
* 序列号(用于校验顺序)
*/
private Long sequenceNo;
/**
* 业务数据
*/
private Map<String, Object> data;
/**
* 创建时间
*/
private Date createTime;
/**
* 重试次数
*/
private Integer retryCount;
}
@Component
public class SequenceGenerator {
private final Map<String, AtomicLong> sequenceMap = new ConcurrentHashMap<>();
/**
* 为订单生成下一个序列号
*/
public Long nextSequence(String orderNo) {
return sequenceMap.computeIfAbsent(orderNo, k -> new AtomicLong(0))
.incrementAndGet();
}
/**
* 获取当前序列号
*/
public Long getCurrentSequence(String orderNo) {
AtomicLong sequence = sequenceMap.get(orderNo);
return sequence == null ? 0L : sequence.get();
}
/**
* 重置序列号
*/
public void resetSequence(String orderNo) {
sequenceMap.remove(orderNo);
}
}
3. 单线程顺序消费者
核心的顺序消费者,确保同一分区的消息按顺序处理:
@Component
@Slf4j
public class OrderConsumer {
@Autowired
private OrderStateManager stateManager;
@Autowired
private OrderProcessor orderProcessor;
@Autowired
private DeadLetterHandler deadLetterHandler;
@Autowired
private SequenceChecker sequenceChecker;
/**
* 单线程消费,确保顺序
*/
@KafkaListener(
topics = "${kafka.topic.order-events}",
groupId = "${kafka.group.order-consumer}",
concurrency = "1"
)
public void consume(
ConsumerRecord<String, OrderMessage> record,
Acknowledgment ack,
Consumer<?, ?> consumer
) {
String orderNo = record.key();
OrderMessage message = record.value();
long offset = record.offset();
int partition = record.partition();
log.debug("收到订单消息: orderNo={}, sequenceNo={}, eventType={}, partition={}, offset={}",
orderNo, message.getSequenceNo(), message.getEventType(), partition, offset);
try {
// 1. 检查序列号是否正确
if (!sequenceChecker.checkAndUpdate(orderNo, message.getSequenceNo())) {
log.warn("序列号不匹配: orderNo={}, expected={}, actual={}",
orderNo, stateManager.getNextExpectedSequence(orderNo), message.getSequenceNo());
handleSequenceMismatch(record, message);
return;
}
// 2. 检查消息是否已处理(幂等)
if (stateManager.isProcessed(orderNo, offset)) {
log.info("消息已处理,跳过: orderNo={}, offset={}", orderNo, offset);
ack.acknowledge();
return;
}
// 3. 标记为处理中
stateManager.markProcessing(orderNo, offset, message.getSequenceNo());
// 4. 处理业务逻辑
boolean success = orderProcessor.process(message);
if (success) {
// 5. 处理成功,标记已完成
stateManager.markCompleted(orderNo, offset);
ack.acknowledge();
log.info("订单消息处理成功: orderNo={}, sequenceNo={}, eventType={}, offset={}",
orderNo, message.getSequenceNo(), message.getEventType(), offset);
} else {
// 6. 处理失败,进入重试流程
handleProcessFailure(record, message, partition, offset);
}
} catch (Exception e) {
log.error("消费订单消息异常: orderNo={}, offset={}", orderNo, offset, e);
handleConsumeException(record, message, e);
}
}
private void handleSequenceMismatch(
ConsumerRecord<String, OrderMessage> record,
OrderMessage message
) {
String orderNo = record.key();
long expectedSeq = stateManager.getNextExpectedSequence(orderNo);
long actualSeq = message.getSequenceNo();
if (actualSeq < expectedSeq) {
// 过期消息,直接丢弃
log.info("过期消息,直接丢弃: orderNo={}, expected={}, actual={}",
orderNo, expectedSeq, actualSeq);
return;
}
// 消息滞后,暂存等待
stateManager.stashMessage(record);
log.info("消息滞后,已暂存: orderNo={}, expected={}, actual={}",
orderNo, expectedSeq, actualSeq);
// 尝试处理已暂存的消息
processStashedMessages(orderNo);
}
private void processStashedMessages(String orderNo) {
List<ConsumerRecord<String, OrderMessage>> stashed = stateManager.getStashedMessages(orderNo);
stashed.sort(Comparator.comparing(r -> r.value().getSequenceNo()));
for (ConsumerRecord<String, OrderMessage> record : stashed) {
OrderMessage message = record.value();
if (sequenceChecker.checkAndUpdate(orderNo, message.getSequenceNo())) {
stateManager.removeStashedMessage(record);
// 递归处理
consume(record, (Acknowledgment) null, (Consumer<?, ?>) null);
} else {
break;
}
}
}
private void handleProcessFailure(
ConsumerRecord<String, OrderMessage> record,
OrderMessage message,
int partition,
long offset
) {
String orderNo = record.key();
int retryCount = message.getRetryCount() == null ? 0 : message.getRetryCount();
if (retryCount < 5) {
// 重试
message.setRetryCount(retryCount + 1);
log.warn("订单消息处理失败,准备重试: orderNo={}, retryCount={}", orderNo, retryCount + 1);
// 延迟重试
scheduleRetry(record, message, retryCount + 1);
} else {
// 重试次数用尽,进入死信队列
log.error("订单消息重试次数用尽,进入死信队列: orderNo={}", orderNo);
deadLetterHandler.sendToDeadLetter(record, "重试次数用尽");
stateManager.markFailed(orderNo, offset);
}
}
private void handleConsumeException(
ConsumerRecord<String, OrderMessage> record,
OrderMessage message,
Exception e
) {
String orderNo = record.key();
log.error("消费异常: orderNo={}", orderNo, e);
deadLetterHandler.sendToDeadLetter(record, e.getMessage());
stateManager.markFailed(orderNo, record.offset());
}
private void scheduleRetry(
ConsumerRecord<String, OrderMessage> record,
OrderMessage message,
int retryCount
) {
// 使用延迟队列实现重试
long delayMs = getRetryDelayMs(retryCount);
// 简化实现,实际项目中可以使用延迟队列
log.info("消息将在 {}ms 后重试", delayMs);
}
private long getRetryDelayMs(int retryCount) {
switch (retryCount) {
case 1: return 1000L;
case 2: return 5000L;
case 3: return 10000L;
case 4: return 30000L;
case 5: return 60000L;
default: return 60000L;
}
}
}
4. 状态管理器
管理消息处理状态和暂存消息:
@Component
@Slf4j
public class OrderStateManager {
// 处理中的消息: orderNo -> (offset, sequenceNo)
private final Map<String, ProcessingState> processingMap = new ConcurrentHashMap<>();
// 已处理的消息: orderNo -> Set<offset>
private final Map<String, Set<Long>> processedMap = new ConcurrentHashMap<>();
// 暂存的消息: orderNo -> List<Record>
private final Map<String, List<ConsumerRecord<String, OrderMessage>>> stashedMap = new ConcurrentHashMap<>();
// 下一个期望序列号: orderNo -> sequenceNo
private final Map<String, AtomicLong> expectedSequenceMap = new ConcurrentHashMap<>();
/**
* 获取下一个期望序列号
*/
public Long getNextExpectedSequence(String orderNo) {
return expectedSequenceMap.computeIfAbsent(orderNo, k -> new AtomicLong(1L)).get();
}
/**
* 标记为处理中
*/
public void markProcessing(String orderNo, long offset, long sequenceNo) {
processingMap.put(orderNo, new ProcessingState(offset, sequenceNo));
log.debug("标记为处理中: orderNo={}, offset={}, sequenceNo={}", orderNo, offset, sequenceNo);
}
/**
* 标记为已完成
*/
public void markCompleted(String orderNo, long offset) {
processingMap.remove(orderNo);
processedMap.computeIfAbsent(orderNo, k -> ConcurrentHashMap.newKeySet()).add(offset);
log.debug("标记为已完成: orderNo={}, offset={}", orderNo, offset);
}
/**
* 标记为失败
*/
public void markFailed(String orderNo, long offset) {
processingMap.remove(orderNo);
log.warn("标记为失败: orderNo={}, offset={}", orderNo, offset);
}
/**
* 检查是否已处理
*/
public boolean isProcessed(String orderNo, long offset) {
Set<Long> processedOffsets = processedMap.get(orderNo);
return processedOffsets != null && processedOffsets.contains(offset);
}
/**
* 暂存消息
*/
public void stashMessage(ConsumerRecord<String, OrderMessage> record) {
String orderNo = record.key();
stashedMap.computeIfAbsent(orderNo, k -> Collections.synchronizedList(new ArrayList<>()))
.add(record);
}
/**
* 获取暂存消息
*/
public List<ConsumerRecord<String, OrderMessage>> getStashedMessages(String orderNo) {
return stashedMap.getOrDefault(orderNo, Collections.emptyList());
}
/**
* 移除暂存消息
*/
public void removeStashedMessage(ConsumerRecord<String, OrderMessage> record) {
String orderNo = record.key();
List<ConsumerRecord<String, OrderMessage>> list = stashedMap.get(orderNo);
if (list != null) {
list.remove(record);
}
}
@Data
@AllArgsConstructor
private static class ProcessingState {
private long offset;
private long sequenceNo;
}
}
5. 序列号检查器
检查消息顺序是否正确:
@Component
@Slf4j
public class SequenceChecker {
@Autowired
private OrderStateManager stateManager;
/**
* 检查并更新序列号
* @return true: 顺序正确,false: 顺序不正确
*/
public boolean checkAndUpdate(String orderNo, Long sequenceNo) {
if (sequenceNo == null) {
log.warn("消息序列号为空: orderNo={}", orderNo);
return false;
}
AtomicLong expectedSequence = getOrCreateExpectedSequence(orderNo);
long expected = expectedSequence.get();
if (sequenceNo.equals(expected)) {
// 顺序正确,更新期望序列号
expectedSequence.incrementAndGet();
log.debug("序列号验证通过: orderNo={}, sequenceNo={}", orderNo, sequenceNo);
return true;
}
if (sequenceNo < expected) {
// 过期消息
log.debug("过期消息: orderNo={}, expected={}, actual={}",
orderNo, expected, sequenceNo);
return false;
}
// 消息滞后
log.debug("消息滞后: orderNo={}, expected={}, actual={}",
orderNo, expected, sequenceNo);
return false;
}
private AtomicLong getOrCreateExpectedSequence(String orderNo) {
// 这里简化实现,实际项目中应该从数据库或缓存获取初始序列号
return new AtomicLong(1L);
}
/**
* 重置序列号(用于订单完成后)
*/
public void resetSequence(String orderNo) {
// 实现重置逻辑
}
}
6. 死信处理器
处理无法正常消费的消息:
@Component
@Slf4j
public class DeadLetterHandler {
@Autowired
private KafkaTemplate<String, OrderMessage> kafkaTemplate;
@Value("${kafka.topic.order-dead-letter}")
private String deadLetterTopic;
/**
* 发送到死信队列
*/
public void sendToDeadLetter(ConsumerRecord<String, OrderMessage> record, String reason) {
OrderMessage message = record.value();
String orderNo = record.key();
try {
// 记录死信原因
Map<String, Object> deadLetterInfo = new HashMap<>();
deadLetterInfo.put("originalTopic", record.topic());
deadLetterInfo.put("partition", record.partition());
deadLetterInfo.put("offset", record.offset());
deadLetterInfo.put("reason", reason);
deadLetterInfo.put("deadLetterTime", new Date());
message.setData(deadLetterInfo);
ProducerRecord<String, OrderMessage> deadLetterRecord =
new ProducerRecord<>(deadLetterTopic, orderNo, message);
kafkaTemplate.send(deadLetterRecord).get();
log.error("消息已发送到死信队列: orderNo={}, reason={}, partition={}, offset={}",
orderNo, reason, record.partition(), record.offset());
} catch (Exception e) {
log.error("发送死信消息失败: orderNo={}", orderNo, e);
// 告警通知
sendAlert(orderNo, record.partition(), record.offset(), reason, e);
}
}
private void sendAlert(String orderNo, int partition, long offset, String reason, Exception e) {
// 发送告警,实际项目中可以对接监控告警系统
log.error("死信告警: orderNo={}, partition={}, offset={}, reason={}",
orderNo, partition, offset, reason);
}
}
7. 分区协调器
处理扩缩容时的分区重分配:
@Component
@Slf4j
public class PartitionCoordinator implements ConsumerAwareRebalanceListener {
@Autowired
private OrderStateManager stateManager;
private final Map<Integer, Long> partitionLastOffsets = new ConcurrentHashMap<>();
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
log.info("分区回收前(提交前): partitions={}", partitions);
// 记录当前处理状态
saveCurrentState(partitions);
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
log.info("分区回收后(提交后): partitions={}", partitions);
}
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
log.info("分区分配: partitions={}", partitions);
// 恢复处理状态
restoreState(partitions);
// 从上次提交的位置开始消费
for (TopicPartition partition : partitions) {
Long lastOffset = partitionLastOffsets.get(partition.partition());
if (lastOffset != null) {
consumer.seek(partition, lastOffset);
log.info("分区消费位置重置: partition={}, offset={}",
partition.partition(), lastOffset);
}
}
}
private void saveCurrentState(Collection<TopicPartition> partitions) {
log.info("保存分区状态: partitions={}", partitions);
// 实际项目中可以保存到 Redis 或数据库
}
private void restoreState(Collection<TopicPartition> partitions) {
log.info("恢复分区状态: partitions={}", partitions);
// 实际项目中可以从 Redis 或数据库恢复
}
/**
* 记录分区最新偏移量
*/
public void updatePartitionOffset(int partition, long offset) {
partitionLastOffsets.put(partition, offset);
}
}
8. 订单业务处理器
@Service
@Slf4j
public class OrderProcessor {
public boolean process(OrderMessage message) {
String orderNo = message.getOrderNo();
String eventType = message.getEventType();
log.info("处理订单事件: orderNo={}, eventType={}", orderNo, eventType);
try {
switch (eventType) {
case "ORDER_CREATED":
return handleOrderCreated(message);
case "PAYMENT_SUCCESS":
return handlePaymentSuccess(message);
case "PAYMENT_FAILED":
return handlePaymentFailed(message);
case "ORDER_SHIPPED":
return handleOrderShipped(message);
case "ORDER_DELIVERED":
return handleOrderDelivered(message);
case "ORDER_CANCELLED":
return handleOrderCancelled(message);
default:
log.warn("未知事件类型: orderNo={}, eventType={}", orderNo, eventType);
return true;
}
} catch (Exception e) {
log.error("处理订单事件失败: orderNo={}, eventType={}", orderNo, eventType, e);
return false;
}
}
private boolean handleOrderCreated(OrderMessage message) {
log.info("创建订单: orderNo={}, data={}", message.getOrderNo(), message.getData());
return true;
}
private boolean handlePaymentSuccess(OrderMessage message) {
log.info("支付成功: orderNo={}, data={}", message.getOrderNo(), message.getData());
return true;
}
private boolean handlePaymentFailed(OrderMessage message) {
log.info("支付失败: orderNo={}, data={}", message.getOrderNo(), message.getData());
return true;
}
private boolean handleOrderShipped(OrderMessage message) {
log.info("订单发货: orderNo={}, data={}", message.getOrderNo(), message.getData());
return true;
}
private boolean handleOrderDelivered(OrderMessage message) {
log.info("订单收货: orderNo={}, data={}", message.getOrderNo(), message.getData());
return true;
}
private boolean handleOrderCancelled(OrderMessage message) {
log.info("订单取消: orderNo={}, data={}", message.getOrderNo(), message.getData());
return true;
}
}
9. 配置文件
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: order-consumer-group
auto-offset-reset: earliest
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: com.example.order.message
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all
retries: 3
enable-idempotence: true
kafka:
topic:
order-events: order-events-topic
order-dead-letter: order-dead-letter-topic
group:
order-consumer: order-consumer-group
server:
port: 8080
10. Kafka 配置类
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Autowired
private PartitionCoordinator partitionCoordinator;
@Bean
public ConsumerFactory<String, OrderMessage> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.order.message");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderMessage> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderMessage> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 单线程消费,确保顺序
factory.setConcurrency(1);
// 手动提交
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// 配置分区重分配监听器
factory.getContainerProperties().setConsumerRebalanceListener(partitionCoordinator);
return factory;
}
@Bean
public ProducerFactory<String, OrderMessage> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 每个分区内严格顺序
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, OrderMessage> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
11. 测试接口
@RestController
@RequestMapping("/api/order")
public class OrderController {
@Autowired
private OrderProducer orderProducer;
@Autowired
private SequenceGenerator sequenceGenerator;
/**
* 模拟订单流程
*/
@PostMapping("/simulate")
public String simulateOrderFlow(@RequestParam String orderNo) {
// 模拟订单生命周期事件
List<OrderMessage> events = new ArrayList<>();
// 1. 创建订单
events.add(buildMessage(orderNo, "ORDER_CREATED", Map.of(
"userId", "user001",
"amount", 100.00
)));
// 2. 支付成功
events.add(buildMessage(orderNo, "PAYMENT_SUCCESS", Map.of(
"paymentNo", "PAY" + System.currentTimeMillis(),
"amount", 100.00
)));
// 3. 订单发货
events.add(buildMessage(orderNo, "ORDER_SHIPPED", Map.of(
"logisticsNo", "LOG" + System.currentTimeMillis(),
"company", "SF Express"
)));
// 4. 订单收货
events.add(buildMessage(orderNo, "ORDER_DELIVERED", Map.of(
"signTime", new Date()
)));
// 按顺序发送
orderProducer.sendOrderMessages(events);
return "订单流程模拟完成: " + orderNo;
}
private OrderMessage buildMessage(String orderNo, String eventType, Map<String, Object> data) {
return OrderMessage.builder()
.orderNo(orderNo)
.eventType(eventType)
.sequenceNo(sequenceGenerator.nextSequence(orderNo))
.data(data)
.createTime(new Date())
.retryCount(0)
.build();
}
}
实际应用效果
通过这套方案,我们可以实现:
1. 分区内严格顺序消费
同一订单消息在同一分区:
Partition 0:
[Msg1(order=123)] → [Msg2(order=123)] → [Msg3(order=123)]
↓ ↓ ↓
处理成功 处理成功 处理成功
↓ ↓ ↓
ack提交 ack提交 ack提交
2. 扩缩容时状态保持
扩容流程:
1. 触发重分配前保存当前状态
2. 提交已处理的偏移量
3. 重新分配分区
4. 恢复处理状态
5. 从正确的位置继续消费
3. 异常重试不破坏顺序
异常场景:
Msg1 (成功) → Msg2 (失败) → 暂存后续消息 → 重试成功 → 继续处理后续消息
4. 死信队列兜底
重试用尽:
Msg1 (成功) → Msg2 (失败×5) → 死信队列 → 告警人工介入
总结
通过这套 Kafka 严格顺序消费方案,我们可以保证金融级交易链路的顺序性:
- 业务键分区:同一业务的消息始终在同一分区
- 单线程消费:确保分区内严格按顺序处理
- 序列号校验:检测并处理乱序消息
- 状态管理:追踪消息处理状态,支持幂等和重试
- 分区协调:扩缩容时保证状态一致
- 死信兜底:异常消息人工可追溯
希望这篇文章能对你有所帮助,如果你觉得有用,欢迎关注"服务端技术精选",我会持续分享更多实用的技术干货。
标题:SpringBoot + Kafka 严格顺序消费方案:扩缩容不乱序,金融级交易链路保障!
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/05/08/1777866923247.html
公众号:服务端技术精选
评论
0 评论