Kafka消息丢失的3种场景,生产环境千万要注意!老司机带你避坑
Kafka消息丢失的3种场景,生产环境千万要注意!老司机带你避坑
明明发送了重要消息,但消费者就是收不到?或者消息处理了一半,突然就消失了?这些问题很可能就是Kafka消息丢失造成的!今天就来聊聊Kafka消息丢失的3种典型场景,以及如何在生产环境中完美避免这些坑!
一、Kafka消息丢失的根源分析
在深入讨论具体场景之前,我们先来了解一下Kafka的消息流转过程和可能导致消息丢失的关键环节。
1.1 Kafka消息流转过程
// Kafka消息流转过程简述
public class KafkaMessageFlow {
public void flow() {
System.out.println("=== Kafka消息流转过程 ===");
System.out.println("1. Producer发送消息到Broker");
System.out.println("2. Broker将消息写入磁盘并确认");
System.out.println("3. Consumer从Broker拉取消息");
System.out.println("4. Consumer处理消息并提交偏移量");
System.out.println("5. 消息被成功消费");
}
}
1.2 消息丢失的关键环节
// 消息丢失的关键环节
public class MessageLossPoints {
public void lossPoints() {
System.out.println("=== 消息丢失的关键环节 ===");
System.out.println("1. Producer发送阶段:网络问题、Broker不可用");
System.out.println("2. Broker存储阶段:磁盘故障、副本同步失败");
System.out.println("3. Consumer消费阶段:处理失败、偏移量提交异常");
}
}
二、场景一:Producer发送消息丢失
这是最常见的消息丢失场景,特别是在网络不稳定或Broker负载过高时容易出现。
2.1 问题表现
// Producer发送消息丢失的表现
public class ProducerLossSymptoms {
public void symptoms() {
System.out.println("=== Producer发送消息丢失的表现 ===");
System.out.println("1. send()方法返回成功,但消费者收不到消息");
System.out.println("2. 消息在Broker中找不到");
System.out.println("3. 监控显示发送成功率100%,但实际消息缺失");
}
}
2.2 错误的配置示例
// 错误的Producer配置示例
@Configuration
public class WrongKafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 错误配置1:acks设置为0,不等待任何确认
props.put(ProducerConfig.ACKS_CONFIG, "0");
// 错误配置2:重试次数为0,不重试
props.put(ProducerConfig.RETRIES_CONFIG, 0);
// 错误配置3:没有设置消息确认超时
// props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
2.3 正确的解决方案
// 正确的Producer配置
@Configuration
public class CorrectKafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 正确配置1:acks=all,等待所有副本确认
props.put(ProducerConfig.ACKS_CONFIG, "all");
// 正确配置2:设置重试次数
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
// 正确配置3:开启幂等性
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 正确配置4:设置消息确认超时
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 60000);
// 正确配置5:设置批次大小和linger时间
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
// 设置默认Topic
kafkaTemplate.setDefaultTopic("default-topic");
return kafkaTemplate;
}
}
2.4 安全的消息发送方式
@Service
@Slf4j
public class SafeMessageSender {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 同步发送消息(确保消息送达)
*/
public void sendSync(String topic, String key, String message) {
try {
SendResult<String, String> result = kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS);
log.info("消息发送成功: topic={}, partition={}, offset={}",
result.getRecordMetadata().topic(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
} catch (Exception e) {
log.error("消息发送失败: topic={}, key={}, message={}", topic, key, message, e);
throw new RuntimeException("消息发送失败", e);
}
}
/**
* 异步发送消息(带回调确认)
*/
public void sendAsync(String topic, String key, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message);
future.addCallback(
new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("消息发送成功: topic={}, partition={}, offset={}",
result.getRecordMetadata().topic(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
log.error("消息发送失败: topic={}, key={}, message={}", topic, key, message, ex);
// 可以在这里实现重试逻辑
handleSendFailure(topic, key, message, ex);
}
}
);
}
/**
* 处理发送失败的情况
*/
private void handleSendFailure(String topic, String key, String message, Throwable ex) {
// 记录到数据库或发送到死信队列
log.warn("准备重试发送消息: topic={}, key={}", topic, key);
// 实现重试逻辑(可以使用Spring Retry或其他重试框架)
retrySendMessage(topic, key, message, ex);
}
private void retrySendMessage(String topic, String key, String message, Throwable ex) {
// 重试逻辑实现
// 可以记录到数据库,由定时任务重试
// 或者直接再次发送
}
}
三、场景二:Broker存储消息丢失
当Broker节点发生故障或磁盘损坏时,可能会导致已接收但未完全同步的消息丢失。
3.1 问题表现
// Broker存储消息丢失的表现
public class BrokerLossSymptoms {
public void symptoms() {
System.out.println("=== Broker存储消息丢失的表现 ===");
System.out.println("1. Broker节点宕机后,部分消息无法找回");
System.out.println("2. 副本同步失败,Leader切换后消息缺失");
System.out.println("3. 磁盘故障导致部分分区数据丢失");
}
}
3.2 错误的Broker配置
# 错误的Broker配置示例 server.properties
# 错误配置1:副本因子过小
num.replica.fetchers=1
default.replication.factor=1 # 单副本,无冗余
# 错误配置2:刷盘策略过于宽松
log.flush.interval.messages=9223372036854775807 # 几乎不刷盘
log.flush.interval.ms=9223372036854775807 # 几乎不刷盘
# 错误配置3:未开启自动Leader选举
auto.leader.rebalance.enable=false
3.3 正确的Broker配置
# 正确的Broker配置 server.properties
# 正确配置1:合理的副本因子
default.replication.factor=3 # 至少3副本保证高可用
min.insync.replicas=2 # 至少2个副本确认才算成功
# 正确配置2:严格的刷盘策略
log.flush.interval.messages=10000 # 每1万条消息刷盘一次
log.flush.interval.ms=1000 # 每秒刷盘一次
# 正确配置3:开启自动Leader选举
auto.leader.rebalance.enable=true
# 正确配置4:其他重要配置
unclean.leader.election.enable=false # 禁止不干净的Leader选举
retention.ms=604800000 # 消息保留7天
3.4 监控和告警配置
// Broker监控配置
@Component
@Slf4j
public class KafkaBrokerMonitor {
@Autowired
private MeterRegistry meterRegistry;
/**
* 监控副本同步状态
*/
public void monitorReplicaSync() {
// 监控Under Replicated Partitions
Gauge.builder("kafka.broker.under.replicated.partitions")
.description("未完全同步的分区数量")
.register(meterRegistry, this, KafkaBrokerMonitor::getUnderReplicatedPartitions);
// 监控Offline Partitions
Gauge.builder("kafka.broker.offline.partitions")
.description("离线分区数量")
.register(meterRegistry, this, KafkaBrokerMonitor::getOfflinePartitions);
}
private double getUnderReplicatedPartitions() {
// 获取未完全同步的分区数量
// 可以通过JMX或Kafka AdminClient获取
return 0;
}
private double getOfflinePartitions() {
// 获取离线分区数量
return 0;
}
/**
* 告警检查
*/
@Scheduled(fixedRate = 60000) // 每分钟检查一次
public void checkBrokerHealth() {
int underReplicated = (int) getUnderReplicatedPartitions();
int offlinePartitions = (int) getOfflinePartitions();
if (underReplicated > 0) {
log.warn("发现{}个未完全同步的分区", underReplicated);
// 发送告警
sendAlert("Kafka副本同步异常", "发现" + underReplicated + "个未完全同步的分区");
}
if (offlinePartitions > 0) {
log.error("发现{}个离线分区", offlinePartitions);
// 发送严重告警
sendCriticalAlert("Kafka分区离线", "发现" + offlinePartitions + "个离线分区");
}
}
private void sendAlert(String title, String message) {
// 告警发送实现
}
private void sendCriticalAlert(String title, String message) {
// 严重告警发送实现
}
}
四、场景三:Consumer消费消息丢失
这是最容易被忽视的场景,特别是在消费者处理逻辑复杂或异常处理不当的情况下。
4.1 问题表现
// Consumer消费消息丢失的表现
public class ConsumerLossSymptoms {
public void symptoms() {
System.out.println("=== Consumer消费消息丢失的表现 ===");
System.out.println("1. 消息被消费但业务逻辑未执行完成");
System.out.println("2. 异常处理不当导致消息被跳过");
System.out.println("3. 偏移量提前提交导致消息丢失");
System.out.println("4. 消费者重启后重复消费或丢失消息");
}
}
4.2 错误的Consumer实现
// 错误的Consumer实现示例
@Component
@Slf4j
public class WrongMessageConsumer {
// 错误实现1:自动提交偏移量
@KafkaListener(topics = "order-topic", groupId = "order-group")
public void consumeWrong(ConsumerRecord<String, String> record) {
log.info("收到消息: topic={}, partition={}, offset={}, key={}, value={}",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
try {
// 处理业务逻辑
processBusinessLogic(record.value());
// 错误实现2:处理完就认为成功,不管是否真的成功
log.info("消息处理完成");
} catch (Exception e) {
// 错误实现3:异常处理不当,消息丢失
log.error("处理消息失败", e);
// 没有重试机制,消息直接丢失
}
// 错误实现4:这里已经是自动提交偏移量了(如果配置为自动提交)
}
private void processBusinessLogic(String message) throws Exception {
// 模拟复杂的业务处理逻辑
// 可能抛出异常
if (Math.random() < 0.1) {
throw new RuntimeException("模拟业务处理异常");
}
// 模拟长时间处理
Thread.sleep(1000);
}
}
4.3 正确的Consumer实现
// 正确的Consumer实现
@Component
@Slf4j
public class CorrectMessageConsumer {
@Autowired
private BusinessService businessService;
@Autowired
private DeadLetterQueueService deadLetterQueueService;
// 正确实现1:手动提交偏移量
@KafkaListener(
topics = "order-topic",
groupId = "order-group",
containerFactory = "kafkaListenerContainerFactory"
)
public void consumeCorrect(ConsumerRecord<String, String> record, Acknowledgment ack) {
String topic = record.topic();
int partition = record.partition();
long offset = record.offset();
String key = record.key();
String value = record.value();
log.info("收到消息: topic={}, partition={}, offset={}, key={}", topic, partition, offset, key);
try {
// 正确实现2:完整的业务处理流程
boolean success = processBusinessLogic(value);
if (success) {
// 正确实现3:处理成功后再提交偏移量
ack.acknowledge();
log.info("消息处理成功并确认: offset={}", offset);
} else {
// 处理失败,发送到死信队列
handleProcessingFailure(record, ack);
}
} catch (Exception e) {
// 正确实现4:完善的异常处理机制
log.error("处理消息失败: topic={}, partition={}, offset={}", topic, partition, offset, e);
handleProcessingException(record, ack, e);
}
}
/**
* 处理业务逻辑
*/
private boolean processBusinessLogic(String message) {
try {
// 调用业务服务处理消息
businessService.handleMessage(message);
return true;
} catch (Exception e) {
log.error("业务处理异常", e);
return false;
}
}
/**
* 处理业务处理失败的情况
*/
private void handleProcessingFailure(ConsumerRecord<String, String> record, Acknowledgment ack) {
log.warn("业务处理失败,发送到死信队列: offset={}", record.offset());
// 发送到死信队列
deadLetterQueueService.sendToDLQ(record);
// 确认原消息(因为我们已经将其发送到DLQ)
ack.acknowledge();
}
/**
* 处理异常情况
*/
private void handleProcessingException(ConsumerRecord<String, String> record, Acknowledgment ack, Exception e) {
// 记录异常信息
log.error("消息处理异常: offset={}, exception={}", record.offset(), e.getMessage());
// 根据异常类型决定处理策略
if (isRetriableException(e)) {
// 可重试异常,不确认偏移量,让Kafka重新投递
log.info("可重试异常,消息将被重新投递: offset={}", record.offset());
// 不调用ack.acknowledge(),消息会被重新消费
} else {
// 不可重试异常,发送到死信队列
handleProcessingFailure(record, ack);
}
}
/**
* 判断是否为可重试异常
*/
private boolean isRetriableException(Exception e) {
// 可以根据具体业务需求定义哪些异常是可重试的
return e instanceof TimeoutException ||
e instanceof RetriableException ||
e instanceof NetworkException;
}
}
4.4 Consumer配置优化
// 正确的Consumer配置
@Configuration
public class CorrectKafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 正确配置1:手动提交偏移量
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 正确配置2:设置合理的会话超时
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000);
// 正确配置3:设置最大拉取大小
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1MB
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
// 正确配置4:设置重试间隔
props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 正确配置5:设置手动确认模式
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// 正确配置6:设置并发消费者数量
factory.setConcurrency(3);
// 正确配置7:设置错误处理器
factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(1000L, 3L)));
return factory;
}
}
五、综合保障措施
5.1 消息追踪和监控
// 消息追踪实现
@Component
@Slf4j
public class MessageTraceService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 记录消息轨迹
*/
public void traceMessage(String messageId, String status, String info) {
String traceKey = "message_trace:" + messageId;
String traceInfo = System.currentTimeMillis() + "|" + status + "|" + info;
redisTemplate.opsForList().leftPush(traceKey, traceInfo);
redisTemplate.expire(traceKey, Duration.ofHours(24));
}
/**
* 查询消息轨迹
*/
public List<String> getMessageTrace(String messageId) {
String traceKey = "message_trace:" + messageId;
return redisTemplate.opsForList().range(traceKey, 0, -1);
}
}
5.2 死信队列处理
// 死信队列处理
@Component
@Slf4j
public class DeadLetterQueueService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private FailedMessageRepository failedMessageRepository;
/**
* 发送到死信队列
*/
public void sendToDLQ(ConsumerRecord<String, String> record) {
try {
// 构造死信消息
FailedMessage failedMessage = new FailedMessage();
failedMessage.setTopic(record.topic());
failedMessage.setPartition(record.partition());
failedMessage.setOffset(record.offset());
failedMessage.setKey(record.key());
failedMessage.setValue(record.value());
failedMessage.setTimestamp(System.currentTimeMillis());
failedMessage.setException("处理失败");
// 保存到数据库
failedMessageRepository.save(failedMessage);
// 发送到死信Topic
String dlqTopic = record.topic() + ".dlq";
kafkaTemplate.send(dlqTopic, record.key(), record.value());
log.info("消息已发送到死信队列: topic={}, offset={}", record.topic(), record.offset());
} catch (Exception e) {
log.error("发送死信消息失败", e);
}
}
/**
* 死信消息处理
*/
@KafkaListener(topics = "order-topic.dlq", groupId = "dlq-group")
public void handleDLQMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
log.info("收到死信消息: topic={}, offset={}", record.topic(), record.offset());
try {
// 尝试重新处理
reprocessMessage(record);
// 确认消息
ack.acknowledge();
} catch (Exception e) {
log.error("死信消息处理失败", e);
// 可以记录到人工处理队列
moveToManualQueue(record);
ack.acknowledge();
}
}
private void reprocessMessage(ConsumerRecord<String, String> record) {
// 重新处理逻辑
}
private void moveToManualQueue(ConsumerRecord<String, String> record) {
// 移动到人工处理队列
}
}
5.3 完整的监控告警体系
// 完整的监控告警体系
@Component
@Slf4j
public class KafkaMonitoringService {
@Autowired
private MeterRegistry meterRegistry;
private final Counter messageProducedCounter;
private final Counter messageConsumedCounter;
private final Counter messageLostCounter;
private final Timer messageProcessTimer;
public KafkaMonitoringService(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.messageProducedCounter = Counter.builder("kafka.messages.produced")
.description("已生产的消息数量")
.register(meterRegistry);
this.messageConsumedCounter = Counter.builder("kafka.messages.consumed")
.description("已消费的消息数量")
.register(meterRegistry);
this.messageLostCounter = Counter.builder("kafka.messages.lost")
.description("丢失的消息数量")
.register(meterRegistry);
this.messageProcessTimer = Timer.builder("kafka.message.process.time")
.description("消息处理耗时")
.register(meterRegistry);
}
/**
* 记录消息生产
*/
public void recordMessageProduced() {
messageProducedCounter.increment();
}
/**
* 记录消息消费
*/
public void recordMessageConsumed(long processTimeMs) {
messageConsumedCounter.increment();
messageProcessTimer.record(processTimeMs, TimeUnit.MILLISECONDS);
}
/**
* 记录消息丢失
*/
public void recordMessageLost() {
messageLostCounter.increment();
// 发送告警
sendLossAlert();
}
private void sendLossAlert() {
log.error("检测到Kafka消息丢失!");
// 实现告警发送逻辑
}
}
六、生产环境最佳实践
6.1 部署架构建议
// 生产环境部署架构建议
public class ProductionDeploymentGuide {
public void guide() {
System.out.println("=== Kafka生产环境部署建议 ===");
System.out.println("1. Broker节点:至少3个节点,分布在不同机架");
System.out.println("2. Zookeeper:奇数个节点(3或5),独立部署");
System.out.println("3. 磁盘:SSD存储,单独挂载数据目录");
System.out.println("4. 网络:千兆网络,低延迟");
System.out.println("5. 监控:Prometheus + Grafana + AlertManager");
}
}
6.2 运维操作手册
// 运维操作手册
public class OperationsManual {
public void manual() {
System.out.println("=== Kafka运维操作手册 ===");
System.out.println("日常检查:");
System.out.println("- 监控Broker状态和资源使用率");
System.out.println("- 检查副本同步状态");
System.out.println("- 监控消费者组滞后情况");
System.out.println("- 检查磁盘空间使用情况");
System.out.println("\n应急处理:");
System.out.println("- Broker宕机:检查日志,重启服务");
System.out.println("- 分区离线:检查副本状态,重新分配分区");
System.out.println("- 消费者滞后:增加消费者实例");
System.out.println("- 磁盘满:清理旧数据,扩容磁盘");
System.out.println("\n预防措施:");
System.out.println("- 定期备份重要数据");
System.out.println("- 配置合理的监控告警");
System.out.println("- 制定灾难恢复预案");
System.out.println("- 定期进行压力测试");
}
}
结语
Kafka消息丢失问题是生产环境中必须高度重视的问题。通过本文介绍的三种典型场景和相应的解决方案,相信你能有效避免消息丢失的风险。
关键要点总结:
- Producer端:使用acks=all、开启幂等性、同步发送或带回调的异步发送
- Broker端:配置合理的副本因子、刷盘策略、监控副本同步状态
- Consumer端:手动提交偏移量、完善的异常处理机制、死信队列处理
记住,消息系统的可靠性不是自然而然的,需要我们在每个环节都做好充分的保障措施。在分布式系统中,任何组件都可能出现故障,关键是要有完善的容错和恢复机制。
如果你觉得这篇文章对你有帮助,欢迎分享给更多的朋友。在Kafka消息系统的设计和运维路上,我们一起成长!
关注「服务端技术精选」,获取更多干货技术文章!
标题:Kafka消息丢失的3种场景,生产环境千万要注意!老司机带你避坑
作者:jiangyi
地址:http://www.jiangyi.space/articles/2025/12/21/1766304278850.html
- 一、Kafka消息丢失的根源分析
- 1.1 Kafka消息流转过程
- 1.2 消息丢失的关键环节
- 二、场景一:Producer发送消息丢失
- 2.1 问题表现
- 2.2 错误的配置示例
- 2.3 正确的解决方案
- 2.4 安全的消息发送方式
- 三、场景二:Broker存储消息丢失
- 3.1 问题表现
- 3.2 错误的Broker配置
- 3.3 正确的Broker配置
- 3.4 监控和告警配置
- 四、场景三:Consumer消费消息丢失
- 4.1 问题表现
- 4.2 错误的Consumer实现
- 4.3 正确的Consumer实现
- 4.4 Consumer配置优化
- 五、综合保障措施
- 5.1 消息追踪和监控
- 5.2 死信队列处理
- 5.3 完整的监控告警体系
- 六、生产环境最佳实践
- 6.1 部署架构建议
- 6.2 运维操作手册
- 结语
0 评论