SpringBoot通过SSE实现消息推送:告别轮询,让实时消息推送更高效
引言:消息推送的演进之路
用户在线等待订单状态更新,但页面一直显示"处理中"?或者需要实时显示系统通知,但轮询方式消耗资源又不及时?再或者要实现在线聊天功能,却发现WebSocket实现起来太复杂?
这就是传统消息推送方式的局限性。今天我们就来聊聊SSE(Server-Sent Events),看看它如何解决这些痛点,让你的消息推送系统更高效、更稳定。
为什么传统方式有局限性?
先说说为什么传统的轮询方式不够用。
想象一下,你是一家电商平台的后端工程师。有100万用户同时在线查看订单状态,如果用轮询方式:
- 每个用户每5秒轮询一次
- 100万用户就是每秒20万次请求
- 服务器压力巨大
- 大部分请求都是无效的(订单状态没变)
这会导致什么问题?
- 服务器资源消耗巨大:大量无效请求
- 响应不及时:轮询间隔时间内无法获取最新状态
- 网络开销大:每次请求都包含完整HTTP头
SSE:服务器推送消息的轻量级方案
SSE(Server-Sent Events)是HTML5规范的一部分,专门为服务器向客户端推送消息而设计:
- 单向通信:服务器向客户端推送
- 自动重连:连接断开后自动重连
- 事件ID:支持断线续传
- 轻量级:基于HTTP协议
- 浏览器原生支持:无需额外库
SSE vs WebSocket vs 轮询
SSE优势:
- 比WebSocket简单,实现成本低
- 比轮询高效,减少无效请求
- 自动重连,无需手动处理
- 浏览器原生支持
适用场景:
- 订单状态更新
- 系统通知推送
- 实时日志查看
- 股票价格更新
SpringBoot集成SSE实战
1. 基础SSE控制器
@RestController
public class SseController {
@GetMapping(value = "/sse/notifications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter createSseConnection() {
// 设置超时时间(-1表示永不超时)
SseEmitter emitter = new SseEmitter(-1L);
// 添加连接成功事件
try {
emitter.send(SseEmitter.event()
.name("connect")
.data("连接成功"));
} catch (IOException e) {
emitter.completeWithError(e);
}
// 添加连接关闭回调
emitter.onCompletion(() -> {
log.info("SSE连接关闭");
});
emitter.onTimeout(() -> {
log.warn("SSE连接超时");
emitter.complete();
});
return emitter;
}
}
2. 消息推送服务
@Service
public class SseNotificationService {
// 存储所有活跃的SSE连接
private final Map<String, SseEmitter> sseEmitters = new ConcurrentHashMap<>();
public SseEmitter createConnection(String userId) {
SseEmitter emitter = new SseEmitter(-1L);
// 存储连接
sseEmitters.put(userId, emitter);
// 连接建立成功
try {
emitter.send(SseEmitter.event()
.name("connect")
.data("连接建立成功"));
} catch (IOException e) {
emitter.completeWithError(e);
}
// 连接关闭时清理
emitter.onCompletion(() -> {
sseEmitters.remove(userId);
});
emitter.onTimeout(() -> {
emitter.complete();
});
return emitter;
}
public void sendNotification(String userId, NotificationMessage message) {
SseEmitter emitter = sseEmitters.get(userId);
if (emitter != null) {
try {
emitter.send(SseEmitter.event()
.name("notification")
.data(JSON.toJSONString(message)));
} catch (IOException e) {
// 发送失败,移除连接
sseEmitters.remove(userId);
}
}
}
public void sendToAll(NotificationMessage message) {
sseEmitters.forEach((userId, emitter) -> {
try {
emitter.send(SseEmitter.event()
.name("broadcast")
.data(JSON.toJSONString(message)));
} catch (IOException e) {
sseEmitters.remove(userId);
}
});
}
}
3. 订单状态推送
@RestController
public class OrderController {
@Autowired
private SseNotificationService sseService;
@Autowired
private OrderService orderService;
@GetMapping("/sse/order/{orderId}")
public SseEmitter trackOrder(@PathVariable String orderId,
@RequestParam String userId) {
SseEmitter emitter = sseService.createConnection(userId + ":" + orderId);
// 发送当前订单状态
Order order = orderService.getOrderById(orderId);
try {
emitter.send(SseEmitter.event()
.name("order_status")
.data(JSON.toJSONString(order)));
} catch (IOException e) {
emitter.completeWithError(e);
}
return emitter;
}
// 订单状态变更时推送
@EventListener
public void handleOrderStatusChange(OrderStatusChangeEvent event) {
Order order = event.getOrder();
// 推送给订单相关的用户
sseService.sendToAll(SseMessage.builder()
.type("order_status_change")
.data(order)
.build());
}
}
4. 客户端连接管理
@RestController
public class SseConnectionController {
@Autowired
private SseNotificationService sseService;
@GetMapping("/sse/connect/{userId}")
public SseEmitter connect(@PathVariable String userId) {
return sseService.createConnection(userId);
}
@PostMapping("/sse/push/{userId}")
public ResponseEntity<String> pushMessage(@PathVariable String userId,
@RequestBody PushMessage message) {
sseService.sendNotification(userId, message);
return ResponseEntity.ok("消息推送成功");
}
@GetMapping("/sse/online-users")
public ResponseEntity<List<String>> getOnlineUsers() {
List<String> onlineUsers = new ArrayList<>(sseService.getOnlineUserIds());
return ResponseEntity.ok(onlineUsers);
}
}
高级特性实现
1. 断线续传支持
@Service
public class SseWithReconnectService {
private final Map<String, SseEmitter> sseEmitters = new ConcurrentHashMap<>();
private final Map<String, Long> lastEventIds = new ConcurrentHashMap<>();
public SseEmitter createConnection(String userId,
@RequestParam(required = false) Long lastEventId) {
SseEmitter emitter = new SseEmitter(-1L);
// 设置最后事件ID
if (lastEventId != null) {
lastEventIds.put(userId, lastEventId);
// 从指定ID开始推送历史消息
sendHistoricalMessages(emitter, userId, lastEventId);
}
sseEmitters.put(userId, emitter);
emitter.onCompletion(() -> {
sseEmitters.remove(userId);
lastEventIds.remove(userId);
});
return emitter;
}
private void sendHistoricalMessages(SseEmitter emitter, String userId, Long lastEventId) {
// 获取历史消息并推送
List<NotificationMessage> historyMessages =
notificationService.getMessagesAfter(userId, lastEventId);
historyMessages.forEach(message -> {
try {
emitter.send(SseEmitter.event()
.id(String.valueOf(message.getId()))
.name("history")
.data(JSON.toJSONString(message)));
} catch (IOException e) {
log.error("发送历史消息失败", e);
}
});
}
}
2. 消息队列集成
@Service
public class SseMessageQueueService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@PostConstruct
public void initMessageProcessor() {
// 启动消息处理线程
new Thread(this::processMessageQueue).start();
}
public void addMessageToQueue(NotificationMessage message) {
String queueKey = "sse:message:queue";
redisTemplate.opsForList().rightPush(queueKey, message);
}
private void processMessageQueue() {
String queueKey = "sse:message:queue";
while (true) {
try {
// 阻塞式获取消息
NotificationMessage message = (NotificationMessage)
redisTemplate.opsForList().leftPop(queueKey, Duration.ofSeconds(1));
if (message != null) {
// 推送给所有连接
broadcastMessage(message);
}
} catch (Exception e) {
log.error("处理消息队列失败", e);
}
}
}
private void broadcastMessage(NotificationMessage message) {
// 推送给所有活跃连接
// 实现逻辑...
}
}
3. 分组推送
@Service
public class SseGroupService {
private final Map<String, Set<String>> groupMembers = new ConcurrentHashMap<>();
private final Map<String, SseEmitter> sseEmitters = new ConcurrentHashMap<>();
public void joinGroup(String userId, String groupId) {
groupMembers.computeIfAbsent(groupId, k -> ConcurrentHashMap.newKeySet())
.add(userId);
}
public void leaveGroup(String userId, String groupId) {
Set<String> members = groupMembers.get(groupId);
if (members != null) {
members.remove(userId);
}
}
public void sendToGroup(String groupId, NotificationMessage message) {
Set<String> members = groupMembers.get(groupId);
if (members != null) {
members.forEach(userId -> {
SseEmitter emitter = sseEmitters.get(userId);
if (emitter != null) {
try {
emitter.send(SseEmitter.event()
.name("group_message")
.data(JSON.toJSONString(message)));
} catch (IOException e) {
// 处理发送失败
sseEmitters.remove(userId);
}
}
});
}
}
}
性能优化建议
1. 连接池管理
@Component
public class SseConnectionPool {
private final ExecutorService executorService =
Executors.newFixedThreadPool(20);
public void sendAsync(String userId, NotificationMessage message) {
executorService.submit(() -> {
SseEmitter emitter = sseEmitters.get(userId);
if (emitter != null) {
try {
emitter.send(SseEmitter.event()
.name("async_message")
.data(JSON.toJSONString(message)));
} catch (IOException e) {
sseEmitters.remove(userId);
}
}
});
}
}
2. 消息批量推送
@Service
public class BatchSseService {
public void batchSend(List<String> userIds, NotificationMessage message) {
List<SseEmitter> validEmitters = userIds.stream()
.map(sseEmitters::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
// 批量发送
validEmitters.parallelStream().forEach(emitter -> {
try {
emitter.send(SseEmitter.event()
.name("batch_message")
.data(JSON.toJSONString(message)));
} catch (IOException e) {
// 异常处理
}
});
}
}
客户端实现
JavaScript客户端
// 创建SSE连接
const eventSource = new EventSource('/api/sse/connect/' + userId);
// 监听连接事件
eventSource.addEventListener('connect', function(event) {
console.log('连接建立成功');
});
// 监听通知事件
eventSource.addEventListener('notification', function(event) {
const message = JSON.parse(event.data);
showNotification(message);
});
// 监听订单状态事件
eventSource.addEventListener('order_status', function(event) {
const order = JSON.parse(event.data);
updateOrderStatus(order);
});
// 监听错误
eventSource.onerror = function(event) {
console.error('SSE连接错误:', event);
};
// 断线重连处理
eventSource.addEventListener('error', function(event) {
if (eventSource.readyState === EventSource.CLOSED) {
// 重新连接
setTimeout(() => {
eventSource.close();
// 重新创建连接
}, 5000);
}
});
安全考虑
1. 认证授权
@RestController
public class SecureSseController {
@GetMapping("/sse/secure/{userId}")
public SseEmitter createSecureConnection(
@PathVariable String userId,
@RequestHeader("Authorization") String token) {
// 验证token
if (!tokenService.validateToken(token, userId)) {
throw new UnauthorizedException("认证失败");
}
return sseService.createConnection(userId);
}
}
2. 消息过滤
public class MessageFilterService {
public boolean isMessageAllowed(String userId, NotificationMessage message) {
// 检查用户是否有权限接收该消息
return permissionService.hasPermission(userId, message.getTopic());
}
}
监控与运维
1. 连接监控
@Component
public class SseMetricsCollector {
private final MeterRegistry meterRegistry;
public void recordConnection(String userId) {
Counter.builder("sse_connections_total")
.tag("user_id", userId)
.register(meterRegistry)
.increment();
}
public void recordMessageSent(String userId) {
Counter.builder("sse_messages_sent_total")
.tag("user_id", userId)
.register(meterRegistry)
.increment();
}
public int getActiveConnections() {
return sseService.getActiveConnectionCount();
}
}
最佳实践
1. 连接管理
- 合理设置超时时间:避免长时间占用连接
- 定期清理无效连接:防止内存泄漏
- 连接数限制:防止恶意连接
2. 消息设计
- 消息格式统一:便于客户端处理
- 消息大小控制:避免传输大消息
- 消息类型标识:便于客户端区分处理
3. 错误处理
- 优雅降级:SSE不可用时回退到轮询
- 重试机制:连接失败时自动重试
- 异常监控:及时发现和处理异常
总结
SSE是实现服务器推送消息的轻量级方案,相比传统的轮询方式和复杂的WebSocket,它具有以下优势:
- 简单易用:基于HTTP,实现简单
- 自动重连:无需手动处理连接管理
- 高效传输:减少无效请求
- 浏览器原生支持:无需额外库
记住,SSE适合单向服务器推送的场景,如果需要双向通信,还是需要WebSocket。但对大多数消息推送需求,SSE绝对是更优的选择!
标题:SpringBoot通过SSE实现消息推送:告别轮询,让实时消息推送更高效
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/01/03/1767453210016.html
0 评论