WebSocket 广播风暴抑制:万人在线群发通知?合并小包+二进制压缩,带宽节省 70%!

做过实时消息推送的同学肯定都遇到过这个问题:当有大量用户在线时,频繁的小消息广播会导致带宽急剧上升,甚至引发广播风暴。特别是在直播弹幕、实时通知、多人协作等场景,这个问题尤为突出。

我之前就遇到过这样一个案例:一个直播平台在搞活动时,同时在线人数突破了 10 万。运营人员发了一条弹幕抽奖消息,结果瞬间产生了 10 万条 WebSocket 消息推送,导致服务器带宽直接跑满,部分用户出现消息延迟甚至断线重连。

今天我们就来聊聊 WebSocket 广播风暴的抑制方案,让您的系统轻松应对万人在线的消息推送场景。

广播风暴的根本原因

1. 小消息频繁发送

这是最常见的情况:

场景:10 万在线用户 + 每秒 10 条小消息

每条消息大小:100 bytes
每秒发送量:10 万 × 10 = 100 万条
每秒带宽消耗:100 万 × 100 bytes = 100MB/s ≈ 800Mbps

如果每条消息有 50 字节的协议开销:
实际带宽消耗:100 万 × 150 bytes = 150MB/s ≈ 1.2Gbps

2. TCP 小包问题

TCP 小包的问题:

┌─────────────────────────────────────────────────────────────┐
│  原始消息:{"type":"ping","ts":1234567890} (30 bytes)     │
├─────────────────────────────────────────────────────────────┤
│  TCP 协议栈处理:                                           │
│  - TCP 头部:20 bytes                                       │
│  - IP 头部:20 bytes                                        │
│  - 实际数据:30 bytes                                       │
│  - 总大小:70 bytes                                         │
│  - 有效载荷占比:30/70 ≈ 43%                                │
└─────────────────────────────────────────────────────────────┘

3. 缺乏消息合并机制

问题分析:
- 短时间内多条小消息分别发送
- 每条消息都有独立的 TCP 头部开销
- 网络传输效率低下
- 频繁的网络唤醒消耗资源

解决方案:消息合并 + 二进制压缩

1. 核心设计思想

我们的方案核心是三个关键技术:

  1. 消息合并(Message Batching):将短时间内的多条消息合并成一个批次发送
  2. 二进制序列化(Binary Serialization):使用 Protobuf/MessagePack 替代 JSON
  3. 压缩传输(Compression):对合并后的消息进行压缩

架构图如下:

┌─────────────────────────────────────────────────────────────┐
│                   WebSocket 广播风暴抑制系统               │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────┐    ┌──────────────┐    ┌──────────────────┐  │
│  │ 业务消息 │───→│ 消息队列     │───→│ 消息合并器       │  │
│  │ (多条)   │    │ (缓冲区)     │    │ (定时/阈值触发)   │  │
│  └──────────┘    └──────────────┘    └──────────────────┘  │
│                                              │              │
│                                              ▼              │
│  ┌──────────┐    ┌──────────────┐    ┌──────────────────┐  │
│  │ 压缩器   │←───│ 二进制序列化  │←───│ 合并后的消息     │  │
│  │ (gzip)   │    │ (Protobuf)   │    │                  │  │
│  └──────────┘    └──────────────┘    └──────────────────┘  │
│       │                                                     │
│       ▼                                                     │
│  ┌───────────────────────────────────────────────────────┐  │
│  │           广播到所有在线客户端                          │  │
│  └───────────────────────────────────────────────────────┘  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

2. 消息合并策略

消息合并策略:

┌─────────────────────────────────────────────────────────────┐
│ 触发条件(满足任一即可)                                     │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. 时间触发:每隔 N 毫秒发送一次                             │
│     - 例如:每 100ms 发送一次合并消息                        │
│                                                             │
│  2. 数量触发:累积到 M 条消息发送                            │
│     - 例如:累积到 10 条消息就发送                           │
│                                                             │
│  3. 大小触发:累积消息大小达到 K bytes                        │
│     - 例如:累积到 1KB 就发送                               │
│                                                             │
└─────────────────────────────────────────────────────────────┘

3. 二进制序列化对比

序列化格式对比:

┌──────────────┬─────────────┬─────────────┬─────────────┐
│ 格式         │ 原始大小     │ 序列化后    │ 压缩率      │
├──────────────┼─────────────┼─────────────┼─────────────┤
│ JSON         │ 100 bytes   │ 100 bytes   │ 0%          │
│ MessagePack  │ 100 bytes   │ 65 bytes    │ 35%         │
│ Protobuf     │ 100 bytes   │ 45 bytes    │ 55%         │
└──────────────┴─────────────┴─────────────┴─────────────┘

4. 压缩传输效果

压缩效果对比:

原始场景:
- 10 万用户 × 100 bytes/条 = 10MB

使用二进制序列化:
- 10 万用户 × 50 bytes/条 = 5MB(节省 50%)

再加上 gzip 压缩:
- 5MB × 30% = 1.5MB(总共节省 85%)

实战方案实现

1. 消息合并器

// 消息合并器
class MessageBatchService {
    private Queue<Message> messageQueue = new ConcurrentLinkedQueue<>();
    private ScheduledExecutorService scheduler;
    private long batchIntervalMs = 100;
    private int maxBatchSize = 10;

    function init() {
        scheduler.scheduleAtFixedRate(() -> {
            flushBatch();
        }, 0, batchIntervalMs, TimeUnit.MILLISECONDS);
    }

    function addMessage(Message msg) {
        messageQueue.offer(msg);
        
        if (messageQueue.size() >= maxBatchSize) {
            flushBatch();
        }
    }

    private function flushBatch() {
        List<Message> batch = new ArrayList<>();
        messageQueue.drainTo(batch);
        
        if (batch.isEmpty()) {
            return;
        }

        byte[] serialized = serializeBatch(batch);
        byte[] compressed = compress(serialized);
        
        broadcast(compressed);
    }
}

2. 二进制序列化

// Protobuf 消息定义
message BroadcastMessage {
    string type = 1;
    int64 timestamp = 2;
    bytes payload = 3;
}

message MessageBatch {
    repeated BroadcastMessage messages = 1;
}

// 序列化服务
class BinarySerializationService {
    function serializeBatch(List<Message> messages):
        MessageBatch batch = MessageBatch.newBuilder()
        
        for message in messages:
            BroadcastMessage protoMsg = BroadcastMessage.newBuilder()
                .setType(message.getType())
                .setTimestamp(message.getTimestamp())
                .setPayload(message.getPayload())
                .build()
            
            batch.addMessages(protoMsg)
        
        return batch.toByteArray()
}

3. 压缩服务

// 压缩服务
class CompressionService {
    private static final int MIN_COMPRESS_SIZE = 512;

    function compress(data):
        if data.length < MIN_COMPRESS_SIZE:
            return data
        
        ByteArrayOutputStream baos = new ByteArrayOutputStream()
        GZIPOutputStream gzip = new GZIPOutputStream(baos)
        
        gzip.write(data)
        gzip.close()
        
        return baos.toByteArray()

    function decompress(data):
        ByteArrayInputStream bais = new ByteArrayInputStream(data)
        GZIPInputStream gzip = new GZIPInputStream(bais)
        
        ByteArrayOutputStream baos = new ByteArrayOutputStream()
        copyStream(gzip, baos)
        
        return baos.toByteArray()
}

4. 广播服务

// WebSocket 广播服务
class WebSocketBroadcastService {
    private Set<WebSocketSession> sessions = ConcurrentHashMap.newKeySet()

    function addSession(session):
        sessions.add(session)

    function removeSession(session):
        sessions.remove(session)

    function broadcast(data):
        for session in sessions:
            if session.isOpen():
                try:
                    session.getBasicRemote().sendBinary(data)
                catch:
                    removeSession(session)

    function getOnlineCount():
        return sessions.size()
}

最佳实践与注意事项

1. 消息优先级处理

消息优先级策略:

function addMessage(Message msg, priority):
    if priority == HIGH:
        // 高优先级消息立即发送
        broadcast(serialize(msg))
    else:
        // 普通优先级加入合并队列
        batchService.addMessage(msg)

2. 背压控制

背压控制策略:

function checkBackpressure():
    queueSize = messageQueue.size()
    
    if queueSize > HIGH_WATER_MARK:
        // 暂时停止接收新消息
        pauseIncomingMessages()
        
    if queueSize < LOW_WATER_MARK:
        // 恢复接收消息
        resumeIncomingMessages()

3. 客户端解压缩

// 客户端解压缩(JavaScript)
function decompress(data) {
    const inflate = new Zlib.Gunzip(data);
    return inflate.decompress();
}

function deserialize(data) {
    const batch = MessageBatch.decode(data);
    return batch.messages;
}

// WebSocket 消息处理
ws.onmessage = (event) => {
    const compressed = event.data;
    const decompressed = decompress(compressed);
    const messages = deserialize(decompressed);
    
    messages.forEach(msg => {
        handleMessage(msg);
    });
};

4. 连接状态管理

连接状态管理:

function onSessionOpened(session):
    broadcastService.addSession(session)
    sendWelcomeMessage(session)

function onSessionClosed(session):
    broadcastService.removeSession(session)

function onError(session, error):
    log.error("WebSocket error: {}", error)
    broadcastService.removeSession(session)

5. 监控指标

监控指标:

┌──────────────────┬─────────────────────────────────────────┐
│ 指标名称         │ 说明                                   │
├──────────────────┼─────────────────────────────────────────┤
│ onlineCount      │ 当前在线用户数                          │
│ messageRate      │ 消息发送速率(条/秒)                   │
│ batchSize        │ 平均批次大小                           │
│ compressionRatio │ 压缩率(压缩后大小/原始大小)          │
│ bandwidthUsage   │ 带宽使用量(字节/秒)                   │
│ messageLatency   │ 消息延迟(毫秒)                       │
└──────────────────┴─────────────────────────────────────────┘

效果对比

方案带宽消耗延迟复杂度适用场景
原始单条发送消息量少
仅消息合并小消息多
合并+二进制中低一般场景
合并+二进制+压缩大规模广播

总结

WebSocket 广播风暴抑制的核心原则:

  1. 合并小包:将短时间内的多条小消息合并成批次发送
  2. 二进制序列化:使用 Protobuf/MessagePack 替代 JSON
  3. 压缩传输:对合并后的消息进行 gzip 压缩
  4. 优先级控制:高优先级消息立即发送,普通消息批量处理
  5. 背压控制:防止消息队列无限增长

记住:带宽是有限的,高效利用才是关键。通过消息合并和压缩,可以让您的系统轻松应对万人在线的消息推送场景。


源码获取

文章已同步至小程序博客栏目,需要源码的请关注小程序博客。

公众号:服务端技术精选

小程序码:


标题:WebSocket 广播风暴抑制:万人在线群发通知?合并小包+二进制压缩,带宽节省 70%!
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/05/24/1779200705055.html
公众号:服务端技术精选
    评论
    0 评论
avatar

取消