SpringBoot + WebSocket 集群 + Redis Pub/Sub:多节点在线用户状态同步,消息精准投递
今天咱们聊聊一个在实时通信场景中非常关键的话题:如何在多节点集群环境下实现WebSocket连接的精准消息投递。
集群环境下的挑战
在我们的日常开发工作中,经常会遇到这样的实时通信需求:
- 在线客服系统需要将客户消息精准推送给对应的客服
- 消息推送系统需要将通知发送给指定用户
- 游戏系统需要将房间消息广播给房间内的玩家
- 即时通讯应用需要实现点对点消息传递
传统的单节点WebSocket实现无法满足集群部署的需求,当用户连接分布到不同节点时,消息投递就成了大问题。
集群消息投递的核心问题
相比单节点部署,集群环境下主要有以下挑战:
- 连接分散:用户连接分布在不同节点
- 状态同步:需要同步用户连接状态
- 消息路由:需要将消息准确路由到目标连接
- 负载均衡:需要合理分配连接负载
核心实现方案
1. WebSocket集群架构
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(clusterWebSocketHandler(), "/ws/{userId}")
.setAllowedOrigins("*");
}
@Bean
public ClusterWebSocketHandler clusterWebSocketHandler() {
return new ClusterWebSocketHandler();
}
}
2. 连接管理器
@Component
public class ConnectionManager {
// 本地连接映射
private final Map<String, WebSocketSession> localConnections = new ConcurrentHashMap<>();
// 用户ID到节点的映射(存储在Redis中)
private static final String USER_NODE_MAP_KEY = "websocket:user:node";
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private RedisPubSubService redisPubSubService;
public void addConnection(String userId, WebSocketSession session) {
// 添加本地连接
localConnections.put(userId, session);
// 更新Redis中的用户节点映射
String nodeId = getNodeIdentifier();
redisTemplate.opsForHash().put(USER_NODE_MAP_KEY, userId, nodeId);
// 发布连接状态变更事件
redisPubSubService.publishUserStatusChange(userId, nodeId, "CONNECTED");
}
public void removeConnection(String userId) {
// 移除本地连接
localConnections.remove(userId);
// 从Redis中删除用户节点映射
redisTemplate.opsForHash().delete(USER_NODE_MAP_KEY, userId);
// 发布连接状态变更事件
redisPubSubService.publishUserStatusChange(userId, getNodeIdentifier(), "DISCONNECTED");
}
public boolean isUserOnline(String userId) {
// 检查本地连接
if (localConnections.containsKey(userId)) {
return true;
}
// 检查Redis中的节点映射
String nodeId = (String) redisTemplate.opsForHash().get(USER_NODE_MAP_KEY, userId);
return nodeId != null;
}
private String getNodeIdentifier() {
// 获取当前节点标识
return InetAddress.getLocalHost().getHostName() + ":" +
System.getProperty("server.port", "8080");
}
}
3. Redis消息发布订阅
@Component
public class RedisPubSubService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ConnectionManager connectionManager;
public void publishUserStatusChange(String userId, String nodeId, String status) {
UserStatusMessage message = new UserStatusMessage();
message.setUserId(userId);
message.setNodeId(nodeId);
message.setStatus(status);
message.setTimestamp(System.currentTimeMillis());
redisTemplate.convertAndSend("user:status:channel", message);
}
public void publishMessage(String userId, String message) {
// 构建消息对象
TargetedMessage targetedMessage = new TargetedMessage();
targetedMessage.setUserId(userId);
targetedMessage.setContent(message);
targetedMessage.setSenderId(getNodeIdentifier());
// 发布到Redis频道
redisTemplate.convertAndSend("message:channel", targetedMessage);
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 订阅用户状态变更频道
container.addMessageListener(userStatusListener(), new PatternTopic("user:status:channel"));
// 订阅消息投递频道
container.addMessageListener(messageDeliveryListener(), new PatternTopic("message:channel"));
return container;
}
@Bean
public MessageListener userStatusListener() {
return (message, pattern) -> {
String json = new String(message.getBody());
UserStatusMessage statusMessage = JsonUtils.fromJson(json, UserStatusMessage.class);
// 处理用户状态变更
handleUserStatusChange(statusMessage);
};
}
@Bean
public MessageListener messageDeliveryListener() {
return (message, pattern) -> {
String json = new String(message.getBody());
TargetedMessage targetedMessage = JsonUtils.fromJson(json, TargetedMessage.class);
// 尝试本地投递消息
deliverMessageLocally(targetedMessage);
};
}
private void handleUserStatusChange(UserStatusMessage message) {
if ("DISCONNECTED".equals(message.getStatus())) {
// 清理本地缓存
connectionManager.removeLocalUserCache(message.getUserId());
}
}
private void deliverMessageLocally(TargetedMessage message) {
WebSocketSession session = connectionManager.getConnection(message.getUserId());
if (session != null && session.isOpen()) {
try {
session.sendMessage(new TextMessage(message.getContent()));
} catch (IOException e) {
log.error("消息投递失败", e);
}
}
}
}
4. WebSocket处理器
@Component
public class ClusterWebSocketHandler extends TextWebSocketHandler {
@Autowired
private ConnectionManager connectionManager;
@Autowired
private RedisPubSubService redisPubSubService;
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 从URI中提取用户ID
String userId = extractUserId(session);
// 添加连接到管理器
connectionManager.addConnection(userId, session);
log.info("用户 {} 连接成功,节点:{}", userId, getNodeIdentifier());
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
String userId = extractUserId(session);
// 移除连接
connectionManager.removeConnection(userId);
log.info("用户 {} 连接关闭,状态:{}", userId, status);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String userId = extractUserId(session);
String payload = message.getPayload();
// 解析消息内容
MessageContent content = JsonUtils.fromJson(payload, MessageContent.class);
if (MessageType.SEND_TO_USER.equals(content.getType())) {
// 发送消息给指定用户
sendMessageToUser(content.getTargetUserId(), content.getContent());
} else if (MessageType.BROADCAST.equals(content.getType())) {
// 广播消息
broadcastMessage(content.getContent());
}
}
private void sendMessageToUser(String targetUserId, String content) {
// 检查目标用户是否在线
if (!connectionManager.isUserOnline(targetUserId)) {
log.warn("目标用户 {} 不在线", targetUserId);
return;
}
// 通过Redis发布消息
redisPubSubService.publishMessage(targetUserId, content);
}
private void broadcastMessage(String content) {
// 获取所有在线用户
Set<String> onlineUsers = connectionManager.getAllOnlineUsers();
for (String userId : onlineUsers) {
sendMessageToUser(userId, content);
}
}
private String extractUserId(WebSocketSession session) {
// 从URI路径中提取用户ID
Map<String, Object> attributes = session.getAttributes();
String uri = session.getUri().toString();
// 假设URI格式为 /ws/{userId}
String[] parts = uri.split("/");
if (parts.length >= 3) {
return parts[2]; // 返回userId
}
return null;
}
}
高级特性实现
1. 消息可靠性保证
@Component
public class ReliableMessageService {
private static final String MESSAGE_QUEUE_KEY = "message:queue:";
private static final String MESSAGE_ACK_KEY = "message:ack:";
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void sendMessageWithRetry(String userId, String message, int maxRetries) {
String messageId = UUID.randomUUID().toString();
String queueKey = MESSAGE_QUEUE_KEY + userId;
// 将消息存入队列
PendingMessage pendingMessage = new PendingMessage();
pendingMessage.setMessageId(messageId);
pendingMessage.setContent(message);
pendingMessage.setTimestamp(System.currentTimeMillis());
pendingMessage.setRetryCount(0);
pendingMessage.setMaxRetries(maxRetries);
redisTemplate.opsForList().leftPush(queueKey, pendingMessage);
// 设置队列过期时间
redisTemplate.expire(queueKey, Duration.ofHours(24));
}
public void acknowledgeMessage(String userId, String messageId) {
String ackKey = MESSAGE_ACK_KEY + userId;
redisTemplate.opsForSet().add(ackKey, messageId);
redisTemplate.expire(ackKey, Duration.ofDays(7));
}
@Scheduled(fixedRate = 30000) // 每30秒检查一次未确认消息
public void checkUnacknowledgedMessages() {
// 获取所有在线用户
Set<String> onlineUsers = getAllOnlineUsers();
for (String userId : onlineUsers) {
checkUserPendingMessages(userId);
}
}
private void checkUserPendingMessages(String userId) {
String queueKey = MESSAGE_QUEUE_KEY + userId;
List<PendingMessage> pendingMessages =
redisTemplate.opsForList().range(queueKey, 0, -1);
if (pendingMessages != null) {
for (PendingMessage message : pendingMessages) {
if (message.getRetryCount() < message.getMaxRetries()) {
// 重新发送消息
redisPubSubService.publishMessage(userId, message.getContent());
// 更新重试次数
message.setRetryCount(message.getRetryCount() + 1);
redisTemplate.opsForList().remove(queueKey, 0, message);
redisTemplate.opsForList().leftPush(queueKey, message);
} else {
// 达到最大重试次数,移除消息
redisTemplate.opsForList().remove(queueKey, 0, message);
}
}
}
}
}
2. 用户状态管理
@Service
public class UserStatusService {
private static final String USER_STATUS_KEY = "user:status:";
private static final String USER_HEARTBEAT_KEY = "user:heartbeat:";
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void updateUserStatus(String userId, UserStatus status) {
String statusKey = USER_STATUS_KEY + userId;
UserStatusInfo statusInfo = new UserStatusInfo();
statusInfo.setUserId(userId);
statusInfo.setStatus(status);
statusInfo.setNodeId(getNodeIdentifier());
statusInfo.setUpdateTime(System.currentTimeMillis());
redisTemplate.opsForValue().set(statusKey, statusInfo, Duration.ofMinutes(30));
}
public UserStatus getUserStatus(String userId) {
String statusKey = USER_STATUS_KEY + userId;
UserStatusInfo statusInfo = (UserStatusInfo) redisTemplate.opsForValue().get(statusKey);
if (statusInfo != null) {
// 检查心跳是否超时
if (System.currentTimeMillis() - statusInfo.getUpdateTime() > 60000) { // 1分钟
// 心跳超时,认为用户离线
return UserStatus.OFFLINE;
}
return statusInfo.getStatus();
}
return UserStatus.OFFLINE;
}
public void updateHeartbeat(String userId) {
String heartbeatKey = USER_HEARTBEAT_KEY + userId;
redisTemplate.opsForValue().set(heartbeatKey, System.currentTimeMillis(),
Duration.ofMinutes(5));
}
@Scheduled(fixedRate = 60000) // 每分钟检查一次心跳
public void checkUserHeartbeat() {
// 这里可以使用Redis的SCAN命令遍历所有心跳键
// 检查超时的用户并更新状态
}
}
3. 负载均衡策略
@Component
public class LoadBalancer {
private static final String NODE_CONNECTION_COUNT_KEY = "node:connection:count";
private static final int MAX_CONNECTIONS_PER_NODE = 1000;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public boolean canAcceptConnection() {
String nodeId = getNodeIdentifier();
Integer currentCount = (Integer) redisTemplate.opsForHash()
.get(NODE_CONNECTION_COUNT_KEY, nodeId);
return currentCount == null || currentCount < MAX_CONNECTIONS_PER_NODE;
}
public void incrementConnectionCount() {
String nodeId = getNodeIdentifier();
redisTemplate.opsForHash().increment(NODE_CONNECTION_COUNT_KEY, nodeId, 1);
}
public void decrementConnectionCount() {
String nodeId = getNodeIdentifier();
redisTemplate.opsForHash().increment(NODE_CONNECTION_COUNT_KEY, nodeId, -1);
}
public String getLeastLoadedNode() {
Map<Object, Object> nodeCounts = redisTemplate.opsForHash()
.entries(NODE_CONNECTION_COUNT_KEY);
String leastLoadedNode = null;
int minConnections = Integer.MAX_VALUE;
for (Map.Entry<Object, Object> entry : nodeCounts.entrySet()) {
int count = (Integer) entry.getValue();
if (count < minConnections) {
minConnections = count;
leastLoadedNode = (String) entry.getKey();
}
}
return leastLoadedNode;
}
}
最佳实践建议
- 连接池管理:合理设置连接池大小,避免资源耗尽
- 消息序列化:使用高效的序列化方式,如Protobuf或Jackson
- 监控告警:监控连接数、消息延迟等关键指标
- 优雅关闭:确保节点关闭时正确清理连接状态
- 安全考虑:验证用户身份,防止未授权访问
通过这样的集群架构,我们可以实现高可用、可扩展的WebSocket实时通信系统。
以上就是本期分享的内容,希望对你有所帮助。更多技术干货,请关注服务端技术精选,我们下期再见!
标题:SpringBoot + WebSocket 集群 + Redis Pub/Sub:多节点在线用户状态同步,消息精准投递
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/02/05/1770094763784.html
0 评论