Kafka消息丢失的3种场景,生产环境千万要注意!老司机带你避坑

Kafka消息丢失的3种场景,生产环境千万要注意!老司机带你避坑

明明发送了重要消息,但消费者就是收不到?或者消息处理了一半,突然就消失了?这些问题很可能就是Kafka消息丢失造成的!今天就来聊聊Kafka消息丢失的3种典型场景,以及如何在生产环境中完美避免这些坑!

一、Kafka消息丢失的根源分析

在深入讨论具体场景之前,我们先来了解一下Kafka的消息流转过程和可能导致消息丢失的关键环节。

1.1 Kafka消息流转过程

// Kafka消息流转过程简述
public class KafkaMessageFlow {
    
    public void flow() {
        System.out.println("=== Kafka消息流转过程 ===");
        System.out.println("1. Producer发送消息到Broker");
        System.out.println("2. Broker将消息写入磁盘并确认");
        System.out.println("3. Consumer从Broker拉取消息");
        System.out.println("4. Consumer处理消息并提交偏移量");
        System.out.println("5. 消息被成功消费");
    }
}

1.2 消息丢失的关键环节

// 消息丢失的关键环节
public class MessageLossPoints {
    
    public void lossPoints() {
        System.out.println("=== 消息丢失的关键环节 ===");
        System.out.println("1. Producer发送阶段:网络问题、Broker不可用");
        System.out.println("2. Broker存储阶段:磁盘故障、副本同步失败");
        System.out.println("3. Consumer消费阶段:处理失败、偏移量提交异常");
    }
}

二、场景一:Producer发送消息丢失

这是最常见的消息丢失场景,特别是在网络不稳定或Broker负载过高时容易出现。

2.1 问题表现

// Producer发送消息丢失的表现
public class ProducerLossSymptoms {
    
    public void symptoms() {
        System.out.println("=== Producer发送消息丢失的表现 ===");
        System.out.println("1. send()方法返回成功,但消费者收不到消息");
        System.out.println("2. 消息在Broker中找不到");
        System.out.println("3. 监控显示发送成功率100%,但实际消息缺失");
    }
}

2.2 错误的配置示例

// 错误的Producer配置示例
@Configuration
public class WrongKafkaProducerConfig {
    
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        
        // 错误配置1:acks设置为0,不等待任何确认
        props.put(ProducerConfig.ACKS_CONFIG, "0");
        
        // 错误配置2:重试次数为0,不重试
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        
        // 错误配置3:没有设置消息确认超时
        // props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        
        return new DefaultKafkaProducerFactory<>(props);
    }
    
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

2.3 正确的解决方案

// 正确的Producer配置
@Configuration
public class CorrectKafkaProducerConfig {
    
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        
        // 正确配置1:acks=all,等待所有副本确认
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        
        // 正确配置2:设置重试次数
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        
        // 正确配置3:开启幂等性
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        // 正确配置4:设置消息确认超时
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 60000);
        
        // 正确配置5:设置批次大小和linger时间
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        
        return new DefaultKafkaProducerFactory<>(props);
    }
    
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        
        // 设置默认Topic
        kafkaTemplate.setDefaultTopic("default-topic");
        
        return kafkaTemplate;
    }
}

2.4 安全的消息发送方式

@Service
@Slf4j
public class SafeMessageSender {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    /**
     * 同步发送消息(确保消息送达)
     */
    public void sendSync(String topic, String key, String message) {
        try {
            SendResult<String, String> result = kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS);
            log.info("消息发送成功: topic={}, partition={}, offset={}", 
                    result.getRecordMetadata().topic(),
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset());
        } catch (Exception e) {
            log.error("消息发送失败: topic={}, key={}, message={}", topic, key, message, e);
            throw new RuntimeException("消息发送失败", e);
        }
    }
    
    /**
     * 异步发送消息(带回调确认)
     */
    public void sendAsync(String topic, String key, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message);
        
        future.addCallback(
            new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onSuccess(SendResult<String, String> result) {
                    log.info("消息发送成功: topic={}, partition={}, offset={}", 
                            result.getRecordMetadata().topic(),
                            result.getRecordMetadata().partition(),
                            result.getRecordMetadata().offset());
                }
                
                @Override
                public void onFailure(Throwable ex) {
                    log.error("消息发送失败: topic={}, key={}, message={}", topic, key, message, ex);
                    // 可以在这里实现重试逻辑
                    handleSendFailure(topic, key, message, ex);
                }
            }
        );
    }
    
    /**
     * 处理发送失败的情况
     */
    private void handleSendFailure(String topic, String key, String message, Throwable ex) {
        // 记录到数据库或发送到死信队列
        log.warn("准备重试发送消息: topic={}, key={}", topic, key);
        
        // 实现重试逻辑(可以使用Spring Retry或其他重试框架)
        retrySendMessage(topic, key, message, ex);
    }
    
    private void retrySendMessage(String topic, String key, String message, Throwable ex) {
        // 重试逻辑实现
        // 可以记录到数据库,由定时任务重试
        // 或者直接再次发送
    }
}

三、场景二:Broker存储消息丢失

当Broker节点发生故障或磁盘损坏时,可能会导致已接收但未完全同步的消息丢失。

3.1 问题表现

// Broker存储消息丢失的表现
public class BrokerLossSymptoms {
    
    public void symptoms() {
        System.out.println("=== Broker存储消息丢失的表现 ===");
        System.out.println("1. Broker节点宕机后,部分消息无法找回");
        System.out.println("2. 副本同步失败,Leader切换后消息缺失");
        System.out.println("3. 磁盘故障导致部分分区数据丢失");
    }
}

3.2 错误的Broker配置

# 错误的Broker配置示例 server.properties
# 错误配置1:副本因子过小
num.replica.fetchers=1
default.replication.factor=1  # 单副本,无冗余

# 错误配置2:刷盘策略过于宽松
log.flush.interval.messages=9223372036854775807  # 几乎不刷盘
log.flush.interval.ms=9223372036854775807  # 几乎不刷盘

# 错误配置3:未开启自动Leader选举
auto.leader.rebalance.enable=false

3.3 正确的Broker配置

# 正确的Broker配置 server.properties
# 正确配置1:合理的副本因子
default.replication.factor=3  # 至少3副本保证高可用
min.insync.replicas=2  # 至少2个副本确认才算成功

# 正确配置2:严格的刷盘策略
log.flush.interval.messages=10000  # 每1万条消息刷盘一次
log.flush.interval.ms=1000  # 每秒刷盘一次

# 正确配置3:开启自动Leader选举
auto.leader.rebalance.enable=true

# 正确配置4:其他重要配置
unclean.leader.election.enable=false  # 禁止不干净的Leader选举
retention.ms=604800000  # 消息保留7天

3.4 监控和告警配置

// Broker监控配置
@Component
@Slf4j
public class KafkaBrokerMonitor {
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    /**
     * 监控副本同步状态
     */
    public void monitorReplicaSync() {
        // 监控Under Replicated Partitions
        Gauge.builder("kafka.broker.under.replicated.partitions")
                .description("未完全同步的分区数量")
                .register(meterRegistry, this, KafkaBrokerMonitor::getUnderReplicatedPartitions);
                
        // 监控Offline Partitions
        Gauge.builder("kafka.broker.offline.partitions")
                .description("离线分区数量")
                .register(meterRegistry, this, KafkaBrokerMonitor::getOfflinePartitions);
    }
    
    private double getUnderReplicatedPartitions() {
        // 获取未完全同步的分区数量
        // 可以通过JMX或Kafka AdminClient获取
        return 0;
    }
    
    private double getOfflinePartitions() {
        // 获取离线分区数量
        return 0;
    }
    
    /**
     * 告警检查
     */
    @Scheduled(fixedRate = 60000) // 每分钟检查一次
    public void checkBrokerHealth() {
        int underReplicated = (int) getUnderReplicatedPartitions();
        int offlinePartitions = (int) getOfflinePartitions();
        
        if (underReplicated > 0) {
            log.warn("发现{}个未完全同步的分区", underReplicated);
            // 发送告警
            sendAlert("Kafka副本同步异常", "发现" + underReplicated + "个未完全同步的分区");
        }
        
        if (offlinePartitions > 0) {
            log.error("发现{}个离线分区", offlinePartitions);
            // 发送严重告警
            sendCriticalAlert("Kafka分区离线", "发现" + offlinePartitions + "个离线分区");
        }
    }
    
    private void sendAlert(String title, String message) {
        // 告警发送实现
    }
    
    private void sendCriticalAlert(String title, String message) {
        // 严重告警发送实现
    }
}

四、场景三:Consumer消费消息丢失

这是最容易被忽视的场景,特别是在消费者处理逻辑复杂或异常处理不当的情况下。

4.1 问题表现

// Consumer消费消息丢失的表现
public class ConsumerLossSymptoms {
    
    public void symptoms() {
        System.out.println("=== Consumer消费消息丢失的表现 ===");
        System.out.println("1. 消息被消费但业务逻辑未执行完成");
        System.out.println("2. 异常处理不当导致消息被跳过");
        System.out.println("3. 偏移量提前提交导致消息丢失");
        System.out.println("4. 消费者重启后重复消费或丢失消息");
    }
}

4.2 错误的Consumer实现

// 错误的Consumer实现示例
@Component
@Slf4j
public class WrongMessageConsumer {
    
    // 错误实现1:自动提交偏移量
    @KafkaListener(topics = "order-topic", groupId = "order-group")
    public void consumeWrong(ConsumerRecord<String, String> record) {
        log.info("收到消息: topic={}, partition={}, offset={}, key={}, value={}",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
        
        try {
            // 处理业务逻辑
            processBusinessLogic(record.value());
            
            // 错误实现2:处理完就认为成功,不管是否真的成功
            log.info("消息处理完成");
        } catch (Exception e) {
            // 错误实现3:异常处理不当,消息丢失
            log.error("处理消息失败", e);
            // 没有重试机制,消息直接丢失
        }
        
        // 错误实现4:这里已经是自动提交偏移量了(如果配置为自动提交)
    }
    
    private void processBusinessLogic(String message) throws Exception {
        // 模拟复杂的业务处理逻辑
        // 可能抛出异常
        if (Math.random() < 0.1) {
            throw new RuntimeException("模拟业务处理异常");
        }
        
        // 模拟长时间处理
        Thread.sleep(1000);
    }
}

4.3 正确的Consumer实现

// 正确的Consumer实现
@Component
@Slf4j
public class CorrectMessageConsumer {
    
    @Autowired
    private BusinessService businessService;
    
    @Autowired
    private DeadLetterQueueService deadLetterQueueService;
    
    // 正确实现1:手动提交偏移量
    @KafkaListener(
        topics = "order-topic", 
        groupId = "order-group",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void consumeCorrect(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String topic = record.topic();
        int partition = record.partition();
        long offset = record.offset();
        String key = record.key();
        String value = record.value();
        
        log.info("收到消息: topic={}, partition={}, offset={}, key={}", topic, partition, offset, key);
        
        try {
            // 正确实现2:完整的业务处理流程
            boolean success = processBusinessLogic(value);
            
            if (success) {
                // 正确实现3:处理成功后再提交偏移量
                ack.acknowledge();
                log.info("消息处理成功并确认: offset={}", offset);
            } else {
                // 处理失败,发送到死信队列
                handleProcessingFailure(record, ack);
            }
        } catch (Exception e) {
            // 正确实现4:完善的异常处理机制
            log.error("处理消息失败: topic={}, partition={}, offset={}", topic, partition, offset, e);
            handleProcessingException(record, ack, e);
        }
    }
    
    /**
     * 处理业务逻辑
     */
    private boolean processBusinessLogic(String message) {
        try {
            // 调用业务服务处理消息
            businessService.handleMessage(message);
            return true;
        } catch (Exception e) {
            log.error("业务处理异常", e);
            return false;
        }
    }
    
    /**
     * 处理业务处理失败的情况
     */
    private void handleProcessingFailure(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.warn("业务处理失败,发送到死信队列: offset={}", record.offset());
        
        // 发送到死信队列
        deadLetterQueueService.sendToDLQ(record);
        
        // 确认原消息(因为我们已经将其发送到DLQ)
        ack.acknowledge();
    }
    
    /**
     * 处理异常情况
     */
    private void handleProcessingException(ConsumerRecord<String, String> record, Acknowledgment ack, Exception e) {
        // 记录异常信息
        log.error("消息处理异常: offset={}, exception={}", record.offset(), e.getMessage());
        
        // 根据异常类型决定处理策略
        if (isRetriableException(e)) {
            // 可重试异常,不确认偏移量,让Kafka重新投递
            log.info("可重试异常,消息将被重新投递: offset={}", record.offset());
            // 不调用ack.acknowledge(),消息会被重新消费
        } else {
            // 不可重试异常,发送到死信队列
            handleProcessingFailure(record, ack);
        }
    }
    
    /**
     * 判断是否为可重试异常
     */
    private boolean isRetriableException(Exception e) {
        // 可以根据具体业务需求定义哪些异常是可重试的
        return e instanceof TimeoutException || 
               e instanceof RetriableException ||
               e instanceof NetworkException;
    }
}

4.4 Consumer配置优化

// 正确的Consumer配置
@Configuration
public class CorrectKafkaConsumerConfig {
    
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        
        // 正确配置1:手动提交偏移量
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        
        // 正确配置2:设置合理的会话超时
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000);
        
        // 正确配置3:设置最大拉取大小
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1MB
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        
        // 正确配置4:设置重试间隔
        props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
        
        return new DefaultKafkaConsumerFactory<>(props);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        
        // 正确配置5:设置手动确认模式
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        
        // 正确配置6:设置并发消费者数量
        factory.setConcurrency(3);
        
        // 正确配置7:设置错误处理器
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(1000L, 3L)));
        
        return factory;
    }
}

五、综合保障措施

5.1 消息追踪和监控

// 消息追踪实现
@Component
@Slf4j
public class MessageTraceService {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    /**
     * 记录消息轨迹
     */
    public void traceMessage(String messageId, String status, String info) {
        String traceKey = "message_trace:" + messageId;
        String traceInfo = System.currentTimeMillis() + "|" + status + "|" + info;
        
        redisTemplate.opsForList().leftPush(traceKey, traceInfo);
        redisTemplate.expire(traceKey, Duration.ofHours(24));
    }
    
    /**
     * 查询消息轨迹
     */
    public List<String> getMessageTrace(String messageId) {
        String traceKey = "message_trace:" + messageId;
        return redisTemplate.opsForList().range(traceKey, 0, -1);
    }
}

5.2 死信队列处理

// 死信队列处理
@Component
@Slf4j
public class DeadLetterQueueService {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    @Autowired
    private FailedMessageRepository failedMessageRepository;
    
    /**
     * 发送到死信队列
     */
    public void sendToDLQ(ConsumerRecord<String, String> record) {
        try {
            // 构造死信消息
            FailedMessage failedMessage = new FailedMessage();
            failedMessage.setTopic(record.topic());
            failedMessage.setPartition(record.partition());
            failedMessage.setOffset(record.offset());
            failedMessage.setKey(record.key());
            failedMessage.setValue(record.value());
            failedMessage.setTimestamp(System.currentTimeMillis());
            failedMessage.setException("处理失败");
            
            // 保存到数据库
            failedMessageRepository.save(failedMessage);
            
            // 发送到死信Topic
            String dlqTopic = record.topic() + ".dlq";
            kafkaTemplate.send(dlqTopic, record.key(), record.value());
            
            log.info("消息已发送到死信队列: topic={}, offset={}", record.topic(), record.offset());
        } catch (Exception e) {
            log.error("发送死信消息失败", e);
        }
    }
    
    /**
     * 死信消息处理
     */
    @KafkaListener(topics = "order-topic.dlq", groupId = "dlq-group")
    public void handleDLQMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("收到死信消息: topic={}, offset={}", record.topic(), record.offset());
        
        try {
            // 尝试重新处理
            reprocessMessage(record);
            
            // 确认消息
            ack.acknowledge();
        } catch (Exception e) {
            log.error("死信消息处理失败", e);
            // 可以记录到人工处理队列
            moveToManualQueue(record);
            ack.acknowledge();
        }
    }
    
    private void reprocessMessage(ConsumerRecord<String, String> record) {
        // 重新处理逻辑
    }
    
    private void moveToManualQueue(ConsumerRecord<String, String> record) {
        // 移动到人工处理队列
    }
}

5.3 完整的监控告警体系

// 完整的监控告警体系
@Component
@Slf4j
public class KafkaMonitoringService {
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    private final Counter messageProducedCounter;
    private final Counter messageConsumedCounter;
    private final Counter messageLostCounter;
    private final Timer messageProcessTimer;
    
    public KafkaMonitoringService(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        
        this.messageProducedCounter = Counter.builder("kafka.messages.produced")
                .description("已生产的消息数量")
                .register(meterRegistry);
                
        this.messageConsumedCounter = Counter.builder("kafka.messages.consumed")
                .description("已消费的消息数量")
                .register(meterRegistry);
                
        this.messageLostCounter = Counter.builder("kafka.messages.lost")
                .description("丢失的消息数量")
                .register(meterRegistry);
                
        this.messageProcessTimer = Timer.builder("kafka.message.process.time")
                .description("消息处理耗时")
                .register(meterRegistry);
    }
    
    /**
     * 记录消息生产
     */
    public void recordMessageProduced() {
        messageProducedCounter.increment();
    }
    
    /**
     * 记录消息消费
     */
    public void recordMessageConsumed(long processTimeMs) {
        messageConsumedCounter.increment();
        messageProcessTimer.record(processTimeMs, TimeUnit.MILLISECONDS);
    }
    
    /**
     * 记录消息丢失
     */
    public void recordMessageLost() {
        messageLostCounter.increment();
        // 发送告警
        sendLossAlert();
    }
    
    private void sendLossAlert() {
        log.error("检测到Kafka消息丢失!");
        // 实现告警发送逻辑
    }
}

六、生产环境最佳实践

6.1 部署架构建议

// 生产环境部署架构建议
public class ProductionDeploymentGuide {
    
    public void guide() {
        System.out.println("=== Kafka生产环境部署建议 ===");
        System.out.println("1. Broker节点:至少3个节点,分布在不同机架");
        System.out.println("2. Zookeeper:奇数个节点(3或5),独立部署");
        System.out.println("3. 磁盘:SSD存储,单独挂载数据目录");
        System.out.println("4. 网络:千兆网络,低延迟");
        System.out.println("5. 监控:Prometheus + Grafana + AlertManager");
    }
}

6.2 运维操作手册

// 运维操作手册
public class OperationsManual {
    
    public void manual() {
        System.out.println("=== Kafka运维操作手册 ===");
        System.out.println("日常检查:");
        System.out.println("- 监控Broker状态和资源使用率");
        System.out.println("- 检查副本同步状态");
        System.out.println("- 监控消费者组滞后情况");
        System.out.println("- 检查磁盘空间使用情况");
        
        System.out.println("\n应急处理:");
        System.out.println("- Broker宕机:检查日志,重启服务");
        System.out.println("- 分区离线:检查副本状态,重新分配分区");
        System.out.println("- 消费者滞后:增加消费者实例");
        System.out.println("- 磁盘满:清理旧数据,扩容磁盘");
        
        System.out.println("\n预防措施:");
        System.out.println("- 定期备份重要数据");
        System.out.println("- 配置合理的监控告警");
        System.out.println("- 制定灾难恢复预案");
        System.out.println("- 定期进行压力测试");
    }
}

结语

Kafka消息丢失问题是生产环境中必须高度重视的问题。通过本文介绍的三种典型场景和相应的解决方案,相信你能有效避免消息丢失的风险。

关键要点总结:

  1. Producer端:使用acks=all、开启幂等性、同步发送或带回调的异步发送
  2. Broker端:配置合理的副本因子、刷盘策略、监控副本同步状态
  3. Consumer端:手动提交偏移量、完善的异常处理机制、死信队列处理

记住,消息系统的可靠性不是自然而然的,需要我们在每个环节都做好充分的保障措施。在分布式系统中,任何组件都可能出现故障,关键是要有完善的容错和恢复机制。

如果你觉得这篇文章对你有帮助,欢迎分享给更多的朋友。在Kafka消息系统的设计和运维路上,我们一起成长!


关注「服务端技术精选」,获取更多干货技术文章!


标题:Kafka消息丢失的3种场景,生产环境千万要注意!老司机带你避坑
作者:jiangyi
地址:http://www.jiangyi.space/articles/2025/12/21/1766304278850.html

    0 评论
avatar