WebSocket 广播风暴抑制:万人在线群发通知?合并小包+二进制压缩,带宽节省 70%!
做过实时消息推送的同学肯定都遇到过这个问题:当有大量用户在线时,频繁的小消息广播会导致带宽急剧上升,甚至引发广播风暴。特别是在直播弹幕、实时通知、多人协作等场景,这个问题尤为突出。
我之前就遇到过这样一个案例:一个直播平台在搞活动时,同时在线人数突破了 10 万。运营人员发了一条弹幕抽奖消息,结果瞬间产生了 10 万条 WebSocket 消息推送,导致服务器带宽直接跑满,部分用户出现消息延迟甚至断线重连。
今天我们就来聊聊 WebSocket 广播风暴的抑制方案,让您的系统轻松应对万人在线的消息推送场景。
广播风暴的根本原因
1. 小消息频繁发送
这是最常见的情况:
场景:10 万在线用户 + 每秒 10 条小消息
每条消息大小:100 bytes
每秒发送量:10 万 × 10 = 100 万条
每秒带宽消耗:100 万 × 100 bytes = 100MB/s ≈ 800Mbps
如果每条消息有 50 字节的协议开销:
实际带宽消耗:100 万 × 150 bytes = 150MB/s ≈ 1.2Gbps
2. TCP 小包问题
TCP 小包的问题:
┌─────────────────────────────────────────────────────────────┐
│ 原始消息:{"type":"ping","ts":1234567890} (30 bytes) │
├─────────────────────────────────────────────────────────────┤
│ TCP 协议栈处理: │
│ - TCP 头部:20 bytes │
│ - IP 头部:20 bytes │
│ - 实际数据:30 bytes │
│ - 总大小:70 bytes │
│ - 有效载荷占比:30/70 ≈ 43% │
└─────────────────────────────────────────────────────────────┘
3. 缺乏消息合并机制
问题分析:
- 短时间内多条小消息分别发送
- 每条消息都有独立的 TCP 头部开销
- 网络传输效率低下
- 频繁的网络唤醒消耗资源
解决方案:消息合并 + 二进制压缩
1. 核心设计思想
我们的方案核心是三个关键技术:
- 消息合并(Message Batching):将短时间内的多条消息合并成一个批次发送
- 二进制序列化(Binary Serialization):使用 Protobuf/MessagePack 替代 JSON
- 压缩传输(Compression):对合并后的消息进行压缩
架构图如下:
┌─────────────────────────────────────────────────────────────┐
│ WebSocket 广播风暴抑制系统 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ 业务消息 │───→│ 消息队列 │───→│ 消息合并器 │ │
│ │ (多条) │ │ (缓冲区) │ │ (定时/阈值触发) │ │
│ └──────────┘ └──────────────┘ └──────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ 压缩器 │←───│ 二进制序列化 │←───│ 合并后的消息 │ │
│ │ (gzip) │ │ (Protobuf) │ │ │ │
│ └──────────┘ └──────────────┘ └──────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ 广播到所有在线客户端 │ │
│ └───────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
2. 消息合并策略
消息合并策略:
┌─────────────────────────────────────────────────────────────┐
│ 触发条件(满足任一即可) │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 时间触发:每隔 N 毫秒发送一次 │
│ - 例如:每 100ms 发送一次合并消息 │
│ │
│ 2. 数量触发:累积到 M 条消息发送 │
│ - 例如:累积到 10 条消息就发送 │
│ │
│ 3. 大小触发:累积消息大小达到 K bytes │
│ - 例如:累积到 1KB 就发送 │
│ │
└─────────────────────────────────────────────────────────────┘
3. 二进制序列化对比
序列化格式对比:
┌──────────────┬─────────────┬─────────────┬─────────────┐
│ 格式 │ 原始大小 │ 序列化后 │ 压缩率 │
├──────────────┼─────────────┼─────────────┼─────────────┤
│ JSON │ 100 bytes │ 100 bytes │ 0% │
│ MessagePack │ 100 bytes │ 65 bytes │ 35% │
│ Protobuf │ 100 bytes │ 45 bytes │ 55% │
└──────────────┴─────────────┴─────────────┴─────────────┘
4. 压缩传输效果
压缩效果对比:
原始场景:
- 10 万用户 × 100 bytes/条 = 10MB
使用二进制序列化:
- 10 万用户 × 50 bytes/条 = 5MB(节省 50%)
再加上 gzip 压缩:
- 5MB × 30% = 1.5MB(总共节省 85%)
实战方案实现
1. 消息合并器
// 消息合并器
class MessageBatchService {
private Queue<Message> messageQueue = new ConcurrentLinkedQueue<>();
private ScheduledExecutorService scheduler;
private long batchIntervalMs = 100;
private int maxBatchSize = 10;
function init() {
scheduler.scheduleAtFixedRate(() -> {
flushBatch();
}, 0, batchIntervalMs, TimeUnit.MILLISECONDS);
}
function addMessage(Message msg) {
messageQueue.offer(msg);
if (messageQueue.size() >= maxBatchSize) {
flushBatch();
}
}
private function flushBatch() {
List<Message> batch = new ArrayList<>();
messageQueue.drainTo(batch);
if (batch.isEmpty()) {
return;
}
byte[] serialized = serializeBatch(batch);
byte[] compressed = compress(serialized);
broadcast(compressed);
}
}
2. 二进制序列化
// Protobuf 消息定义
message BroadcastMessage {
string type = 1;
int64 timestamp = 2;
bytes payload = 3;
}
message MessageBatch {
repeated BroadcastMessage messages = 1;
}
// 序列化服务
class BinarySerializationService {
function serializeBatch(List<Message> messages):
MessageBatch batch = MessageBatch.newBuilder()
for message in messages:
BroadcastMessage protoMsg = BroadcastMessage.newBuilder()
.setType(message.getType())
.setTimestamp(message.getTimestamp())
.setPayload(message.getPayload())
.build()
batch.addMessages(protoMsg)
return batch.toByteArray()
}
3. 压缩服务
// 压缩服务
class CompressionService {
private static final int MIN_COMPRESS_SIZE = 512;
function compress(data):
if data.length < MIN_COMPRESS_SIZE:
return data
ByteArrayOutputStream baos = new ByteArrayOutputStream()
GZIPOutputStream gzip = new GZIPOutputStream(baos)
gzip.write(data)
gzip.close()
return baos.toByteArray()
function decompress(data):
ByteArrayInputStream bais = new ByteArrayInputStream(data)
GZIPInputStream gzip = new GZIPInputStream(bais)
ByteArrayOutputStream baos = new ByteArrayOutputStream()
copyStream(gzip, baos)
return baos.toByteArray()
}
4. 广播服务
// WebSocket 广播服务
class WebSocketBroadcastService {
private Set<WebSocketSession> sessions = ConcurrentHashMap.newKeySet()
function addSession(session):
sessions.add(session)
function removeSession(session):
sessions.remove(session)
function broadcast(data):
for session in sessions:
if session.isOpen():
try:
session.getBasicRemote().sendBinary(data)
catch:
removeSession(session)
function getOnlineCount():
return sessions.size()
}
最佳实践与注意事项
1. 消息优先级处理
消息优先级策略:
function addMessage(Message msg, priority):
if priority == HIGH:
// 高优先级消息立即发送
broadcast(serialize(msg))
else:
// 普通优先级加入合并队列
batchService.addMessage(msg)
2. 背压控制
背压控制策略:
function checkBackpressure():
queueSize = messageQueue.size()
if queueSize > HIGH_WATER_MARK:
// 暂时停止接收新消息
pauseIncomingMessages()
if queueSize < LOW_WATER_MARK:
// 恢复接收消息
resumeIncomingMessages()
3. 客户端解压缩
// 客户端解压缩(JavaScript)
function decompress(data) {
const inflate = new Zlib.Gunzip(data);
return inflate.decompress();
}
function deserialize(data) {
const batch = MessageBatch.decode(data);
return batch.messages;
}
// WebSocket 消息处理
ws.onmessage = (event) => {
const compressed = event.data;
const decompressed = decompress(compressed);
const messages = deserialize(decompressed);
messages.forEach(msg => {
handleMessage(msg);
});
};
4. 连接状态管理
连接状态管理:
function onSessionOpened(session):
broadcastService.addSession(session)
sendWelcomeMessage(session)
function onSessionClosed(session):
broadcastService.removeSession(session)
function onError(session, error):
log.error("WebSocket error: {}", error)
broadcastService.removeSession(session)
5. 监控指标
监控指标:
┌──────────────────┬─────────────────────────────────────────┐
│ 指标名称 │ 说明 │
├──────────────────┼─────────────────────────────────────────┤
│ onlineCount │ 当前在线用户数 │
│ messageRate │ 消息发送速率(条/秒) │
│ batchSize │ 平均批次大小 │
│ compressionRatio │ 压缩率(压缩后大小/原始大小) │
│ bandwidthUsage │ 带宽使用量(字节/秒) │
│ messageLatency │ 消息延迟(毫秒) │
└──────────────────┴─────────────────────────────────────────┘
效果对比
| 方案 | 带宽消耗 | 延迟 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| 原始单条发送 | 高 | 低 | 低 | 消息量少 |
| 仅消息合并 | 中 | 中 | 中 | 小消息多 |
| 合并+二进制 | 中低 | 中 | 中 | 一般场景 |
| 合并+二进制+压缩 | 低 | 中 | 高 | 大规模广播 |
总结
WebSocket 广播风暴抑制的核心原则:
- 合并小包:将短时间内的多条小消息合并成批次发送
- 二进制序列化:使用 Protobuf/MessagePack 替代 JSON
- 压缩传输:对合并后的消息进行 gzip 压缩
- 优先级控制:高优先级消息立即发送,普通消息批量处理
- 背压控制:防止消息队列无限增长
记住:带宽是有限的,高效利用才是关键。通过消息合并和压缩,可以让您的系统轻松应对万人在线的消息推送场景。
源码获取
文章已同步至小程序博客栏目,需要源码的请关注小程序博客。
公众号:服务端技术精选
小程序码:
标题:WebSocket 广播风暴抑制:万人在线群发通知?合并小包+二进制压缩,带宽节省 70%!
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/05/24/1779200705055.html
公众号:服务端技术精选
评论
0 评论