Thursday, September 20, 2018

HashedWheelTimer



https://blog.acolyer.org/2015/11/23/hashed-and-hierarchical-timing-wheels/
https://www.confluent.io/blog/apache-kafka-purgatory-hierarchical-timing-wheels/
Today’s choice is the 1987 paper by Varghese and Lauck in which they study a number of different approaches for the efficient management of timers, and introduce the concept of hierarchical timing wheels as used by Kafka. They model timers as composed of two user-facing operations, start and stop, and two internal operations: per-tick bookkeeping and expiry processing.
  • Start timer is called by clients specifying a timer duration and a callback. In the authors’ model the client also passes in a request id to distinguish the timer, though nowadays we would be more inclined to return a timer-id in response to a start-timer request.
  • Stop timer takes a request (timer) id and finds and stops (removes) the associated timer.
  • Per-tick bookkeeping happens on every ‘tick’ of the timer clock. If the unit of granularity for setting timers is T units of time (e.g. 1 second), then per-tick bookkeeping will happen every T units of time. It checks whether any outstanding timers have expired, and if so it removes them and invokes expiry processing.
  • Expiry processing is responsible for invoking the user-supplied callback (or other user-requested action, depending on your model).
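To make the four operations concrete, here is a minimal Java sketch of the most straightforward design, in which every outstanding timer is decremented on each tick. It follows the modern convention mentioned above and returns a timer id from start. The names (NaiveTimerService, startTimer, stopTimer, tick) are hypothetical. Ticks are driven by hand rather than by a real clock.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the start / stop / per-tick / expiry model using the
// most straightforward bookkeeping: decrement every outstanding timer per tick.
class NaiveTimerService {
    private static final class Entry {
        long remainingTicks;
        final Runnable callback;

        Entry(long remainingTicks, Runnable callback) {
            this.remainingTicks = remainingTicks;
            this.callback = callback;
        }
    }

    private final Map<Long, Entry> timers = new HashMap<>();
    private long nextId = 0;

    // Start timer: the service hands back a timer id instead of taking a request id.
    long startTimer(long ticks, Runnable callback) {
        long id = nextId++;
        timers.put(id, new Entry(ticks, callback));
        return id;
    }

    // Stop timer: returns false if the timer already fired or was never started.
    boolean stopTimer(long id) {
        return timers.remove(id) != null;
    }

    // Per-tick bookkeeping: O(n) in the number of outstanding timers.
    void tick() {
        List<Entry> expired = new ArrayList<>();
        Iterator<Entry> it = timers.values().iterator();
        while (it.hasNext()) {
            Entry e = it.next();
            if (--e.remainingTicks <= 0) {
                it.remove();
                expired.add(e);
            }
        }
        // Run callbacks after the scan so they may safely start or stop timers.
        for (Entry e : expired) {
            expire(e);
        }
    }

    // Expiry processing: invoke the user-supplied callback.
    private void expire(Entry e) {
        e.callback.run();
    }
}
```

This per-tick cost, which grows with the number of outstanding timers, is what the timing-wheel schemes discussed next are designed to avoid.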

7. Hierarchical Timing Wheels

Another way to deal with the memory issues caused by the simple timing wheel approach is to use multiple timing wheels in a hierarchy. Suppose we want to store timers with second granularity, that can be set for up to 100 days in the future. We might construct four wheels:
  • A days wheel with 100 slots
  • An hours wheel with 24 slots
  • A minutes wheel with 60 slots
  • A seconds wheel with 60 slots
This is a total of 244 slots to address a total of 8.64 million possible timer values. Every time we make one complete revolution in a wheel, we advance the next biggest wheel by one slot (the paper describes a slight variation with minute, hour, and day ticking clocks, but the effect is the same). For example, when the seconds wheel has rotated back to index ‘0’ we move the index pointer in the minutes wheel round by one position. We then take all the timers in that slot on the minutes wheel (which are now due to expire within the next 60 seconds) and insert them into their correct positions in the seconds wheel. Expiry time processing in the seconds wheel works exactly as described in scheme 4 (it’s just a simple timer wheel that happens to get replenished on every revolution).
To insert a timer, find the first wheel (from largest units to smallest) for which the timer should expire 1 or more wheel-units into the future. For example, a timer due to expire 11 hours, 15 minutes and 15 seconds into the future would be inserted at slot ‘current index + 11’ in the hours wheel, storing the remainder of 15 minutes and 15 seconds with the timer. After the hours wheel has advanced by 11 positions, this timer will be removed from that wheel and inserted at ‘current index + 15’ slots round in the minutes wheel, storing the remainder of 15 seconds. When the minutes wheel has subsequently advanced by 15 positions, this timer will be removed from that wheel and placed at ‘current index + 15’ slots round in the seconds wheel. 15 seconds later, the timer will expire!
Note: the paper uses the seconds, minutes, hours, days example, and it certainly makes it easy to follow. However, if you are simply given timers as e.g. t seconds in the future, for up to a 32-bit timer value, it would be more efficient to divide this into four wheels with 2^8 (256) slots each, or similar. This makes it very efficient to determine which wheel an entry should go in.
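The insertion rule can be sketched in Java as follows. The sketch assumes delays in whole seconds and the four-wheel layout from the list (100 days, 24 hours, 60 minutes, 60 seconds). HierarchicalPlacement and Placement are hypothetical names. The sketch only computes where a timer goes; it does not model the wheel storage or the cascading itself.

```java
// Hypothetical sketch: choose the wheel, slot offset and remainder for a timer
// in the days(100) / hours(24) / minutes(60) / seconds(60) hierarchy.
class HierarchicalPlacement {
    static final String[] WHEELS = {"days", "hours", "minutes", "seconds"};
    static final long[] UNIT_SECONDS = {86_400, 3_600, 60, 1};
    static final long MAX_DELAY = 100 * 86_400L; // the days wheel has 100 slots

    static final class Placement {
        final String wheel;          // which wheel the timer goes into
        final long offset;           // slots ahead of that wheel's current index
        final long remainderSeconds; // stored with the timer for the next wheel down

        Placement(String wheel, long offset, long remainderSeconds) {
            this.wheel = wheel;
            this.offset = offset;
            this.remainderSeconds = remainderSeconds;
        }

        @Override
        public String toString() {
            return wheel + " +" + offset + " (remainder " + remainderSeconds + "s)";
        }
    }

    static Placement place(long delaySeconds) {
        if (delaySeconds < 1 || delaySeconds >= MAX_DELAY) {
            throw new IllegalArgumentException("delay out of range: " + delaySeconds);
        }
        // Scan from the largest unit down and pick the first wheel that is at
        // least one of its own units away.
        for (int i = 0; i < WHEELS.length; i++) {
            long offset = delaySeconds / UNIT_SECONDS[i];
            if (offset >= 1) {
                return new Placement(WHEELS[i], offset, delaySeconds % UNIT_SECONDS[i]);
            }
        }
        throw new AssertionError("unreachable: the seconds wheel always matches");
    }
}
```

For the worked example, place(11 * 3600 + 15 * 60 + 15) gives the hours wheel with offset 11 and a 915 s remainder. When that slot cascades, place(915) gives the minutes wheel with offset 15 and a 15 s remainder.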
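With power-of-two wheels, choosing the wheel and the slot reduces to shifts and masks. The following is a minimal sketch under these assumptions:

- There are four 256-slot wheels over a 32-bit tick count.
- Level 0 is the finest wheel.
- Each wheel's slot is the corresponding byte of the absolute expiry tick.

ByteWheels is a hypothetical name, and cascading between wheels is not shown.

```java
// Hypothetical sketch: four 256-slot wheels addressed by the bytes of a
// 32-bit tick count (level 0 = finest wheel, level 3 = coarsest).
class ByteWheels {
    static final int BITS = 8;
    static final int SLOTS = 1 << BITS;   // 256 slots per wheel
    static final long MASK = SLOTS - 1;   // 0xFF

    // Wheel for a timer 'delayTicks' ahead: the index of the highest non-zero
    // byte of the delay (delays below 256 stay in level 0).
    static int wheelLevel(long delayTicks) {
        if (delayTicks < 0 || delayTicks > 0xFFFF_FFFFL) {
            throw new IllegalArgumentException("delay must fit in 32 bits");
        }
        // 63 - numberOfLeadingZeros is the position of the highest set bit (-1 for 0).
        return Math.max(0, 63 - Long.numberOfLeadingZeros(delayTicks)) / BITS;
    }

    // Slot inside wheel 'level' for an absolute expiry tick: just that byte.
    static int slotIndex(long expiryTick, int level) {
        return (int) ((expiryTick >>> (BITS * level)) & MASK);
    }
}
```

This is the same "largest wheel that is at least one unit away" rule as in the days/hours example. The difference is that the unit boundaries are byte boundaries, so no division is needed.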

https://github.com/netty/netty/blob/4.1/common/src/main/java/io/netty/util/HashedWheelTimer.java
https://gborah.wordpress.com/2011/08/01/timeout-management-using-hashed-timing-wheels/

https://github.com/ifesdjeen/hashed-wheel-timer
The concept of the timer wheel is rather simple to understand. To keep track of events at a given resolution, an array of linked lists (alternatively sets, or even arrays; YMMV) is preallocated. When an event is scheduled, its address is found by dividing the deadline time t by the resolution and then by the wheel size: the remainder picks the bucket, and the quotient gives the registration its number of rounds (how many times we must go around the wheel for the time period to elapse).
There are wheel-size buckets, one per tick of the resolution, and each one holds Registrations. On every tick the timer moves on to the next bucket and decrements the rounds of each registration in it. As soon as a registration's rounds reaches 0, the timeout is triggered. After that, the registration is either rescheduled (with the same offset and number of rounds as initially) or removed from the timer.
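The mechanism above can be sketched in Java as a single wheel whose buckets hold registrations with a rounds counter. This is an illustrative sketch, not the API of Netty's HashedWheelTimer or of the ifesdjeen library. It assumes ticks are advanced by calling tick(), and it omits the periodic-rescheduling case. HashedWheel and Registration are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of a hashed timing wheel whose buckets hold
// registrations with a "rounds" counter (one-shot timers only).
class HashedWheel {
    private static final class Registration {
        final String name;
        long rounds; // earlier visits of this bucket to skip before firing

        Registration(String name, long rounds) {
            this.name = name;
            this.rounds = rounds;
        }
    }

    private final List<List<Registration>> buckets = new ArrayList<>();
    private final int wheelSize;
    private long currentTick = 0; // number of ticks processed so far

    HashedWheel(int wheelSize) {
        this.wheelSize = wheelSize;
        for (int i = 0; i < wheelSize; i++) {
            buckets.add(new ArrayList<>());
        }
    }

    // delayTicks = deadline / resolution (must be >= 1).
    void schedule(String name, long delayTicks) {
        if (delayTicks < 1) {
            throw new IllegalArgumentException("delay must be >= 1 tick");
        }
        int bucket = (int) ((currentTick + delayTicks) % wheelSize);
        // The bucket is visited (delayTicks - 1) / wheelSize times before the deadline.
        long rounds = (delayTicks - 1) / wheelSize;
        buckets.get(bucket).add(new Registration(name, rounds));
    }

    // Advance one tick: visit the next bucket, fire registrations whose rounds
    // reached 0 and decrement the rest. Returns the names that fired.
    List<String> tick() {
        currentTick++;
        List<String> fired = new ArrayList<>();
        Iterator<Registration> it = buckets.get((int) (currentTick % wheelSize)).iterator();
        while (it.hasNext()) {
            Registration r = it.next();
            if (r.rounds == 0) {
                it.remove();
                fired.add(r.name);
            } else {
                r.rounds--;
            }
        }
        return fired;
    }
}
```

Consider an 8-bucket wheel with delays of 3 and 11 ticks. Both registrations land in bucket 3. The first fires on tick 3, while the second only has its rounds decremented there and fires one revolution later, on tick 11.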

https://segmentfault.com/a/1190000010987765

http://mp.weixin.qq.com/s/mvFwjgxliwx808Hn_9ruEA
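In many cases a business needs scheduled tasks or timeouts. When the number of tasks is large, this can mean maintaining a huge number of timers, or doing inefficient scans.

For example, the real-time messaging channel of the 58到家 (58 Daojia) app keeps one TCP connection per user between the app and the server, used to send and receive messages in real time. This connection has the following requirement: “If no request packet (login, message, keepalive, etc.) arrives for 30 consecutive seconds, the server must mark the user as offline.”

A single machine holds on the order of 100,000 concurrent TCP connections. Keepalive packets arrive roughly once every 30 s per connection, for a throughput of about 3,000 QPS.

How is this kind of requirement usually implemented?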

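“Polling scan” method
1) Use a Map<uid, last_packet_time> to record the time of each uid’s most recent request, last_packet_time.
2) When a request packet from a user uid arrives, update this Map in real time.
3) Start a timer that, while the Map is not empty, polls the whole Map and checks whether each uid’s last_packet_time is more than 30 s ago; if so, perform the timeout handling.
Drawbacks: it is inefficient, because records that have already been processed are still scanned (they just don’t show up in the result set), which causes a lot of repeated computation. It also has poor timeliness, because the timing error depends on the polling interval. Shortening the interval means entries are rescanned more often, so efficiency drops even further.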
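A minimal Java sketch of the polling scan follows. It assumes the caller passes the current time in milliseconds, so the logic can be exercised without a real clock. In production, scan would be driven by the single timer from step 3. PollingScanTimeouts, onPacket and scan are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the polling-scan method: one Map<uid, last_packet_time>
// plus a periodic full scan of that Map.
class PollingScanTimeouts {
    private final Map<String, Long> lastPacketTime = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    PollingScanTimeouts(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Step 2: update the Map whenever a packet for uid arrives.
    void onPacket(String uid, long nowMillis) {
        lastPacketTime.put(uid, nowMillis);
    }

    // Step 3: every entry is visited on every scan, expired or not.
    List<String> scan(long nowMillis) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastPacketTime.entrySet()) {
            long last = e.getValue();
            // remove(key, value) skips uids that sent a new packet mid-scan.
            if (nowMillis - last > timeoutMillis && lastPacketTime.remove(e.getKey(), last)) {
                expired.add(e.getKey());
            }
        }
        return expired;
    }
}
```

Each call to scan costs O(n) in the number of tracked uids, however few of them have actually expired.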

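“Multiple timers” method
1) Use a Map<uid, last_packet_time> to record the time of each uid’s most recent request, last_packet_time.
2) When a request packet from a user uid arrives, update this Map in real time, and at the same time start a timer for this packet that fires 30 s later.
3) When the timer for a uid’s packet fires, look up that uid’s last_packet_time in the Map; if it is more than 30 s ago, perform the timeout handling.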
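For comparison, the multiple-timer method can be sketched with a ScheduledExecutorService, assuming one scheduled check per packet. PerPacketTimerTimeouts and onTimeout are hypothetical names. Every packet adds a task to the executor's delay queue, and that per-packet work is the resource cost of this approach.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of the "multiple timers" method: each packet schedules
// its own one-shot check timeoutMillis later.
class PerPacketTimerTimeouts {
    private final Map<String, Long> lastPacketNanos = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread th = new Thread(r, "per-packet-timeouts");
                th.setDaemon(true); // do not keep the JVM alive
                return th;
            });
    private final long timeoutNanos;
    private final Consumer<String> onTimeout;

    PerPacketTimerTimeouts(long timeoutMillis, Consumer<String> onTimeout) {
        this.timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        this.onTimeout = onTimeout;
    }

    // Steps 1 and 2: record the packet time and start a timer for this packet.
    void onPacket(String uid) {
        lastPacketNanos.put(uid, System.nanoTime());
        scheduler.schedule(() -> check(uid), timeoutNanos, TimeUnit.NANOSECONDS);
    }

    // Step 3: only the timer of the latest packet sees a gap >= the timeout.
    private void check(String uid) {
        Long last = lastPacketNanos.get(uid);
        if (last != null && System.nanoTime() - last >= timeoutNanos
                && lastPacketNanos.remove(uid, last)) {
            onTimeout.accept(uid);
        }
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```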

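Approach 1: only one timer is started, but it has to poll, which is inefficient.
Approach 2: no polling is needed, but every request packet starts its own timer, which consumes a lot of resources.

Especially when the number of concurrently online users is large, this can easily drive the CPU to 100%. How to efficiently maintain and fire a large number of scheduled/timeout tasks is the problem this article addresses.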


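2. The ring-queue method
Without further ado, here are the three key data structures:
1) For a 30 s timeout, create a ring queue with indexes 0 to 30 (essentially an array).
2) Each slot on the ring is a Set<uid>, i.e. a set of tasks.
3) There is also a Map<uid, index> that records which slot on the ring each uid is in.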


In addition:
1) Start a timer that moves one slot along the ring every 1 s: 0->1->2->3…->29->30->0…
2) A Current Index pointer marks the slot that was checked most recently.

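When a request packet from some user uid arrives:
1) Look up in the Map which slot this uid is stored in.
2) Remove this uid from that slot's Set.
3) Add the uid to a new slot, namely the slot just before the one the Current Index pointer points to, because the timer will reach that slot only 30 s later.
4) Update the Map with the uid's new slot index.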

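Which elements time out?
Current Index moves one slot every second, and every uid in that slot's Set<uid> should be timed out as a batch. If a request packet from a uid arrived within the last 30 s, that uid was moved to the slot just before Current Index at the time. Therefore every element in the Set of the slot at Current Index is a uid that has sent no request packet in the last 30 s.

So when nothing is timing out, the Set of every slot that Current Index scans should be empty.

Advantages
1) Only one timer is needed.
2) The timer fires only once per second, so CPU consumption is very low.
3) Batch timeouts: all elements in the Set of the slot scanned by Current Index are timed out at once.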
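
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HashWheelTimer {
    private static final Logger logger = LoggerFactory.getLogger(HashWheelTimer.class);
    // Ring size: a uid times out about SIZE seconds after its last packet.
    private static final int SIZE = 11;

    private final Map<String, Integer> map = new ConcurrentHashMap<>(); // uid -> slot index
    private final Set<String>[] uidArr;                                  // the ring: one Set<uid> per slot
    private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    private final Object lock = new Object();
    private int curIndex = SIZE - 1; // Current Index: the slot scanned most recently

    @SuppressWarnings("unchecked")
    public HashWheelTimer() {
        uidArr = new Set[SIZE];
        for (int i = 0; i < SIZE; i++) {
            uidArr[i] = new HashSet<>();
        }
        // Once per second, advance Current Index and time out every uid in that slot.
        pool.scheduleAtFixedRate(() -> {
            synchronized (lock) {
                curIndex = (curIndex + 1) % SIZE;
                Set<String> s = uidArr[curIndex];
                logger.info("curIndex:{},ele:{}", curIndex, s);
                if (!s.isEmpty()) {
                    logger.info("timed out:{}", s);
                    s.forEach(map::remove); // forget the uid -> slot mapping as well
                    s.clear();
                }
            }
        }, 0, 1, TimeUnit.SECONDS);
    }

    // Called whenever a request packet for uid arrives.
    public void testArrive(String uid) {
        synchronized (lock) {
            Integer oldIndex = map.get(uid);
            if (oldIndex != null) {
                uidArr[oldIndex].remove(uid); // drop the uid from its old slot
            }
            // Use the slot just scanned (one behind the next slot to scan):
            // the timer will not reach it again for SIZE ticks.
            int lastIndex = curIndex;
            uidArr[lastIndex].add(uid);
            map.put(uid, lastIndex);
            logger.info("arrive,uid:{},last index:{}", uid, lastIndex);
        }
    }

    public void shutdown() {
        pool.shutdown();
    }
}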

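3. Summary
This ring-queue method is a general technique: the Set and Map can hold any kind of task, and the uid used in this article is just the simplest example.

HashedWheelTimer, a utility class in Netty, works on a similar principle; if you are interested, look up this data structure. I hope you found this useful.

The above describes one business scenario, and the ring-queue approach can handle many similar ones. For example, in a ride-hailing app, if a user never rates a completed order, it is automatically rated 5 stars after 48 hours. In a data product, a change to a user's settings takes effect after 1 hour.


The ring queue is a good way to implement “delayed messages”.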

