SpringBoot + WebSocket + STOMP:支持群聊、@提醒、消息回执的企业 IM 系统实战

传统IM系统的挑战

在我们的日常开发工作中,经常会遇到这样的需求:

  • 需要实现实时聊天功能,支持一对一和群聊
  • 要有@提醒功能,让用户不错过重要消息
  • 需要消息回执,确保消息已送达
  • 要支持离线消息推送
  • 要有良好的性能和扩展性

如果用传统的HTTP轮询方式,不仅服务器压力大,用户体验也不好。今天我们就用WebSocket + STOMP技术来解决这些问题。

解决方案思路

今天我们要解决的,就是如何用SpringBoot + WebSocket + STOMP构建一个功能完整的企业IM系统。

核心思路是:

  1. WebSocket连接:建立持久化的双向通信通道
  2. STOMP协议:在WebSocket之上构建消息传递框架
  3. 消息路由:实现精确的消息推送和路由
  4. 状态管理:管理用户在线状态和消息状态

技术选型

  • SpringBoot:快速搭建应用
  • WebSocket:实时双向通信
  • STOMP:消息传递协议
  • Redis:消息存储和用户状态管理
  • Spring Security:连接认证

核心实现思路

1. WebSocket配置

首先配置WebSocket和STOMP:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // 配置消息代理
        config.enableSimpleBroker("/topic", "/queue");  // 订阅频道前缀
        config.setApplicationDestinationPrefixes("/app");  // 应用目标前缀
        config.setUserDestinationPrefix("/user");  // 用户私信前缀
    }
    
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 注册STOMP端点
        registry.addEndpoint("/ws")  // WebSocket连接端点
                .setAllowedOriginPatterns("*")  // 允许跨域
                .withSockJS();  // 支持降级到HTTP轮询
    }
    
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        // 配置WebSocket传输参数
        registration.setMessageSizeLimit(8192)  // 消息大小限制
                  .setSendTimeLimit(20000)      // 发送时间限制
                  .setSendBufferSizeLimit(8192); // 发送缓冲区限制
    }
}

2. 消息模型定义

定义消息相关的数据模型:

@Data
public class ChatMessage {
    private String id;
    private String senderId;
    private String senderName;
    private String receiverId;  // 单聊接收者
    private String roomId;      // 群聊房间ID
    private MessageType type;   // 消息类型:TEXT, IMAGE, FILE等
    private String content;
    private Long timestamp;
    private MessageStatus status;  // 消息状态:SENT, DELIVERED, READ
    private List<String> mentionedUsers;  // @提醒的用户列表
    private String replyTo;     // 回复的消息ID
    
    public enum MessageType {
        TEXT, IMAGE, FILE, SYSTEM
    }
    
    public enum MessageStatus {
        SENT, DELIVERED, READ
    }
}

@Data
public class ChatRoom {
    private String id;
    private String name;
    private String type;  // PRIVATE, GROUP
    private Set<String> members;
    private String creatorId;
    private LocalDateTime createTime;
}

3. 消息处理控制器

实现消息处理逻辑:

@Controller
@Slf4j
public class ChatController {
    
    @Autowired
    private SimpMessagingTemplate messagingTemplate;
    
    @Autowired
    private ChatService chatService;
    
    /**
     * 发送消息
     */
    @MessageMapping("/chat.sendMessage")
    @SendTo("/topic/public")
    public ChatMessage sendMessage(@Payload ChatMessage message) {
        // 保存消息到数据库
        ChatMessage savedMessage = chatService.saveMessage(message);
        
        // 处理@提醒
        handleMentions(savedMessage);
        
        // 更新消息状态为已发送
        chatService.updateMessageStatus(savedMessage.getId(), MessageStatus.SENT);
        
        return savedMessage;
    }
    
    /**
     * 发送私信
     */
    @MessageMapping("/chat.sendPrivateMessage")
    public void sendPrivateMessage(@Payload ChatMessage message, Principal principal) {
        // 保存消息
        ChatMessage savedMessage = chatService.saveMessage(message);
        
        // 发送到指定用户
        messagingTemplate.convertAndSendToUser(
            message.getReceiverId(), 
            "/queue/messages", 
            savedMessage
        );
        
        // 更新消息状态
        chatService.updateMessageStatus(savedMessage.getId(), MessageStatus.SENT);
    }
    
    /**
     * 发送群聊消息
     */
    @MessageMapping("/chat.sendGroupMessage")
    public void sendGroupMessage(@Payload ChatMessage message, Principal principal) {
        // 验证用户是否有权限发送消息到群聊
        if (!chatService.isUserInChatRoom(principal.getName(), message.getRoomId())) {
            throw new UnauthorizedException("用户不在群聊中");
        }
        
        // 保存消息
        ChatMessage savedMessage = chatService.saveMessage(message);
        
        // 发送到群聊
        messagingTemplate.convertAndSend(
            "/topic/chatroom/" + message.getRoomId(), 
            savedMessage
        );
        
        // 更新消息状态
        chatService.updateMessageStatus(savedMessage.getId(), MessageStatus.SENT);
    }
    
    /**
     * 处理@提醒
     */
    private void handleMentions(ChatMessage message) {
        if (message.getMentionedUsers() != null && !message.getMentionedUsers().isEmpty()) {
            for (String mentionedUserId : message.getMentionedUsers()) {
                // 发送提醒通知
                ChatMessage mentionNotification = new ChatMessage();
                mentionNotification.setType(ChatMessage.MessageType.SYSTEM);
                mentionNotification.setContent("您在群聊中被@" + message.getSenderName() + "提及");
                mentionNotification.setSenderId("system");
                mentionNotification.setReceiverId(mentionedUserId);
                mentionNotification.setTimestamp(System.currentTimeMillis());
                
                messagingTemplate.convertAndSendToUser(
                    mentionedUserId,
                    "/queue/notifications",
                    mentionNotification
                );
            }
        }
    }
    
    /**
     * 消息回执
     */
    @MessageMapping("/chat.messageReceipt")
    public void messageReceipt(@Payload MessageReceipt receipt, Principal principal) {
        // 更新消息状态
        chatService.updateMessageStatus(receipt.getMessageId(), receipt.getStatus());
        
        // 如果是已读回执,通知发送方
        if (receipt.getStatus() == MessageStatus.READ) {
            ChatMessage readNotification = new ChatMessage();
            readNotification.setType(ChatMessage.MessageType.SYSTEM);
            readNotification.setContent("消息已被阅读");
            readNotification.setSenderId(receipt.getUserId());
            readNotification.setReceiverId(receipt.getSenderId());
            readNotification.setTimestamp(System.currentTimeMillis());
            
            messagingTemplate.convertAndSendToUser(
                receipt.getSenderId(),
                "/queue/messages",
                readNotification
            );
        }
    }
    
    /**
     * 用户上线
     */
    @EventListener
    public void handleWebSocketConnectListener(SessionConnectedEvent event) {
        log.info("用户连接: {}", event.getMessage().getHeaders().get("simpSessionId"));
    }
    
    /**
     * 用户断开连接
     */
    @EventListener
    public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
        String sessionId = event.getSessionId();
        String userId = chatService.getUserIdBySessionId(sessionId);
        
        if (userId != null) {
            // 更新用户状态为离线
            chatService.updateUserStatus(userId, UserStatus.OFFLINE);
            
            // 通知其他用户
            ChatMessage statusMessage = new ChatMessage();
            statusMessage.setType(ChatMessage.MessageType.SYSTEM);
            statusMessage.setContent(userId + " 已离线");
            statusMessage.setSenderId("system");
            statusMessage.setTimestamp(System.currentTimeMillis());
            
            messagingTemplate.convertAndSend("/topic/public", statusMessage);
        }
    }
}

4. 聊天服务实现

实现核心的聊天业务逻辑:

@Service
@Transactional
public class ChatService {
    
    @Autowired
    private MessageRepository messageRepository;
    
    @Autowired
    private UserRepository userRepository;
    
    @Autowired
    private ChatRoomRepository chatRoomRepository;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * 保存消息
     */
    public ChatMessage saveMessage(ChatMessage message) {
        message.setId(UUID.randomUUID().toString());
        message.setTimestamp(System.currentTimeMillis());
        message.setStatus(MessageStatus.SENT);
        
        return messageRepository.save(message);
    }
    
    /**
     * 更新消息状态
     */
    public void updateMessageStatus(String messageId, MessageStatus status) {
        messageRepository.updateMessageStatus(messageId, status);
    }
    
    /**
     * 获取聊天记录
     */
    public List<ChatMessage> getChatHistory(String senderId, String receiverId, int page, int size) {
        return messageRepository.findChatHistory(senderId, receiverId, page, size);
    }
    
    /**
     * 获取群聊记录
     */
    public List<ChatMessage> getGroupChatHistory(String roomId, int page, int size) {
        return messageRepository.findGroupChatHistory(roomId, page, size);
    }
    
    /**
     * 检查用户是否在群聊中
     */
    public boolean isUserInChatRoom(String userId, String roomId) {
        return chatRoomRepository.isUserInRoom(userId, roomId);
    }
    
    /**
     * 获取在线用户
     */
    public Set<String> getOnlineUsers() {
        // 从Redis中获取在线用户列表
        return redisTemplate.opsForSet().members("online_users");
    }
    
    /**
     * 更新用户状态
     */
    public void updateUserStatus(String userId, UserStatus status) {
        if (UserStatus.ONLINE.equals(status)) {
            redisTemplate.opsForSet().add("online_users", userId);
        } else {
            redisTemplate.opsForSet().remove("online_users", userId);
        }
    }
    
    /**
     * 获取用户ID通过Session ID
     */
    public String getUserIdBySessionId(String sessionId) {
        // 从Redis中获取Session到用户ID的映射
        return (String) redisTemplate.opsForValue().get("session:" + sessionId);
    }
}

5. 安全认证配置

配置WebSocket连接的安全认证:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketSecurityConfig implements WebSocketMessageBrokerConfigurer {
    
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new ChannelInterceptor() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
                
                if (StompCommand.CONNECT.equals(accessor.getCommand())) {
                    // 连接认证
                    String token = accessor.getFirstNativeHeader("Authorization");
                    if (token != null && token.startsWith("Bearer ")) {
                        String jwt = token.substring(7);
                        String username = JwtUtil.getUsernameFromToken(jwt);
                        
                        if (username != null && JwtUtil.validateToken(jwt)) {
                            // 创建认证对象
                            UserDetails userDetails = userDetailsService.loadUserByUsername(username);
                            UsernamePasswordAuthenticationToken authentication = 
                                new UsernamePasswordAuthenticationToken(
                                    userDetails, null, userDetails.getAuthorities()
                                );
                            
                            accessor.setUser(authentication);
                        }
                    }
                }
                
                return message;
            }
        });
    }
}

6. 消息存储优化

使用Redis优化消息存储和推送:

@Service
public class RedisMessageService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * 保存离线消息
     */
    public void saveOfflineMessage(String userId, ChatMessage message) {
        String key = "offline_messages:" + userId;
        redisTemplate.opsForList().rightPush(key, message);
        
        // 设置过期时间,避免无限存储
        redisTemplate.expire(key, Duration.ofDays(7));
    }
    
    /**
     * 获取离线消息
     */
    public List<ChatMessage> getOfflineMessages(String userId) {
        String key = "offline_messages:" + userId;
        List<ChatMessage> messages = redisTemplate.opsForList().range(key, 0, -1);
        
        // 清除已获取的离线消息
        redisTemplate.delete(key);
        
        return messages != null ? messages : new ArrayList<>();
    }
    
    /**
     * 保存用户Session映射
     */
    public void saveUserSession(String sessionId, String userId) {
        redisTemplate.opsForValue().set("session:" + sessionId, userId);
        redisTemplate.expire("session:" + sessionId, Duration.ofHours(24));
    }
}

7. REST API接口

提供额外的REST API接口:

@RestController
@RequestMapping("/api/chat")
public class ChatRestController {
    
    @Autowired
    private ChatService chatService;
    
    @Autowired
    private RedisMessageService redisMessageService;
    
    /**
     * 获取聊天记录
     */
    @GetMapping("/history")
    public Result<List<ChatMessage>> getChatHistory(
            @RequestParam String userId,
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "20") int size) {
        
        List<ChatMessage> history = chatService.getChatHistory(
            SecurityUtil.getCurrentUserId(), userId, page, size);
        
        return Result.success(history);
    }
    
    /**
     * 获取群聊记录
     */
    @GetMapping("/group/{roomId}/history")
    public Result<List<ChatMessage>> getGroupHistory(
            @PathVariable String roomId,
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "20") int size) {
        
        List<ChatMessage> history = chatService.getGroupChatHistory(roomId, page, size);
        
        return Result.success(history);
    }
    
    /**
     * 获取离线消息
     */
    @GetMapping("/offline-messages")
    public Result<List<ChatMessage>> getOfflineMessages() {
        String userId = SecurityUtil.getCurrentUserId();
        List<ChatMessage> offlineMessages = redisMessageService.getOfflineMessages(userId);
        
        return Result.success(offlineMessages);
    }
    
    /**
     * 获取在线用户列表
     */
    @GetMapping("/online-users")
    public Result<Set<String>> getOnlineUsers() {
        Set<String> onlineUsers = chatService.getOnlineUsers();
        return Result.success(onlineUsers);
    }
}

性能优化策略

1. 消息分页加载

// 前端实现消息分页加载
function loadMoreMessages() {
    const currentOffset = messages.length;
    fetch(`/api/chat/history?userId=${targetUserId}&page=${currentOffset}&size=20`)
        .then(response => response.json())
        .then(data => {
            messages.unshift(...data.data);
            renderMessages();
        });
}

2. 消息压缩

// 消息压缩配置
@Configuration
public class MessageCompressionConfig {
    
    @Bean
    public MessageConverter messageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setObjectMapper(new ObjectMapper()
            .configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false));
        return converter;
    }
}

优势分析

相比传统的HTTP轮询方式,WebSocket + STOMP方案的优势明显:

  1. 实时性:毫秒级消息推送
  2. 资源消耗低:单个连接处理多个消息
  3. 扩展性好:支持集群部署
  4. 功能丰富:支持复杂的聊天功能
  5. 用户体验佳:接近原生应用体验

注意事项

  1. 连接管理:需要处理连接断开重连逻辑
  2. 消息顺序:保证消息的有序性
  3. 安全防护:防止消息伪造和重放攻击
  4. 资源限制:控制单个用户的连接数
  5. 监控告警:建立连接和消息处理监控

总结

通过SpringBoot + WebSocket + STOMP的技术组合,我们可以构建一个功能完整、性能优异的企业IM系统。这不仅能提升团队协作效率,还能为用户提供良好的沟通体验。

在实际项目中,建议根据具体业务需求进行定制化开发,并充分考虑安全性、性能和可扩展性等因素。


服务端技术精选,专注分享后端开发实战技术,助力你的技术成长!


标题:SpringBoot + WebSocket + STOMP:支持群聊、@提醒、消息回执的企业 IM 系统实战
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/01/12/1768215056068.html

    0 评论
avatar