消息推送总延迟?这7步架构设计让你的推送秒到用户手机!
消息推送总延迟?这7步架构设计让你的推送秒到用户手机!
作为一名后端开发,经历过太多消息推送的"惨案":
- 某电商大促期间,优惠券推送延迟2小时,用户错过了最佳抢购时机,客服电话被打爆
- 某社交平台重要通知推送延迟,用户错过了好友消息,投诉量暴涨300%
- 某金融APP的到账通知推送失败,用户以为钱丢了,差点报警
消息推送,看似简单,实则暗藏杀机。今天就结合自己踩过的坑,跟大家聊聊实时订阅推送到底是怎么实现的,让你的推送秒到用户手机!
一、实时推送到底是个啥?为啥大家都在用?
实时推送的核心就是:把消息实时送到用户设备,让用户第一时间收到重要信息。
为啥实时推送这么香?
- 用户体验好:重要消息秒到,用户不会错过关键信息
- 业务价值高:电商大促推送能提升30%转化率,社交平台推送能增加50%用户活跃度
- 技术逼格高:看起来就很高级,用户觉得你很专业
二、实时推送的7步架构设计,一步都不能错!
实时推送就像送快递,得按流程来,一步走错就GG。
第1步:消息接入层,就像"快递收件点"
消息先进入接入层,这里要扛住高并发,不能让用户发消息就卡死。
核心代码:
@RestController
public class MessageReceiveController {
@Autowired
private KafkaProducer kafkaProducer;
@PostMapping("/api/message/send")
public SendResponse sendMessage(@RequestBody MessageRequest request) {
try {
// 参数校验
validateMessage(request);
// 生成消息ID
String messageId = UUID.randomUUID().toString();
// 封装消息
PushMessage message = PushMessage.builder()
.messageId(messageId)
.userId(request.getUserId())
.content(request.getContent())
.messageType(request.getType())
.priority(request.getPriority())
.timestamp(System.currentTimeMillis())
.build();
// 发送到Kafka,按用户ID分区保证顺序性
kafkaProducer.send("message-topic",
message.getUserId().hashCode() % 10,
messageId,
message);
return SendResponse.success(messageId);
} catch (Exception e) {
log.error("发送消息失败", e);
return SendResponse.fail("消息发送失败");
}
}
}
第2步:消息路由层,就像"快递分拣中心"
根据消息类型和用户标签,把消息分到不同的处理队列。
消息路由逻辑:
@Component
public class MessageRouter {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void routeMessage(PushMessage message) {
String userId = message.getUserId();
// 查询用户订阅配置
UserSubscription subscription = getUserSubscription(userId);
// 检查用户是否订阅了此类消息
if (!subscription.isSubscribed(message.getMessageType())) {
log.info("用户未订阅此类消息: {}", message.getMessageType());
return;
}
// 根据消息优先级路由到不同队列
String queueName = determineQueueName(message);
// 发送到对应队列
redisTemplate.opsForList().rightPush(queueName, message);
// 记录消息统计
recordMessageStats(message);
}
private String determineQueueName(PushMessage message) {
// 高优先级消息走快速通道
if (message.getPriority() >= 8) {
return "high-priority-queue";
}
// 普通消息按用户分片
int shard = message.getUserId().hashCode() % 20;
return "normal-queue-" + shard;
}
}
第3步:用户连接管理,就像"快递地址簿"
管理所有在线用户的WebSocket连接,支持千万级并发。
连接管理核心代码:
@Component
public class WebSocketConnectionManager {
// 用户ID -> WebSocket会话
private final Map<String, WebSocketSession> userSessions =
new ConcurrentHashMap<>();
// Redis存储在线状态
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void addConnection(String userId, WebSocketSession session) {
// 存储WebSocket连接
userSessions.put(userId, session);
// 标记用户在线状态
redisTemplate.opsForValue().set(
"online:" + userId,
"1",
Duration.ofMinutes(5)
);
log.info("用户连接建立: {}", userId);
}
public void removeConnection(String userId) {
userSessions.remove(userId);
redisTemplate.delete("online:" + userId);
log.info("用户连接断开: {}", userId);
}
public boolean isUserOnline(String userId) {
return redisTemplate.hasKey("online:" + userId);
}
public void sendToUser(String userId, String message) {
WebSocketSession session = userSessions.get(userId);
if (session != null && session.isOpen()) {
try {
session.sendMessage(new TextMessage(message));
log.info("消息推送成功: {}", userId);
} catch (Exception e) {
log.error("消息推送失败: {}", userId, e);
removeConnection(userId);
}
}
}
}
第4步:消息推送引擎,就像"快递派送员"
核心推送逻辑,支持多种推送策略和失败重试。
推送引擎实现:
@Service
public class PushEngine {
@Autowired
private WebSocketConnectionManager connectionManager;
@Autowired
private PushRetryService retryService;
@Scheduled(fixedDelay = 100)
public void processMessages() {
// 处理高优先级消息
processHighPriorityMessages();
// 处理普通消息
processNormalMessages();
}
private void processHighPriorityMessages() {
String messageJson = redisTemplate.opsForList().leftPop("high-priority-queue");
if (messageJson != null) {
PushMessage message = JSON.parseObject(messageJson, PushMessage.class);
pushToUser(message);
}
}
private void pushToUser(PushMessage message) {
String userId = message.getUserId();
// 检查用户是否在线
if (!connectionManager.isUserOnline(userId)) {
// 用户不在线,加入重试队列
retryService.addToRetryQueue(message);
return;
}
try {
// 构造推送消息
PushNotification notification = PushNotification.builder()
.title(getMessageTitle(message))
.content(message.getContent())
.messageId(message.getMessageId())
.timestamp(message.getTimestamp())
.build();
// 发送消息
connectionManager.sendToUser(userId, JSON.toJSONString(notification));
// 记录推送成功
recordPushSuccess(message);
} catch (Exception e) {
log.error("推送失败,加入重试队列: {}", message.getMessageId(), e);
retryService.addToRetryQueue(message);
}
}
}
第5步:离线消息存储,就像"快递代收点"
用户不在线时,消息要保存起来,等用户上线再推送。
离线消息存储:
@Component
public class OfflineMessageStorage {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String OFFLINE_KEY_PREFIX = "offline:";
private static final int MAX_OFFLINE_MESSAGES = 1000;
public void storeOfflineMessage(String userId, PushMessage message) {
String key = OFFLINE_KEY_PREFIX + userId;
// 使用Redis List存储离线消息,限制数量
Long size = redisTemplate.opsForList().size(key);
if (size >= MAX_OFFLINE_MESSAGES) {
// 消息过多,移除最早的消息
redisTemplate.opsForList().leftPop(key);
}
// 存储消息
redisTemplate.opsForList().rightPush(key, JSON.toJSONString(message));
// 设置过期时间(7天)
redisTemplate.expire(key, Duration.ofDays(7));
}
public List<PushMessage> getOfflineMessages(String userId) {
String key = OFFLINE_KEY_PREFIX + userId;
List<String> messages = redisTemplate.opsForList().range(key, 0, -1);
if (messages == null) {
return Collections.emptyList();
}
return messages.stream()
.map(msg -> JSON.parseObject(msg, PushMessage.class))
.collect(Collectors.toList());
}
public void clearOfflineMessages(String userId) {
redisTemplate.delete(OFFLINE_KEY_PREFIX + userId);
}
}
第6步:消息重试机制,就像"快递二次派送"
推送失败的消息要有重试机制,确保最终送达。
重试机制实现:
@Service
public class PushRetryService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private PushEngine pushEngine;
private static final String RETRY_QUEUE_PREFIX = "retry:";
private static final int[] RETRY_DELAYS = {1, 5, 30, 300, 1800}; // 秒
public void addToRetryQueue(PushMessage message) {
String retryKey = RETRY_QUEUE_PREFIX + message.getMessageId();
// 记录重试次数
int retryCount = 0;
message.setRetryCount(retryCount);
// 设置首次重试时间
long nextRetryTime = System.currentTimeMillis() +
RETRY_DELAYS[retryCount] * 1000;
redisTemplate.opsForZSet().add(
"retry-queue",
JSON.toJSONString(message),
nextRetryTime
);
}
@Scheduled(fixedDelay = 1000)
public void processRetryMessages() {
long now = System.currentTimeMillis();
// 获取需要重试的消息
Set<String> messages = redisTemplate.opsForZSet()
.rangeByScore("retry-queue", 0, now);
if (messages != null) {
for (String messageJson : messages) {
PushMessage message = JSON.parseObject(messageJson, PushMessage.class);
// 从重试队列移除
redisTemplate.opsForZSet().remove("retry-queue", messageJson);
// 检查重试次数
if (message.getRetryCount() >= RETRY_DELAYS.length) {
// 重试次数用尽,标记为失败
markMessageFailed(message);
continue;
}
// 重新推送
pushEngine.pushToUser(message);
// 增加重试次数,再次入队
message.setRetryCount(message.getRetryCount() + 1);
long nextRetryTime = System.currentTimeMillis() +
RETRY_DELAYS[message.getRetryCount()] * 1000;
redisTemplate.opsForZSet().add(
"retry-queue",
JSON.toJSONString(message),
nextRetryTime
);
}
}
}
}
第7步:监控告警,就像"快递跟踪系统"
实时监控推送成功率、延迟等指标,出问题立即告警。
监控告警实现:
@Component
public class PushMonitor {
private final MeterRegistry meterRegistry;
private final Counter pushSuccessCounter;
private final Counter pushFailureCounter;
private final Timer pushLatencyTimer;
public PushMonitor(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.pushSuccessCounter = Counter.builder("push_success_total")
.description("推送成功的消息总数")
.register(meterRegistry);
this.pushFailureCounter = Counter.builder("push_failure_total")
.description("推送失败的消息总数")
.register(meterRegistry);
this.pushLatencyTimer = Timer.builder("push_latency")
.description("推送延迟时间")
.register(meterRegistry);
}
public void recordPushSuccess(PushMessage message, long latencyMs) {
pushSuccessCounter.increment();
pushLatencyTimer.record(latencyMs, TimeUnit.MILLISECONDS);
// 记录用户维度的推送成功
meterRegistry.counter("push_success_user", "user_id", message.getUserId())
.increment();
}
public void recordPushFailure(PushMessage message, String reason) {
pushFailureCounter.increment();
// 按失败原因统计
meterRegistry.counter("push_failure_reason", "reason", reason)
.increment();
}
@Scheduled(fixedDelay = 30000)
public void checkMetrics() {
// 检查推送成功率
double successRate = getSuccessRate();
if (successRate < 0.95) {
// 告警:推送成功率低于95%
sendAlert("推送成功率异常:" + successRate);
}
// 检查平均延迟
double avgLatency = getAverageLatency();
if (avgLatency > 1000) {
// 告警:平均延迟超过1秒
sendAlert("推送延迟异常:" + avgLatency + "ms");
}
}
}
三、实战案例:某电商平台的实时推送架构演进
下面分享一个真实的电商平台从日活1万到500万的推送架构演进案例。
1. 第一阶段:单机推送(日活1万)
架构特点:
- 单机WebSocket服务器
- Redis存储在线状态
- 单机处理推送
遇到的问题:
- 单机只能支撑1万并发
- 服务器重启导致所有连接断开
- 消息丢失严重
2. 第二阶段:集群推送(日活10万)
架构升级:
# Nginx负载均衡配置
upstream websocket_backend {
server 192.168.1.10:8080 weight=3;
server 192.168.1.11:8080 weight=3;
server 192.168.1.12:8080 weight=2;
ip_hash; # 保证同一用户的连接落在同一台服务器
}
server {
listen 80;
server_name push.yourdomain.com;
location /ws {
proxy_pass http://websocket_backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
}
}
3. 第三阶段:分布式推送(日活100万)
技术方案:
- Kafka消息队列解耦
- Redis集群存储状态
- 多机房部署
Redis集群配置:
# Redis集群配置
spring:
redis:
cluster:
nodes:
- 192.168.1.21:7000
- 192.168.1.22:7000
- 192.168.1.23:7000
- 192.168.1.24:7000
- 192.168.1.25:7000
- 192.168.1.26:7000
max-redirects: 3
timeout: 3000
lettuce:
pool:
max-active: 1000
max-idle: 100
min-idle: 50
4. 第四阶段:亿级推送架构(日活500万)
最终架构:
- 全球多机房部署
- 消息分级处理
- 智能路由
- 实时监控
性能指标:
- 支持500万日活用户同时在线
- 消息推送延迟<100ms
- 推送成功率>99.9%
- 峰值QPS 10万
四、7个避坑指南,90%的人都踩过!
1. WebSocket连接管理坑
问题:连接泄露导致内存溢出
解决方案:
// 定期清理无效连接
@Scheduled(fixedDelay = 60000)
public void cleanupConnections() {
userSessions.entrySet().removeIf(entry -> {
WebSocketSession session = entry.getValue();
return !session.isOpen();
});
}
2. Redis集群脑裂坑
问题:Redis集群脑裂导致状态不一致
解决方案:
- 使用哨兵模式
- 设置合理的超时时间
- 降级到单机模式
3. 消息顺序性坑
问题:同一用户的消息乱序到达
解决方案:
- Kafka按用户ID分区
- 使用消息序号
- 客户端本地排序
4. 消息幂等性坑
问题:重复推送同一条消息
解决方案:
// 消息去重
public boolean isDuplicate(String messageId) {
String key = "msg:dedup:" + messageId;
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, "1", Duration.ofHours(1));
return result != null && !result;
}
5. 客户端断线重连坑
问题:客户端断线后疯狂重连导致服务器崩溃
解决方案:
- 指数退避重连
- 设置最大重连次数
- 使用心跳机制
6. 消息体大小坑
问题:推送大消息导致网络阻塞
解决方案:
- 限制单条消息<1KB
- 大消息使用CDN链接
- 分批推送
7. 跨域问题坑
问题:WebSocket跨域连接失败
解决方案:
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(pushHandler(), "/ws")
.setAllowedOrigins("*") // 允许跨域
.addInterceptors(new HandshakeInterceptor());
}
}
五、5个核心监控指标,缺一不可!
1. 在线用户数监控
// 实时监控在线用户数
Gauge.builder("websocket.online.users")
.register(meterRegistry, () -> connectionManager.getOnlineUserCount());
2. 消息推送成功率
- 目标值:>99.9%
- 告警阈值:<99%
3. 消息推送延迟
- 目标值:<100ms
- 告警阈值:>500ms
4. WebSocket连接数
- 单机最大连接数:5万
- 集群总连接数:100万
5. 系统资源使用率
- CPU使用率:<70%
- 内存使用率:<80%
- 网络带宽:<70%
六、总结:实时推送架构的4个关键点
- 高可用:多机房部署,故障自动切换
- 高性能:消息队列解耦,异步处理
- 高并发:WebSocket集群,负载均衡
- 可监控:全链路监控,实时告警
这套架构我们已经稳定运行2年,支撑了从日活1万到500万的业务增长。记住:推送系统的核心是可靠性,宁可慢一点,也不能丢消息!
如果你也在做推送系统,欢迎留言交流踩坑经验!
标题:消息推送总延迟?这7步架构设计让你的推送秒到用户手机!
作者:jiangyi
地址:http://www.jiangyi.space/articles/2025/12/21/1766304288464.html
- 一、实时推送到底是个啥?为啥大家都在用?
- 二、实时推送的7步架构设计,一步都不能错!
- 第1步:消息接入层,就像"快递收件点"
- 第2步:消息路由层,就像"快递分拣中心"
- 第3步:用户连接管理,就像"快递地址簿"
- 第4步:消息推送引擎,就像"快递派送员"
- 第5步:离线消息存储,就像"快递代收点"
- 第6步:消息重试机制,就像"快递二次派送"
- 第7步:监控告警,就像"快递跟踪系统"
- 三、实战案例:某电商平台的实时推送架构演进
- 1. 第一阶段:单机推送(日活1万)
- 2. 第二阶段:集群推送(日活10万)
- 3. 第三阶段:分布式推送(日活100万)
- 4. 第四阶段:亿级推送架构(日活500万)
- 四、7个避坑指南,90%的人都踩过!
- 1. WebSocket连接管理坑
- 2. Redis集群脑裂坑
- 3. 消息顺序性坑
- 4. 消息幂等性坑
- 5. 客户端断线重连坑
- 6. 消息体大小坑
- 7. 跨域问题坑
- 五、5个核心监控指标,缺一不可!
- 1. 在线用户数监控
- 2. 消息推送成功率
- 3. 消息推送延迟
- 4. WebSocket连接数
- 5. 系统资源使用率
- 六、总结:实时推送架构的4个关键点
0 评论