SpringBoot + WebSocket 连接数限流 + 防资源耗尽:单用户最多建立 N 个连接,保障服务稳定

前言

在现代 Web 应用中,WebSocket 已成为实现实时通信的重要技术。它允许服务器主动向客户端推送数据,实现了真正的双向通信。然而,随着用户量的增长,WebSocket 连接的管理变得越来越重要。如果不进行有效的连接数限制,可能会导致服务器资源耗尽,影响服务的稳定性。

想象一下这样的场景:你的应用支持实时聊天功能,每个用户可以建立多个 WebSocket 连接。如果某个用户恶意或误操作建立了大量连接,可能会占用服务器的大量资源,影响其他用户的正常使用。更严重的是,如果多个用户都这样做,服务器可能会因为资源耗尽而崩溃。

WebSocket 连接数限流防资源耗尽是解决这个问题的有效方案。通过限制单个用户的最大连接数,以及采取其他防资源耗尽的措施,可以保障服务的稳定性。本文将详细介绍如何在 SpringBoot 项目中实现 WebSocket 连接数限流和防资源耗尽功能。

一、WebSocket 连接数限流的核心概念

1.1 什么是 WebSocket 连接数限流

WebSocket 连接数限流是指限制单个用户或单个 IP 地址可以建立的 WebSocket 连接数量,以防止资源滥用和保障服务稳定性。

1.2 为什么需要 WebSocket 连接数限流

  • 防止资源耗尽:每个 WebSocket 连接都会占用服务器的内存和网络资源,如果连接数过多,可能会导致服务器资源耗尽
  • 防止恶意攻击:恶意用户可能会通过建立大量连接来攻击服务器,导致服务不可用
  • 保障公平性:确保每个用户都能获得合理的资源分配,避免少数用户占用过多资源
  • 提高服务稳定性:通过限制连接数,可以确保服务器在高负载下仍能稳定运行

1.3 常见的限流策略

限流策略说明适用场景
基于用户 ID限制单个用户 ID 可以建立的连接数已登录用户的应用
基于 IP 地址限制单个 IP 地址可以建立的连接数匿名用户的应用
基于设备限制单个设备可以建立的连接数移动应用
基于全局限制整个服务器的最大连接数所有场景

二、防资源耗尽的核心概念

2.1 什么是防资源耗尽

防资源耗尽是指通过一系列措施,防止服务器因为资源占用过多而崩溃,保障服务的稳定性和可用性。

2.2 资源耗尽的常见原因

资源类型耗尽原因影响
内存连接数过多、内存泄漏服务器崩溃、服务不可用
网络网络带宽不足、网络拥塞通信延迟增加、服务响应慢
CPU处理请求过多、计算密集型操作服务响应慢、系统负载高
文件描述符打开的文件和连接数过多无法建立新连接、服务不可用

2.3 防资源耗尽的策略

策略说明效果
连接数限制限制单个用户或全局的连接数防止连接数过多导致的资源耗尽
超时机制为连接设置超时时间,自动关闭长时间空闲的连接释放空闲连接占用的资源
心跳机制定期发送心跳消息,检测连接状态及时发现和关闭无效连接
资源监控监控服务器资源使用情况,及时发现异常提前预警,防止资源耗尽
优雅关闭当服务器需要重启时,优雅关闭连接避免强制关闭连接导致的客户端错误

三、SpringBoot WebSocket 连接数限流实现

3.1 依赖配置

<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- WebSocket -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

    <!-- Spring Boot Actuator -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <!-- Micrometer -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-registry-prometheus</artifactId>
    </dependency>

    <!-- Spring Boot Configuration Processor -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- Test -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

3.2 WebSocket 连接管理

3.2.1 WebSocket 连接管理器

@Component
@Slf4j
public class WebSocketConnectionManager {

    private final Map<String, Set<WebSocketSession>> userConnections = new ConcurrentHashMap<>();
    private final Map<String, String> sessionUserMap = new ConcurrentHashMap<>();

    @Autowired
    private WebSocketProperties properties;

    public boolean addConnection(String userId, WebSocketSession session) {
        // 检查用户连接数是否超过限制
        Set<WebSocketSession> connections = userConnections.getOrDefault(userId, new HashSet<>());
        if (connections.size() >= properties.getUserMaxConnections()) {
            log.warn("User {} has reached maximum connections limit: {}", userId, properties.getUserMaxConnections());
            return false;
        }

        // 添加连接
        connections.add(session);
        userConnections.put(userId, connections);
        sessionUserMap.put(session.getId(), userId);

        log.info("User {} connected, current connections: {}", userId, connections.size());
        return true;
    }

    public void removeConnection(WebSocketSession session) {
        String userId = sessionUserMap.remove(session.getId());
        if (userId != null) {
            Set<WebSocketSession> connections = userConnections.get(userId);
            if (connections != null) {
                connections.remove(session);
                if (connections.isEmpty()) {
                    userConnections.remove(userId);
                } else {
                    userConnections.put(userId, connections);
                }
                log.info("User {} disconnected, current connections: {}", userId, connections.size());
            }
        }
    }

    public int getUserConnectionCount(String userId) {
        Set<WebSocketSession> connections = userConnections.get(userId);
        return connections != null ? connections.size() : 0;
    }

    public int getTotalConnectionCount() {
        return sessionUserMap.size();
    }

    public Set<WebSocketSession> getUserConnections(String userId) {
        return userConnections.getOrDefault(userId, Collections.emptySet());
    }

}

3.2.2 WebSocket 处理器

@Component
@Slf4j
public class WebSocketHandler extends TextWebSocketHandler {

    @Autowired
    private WebSocketConnectionManager connectionManager;

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        // 从会话中获取用户 ID
        String userId = (String) session.getAttributes().get("userId");
        if (userId == null) {
            log.warn("WebSocket connection established without userId");
            session.close();
            return;
        }

        // 添加连接
        boolean added = connectionManager.addConnection(userId, session);
        if (!added) {
            session.close(CloseStatus.TOO_MANY_CONNECTIONS);
            return;
        }

        log.info("WebSocket connection established for user: {}", userId);
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        // 处理文本消息
        String userId = (String) session.getAttributes().get("userId");
        log.info("Received message from user {}: {}", userId, message.getPayload());

        // 发送消息给用户
        session.sendMessage(new TextMessage("Message received: " + message.getPayload()));
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        // 移除连接
        connectionManager.removeConnection(session);
        log.info("WebSocket connection closed: {}", status);
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        log.error("WebSocket transport error", exception);
        // 移除连接
        connectionManager.removeConnection(session);
    }

}

四、防资源耗尽实现

4.1 心跳机制

4.1.1 心跳配置

@Configuration
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Autowired
    private WebSocketHandler webSocketHandler;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 注册 STOMP 端点
        registry.addEndpoint("/ws")
                .setAllowedOrigins("*")
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 配置消息代理
        registry.enableSimpleBroker("/topic")
                .setHeartbeatValue(new long[]{10000, 10000}); // 10秒心跳
        registry.setApplicationDestinationPrefixes("/app");
    }

}

4.1.2 心跳处理器

@Component
@Slf4j
public class HeartbeatHandler {

    @Autowired
    private WebSocketConnectionManager connectionManager;

    @Scheduled(fixedRate = 30000) // 每30秒检查一次
    public void checkHeartbeats() {
        log.info("Checking WebSocket connections for heartbeat");

        // 遍历所有连接,检查是否有长时间未活动的连接
        for (Map.Entry<String, Set<WebSocketSession>> entry : connectionManager.getUserConnections().entrySet()) {
            String userId = entry.getKey();
            Set<WebSocketSession> sessions = entry.getValue();

            Iterator<WebSocketSession> iterator = sessions.iterator();
            while (iterator.hasNext()) {
                WebSocketSession session = iterator.next();
                if (isSessionIdle(session)) {
                    try {
                        session.close(CloseStatus.NOT_ACCEPTABLE.withReason("Idle connection"));
                        iterator.remove();
                        log.info("Closed idle connection for user: {}", userId);
                    } catch (IOException e) {
                        log.error("Failed to close idle connection", e);
                    }
                }
            }

            // 如果用户没有连接了,移除用户
            if (sessions.isEmpty()) {
                connectionManager.getUserConnections().remove(userId);
            }
        }
    }

    private boolean isSessionIdle(WebSocketSession session) {
        // 检查会话是否空闲
        // 这里可以根据实际情况实现,例如检查最后活动时间
        return false; // 简化实现
    }

}

4.2 资源监控

4.2.1 资源监控服务

@Component
@Slf4j
public class ResourceMonitorService {

    @Autowired
    private WebSocketConnectionManager connectionManager;

    @Autowired
    private MeterRegistry meterRegistry;

    @Scheduled(fixedRate = 60000) // 每分钟监控一次
    public void monitorResources() {
        // 监控连接数
        int totalConnections = connectionManager.getTotalConnectionCount();
        meterRegistry.gauge("websocket.connections.total", totalConnections);

        // 监控内存使用情况
        Runtime runtime = Runtime.getRuntime();
        long usedMemory = (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024);
        long maxMemory = runtime.maxMemory() / (1024 * 1024);
        double memoryUsage = (double) usedMemory / maxMemory * 100;

        meterRegistry.gauge("system.memory.usage", memoryUsage);

        // 监控 CPU 使用率
        // 这里可以使用第三方库获取 CPU 使用率

        log.info("Resource monitoring: connections={}, memory usage={}%", totalConnections, memoryUsage);

        // 检查是否需要告警
        if (totalConnections > 1000) {
            log.warn("High WebSocket connection count: {}", totalConnections);
        }

        if (memoryUsage > 80) {
            log.warn("High memory usage: {}%", memoryUsage);
        }
    }

}

五、SpringBoot 完整实现

5.1 项目依赖

<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- WebSocket -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

    <!-- Spring Boot Actuator -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <!-- Micrometer -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-registry-prometheus</artifactId>
    </dependency>

    <!-- Spring Boot Configuration Processor -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- Test -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

5.2 配置文件

server:
  port: 8080

spring:
  application:
    name: websocket-rate-limit-demo
  websocket:
    broker:
      enabled: true

# WebSocket 配置
websocket:
  max-connections:
    user: 5
    global: 1000
  idle-timeout: 300000 # 5分钟
  heartbeat:
    interval: 30000 # 30秒

# 资源监控配置
resource:
  monitor:
    enabled: true
    alert:
      connections-threshold: 1000
      memory-threshold: 80

5.3 核心配置类

5.3.1 WebSocket 配置

@Data
@ConfigurationProperties(prefix = "websocket")
public class WebSocketProperties {

    private MaxConnections maxConnections = new MaxConnections();
    private long idleTimeout = 300000;
    private Heartbeat heartbeat = new Heartbeat();

    @Data
    public static class MaxConnections {
        private int user = 5;
        private int global = 1000;
    }

    @Data
    public static class Heartbeat {
        private long interval = 30000;
    }

}

5.3.2 资源监控配置

@Data
@ConfigurationProperties(prefix = "resource.monitor")
public class ResourceMonitorProperties {

    private boolean enabled = true;
    private Alert alert = new Alert();

    @Data
    public static class Alert {
        private int connectionsThreshold = 1000;
        private int memoryThreshold = 80;
    }

}

5.4 服务实现

5.4.1 WebSocket 连接管理器

@Component
@Slf4j
public class WebSocketConnectionManager {

    private final Map<String, Set<WebSocketSession>> userConnections = new ConcurrentHashMap<>();
    private final Map<String, String> sessionUserMap = new ConcurrentHashMap<>();

    @Autowired
    private WebSocketProperties properties;

    public boolean addConnection(String userId, WebSocketSession session) {
        // 检查用户连接数是否超过限制
        Set<WebSocketSession> connections = userConnections.getOrDefault(userId, new HashSet<>());
        if (connections.size() >= properties.getMaxConnections().getUser()) {
            log.warn("User {} has reached maximum connections limit: {}", userId, properties.getMaxConnections().getUser());
            return false;
        }

        // 检查全局连接数是否超过限制
        if (sessionUserMap.size() >= properties.getMaxConnections().getGlobal()) {
            log.warn("Global WebSocket connections have reached maximum limit: {}", properties.getMaxConnections().getGlobal());
            return false;
        }

        // 添加连接
        connections.add(session);
        userConnections.put(userId, connections);
        sessionUserMap.put(session.getId(), userId);

        log.info("User {} connected, current connections: {}", userId, connections.size());
        return true;
    }

    public void removeConnection(WebSocketSession session) {
        String userId = sessionUserMap.remove(session.getId());
        if (userId != null) {
            Set<WebSocketSession> connections = userConnections.get(userId);
            if (connections != null) {
                connections.remove(session);
                if (connections.isEmpty()) {
                    userConnections.remove(userId);
                } else {
                    userConnections.put(userId, connections);
                }
                log.info("User {} disconnected, current connections: {}", userId, connections.size());
            }
        }
    }

    public int getUserConnectionCount(String userId) {
        Set<WebSocketSession> connections = userConnections.get(userId);
        return connections != null ? connections.size() : 0;
    }

    public int getTotalConnectionCount() {
        return sessionUserMap.size();
    }

    public Map<String, Set<WebSocketSession>> getUserConnections() {
        return userConnections;
    }

}

5.4.2 WebSocket 处理器

@Component
@Slf4j
public class WebSocketHandler extends TextWebSocketHandler {

    @Autowired
    private WebSocketConnectionManager connectionManager;

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        // 从会话中获取用户 ID
        String userId = (String) session.getAttributes().get("userId");
        if (userId == null) {
            log.warn("WebSocket connection established without userId");
            session.close();
            return;
        }

        // 添加连接
        boolean added = connectionManager.addConnection(userId, session);
        if (!added) {
            session.close(CloseStatus.TOO_MANY_CONNECTIONS);
            return;
        }

        log.info("WebSocket connection established for user: {}", userId);
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        // 处理文本消息
        String userId = (String) session.getAttributes().get("userId");
        log.info("Received message from user {}: {}", userId, message.getPayload());

        // 发送消息给用户
        session.sendMessage(new TextMessage("Message received: " + message.getPayload()));
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        // 移除连接
        connectionManager.removeConnection(session);
        log.info("WebSocket connection closed: {}", status);
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        log.error("WebSocket transport error", exception);
        // 移除连接
        connectionManager.removeConnection(session);
    }

}

5.4.3 心跳处理器

@Component
@Slf4j
public class HeartbeatHandler {

    @Autowired
    private WebSocketConnectionManager connectionManager;

    @Scheduled(fixedRate = 30000) // 每30秒检查一次
    public void checkHeartbeats() {
        log.info("Checking WebSocket connections for heartbeat");

        // 遍历所有连接,检查是否有长时间未活动的连接
        for (Map.Entry<String, Set<WebSocketSession>> entry : connectionManager.getUserConnections().entrySet()) {
            String userId = entry.getKey();
            Set<WebSocketSession> sessions = entry.getValue();

            Iterator<WebSocketSession> iterator = sessions.iterator();
            while (iterator.hasNext()) {
                WebSocketSession session = iterator.next();
                if (isSessionIdle(session)) {
                    try {
                        session.close(CloseStatus.NOT_ACCEPTABLE.withReason("Idle connection"));
                        iterator.remove();
                        log.info("Closed idle connection for user: {}", userId);
                    } catch (IOException e) {
                        log.error("Failed to close idle connection", e);
                    }
                }
            }

            // 如果用户没有连接了,移除用户
            if (sessions.isEmpty()) {
                connectionManager.getUserConnections().remove(userId);
            }
        }
    }

    private boolean isSessionIdle(WebSocketSession session) {
        // 检查会话是否空闲
        // 这里可以根据实际情况实现,例如检查最后活动时间
        return false; // 简化实现
    }

}

5.4.4 资源监控服务

@Component
@Slf4j
public class ResourceMonitorService {

    @Autowired
    private WebSocketConnectionManager connectionManager;

    @Autowired
    private MeterRegistry meterRegistry;

    @Autowired
    private ResourceMonitorProperties properties;

    @Scheduled(fixedRate = 60000) // 每分钟监控一次
    public void monitorResources() {
        // 监控连接数
        int totalConnections = connectionManager.getTotalConnectionCount();
        meterRegistry.gauge("websocket.connections.total", totalConnections);

        // 监控内存使用情况
        Runtime runtime = Runtime.getRuntime();
        long usedMemory = (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024);
        long maxMemory = runtime.maxMemory() / (1024 * 1024);
        double memoryUsage = (double) usedMemory / maxMemory * 100;

        meterRegistry.gauge("system.memory.usage", memoryUsage);

        // 监控 CPU 使用率
        // 这里可以使用第三方库获取 CPU 使用率

        log.info("Resource monitoring: connections={}, memory usage={}%", totalConnections, memoryUsage);

        // 检查是否需要告警
        if (totalConnections > properties.getAlert().getConnectionsThreshold()) {
            log.warn("High WebSocket connection count: {}", totalConnections);
        }

        if (memoryUsage > properties.getAlert().getMemoryThreshold()) {
            log.warn("High memory usage: {}%", memoryUsage);
        }
    }

}

5.5 控制器

5.5.1 WebSocket 控制器

@RestController
@RequestMapping("/api/websocket")
@Slf4j
public class WebSocketController {

    @Autowired
    private WebSocketConnectionManager connectionManager;

    @GetMapping("/connections")
    public Map<String, Object> getConnections() {
        Map<String, Object> result = new HashMap<>();
        result.put("totalConnections", connectionManager.getTotalConnectionCount());
        result.put("timestamp", LocalDateTime.now());
        return result;
    }

    @GetMapping("/connections/{userId}")
    public Map<String, Object> getUserConnections(@PathVariable String userId) {
        Map<String, Object> result = new HashMap<>();
        result.put("userId", userId);
        result.put("connectionCount", connectionManager.getUserConnectionCount(userId));
        result.put("timestamp", LocalDateTime.now());
        return result;
    }

}

六、最佳实践

6.1 WebSocket 连接管理

原则

  • 合理设置连接数限制:根据服务器资源和业务需求,设置合理的连接数限制
  • 实时监控连接状态:实时监控连接状态,及时发现异常
  • 优雅处理连接关闭:当连接数超过限制时,优雅关闭新连接
  • 定期清理空闲连接:定期清理长时间空闲的连接,释放资源

建议

  • 单个用户的连接数限制建议设置为 3-5 个
  • 全局连接数限制根据服务器资源和业务需求设置
  • 实现心跳机制,定期检测连接状态
  • 当连接数超过限制时,返回明确的错误信息

6.2 资源监控

原则

  • 实时监控资源使用情况:实时监控服务器资源使用情况,及时发现异常
  • 设置合理的告警阈值:根据服务器资源和业务需求,设置合理的告警阈值
  • 建立告警机制:当资源使用超过阈值时,及时告警
  • 定期分析资源使用趋势:定期分析资源使用趋势,提前预警

建议

  • 使用 Micrometer 记录资源使用指标
  • 集成 Prometheus 和 Grafana 进行可视化监控
  • 设置合理的告警阈值,例如连接数超过 1000,内存使用率超过 80%
  • 定期分析资源使用趋势,提前扩容

6.3 性能优化

原则

  • 减少连接开销:减少每个连接的开销,提高服务器处理能力
  • 优化消息处理:优化消息处理逻辑,提高处理效率
  • 使用连接池:使用连接池管理 WebSocket 连接,提高资源利用率
  • 负载均衡:使用负载均衡,分散连接压力

建议

  • 使用轻量级的 WebSocket 实现,减少连接开销
  • 优化消息处理逻辑,避免阻塞操作
  • 使用连接池管理 WebSocket 连接,提高资源利用率
  • 使用负载均衡,分散连接压力

6.4 安全措施

原则

  • 认证和授权:确保只有合法用户可以建立 WebSocket 连接
  • 防止恶意攻击:防止恶意用户通过建立大量连接来攻击服务器
  • 数据加密:对 WebSocket 通信进行加密,保护数据安全
  • 速率限制:限制单个用户的消息发送速率,防止消息轰炸

建议

  • 实现 WebSocket 连接的认证和授权
  • 基于 IP 地址和用户 ID 进行限流
  • 使用 WSS (WebSocket Secure) 进行加密通信
  • 限制单个用户的消息发送速率

七、总结

WebSocket 连接数限流和防资源耗尽是保障 WebSocket 服务稳定运行的重要措施。通过限制单个用户的最大连接数,以及采取其他防资源耗尽的措施,可以有效防止资源滥用和恶意攻击,保障服务的稳定性。在实际项目中,我们应该根据业务需求和服务器资源,合理配置 WebSocket 连接数限制和防资源耗尽措施,建立完善的监控和告警机制,确保 WebSocket 服务的稳定运行。通过 WebSocket 连接数限流和防资源耗尽功能,可以有效提高服务的可靠性和可用性,为用户提供更好的实时通信体验。

互动话题

  1. 你的项目中是如何管理 WebSocket 连接的?
  2. 你认为 WebSocket 连接数限流最大的挑战是什么?
  3. 你有遇到过 WebSocket 资源耗尽的问题吗?

欢迎在评论区留言讨论!更多技术文章,欢迎关注公众号:服务端技术精选


标题:SpringBoot + WebSocket 连接数限流 + 防资源耗尽:单用户最多建立 N 个连接,保障服务稳定
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/04/06/1774967841947.html
公众号:服务端技术精选
    评论
    0 评论
avatar

取消