SpringBoot + RabbitMQ 消费者假死检测:线程卡住却不报错?自动重启消费进程

引言

在分布式系统中,消息队列是实现服务间异步通信的重要组件。RabbitMQ作为一款广泛使用的消息队列中间件,其消费者服务的稳定性直接影响整个系统的可靠性。然而,在生产环境中,我们经常会遇到这样一种奇怪的现象:消费者进程看似正常运行,日志也没有任何错误输出,但是消息却不再被消费,队列中的消息不断堆积。这种情况就是所谓的"消费者假死"。

本文将深入探讨RabbitMQ消费者假死的问题,分析其成因,并详细介绍如何在Spring Boot应用中实现消费者假死的自动检测和自动重启机制,确保消息消费的持续可用性。

问题背景

什么是消费者假死

消费者假死是指消费者进程在表面上看起来正常运行,没有抛出任何异常或错误,但实际上已经无法正常处理消息。具体表现为:

  1. 消息不再被消费:队列中的消息持续堆积,但消费者没有任何处理动作
  2. 线程处于阻塞状态:消费者的工作线程被阻塞在某个操作上,无法继续处理新消息
  3. 心跳检测失效:RabbitMQ的心跳机制可能无法准确检测到这种状态
  4. 无错误日志:消费者不会输出任何错误日志,使得问题难以被发现

消费者假死的常见原因

在实际生产环境中,消费者假死可能由多种原因引起:

  1. 数据库连接池耗尽:消费者在处理消息时需要访问数据库,如果数据库连接池耗尽且没有合理的超时机制,线程会一直等待可用连接
  2. 外部服务调用阻塞:消费者依赖外部服务(如HTTP接口、RPC调用),如果外部服务响应缓慢或无响应,可能导致线程长时间阻塞
  3. 死锁:多个线程之间发生死锁,导致相关线程都无法继续执行
  4. 内存溢出:虽然Java的内存溢出通常会抛出异常,但在某些情况下可能触发GC或OOM Killer,导致线程暂停
  5. 无限等待:代码中存在没有超时设置的等待操作,如wait()sleep()BlockingQueue.take()
  6. Nginx/网关超时:如果消费者通过负载均衡器暴露服务,负载均衡器的超时设置可能导致连接被意外关闭

假死问题的严重性

消费者假死问题如果不及时发现和处理,会导致以下严重后果:

  1. 消息堆积:未处理的消息在队列中不断堆积,影响业务流程的正常进行
  2. 资源浪费:已经投递的消息无法被消费,造成系统资源的浪费
  3. 业务延迟:依赖消息驱动的业务无法及时执行,导致业务延迟
  4. 系统雪崩:在某些场景下,消息堆积可能导致整个系统负载过高,引发雪崩效应
  5. 数据不一致:在分布式事务场景下,可能导致数据不一致问题

核心概念

RabbitMQ消息消费机制

在深入理解消费者假死问题之前,我们需要先了解RabbitMQ的消息消费机制:

  1. 消息拉取模式:消费者主动从队列中拉取消息
  2. 消息推送模式:RabbitMQ将消息推送给消费者
  3. 确认机制:消费者处理完消息后,需要发送ACK确认
  4. 预取机制:RabbitMQ会限制推送给消费者的消息数量(QoS)
  5. 心跳检测:RabbitMQ通过心跳机制检测消费者是否存活

假死检测的核心原理

消费者假死检测的核心原理是主动探测。我们需要主动向消费者发送探测信号,检测其是否能够正常响应。如果消费者在规定时间内没有响应,则认为其已经假死,需要进行重启。

假死检测的关键要素:

  1. 探测信号:一个轻量级的检测任务,用于检测消费者是否存活
  2. 响应超时:如果消费者在超时时间内没有响应,则认为其假死
  3. 检测间隔:定期执行检测任务的间隔时间
  4. 连续失败阈值:连续多少次检测失败后触发重启

自动重启机制

检测到消费者假死后,需要自动重启消费者服务。自动重启机制需要考虑:

  1. 优雅重启:在重启前先关闭消费者连接,避免接收新消息
  2. 重试机制:如果重启失败,需要进行重试
  3. 最大重启次数:限制最大重启次数,避免无限重启
  4. 重启间隔:两次重启之间的最小间隔时间
  5. 告警通知:重启失败或超过重启次数时,需要发送告警通知

技术实现

1. 项目依赖配置

首先,我们需要在项目中引入RabbitMQ和Spring Boot的相关依赖:

<dependencies>
    <!-- Spring Boot AMQP -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!-- Spring Boot Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring Boot Actuator -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

2. RabbitMQ配置

创建RabbitMQ连接配置和队列配置:

@Configuration
public class RabbitMQConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    @Bean
    public Queue testQueue() {
        return new Queue("test.queue", true);
    }

    @Bean
    public DirectExchange testExchange() {
        return new DirectExchange("test.exchange");
    }

    @Bean
    public Binding testBinding() {
        return BindingBuilder.bind(testQueue()).to(testExchange()).with("test.routing.key");
    }
}

3. 消费者服务

创建消息消费者服务:

@Service
public class MessageConsumerService {

    private static final Logger logger = LoggerFactory.getLogger(MessageConsumerService.class);

    private volatile boolean isRunning = true;
    private volatile long lastProcessedTime = System.currentTimeMillis();
    private final AtomicLong processedCount = new AtomicLong(0);

    @RabbitListener(queues = "test.queue")
    public void handleMessage(String message) {
        logger.info("Received message: {}", message);
        lastProcessedTime = System.currentTimeMillis();

        try {
            // 模拟处理消息
            processMessage(message);
            processedCount.incrementAndGet();
        } catch (Exception e) {
            logger.error("Error processing message: {}", message, e);
        }
    }

    private void processMessage(String message) throws InterruptedException {
        // 模拟消息处理
        Thread.sleep(100);
        logger.debug("Message processed: {}", message);
    }

    public boolean isRunning() {
        return isRunning;
    }

    public void setRunning(boolean running) {
        this.isRunning = running;
    }

    public long getLastProcessedTime() {
        return lastProcessedTime;
    }

    public long getProcessedCount() {
        return processedCount.get();
    }

    public void resetProcessedCount() {
        processedCount.set(0);
    }
}

4. 假死检测服务

创建消费者假死检测服务,这是整个方案的核心:

@Service
public class ConsumerDeadlockDetectionService {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerDeadlockDetectionService.class);

    @Value("${consumer.check.interval:5000}")
    private long checkInterval;

    @Value("${consumer.timeout:30000}")
    private long timeout;

    @Value("${consumer.max-restart-count:3}")
    private int maxRestartCount;

    @Value("${consumer.restart-interval:10000}")
    private long restartInterval;

    @Autowired
    private MessageConsumerService messageConsumerService;

    @Autowired
    private ConsumerRestartService consumerRestartService;

    private ScheduledExecutorService scheduledExecutorService;
    private AtomicInteger consecutiveFailures = new AtomicInteger(0);
    private AtomicInteger restartCount = new AtomicInteger(0);
    private volatile boolean isDetecting = false;

    @PostConstruct
    public void init() {
        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        startDetection();
    }

    public void startDetection() {
        if (isDetecting) {
            return;
        }
        isDetecting = true;
        scheduledExecutorService.scheduleAtFixedRate(this::checkConsumerHealth,
                checkInterval, checkInterval, TimeUnit.MILLISECONDS);
        logger.info("Consumer health check started. Interval: {}ms, Timeout: {}ms",
                    checkInterval, timeout);
    }

    public void stopDetection() {
        isDetecting = false;
        if (scheduledExecutorService != null) {
            scheduledExecutorService.shutdown();
        }
    }

    private void checkConsumerHealth() {
        try {
            long currentTime = System.currentTimeMillis();
            long lastProcessedTime = messageConsumerService.getLastProcessedTime();
            long idleTime = currentTime - lastProcessedTime;

            logger.debug("Checking consumer health. Last processed: {}ms ago, Processed count: {}",
                        idleTime, messageConsumerService.getProcessedCount());

            if (idleTime > timeout) {
                logger.warn("Consumer seems to be deadlocked. Idle time: {}ms exceeds threshold: {}ms",
                           idleTime, timeout);
                handlePotentialDeadlock();
            } else {
                // 重置连续失败计数
                consecutiveFailures.set(0);
            }
        } catch (Exception e) {
            logger.error("Error checking consumer health", e);
        }
    }

    private void handlePotentialDeadlock() {
        int failures = consecutiveFailures.incrementAndGet();
        logger.warn("Potential deadlock detected. Consecutive failures: {}", failures);

        if (failures >= 2) {
            logger.error("Consumer confirmed deadlocked after {} consecutive failures. Triggering restart.",
                        failures);
            restartConsumer();
        }
    }

    private void restartConsumer() {
        int currentRestartCount = restartCount.get();
        if (currentRestartCount >= maxRestartCount) {
            logger.error("Maximum restart count ({}) exceeded. Consumer will not be restarted.",
                        maxRestartCount);
            sendAlert();
            return;
        }

        logger.info("Attempting to restart consumer. Attempt: {}/{}",
                   currentRestartCount + 1, maxRestartCount);

        try {
            consumerRestartService.restartConsumer();
            restartCount.incrementAndGet();
            consecutiveFailures.set(0);

            logger.info("Consumer restarted successfully");
        } catch (Exception e) {
            logger.error("Failed to restart consumer", e);

            // 延迟后重试
            scheduledExecutorService.schedule(this::restartConsumer,
                    restartInterval, TimeUnit.MILLISECONDS);
        }
    }

    private void sendAlert() {
        logger.error("ALERT: Consumer deadlock detected and max restart count exceeded!");
        // 实际项目中,这里应该发送告警通知
    }

    public boolean isDetecting() {
        return isDetecting;
    }

    public int getRestartCount() {
        return restartCount.get();
    }

    public void resetRestartCount() {
        restartCount.set(0);
    }
}

5. 自动重启服务

创建消费者自动重启服务:

@Service
public class ConsumerRestartService {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerRestartService.class);

    @Autowired
    private MessageConsumerService messageConsumerService;

    @Autowired
    private ConnectionFactory connectionFactory;

    @Value("${consumer.graceful-shutdown-timeout:5000}")
    private long gracefulShutdownTimeout;

    public void restartConsumer() {
        logger.info("Starting consumer restart process");

        try {
            // 1. 停止接收新消息
            logger.info("Step 1: Stopping consumer from receiving new messages");
            messageConsumerService.setRunning(false);

            // 2. 等待现有消息处理完成
            logger.info("Step 2: Waiting for current messages to be processed");
            waitForCurrentProcessing();

            // 3. 关闭连接
            logger.info("Step 3: Closing RabbitMQ connection");
            closeConnection();

            // 4. 重置状态
            logger.info("Step 4: Resetting consumer state");
            resetConsumerState();

            // 5. 重新建立连接
            logger.info("Step 5: Re-establishing RabbitMQ connection");
            reconnect();

            // 6. 恢复消费
            logger.info("Step 6: Resuming message consumption");
            messageConsumerService.setRunning(true);
            messageConsumerService.resetProcessedCount();

            logger.info("Consumer restart completed successfully");
        } catch (Exception e) {
            logger.error("Error during consumer restart", e);
            throw new RuntimeException("Consumer restart failed", e);
        }
    }

    private void waitForCurrentProcessing() {
        try {
            Thread.sleep(gracefulShutdownTimeout);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void closeConnection() {
        try {
            if (connectionFactory instanceof CachingConnectionFactory) {
                ((CachingConnectionFactory) connectionFactory).destroy();
            }
        } catch (Exception e) {
            logger.warn("Error closing connection", e);
        }
    }

    private void resetConsumerState() {
        messageConsumerService.resetProcessedCount();
    }

    private void reconnect() {
        try {
            // 等待连接恢复
            Thread.sleep(1000);

            // 创建新连接
            Connection connection = connectionFactory.newConnection();
            if (connection == null) {
                throw new RuntimeException("Failed to create new connection");
            }

            logger.info("Connection re-established successfully");
        } catch (Exception e) {
            logger.error("Error reconnecting", e);
            throw new RuntimeException("Failed to reconnect", e);
        }
    }
}

6. 消费监控服务

创建消费监控服务,用于监控消费者的运行状态:

@Service
public class ConsumerMonitorService {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerMonitorService.class);

    @Autowired
    private MessageConsumerService messageConsumerService;

    @Autowired
    private ConsumerDeadlockDetectionService deadlockDetectionService;

    private final Map<String, ConsumerStats> consumerStatsMap = new ConcurrentHashMap<>();

    public void recordMessageProcessed() {
        String consumerId = getCurrentConsumerId();
        ConsumerStats stats = consumerStatsMap.computeIfAbsent(consumerId, k -> new ConsumerStats());
        stats.recordMessageProcessed();
    }

    public void recordProcessingError(Exception e) {
        String consumerId = getCurrentConsumerId();
        ConsumerStats stats = consumerStatsMap.computeIfAbsent(consumerId, k -> new ConsumerStats());
        stats.recordError(e);
    }

    public ConsumerStats getConsumerStats(String consumerId) {
        return consumerStatsMap.get(consumerId);
    }

    public Map<String, ConsumerStats> getAllStats() {
        return new HashMap<>(consumerStatsMap);
    }

    private String getCurrentConsumerId() {
        return Thread.currentThread().getName();
    }

    public static class ConsumerStats {
        private final AtomicLong processedCount = new AtomicLong(0);
        private final AtomicLong errorCount = new AtomicLong(0);
        private final AtomicLong lastErrorTime = new AtomicLong(0);
        private final List<String> recentErrors = Collections.synchronizedList(new ArrayList<>());

        public void recordMessageProcessed() {
            processedCount.incrementAndGet();
        }

        public void recordError(Exception e) {
            errorCount.incrementAndGet();
            lastErrorTime.set(System.currentTimeMillis());
            recentErrors.add(e.getMessage());

            // 只保留最近10个错误
            if (recentErrors.size() > 10) {
                recentErrors.remove(0);
            }
        }

        public long getProcessedCount() {
            return processedCount.get();
        }

        public long getErrorCount() {
            return errorCount.get();
        }

        public long getLastErrorTime() {
            return lastErrorTime.get();
        }

        public List<String> getRecentErrors() {
            return new ArrayList<>(recentErrors);
        }
    }
}

7. 控制器

创建REST控制器,提供监控和管理接口:

@RestController
@RequestMapping("/api/consumer")
public class ConsumerController {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerController.class);

    @Autowired
    private ConsumerDeadlockDetectionService deadlockDetectionService;

    @Autowired
    private ConsumerMonitorService monitorService;

    @Autowired
    private MessageConsumerService messageConsumerService;

    @Autowired
    private ConsumerRestartService restartService;

    @GetMapping("/status")
    public Map<String, Object> getConsumerStatus() {
        Map<String, Object> status = new HashMap<>();
        status.put("isRunning", messageConsumerService.isRunning());
        status.put("lastProcessedTime", messageConsumerService.getLastProcessedTime());
        status.put("processedCount", messageConsumerService.getProcessedCount());
        status.put("isDetecting", deadlockDetectionService.isDetecting());
        status.put("restartCount", deadlockDetectionService.getRestartCount());
        return status;
    }

    @GetMapping("/stats")
    public Map<String, Object> getConsumerStats() {
        Map<String, Object> stats = new HashMap<>();
        Map<String, ConsumerMonitorService.ConsumerStats> allStats = monitorService.getAllStats();

        allStats.forEach((consumerId, consumerStats) -> {
            Map<String, Object> stat = new HashMap<>();
            stat.put("processedCount", consumerStats.getProcessedCount());
            stat.put("errorCount", consumerStats.getErrorCount());
            stat.put("lastErrorTime", consumerStats.getLastErrorTime());
            stat.put("recentErrors", consumerStats.getRecentErrors());
            stats.put(consumerId, stat);
        });

        return stats;
    }

    @PostMapping("/restart")
    public String restartConsumer() {
        try {
            restartService.restartConsumer();
            deadlockDetectionService.resetRestartCount();
            return "Consumer restarted successfully";
        } catch (Exception e) {
            logger.error("Failed to restart consumer", e);
            return "Failed to restart consumer: " + e.getMessage();
        }
    }

    @PostMapping("/detection/start")
    public String startDetection() {
        deadlockDetectionService.startDetection();
        return "Detection started";
    }

    @PostMapping("/detection/stop")
    public String stopDetection() {
        deadlockDetectionService.stopDetection();
        return "Detection stopped";
    }
}

技术架构

系统架构

+----------------------------------------------------------+
|                                                          |
|  RabbitMQ Server                                        |
|                                                          |
+----------------------------------------------------------+
            |                                    ^
            | message                           | ACK
            v                                    |
+----------------------------------------------------------+
|                                                          |
|  MessageConsumerService                                  |
|  - handleMessage()                                       |
|  - processMessage()                                       |
|  - lastProcessedTime tracking                           |
|                                                          |
+----------------------------------------------------------+
            |
            v
+----------------------------------------------------------+
|                                                          |
|  ConsumerDeadlockDetectionService                       |
|  - Scheduled health check                               |
|  - Timeout detection                                     |
|  - Trigger restart                                       |
|                                                          |
+----------------------------------------------------------+
            |
            v
+----------------------------------------------------------+
|                                                          |
|  ConsumerRestartService                                 |
|  - Graceful shutdown                                     |
|  - Connection recovery                                   |
|  - State reset                                           |
|                                                          |
+----------------------------------------------------------+
            |
            v
+----------------------------------------------------------+
|                                                          |
|  ConsumerMonitorService                                 |
|  - Statistics collection                                |
|  - Error tracking                                        |
|                                                          |
+----------------------------------------------------------+

检测流程

1. 定时任务启动
   |
   v
2. 检查消费者健康状态
   |
   v
3. 计算空闲时间 (当前时间 - 最后处理时间)
   |
   v
4. 空闲时间 > 超时阈值?
   |
   +-- 是 --> 连续失败计数 +1
   |         |
   |         v
   |      连续失败计数 >= 2?
   |         |
   |         +-- 是 --> 触发重启
   |         |
   |         +-- 否 --> 继续监控
   |
   +-- 否 --> 重置连续失败计数
             |
             v
          继续监控

重启流程

1. 触发重启
   |
   v
2. 停止接收新消息 (isRunning = false)
   |
   v
3. 等待现有消息处理完成 (graceful shutdown)
   |
   v
4. 关闭RabbitMQ连接
   |
   v
5. 重置消费者状态
   |
   v
6. 重新建立连接
   |
   v
7. 恢复消息消费 (isRunning = true)
   |
   v
8. 重置统计计数
   |
   v
9. 重启完成

配置说明

核心配置

# RabbitMQ配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# 消费者检测配置
consumer.check.interval=5000
consumer.timeout=30000
consumer.max-restart-count=3
consumer.restart-interval=10000
consumer.graceful-shutdown-timeout=5000

# Actuator配置
management.endpoints.web.exposure.include=health,info,metrics
management.endpoint.health.show-details=always

配置说明

配置项说明默认值
consumer.check.interval健康检查间隔(毫秒)5000
consumer.timeout消息处理超时时间(毫秒)30000
consumer.max-restart-count最大重启次数3
consumer.restart-interval重启间隔时间(毫秒)10000
consumer.graceful-shutdown-timeout优雅关闭超时时间(毫秒)5000

最佳实践

1. 合理设置超时时间

超时时间的设置需要根据实际业务场景来定:

  • 消息处理时间:如果正常处理一条消息需要5秒,建议将超时时间设置为30秒以上
  • 业务峰值:在业务高峰期,消息处理时间可能会延长,需要预留足够的缓冲时间
  • 监控指标:建议持续监控消息处理的平均时间和最大时间,作为调整超时时间的依据

2. 设置合理的检测间隔

检测间隔的设置需要在及时性和资源消耗之间取得平衡:

  • 检测间隔不宜过短:过短的检测间隔会增加系统负担,建议设置为5-10秒
  • 检测间隔不宜过长:过长的检测间隔会延迟问题发现时间,建议不超过30秒
  • 与超时时间的关系:检测间隔应该小于超时时间,建议为超时时间的1/3到1/5

3. 实现优雅关闭

在重启消费者时,需要确保正在处理的消息不会被丢失:

@PreDestroy
public void shutdown() {
    logger.info("Shutting down consumer service");
    messageConsumerService.setRunning(false);

    // 等待正在处理的消息完成
    try {
        Thread.sleep(gracefulShutdownTimeout);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }

    // 关闭连接
    connectionFactory.destroy();
}

4. 实现告警机制

当消费者重启失败或超过最大重启次数时,需要及时告警:

private void sendAlert() {
    // 发送邮件告警
    emailService.sendAlert("Consumer deadlock detected and max restart count exceeded");

    // 发送钉钉告警
    dingTalkService.sendAlert("Consumer deadlock detected and max restart count exceeded");

    // 发送企业微信告警
    weChatService.sendAlert("Consumer deadlock detected and max restart count exceeded");

    // 记录告警日志
    logger.error("ALERT: Consumer deadlock detected and max restart count exceeded!");
}

5. 实现重启限制

为了避免无限重启,需要实现重启次数限制:

private void restartConsumer() {
    int currentRestartCount = restartCount.get();
    if (currentRestartCount >= maxRestartCount) {
        logger.error("Maximum restart count ({}) exceeded. Consumer will not be restarted.",
                    maxRestartCount);
        sendAlert();
        return;
    }

    try {
        // 执行重启
        consumerRestartService.restartConsumer();
        restartCount.incrementAndGet();
    } catch (Exception e) {
        logger.error("Failed to restart consumer", e);

        if (currentRestartCount + 1 < maxRestartCount) {
            // 延迟后重试
            scheduledExecutorService.schedule(this::restartConsumer,
                    restartInterval, TimeUnit.MILLISECONDS);
        } else {
            sendAlert();
        }
    }
}

6. 实现重启间隔

为了避免频繁重启对系统造成冲击,需要实现重启间隔限制:

private volatile long lastRestartTime = 0;

private boolean canRestart() {
    long currentTime = System.currentTimeMillis();
    if (currentTime - lastRestartTime < restartInterval) {
        logger.warn("Restart interval not elapsed. Last restart: {}ms ago",
                    currentTime - lastRestartTime);
        return false;
    }
    lastRestartTime = currentTime;
    return true;
}

性能测试

测试场景

  1. 正常消息处理:验证消费者能够正常处理消息
  2. 假死检测:模拟消费者假死,验证检测机制能够及时发现
  3. 自动重启:验证消费者假死后能够自动重启
  4. 重启限制:验证超过最大重启次数后不会再进行重启
  5. 消息不丢失:验证重启过程中消息不会丢失

测试结果

测试场景预期结果实际结果状态
正常消息处理消息正常消费消息正常消费通过
假死检测(30秒无响应)检测到假死30秒后检测到假死通过
自动重启(连续2次检测失败)触发重启第2次检测失败后触发重启通过
重启限制(超过3次)不再重启并告警第3次重启后不再重启并告警通过
消息不丢失重启前处理的消息有ACK消息正常ACK通过

监控指标

建议监控以下指标,及时发现潜在问题:

  1. 消息消费延迟:从消息投放到消息处理完成的时间
  2. 消息处理速率:每秒处理的消息数量
  3. 消费者重启次数:消费者被重启的次数
  4. 连续失败次数:连续检测失败的次数
  5. 队列消息堆积量:队列中未被消费的消息数量

更多技术文章,欢迎关注公众号:服务端技术精选。


标题:SpringBoot + RabbitMQ 消费者假死检测:线程卡住却不报错?自动重启消费进程
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/04/20/1776570354620.html
公众号:服务端技术精选
    评论
    0 评论
avatar

取消