基于Redis的4种延时队列实现方式及实战

在日常开发中,我们经常会遇到需要延迟执行任务的场景,比如订单超时取消、优惠券到期提醒、邮件定时发送、消息重试等。传统的做法可能是使用定时任务扫描数据库,但这种方式效率低下,特别是在高并发场景下。

Redis作为一个高性能的内存数据库,为我们提供了多种实现延时队列的方式。今天我就来详细介绍4种基于Redis的延时队列实现方式,并分析它们各自的优缺点和适用场景。

什么是延时队列?

延时队列顾名思义,是指元素进入队列后,可以延时一定时间再被消费者取出执行。这与普通队列的区别在于,普通队列中的元素一旦入队就可以被立即消费,而延时队列中的元素需要等到指定时间后才能被消费。

为什么要使用Redis实现延时队列?

使用Redis实现延时队列有几个显著优势:

  1. 高性能:Redis基于内存操作,读写速度极快
  2. 丰富的数据结构:支持String、Hash、List、Set、ZSet、Stream等多种数据结构
  3. 原子性操作:Redis的命令都是原子性的,保证了数据一致性
  4. 持久化支持:可以配置RDB和AOF持久化,防止数据丢失

4种实现方式详解

1. 基于Sorted Set的延时队列

这是最经典也是最常用的实现方式。Sorted Set(简称ZSet)是Redis提供的一个有序集合数据结构,每个元素都关联一个double类型的分数(score),Redis会根据score值对集合中的元素进行排序。

实现原理是将任务的执行时间戳作为score,任务内容作为member。消费者定时查询score小于等于当前时间戳的元素,即为到期任务。

实现代码:

@Component
public class SortedSetDelayQueue {
    
    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final String DELAY_QUEUE_KEY = "delay_queue:sorted_set";

    /**
     * 添加延时任务
     */
    public void addTask(String taskId, Object taskData, long delaySeconds) {
        long executeTime = Instant.now().plusSeconds(delaySeconds).toEpochMilli();
        String taskStr = JSON.toJSONString(taskData);
        
        // 将任务添加到ZSet,执行时间戳作为score
        redisTemplate.opsForZSet().add(DELAY_QUEUE_KEY, taskStr, executeTime);
        log.info("添加延时任务到ZSet队列,taskId: {}, 执行时间: {}, 任务数据: {}", taskId, executeTime, taskStr);
    }

    /**
     * 消费延时任务
     */
    public void consumeTasks() {
        long currentTime = System.currentTimeMillis();
        
        // 查询所有到期的任务(score <= 当前时间戳)
        Set<ZSetOperations.TypedTuple<String>> expiredTasks = 
            redisTemplate.opsForZSet().rangeByScoreWithScores(DELAY_QUEUE_KEY, 0, currentTime);

        if (expiredTasks != null && !expiredTasks.isEmpty()) {
            for (ZSetOperations.TypedTuple<String> taskTuple : expiredTasks) {
                String taskData = taskTuple.getValue();
                Double score = taskTuple.getScore();
                
                // 从队列中移除任务
                Boolean removed = redisTemplate.opsForZSet().remove(DELAY_QUEUE_KEY, taskData);
                
                if (Boolean.TRUE.equals(removed)) {
                    log.info("处理到期任务,执行时间: {}, 任务数据: {}", (long)score.doubleValue(), taskData);
                    
                    // 执行实际的任务处理逻辑
                    processTask(taskData);
                }
            }
        }
    }
    
    // ... 其他辅助方法
}

优点:

  • 实现简单,利用ZSet的score排序功能
  • 性能较好,添加和查询操作都是O(log N)
  • 支持批量处理到期任务
  • Redis原生命令支持,稳定可靠

缺点:

  • 需要定时轮询检查到期任务
  • 实时性依赖轮询频率,可能存在延迟
  • 大量任务时轮询开销较大

2. 基于List的延时队列

这种方式是将任务和执行时间封装后存入List,通过定时轮询检查是否有到期任务。

实现代码:

@Component
public class ListDelayQueue {
    
    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final String DELAY_QUEUE_KEY = "delay_queue:list";

    public static class DelayTask {
        private String taskId;
        private Object taskData;
        private long executeTime; // 执行时间戳

        public DelayTask(String taskId, Object taskData, long executeTime) {
            this.taskId = taskId;
            this.taskData = taskData;
            this.executeTime = executeTime;
        }
        // ... getter/setter
    }

    /**
     * 添加延时任务
     */
    public void addTask(String taskId, Object taskData, long delaySeconds) {
        long executeTime = Instant.now().plusSeconds(delaySeconds).toEpochMilli();
        DelayTask delayTask = new DelayTask(taskId, taskData, executeTime);
        String taskStr = JSON.toJSONString(delayTask);

        // 将任务添加到List
        redisTemplate.opsForList().leftPush(DELAY_QUEUE_KEY, taskStr);
        log.info("添加延时任务到List队列,taskId: {}, 执行时间: {}, 任务数据: {}", taskId, executeTime, taskStr);
    }

    /**
     * 消费延时任务
     */
    public void consumeTasks() {
        long currentTime = System.currentTimeMillis();
        long queueSize = redisTemplate.opsForList().size(DELAY_QUEUE_KEY);

        if (queueSize == null || queueSize == 0) {
            return;
        }

        // 获取所有任务
        List<String> allTasks = redisTemplate.opsForList().range(DELAY_QUEUE_KEY, 0, -1);
        List<String> tasksToRemove = new ArrayList<>();

        if (allTasks != null) {
            for (String taskStr : allTasks) {
                try {
                    DelayTask task = JSON.parseObject(taskStr, DelayTask.class);
                    if (task != null && task.getExecuteTime() <= currentTime) {
                        // 任务到期,执行处理逻辑
                        processTask(task.getTaskData());
                        
                        // 记录需要删除的任务
                        tasksToRemove.add(taskStr);
                    }
                } catch (Exception e) {
                    log.error("解析延时任务失败: {}", taskStr, e);
                }
            }

            // 从队列中移除已处理的任务
            for (String taskStr : tasksToRemove) {
                redisTemplate.opsForList().remove(DELAY_QUEUE_KEY, 1, taskStr);
            }
        }
    }
}

优点:

  • 实现直观,易于理解
  • 添加任务速度快O(1)

缺点:

  • 查询到期任务需要遍历整个列表,性能差O(N)
  • 随着任务数量增加,性能急剧下降
  • 不适合大量任务的场景

3. 基于Pub/Sub的延时队列

这种方式结合Timer/Thread Pool和Redis Pub/Sub,到达执行时间时发布消息。创建定时任务,在指定时间后向Redis频道发布消息,消费者通过订阅该频道来接收消息。

实现代码:

@Component
public class PubSubDelayQueue {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private RedisMessageListenerContainer listenerContainer;

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);

    private static final String DELAY_CHANNEL_PREFIX = "delay_channel:";

    /**
     * 添加延时任务
     */
    public void addTask(String taskId, Object taskData, long delaySeconds) {
        String channelName = DELAY_CHANNEL_PREFIX + taskId;
        String taskStr = JSON.toJSONString(taskData);

        // 创建定时任务,在指定时间后发布消息
        java.util.concurrent.ScheduledFuture<?> scheduledFuture = scheduler.schedule(() -> {
            log.info("定时任务触发,发布延时消息,taskId: {}, channel: {}, 任务数据: {}", taskId, channelName, taskStr);
            redisTemplate.convertAndSend(channelName, taskStr);
        }, delaySeconds, TimeUnit.SECONDS);

        log.info("添加延时任务到Pub/Sub队列,taskId: {}, 延迟时间: {}秒, 任务数据: {}", taskId, delaySeconds, taskStr);
    }

    /**
     * 订阅指定任务通道
     */
    public void subscribeToTask(String taskId, TaskProcessor taskProcessor) {
        String channelName = DELAY_CHANNEL_PREFIX + taskId;
        ChannelTopic topic = new ChannelTopic(channelName);

        listenerContainer.addMessageListener((message, pattern) -> {
            String taskData = message.getBody().toString();
            log.info("收到延时消息,taskId: {}, channel: {}, 任务数据: {}", taskId, channelName, taskData);
            
            // 执行任务处理逻辑
            taskProcessor.process(taskData);
        }, topic);
    }
    
    @FunctionalInterface
    public interface TaskProcessor {
        void process(String taskData);
    }
}

优点:

  • 实时性最好,到达时间立即触发
  • 无需轮询,节省资源
  • 支持广播模式,可通知多个消费者

缺点:

  • 需要在应用层维护定时任务,重启后可能丢失
  • 无法持久化,进程重启后未执行的任务会丢失
  • 实现相对复杂

4. 基于Stream的延时队列

Redis 5.0新增的Stream数据结构提供了更强大的消息队列功能,支持消费者组、消息确认机制等。

实现代码:

@Component
public class StreamDelayQueue {

    @Autowired
    private StringRedisTemplate redisTemplate;

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

    private static final String DELAY_STREAM_KEY = "delay_stream:queue";
    private static final String CONSUMER_GROUP = "delay_consumers";
    private static final String CONSUMER_NAME = "consumer_1";

    /**
     * 添加延时任务
     */
    public void addTask(String taskId, Object taskData, long delaySeconds) {
        String taskStr = JSON.toJSONString(taskData);
        
        // 构建任务信息
        Map<String, String> taskInfo = Map.of(
            "taskId", taskId,
            "taskData", taskStr,
            "executeTime", String.valueOf(Instant.now().plusSeconds(delaySeconds).toEpochMilli())
        );

        // 创建定时任务,到时间后将任务添加到Stream
        scheduler.schedule(() -> {
            try {
                // 将任务添加到Stream
                String messageId = redisTemplate.opsForStream().add(DELAY_STREAM_KEY, taskInfo);
                log.info("延时任务已添加到Stream,messageId: {}, taskId: {}, taskInfo: {}", messageId, taskId, taskInfo);
            } catch (Exception e) {
                log.error("添加任务到Stream失败,taskId: {}", taskId, e);
            }
        }, delaySeconds, TimeUnit.SECONDS);

        log.info("添加延时任务到Stream队列,taskId: {}, 延迟时间: {}秒, 任务数据: {}", taskId, delaySeconds, taskStr);
    }

    /**
     * 消费延时任务
     */
    public void consumeTasks() {
        try {
            // 从Stream中读取消息
            Map<String, Object> streamMessages = redisTemplate.opsForStream().read(
                Consumer.from(CONSUMER_GROUP, CONSUMER_NAME),
                StreamReadOptions.empty().count(10).block(1000),
                StreamOffset.create(DELAY_STREAM_KEY, ReadOffset.lastConsumed())
            );

            if (streamMessages != null && !streamMessages.isEmpty()) {
                for (Map.Entry<String, Object> entry : streamMessages.entrySet()) {
                    String messageId = entry.getKey();
                    Map<String, String> taskInfo = (Map<String, String>) entry.getValue();
                    
                    log.info("处理Stream中的任务,messageId: {}, taskInfo: {}", messageId, taskInfo);
                    
                    // 处理任务
                    processTask(taskInfo.get("taskData"));
                    
                    // 确认消息已被处理
                    redisTemplate.opsForStream().acknowledge(DELAY_STREAM_KEY, CONSUMER_GROUP, messageId);
                }
            }
        } catch (Exception e) {
            log.error("消费Stream消息时出错", e);
        }
    }
}

优点:

  • 功能最强大,支持消费者组、消息确认机制
  • 消息持久化,不会丢失
  • 支持多消费者,可扩展性好
  • 提供丰富的消息处理功能

缺点:

  • Redis版本要求较高(5.0+)
  • 实现复杂度最高
  • 学习成本较高

方案对比总结

方案性能实时性可靠性实现复杂度适用场景
Sorted Set较好依赖轮询简单通用场景
List依赖轮询简单小规模场景
Pub/Sub中等实时性要求高
Stream中等最高复杂大规模系统

实际应用建议

  1. 一般业务场景:推荐使用Sorted Set方案,它在性能、可靠性和实现复杂度之间达到了很好的平衡
  2. 实时性要求极高:可以选择Pub/Sub方案,但要注意数据持久化问题
  3. 大规模分布式系统:建议使用Stream方案,功能最强大,适合复杂场景
  4. 小规模临时任务:可以考虑List方案,实现简单

最佳实践

  1. 合理设置轮询间隔:对于需要轮询的方案,要平衡实时性和系统开销
  2. 异常处理:确保任务处理异常时有重试机制
  3. 监控告警:对队列长度、处理延迟等指标进行监控
  4. 优雅关闭:在应用关闭时正确处理未完成的任务

总结

Redis为我们提供了多种实现延时队列的方式,每种方式都有其适用的场景。在实际项目中,我们应该根据业务需求、性能要求和系统架构来选择合适的方案。

总的来说,Sorted Set方案因其良好的性能表现和简单实现,是大多数场景下的首选。但对于特殊需求,如高实时性或大规模分布式系统,其他方案也有其独特优势。

希望这篇文章能帮助你更好地理解和应用Redis延时队列。如果你有任何问题或想法,欢迎在评论区交流讨论!


关注「服务端技术精选」,获取更多后端技术干货!
我的个人技术博客:www.jiangyi.space


标题:基于Redis的4种延时队列实现方式及实战
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/01/24/1769239781926.html

    0 评论
avatar