SpringBoot + ResponseBodyEmitter 实时异步流式推送:告别轮询,让数据推送更高效
引言:实时数据推送的挑战
用户在线查看实时日志,但页面一直刷新才看到最新信息?或者需要实时显示系统状态,但轮询方式消耗资源又不及时?再或者要实现在线聊天功能,却发现WebSocket实现起来太复杂?
这就是传统数据推送方式的局限性。今天我们就来聊聊ResponseBodyEmitter,看看它如何解决这些痛点,让你的数据推送系统更高效、更稳定。
为什么传统方式有局限性?
先说说为什么传统的轮询方式不够用。
想象一下,你是一家运维平台的后端工程师。有1000个用户同时在线查看实时日志,如果用轮询方式:
- 每个用户每2秒轮询一次
- 1000用户就是每秒500次请求
- 大部分请求都是无效的(日志没有新内容)
- 服务器压力巨大
这会导致什么问题?
- 服务器资源消耗巨大:大量无效请求
- 响应不及时:轮询间隔时间内无法获取最新数据
- 网络开销大:每次请求都包含完整HTTP头
ResponseBodyEmitter:SpringBoot的流式推送方案
ResponseBodyEmitter是Spring框架提供的流式响应工具,专门为服务器向客户端推送数据而设计:
- 流式传输:支持持续数据推送
- 异步处理:非阻塞式数据发送
- 连接管理:自动处理连接状态
- 错误处理:完善的异常处理机制
- 内存友好:避免大数据量占用内存
ResponseBodyEmitter vs 其他方案对比
ResponseBodyEmitter优势:
- 比WebSocket简单,无需额外协议
- 比SSE更灵活,支持复杂数据格式
- 比轮询高效,减少无效请求
- 基于HTTP,兼容性好
适用场景:
- 实时日志查看
- 进度条更新
- 数据流推送
- 实时监控数据
SpringBoot集成ResponseBodyEmitter实战
1. 基础流式推送控制器
@RestController
public class StreamController {
@GetMapping(value = "/stream/logs", produces = MediaType.TEXT_PLAIN_VALUE)
public ResponseBodyEmitter streamLogs() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
// 设置超时时间(-1表示永不超时)
emitter.setTimeout(Long.MAX_VALUE);
// 添加连接成功事件
try {
emitter.send("连接建立成功\n");
} catch (IOException e) {
emitter.completeWithError(e);
return emitter;
}
// 添加连接关闭回调
emitter.onCompletion(() -> {
log.info("流式连接关闭");
});
emitter.onTimeout(() -> {
log.warn("流式连接超时");
emitter.complete();
});
// 启动异步日志推送
startLogStreaming(emitter);
return emitter;
}
private void startLogStreaming(ResponseBodyEmitter emitter) {
// 模拟日志数据推送
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
try {
String logLine = String.format("[%s] %s\n",
LocalDateTime.now(),
"这是实时日志内容 " + System.currentTimeMillis());
emitter.send(logLine);
} catch (IOException e) {
log.error("日志推送失败", e);
emitter.completeWithError(e);
scheduler.shutdown();
}
}, 0, 1, TimeUnit.SECONDS);
}
}
2. 实时日志推送服务
@Service
public class RealTimeLogService {
// 存储所有活跃的流式连接
private final Map<String, ResponseBodyEmitter> activeEmitters = new ConcurrentHashMap<>();
public ResponseBodyEmitter createLogStream(String userId, String logType) {
String streamId = userId + ":" + logType;
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
// 设置超时时间
emitter.setTimeout(Long.MAX_VALUE);
// 存储连接
activeEmitters.put(streamId, emitter);
// 连接建立成功
try {
emitter.send("实时日志流建立成功\n");
} catch (IOException e) {
emitter.completeWithError(e);
}
// 连接关闭时清理
emitter.onCompletion(() -> {
activeEmitters.remove(streamId);
log.info("日志流连接关闭: {}", streamId);
});
emitter.onTimeout(() -> {
emitter.complete();
});
return emitter;
}
public void pushLogToUser(String userId, String logType, String logMessage) {
String streamId = userId + ":" + logType;
ResponseBodyEmitter emitter = activeEmitters.get(streamId);
if (emitter != null) {
try {
emitter.send(logMessage + "\n");
} catch (IOException e) {
// 发送失败,移除连接
activeEmitters.remove(streamId);
log.error("日志推送失败", e);
}
}
}
public void pushLogToAll(String logMessage) {
activeEmitters.forEach((streamId, emitter) -> {
try {
emitter.send(logMessage + "\n");
} catch (IOException e) {
activeEmitters.remove(streamId);
}
});
}
}
3. 进度条实时更新
@RestController
public class ProgressController {
@Autowired
private ProgressService progressService;
@GetMapping("/stream/progress/{taskId}")
public ResponseBodyEmitter streamProgress(@PathVariable String taskId) {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
emitter.setTimeout(Long.MAX_VALUE);
// 发送初始进度
try {
emitter.send("开始任务\n");
} catch (IOException e) {
emitter.completeWithError(e);
return emitter;
}
// 启动进度监控
progressService.startMonitoring(taskId, progress -> {
try {
String progressMessage = String.format("进度: %d%% - %s\n",
progress.getPercentage(),
progress.getDescription());
emitter.send(progressMessage);
if (progress.isCompleted()) {
emitter.complete();
}
} catch (IOException e) {
log.error("进度推送失败", e);
emitter.completeWithError(e);
}
});
return emitter;
}
}
4. 数据流推送
@RestController
public class DataStreamController {
@Autowired
private DataStreamService dataStreamService;
@GetMapping("/stream/data/{streamId}")
public ResponseBodyEmitter streamData(@PathVariable String streamId,
@RequestParam(defaultValue = "1000") long intervalMs) {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
emitter.setTimeout(Long.MAX_VALUE);
// 启动数据流推送
dataStreamService.startDataStreaming(streamId, intervalMs, data -> {
try {
// 将数据转换为JSON格式并发送
String jsonData = JSON.toJSONString(data);
emitter.send(jsonData + "\n");
} catch (IOException e) {
log.error("数据流推送失败", e);
emitter.completeWithError(e);
}
});
return emitter;
}
}
高级特性实现
1. 连接池管理
@Service
public class StreamConnectionPool {
private final Map<String, ResponseBodyEmitter> emitters = new ConcurrentHashMap<>();
private final Map<String, ScheduledExecutorService> schedulers = new ConcurrentHashMap<>();
public ResponseBodyEmitter createStream(String streamId, StreamConfig config) {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
emitter.setTimeout(config.getTimeout());
// 存储连接
emitters.put(streamId, emitter);
// 创建调度器
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
schedulers.put(streamId, scheduler);
// 连接管理回调
emitter.onCompletion(() -> {
emitters.remove(streamId);
ScheduledExecutorService removedScheduler = schedulers.remove(streamId);
if (removedScheduler != null) {
removedScheduler.shutdown();
}
});
emitter.onTimeout(() -> {
emitter.complete();
});
return emitter;
}
public boolean sendToStream(String streamId, String data) {
ResponseBodyEmitter emitter = emitters.get(streamId);
if (emitter != null) {
try {
emitter.send(data + "\n");
return true;
} catch (IOException e) {
emitters.remove(streamId);
return false;
}
}
return false;
}
}
2. 缓存与批量推送
@Service
public class BatchStreamService {
private final Map<String, List<String>> messageBuffers = new ConcurrentHashMap<>();
private final ScheduledExecutorService batchScheduler = Executors.newScheduledThreadPool(5);
public void addToBuffer(String streamId, String message) {
messageBuffers.computeIfAbsent(streamId, k -> new ArrayList<>()).add(message);
}
public void startBatchProcessing(String streamId, int batchSize, long intervalMs) {
batchScheduler.scheduleAtFixedRate(() -> {
List<String> messages = messageBuffers.get(streamId);
if (messages != null && !messages.isEmpty()) {
synchronized (messages) {
if (!messages.isEmpty()) {
List<String> batch = new ArrayList<>();
int size = Math.min(batchSize, messages.size());
for (int i = 0; i < size; i++) {
batch.add(messages.remove(0));
}
// 批量发送
sendBatchToStream(streamId, batch);
}
}
}
}, 0, intervalMs, TimeUnit.MILLISECONDS);
}
private void sendBatchToStream(String streamId, List<String> messages) {
String batchData = JSON.toJSONString(messages);
// 发送到对应的流
// 实现逻辑...
}
}
3. 断线重连机制
@Service
public class ReconnectStreamService {
private final Map<String, StreamSession> activeSessions = new ConcurrentHashMap<>();
public ResponseBodyEmitter createReconnectableStream(String userId,
String sessionId,
long lastEventId) {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
emitter.setTimeout(Long.MAX_VALUE);
// 创建会话
StreamSession session = new StreamSession(userId, sessionId, emitter, lastEventId);
activeSessions.put(sessionId, session);
// 发送历史数据(如果需要)
if (lastEventId > 0) {
sendHistoricalData(emitter, userId, lastEventId);
}
emitter.onCompletion(() -> {
activeSessions.remove(sessionId);
});
return emitter;
}
private void sendHistoricalData(ResponseBodyEmitter emitter, String userId, long lastEventId) {
// 获取历史数据并发送
List<DataEvent> history = dataService.getEventsAfter(userId, lastEventId);
history.forEach(event -> {
try {
emitter.send(JSON.toJSONString(event) + "\n");
} catch (IOException e) {
log.error("发送历史数据失败", e);
}
});
}
}
性能优化建议
1. 内存管理
@Component
public class StreamMemoryManager {
private final AtomicLong totalConnections = new AtomicLong(0);
private final AtomicLong maxConnections = new AtomicLong(1000); // 最大连接数限制
public boolean canAcceptNewConnection() {
long current = totalConnections.get();
return current < maxConnections.get();
}
public void registerConnection() {
totalConnections.incrementAndGet();
}
public void unregisterConnection() {
totalConnections.decrementAndGet();
}
@Scheduled(fixedRate = 60000) // 每分钟检查一次
public void cleanupIdleConnections() {
// 清理长时间无活动的连接
// 实现逻辑...
}
}
2. 异步处理优化
@Service
public class AsyncStreamService {
@Autowired
private TaskExecutor streamExecutor;
public void sendAsync(String streamId, String data) {
streamExecutor.execute(() -> {
ResponseBodyEmitter emitter = emitters.get(streamId);
if (emitter != null) {
try {
emitter.send(data + "\n");
} catch (IOException e) {
emitters.remove(streamId);
}
}
});
}
}
客户端实现
JavaScript客户端
// 创建流式连接
function createStreamConnection(url) {
const xhr = new XMLHttpRequest();
xhr.open('GET', url, true);
xhr.onreadystatechange = function() {
if (xhr.readyState === 3 || xhr.readyState === 4) {
const newData = xhr.responseText.substring(xhr.lastIndex || 0);
if (newData) {
const lines = newData.split('\n');
lines.forEach(line => {
if (line.trim()) {
handleStreamData(line);
}
});
xhr.lastIndex = xhr.responseText.length;
}
}
};
xhr.onerror = function() {
console.error('流式连接错误');
// 尝试重连
setTimeout(() => createStreamConnection(url), 5000);
};
xhr.send();
}
// 或者使用fetch API
async function createFetchStream(url) {
const response = await fetch(url);
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\n');
lines.forEach(line => {
if (line.trim()) {
handleStreamData(line);
}
});
}
}
安全考虑
1. 认证授权
@RestController
public class SecureStreamController {
@GetMapping("/stream/secure/{userId}")
public ResponseBodyEmitter createSecureStream(@PathVariable String userId,
@RequestHeader("Authorization") String token) {
// 验证token
if (!tokenService.validateToken(token, userId)) {
throw new UnauthorizedException("认证失败");
}
return streamService.createStream(userId);
}
}
2. 速率限制
@Service
public class RateLimitedStreamService {
private final Map<String, RateLimiter> rateLimiters = new ConcurrentHashMap<>();
public boolean canSend(String userId, String streamType) {
RateLimiter limiter = rateLimiters.computeIfAbsent(
userId + ":" + streamType,
k -> RateLimiter.create(10) // 每秒最多10条消息
);
return limiter.tryAcquire();
}
public void sendWithRateLimit(String userId, String streamType, String data) {
if (canSend(userId, streamType)) {
sendToStream(userId, data);
} else {
log.warn("用户 {} 发送频率过高", userId);
}
}
}
监控与运维
1. 连接监控
@Component
public class StreamMetricsCollector {
private final MeterRegistry meterRegistry;
public void recordConnection(String userId) {
Counter.builder("stream_connections_total")
.tag("user_id", userId)
.register(meterRegistry)
.increment();
}
public void recordDataSent(String userId, long dataSize) {
Counter.builder("stream_data_sent_bytes_total")
.tag("user_id", userId)
.register(meterRegistry)
.increment(dataSize);
}
public int getActiveConnections() {
return streamService.getActiveConnectionCount();
}
}
最佳实践
1. 连接管理
- 合理设置超时时间:避免长时间占用连接
- 定期清理无效连接:防止内存泄漏
- 连接数限制:防止恶意连接
2. 数据格式
- 统一数据格式:便于客户端处理
- 消息大小控制:避免传输大消息
- 编码处理:确保字符编码正确
3. 错误处理
- 优雅降级:连接失败时回退到轮询
- 重试机制:连接失败时自动重试
- 异常监控:及时发现和处理异常
总结
ResponseBodyEmitter是实现服务器推送数据的高效方案,相比传统的轮询方式,它具有以下优势:
- 高效传输:减少无效请求
- 实时性好:数据变更立即推送
- 实现简单:基于HTTP协议
- 内存友好:流式处理大数据
记住,ResponseBodyEmitter适合单向服务器推送的场景,对于需要双向通信的场景,还是需要WebSocket。但对大多数实时数据推送需求,ResponseBodyEmitter绝对是更优的选择!
标题:SpringBoot + ResponseBodyEmitter 实时异步流式推送:告别轮询,让数据推送更高效
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/01/04/1767503226329.html
- 引言:实时数据推送的挑战
- 为什么传统方式有局限性?
- ResponseBodyEmitter:SpringBoot的流式推送方案
- ResponseBodyEmitter vs 其他方案对比
- SpringBoot集成ResponseBodyEmitter实战
- 1. 基础流式推送控制器
- 2. 实时日志推送服务
- 3. 进度条实时更新
- 4. 数据流推送
- 高级特性实现
- 1. 连接池管理
- 2. 缓存与批量推送
- 3. 断线重连机制
- 性能优化建议
- 1. 内存管理
- 2. 异步处理优化
- 客户端实现
- JavaScript客户端
- 安全考虑
- 1. 认证授权
- 2. 速率限制
- 监控与运维
- 1. 连接监控
- 最佳实践
- 1. 连接管理
- 2. 数据格式
- 3. 错误处理
- 总结
0 评论