Spring Boot + WebSocket Cluster + Redis Pub/Sub: Syncing Online User State Across Nodes and Delivering Messages Precisely

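Today let's talk about a key topic in real-time communication: how to deliver a message to exactly the right WebSocket connection when the application runs as a multi-node cluster.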

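Challenges in a Clustered Environment

In day-to-day development we often run into real-time communication requirements like these:

  • An online customer-service system has to push each customer message to the agent handling that customer
  • A push-notification system has to send a notification to a specific user
  • A game system has to broadcast room messages to the players in that room
  • An instant-messaging app has to support point-to-point messages

A traditional single-node WebSocket implementation cannot meet these needs once the service is deployed as a cluster. When user connections are spread across nodes, the node that receives a message is often not the node that holds the recipient's connection, so delivery becomes the hard part.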

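Core Problems of Message Delivery in a Cluster

Compared with a single-node deployment, a cluster brings these main challenges:

  • Scattered connections: user connections live on different nodes
  • State synchronization: every node needs a consistent view of which users are connected
  • Message routing: each message must be routed to the node that holds the target connection
  • Load balancing: connections must be spread sensibly across nodes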
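
To make the routing problem concrete, here is a minimal in-memory sketch of the idea the rest of this article builds on: every node subscribes to a shared channel, and each node delivers only to the users connected to it. The names `ClusterSketch`, `Bus`, and `Node` are made up for illustration; the `Bus` merely stands in for Redis Pub/Sub and involves no real networking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for a cluster; the Bus plays the role of Redis Pub/Sub. */
final class ClusterSketch {

    /** Shared "channel": every subscribed node receives every published message. */
    static final class Bus {
        private final List<Node> subscribers = new ArrayList<>();

        void subscribe(Node node) {
            subscribers.add(node);
        }

        void publish(String userId, String content) {
            for (Node node : subscribers) {
                node.onMessage(userId, content);
            }
        }
    }

    /** One application node that only knows about its own local connections. */
    static final class Node {
        final String nodeId;
        // userId -> messages delivered locally (stands in for a WebSocketSession)
        final Map<String, List<String>> localInbox = new ConcurrentHashMap<>();

        Node(String nodeId, Bus bus) {
            this.nodeId = nodeId;
            bus.subscribe(this);
        }

        void connect(String userId) {
            localInbox.put(userId, new ArrayList<>());
        }

        // Every node sees the message, but only the node holding the user delivers it.
        void onMessage(String userId, String content) {
            List<String> inbox = localInbox.get(userId);
            if (inbox != null) {
                inbox.add(content);
            }
        }
    }
}
```

With two nodes on the same bus, a message published for a user connected to the second node ends up only in that node's inbox; the first node receives it and silently drops it. That fan-out-and-filter behavior is simple, at the cost of every node processing every message.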

Core Implementation

1. WebSocket cluster setup

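import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    
    // ClusterWebSocketHandler is already a @Component, so inject it here instead of
    // declaring a second @Bean with the same bean name
    private final ClusterWebSocketHandler clusterWebSocketHandler;
    
    public WebSocketConfig(ClusterWebSocketHandler clusterWebSocketHandler) {
        this.clusterWebSocketHandler = clusterWebSocketHandler;
    }
    
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(clusterWebSocketHandler, "/ws/{userId}")
                // "*" accepts every origin; restrict it to known origins in production
                .setAllowedOrigins("*");
    }
}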

2. Connection manager

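import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession;

@Component
public class ConnectionManager {
    
    // Connections held by this node, keyed by user ID
    private final Map<String, WebSocketSession> localConnections = new ConcurrentHashMap<>();
    
    // Redis hash that maps user ID -> node ID
    private static final String USER_NODE_MAP_KEY = "websocket:user:node";
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    // @Lazy breaks the circular dependency with RedisPubSubService
    @Autowired
    @Lazy
    private RedisPubSubService redisPubSubService;
    
    // Resolved from the Spring environment; System.getProperty would not see
    // a port configured in application.properties
    @Value("${server.port:8080}")
    private String serverPort;
    
    public void addConnection(String userId, WebSocketSession session) {
        // Register the local connection
        localConnections.put(userId, session);
        
        // Update the user -> node mapping in Redis
        String nodeId = getNodeIdentifier();
        redisTemplate.opsForHash().put(USER_NODE_MAP_KEY, userId, nodeId);
        
        // Publish the connection state change
        redisPubSubService.publishUserStatusChange(userId, nodeId, "CONNECTED");
    }
    
    public void removeConnection(String userId) {
        // Drop the local connection
        localConnections.remove(userId);
        
        // Delete the user -> node mapping from Redis. If the user has already
        // reconnected to another node, this also removes the new mapping;
        // a compare-and-delete would be safer.
        redisTemplate.opsForHash().delete(USER_NODE_MAP_KEY, userId);
        
        // Publish the connection state change
        redisPubSubService.publishUserStatusChange(userId, getNodeIdentifier(), "DISCONNECTED");
    }
    
    // Returns the session held by this node, or null if the user is connected elsewhere
    public WebSocketSession getConnection(String userId) {
        return localConnections.get(userId);
    }
    
    // Drops a stale local entry, but never a session that is still open
    public void removeLocalUserCache(String userId) {
        localConnections.computeIfPresent(userId, (id, session) -> session.isOpen() ? session : null);
    }
    
    // All user IDs currently registered in the Redis mapping, across all nodes
    public Set<String> getAllOnlineUsers() {
        return redisTemplate.<String, String>opsForHash().keys(USER_NODE_MAP_KEY);
    }
    
    public boolean isUserOnline(String userId) {
        // Check local connections first
        if (localConnections.containsKey(userId)) {
            return true;
        }
        
        // Fall back to the node mapping in Redis
        String nodeId = (String) redisTemplate.opsForHash().get(USER_NODE_MAP_KEY, userId);
        return nodeId != null;
    }
    
    // Identifier of the current node, in the form host:port
    public String getNodeIdentifier() {
        try {
            return InetAddress.getLocalHost().getHostName() + ":" + serverPort;
        } catch (UnknownHostException e) {
            // getLocalHost() throws a checked exception; fall back to a fixed host name
            return "unknown-host:" + serverPort;
        }
    }
}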
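
One subtle point in the connection manager is what happens when a user reconnects to node B just before node A processes the old disconnect: an unconditional delete would wipe out the fresh mapping. The sketch below shows the "remove only if I still own it" idea using a plain in-memory map as a stand-in for the Redis hash. `UserNodeRegistry` is a hypothetical name, and against real Redis the check and the delete would have to run atomically (for example in a Lua script), which this sketch does not attempt.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for the Redis hash "websocket:user:node". */
final class UserNodeRegistry {
    private final Map<String, String> userToNode = new ConcurrentHashMap<>();

    /** Records that userId is now connected to nodeId (last writer wins). */
    void register(String userId, String nodeId) {
        userToNode.put(userId, nodeId);
    }

    /**
     * Removes the mapping only if it still points at nodeId.
     * Returns true when an entry was actually removed.
     */
    boolean unregister(String userId, String nodeId) {
        return userToNode.remove(userId, nodeId);
    }

    /** Returns the node currently holding the user, or null if the user is offline. */
    String nodeOf(String userId) {
        return userToNode.get(userId);
    }

    boolean isOnline(String userId) {
        return userToNode.containsKey(userId);
    }
}
```

If a user registers on node A, then on node B, a late `unregister` from node A returns false and leaves the mapping to node B untouched.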

3. Redis publish/subscribe

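import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

@Component
public class RedisPubSubService {
    
    private static final Logger log = LoggerFactory.getLogger(RedisPubSubService.class);
    
    private static final String STATUS_CHANNEL = "user:status:channel";
    private static final String MESSAGE_CHANNEL = "message:channel";
    
    // Publish plain JSON strings so that the listeners can parse the payload directly;
    // RedisTemplate's default JDK serialization would not produce JSON
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    @Autowired
    private ObjectMapper objectMapper;
    
    @Autowired
    private ConnectionManager connectionManager;
    
    public void publishUserStatusChange(String userId, String nodeId, String status) {
        UserStatusMessage message = new UserStatusMessage();
        message.setUserId(userId);
        message.setNodeId(nodeId);
        message.setStatus(status);
        message.setTimestamp(System.currentTimeMillis());
        
        publishAsJson(STATUS_CHANNEL, message);
    }
    
    public void publishMessage(String userId, String message) {
        // Build the message object
        TargetedMessage targetedMessage = new TargetedMessage();
        targetedMessage.setUserId(userId);
        targetedMessage.setContent(message);
        // Assumes ConnectionManager exposes getNodeIdentifier() publicly
        targetedMessage.setSenderId(connectionManager.getNodeIdentifier());
        
        // Publish to the Redis channel
        publishAsJson(MESSAGE_CHANNEL, targetedMessage);
    }
    
    private void publishAsJson(String channel, Object payload) {
        try {
            redisTemplate.convertAndSend(channel, objectMapper.writeValueAsString(payload));
        } catch (JsonProcessingException e) {
            log.error("Failed to serialize message for channel {}", channel, e);
        }
    }
    
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        
        // Both channels are exact names, so ChannelTopic fits better than PatternTopic
        container.addMessageListener(userStatusListener(), new ChannelTopic(STATUS_CHANNEL));
        container.addMessageListener(messageDeliveryListener(), new ChannelTopic(MESSAGE_CHANNEL));
        
        return container;
    }
    
    @Bean
    public MessageListener userStatusListener() {
        return (message, pattern) -> {
            try {
                String json = new String(message.getBody(), StandardCharsets.UTF_8);
                // Handle the user state change
                handleUserStatusChange(objectMapper.readValue(json, UserStatusMessage.class));
            } catch (IOException e) {
                log.error("Failed to parse user status message", e);
            }
        };
    }
    
    @Bean
    public MessageListener messageDeliveryListener() {
        return (message, pattern) -> {
            try {
                String json = new String(message.getBody(), StandardCharsets.UTF_8);
                // Every node receives the message; only the node holding the session delivers it
                deliverMessageLocally(objectMapper.readValue(json, TargetedMessage.class));
            } catch (IOException e) {
                log.error("Failed to parse targeted message", e);
            }
        };
    }
    
    private void handleUserStatusChange(UserStatusMessage message) {
        if ("DISCONNECTED".equals(message.getStatus())) {
            // Clean up the local cache
            connectionManager.removeLocalUserCache(message.getUserId());
        }
    }
    
    private void deliverMessageLocally(TargetedMessage message) {
        WebSocketSession session = connectionManager.getConnection(message.getUserId());
        if (session != null && session.isOpen()) {
            try {
                // WebSocketSession does not support concurrent sends, so serialize them per session
                synchronized (session) {
                    session.sendMessage(new TextMessage(message.getContent()));
                }
            } catch (IOException e) {
                log.error("Message delivery failed", e);
            }
        }
    }
}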
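
The service above publishes every targeted message to one shared channel, so every node receives it and all but one discard it. A possible variation, not what the code above does, is to look up the user's node first and publish to a channel that only that node subscribes to. The sketch below models just the routing decision in memory; `NodeChannelRouter` and the `message:channel:<nodeId>` naming scheme are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Picks a per-node channel so that only the owning node receives a targeted message. */
final class NodeChannelRouter {
    static final String CHANNEL_PREFIX = "message:channel:"; // hypothetical naming scheme

    private final Map<String, String> userToNode;                        // userId -> nodeId
    private final Map<String, List<String>> published = new HashMap<>(); // channel -> payloads

    NodeChannelRouter(Map<String, String> userToNode) {
        this.userToNode = userToNode;
    }

    /** Returns the channel that was used, or empty if the user has no known node. */
    Optional<String> route(String userId, String payload) {
        String nodeId = userToNode.get(userId);
        if (nodeId == null) {
            return Optional.empty(); // offline: nothing is published
        }
        String channel = CHANNEL_PREFIX + nodeId;
        published.computeIfAbsent(channel, k -> new ArrayList<>()).add(payload);
        return Optional.of(channel);
    }

    /** Payloads recorded for a channel; stands in for what a subscriber would receive. */
    List<String> publishedTo(String channel) {
        return published.getOrDefault(channel, List.of());
    }
}
```

The trade-off is one extra lookup per message in exchange for less fan-out. The shared channel remains the simpler choice for true broadcasts.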

4. WebSocket handler

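import java.net.URI;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

@Component
public class ClusterWebSocketHandler extends TextWebSocketHandler {
    
    private static final Logger log = LoggerFactory.getLogger(ClusterWebSocketHandler.class);
    
    @Autowired
    private ConnectionManager connectionManager;
    
    @Autowired
    private RedisPubSubService redisPubSubService;
    
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        // Extract the user ID from the URI
        String userId = extractUserId(session);
        if (userId == null) {
            // No user ID in the path: reject the connection
            session.close(CloseStatus.BAD_DATA);
            return;
        }
        
        // Register the connection with the manager
        connectionManager.addConnection(userId, session);
        
        // Assumes ConnectionManager exposes getNodeIdentifier() publicly
        log.info("User {} connected, node: {}", userId, connectionManager.getNodeIdentifier());
    }
    
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        String userId = extractUserId(session);
        if (userId == null) {
            return;
        }
        
        // Remove the connection
        connectionManager.removeConnection(userId);
        
        log.info("User {} disconnected, status: {}", userId, status);
    }
    
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        // Parse the message content
        MessageContent content = JsonUtils.fromJson(message.getPayload(), MessageContent.class);
        
        if (MessageType.SEND_TO_USER.equals(content.getType())) {
            // Send to one specific user
            sendMessageToUser(content.getTargetUserId(), content.getContent());
        } else if (MessageType.BROADCAST.equals(content.getType())) {
            // Broadcast to everyone online
            broadcastMessage(content.getContent());
        }
    }
    
    private void sendMessageToUser(String targetUserId, String content) {
        // Check whether the target user is online
        if (!connectionManager.isUserOnline(targetUserId)) {
            log.warn("Target user {} is offline", targetUserId);
            return;
        }
        
        // Publish through Redis so the node holding the connection can deliver it
        redisPubSubService.publishMessage(targetUserId, content);
    }
    
    private void broadcastMessage(String content) {
        // The set already contains only online users, so skip the per-user online check
        Set<String> onlineUsers = connectionManager.getAllOnlineUsers();
        
        for (String userId : onlineUsers) {
            redisPubSubService.publishMessage(userId, content);
        }
    }
    
    private String extractUserId(WebSocketSession session) {
        URI uri = session.getUri();
        if (uri == null || uri.getPath() == null) {
            return null;
        }
        
        // Assumes the path is /ws/{userId} with no context path.
        // Split the path only: "/ws/42" -> ["", "ws", "42"]. Splitting the full
        // URI string would put the host at index 2.
        String[] parts = uri.getPath().split("/");
        if (parts.length >= 3 && !parts[2].isEmpty()) {
            return parts[2]; // the user ID
        }
        
        return null;
    }
}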
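
Extracting the user ID from the connection URI is easy to get wrong, because a WebSocket session usually reports a full URI such as `ws://host:8080/ws/42` rather than a bare path. Here is a small standalone sketch that works on the path component only; `UserIdExtractor` is a hypothetical helper, and it assumes the `/ws/{userId}` layout used in this article with no context path in front.

```java
import java.net.URI;
import java.util.Optional;

/** Extracts the user ID from URIs shaped like ws://host:port/ws/{userId}. */
final class UserIdExtractor {
    private static final String PREFIX = "/ws/";

    /** Returns the single path segment after "/ws/", or empty if the path does not match. */
    static Optional<String> extract(URI uri) {
        if (uri == null || uri.getPath() == null) {
            return Optional.empty();
        }
        String path = uri.getPath(); // excludes scheme, host, port and query string
        if (!path.startsWith(PREFIX)) {
            return Optional.empty();
        }
        String rest = path.substring(PREFIX.length());
        if (rest.isEmpty() || rest.contains("/")) {
            return Optional.empty(); // missing ID or extra path segments
        }
        return Optional.of(rest);
    }
}
```

For comparison, splitting the whole string `ws://example.com:8080/ws/42` on `/` yields `example.com:8080` at index 2, which is why the extraction should start from `getPath()`.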

Advanced Features

1. Message reliability

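import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class ReliableMessageService {
    
    private static final String MESSAGE_QUEUE_KEY = "message:queue:";
    private static final String MESSAGE_ACK_KEY = "message:ack:";
    
    // Assumes this template's value serializer can round-trip PendingMessage
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private RedisPubSubService redisPubSubService;
    
    @Autowired
    private ConnectionManager connectionManager;
    
    public void sendMessageWithRetry(String userId, String message, int maxRetries) {
        String messageId = UUID.randomUUID().toString();
        String queueKey = MESSAGE_QUEUE_KEY + userId;
        
        // Store the message in the pending queue
        PendingMessage pendingMessage = new PendingMessage();
        pendingMessage.setMessageId(messageId);
        pendingMessage.setContent(message);
        pendingMessage.setTimestamp(System.currentTimeMillis());
        pendingMessage.setRetryCount(0);
        pendingMessage.setMaxRetries(maxRetries);
        
        redisTemplate.opsForList().leftPush(queueKey, pendingMessage);
        
        // Expire the queue after 24 hours
        redisTemplate.expire(queueKey, Duration.ofHours(24));
    }
    
    // The client has to echo the message ID back for this to be called
    public void acknowledgeMessage(String userId, String messageId) {
        String ackKey = MESSAGE_ACK_KEY + userId;
        redisTemplate.opsForSet().add(ackKey, messageId);
        redisTemplate.expire(ackKey, Duration.ofDays(7));
    }
    
    // Check unacknowledged messages every 30 seconds
    // (requires @EnableScheduling on a configuration class)
    @Scheduled(fixedRate = 30000)
    public void checkUnacknowledgedMessages() {
        // Assumes ConnectionManager exposes getAllOnlineUsers()
        Set<String> onlineUsers = connectionManager.getAllOnlineUsers();
        
        for (String userId : onlineUsers) {
            checkUserPendingMessages(userId);
        }
    }
    
    private void checkUserPendingMessages(String userId) {
        String queueKey = MESSAGE_QUEUE_KEY + userId;
        String ackKey = MESSAGE_ACK_KEY + userId;
        List<Object> pendingMessages = redisTemplate.opsForList().range(queueKey, 0, -1);
        if (pendingMessages == null) {
            return;
        }
        
        for (Object item : pendingMessages) {
            PendingMessage message = (PendingMessage) item;
            
            // Remove the stored copy before mutating it: the list removal matches by
            // value, so removing after changing retryCount would match nothing
            redisTemplate.opsForList().remove(queueKey, 0, message);
            
            boolean acknowledged = Boolean.TRUE.equals(
                redisTemplate.opsForSet().isMember(ackKey, message.getMessageId()));
            if (acknowledged || message.getRetryCount() >= message.getMaxRetries()) {
                // Acknowledged, or the retry limit is reached: drop the message
                continue;
            }
            
            // Resend the message, then store it again with the updated retry count
            redisPubSubService.publishMessage(userId, message.getContent());
            message.setRetryCount(message.getRetryCount() + 1);
            redisTemplate.opsForList().leftPush(queueKey, message);
        }
    }
}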
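
The retry logic is easier to reason about without Redis in the way. Below is a minimal in-memory model of the same rule: a message is resent on each sweep until it is acknowledged or its retry budget runs out. `RetryQueue` is a hypothetical class for illustration, it is not thread-safe, and it stores nothing durably.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of the pending queue: resend until acknowledged or out of retries. */
final class RetryQueue {

    private static final class Pending {
        final String content;
        final int maxRetries;
        int retryCount;

        Pending(String content, int maxRetries) {
            this.content = content;
            this.maxRetries = maxRetries;
        }
    }

    // messageId -> pending entry, kept in insertion order
    private final Map<String, Pending> pending = new LinkedHashMap<>();

    void enqueue(String messageId, String content, int maxRetries) {
        pending.put(messageId, new Pending(content, maxRetries));
    }

    /** An acknowledged message is dropped and never resent. */
    void acknowledge(String messageId) {
        pending.remove(messageId);
    }

    /** One sweep: returns the contents to resend and drops messages that are out of retries. */
    List<String> sweep() {
        List<String> toResend = new ArrayList<>();
        Iterator<Pending> it = pending.values().iterator();
        while (it.hasNext()) {
            Pending p = it.next();
            if (p.retryCount < p.maxRetries) {
                p.retryCount++;
                toResend.add(p.content);
            } else {
                it.remove();
            }
        }
        return toResend;
    }

    int size() {
        return pending.size();
    }
}
```

A message enqueued with `maxRetries = 2` is returned by the first two sweeps and removed by the third, while an acknowledged message never shows up in a sweep at all.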

2. User status management

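import java.time.Duration;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class UserStatusService {
    
    private static final String USER_STATUS_KEY = "user:status:";
    private static final String USER_HEARTBEAT_KEY = "user:heartbeat:";
    private static final long HEARTBEAT_TIMEOUT_MS = 60000; // 1 minute
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    // Assumes ConnectionManager exposes getNodeIdentifier() publicly
    @Autowired
    private ConnectionManager connectionManager;
    
    public void updateUserStatus(String userId, UserStatus status) {
        String statusKey = USER_STATUS_KEY + userId;
        
        UserStatusInfo statusInfo = new UserStatusInfo();
        statusInfo.setUserId(userId);
        statusInfo.setStatus(status);
        statusInfo.setNodeId(connectionManager.getNodeIdentifier());
        statusInfo.setUpdateTime(System.currentTimeMillis());
        
        redisTemplate.opsForValue().set(statusKey, statusInfo, Duration.ofMinutes(30));
    }
    
    public UserStatus getUserStatus(String userId) {
        String statusKey = USER_STATUS_KEY + userId;
        UserStatusInfo statusInfo = (UserStatusInfo) redisTemplate.opsForValue().get(statusKey);
        if (statusInfo == null) {
            return UserStatus.OFFLINE;
        }
        
        // Last activity is the later of the status update and the last heartbeat,
        // so that heartbeats written by updateHeartbeat actually keep the user online
        long lastActivity = statusInfo.getUpdateTime();
        Object lastBeat = redisTemplate.opsForValue().get(USER_HEARTBEAT_KEY + userId);
        if (lastBeat instanceof Number) {
            lastActivity = Math.max(lastActivity, ((Number) lastBeat).longValue());
        }
        
        if (System.currentTimeMillis() - lastActivity > HEARTBEAT_TIMEOUT_MS) {
            // Heartbeat timed out: treat the user as offline
            return UserStatus.OFFLINE;
        }
        return statusInfo.getStatus();
    }
    
    public void updateHeartbeat(String userId) {
        String heartbeatKey = USER_HEARTBEAT_KEY + userId;
        redisTemplate.opsForValue().set(heartbeatKey, System.currentTimeMillis(), 
                                      Duration.ofMinutes(5));
    }
    
    @Scheduled(fixedRate = 60000) // check heartbeats once per minute
    public void checkUserHeartbeat() {
        // Left unimplemented here: one option is to iterate over the heartbeat keys
        // with the Redis SCAN command, find users that have timed out,
        // and update their status
    }
}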
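
The heartbeat rule itself fits in a few lines. The following is an in-memory sketch of timeout-based presence with the current time passed in explicitly, which keeps it easy to test. `HeartbeatTracker` is a hypothetical name, and in the Redis version a key TTL could do much of the eviction work instead.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Tracks the last heartbeat per user and treats silence beyond the timeout as offline. */
final class HeartbeatTracker {
    private final long timeoutMillis;
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();

    HeartbeatTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records a heartbeat for the user at the given time. */
    void beat(String userId, long nowMillis) {
        lastSeen.put(userId, nowMillis);
    }

    /** A user is online if a heartbeat arrived within the timeout window. */
    boolean isOnline(String userId, long nowMillis) {
        Long seen = lastSeen.get(userId);
        return seen != null && nowMillis - seen <= timeoutMillis;
    }

    /**
     * Removes users whose last heartbeat is older than the timeout and returns how
     * many were removed. The count is exact only without concurrent writers.
     */
    int evictExpired(long nowMillis) {
        int before = lastSeen.size();
        lastSeen.values().removeIf(seen -> nowMillis - seen > timeoutMillis);
        return before - lastSeen.size();
    }
}
```

Passing `nowMillis` in, rather than calling `System.currentTimeMillis()` inside, is a deliberate choice: the timeout boundary can then be checked without sleeping in tests.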

3. Load-balancing strategy

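import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class LoadBalancer {
    
    private static final String NODE_CONNECTION_COUNT_KEY = "node:connection:count";
    private static final int MAX_CONNECTIONS_PER_NODE = 1000;
    
    // Hash increments store counters as plain decimal strings, so read them back
    // as strings and parse them instead of casting to Integer
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    // Assumes ConnectionManager exposes getNodeIdentifier() publicly
    @Autowired
    private ConnectionManager connectionManager;
    
    // Note: this check and incrementConnectionCount() are separate steps, so the
    // limit can be exceeded slightly under concurrent connects
    public boolean canAcceptConnection() {
        Object currentCount = redisTemplate.opsForHash()
            .get(NODE_CONNECTION_COUNT_KEY, connectionManager.getNodeIdentifier());
        
        return currentCount == null
            || Integer.parseInt(currentCount.toString()) < MAX_CONNECTIONS_PER_NODE;
    }
    
    public void incrementConnectionCount() {
        redisTemplate.opsForHash()
            .increment(NODE_CONNECTION_COUNT_KEY, connectionManager.getNodeIdentifier(), 1);
    }
    
    public void decrementConnectionCount() {
        redisTemplate.opsForHash()
            .increment(NODE_CONNECTION_COUNT_KEY, connectionManager.getNodeIdentifier(), -1);
    }
    
    // Returns the node with the fewest connections, or null if no node has reported yet
    public String getLeastLoadedNode() {
        Map<Object, Object> nodeCounts = redisTemplate.opsForHash()
            .entries(NODE_CONNECTION_COUNT_KEY);
        
        String leastLoadedNode = null;
        int minConnections = Integer.MAX_VALUE;
        
        for (Map.Entry<Object, Object> entry : nodeCounts.entrySet()) {
            int count = Integer.parseInt(entry.getValue().toString());
            if (count < minConnections) {
                minConnections = count;
                leastLoadedNode = entry.getKey().toString();
            }
        }
        
        return leastLoadedNode;
    }
}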
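
A "check the count, then increment" sequence has a small race window: two connections can both pass the check before either one increments. The sketch below shows one way to close that window on a single JVM by reserving the slot atomically, plus the least-loaded lookup. `NodeLoadTable` is a hypothetical in-memory stand-in; in Redis the equivalent would need an atomic operation on the server side.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Per-node connection counters with an atomic capacity check. */
final class NodeLoadTable {
    private final int maxPerNode;
    private final Map<String, AtomicInteger> counts = new ConcurrentHashMap<>();

    NodeLoadTable(int maxPerNode) {
        this.maxPerNode = maxPerNode;
    }

    /** Atomically reserves a slot on the node; returns false if it is at capacity. */
    boolean tryAcquire(String nodeId) {
        AtomicInteger counter = counts.computeIfAbsent(nodeId, k -> new AtomicInteger());
        while (true) {
            int current = counter.get();
            if (current >= maxPerNode) {
                return false;
            }
            if (counter.compareAndSet(current, current + 1)) {
                return true;
            }
            // Another thread changed the counter in between: retry with the new value
        }
    }

    /** Releases a slot; the counter never drops below zero. */
    void release(String nodeId) {
        AtomicInteger counter = counts.get(nodeId);
        if (counter != null) {
            counter.updateAndGet(c -> Math.max(0, c - 1));
        }
    }

    /** The node with the fewest connections, or empty if no node is known yet. */
    Optional<String> leastLoaded() {
        return counts.entrySet().stream()
                .min(Comparator.comparingInt(e -> e.getValue().get()))
                .map(Map.Entry::getKey);
    }
}
```

Returning `Optional` from `leastLoaded()` makes the "no nodes registered yet" case explicit instead of handing the caller a `null`.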

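Best Practices

  1. Connection pool management: size connection pools sensibly so resources are not exhausted
  2. Message serialization: use an efficient serialization approach, such as Protobuf or JSON via Jackson
  3. Monitoring and alerting: track key metrics such as connection counts and message latency
  4. Graceful shutdown: make sure a node cleans up its connection state correctly when it shuts down
  5. Security: authenticate users and prevent unauthorized access

With a cluster architecture like this, we can build a highly available, scalable WebSocket real-time communication system.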


That's all for this installment; I hope you find it helpful. For more technical content, follow 服务端技术精选. See you next time!


Title: Spring Boot + WebSocket Cluster + Redis Pub/Sub: Syncing Online User State Across Nodes and Delivering Messages Precisely
Author: jiangyi
URL: http://www.jiangyi.space/articles/2026/02/05/1770094763784.html
