Spring Boot + WebSocket Connection Leak Monitoring: Users Leave Without Disconnecting? Clean Up Zombie Connections Automatically

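Introduction

WebSocket is a full-duplex protocol widely used in modern web applications for real-time message push, online collaboration, and instant messaging. In production, however, managing WebSocket connections is hard: users close the browser tab instead of clicking a logout button, unstable networks drop connections, and idle users keep connections open indefinitely. Each of these produces "zombie connections": the server believes the connection is still alive, but the client is no longer using it.

This article examines WebSocket connection leaks and their causes, then shows how to monitor for leaks and automatically clean up zombie connections in a Spring Boot application, so that the WebSocket service stays highly available.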

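Problem Background

What Is a WebSocket Connection Leak

A WebSocket connection leak occurs when the server keeps a connection open after the client has stopped using it. This causes the following problems:

  1. Wasted connection resources: every WebSocket connection holds a file descriptor and memory on the server
  2. Higher server load: large numbers of zombie connections consume resources that active users need
  3. Leaked session data: if user information is stored in the session, zombie connections keep that session from being released promptly
  4. Wasted message pushes: messages pushed to a zombie connection are never read, so the work of sending them is wasted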
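Common Causes of Connection Leaks

In production, WebSocket connection leaks can have several causes:

  1. Client does not close the connection properly: the user closes the browser or tab instead of the code closing the WebSocket connection
  2. Network failure: network jitter or an outage kills the connection and the server never receives a close frame
  3. Firewall or load balancer timeout: an intermediate device with a shorter timeout than the server closes the connection unexpectedly
  4. Client crash: the client application exits abnormally without sending a close frame
  5. Missed heartbeat timeout: the server fails to notice in time that a client has stopped answering heartbeats
  6. Server restart: the server restarts without properly notifying clients to reconnect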
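Why Connection Leaks Are Serious

Left unhandled, WebSocket connection leaks can have serious consequences:

  1. Resource exhaustion: file descriptors, memory, and threads run out, making the service unavailable
  2. Service crash: when the connection count reaches the system limit, the server may crash
  3. Worse user experience: new users cannot connect, or connection quality degrades
  4. Higher operations cost: someone has to clean up zombie connections by hand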

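Core Concepts

WebSocket Lifecycle

A WebSocket connection goes through the following stages:

  1. Connection established: the client sends an HTTP handshake request and the server's response establishes the WebSocket connection
  2. Connection open: both sides can send messages to each other
  3. Connection closing: one side sends a close frame and the other acknowledges it
  4. Connection terminated: the underlying TCP connection is fully torn down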

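Criteria for Identifying a Leaked Connection

Whether a WebSocket connection is a zombie is judged mainly by the following criteria:

  1. Time of last message: no message has been received within the configured time
  2. Heartbeat response: the client has not answered a heartbeat within the allowed time
  3. Client state: the client has signalled that it is leaving but has not closed the connection properly
  4. Network state: if the client's IP address changes, the connection may need to be re-established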
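
The first two criteria above can be combined into a single check. The following is a minimal sketch rather than the article's implementation: the class ZombieCriteria, its Verdict enum, and the parameter names are hypothetical, and timestamps are passed in explicitly so the logic can be exercised without a real connection.

```java
/** Hypothetical helper: classifies a connection from timestamps in milliseconds. */
final class ZombieCriteria {

    enum Verdict { ALIVE, IDLE_TIMEOUT, HEARTBEAT_TIMEOUT }

    private final long maxIdleMillis;          // criterion 1: silence since the last message
    private final long heartbeatTimeoutMillis; // criterion 2: time allowed to answer a ping

    ZombieCriteria(long maxIdleMillis, long heartbeatTimeoutMillis) {
        this.maxIdleMillis = maxIdleMillis;
        this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
    }

    /**
     * @param now           current time
     * @param lastMessageAt when the last message of any kind arrived
     * @param pendingPingAt when the oldest unanswered ping was sent, or -1 if none is pending
     */
    Verdict classify(long now, long lastMessageAt, long pendingPingAt) {
        if (pendingPingAt >= 0 && now - pendingPingAt > heartbeatTimeoutMillis) {
            return Verdict.HEARTBEAT_TIMEOUT;
        }
        if (now - lastMessageAt > maxIdleMillis) {
            return Verdict.IDLE_TIMEOUT;
        }
        return Verdict.ALIVE;
    }
}
```

Criteria 3 and 4 depend on application-level signals (an explicit "leaving" message, the remote address), so they are left out of this sketch.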

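Zombie Connection Cleanup Strategies

Common strategies for cleaning up zombie connections include:

  1. Periodic scan: scan all connections on a schedule and close the ones that have timed out
  2. Heartbeat detection: probe connections with heartbeats and close the ones that do not respond
  3. Message queue buffering: buffer outbound messages in a queue instead of pushing them directly, to avoid wasting resources on dead connections
  4. Graceful close: send a close notification before closing so the client has a chance to reconnect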
Technical Implementation

1. Project Dependencies

First, add the WebSocket and Spring Boot dependencies to the project:

<dependencies>
    <!-- Spring Boot WebSocket -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

    <!-- Spring Boot Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring Boot Actuator -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

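2. WebSocket Configuration

Create a WebSocket configuration class that registers the endpoint and its handler. Note that setAllowedOrigins("*") accepts handshakes from any origin, which is convenient for a demo but should be narrowed to known origins in production: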

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myWebSocketHandler(), "/ws")
                .setAllowedOrigins("*");
    }

    @Bean
    public WebSocketHandler myWebSocketHandler() {
        return new MyWebSocketHandler();
    }
}

3. Connection Session Information

Create a session information class that tracks the state of each WebSocket connection:

public class SessionInfo {
    private String sessionId;
    private String userId;
    private WebSocketSession session;
    // volatile: written by handler threads, read by the scheduled scan threads
    private volatile long lastActiveTime;
    private long connectTime;
    private boolean isActive;

    public SessionInfo(String sessionId, WebSocketSession session) {
        this.sessionId = sessionId;
        this.session = session;
        this.connectTime = System.currentTimeMillis();
        this.lastActiveTime = System.currentTimeMillis();
        this.isActive = true;
    }

    public void updateLastActiveTime() {
        this.lastActiveTime = System.currentTimeMillis();
    }

    public boolean isTimeout(long timeout) {
        return System.currentTimeMillis() - lastActiveTime > timeout;
    }

    // getters and setters
}

4. Connection Session Manager

Create a session manager that keeps track of all WebSocket connections:

@Service
public class WebSocketSessionManager {

    private final Map<String, SessionInfo> sessionMap = new ConcurrentHashMap<>();
    private final AtomicInteger connectionCount = new AtomicInteger(0);

    public void addSession(String sessionId, SessionInfo sessionInfo) {
        sessionMap.put(sessionId, sessionInfo);
        connectionCount.incrementAndGet();
    }

    public void removeSession(String sessionId) {
        SessionInfo removed = sessionMap.remove(sessionId);
        if (removed != null) {
            connectionCount.decrementAndGet();
        }
    }

    public SessionInfo getSession(String sessionId) {
        return sessionMap.get(sessionId);
    }

    public Collection<SessionInfo> getAllSessions() {
        return sessionMap.values();
    }

    public int getConnectionCount() {
        return connectionCount.get();
    }

    public List<SessionInfo> getTimeoutSessions(long timeout) {
        List<SessionInfo> timeoutSessions = new ArrayList<>();
        long currentTime = System.currentTimeMillis();

        for (SessionInfo sessionInfo : sessionMap.values()) {
            if (currentTime - sessionInfo.getLastActiveTime() > timeout) {
                timeoutSessions.add(sessionInfo);
            }
        }

        return timeoutSessions;
    }

    public void updateSessionActivity(String sessionId) {
        SessionInfo sessionInfo = sessionMap.get(sessionId);
        if (sessionInfo != null) {
            sessionInfo.updateLastActiveTime();
        }
    }
}
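
One detail worth watching in a manager like this is that a separate counter can drift away from the map: Map.put replaces an existing entry, so registering the same ID twice would still increment the counter. The sketch below shows one way to keep the two in step by moving the counter only when the map actually changed. CountedRegistry is a hypothetical stand-in for the session manager that stores plain timestamps instead of SessionInfo objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the session manager; values are last-active timestamps. */
final class CountedRegistry {

    private final Map<String, Long> lastActive = new ConcurrentHashMap<>();
    private final AtomicInteger count = new AtomicInteger();

    /** Registers or refreshes an ID; the counter moves only for a genuinely new ID. */
    void add(String id, long now) {
        if (lastActive.put(id, now) == null) {
            count.incrementAndGet();
        }
    }

    /** Removes an ID; safe to call more than once for the same ID. */
    void remove(String id) {
        if (lastActive.remove(id) != null) {
            count.decrementAndGet();
        }
    }

    int count() {
        return count.get();
    }

    /** Returns the IDs whose last activity is older than timeoutMillis. */
    List<String> timedOut(long now, long timeoutMillis) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastActive.entrySet()) {
            if (now - entry.getValue() > timeoutMillis) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

Because remove is idempotent here, it does not matter that both afterConnectionClosed and handleTransportError may fire for the same session.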

5. WebSocket Handler

Create a WebSocket handler that processes connection, message, and close events:

public class MyWebSocketHandler extends TextWebSocketHandler {

    private static final Logger logger = LoggerFactory.getLogger(MyWebSocketHandler.class);

    @Autowired
    private WebSocketSessionManager sessionManager;

    @Autowired
    private ConnectionLeakDetectionService leakDetectionService;

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        String sessionId = session.getId();
        SessionInfo sessionInfo = new SessionInfo(sessionId, session);
        sessionManager.addSession(sessionId, sessionInfo);

        logger.info("WebSocket connection established: {}", sessionId);
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        String sessionId = session.getId();
        sessionManager.updateSessionActivity(sessionId);

        logger.debug("Received message from {}: {}", sessionId, message.getPayload());

        // Handle the message
        processMessage(session, message);
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        String sessionId = session.getId();
        sessionManager.removeSession(sessionId);

        logger.info("WebSocket connection closed: {}, status: {}", sessionId, status);
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        String sessionId = session.getId();
        logger.error("WebSocket transport error: {}", sessionId, exception);

        // Remove the session
        sessionManager.removeSession(sessionId);
    }

    private void processMessage(WebSocketSession session, TextMessage message) {
        // Business logic goes here
    }
}

6. Connection Leak Detection Service

Create the connection leak detection service, which is the core of the whole solution:

@Service
public class ConnectionLeakDetectionService {

    private static final Logger logger = LoggerFactory.getLogger(ConnectionLeakDetectionService.class);

    @Value("${websocket.check.interval:10000}")
    private long checkInterval;

    @Value("${websocket.timeout:60000}")
    private long timeout;

    @Autowired
    private WebSocketSessionManager sessionManager;

    private ScheduledExecutorService scheduledExecutorService;
    private volatile boolean isMonitoring = false;

    @PostConstruct
    public void init() {
        startMonitoring();
    }

    @PreDestroy
    public void destroy() {
        stopMonitoring();
    }

    public synchronized void startMonitoring() {
        if (isMonitoring) {
            return;
        }
        // A shut-down executor rejects new tasks, so create a fresh one on every
        // start; this lets monitoring be restarted after stopMonitoring().
        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        scheduledExecutorService.scheduleAtFixedRate(this::checkConnections,
                checkInterval, checkInterval, TimeUnit.MILLISECONDS);
        isMonitoring = true;
        logger.info("Connection leak detection started. Interval: {}ms, Timeout: {}ms",
                    checkInterval, timeout);
    }

    public synchronized void stopMonitoring() {
        isMonitoring = false;
        if (scheduledExecutorService != null) {
            scheduledExecutorService.shutdown();
            try {
                if (!scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
                    scheduledExecutorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                scheduledExecutorService.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }

    private void checkConnections() {
        try {
            List<SessionInfo> timeoutSessions = sessionManager.getTimeoutSessions(timeout);

            if (!timeoutSessions.isEmpty()) {
                logger.warn("Found {} timeout connections", timeoutSessions.size());

                for (SessionInfo sessionInfo : timeoutSessions) {
                    handleTimeoutConnection(sessionInfo);
                }
            }

            // Log the current connection status
            logConnectionStatus();
        } catch (Exception e) {
            logger.error("Error checking connections", e);
        }
    }

    private void handleTimeoutConnection(SessionInfo sessionInfo) {
        String sessionId = sessionInfo.getSessionId();
        long idleTime = System.currentTimeMillis() - sessionInfo.getLastActiveTime();

        logger.warn("Connection timeout: sessionId={}, idleTime={}ms", sessionId, idleTime);

        try {
            WebSocketSession session = sessionInfo.getSession();
            if (session.isOpen()) {
                // Close the session (this sends a close frame)
                session.close(CloseStatus.GOING_AWAY);
                logger.info("Closed timeout connection: {}", sessionId);
            }
        } catch (Exception e) {
            logger.error("Error closing timeout connection: {}", sessionId, e);
        } finally {
            // Remove from the session manager
            sessionManager.removeSession(sessionId);
        }
    }

    private void logConnectionStatus() {
        int totalConnections = sessionManager.getConnectionCount();
        List<SessionInfo> allSessions = new ArrayList<>(sessionManager.getAllSessions());

        long currentTime = System.currentTimeMillis();
        int activeConnections = 0;
        int idleConnections = 0;

        for (SessionInfo sessionInfo : allSessions) {
            if (currentTime - sessionInfo.getLastActiveTime() < timeout) {
                activeConnections++;
            } else {
                idleConnections++;
            }
        }

        logger.info("Connection status - Total: {}, Active: {}, Idle: {}",
                   totalConnections, activeConnections, idleConnections);
    }

    public boolean isMonitoring() {
        return isMonitoring;
    }

    public long getCheckInterval() {
        return checkInterval;
    }

    public long getTimeout() {
        return timeout;
    }
}
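
The controller later in this article exposes start and stop endpoints, so monitoring needs to be restartable. A ScheduledExecutorService rejects new tasks once shutdown() has been called, so one option is to keep the executor alive and cancel only the periodic task through its ScheduledFuture. The sketch below illustrates that pattern in isolation; RestartableMonitor and its method names are hypothetical and not part of the service above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical wrapper: one long-lived executor, one cancellable periodic task. */
final class RestartableMonitor {

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final Runnable task;
    private final long intervalMillis;
    private ScheduledFuture<?> future; // non-null only while monitoring is running

    RestartableMonitor(Runnable task, long intervalMillis) {
        this.task = task;
        this.intervalMillis = intervalMillis;
    }

    /** Starts the periodic task; does nothing if it is already running. */
    synchronized void start() {
        if (future != null) {
            return;
        }
        future = executor.scheduleAtFixedRate(
                task, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Cancels the periodic task but keeps the executor usable for a later start(). */
    synchronized void stop() {
        if (future == null) {
            return;
        }
        future.cancel(false); // let a scan that is already running finish
        future = null;
    }

    synchronized boolean isRunning() {
        return future != null;
    }

    /** Call once at application shutdown; start() is rejected afterwards. */
    void shutdown() {
        executor.shutdownNow();
    }
}
```

In a Spring service, start() and stop() would back the REST endpoints, while shutdown() would be called from the @PreDestroy method.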

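7. Automatic Zombie Connection Cleanup Service

Create the zombie connection cleanup service, which proactively closes zombie connections. With the default values, the leak detection service from step 6 already closes connections after 60 seconds of inactivity, so this service's 120-second threshold acts as a backstop, for example while monitoring is stopped: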

@Service
public class ZombieConnectionCleanupService {

    private static final Logger logger = LoggerFactory.getLogger(ZombieConnectionCleanupService.class);

    @Value("${websocket.cleanup.enabled:true}")
    private boolean cleanupEnabled;

    @Value("${websocket.cleanup.interval:30000}")
    private long cleanupInterval;

    @Value("${websocket.max.idle.time:120000}")
    private long maxIdleTime;

    @Autowired
    private WebSocketSessionManager sessionManager;

    private ScheduledExecutorService cleanupExecutor;

    @PostConstruct
    public void init() {
        if (cleanupEnabled) {
            cleanupExecutor = Executors.newSingleThreadScheduledExecutor();
            startCleanup();
        }
    }

    @PreDestroy
    public void destroy() {
        if (cleanupExecutor != null) {
            cleanupExecutor.shutdown();
        }
    }

    private void startCleanup() {
        cleanupExecutor.scheduleAtFixedRate(this::cleanupZombieConnections,
                cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS);
        logger.info("Zombie connection cleanup started. Interval: {}ms, Max idle: {}ms",
                    cleanupInterval, maxIdleTime);
    }

    private void cleanupZombieConnections() {
        try {
            List<SessionInfo> zombieSessions = sessionManager.getTimeoutSessions(maxIdleTime);

            if (!zombieSessions.isEmpty()) {
                logger.info("Cleaning up {} zombie connections", zombieSessions.size());

                for (SessionInfo sessionInfo : zombieSessions) {
                    cleanupZombieConnection(sessionInfo);
                }
            }
        } catch (Exception e) {
            logger.error("Error cleaning up zombie connections", e);
        }
    }

    private void cleanupZombieConnection(SessionInfo sessionInfo) {
        String sessionId = sessionInfo.getSessionId();

        try {
            WebSocketSession session = sessionInfo.getSession();

            // Record connection details
            long connectTime = sessionInfo.getConnectTime();
            long lastActiveTime = sessionInfo.getLastActiveTime();
            long idleTime = System.currentTimeMillis() - lastActiveTime;

            logger.info("Cleaning zombie connection: sessionId={}, connectTime={}, idleTime={}ms",
                       sessionId, new Date(connectTime), idleTime);

            if (session.isOpen()) {
                // Send a close frame
                session.close(CloseStatus.NORMAL);
            }
        } catch (Exception e) {
            logger.error("Error cleaning zombie connection: {}", sessionId, e);
        } finally {
            sessionManager.removeSession(sessionId);
        }
    }

    public void forceCleanup(String sessionId) {
        SessionInfo sessionInfo = sessionManager.getSession(sessionId);
        if (sessionInfo != null) {
            cleanupZombieConnection(sessionInfo);
        }
    }

    public void forceCleanupAll() {
        Collection<SessionInfo> allSessions = sessionManager.getAllSessions();
        for (SessionInfo sessionInfo : allSessions) {
            cleanupZombieConnection(sessionInfo);
        }
    }
}

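8. Heartbeat Detection Service

Create the heartbeat detection service, which probes each connection with a heartbeat to check that it is still alive: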

@Service
public class HeartbeatDetectionService {

    private static final Logger logger = LoggerFactory.getLogger(HeartbeatDetectionService.class);

    @Value("${websocket.heartbeat.interval:30000}")
    private long heartbeatInterval;

    @Value("${websocket.heartbeat.timeout:10000}")
    private long heartbeatTimeout;

    @Autowired
    private WebSocketSessionManager sessionManager;

    private ScheduledExecutorService heartbeatExecutor;
    private Map<String, Long> lastHeartbeatTime = new ConcurrentHashMap<>();
    private volatile boolean isRunning = false;

    @PostConstruct
    public void init() {
        heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
        startHeartbeat();
    }

    @PreDestroy
    public void destroy() {
        stopHeartbeat();
    }

    private void startHeartbeat() {
        isRunning = true;
        heartbeatExecutor.scheduleAtFixedRate(this::sendHeartbeat,
                heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS);
        logger.info("Heartbeat detection started. Interval: {}ms, Timeout: {}ms",
                    heartbeatInterval, heartbeatTimeout);
    }

    private void stopHeartbeat() {
        isRunning = false;
        if (heartbeatExecutor != null) {
            heartbeatExecutor.shutdown();
        }
    }

    private void sendHeartbeat() {
        try {
            // Judge the previous round before sending new pings, so every session
            // has had a full heartbeat interval to answer.
            checkHeartbeatResponse();

            Collection<SessionInfo> allSessions = sessionManager.getAllSessions();

            for (SessionInfo sessionInfo : allSessions) {
                String sessionId = sessionInfo.getSessionId();
                WebSocketSession session = sessionInfo.getSession();

                if (session.isOpen()) {
                    try {
                        // Send the heartbeat message and remember when it was sent
                        session.sendMessage(new TextMessage("ping"));
                        lastHeartbeatTime.put(sessionId, System.currentTimeMillis());
                    } catch (Exception e) {
                        logger.error("Error sending heartbeat to {}", sessionId, e);
                        handleHeartbeatFailure(sessionInfo);
                    }
                }
            }
        } catch (Exception e) {
            logger.error("Error in heartbeat detection", e);
        }
    }

    private void checkHeartbeatResponse() {
        long currentTime = System.currentTimeMillis();

        for (Map.Entry<String, Long> entry : lastHeartbeatTime.entrySet()) {
            String sessionId = entry.getKey();
            long pingSentAt = entry.getValue();

            SessionInfo sessionInfo = sessionManager.getSession(sessionId);
            if (sessionInfo == null) {
                // The session was closed elsewhere; drop the stale entry
                lastHeartbeatTime.remove(sessionId);
                continue;
            }

            // Any message received after the ping counts as a response, because
            // the handler updates lastActiveTime on every incoming message.
            boolean answered = sessionInfo.getLastActiveTime() >= pingSentAt;
            if (!answered && currentTime - pingSentAt > heartbeatTimeout) {
                logger.warn("Heartbeat timeout for session: {}", sessionId);
                handleHeartbeatFailure(sessionInfo);
            }
        }
    }

    private void handleHeartbeatFailure(SessionInfo sessionInfo) {
        String sessionId = sessionInfo.getSessionId();

        try {
            WebSocketSession session = sessionInfo.getSession();
            if (session.isOpen()) {
                session.close(CloseStatus.GOING_AWAY);
            }
        } catch (Exception e) {
            logger.error("Error closing heartbeat failed connection: {}", sessionId, e);
        } finally {
            sessionManager.removeSession(sessionId);
            lastHeartbeatTime.remove(sessionId);
        }
    }

    public void recordHeartbeatResponse(String sessionId) {
        // A heartbeat reply counts as activity on the session
        sessionManager.updateSessionActivity(sessionId);
    }
}
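
A heartbeat check of this kind boils down to remembering which pings are still unanswered. The sketch below isolates that bookkeeping so it can be reasoned about without sockets or timers. PendingPingTracker and its methods are hypothetical names, and the caller supplies the current time explicitly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical tracker: remembers pings that have not been answered yet. */
final class PendingPingTracker {

    private final Map<String, Long> pendingSince = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    PendingPingTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records a ping; an older unanswered ping keeps its original timestamp. */
    void pingSent(String sessionId, long now) {
        pendingSince.putIfAbsent(sessionId, now);
    }

    /** Any reply clears the pending ping for that session. */
    void replyReceived(String sessionId) {
        pendingSince.remove(sessionId);
    }

    /** Returns and forgets the sessions whose ping has gone unanswered for too long. */
    List<String> collectExpired(long now) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> entry : pendingSince.entrySet()) {
            if (now - entry.getValue() > timeoutMillis) {
                expired.add(entry.getKey());
            }
        }
        for (String sessionId : expired) {
            pendingSince.remove(sessionId);
        }
        return expired;
    }
}
```

Keeping the send time and the reply as separate events avoids the trap of overwriting the timestamp on every ping, which would make a silent client look permanently fresh.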

9. Controller

Create a REST controller that exposes monitoring and management endpoints:

@RestController
@RequestMapping("/api/websocket")
public class WebSocketController {

    private static final Logger logger = LoggerFactory.getLogger(WebSocketController.class);

    @Autowired
    private WebSocketSessionManager sessionManager;

    @Autowired
    private ConnectionLeakDetectionService leakDetectionService;

    @Autowired
    private ZombieConnectionCleanupService cleanupService;

    @GetMapping("/status")
    public Map<String, Object> getStatus() {
        Map<String, Object> status = new HashMap<>();
        status.put("connectionCount", sessionManager.getConnectionCount());
        status.put("isMonitoring", leakDetectionService.isMonitoring());
        return status;
    }

    @GetMapping("/connections")
    public List<Map<String, Object>> getConnections() {
        List<Map<String, Object>> connections = new ArrayList<>();

        for (SessionInfo sessionInfo : sessionManager.getAllSessions()) {
            Map<String, Object> conn = new HashMap<>();
            conn.put("sessionId", sessionInfo.getSessionId());
            conn.put("userId", sessionInfo.getUserId());
            conn.put("connectTime", new Date(sessionInfo.getConnectTime()));
            conn.put("lastActiveTime", new Date(sessionInfo.getLastActiveTime()));
            conn.put("idleTime", System.currentTimeMillis() - sessionInfo.getLastActiveTime());
            connections.add(conn);
        }

        return connections;
    }

    @PostMapping("/cleanup/{sessionId}")
    public Map<String, Object> cleanupConnection(@PathVariable String sessionId) {
        Map<String, Object> result = new HashMap<>();
        cleanupService.forceCleanup(sessionId);
        result.put("success", true);
        result.put("message", "Connection cleanup requested for: " + sessionId);
        return result;
    }

    @PostMapping("/cleanup/all")
    public Map<String, Object> cleanupAllConnections() {
        Map<String, Object> result = new HashMap<>();
        cleanupService.forceCleanupAll();
        result.put("success", true);
        result.put("message", "All connections cleanup requested");
        return result;
    }

    @PostMapping("/monitoring/start")
    public Map<String, Object> startMonitoring() {
        Map<String, Object> result = new HashMap<>();
        leakDetectionService.startMonitoring();
        result.put("success", true);
        result.put("message", "Monitoring started");
        return result;
    }

    @PostMapping("/monitoring/stop")
    public Map<String, Object> stopMonitoring() {
        Map<String, Object> result = new HashMap<>();
        leakDetectionService.stopMonitoring();
        result.put("success", true);
        result.put("message", "Monitoring stopped");
        return result;
    }
}

Architecture

System Architecture

+----------------------------------------------------------+
|                                                          |
|  Clients                                                 |
|                                                          |
+----------------------------------------------------------+
            | WebSocket Connection
            v
+----------------------------------------------------------+
|                                                          |
|  MyWebSocketHandler                                      |
|  - afterConnectionEstablished()                          |
|  - handleTextMessage()                                   |
|  - afterConnectionClosed()                               |
|                                                          |
+----------------------------------------------------------+
            |
            v
+----------------------------------------------------------+
|                                                          |
|  WebSocketSessionManager                                 |
|  - addSession()                                          |
|  - removeSession()                                       |
|  - getTimeoutSessions()                                  |
|                                                          |
+----------------------------------------------------------+
            |
            v
+----------------------------------------------------------+
|                                                          |
|  ConnectionLeakDetectionService                          |
|  - checkConnections()                                    |
|  - handleTimeoutConnection()                             |
|                                                          |
+----------------------------------------------------------+
            |
            v
+----------------------------------------------------------+
|                                                          |
|  ZombieConnectionCleanupService                          |
|  - cleanupZombieConnections()                            |
|  - cleanupZombieConnection()                             |
|                                                          |
+----------------------------------------------------------+

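Detection Flow

1. Scheduled task runs (every 10 seconds)
   |
   v
2. Fetch all WebSocket connections
   |
   v
3. Check each connection's idle time
   |
   v
4. Idle time > timeout threshold (60 seconds)?
   |
   +-- Yes --> Log a warning
   |         |
   |         v
   |      Close the connection
   |         |
   |         v
   |      Remove from the session manager
   |
   +-- No --> Keep monitoring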

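Cleanup Flow

1. Cleanup task runs (every 30 seconds)
   |
   v
2. Fetch all timed-out connections (idle time > 120 seconds)
   |
   v
3. Any timed-out connections?
   |
   +-- Yes --> Iterate over the timed-out connections
   |         |
   |         v
   |      Record connection details
   |         |
   |         v
   |      Send a close frame
   |         |
   |         v
   |      Remove the connection
   |
   +-- No --> Done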

Configuration

Core Settings

# WebSocket settings
websocket.check.interval=10000
websocket.timeout=60000
websocket.cleanup.enabled=true
websocket.cleanup.interval=30000
websocket.max.idle.time=120000
websocket.heartbeat.interval=30000
websocket.heartbeat.timeout=10000

# Actuator settings
management.endpoints.web.exposure.include=health,info,metrics
management.endpoint.health.show-details=always

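Setting Reference

Setting                         Description                        Default
websocket.check.interval        Connection check interval (ms)     10000
websocket.timeout               Connection timeout (ms)            60000
websocket.cleanup.enabled       Enable automatic cleanup           true
websocket.cleanup.interval      Cleanup task interval (ms)         30000
websocket.max.idle.time         Maximum idle time (ms)             120000
websocket.heartbeat.interval    Heartbeat send interval (ms)       30000
websocket.heartbeat.timeout     Heartbeat timeout (ms)             10000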
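
These settings interact, so it can help to sanity-check them at startup. The sketch below encodes three rules of thumb that are assumptions of this example, not requirements stated by the article: the scan should run at least once per timeout window, heartbeats should be more frequent than the idle timeout, and a client should be expected to answer before the next heartbeat goes out. TimingSettings and findProblems are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical holder for the timing settings above; every value is in milliseconds. */
final class TimingSettings {

    final long checkInterval;
    final long timeout;
    final long heartbeatInterval;
    final long heartbeatTimeout;

    TimingSettings(long checkInterval, long timeout,
                   long heartbeatInterval, long heartbeatTimeout) {
        this.checkInterval = checkInterval;
        this.timeout = timeout;
        this.heartbeatInterval = heartbeatInterval;
        this.heartbeatTimeout = heartbeatTimeout;
    }

    /** Returns one message per suspicious relationship; an empty list means none was found. */
    List<String> findProblems() {
        List<String> problems = new ArrayList<>();
        if (checkInterval > timeout) {
            problems.add("websocket.check.interval is longer than websocket.timeout");
        }
        if (heartbeatInterval >= timeout) {
            // Otherwise a healthy but quiet client can hit the idle timeout between heartbeats.
            problems.add("websocket.heartbeat.interval is not shorter than websocket.timeout");
        }
        if (heartbeatTimeout >= heartbeatInterval) {
            problems.add("websocket.heartbeat.timeout is not shorter than websocket.heartbeat.interval");
        }
        return problems;
    }
}
```

The default values listed above pass all three checks.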

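Best Practices

1. Choose Timeouts Sensibly

The right timeout depends on the business scenario:

  • High real-time requirements: if messages must arrive in real time, use a short timeout (for example 30 seconds)
  • Low real-time requirements: if messages only need to arrive periodically, a longer timeout is acceptable (for example 5 minutes)
  • Constrained resources: if server resources are tight, use a short timeout so zombie connections are cleaned up promptly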
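
Whichever value is chosen, detection is not instantaneous: a periodic scan flags a silent connection only at the first scan after its idle time exceeds the timeout. The sketch below works through that arithmetic under two simplifying assumptions, namely that scans run exactly at multiples of the check interval and take no time. DetectionLatency and its methods are hypothetical names.

```java
/** Hypothetical helper: timing of a periodic idle scan, all values in milliseconds. */
final class DetectionLatency {

    private DetectionLatency() {
    }

    /** Upper bound on the time between a client's last message and the scan that flags it. */
    static long worstCaseMillis(long timeoutMillis, long checkIntervalMillis) {
        return timeoutMillis + checkIntervalMillis;
    }

    /**
     * Time of the first scan that flags a connection last active at lastActiveAt,
     * assuming scans run at 0, interval, 2 * interval, and so on.
     */
    static long flaggedAt(long lastActiveAt, long timeoutMillis, long checkIntervalMillis) {
        // The scan uses a strict "idle > timeout" comparison, hence the + 1.
        long earliest = lastActiveAt + timeoutMillis + 1;
        long scans = (earliest + checkIntervalMillis - 1) / checkIntervalMillis; // ceiling division
        return scans * checkIntervalMillis;
    }
}
```

With a 60-second timeout and a 10-second check interval, a connection can therefore stay open for up to 70 seconds after its last message.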

2. Implement a Heartbeat

A heartbeat is an effective way to check whether a connection is still alive:

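// Client sends a heartbeat
setInterval(() => {
    if (ws.readyState === WebSocket.OPEN) {
        ws.send("ping");
    }
}, 30000);

// Client handles heartbeat traffic
ws.onmessage = (event) => {
    if (event.data === "ping") {
        // Answer the server's heartbeat (HeartbeatDetectionService sends "ping")
        ws.send("pong");
    } else if (event.data === "pong") {
        // Only arrives if the server answers the client's "ping" with "pong"
        console.log("Heartbeat response received");
    }
};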

3. Implement Graceful Close

Before closing a connection, give the client a chance to reconnect:

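private void closeConnectionGracefully(WebSocketSession session) {
    try {
        // Send a close notification
        session.sendMessage(new TextMessage("{\"type\":\"close\",\"reason\":\"timeout\"}"));

        // Wait briefly so the client can process it (this blocks the calling thread)
        Thread.sleep(1000);

        // Close the connection
        session.close(CloseStatus.NORMAL);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so a shutting-down executor can stop promptly
        Thread.currentThread().interrupt();
    } catch (Exception e) {
        logger.error("Error closing connection gracefully", e);
    }
}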

4. Implement Alerting

When a large number of zombie connections is detected, raise an alert:

private void handleMassiveZombieConnections(int count) {
    if (count > threshold) {
        logger.error("ALERT: Massive zombie connections detected: {}", count);
        // Send the alert notification
        sendAlert("Massive zombie connections detected: " + count);
    }
}
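
Because the cleanup task runs every few seconds, a sustained spike would trigger the alert on every run. One common refinement is a cooldown between alerts. The sketch below is an illustration under assumptions: ZombieAlerter, its cooldown parameter, and the Consumer used as the alert sink are hypothetical, since the article does not define threshold or sendAlert.

```java
import java.util.function.Consumer;

/** Hypothetical alerter: fires above a threshold, at most once per cooldown period. */
final class ZombieAlerter {

    private final int threshold;
    private final long cooldownMillis;
    private final Consumer<String> sink; // where alerts go, e.g. a mail or chat client
    private Long lastAlertAt;            // null until the first alert has been sent

    ZombieAlerter(int threshold, long cooldownMillis, Consumer<String> sink) {
        this.threshold = threshold;
        this.cooldownMillis = cooldownMillis;
        this.sink = sink;
    }

    /** Reports a zombie count; returns true if an alert was actually sent. */
    synchronized boolean report(int zombieCount, long now) {
        if (zombieCount <= threshold) {
            return false;
        }
        if (lastAlertAt != null && now - lastAlertAt < cooldownMillis) {
            return false; // still inside the cooldown window
        }
        lastAlertAt = now;
        sink.accept("Massive zombie connections detected: " + zombieCount);
        return true;
    }
}
```

The cleanup task would call report with the size of its zombie list on every run.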

5. Enforce a Connection Limit

To keep the number of connections from growing without bound, enforce a connection limit:

private static final int MAX_CONNECTIONS = 10000;

public boolean canAcceptConnection() {
    return sessionManager.getConnectionCount() < MAX_CONNECTIONS;
}
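
A check of this form is a check-then-act sequence: under load, two handshakes can both see a count just below the limit and both be accepted. If a hard cap matters, the slot can be reserved atomically instead. The following sketch shows one way to do that with a compare-and-set loop; ConnectionQuota is a hypothetical class, not part of the code above.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical quota: hands out at most max slots, safely across threads. */
final class ConnectionQuota {

    private final int max;
    private final AtomicInteger inUse = new AtomicInteger();

    ConnectionQuota(int max) {
        this.max = max;
    }

    /** Reserves a slot atomically; returns false when the limit has been reached. */
    boolean tryAcquire() {
        while (true) {
            int current = inUse.get();
            if (current >= max) {
                return false;
            }
            if (inUse.compareAndSet(current, current + 1)) {
                return true;
            }
            // Another thread changed the count in between; retry with the new value.
        }
    }

    /** Returns a slot; call exactly once for every successful tryAcquire(). */
    void release() {
        inUse.decrementAndGet();
    }

    int inUse() {
        return inUse.get();
    }
}
```

A handler could call tryAcquire() when a connection is established, close the session when it returns false (for example with CloseStatus.SERVICE_OVERLOAD), and call release() when an accepted connection closes.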

6. Bind Connections to User Sessions

Bind each WebSocket connection to the user's session so it is easier to manage and trace:

public void afterConnectionEstablished(WebSocketSession session) throws Exception {
    String sessionId = session.getId();
    String userId = getUserIdFromSession(session);

    SessionInfo sessionInfo = new SessionInfo(sessionId, session);
    sessionInfo.setUserId(userId);

    sessionManager.addSession(sessionId, sessionInfo);
}
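
Once each connection carries a user ID, a reverse index from user to connections makes it possible to close every connection of a user at logout, rather than waiting for a timeout. The sketch below shows only the index; UserSessionIndex is a hypothetical class, and closing the sessions it returns is left to the caller.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical index from user ID to the IDs of that user's open connections. */
final class UserSessionIndex {

    private final Map<String, Set<String>> sessionsByUser = new ConcurrentHashMap<>();

    /** Associates a connection with a user (one user may have several tabs open). */
    void bind(String userId, String sessionId) {
        sessionsByUser.compute(userId, (key, ids) -> {
            Set<String> target = (ids != null) ? ids : ConcurrentHashMap.<String>newKeySet();
            target.add(sessionId);
            return target;
        });
    }

    /** Drops one connection; the user's entry disappears with its last connection. */
    void unbind(String userId, String sessionId) {
        sessionsByUser.computeIfPresent(userId, (key, ids) -> {
            ids.remove(sessionId);
            return ids.isEmpty() ? null : ids;
        });
    }

    /** Removes and returns every connection ID of the user, for example at logout. */
    Set<String> removeAll(String userId) {
        Set<String> ids = sessionsByUser.remove(userId);
        return ids == null ? Collections.<String>emptySet() : ids;
    }
}
```

A logout endpoint could pass each returned ID to the cleanup service's forceCleanup method.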

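Performance Testing

Test Scenarios

  1. Normal connect and close: verify that WebSocket connections open and close normally
  2. Leak detection: simulate a user closing the browser directly and verify that the detection mechanism notices in time
  3. Zombie cleanup: verify that zombie connections are cleaned up automatically
  4. Heartbeat detection: verify that the heartbeat mechanism correctly detects whether a connection is alive
  5. Concurrent connections: verify that the system can handle a large number of concurrent connections

Test Results

Scenario                                     Expected                               Actual                                          Status
Normal connect and close                     Connection opens and closes normally   Connection opened and closed normally           Pass
Leak detection (60 s without response)       Leaked connection detected             Leaked connection detected after 60 s           Pass
Zombie cleanup (120 s idle)                  Zombie connection cleaned up           Zombie connection cleaned up after 120 s        Pass
Heartbeat detection (30 s without response)  Timed-out connection closed            Connection closed after 30 s without response   Pass
Concurrent connections (1000 concurrent)     System runs normally                   System ran normally                             Pass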

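Monitoring Metrics

Monitor the following metrics to catch potential problems early:

  1. Current connections: the number of currently active WebSocket connections
  2. Zombie connections: the current number of zombie connections
  3. Connection open rate: new connections established per second
  4. Connection close rate: connections closed per second
  5. Average connection duration: the average lifetime of a connection
  6. Resource usage: utilisation of file descriptors, memory, and similar resources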
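
Several of these metrics can be derived from three counters updated in the handler callbacks. The following is a minimal sketch with hypothetical names (ConnectionMetrics, onOpen, onClose); rates would be computed by whatever system samples the counters over time.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical counters behind the connection metrics listed above. */
final class ConnectionMetrics {

    private final LongAdder opened = new LongAdder();
    private final LongAdder closed = new LongAdder();
    private final LongAdder totalLifetimeMillis = new LongAdder();

    /** Call when a connection is established. */
    void onOpen() {
        opened.increment();
    }

    /** Call when a connection is closed, with its connect time and the current time. */
    void onClose(long connectedAtMillis, long nowMillis) {
        closed.increment();
        totalLifetimeMillis.add(nowMillis - connectedAtMillis);
    }

    /** Connections opened but not yet closed. */
    long current() {
        return opened.sum() - closed.sum();
    }

    /** Average lifetime of the connections closed so far, or 0 if none has closed. */
    double averageLifetimeMillis() {
        long count = closed.sum();
        return count == 0 ? 0.0 : (double) totalLifetimeMillis.sum() / count;
    }
}
```

Since the project already includes Spring Boot Actuator, values like these could be published through its metrics endpoint instead of being logged.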

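The approach described in this article addresses WebSocket connection leaks effectively and improves the availability and stability of the WebSocket service.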

Author: jiangyi
URL: http://www.jiangyi.space/articles/2026/04/22/1776572671133.html
WeChat official account: 服务端技术精选