SpringBoot + ResponseBodyEmitter 实时异步流式推送:告别轮询,让数据推送更高效

引言:实时数据推送的挑战

用户在线查看实时日志,但页面一直刷新才看到最新信息?或者需要实时显示系统状态,但轮询方式消耗资源又不及时?再或者要实现在线聊天功能,却发现WebSocket实现起来太复杂?

这就是传统数据推送方式的局限性。今天我们就来聊聊ResponseBodyEmitter,看看它如何解决这些痛点,让你的数据推送系统更高效、更稳定。

为什么传统方式有局限性?

先说说为什么传统的轮询方式不够用。

想象一下,你是一家运维平台的后端工程师。有1000个用户同时在线查看实时日志,如果用轮询方式:

  1. 每个用户每2秒轮询一次
  2. 1000用户就是每秒500次请求
  3. 大部分请求都是无效的(日志没有新内容)
  4. 服务器压力巨大

这会导致什么问题?

  • 服务器资源消耗巨大:大量无效请求
  • 响应不及时:轮询间隔时间内无法获取最新数据
  • 网络开销大:每次请求都包含完整HTTP头

ResponseBodyEmitter:SpringBoot的流式推送方案

ResponseBodyEmitter是Spring框架提供的流式响应工具,专门为服务器向客户端推送数据而设计:

  • 流式传输:支持持续数据推送
  • 异步处理:非阻塞式数据发送
  • 连接管理:自动处理连接状态
  • 错误处理:完善的异常处理机制
  • 内存友好:避免大数据量占用内存

ResponseBodyEmitter vs 其他方案对比

ResponseBodyEmitter优势

  • 比WebSocket简单,无需额外协议
  • 比SSE更灵活,支持复杂数据格式
  • 比轮询高效,减少无效请求
  • 基于HTTP,兼容性好

适用场景

  • 实时日志查看
  • 进度条更新
  • 数据流推送
  • 实时监控数据

SpringBoot集成ResponseBodyEmitter实战

1. 基础流式推送控制器

@RestController
public class StreamController {
    
    @GetMapping(value = "/stream/logs", produces = MediaType.TEXT_PLAIN_VALUE)
    public ResponseBodyEmitter streamLogs() {
        ResponseBodyEmitter emitter = new ResponseBodyEmitter();
        
        // 设置超时时间(-1表示永不超时)
        emitter.setTimeout(Long.MAX_VALUE);
        
        // 添加连接成功事件
        try {
            emitter.send("连接建立成功\n");
        } catch (IOException e) {
            emitter.completeWithError(e);
            return emitter;
        }
        
        // 添加连接关闭回调
        emitter.onCompletion(() -> {
            log.info("流式连接关闭");
        });
        
        emitter.onTimeout(() -> {
            log.warn("流式连接超时");
            emitter.complete();
        });
        
        // 启动异步日志推送
        startLogStreaming(emitter);
        
        return emitter;
    }
    
    private void startLogStreaming(ResponseBodyEmitter emitter) {
        // 模拟日志数据推送
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        
        scheduler.scheduleAtFixedRate(() -> {
            try {
                String logLine = String.format("[%s] %s\n", 
                    LocalDateTime.now(), 
                    "这是实时日志内容 " + System.currentTimeMillis());
                
                emitter.send(logLine);
            } catch (IOException e) {
                log.error("日志推送失败", e);
                emitter.completeWithError(e);
                scheduler.shutdown();
            }
        }, 0, 1, TimeUnit.SECONDS);
    }
}

2. 实时日志推送服务

@Service
public class RealTimeLogService {
    
    // 存储所有活跃的流式连接
    private final Map<String, ResponseBodyEmitter> activeEmitters = new ConcurrentHashMap<>();
    
    public ResponseBodyEmitter createLogStream(String userId, String logType) {
        String streamId = userId + ":" + logType;
        ResponseBodyEmitter emitter = new ResponseBodyEmitter();
        
        // 设置超时时间
        emitter.setTimeout(Long.MAX_VALUE);
        
        // 存储连接
        activeEmitters.put(streamId, emitter);
        
        // 连接建立成功
        try {
            emitter.send("实时日志流建立成功\n");
        } catch (IOException e) {
            emitter.completeWithError(e);
        }
        
        // 连接关闭时清理
        emitter.onCompletion(() -> {
            activeEmitters.remove(streamId);
            log.info("日志流连接关闭: {}", streamId);
        });
        
        emitter.onTimeout(() -> {
            emitter.complete();
        });
        
        return emitter;
    }
    
    public void pushLogToUser(String userId, String logType, String logMessage) {
        String streamId = userId + ":" + logType;
        ResponseBodyEmitter emitter = activeEmitters.get(streamId);
        
        if (emitter != null) {
            try {
                emitter.send(logMessage + "\n");
            } catch (IOException e) {
                // 发送失败,移除连接
                activeEmitters.remove(streamId);
                log.error("日志推送失败", e);
            }
        }
    }
    
    public void pushLogToAll(String logMessage) {
        activeEmitters.forEach((streamId, emitter) -> {
            try {
                emitter.send(logMessage + "\n");
            } catch (IOException e) {
                activeEmitters.remove(streamId);
            }
        });
    }
}

3. 进度条实时更新

@RestController
public class ProgressController {
    
    @Autowired
    private ProgressService progressService;
    
    @GetMapping("/stream/progress/{taskId}")
    public ResponseBodyEmitter streamProgress(@PathVariable String taskId) {
        ResponseBodyEmitter emitter = new ResponseBodyEmitter();
        
        emitter.setTimeout(Long.MAX_VALUE);
        
        // 发送初始进度
        try {
            emitter.send("开始任务\n");
        } catch (IOException e) {
            emitter.completeWithError(e);
            return emitter;
        }
        
        // 启动进度监控
        progressService.startMonitoring(taskId, progress -> {
            try {
                String progressMessage = String.format("进度: %d%% - %s\n", 
                    progress.getPercentage(), 
                    progress.getDescription());
                emitter.send(progressMessage);
                
                if (progress.isCompleted()) {
                    emitter.complete();
                }
            } catch (IOException e) {
                log.error("进度推送失败", e);
                emitter.completeWithError(e);
            }
        });
        
        return emitter;
    }
}

4. 数据流推送

@RestController
public class DataStreamController {
    
    @Autowired
    private DataStreamService dataStreamService;
    
    @GetMapping("/stream/data/{streamId}")
    public ResponseBodyEmitter streamData(@PathVariable String streamId,
                                       @RequestParam(defaultValue = "1000") long intervalMs) {
        ResponseBodyEmitter emitter = new ResponseBodyEmitter();
        
        emitter.setTimeout(Long.MAX_VALUE);
        
        // 启动数据流推送
        dataStreamService.startDataStreaming(streamId, intervalMs, data -> {
            try {
                // 将数据转换为JSON格式并发送
                String jsonData = JSON.toJSONString(data);
                emitter.send(jsonData + "\n");
            } catch (IOException e) {
                log.error("数据流推送失败", e);
                emitter.completeWithError(e);
            }
        });
        
        return emitter;
    }
}

高级特性实现

1. 连接池管理

@Service
public class StreamConnectionPool {
    
    private final Map<String, ResponseBodyEmitter> emitters = new ConcurrentHashMap<>();
    private final Map<String, ScheduledExecutorService> schedulers = new ConcurrentHashMap<>();
    
    public ResponseBodyEmitter createStream(String streamId, StreamConfig config) {
        ResponseBodyEmitter emitter = new ResponseBodyEmitter();
        emitter.setTimeout(config.getTimeout());
        
        // 存储连接
        emitters.put(streamId, emitter);
        
        // 创建调度器
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        schedulers.put(streamId, scheduler);
        
        // 连接管理回调
        emitter.onCompletion(() -> {
            emitters.remove(streamId);
            ScheduledExecutorService removedScheduler = schedulers.remove(streamId);
            if (removedScheduler != null) {
                removedScheduler.shutdown();
            }
        });
        
        emitter.onTimeout(() -> {
            emitter.complete();
        });
        
        return emitter;
    }
    
    public boolean sendToStream(String streamId, String data) {
        ResponseBodyEmitter emitter = emitters.get(streamId);
        if (emitter != null) {
            try {
                emitter.send(data + "\n");
                return true;
            } catch (IOException e) {
                emitters.remove(streamId);
                return false;
            }
        }
        return false;
    }
}

2. 缓存与批量推送

@Service
public class BatchStreamService {
    
    private final Map<String, List<String>> messageBuffers = new ConcurrentHashMap<>();
    private final ScheduledExecutorService batchScheduler = Executors.newScheduledThreadPool(5);
    
    public void addToBuffer(String streamId, String message) {
        messageBuffers.computeIfAbsent(streamId, k -> new ArrayList<>()).add(message);
    }
    
    public void startBatchProcessing(String streamId, int batchSize, long intervalMs) {
        batchScheduler.scheduleAtFixedRate(() -> {
            List<String> messages = messageBuffers.get(streamId);
            if (messages != null && !messages.isEmpty()) {
                synchronized (messages) {
                    if (!messages.isEmpty()) {
                        List<String> batch = new ArrayList<>();
                        int size = Math.min(batchSize, messages.size());
                        for (int i = 0; i < size; i++) {
                            batch.add(messages.remove(0));
                        }
                        
                        // 批量发送
                        sendBatchToStream(streamId, batch);
                    }
                }
            }
        }, 0, intervalMs, TimeUnit.MILLISECONDS);
    }
    
    private void sendBatchToStream(String streamId, List<String> messages) {
        String batchData = JSON.toJSONString(messages);
        // 发送到对应的流
        // 实现逻辑...
    }
}

3. 断线重连机制

@Service
public class ReconnectStreamService {
    
    private final Map<String, StreamSession> activeSessions = new ConcurrentHashMap<>();
    
    public ResponseBodyEmitter createReconnectableStream(String userId, 
                                                       String sessionId,
                                                       long lastEventId) {
        ResponseBodyEmitter emitter = new ResponseBodyEmitter();
        emitter.setTimeout(Long.MAX_VALUE);
        
        // 创建会话
        StreamSession session = new StreamSession(userId, sessionId, emitter, lastEventId);
        activeSessions.put(sessionId, session);
        
        // 发送历史数据(如果需要)
        if (lastEventId > 0) {
            sendHistoricalData(emitter, userId, lastEventId);
        }
        
        emitter.onCompletion(() -> {
            activeSessions.remove(sessionId);
        });
        
        return emitter;
    }
    
    private void sendHistoricalData(ResponseBodyEmitter emitter, String userId, long lastEventId) {
        // 获取历史数据并发送
        List<DataEvent> history = dataService.getEventsAfter(userId, lastEventId);
        history.forEach(event -> {
            try {
                emitter.send(JSON.toJSONString(event) + "\n");
            } catch (IOException e) {
                log.error("发送历史数据失败", e);
            }
        });
    }
}

性能优化建议

1. 内存管理

@Component
public class StreamMemoryManager {
    
    private final AtomicLong totalConnections = new AtomicLong(0);
    private final AtomicLong maxConnections = new AtomicLong(1000); // 最大连接数限制
    
    public boolean canAcceptNewConnection() {
        long current = totalConnections.get();
        return current < maxConnections.get();
    }
    
    public void registerConnection() {
        totalConnections.incrementAndGet();
    }
    
    public void unregisterConnection() {
        totalConnections.decrementAndGet();
    }
    
    @Scheduled(fixedRate = 60000) // 每分钟检查一次
    public void cleanupIdleConnections() {
        // 清理长时间无活动的连接
        // 实现逻辑...
    }
}

2. 异步处理优化

@Service
public class AsyncStreamService {
    
    @Autowired
    private TaskExecutor streamExecutor;
    
    public void sendAsync(String streamId, String data) {
        streamExecutor.execute(() -> {
            ResponseBodyEmitter emitter = emitters.get(streamId);
            if (emitter != null) {
                try {
                    emitter.send(data + "\n");
                } catch (IOException e) {
                    emitters.remove(streamId);
                }
            }
        });
    }
}

客户端实现

JavaScript客户端

// 创建流式连接
function createStreamConnection(url) {
    const xhr = new XMLHttpRequest();
    xhr.open('GET', url, true);
    xhr.onreadystatechange = function() {
        if (xhr.readyState === 3 || xhr.readyState === 4) {
            const newData = xhr.responseText.substring(xhr.lastIndex || 0);
            if (newData) {
                const lines = newData.split('\n');
                lines.forEach(line => {
                    if (line.trim()) {
                        handleStreamData(line);
                    }
                });
                xhr.lastIndex = xhr.responseText.length;
            }
        }
    };
    
    xhr.onerror = function() {
        console.error('流式连接错误');
        // 尝试重连
        setTimeout(() => createStreamConnection(url), 5000);
    };
    
    xhr.send();
}

// 或者使用fetch API
async function createFetchStream(url) {
    const response = await fetch(url);
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    
    while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        
        const chunk = decoder.decode(value);
        const lines = chunk.split('\n');
        lines.forEach(line => {
            if (line.trim()) {
                handleStreamData(line);
            }
        });
    }
}

安全考虑

1. 认证授权

@RestController
public class SecureStreamController {
    
    @GetMapping("/stream/secure/{userId}")
    public ResponseBodyEmitter createSecureStream(@PathVariable String userId,
                                               @RequestHeader("Authorization") String token) {
        
        // 验证token
        if (!tokenService.validateToken(token, userId)) {
            throw new UnauthorizedException("认证失败");
        }
        
        return streamService.createStream(userId);
    }
}

2. 速率限制

@Service
public class RateLimitedStreamService {
    
    private final Map<String, RateLimiter> rateLimiters = new ConcurrentHashMap<>();
    
    public boolean canSend(String userId, String streamType) {
        RateLimiter limiter = rateLimiters.computeIfAbsent(
            userId + ":" + streamType,
            k -> RateLimiter.create(10) // 每秒最多10条消息
        );
        
        return limiter.tryAcquire();
    }
    
    public void sendWithRateLimit(String userId, String streamType, String data) {
        if (canSend(userId, streamType)) {
            sendToStream(userId, data);
        } else {
            log.warn("用户 {} 发送频率过高", userId);
        }
    }
}

监控与运维

1. 连接监控

@Component
public class StreamMetricsCollector {
    
    private final MeterRegistry meterRegistry;
    
    public void recordConnection(String userId) {
        Counter.builder("stream_connections_total")
            .tag("user_id", userId)
            .register(meterRegistry)
            .increment();
    }
    
    public void recordDataSent(String userId, long dataSize) {
        Counter.builder("stream_data_sent_bytes_total")
            .tag("user_id", userId)
            .register(meterRegistry)
            .increment(dataSize);
    }
    
    public int getActiveConnections() {
        return streamService.getActiveConnectionCount();
    }
}

最佳实践

1. 连接管理

  • 合理设置超时时间:避免长时间占用连接
  • 定期清理无效连接:防止内存泄漏
  • 连接数限制:防止恶意连接

2. 数据格式

  • 统一数据格式:便于客户端处理
  • 消息大小控制:避免传输大消息
  • 编码处理:确保字符编码正确

3. 错误处理

  • 优雅降级:连接失败时回退到轮询
  • 重试机制:连接失败时自动重试
  • 异常监控:及时发现和处理异常

总结

ResponseBodyEmitter是实现服务器推送数据的高效方案,相比传统的轮询方式,它具有以下优势:

  1. 高效传输:减少无效请求
  2. 实时性好:数据变更立即推送
  3. 实现简单:基于HTTP协议
  4. 内存友好:流式处理大数据

记住,ResponseBodyEmitter适合单向服务器推送的场景,对于需要双向通信的场景,还是需要WebSocket。但对大多数实时数据推送需求,ResponseBodyEmitter绝对是更优的选择!


标题:SpringBoot + ResponseBodyEmitter 实时异步流式推送:告别轮询,让数据推送更高效
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/01/04/1767503226329.html

    0 评论
avatar