下面所讨论技术方案的前提是精确触发,所以我们不讨论目前业界的一些分布式调度系统如:elastic-job,xxl-job,tbschedule等, 这些系统解决不了延迟任务精确触发问题。
原理如下:
何为死信:
RabbitMQ可以对队列和消息设置x-message-tt、expiration来控制消息的存活时间,如果超时,消息变为死信。
何为死信路由:
RabbitMQ可以对队列设置x-dead-letter-exchange和x-dead-letter-routing-key两个参数。
当消息在一个队列中变成死信后会按这两个参数路由,消息就可以重新被消费。
实例操作:
优点:
缺点:
所以需要确保业务上每个任务的延迟时间是一致的。如果有不同延时的任务,需要为每种不同延迟的任务单独创建消息队列,缺乏灵活性。
原理如下:
核心代码流程:
其原理是延迟消息会被保存到Mnesia表,在Exchange中根据每个message头设置的延迟时间x-delay,消息过期后才路由到对应队列。
实例操作:
安装插件
docker-compose.xml(将插件安装到容器中)
version: '2'
services:
rabbitmq:
hostname: rabbitmq
image: rabbitmq:3.6.8-management
mem_limit: 200m
ports:
- "5672:5672"
- "15672:15672"
volumes:
- ~/dockermapping/rabbitmq/lib:/var/lib/rabbitmq/
- /Users/oldlu/workspace/document/docker-compose/rabbitmq/rabbitmq_delayed_message_exchange-0.0.1.ez:/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.8/plugins/rabbitmq_delayed_message_exchange-0.0.1.ez
启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
核心函数
消息入队:internal_delay_message
启动Timer:maybe_delay_first
消息处理:handle_info
优点:
缺点:
有序集合(Sorted Set)是Redis提供的一种数据结构,具有set和hash的特点。
其中每个元素都关联一个score,并以这个score来排序。
其内部实现用到了两个数据结构:hash table和 skip list(跳跃表)
skip list的特点
主要命令
实现延迟队列
实例操作
root@redis:/usr/local/bin# redis-cli
127.0.0.1:6379> zadd delayqueue 1 task1
(integer) 1
127.0.0.1:6379> zadd delayqueue 2 task2
(integer) 1
127.0.0.1:6379> zadd delayqueue 4 task4
(integer) 1
127.0.0.1:6379> zadd delayqueue 3 task3
(integer) 1
127.0.0.1:6379>
127.0.0.1:6379> zrange delayqueue 0 0 withscores
1) "task1"
优点:
缺点:
DelayQueue是一个使用优先队列实现的BlockingQueue,优先队列比较的是时间,内部存储的是实现Delayed接口的对象。 只有在对象过期后才能从队列中获取对象。
内部结构
Leader/Followers
Leader/Followers是多个工作线程轮流进行事件监听、分发、处理的一种模式。 该模式最大的优点在于,它是自己监听事件并处理客户请求,从接收到处理都是在同一线程中完成, 所以不需要在线程之间传递数据,解决线程频繁切换带来的开销。
该模式工作的任何时间点,只有一个线程成为Leader ,负责事件监听,而其他线程都是Follower,在休眠中等待成为Leader。 该模式的工作线程存在三种状态,工作线程同一时间只能处于一种状态,这三种状态为:
核心源码分析:
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {//入队对象延迟时间是队列中最短的
leader = null;//重置leader
available.signal();//唤醒一个线程去监听新加入的对象
}
return true;
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();//队列为空,无限等待
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)//延迟时间已过,直接返回
return q.poll();
else if (leader != null)//已有leader在监听了,无限等待
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;//当前线程成为leader
try {
available.awaitNanos(delay);//在delay纳秒后唤醒
} finally {
if (leader == thisThread)// 入队一个最小延迟时间的对象时leader会被清空
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)//leader不存在且队列不为空,唤醒一个follower去成为leader去监听
available.signal();
lock.unlock();
}
}
优点:
缺点:
时间轮是一种环形的数据结构,分成多个格。
每个格代表一段时间,时间越短,精度越高。
每个格上用一个链表保存在该格的过期任务。
指针随着时间一格一格转动,并执行相应格子中的到期任务。
名词解释:
以上图为例,假设一个格子是1秒,则整个时间轮能表示的时间段为8s, 如果当前指针指向2,此时需要调度一个3s后执行的任务,需要放到第5个格子(2+3)中,指针再转3次就可以执行了。
单表时间轮存在的问题是:
格子的数量有限,所能代表的时间有限,当要存放一个10s后到期的任务怎么办?这会引起时间轮溢出。
有个办法是把轮次信息也保存到时间格链表的任务上。
如果任务要在10s后执行,算出轮次10/8 round等1,格子10%8等于2,所以放入第二格。
检查过期任务时应当只执行round为0的任务,链表中其他任务的round减1。
带轮次单表时间轮存在的问题是:
如果任务的时间跨度很大,数量很大,单层时间轮会造成任务的round很大,单个格子的链表很长,每次检查的量很大,会做很多无效的检查。怎么办?
过期任务一定是在底层轮中被执行的,其他时间轮中的任务在接近过期时会不断的降级进入低一层的时间轮中。
分层时间轮中每个轮都有自己的格数和间隔设置,当最低层的时间轮转一轮时,高一层的时间轮就转一个格子。
分层时间轮大大增加了可表示的时间范围,同时减少了空间占用。
举个例子:
上图的分层时间轮可表达8 8 8=512s的时间范围,如果用单表时间轮可能需要512个格子, 而分层时间轮只要8+8+8=24个格子,如果要设计一个时间范围是1天的分层时间轮,三个轮的格子分别用24、60、60即可。
工作原理:
时间轮指针转动有两种方式:
指针转到特定格子时有两种处理方式:
举个例子:
添加1个5s后执行的任务
添加一个50s后执行的任务
添加一个250s后执行的任务
优点:
缺点:
对于超出范围的任务可放在一个缓冲区中(可用队列、redis或数据库实现),等最高时间轮转到下一格子就从缓冲中取出符合范围的任务落到时间轮中。
比如:
调度系统提供任务操作接口供业务系统提交任务、取消任务、反馈执行结果等。
针对dubbo调用,将任务抽象成JobCallbackService接口,由业务系统实现并注册成服务。
整体架构
数据库:
内存队列:
ZooKeeper:
主节点:
调度节点:
业务系统:
数据库设计
表说明
主从切换
利用ZooKeeper临时序列节点特性,序号最小的节点为主节点,其他节点为从节点。
主节点监听集群状态,集群状态发生变化时重新分片。
从节点监听序号比它小的兄弟节点,兄弟节点发生变化重新寻找和建立监听关系。
数据分片
任务状态
主要流程
服务加载
提交任务
定时器
调度任务
任务反馈
优点
缺点
如果引入MQ,使用MQ来解耦服务调用的协议,保证任务的重试,并由消费方根据自己的处理能力控制流量会不会更好呢?
整体架构
数据库设计
主要流程
调度任务
缺点
需要业务系统依赖于MQ
本文来自网易实践者社区,经作者陈志良授权发布。