# 延迟任务场景技术选型

实现延迟任务的方式有很多,各有利弊,有单机和分布式的,延迟任务有别于定式任务,定式任务往往是固定周期的,有明确的触发时间,场景很多

  • 支付超时取消订单
  • 评价超时自动好评
  • ...

下面来探讨一些实现方案

# 1. DelayQueue

缺点是单机运行在内存中导致 OOM、无法持久化、宕机任务丢失

# 1.1. Main

import java.util.concurrent.*;

/**
 * DelayQueue - 按时间排序出队列
 *
 * 任务调度 - 定时任务 - 延时队列
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/4/23 15:35
 */
public class T04_BlockingQueue_3_DelayQueue {

    public static BlockingQueue<MyTask> tasks = new DelayQueue<>();

    public static class MyTask implements Delayed {
        public String name;
        public long runningTime;

        MyTask(String name, long rt) {
            this.name = name;
            this.runningTime = rt;
        }

        @Override
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
                return -1;
            } else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
                return 1;
            } else {
                return 0;
            }
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public String toString() {
            return name + " " + runningTime;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long now = System.currentTimeMillis();
        MyTask t1 = new MyTask("t1", now + 10000);
        MyTask t2 = new MyTask("t2", now + 20000);
        MyTask t3 = new MyTask("t3", now + 15000);
        MyTask t4 = new MyTask("t4", now + 25000);
        MyTask t5 = new MyTask("t5", now + 5000);

        tasks.put(t1);
        tasks.put(t2);
        tasks.put(t3);
        tasks.put(t4);
        tasks.put(t5);

        System.out.println(tasks);

        /*for (int i = 0; i < 5; i++) {
            System.out.println(tasks.take());
        }

        tasks.put(t1);
        tasks.put(t2);
        tasks.put(t3);
        tasks.put(t4);
        tasks.put(t5);*/

        MyTask t = null;
        while (tasks.size() > 0) {
            t = tasks.take();
            System.out.println(t.runningTime + ", " + System.currentTimeMillis());
        }
    }
}

# 2. HashedWheelTimer

Netty 提供的 HashedWheelTimer 工具类来实现延迟任务,采用时间轮算法,相比 DelayQueue 的数据结构,时间轮在算法复杂度上有一定优势。DelayQueue 由于涉及到排序,需要调堆,插入和移除的复杂度是 O(lgn),而时间轮在插入和移除的复杂度都是 O(1)

缺点是单机运行在内存中导致 OOM、无法持久化、宕机任务丢失

# 2.1. pom

<!-- HashedWheelTimer -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-common</artifactId>
    <version>4.1.23.Final</version>
</dependency>

# 2.2. Task

/**
 * HashedWheelTimer实现
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/8/14 11:50
 */
public class CustomTimerTask implements TimerTask {

    private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /**
     * 任务ID
     */
    private final long taskId;

    /**
     * 创建时间戳
     */
    private final long timestamp;

    public CustomTimerTask(long taskId) {
        this.taskId = taskId;
        this.timestamp = System.currentTimeMillis();
    }

    @Override
    public void run(Timeout timeout) {
        // 异步处理任务
        System.out.println(
                String.format("任务执行时间:%s,任务创建时间:%s,任务ID:%s",
                        LocalDateTime.now().format(F),
                        LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
                        taskId
                )
        );
    }
}

# 2.3. Main

/**
 * RunHashedWheelTimer运行
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/8/14 11:54
 */
public class RunHashedWheelTimer {

    private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    public static void main(String[] args) throws Exception {

        ThreadFactory factory = r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("HashedWheelTimerWorker");
            return thread;
        };

        /**
         * @param tickDuration 每tick一次的时间间隔
         * @param unit tickDuration的时间单位
         * @param ticksPerWheel 时间轮中的槽数
         * @param leakDetection 检查内存溢出
         */
        Timer timer = new HashedWheelTimer(factory, 1,
                TimeUnit.SECONDS, 100, true);
        System.out.println(String.format("开始任务时间:%s", LocalDateTime.now().format(F)));

        // 任务ID-6,5秒后执行
        TimerTask timerTask = new CustomTimerTask(6);
        timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);

        Thread.sleep(Integer.MAX_VALUE);
    }

}

# 3. 任务调度

任务调度按设定时间间隔执行一次,及时性可能没有那么块,必须等到任务调用执行,选取那种任务调度框架(Quartz,XXL-JOB,OhMyScheduler等)了

缺点是需要全表扫描(当然可以每次处理指定时间段的数据),任务设置轮询时间就是最大延迟时间,对数据库有一定压力,仅适合数据量少的业务场景

# 4. Redis ZSet

Redis 中的 ZSet 是一个有序的 Set,内部使用 HashMap 和跳表(SkipList)来保证数据的存储和有序,HashMap 里放的是成员到 score 的映射,而跳跃表里存放的是所有的成员,排序依据是 HashMap 里存的 score,使用跳跃表的结构可以获得比较高的查找效率,并且在实现上比较简单,借助 ZSet 数据类型,把延迟任务存储在此数据集合中,然后在开启一个无线循环查询当前时间的所有任务进行消费

但是可能有并发问题,即两个线程或者两个进程都会拿到一样的数据,然后重复执行,最后又都会删除。如果是单机多线程执行,或者分布式环境下,可以使用 Redis 事务,也可以使用由 Redis 实现的分布式锁,或者使用下例中 Redis Script。你可以在 Redis 官方的 Transaction 章节找到事务的相关内容

这种方式比较推荐,可以满足持久化,分布式的场景,使用的话,Redisson 框架有封装好,直接使用即可,如下简单实现一个订单超时自动评价功能

# 4.1. pom

<!-- JDK 1.6+ compatible -->
<!--<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>2.10.4</version>
</dependency>-->

<!-- JDK 1.8+ compatible,Redisson包含了netty -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.12.1</version>
</dependency>

# 4.2. Config

spring:
  redis:
    database: 2
    host: 127.0.0.1
    port: 6379
    password: 
/**
 * RedissonConfig配置
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/8/14 16:16
 */
@Configuration
@ConfigurationProperties(prefix = "spring.redis")
public class RedissonConfig {

    private static final Logger logger = LoggerFactory.getLogger(RedissonConfig.class);

    private String host;

    private int port;

    private String password;

    private int database;

    /**
     * RedissonClient配置
     *
     * @param
     * @return org.redisson.api.RedissonClient
     * @throws
     * @author wliduo[i@dolyw.com]
     * @date 2020/8/14 16:25
     */
    @Bean
    public RedissonClient redissonClient() {
        RedissonClient redissonClient = null;
        Config config = new Config();
        String url = "redis://" + host + ":" + port;
        // 单节点配置
        config.useSingleServer().setAddress(url).setDatabase(database);
        // 主从配置
        /*config.useMasterSlaveServers()
                // 设置redis主节点
                .setMasterAddress("redis://192.168.1.120:6379")
                // 设置redis从节点
                .addSlaveAddress("redis://192.168.1.130:6379", "redis://192.168.1.140:6379");*/
        // 哨兵部署方式,sentinel是采用Paxos拜占庭协议,一般sentinel至少3个节点
        /*config.useSentinelServers()
                .setMasterName("my-sentinel-name")
                .addSentinelAddress("redis://192.168.1.120:6379")
                .addSentinelAddress("redis://192.168.1.130:6379")
                .addSentinelAddress("redis://192.168.1.140:6379");*/
        // 集群部署方式,cluster方式至少6个节点,3主3从,3主做sharding,3从用来保证主宕机后可以高可用
        /*config.useClusterServers()
                // 集群状态扫描间隔时间,单位是毫秒
                .setScanInterval(2000)
                .addNodeAddress("redis://192.168.1.120:6379")
                .addNodeAddress("redis://192.168.1.130:6379")
                .addNodeAddress("redis://192.168.1.140:6379")
                .addNodeAddress("redis://192.168.1.150:6379")
                .addNodeAddress("redis://192.168.1.160:6379")
                .addNodeAddress("redis://192.168.1.170:6379");*/
        // 云托管部署方式,这种方式主要解决redis提供商为云服务的提供商的redis连接,比如亚马逊云、微软云
        /*config.useReplicatedServers()
                // 主节点变化扫描间隔时间
                .setScanInterval(2000)
                .addNodeAddress("redis://192.168.1.120:6379")
                .addNodeAddress("redis://192.168.1.130:6379")
                .addNodeAddress("redis://192.168.1.140:6379");*/
        try {
            redissonClient = Redisson.create(config);
            return redissonClient;
        } catch (Exception e) {
            logger.error("RedissonClient init redis url:[{}], Exception:", url, e);
            return null;
        }
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getDatabase() {
        return database;
    }

    public void setDatabase(int database) {
        this.database = database;
    }
}

# 4.3. Util

/**
 * RedissonDelayedUtil
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/8/14 16:34
 */
@Component
public class RedissonDelayedUtil<T> {

    private static final Logger logger = LoggerFactory.getLogger(RedissonDelayedUtil.class);

    @Autowired
    private RedissonClient redissonClient;

    /**
     * 添加延时队列任务
     *
     * @param blockingQueueName 队列名
	 * @param t 对象
	 * @param delay 时间
	 * @param timeUnit 时间单位
     * @return boolean
     * @throws
     * @author wliduo[i@dolyw.com]
     * @date 2020/8/14 16:41
     */
    public boolean offer(String blockingQueueName, T t, long delay, TimeUnit timeUnit) {
        RBlockingQueue<T> blockingQueue = null;
        RDelayedQueue<T> delayedQueue = null;
        try {
            blockingQueue = redissonClient.getBlockingQueue(blockingQueueName);
            delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
            delayedQueue.offer(t, delay, timeUnit);
            // https://blog.csdn.net/zsj777/article/details/105223853
            // 解决延迟队列take数据阻塞不执行,必须等到下一个内容offer时,队列才会把阻塞的消息全部处理掉
            // offer后再offer一个空值即可
            delayedQueue.offer(null, 1, TimeUnit.MILLISECONDS);
            return true;
        } catch (Exception e) {
            logger.error("RedissonDelayedUtil put Exception:", e);
        } finally {
            if (delayedQueue != null) {
                delayedQueue.destroy();
            }
        }
        return false;
    }

    /**
     * 获取延时队列任务
     *
     * @param blockingQueueName 队列名
     * @return org.redisson.api.RBlockingQueue<T>
     * @throws
     * @author wliduo[i@dolyw.com]
     * @date 2020/8/14 17:03
     */
    public RBlockingQueue<T> take(String blockingQueueName) {
        RBlockingQueue<T> blockingQueue = null;
        RDelayedQueue<T> delayedQueue = null;
        try {
            blockingQueue = redissonClient.getBlockingQueue(blockingQueueName);
            delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
            // 解决延迟队列首次启动获取为空的问题,先offer一个空值就没问题了
            delayedQueue.offer(null, 1, TimeUnit.MILLISECONDS);
            return blockingQueue;
        } catch (Exception e) {
            logger.error("RedissonDelayedUtil put Exception:", e);
        } finally {
            if (delayedQueue != null) {
                delayedQueue.destroy();
            }
        }
        return null;
    }

}

# 4.4. Model

/**
 * OrderDto
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/8/14 16:31
 */
public class OrderDto implements Serializable {

    /**
     * 订单号
     */
    private String orderCode;

    /**
     * 订单名
     */
    private String orderName;

    public OrderDto(String orderCode) {
        this.orderCode = orderCode;
    }

    public OrderDto(String orderCode, String orderName) {
        this.orderCode = orderCode;
        this.orderName = orderName;
    }

    public String getOrderCode() {
        return orderCode;
    }

    public void setOrderCode(String orderCode) {
        this.orderCode = orderCode;
    }

    public String getOrderName() {
        return orderName;
    }

    public void setOrderName(String orderName) {
        this.orderName = orderName;
    }
}
/**
 * RedissonDelayedQueue枚举
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/8/14 16:43
 */
public enum RedissonDelayedEnum {

    /**
     * 订单超时默认评价
     */
    ORDER_DEFAULT_EVALUATION("orderDefaultEvaluation", 1, TimeUnit.MINUTES);

    public String name;

    public int delay;

    public TimeUnit timeUnit;

    RedissonDelayedEnum(String name, int delay, TimeUnit timeUnit) {
        this.name = name;
        this.delay = delay;
        this.timeUnit = timeUnit;
    }
}

# 4.5. Impl

/**
 * Redisson延时队列测试
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/8/14 16:23
 */
@RestController
@RequestMapping("/")
public class WebController {

    @Autowired
    private RedissonDelayedUtil redissonDelayedUtil;

    @GetMapping
    public String hello() {
        return "Hello World";
    }

    @GetMapping("/put")
    public void put() {
        // 10秒后执行
        redissonDelayedUtil.offer(RedissonDelayedEnum.ORDER_DEFAULT_EVALUATION.name,
                new OrderDto(String.valueOf(System.currentTimeMillis()), "Test"),
                RedissonDelayedEnum.ORDER_DEFAULT_EVALUATION.delay,
                RedissonDelayedEnum.ORDER_DEFAULT_EVALUATION.timeUnit);
    }

    @GetMapping("/putName")
    public void putName(@RequestParam("name") String name) {
        // 10秒后执行
        redissonDelayedUtil.offer(RedissonDelayedEnum.ORDER_DEFAULT_EVALUATION.name,
                new OrderDto(String.valueOf(System.currentTimeMillis()), name),
                RedissonDelayedEnum.ORDER_DEFAULT_EVALUATION.delay,
                RedissonDelayedEnum.ORDER_DEFAULT_EVALUATION.timeUnit);
    }

}
/**
 * 延时任务启动执行,自动消费
 *
 * @author wliduo[i@dolyw.com]
 * @date 2020/8/14 17:01
 */
@Component
@Order
public class DelayedTask implements CommandLineRunner {

    private static final Logger logger = LoggerFactory.getLogger(DelayedTask.class);

    private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    @Autowired
    private RedissonDelayedUtil redissonDelayedUtil;

    @Override
    public void run(String... args) throws Exception {
        // 订单默认评价
        this.orderDefaultEvaluation();
    }

    /**
     * 订单默认评价
     *
     * @param
     * @return void
     * @throws
     * @author wliduo[i@dolyw.com]
     * @date 2020/8/14 17:10
     */
    public void orderDefaultEvaluation() throws InterruptedException {
        RBlockingQueue<OrderDto> blockingQueue = redissonDelayedUtil.take(RedissonDelayedEnum.ORDER_DEFAULT_EVALUATION.name);
        if (blockingQueue == null) {
            logger.error("延时任务启动失败");
            return;
        }
        while (true) {
            OrderDto orderDto = blockingQueue.take();
            if (orderDto == null) {
                continue;
            }
            logger.info("名称: {},执行时间: {},入队时间: {}", orderDto.getOrderName(), LocalDateTime.now().format(F),
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(
                            Long.parseLong(orderDto.getOrderCode())), ZoneId.systemDefault()
                    ).format(F));
        }
    }

}

# 5. Redis键通知

Redis 键通知是不可靠的

  • 开启键通知会对 Redis 有额外的开销
  • 键通知暂时 Redis 并不保证消息必达,Redis 客户端断开连接所有 Key 丢失
  • 消费速度不可自控,如果一瞬间 QPS 非常高,接收到的通知会非常密集,消费不过来

# 6. MQ延迟队列

几乎所有的 MQ 中间件都可以实现延迟任务,在这里更准确的叫法应该叫延迟队列,MQ 可以做到消息零丢失,可抗高并发,需要额外引入 MQ 中间件,提高系统复杂性和 MQ 高可用维护性

如果专门开启一个 MQ 中间件来执行延迟任务,就有点杀鸡用宰牛刀般的奢侈了,不过已经有了 MQ 环境的话,用它来实现延迟任务的话,还是可取的

# 6.1. RabbitMQ

  • RabbitMQ 的 TTL 和 DXL 实现延迟队列,通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列
  • 使用 rabbitmq-delayed-message-exchange 插件实现延迟功能

延迟插件 rabbitmq-delayed-message-exchange 是在 RabbitMQ 3.5.7 及以上的版本才支持的,依赖 Erlang/OPT 18.0 及以上运行环境

由于使用死信交换器比较麻烦,所以推荐使用第二种实现方式 rabbitmq-delayed-message-exchange 插件的方式实现延迟队列

参考

上次更新时间: 2023-12-15 03:14:55