SpringBoot通过SSE实现消息推送:告别轮询,让实时消息推送更高效

引言:消息推送的演进之路

用户在线等待订单状态更新,但页面一直显示"处理中"?或者需要实时显示系统通知,但轮询方式消耗资源又不及时?再或者要实现在线聊天功能,却发现WebSocket实现起来太复杂?

这就是传统消息推送方式的局限性。今天我们就来聊聊SSE(Server-Sent Events),看看它如何解决这些痛点,让你的消息推送系统更高效、更稳定。

为什么传统方式有局限性?

先说说为什么传统的轮询方式不够用。

想象一下,你是一家电商平台的后端工程师。有100万用户同时在线查看订单状态,如果用轮询方式:

  1. 每个用户每5秒轮询一次
  2. 100万用户就是每秒20万次请求
  3. 服务器压力巨大
  4. 大部分请求都是无效的(订单状态没变)

这会导致什么问题?

  • 服务器资源消耗巨大:大量无效请求
  • 响应不及时:轮询间隔时间内无法获取最新状态
  • 网络开销大:每次请求都包含完整HTTP头

SSE:服务器推送消息的轻量级方案

SSE(Server-Sent Events)是HTML5规范的一部分,专门为服务器向客户端推送消息而设计:

  • 单向通信:服务器向客户端推送
  • 自动重连:连接断开后自动重连
  • 事件ID:支持断线续传
  • 轻量级:基于HTTP协议
  • 浏览器原生支持:无需额外库

SSE vs WebSocket vs 轮询

SSE优势

  • 比WebSocket简单,实现成本低
  • 比轮询高效,减少无效请求
  • 自动重连,无需手动处理
  • 浏览器原生支持

适用场景

  • 订单状态更新
  • 系统通知推送
  • 实时日志查看
  • 股票价格更新

SpringBoot集成SSE实战

1. 基础SSE控制器

@RestController
public class SseController {
    
    @GetMapping(value = "/sse/notifications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter createSseConnection() {
        // 设置超时时间(-1表示永不超时)
        SseEmitter emitter = new SseEmitter(-1L);
        
        // 添加连接成功事件
        try {
            emitter.send(SseEmitter.event()
                .name("connect")
                .data("连接成功"));
        } catch (IOException e) {
            emitter.completeWithError(e);
        }
        
        // 添加连接关闭回调
        emitter.onCompletion(() -> {
            log.info("SSE连接关闭");
        });
        
        emitter.onTimeout(() -> {
            log.warn("SSE连接超时");
            emitter.complete();
        });
        
        return emitter;
    }
}

2. 消息推送服务

@Service
public class SseNotificationService {
    
    // 存储所有活跃的SSE连接
    private final Map<String, SseEmitter> sseEmitters = new ConcurrentHashMap<>();
    
    public SseEmitter createConnection(String userId) {
        SseEmitter emitter = new SseEmitter(-1L);
        
        // 存储连接
        sseEmitters.put(userId, emitter);
        
        // 连接建立成功
        try {
            emitter.send(SseEmitter.event()
                .name("connect")
                .data("连接建立成功"));
        } catch (IOException e) {
            emitter.completeWithError(e);
        }
        
        // 连接关闭时清理
        emitter.onCompletion(() -> {
            sseEmitters.remove(userId);
        });
        
        emitter.onTimeout(() -> {
            emitter.complete();
        });
        
        return emitter;
    }
    
    public void sendNotification(String userId, NotificationMessage message) {
        SseEmitter emitter = sseEmitters.get(userId);
        if (emitter != null) {
            try {
                emitter.send(SseEmitter.event()
                    .name("notification")
                    .data(JSON.toJSONString(message)));
            } catch (IOException e) {
                // 发送失败,移除连接
                sseEmitters.remove(userId);
            }
        }
    }
    
    public void sendToAll(NotificationMessage message) {
        sseEmitters.forEach((userId, emitter) -> {
            try {
                emitter.send(SseEmitter.event()
                    .name("broadcast")
                    .data(JSON.toJSONString(message)));
            } catch (IOException e) {
                sseEmitters.remove(userId);
            }
        });
    }
}

3. 订单状态推送

@RestController
public class OrderController {
    
    @Autowired
    private SseNotificationService sseService;
    
    @Autowired
    private OrderService orderService;
    
    @GetMapping("/sse/order/{orderId}")
    public SseEmitter trackOrder(@PathVariable String orderId, 
                               @RequestParam String userId) {
        SseEmitter emitter = sseService.createConnection(userId + ":" + orderId);
        
        // 发送当前订单状态
        Order order = orderService.getOrderById(orderId);
        try {
            emitter.send(SseEmitter.event()
                .name("order_status")
                .data(JSON.toJSONString(order)));
        } catch (IOException e) {
            emitter.completeWithError(e);
        }
        
        return emitter;
    }
    
    // 订单状态变更时推送
    @EventListener
    public void handleOrderStatusChange(OrderStatusChangeEvent event) {
        Order order = event.getOrder();
        
        // 推送给订单相关的用户
        sseService.sendToAll(SseMessage.builder()
            .type("order_status_change")
            .data(order)
            .build());
    }
}

4. 客户端连接管理

@RestController
public class SseConnectionController {
    
    @Autowired
    private SseNotificationService sseService;
    
    @GetMapping("/sse/connect/{userId}")
    public SseEmitter connect(@PathVariable String userId) {
        return sseService.createConnection(userId);
    }
    
    @PostMapping("/sse/push/{userId}")
    public ResponseEntity<String> pushMessage(@PathVariable String userId, 
                                           @RequestBody PushMessage message) {
        sseService.sendNotification(userId, message);
        return ResponseEntity.ok("消息推送成功");
    }
    
    @GetMapping("/sse/online-users")
    public ResponseEntity<List<String>> getOnlineUsers() {
        List<String> onlineUsers = new ArrayList<>(sseService.getOnlineUserIds());
        return ResponseEntity.ok(onlineUsers);
    }
}

高级特性实现

1. 断线续传支持

@Service
public class SseWithReconnectService {
    
    private final Map<String, SseEmitter> sseEmitters = new ConcurrentHashMap<>();
    private final Map<String, Long> lastEventIds = new ConcurrentHashMap<>();
    
    public SseEmitter createConnection(String userId, 
                                    @RequestParam(required = false) Long lastEventId) {
        SseEmitter emitter = new SseEmitter(-1L);
        
        // 设置最后事件ID
        if (lastEventId != null) {
            lastEventIds.put(userId, lastEventId);
            // 从指定ID开始推送历史消息
            sendHistoricalMessages(emitter, userId, lastEventId);
        }
        
        sseEmitters.put(userId, emitter);
        
        emitter.onCompletion(() -> {
            sseEmitters.remove(userId);
            lastEventIds.remove(userId);
        });
        
        return emitter;
    }
    
    private void sendHistoricalMessages(SseEmitter emitter, String userId, Long lastEventId) {
        // 获取历史消息并推送
        List<NotificationMessage> historyMessages = 
            notificationService.getMessagesAfter(userId, lastEventId);
        
        historyMessages.forEach(message -> {
            try {
                emitter.send(SseEmitter.event()
                    .id(String.valueOf(message.getId()))
                    .name("history")
                    .data(JSON.toJSONString(message)));
            } catch (IOException e) {
                log.error("发送历史消息失败", e);
            }
        });
    }
}

2. 消息队列集成

@Service
public class SseMessageQueueService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @PostConstruct
    public void initMessageProcessor() {
        // 启动消息处理线程
        new Thread(this::processMessageQueue).start();
    }
    
    public void addMessageToQueue(NotificationMessage message) {
        String queueKey = "sse:message:queue";
        redisTemplate.opsForList().rightPush(queueKey, message);
    }
    
    private void processMessageQueue() {
        String queueKey = "sse:message:queue";
        while (true) {
            try {
                // 阻塞式获取消息
                NotificationMessage message = (NotificationMessage) 
                    redisTemplate.opsForList().leftPop(queueKey, Duration.ofSeconds(1));
                
                if (message != null) {
                    // 推送给所有连接
                    broadcastMessage(message);
                }
            } catch (Exception e) {
                log.error("处理消息队列失败", e);
            }
        }
    }
    
    private void broadcastMessage(NotificationMessage message) {
        // 推送给所有活跃连接
        // 实现逻辑...
    }
}

3. 分组推送

@Service
public class SseGroupService {
    
    private final Map<String, Set<String>> groupMembers = new ConcurrentHashMap<>();
    private final Map<String, SseEmitter> sseEmitters = new ConcurrentHashMap<>();
    
    public void joinGroup(String userId, String groupId) {
        groupMembers.computeIfAbsent(groupId, k -> ConcurrentHashMap.newKeySet())
            .add(userId);
    }
    
    public void leaveGroup(String userId, String groupId) {
        Set<String> members = groupMembers.get(groupId);
        if (members != null) {
            members.remove(userId);
        }
    }
    
    public void sendToGroup(String groupId, NotificationMessage message) {
        Set<String> members = groupMembers.get(groupId);
        if (members != null) {
            members.forEach(userId -> {
                SseEmitter emitter = sseEmitters.get(userId);
                if (emitter != null) {
                    try {
                        emitter.send(SseEmitter.event()
                            .name("group_message")
                            .data(JSON.toJSONString(message)));
                    } catch (IOException e) {
                        // 处理发送失败
                        sseEmitters.remove(userId);
                    }
                }
            });
        }
    }
}

性能优化建议

1. 连接池管理

@Component
public class SseConnectionPool {
    
    private final ExecutorService executorService = 
        Executors.newFixedThreadPool(20);
    
    public void sendAsync(String userId, NotificationMessage message) {
        executorService.submit(() -> {
            SseEmitter emitter = sseEmitters.get(userId);
            if (emitter != null) {
                try {
                    emitter.send(SseEmitter.event()
                        .name("async_message")
                        .data(JSON.toJSONString(message)));
                } catch (IOException e) {
                    sseEmitters.remove(userId);
                }
            }
        });
    }
}

2. 消息批量推送

@Service
public class BatchSseService {
    
    public void batchSend(List<String> userIds, NotificationMessage message) {
        List<SseEmitter> validEmitters = userIds.stream()
            .map(sseEmitters::get)
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
        
        // 批量发送
        validEmitters.parallelStream().forEach(emitter -> {
            try {
                emitter.send(SseEmitter.event()
                    .name("batch_message")
                    .data(JSON.toJSONString(message)));
            } catch (IOException e) {
                // 异常处理
            }
        });
    }
}

客户端实现

JavaScript客户端

// 创建SSE连接
const eventSource = new EventSource('/api/sse/connect/' + userId);

// 监听连接事件
eventSource.addEventListener('connect', function(event) {
    console.log('连接建立成功');
});

// 监听通知事件
eventSource.addEventListener('notification', function(event) {
    const message = JSON.parse(event.data);
    showNotification(message);
});

// 监听订单状态事件
eventSource.addEventListener('order_status', function(event) {
    const order = JSON.parse(event.data);
    updateOrderStatus(order);
});

// 监听错误
eventSource.onerror = function(event) {
    console.error('SSE连接错误:', event);
};

// 断线重连处理
eventSource.addEventListener('error', function(event) {
    if (eventSource.readyState === EventSource.CLOSED) {
        // 重新连接
        setTimeout(() => {
            eventSource.close();
            // 重新创建连接
        }, 5000);
    }
});

安全考虑

1. 认证授权

@RestController
public class SecureSseController {
    
    @GetMapping("/sse/secure/{userId}")
    public SseEmitter createSecureConnection(
            @PathVariable String userId,
            @RequestHeader("Authorization") String token) {
        
        // 验证token
        if (!tokenService.validateToken(token, userId)) {
            throw new UnauthorizedException("认证失败");
        }
        
        return sseService.createConnection(userId);
    }
}

2. 消息过滤

public class MessageFilterService {
    
    public boolean isMessageAllowed(String userId, NotificationMessage message) {
        // 检查用户是否有权限接收该消息
        return permissionService.hasPermission(userId, message.getTopic());
    }
}

监控与运维

1. 连接监控

@Component
public class SseMetricsCollector {
    
    private final MeterRegistry meterRegistry;
    
    public void recordConnection(String userId) {
        Counter.builder("sse_connections_total")
            .tag("user_id", userId)
            .register(meterRegistry)
            .increment();
    }
    
    public void recordMessageSent(String userId) {
        Counter.builder("sse_messages_sent_total")
            .tag("user_id", userId)
            .register(meterRegistry)
            .increment();
    }
    
    public int getActiveConnections() {
        return sseService.getActiveConnectionCount();
    }
}

最佳实践

1. 连接管理

  • 合理设置超时时间:避免长时间占用连接
  • 定期清理无效连接:防止内存泄漏
  • 连接数限制:防止恶意连接

2. 消息设计

  • 消息格式统一:便于客户端处理
  • 消息大小控制:避免传输大消息
  • 消息类型标识:便于客户端区分处理

3. 错误处理

  • 优雅降级:SSE不可用时回退到轮询
  • 重试机制:连接失败时自动重试
  • 异常监控:及时发现和处理异常

总结

SSE是实现服务器推送消息的轻量级方案,相比传统的轮询方式和复杂的WebSocket,它具有以下优势:

  1. 简单易用:基于HTTP,实现简单
  2. 自动重连:无需手动处理连接管理
  3. 高效传输:减少无效请求
  4. 浏览器原生支持:无需额外库

记住,SSE适合单向服务器推送的场景,如果需要双向通信,还是需要WebSocket。但对大多数消息推送需求,SSE绝对是更优的选择!


标题:SpringBoot通过SSE实现消息推送:告别轮询,让实时消息推送更高效
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/01/03/1767453210016.html

    0 评论
avatar