Friday, March 9, 2018

Design Delayed Job Scheduler



Related: http://massivetechinterview.blogspot.com/2015/11/java-delayqueue.html
请教一道面试题 -- delay scheduler
经常在面经里看到要求implement a delay scheduler这道题,就是给一个任务和指定的delay时间,要求把task schedule在指定的时间执行,比如schedule method长这样:Future schedule(int delay_time, Runnable task)。

我对这样的题目感到无从下手,也一直没看到一个比较详细的解题思路,根据目前搜集到的信息,感觉是不是应该这样做:

1.  当拿到一个任务,先把当前的timestamp加上指定的delay_time,形成一个新的timestamp,然后把这个新的timestamp连同task本身存到一个自定义的类里面,比如这个类叫DelayedTask.
2. 维护一个PriorityQueue,把DelayedTask存到PriorityQueue里面,timestamp最小的在栈顶。
3. 用一个loop不停比较栈顶task的timestamp和当前时间,当二者相等时,用ExecutorService来submit Runnable task,从而得到一个Future object,然后返回这个Future object。

这样的作法可行吗?还有就是如果发现 当前时间超过了栈顶task本来plan好要执
行的时间要怎么处理呢?

如果是考察对java API了解程度的话,直接说用DelayQueue应该就可以了。不过看之前面经貌似这题是让自己设计一个类似DelayQueue的结构

PriorityBlockingQueue + polling, 是大家最容易想到的方案,然而轮询通常有个很大的缺点,就是时间间隔不好设置,间隔太长,任务无法及时处理,间隔太短,会很耗CPU。

比较好的两个方案是 DelayQueue 和 HashedWheelTimer https://soulmachine.gitbooks.io/system-design/content/cn/task-scheduler.html

我觉得这个是系统设计吧 如果回答成这个样子就变成考算法了。

一般这样的系统需要数据存储的 否则client create一些future tasks 你的机器重新启动不就全丢掉了。。。所以你的future tasks应该是存在某种数据库里里面的 然后有另外的process去poll这个数据库  具体的讨论就是怎么高效的poll这个数据库一类的
http://www.rowkey.me/blog/2017/12/28/delay-trigger/
https://soulmachine.gitbooks.io/system-design/content/cn/task-scheduler.html
请实现一个定时任务调度器,有很多任务,每个任务都有一个时间戳,任务会在该时间点开始执行。
定时执行任务是一个很常见的需求,所以这题也是从实践中提炼出来的,做好了将来说不定能用上。
仔细分析一下,这题可以分为三个部分:
+

  • 优先队列。因为多个任务需要按照时间从小到大排序,所以需要用优先队列。
  • 生产者。不断往队列里塞任务。
  • 消费者。如果队列里有任务过期了,则取出来执行该任务。

方案1: PriorityBlockingQueue + Polling

我们很快可以想到第一个办法:
  • 用一个java.util.concurrent.PriorityBlockingQueue来作为优先队列。因为我们需要一个优先队列,又需要线程安全,用PriorityBlockingQueue再合适不过了。你也可以手工实现一个自己的PriorityBlockingQueue,用java.util.PriorityQueue + ReentrantLock,用一把锁把这个队列保护起来,就是线程安全的啦
  • 对于生产者,可以用一个while(true),造一些随机任务塞进去
  • 对于消费者,起一个线程,在 while(true)里每隔几秒检查一下队列,如果有任务,则取出来执行。
这个方案的确可行,总结起来就是轮询(polling)。轮询通常有个很大的缺点,就是时间间隔不好设置,间隔太长,任务无法及时处理,间隔太短,会很耗CPU。

方案2: PriorityBlockingQueue + 时间差

可以把方案1改进一下,while(true)里的逻辑变成:
  • 偷看一下堆顶的元素,但并不取出来,如果该任务过期了,则取出来
  • 如果没过期,则计算一下时间差,然后 sleep()该时间差
不再是 sleep() 一个固定间隔了,消除了轮询的缺点。

方案3: DelayQueue

方案2虽然已经不错了,但是还可以优化一下,Java里有一个DelayQueue,完全符合题目的要求。DelayQueue 设计得非常巧妙,可以看做是一个特化版的PriorityBlockingQueue,它把计算时间差并让消费者等待该时间差的功能集成进了队列,消费者不需要关心时间差的事情了,直接在while(true)里不断take()就行了。

方案3: DelayQueue

方案2虽然已经不错了,但是还可以优化一下,Java里有一个DelayQueue,完全符合题目的要求。DelayQueue 设计得非常巧妙,可以看做是一个特化版的PriorityBlockingQueue,它把计算时间差并让消费者等待该时间差的功能集成进了队列,消费者不需要关心时间差的事情了,直接在while(true)里不断take()就行了。

DelayQueue这个方案,每个消费者线程只需要等待所需要的时间差,因此响应速度更快。它内部用了一个优先队列,所以插入和删除的时间复杂度都是\log n
+

JDK里还有一个ScheduledThreadPoolExecutor,原理跟DelayQueue类似,封装的更完善,平时工作中可以用它,不过面试中,还是拿DelayQueue来讲吧,它封装得比较薄,容易讲清楚原理。
public class DelayQueue<E extends Delayed> {
    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    private final Condition available = lock.newCondition();
    private Thread leader = null;
    public boolean put(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
1. put()方法
if (q.peek() == e) {
    leader = null;
    available.signal();
}
如果第一个元素等于刚刚插入进去的元素,说明刚才队列是空的。现在队列里有了一个任务,那么就应该唤醒所有在等待的消费者线程。将leader重置为null,这些消费者之间互相竞争,自然有一个会被选为leader。
2. 线程leader的作用
leader这个成员有啥作用?它主要是为了减少不必要的等待时间。比如队列头部还有5秒就要开始了,那么就让消费者线程sleep 5秒,消费者不再需要等待固定的时间间隔了。
想象一下有个多个消费者线程用take方法去取任务,内部先加锁,然后每个线程都去peek头节点。如果leader不为空说明已经有线程在取了,让当前消费者无限等待。
if (leader != null)
   available.await();
如果为空说明没有其他消费者去取任务,设置leader为当前消费者,并让改消费者等待指定的时间,
else {
    Thread thisThread = Thread.currentThread();
    leader = thisThread;
    try {
         available.awaitNanos(delay);
    } finally {
         if (leader == thisThread)
             leader = null;
    }
}
下次循环会走如下分支,取到任务结束,
if (delay <= 0)
    return q.poll();
3. take()方法中为什么释放first
first = null; // don't retain ref while waiting
我们可以看到 Doug Lea 后面写的注释,那么这行代码有什么用呢?
如果删除这行代码,会发生什么呢?假设现在有3个消费者线程,
  • 线程A进来获取first,然后进入 else 的 else ,设置了leader为当前线程A,并让A等待一段时间
  • 线程B进来获取first, 进入else的阻塞操作,然后无限期等待,这时线程B是持有first引用的
  • 线程A等待指定时间后被唤醒,获取对象成功,出队,这个对象理应被GC回收,但是它还被线程B持有着,GC链可达,所以不能回收这个first
  • 只要线程B无限期的睡眠,那么这个本该被回收的对象就不能被GC销毁掉,那么就会造成内存泄露

JDK中有一个接口java.util.concurrent.Delayed,可以用于表示具有过期时间的元素,刚好可以拿来表示任务这个概念。
DelayQueue这个方案,每个消费者线程只需要等待所需要的时间差,因此响应速度更快。它内部用了一个优先队列,所以插入和删除的时间复杂度都是\log n
JDK里还有一个ScheduledThreadPoolExecutor,原理跟DelayQueue类似,封装的更完善,平时工作中可以用它,不过面试中,还是拿DelayQueue来讲吧,它封装得比较薄,容易讲清楚原理。



+

方案4: HashedWheelTimer
https://my.oschina.net/haogrgr/blog/489320
时间轮(HashedWheelTimer)其实很简单,就是一个循环队列,如下图所示,
上图是一个长度为8的循环队列,假设该时间轮精度为秒,即每秒走一格,像手表那样,走完一圈就是8秒。每个格子指向一个任务集合,时间轮无限循环,每转到一个格子,就扫描该格子下面的所有任务,把时间到期的任务取出来执行。
举个例子,假设指针当前正指向格子0,来了一个任务需要4秒后执行,那么这个任务就会放在格子4下面,如果来了一个任务需要20秒后执行怎么?由于这个循环队列转一圈只需要8秒,这个任务需要多转2圈,所以这个任务的位置虽然依旧在格子4(20%8+0=4)下面,不过需要多转2圈后才执行。因此每个任务需要有一个字段记录需圈数,每转一圈就减1,减到0则立刻取出来执行。
怎么实现时间轮呢?Netty中已经有了一个时间轮的实现, HashedWheelTimer.java,可以参考它的源代码。
时间轮的优点是性能高,插入和删除的时间复杂度都是O(1)。Linux 内核中的定时器采用的就是这个方案。
Follow up: 如何设计一个分布式的定时任务调度器呢? 答: Redis ZSet, RabbitMQ等
+
https://www.jianshu.com/p/7beebbc61229
对于这种延时任务,我们一般有以下的4中解决方式:
  • 利用quartz等定时任务
  • delayQueue
  • wheelTimer
  • rabbitMq的延迟队列

利用quartz等定时任务

相信目前还有很多的公司依然沿用着这种做法,那么利用quartz怎么解决这个延时任务的问题呢?
具体的方式就是这样的,比如我们有个下单15分钟后用户不付款就关闭订单的任务.我的订单是存储在mysql的一个表里,表里会有各种状态和创建时间.
利用quartz来设定一个定时任务,我们暂时设置为每5分钟扫描一次.扫描的条件为未付款并且当前时间大于创建时间超过15分钟.然后我们再去逐一的操作每一条数据.
优点: 简单易用,可以利用quartz的分布式特性轻易的进行横向扩展。
缺点: 需要扫表会增加程序负荷、任务执行不够准时。

怎么使用delayQueue呢?

优点: 效率高,任务触发时间延迟低。
缺点: 复杂度比quartz要高,自己要处理分布式横向扩展的问题,因为数据是放在内存里,需要自己写持久化的备案以达到高可用。
可以将 HashedWheelTimer 理解为一个 Set<Task>[] 数组, 图中每个槽位(slot)表示一个 Set<Task>
HashedWheelTimer 有两个重要参数
tickDuration: 每 tick 一次的时间间隔, 每 tick 一次就会到达下一个槽位
ticksPerWheel: 轮中的 slot 数
上图就是一个 ticksPerWheel = 8 的时间轮, 假如说 tickDuration = 100 ms, 则 800ms 可以走完一圈
在 timer.start() 以后, 便开始 tick, 每 tick 一次, timer 会将记录总的 tick 次数 ticks
我们加入一个新的超时任务时, 会根据超时的任务的超时时间与时间轮开始时间算出来它应该在的槽位.

怎么使用WheelTimer呢?

在netty中已经有了时间轮算法的实现HashWheelTimer,HashWheelTimer的使用非常的简单:先new一个HashedWheelTimer,然后调用它的newTimeout方法传递相应的延时任务就ok了。

这个方法需要一个TimerTask对象以知道当时间到时要执行什么逻辑,然后需要delay时间数值和TimeUnit时间的单位

优点: 效率高,根据楼主自己写的测试,在大量高负荷的任务堆积的情况下,HashWheelTimer基本要比delayQueue低上一倍的延迟率.netty中也有了时间轮算法的实现,实现难度低
缺点: 内存占用相对较高,对时间精度要求相对不高.和delayQueue有着相同的问题,自己要处理分布式横向扩展的问题,因为数据是放在内存里,需要自己写持久化的备案以达到高可用。

如何使用rabbitMq的延迟队列

  • AMQP和RabbitMQ本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能。
  • RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter
  • lRabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,用来控制队列内出现了deadletter,则按照这两个参数重新路由。
  • 结合以上两个特性,就可以模拟出延迟消息的功能
具体实现可参照官方文档:
http://www.rabbitmq.com/ttl.html
http://www.rabbitmq.com/dlx.html
优点: 高效,可以利用rabbitmq的分布式特性轻易的进行横向扩展,消息支持持久化增加了可靠性。
缺点: 本身的易用度要依赖于rabbitMq的运维.因为要引用rabbitMq,所以复杂度和成本变高



https://medium.com/netflix-techblog/distributed-delay-queues-based-on-dynomite-6b31eca37fbc
Traditionally, we have been using a Cassandra based queue recipe along with Zookeeper for distributed locks, since Cassandra is the de facto storage engine at Netflix. Using Cassandra for queue like data structure is a known anti-pattern, also using a global lock on queue while polling, limits the amount of concurrency on the consumer side as the lock ensures only one consumer can poll from the queue at a time. This can be addressed a bit by sharding the queue but the concurrency is still limited within the shard.
We wanted the following in the queue recipe:
  1. Distributed
  2. No external locks (e.g. Zookeeper locks)
  3. Highly concurrent
  4. At-least-once delivery semantics
  5. No strict FIFO
  6. Delayed queue (message is not taken out of the queue until some time in the future)
  7. Priorities within the shard


A queue is stored as a sorted set (ZADD, ZRANGE etc. operations) within Redis. Redis sorts the members in a sorted set using the provided score. When storing an element in the queue, the score is computed as a function of the message priority and timeout (for timed queues).

For each queue three set of Redis data structures are maintained:
  1. A Sorted Set containing queued elements by score.
  2. A Hash set that contains message payload, with key as message ID.
  3. A Sorted Set containing messages consumed by client but yet to be acknowledged. Un-ack set.
Push
  • Calculate the score as a function of message timeout (delayed queue) and priority
  • Add to sortedset for queue
  • Add message payload by ID into Redis hashed set with key as message ID.
Poll
  • Calculate max score as current time
  • Get messages with score between 0 and max
  • Add the message ID to unack set and remove from the sorted set for the queue.
  • If the previous step succeeds, retrieve the message payload from the Redis set based on ID
Ack
  • Remove from unack set by ID
  • Remove from the message payload set
Messages that are not acknowledged by the client are pushed back to the queue (at-least once semantics).
Availability Zone / Rack Awareness
Our queue recipe was built on top of Dynomite’s Java client, Dyno. Dyno provides connection pooling for persistent connections, and can be configured to be topology aware (token aware). Moreover, Dyno provides application specific local rack (in AWS a rack is a zone, e.g. us-east-1a, us-east-1b etc.) affinity based on request routing to Dynomite nodes. A client in us-east-1a will connect to a Dynomite/Redis node in the same AZ (unless the node is not available, in which case the client will failover). This property is exploited for sharding the queues by availability zone.

Sharding
Queues are sharded based on the availability zone. When pushing an element to the queue, the shard is determined based on round robin. This will ensure eventually all the shards are balanced. Each shard represents a sorted set on Redis with key being combination of queueName & AVAILABILITY _ZONE.

Dynomite consistency
The message broker uses a Dynomite cluster with consistency level set to DC_SAFE_QUORUM. Reads and writes are propagated synchronously to quorum number of nodes in the local data center and asynchronously to the rest. The DC_SAFE_QUORUM configuration writes to the number of nodes that make up a quorum. A quorum is calculated, and then rounded down to a whole number. This consistency level ensures all the writes are acknowledged by majority quorum.
Avoiding Global Lock




  • Each node (N1…Nn in the above diagram) has affinity to the availability zone and talks to the redis servers in that zone.
  • A Dynomite/Redis node serves only one request at a time. Dynomite can hold thousands of concurrent connections, however requests are processed by a single thread inside Redis. This ensures when two concurrent calls are issued to poll an element from queue, they are served sequentially by Redis server avoiding any local or distributed locks on the message broker side.
  • In an event of failover, DC_SAFE_QUORUM write ensures no two client connections are given the same message out of a queue, as write to UNACK collection will only succeed for a single node for a given element. This ensures if the same element is picked up by two broker nodes (in an event of a failover connection to Dynomite) only one will be able to add the message to the UNACK collection and another will receive failure. The failed node then moves onto peek another message from the queue to process.

Queue Rebalancing
Useful when queues are not balanced or new availability zone is added or an existing one is removed permanently.
Handling Un-Ack’ed messages
A background process monitors for the messages in the UNACK collections that are not acknowledged by a client in a given time (configurable per queue). These messages are moved back into the queue.

Multiple consumers
A modified version can be implemented, where the consumer can “subscribe” for a message type (message type being metadata associated with a message) and a message is delivered to all the interested consumers.
Ephemeral Queues
Ephemeral queues have messages with a specified TTL and are only available to consumer until the TTL expires. Once expired, the messages are removed from queue and no longer visible to consumer. The recipe can be modified to add TTL to messages thereby creating an ephemeral queue. When adding elements to the Redis collections, they can be TTLed, and will be removed from collection by Redis upon expiry.

  1. KafkaKafka provides robust messaging solution with at-least once delivery semantics. Kafka lends itself well for message streaming use cases. Kafka makes it harder to implement the semantics around priority queues and time based queue (both are required for our primary use case). Case can be made to create large number of partitions in a queue to handle client usage — but then again adding a message broker in the middle will complicate things further.
  2. SQSAmazon SQS is a viable alternative and depending upon the use case might be a good fit. However, SQS does not support priority or time based queues beyond 15 minute delay.
  3. DisqueDisque is a project that aims to provide distributed queues with Redis like semantics. At the time we started working on this project, Disque was in beta (RC is out).
  4. Zookeeper (or comparable) distributed locks / coordinator based solutions.A distributed queue can be built with Cassandra or similar backend with zookeeper as the global locking solution. However, zookeeper quickly becomes the bottleneck as the no. of clients grow adding to the latencies. Cassandra itself is known to have queues as anti-pattern use case.

Labels

Review (572) System Design (334) System Design - Review (198) Java (189) Coding (75) Interview-System Design (65) Interview (63) Book Notes (59) Coding - Review (59) to-do (45) Linux (43) Knowledge (39) Interview-Java (35) Knowledge - Review (32) Database (31) Design Patterns (31) Big Data (29) Product Architecture (28) MultiThread (27) Soft Skills (27) Concurrency (26) Cracking Code Interview (26) Miscs (25) Distributed (24) OOD Design (24) Google (23) Career (22) Interview - Review (21) Java - Code (21) Operating System (21) Interview Q&A (20) System Design - Practice (20) Tips (19) Algorithm (17) Company - Facebook (17) Security (17) How to Ace Interview (16) Brain Teaser (14) Linux - Shell (14) Redis (14) Testing (14) Tools (14) Code Quality (13) Search (13) Spark (13) Spring (13) Company - LinkedIn (12) How to (12) Interview-Database (12) Interview-Operating System (12) Solr (12) Architecture Principles (11) Resource (10) Amazon (9) Cache (9) Git (9) Interview - MultiThread (9) Scalability (9) Trouble Shooting (9) Web Dev (9) Architecture Model (8) Better Programmer (8) Cassandra (8) Company - Uber (8) Java67 (8) Math (8) OO Design principles (8) SOLID (8) Design (7) Interview Corner (7) JVM (7) Java Basics (7) Kafka (7) Mac (7) Machine Learning (7) NoSQL (7) C++ (6) Chrome (6) File System (6) Highscalability (6) How to Better (6) Network (6) Restful (6) CareerCup (5) Code Review (5) Hash (5) How to Interview (5) JDK Source Code (5) JavaScript (5) Leetcode (5) Must Known (5) Python (5)

Popular Posts