前言
延迟任务是作为后端开发肯定会遇到的问题,以我现在公司为例,下订单 15分钟/7天 之后还没有支付,就会取消订单,返还用户的优惠券等操作,当然我们公司现在的做法实在是有点low , 我们首先定义任务操作接口 , 然后借助Linux cron 执行任务接口
考虑
- 项目重启对已存在的未执行的任务的影响,任务放入内存还是落地,如果是内存,所在机器故障对于任务的影响
- 监控任务分布情况
- 监控执行情况:开始执行时间,耗时,参数,执行机器ID,执行结果
- 大任务单机执行还是分布式执行,如何进行分片等,单机执行如果防止重复执行等
- 已存在但是未执行的任务是否可以取消
- 如果使用了轮询机制,需要优化轮询策略,减少不必要的轮询
- 此处主要考虑延迟队列的机制,对于一条任务该如果执行,数据结构设计等此处不多考虑
- 任务失败是否需要再入队列,最终失败报警机制
- 提供默认的再入队列策略,各个任务指定自己需要的再入队列策略,任务执行失败后按照任务自己的再入队列策略再入队列
实现
数据库
- 思路
- 设计一个 delay_task 表,核心字段:执行方法名,参数,执行时间,任务状态(未执行,执行中,执行结束等)。按照执行时间倒序并且小于当前时间轮询该表
- 优点
- 实现简单
- 对任务的操作更方便,增删改任务
- 缺点
- 基于数据库,依赖数据库架构设计,是否主从,单点依赖等,并发容量也依赖数据库架构(同时下订单,同时插入15分钟后的一条任务记录)
- 轮询优化需要考虑
Redis
- 思路:
- zset,string。借助 zset 的有序性实现延迟任务,借助 string (或者其他合适的数据结构) 存储具体的任务
- 优点:
- 实现简单
- 任务再修改简单
- 借助 Redis 生态,任务落地
- 缺点:
- 需要单独实现任务日志,比如我想知道过去某个时间点的某个应该执行的任务是否执行以及执行情况等
有赞方案总结
- 整体架构
- Job Pool用来存放所有Job的元信息
- Delay Bucket是一组以时间为维度的有序队列,用来存放所有需要延迟的/已经被reserve的Job(这里只存放Job Id)
- Timer负责实时扫描各个Bucket,并将delay时间大于等于当前时间的Job放入到对应的Ready Queue
- Ready Queue存放处于Ready状态的Job(这里只存放Job Id),以供消费程序消费。(就是一个到期待执行队列,将到期任务放入到该队列等待线程执行,或者根据topic 放入到不同的队列)
- 重点
- 基于wait/notify方式的Timer实现
- 任务有个属性是topic , 可以使用不同的队列,可以达到分散消息的目标(比如我们用 RabbitMQ 实现延迟队列 , 可以有多个延迟队列,根据topic 放入不同的队列 )
- Java 实现
-
在 GitHub 上发现了一个基于 Redis 的 Java 版本的有赞延迟队列实现,可供参考。
kafka , zk 也提供了delayqueue 的实现,水平有限,以后待加