Saturday, August 22, 2015

架构必备:Rate limiting 的作用和常见方式 - 互联网技术和架构



限流的五种使用Redis的实现 TODO
限流是常考的一道面试题。也是工业界常用的必备功能。用于保护服务以免受到滥用和攻击。一亩三分地也有这个功能:「抱歉,您所在的用户组每小时限制发回帖x个,请稍候再发表」。能够实现Rate Limit的方法很多很多,比如用Nginx。但是如果要实现分布式,高并发,低延迟,似乎离不开Redis。

先举些各大公司的限流例子:

  • 谷歌 Google Sheets API has a limit 100 requests per 100 seconds per user. Gmail Daily Usage 1,000,000,000 quota units per day,  250 quota units per user per second, moving average (allows short bursts)
  • 脸书 4800 calls per daily engaged. This 24-hour period is a sliding window that is updated every few minutes.
  • 推特 15 calls every 15 minutes

大家都知道的Token Bucket是限流的主打算法。但是除了它以外,还有如下,总共五种常见设计:
  • Leaky Bucket
  • Fixed Window
  • Sliding Window Log
  • Sliding Window Prorate


Token Bucket 和 Leaky Bucket

Token Bucket 和 Leaky Bucket 虽然理论上有很多不同点。前者固定速率添加Token,如果Token不够则拒绝请求。后者请求形成队列。队列以某种速度处理,因而带来延迟。超过队列能够承受的请求被拒绝。前者无法防止短时间的密集访问,后者严格控制处理速度即便短时间的密集访问也能够通过延迟而平滑。

但是考虑到低延迟这一需求,从Redis上的实现上看基本是一种实现。即:
  • 你访问和上次访问间差距超过配额要求的时间段,那么我给你若干Tokens,
  • 然后每次访问都消耗Token。如果消耗完了,就会拒绝请求(但是不记录你拒绝访问的时间,这样最后一次被允许的访问的时间就会一直被记着)。直到下次访问和上次访问间差距超过配额要求的时间段。
  • 重复。

http://systemdesigns.blogspot.com/2015/12/rate-limiter.html
接下来就继续算法实现部分,这部分,我主要是根据九章的讲义进行介绍。假设面试官要我们Limit QPS = 5, 我们可以怎么做呢?
我相信最容想到的方法就是: 保证每两次请求之间的间隔 》= 1/5 =0.2m。这个就是Algorithm of Gap. 也就是第一重要介绍的算法---间隔算法。

      1.    Algorithm of Gap


Bad case:
关于Gap算法是有个明显的问题的,大家看出来没?就是有可能规定的时间内,在这里是一分钟,可能会多于5个请求。




      2.    Algorithm of Time-bucket
           这里介另外一种算法,Time-Bucket.


就是每一秒钟,建一个bucket,如果某个bucket已经超5个,后面的request就放了。
对这个算法,也是有问题的:
1mCounter[]会非常浪费空间;
2)没法保证每个1s期间只接受5个及以内请求。举个例子:0.9s进来5个请求,1.1s也进来5个请求,在0~1s1~2s两个bucket内各自满足了5个请求,但是0.5~1.5s这个1sbucket里就有10个请求。


      3.    Time-bucket with Database

关于time-bucket算法,如果用database实现的话,其实不难。但是也存在以上的两个问题。

Implement a limit rater using Redishttp://redis.io/commands/INCR#pattern-rate-limiter


4.    Algorithm of One-bucket

针对23算法的第一个缺点:storing counter for each sec比较浪费空间,这里提出One-bucket算法,核心思想是只maintain一个count变量,每一秒过去后,把这个bucket清空就好。


Bad case:
但是之前23种算法中提到的第二个缺点仍然存在,也就是每个1s内可能接收多于5request


所以one-bucket相对于time-bucket,改进的是空间问题,但还没解决任何一秒内只有不多与5个请求的问题。

5.    Algorithm of RequestList

要解决234算法的qps>5的缺陷,我们提出新的RequestList算法,用额外的数据结构记录下之前的request timestamp




     但是 Algorithm of requestList记录下了所有request的信息,这个明显没必要的。



6.    Fixed RequestList

为了解决算法5中提到的空间问题,这里提出了Fixed RequestList算法,只需要记录5requesttimestamp即可。



         7.    Algorithm of Token Bucket
  
  九章里还提到了Token Bucket算法:https://en.wikipedia.org/wiki/Token_bucket

  另外建议大家去看下Google Guava 里面的rate limiter的实现


3. How to improve?

·       Follow-up Questions:
           1.    How to save space with 10^9 query per hour?      Batch queries
           2.    How to support multiple threads?      Lock
           3.    How to support limiter on users?       <uid, requestList>
           4.    How to support query with different quotas?      Acquire ( quota )

http://blog.csdn.net/big_gutan/article/details/46413167
计数器(简单粗暴)
简单维护一个单位时内的计数器,请求过来则递增,计算器过期则清理。
long timeStamp=getNowTime();
int reqCount=0;
const int rate=100;//时间周期内最大请求数
const long interval=1000;//时间控制周期ms

bool grant(){
    long now=getNowTime();
    if (now <timeStamp+interval){//在时间控制范围内
        reqCount++;
        return reqCount>rate;//当前时间范围内超过最大请求控制数
    }else{
        timeStamp=now;//超时后重置
        reqCount=0;
        return true;
    }
}

漏桶算法(Leaky Bucket)

水(请求)先进入到漏桶里,漏桶以恒定的速度出水,当水流入速度过大会导致溢出。
漏桶算法示意图
伪代码:

long timeStamp=getNowTime();        
int capacity;        // 桶的容量
int rate ;          //水漏出的速度
int water;          //当前水量

bool grant() {
  //先执行漏水,因为rate是固定的,所以可以认为“时间间隔*rate”即为漏出的水量
  long  now = getNowTime();
  water = max(0, water- (now - timeStamp)*rate);
  timeStamp = now;

  if (water < capacity) { // 水还未满,加水
    water ++;
    return true;
  } else {
    return false;//水满,拒绝加水
  }
}
在某些情况下,漏桶算法不能够有效地使用网络资源。因为漏桶的漏出速率是固定的参数,所以,即使网络中不存在资源冲突(没有发生拥塞),漏桶算法也不能使某一个单独的流突发到端口速率。因此,漏桶算法对于存在突发特性的流量来说缺乏效率。而令牌桶算法则能够满足这些具有突发特性的流量。通常,漏桶算法与令牌桶算法可以结合起来为网络流量提供更大的控制。

令牌桶算法(Token Bucket)

令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。
令牌桶的另外一个好处是可以方便的改变速度。 一旦需要提高速率,则按需提高放入桶中的令牌的速率。
一般会定时(比如100毫秒)往桶中增加一定数量的令牌, 有些变种算法则实时的计算应该增加的令牌的数量, 比如华为的专利"采用令牌漏桶进行报文限流的方法"(CN 1536815 A),提供了一种动态计算可用令牌数的方法, 相比其它定时增加令牌的方法, 它只在收到一个报文后,计算该报文与前一报文到来的时间间隔内向令牌漏桶内注入的令牌数, 并计算判断桶内的令牌数是否满足传送该报文的要求。
public class FixedIntervalRefillStrategy implements TokenBucketImpl.RefillStrategy
{
  private final Ticker ticker;
  private final long numTokensPerPeriod;
  private final long periodDurationInNanos;
  private long lastRefillTime;
  private long nextRefillTime;

  /**
   * Create a FixedIntervalRefillStrategy.
   *
   * @param ticker             A ticker to use to measure time.
   * @param numTokensPerPeriod The number of tokens to add to the bucket every period.
   * @param period             How often to refill the bucket.
   * @param unit               Unit for period.
   */
  public FixedIntervalRefillStrategy(Ticker ticker, long numTokensPerPeriod, long period, TimeUnit unit)
  {
    this.ticker = ticker;
    this.numTokensPerPeriod = numTokensPerPeriod;
    this.periodDurationInNanos = unit.toNanos(period);
    this.lastRefillTime = -periodDurationInNanos;
    this.nextRefillTime = -periodDurationInNanos;
  }

  @Override
  public synchronized long refill()
  {
    long now = ticker.read();
    if (now < nextRefillTime) {
      return 0;
    }

    // We now know that we need to refill the bucket with some tokens, the question is how many.  We need to count how
    // many periods worth of tokens we've missed.
    long numPeriods = Math.max(0, (now - lastRefillTime) / periodDurationInNanos);

    // Move the last refill time forward by this many periods.
    lastRefillTime += numPeriods * periodDurationInNanos;

    // ...and we'll refill again one period after the last time we refilled.
    nextRefillTime = lastRefillTime + periodDurationInNanos;

    return numPeriods * numTokensPerPeriod;
  }

  @Override
  public long getDurationUntilNextRefill(TimeUnit unit)
  {
    long now = ticker.read();
    return unit.convert(Math.max(0, nextRefillTime - now), TimeUnit.NANOSECONDS);
  }
}

public synchronized boolean tryConsume(long numTokens)
{
  checkArgument(numTokens > 0, "Number of tokens to consume must be positive");
  checkArgument(numTokens <= capacity, "Number of tokens to consume must be less than the capacity of the bucket.");

  refill(refillStrategy.refill());

  // Now try to consume some tokens
  if (numTokens <= size) {
    size -= numTokens;
    return true;
  }

  return false;
}
public void consume(long numTokens)
{
  while (true) {
    if (tryConsume(numTokens)) {
      break;
    }

    sleepStrategy.sleep();
  }
}

以恒定速度往桶里放入Token,请求过来需从桶里获取Token,当请求速度过大会导致获取Token失败。
令牌桶算法示意图
Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法来完成限流

基于上述计数器方式,将单位时间切割到更小的时间片,每个时间片维护一个计数器,随着时间推送,清理单位时间外的所有计数器,统计当前单位时间内的所有计数器。
其他算法示意图

http://xiaobaoqiu.github.io/blog/2015/07/02/ratelimiter/
Guava RateLimit
源码注释中的一个例子,比如我们有很多任务需要执行,但是我们不希望每秒超过两个任务执行,那么我们就可以使用RateLimiter:



1
2
3
4
5
6
7
final RateLimiter rateLimiter = RateLimiter.create(2.0);
void submitTasks(List<Runnable> tasks, Executor executor) {
    for (Runnable task : tasks) {
        rateLimiter.acquire(); // may wait
        executor.execute(task);
    }
}

另外一个例子,假如我们会产生一个数据流,然后我们想以每秒5kb的速度发送出去.我们可以每获取一个令牌(permit)就发送一个byte的数据,这样我们就可以通过一个每秒5000个令牌的RateLimiter来实现:



1
2
3
4
5
final RateLimiter rateLimiter = RateLimiter.create(5000.0);
void submitPacket(byte[] packet) {
    rateLimiter.acquire(packet.length);
    networkService.send(packet);
}

另外,我们也可以使用非阻塞的形式达到降级运行的目的,即使用非阻塞的tryAcquire()方法:



1
2
3
4
5
if(limiter.tryAcquire()) { //未请求到limiter则立即返回false
    doSomething();
}else{
    doSomethingElse();
}

RateLimiter的主要功能就是提供一个稳定的速率,实现方式就是通过限制请求流入的速度,比如计算请求等待合适的时间阈值.
实现QPS速率的最简单的方式就是记住上一次请求的最后授权时间,然后保证1/QPS秒内不允许请求进入.比如QPS=5,如果我们保证最后一个被授权请求之后的200ms的时间内没有请求被授权,那么我们就达到了预期的速率.如果一个请求现在过来但是最后一个被授权请求是在100ms之前,那么我们就要求当前这个请求等待100ms.按照这个思路,请求15个新令牌(许可证)就需要3秒.
有一点很重要:上面这个设计思路的RateLimiter记忆非常的浅,它的脑容量非常的小,只记得上一次被授权的请求的时间.如果RateLimiter的一个被授权请求q之前很长一段时间没有被使用会怎么样?这个RateLimiter会立马忘记过去这一段时间的利用不足,而只记得刚刚的请求q.
过去一段时间的利用不足意味着有过剩的资源是可以利用的.这种情况下,RateLimiter应该加把劲(speed up for a while)将这些过剩的资源利用起来.比如在向网络中发生数据的场景(限流),过去一段时间的利用不足可能意味着网卡缓冲区是空的,这种场景下,我们是可以加速发送来将这些过程的资源利用起来.
为了处理这种情况,RateLimiter中增加了一个维度的信息,就是过去一段时间的利用不足(past underutilization),代码中使用storedPermits变量表示.当没有利用不足这个变量为0,最大能达到maxStoredPermits(maxStoredPermits表示完全没有利用).因此,请求的令牌可能从两个地方来:
1.过去剩余的令牌(stored permits, 可能没有)
2.现有的令牌(fresh permits,当前这段时间还没用完的令牌)
我们将通过一个例子来解释它是如何工作的:
对一个每秒产生一个令牌的RateLimiter,每有一个没有使用令牌的一秒,我们就将storedPermits加1,如果RateLimiter在10秒都没有使用,则storedPermits变成10.0.这个时候,一个请求到来并请求三个令牌(acquire(3)),我们将从storedPermits中的令牌为其服务,storedPermits变为7.0.这个请求之后立马又有一个请求到来并请求10个令牌,我们将从storedPermits剩余的7个令牌给这个请求,剩下还需要三个令牌,我们将从RateLimiter新产生的令牌中获取.我们已经知道,RateLimiter每秒新产生1个令牌,就是说上面这个请求还需要的3个请求就要求其等待3秒.
想象一个RateLimiter每秒产生一个令牌,现在完全没有使用(处于初始状态),限制一个昂贵的请求acquire(100)过来.如果我们选择让这个请求等待100秒再允许其执行,这显然很荒谬.我们为什么什么也不做而只是傻傻的等待100秒,一个更好的做法是允许这个请求立即执行(和acquire(1)没有区别),然后将随后到来的请求推迟到正确的时间点.这种策略,我们允许这个昂贵的任务立即执行,并将随后到来的请求推迟100秒.这种策略就是让任务的执行和等待同时进行.
一个重要的结论:RateLimiter不会记最后一个请求,而是即下一个请求允许执行的时间.这也可以很直白的告诉我们到达下一个调度时间点的时间间隔.然后定一个一段时间未使用的Ratelimiter也很简单:下一个调度时间点已经过去,这个时间点和现在时间的差就是Ratelimiter多久没有被使用,我们会将这一段时间翻译成storedPermits.所有,如果每秒钟产生一个令牌(rate==1),并且正好每秒来一个请求,那么storedPermits就不会增长.

6.RateLimiter主要源码

RateLimiter定义了两个create函数用于构建不同形式的RateLimiter:
1.public static RateLimiter create(double permitsPerSecond)
用于创建SmoothBursty类型的RateLimiter
2.public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)
用于创建
源码下面以acquire为例子,分析一下RateLimiter如何实现限流:



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public double acquire() {
    return acquire(1);
}
public double acquire(int permits) {
    long microsToWait = reserve(permits);
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
final long reserve(int permits) {
    checkPermits(permits);
    synchronized (mutex()) {    //应对并发情况需要同步
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
}

下面方法来自RateLimiter的具体实现类SmoothRateLimiter:



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    resync(nowMicros);  //补充令牌
    long returnValue = nextFreeTicketMicros;
    //这次请求消耗的令牌数目
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    double freshPermits = requiredPermits - storedPermitsToSpend;

    long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
        + (long) (freshPermits * stableIntervalMicros);

    this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
}
private void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
        storedPermits = min(maxPermits,
        storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
        nextFreeTicketMicros = nowMicros;
    }
}

另外,对于storedPermits的使用,RateLimiter存在两种策略,二者区别主要体现在使用storedPermits时候需要等待的时间。这个逻辑由storedPermitsToWaitTime函数实现:



1
2
3
4
5
6
7
8
9
/**
 * Translates a specified portion of our currently stored permits which we want to
 * spend/acquire, into a throttling time. Conceptually, this evaluates the integral
 * of the underlying function we use, for the range of
 * [(storedPermits - permitsToTake), storedPermits].
 *
 * <p>This always holds: {@code 0 <= permitsToTake <= storedPermits}
 */
abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);

存在两种策略就是为了应对我们上面讲到的,存在资源使用不足大致分为两种情况: (1).资源确实使用不足,这些剩余的资源我们私海可以使用的; (2).提供资源的服务过去还没准备好,比如服务刚启动等;
为此,RateLimiter实际上由两种实现策略,其实现分别见SmoothBursty和SmoothWarmingUp。二者主要的区别就是storedPermitsToWaitTime实现以及maxPermits数量的计算。

6.1 SmoothBursty

SmoothBursty使用storedPermits不需要额外等待时间。并且默认maxBurstSeconds未1,因此maxPermits为permitsPerSecond,即最多可以存储1秒的剩余令牌,比如QPS=5,则maxPermits=5.
下面这个RateLimiter的入口就是用来创建SmoothBursty类型的RateLimiter,



1
public static RateLimiter create(double permitsPerSecond)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
     * This implements a "bursty" RateLimiter, where storedPermits are translated to
     * zero throttling. The maximum number of permits that can be saved (when the RateLimiter is
     * unused) is defined in terms of time, in this sense: if a RateLimiter is 2qps, and this
     * time is specified as 10 seconds, we can save up to 2 * 10 = 20 permits.
     */
    static final class SmoothBursty extends SmoothRateLimiter {
        /** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
        final double maxBurstSeconds;

        SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
            super(stopwatch);
            this.maxBurstSeconds = maxBurstSeconds;
        }

        void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
            double oldMaxPermits = this.maxPermits;
            maxPermits = maxBurstSeconds * permitsPerSecond;
            System.out.println("maxPermits=" + maxPermits);
            if (oldMaxPermits == Double.POSITIVE_INFINITY) {
                // if we don't special-case this, we would get storedPermits == NaN, below
                storedPermits = maxPermits;
            } else {
                storedPermits = (oldMaxPermits == 0.0)
                        ? 0.0 // initial state
                        : storedPermits * maxPermits / oldMaxPermits;
            }
        }

        long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
            return 0L;
        }
    }

一个简单的使用示意图及解释,下面私海一个QPS=4的SmoothBursty:
(1).t=0,这时候storedPermits=0,请求1个令牌,等待时间=0;
(2).t=1,这时候storedPermits=3,请求3个令牌,等待时间=0;
(3).t=2,这时候storedPermits=4,请求10个令牌,等待时间=0,超前使用了2个令牌;
(4).t=3,这时候storedPermits=0,请求1个令牌,等待时间=0.5;
代码的输出:



1
2
3
4
5
6
7
8
9
10
11
maxPermits=4.0, storedPermits=7.2E-4, stableIntervalMicros=250000.0, nextFreeTicketMicros=1472
acquire(1), sleepSecond=0.0

maxPermits=4.0, storedPermits=3.012212, stableIntervalMicros=250000.0, nextFreeTicketMicros=1004345
acquire(3), sleepSecond=0.0

maxPermits=4.0, storedPermits=4.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=2004668
acquire(10), sleepSecond=0.0

maxPermits=4.0, storedPermits=0.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=3504668
acquire(1), sleepSecond=0.499591

6.2 SmoothWarmingUp




1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
static final class SmoothWarmingUp extends SmoothRateLimiter {
        private final long warmupPeriodMicros;
        /**
         * The slope of the line from the stable interval (when permits == 0), to the cold interval
         * (when permits == maxPermits)
         */
        private double slope;
        private double halfPermits;

        SmoothWarmingUp(SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit) {
            super(stopwatch);
            this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
        }

        @Override
        void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
            double oldMaxPermits = maxPermits;
            maxPermits = warmupPeriodMicros / stableIntervalMicros;
            halfPermits = maxPermits / 2.0;
            // Stable interval is x, cold is 3x, so on average it's 2x. Double the time -> halve the rate
            double coldIntervalMicros = stableIntervalMicros * 3.0;
            slope = (coldIntervalMicros - stableIntervalMicros) / halfPermits;
            if (oldMaxPermits == Double.POSITIVE_INFINITY) {
                // if we don't special-case this, we would get storedPermits == NaN, below
                storedPermits = 0.0;
            } else {
                storedPermits = (oldMaxPermits == 0.0)
                        ? maxPermits // initial state is cold
                        : storedPermits * maxPermits / oldMaxPermits;
            }
        }

        @Override
        long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
            double availablePermitsAboveHalf = storedPermits - halfPermits;
            long micros = 0;
            // measuring the integral on the right part of the function (the climbing line)
            if (availablePermitsAboveHalf > 0.0) {
                double permitsAboveHalfToTake = min(availablePermitsAboveHalf, permitsToTake);
                micros = (long) (permitsAboveHalfToTake * (permitsToTime(availablePermitsAboveHalf)
                        + permitsToTime(availablePermitsAboveHalf - permitsAboveHalfToTake)) / 2.0);
                permitsToTake -= permitsAboveHalfToTake;
            }
            // measuring the integral on the left part of the function (the horizontal line)
            micros += (stableIntervalMicros * permitsToTake);
            return micros;
        }

        private double permitsToTime(double permits) {
            return stableIntervalMicros + permits * slope;
        }
    }

maxPermits等于热身(warmup)期间能产生的令牌数,比如QPS=4,warmup为2秒,则maxPermits=8.halfPermits为maxPermits的一半.
  1. public double acquire() {  
  2.     return acquire(1);  
  3. }  
  4.   
  5. public double acquire(int permits) {  
  6.     checkPermits(permits);  //检查参数是否合法(是否大于0)  
  7.     long microsToWait;  
  8.     synchronized (mutex) { //应对并发情况需要同步  
  9.         microsToWait = reserveNextTicket(permits, readSafeMicros()); //获得需要等待的时间   
  10.     }  
  11.     ticker.sleepMicrosUninterruptibly(microsToWait); //等待,当未达到限制时,microsToWait为0  
  12.     return 1.0 * microsToWait / TimeUnit.SECONDS.toMicros(1L);  
  13. }  
  14.   
  15. private long reserveNextTicket(double requiredPermits, long nowMicros) {  
  16.     resync(nowMicros); //补充令牌  
  17.     long microsToNextFreeTicket = nextFreeTicketMicros - nowMicros;  
  18.     double storedPermitsToSpend = Math.min(requiredPermits, this.storedPermits); //获取这次请求消耗的令牌数目  
  19.     double freshPermits = requiredPermits - storedPermitsToSpend;  
  20.   
  21.     long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)  
  22.             + (long) (freshPermits * stableIntervalMicros);   
  23.   
  24.     this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;  
  25.     this.storedPermits -= storedPermitsToSpend; // 减去消耗的令牌  
  26.     return microsToNextFreeTicket;  
  27. }  
  28.   
  29. private void resync(long nowMicros) {  
  30.     // if nextFreeTicket is in the past, resync to now  
  31.     if (nowMicros > nextFreeTicketMicros) {  
  32.         storedPermits = Math.min(maxPermits,  
  33.                 storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);  
  34.         nextFreeTicketMicros = nowMicros;  
  35.     }  
  36. }  

架构必备:Rate limiting 的作用和常见方式 - 互联网技术和架构
从整个架构的稳定性角度看,一般 SOA 架构的每个接口的有限资源的情况下,所能提供的单位时间服务能力是有限的。假如超过服务能力,一般会造成整个接口服务停顿,或者应用 Crash,或者带来连锁反应,将延迟传递给服务调用方造成整个系统的服务能力丧失。有必要在服务能力超限的情况下 Fail Fast。
另外,根据排队论,由于 API 接口服务具有延迟随着请求量提升迅速提升的特点,为了保证 SLA 的低延迟,需要控制单位时间的请求量。这也是 Little's law 所说的。

还有,公开 API 接口服务,Rate limiting 应该是一个必备的功能,否则公开的接口不知道哪一天就会被服务调用方有意无意的打垮。
所以,提供资源能够支撑的服务,将过载请求快速抛弃对整个系统架构的稳定性非常重要。这就要求在应用层实现 Rate limiting 限制。

常见的 Rate limiting 的实现方式

Proxy 层的实现,针对部分 URL 或者 API 接口进行访问频率限制

Nginx 模块

limit_req_zone $binary_remote_addr zone=one:10m rate=1r/s;

server {
    location /search/ {
        limit_req zone=one burst=5;
    }
详细参见:ngx_http_limit_req_module

Haproxy 提供的功能

详细参见:Haproxy Rate limit 模块

Java、Scala JVM 系应用层实现

Google Guava 提供了一个 RateLimiter 实现。使用方式简单明了,在自己的应用中简单封装即可,放到 HTTP 服务或者其他逻辑接口调用的前端。
 final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second"
  void submitTasks(List<Runnable> tasks, Executor executor) {
    for (Runnable task : tasks) {
      rateLimiter.acquire(); // may wait
      executor.execute(task);
    }
  }
详细参见:Google Guava RateLimiter
http://massivetechinterview.blogspot.com/2015/08/ratelimiter-discovering-google-guava.html
基于 Redis 功能的实现
这个在 Redis 官方文档有非常详细的实现。一般适用于所有类型的应用,比如 PHP、Python 等等。Redis 的实现方式可以支持分布式服务的访问频率的集中控制。Redis 的频率限制实现方式还适用于在应用中无法状态保存状态的场景。
参见:Redis INCR rate limiter

http://redis.io/commands/INCR
the problem to solve is limiting the number of API calls to a maximum of ten requests per second per IP address.
FUNCTION LIMIT_API_CALL(ip)
ts = CURRENT_UNIX_TIME()
keyname = ip+":"+ts
current = GET(keyname)
IF current != NULL AND current > 10 THEN
    ERROR "too many requests per second"
ELSE
    MULTI
        INCR(keyname,1)
        EXPIRE(keyname,10)
    EXEC
    PERFORM_API_CALL()
END
Basically we have a counter for every IP, for every different second. But this counters are always incremented setting an expire of 10 seconds so that they'll be removed by Redis automatically when the current second is a different one.
Note the used of MULTI and EXEC in order to make sure that we'll both increment and set the expire at every API call.

using Redis lists instead of counters. The implementation is more complex and uses more advanced features but has the advantage of remembering the IP addresses of the clients currently performing an API call, that may be useful or not depending on the application.
FUNCTION LIMIT_API_CALL(ip)
current = LLEN(ip)
IF current > 10 THEN
    ERROR "too many requests per second"
ELSE
    IF EXISTS(ip) == FALSE
        MULTI
            RPUSH(ip,ip)
            EXPIRE(ip,1)
        EXEC
    ELSE
        RPUSHX(ip,ip)
    END
    PERFORM_API_CALL()
END

http://www.1point3acres.com/bbs/thread-161293-1-1.html
第一轮:一个中年白人,表情比较严肃,让我介绍了一下自己和现在在做的projects。剩下不到半小时,让我设计一个rate limiter,就是给定每秒接受request的最大次数,当一个新的request来的时候怎么判断是否接受。这题看似很简单,但你只要想到用queue或者其他数据结构去存储request,你就输了。我事后问了很多人,他们的思路跟我一样,都是用数据结构去存储request,这样当存储的request过多会有很多问题,是个死胡同。我在网上找了经典答案,确实非常巧妙,不用任何数据结构,也不涉及多线程问题 http://stackoverflow.com/questions/667508/whats-a-good-rate-limiting-algorithm。当时我沿着错误的思路修修补补,面试官怎么都不满意,总提出漏洞,看来这完全不是他想要的答案。这题我认为你做过就会做,没做过的话在面试中打死你都想不到这么巧妙的解。我事后感觉第一轮被亮红灯无疑了。
Here the simplest algorithm, if you want just to drop messages when they arrive too quickly (instead of queuing them, which makes sense because the queue might get arbitrarily large):
rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;
There are no datastructures, timers etc. in this solution and it works cleanly :) To see this, 'allowance' grows at speed 5/8 units per seconds at most, i.e. at most five units per eight seconds. Every message that is forwarded deducts one unit, so you can't send more than five messages per every eight seconds.
Note that rate should be an integer, i.e. without non-zero decimal part, or the algorithm won't work correctly (actual rate will not be rate/per). E.g. rate=0.5; per=1.0; does not work because allowance will never grow to 1.0. But rate=1.0; per=2.0; works fine.
http://vinoyang.com/2015/08/23/redis-incr-implement-rate-limit/
#模式:Rate limiter
频次限制器模式是一种特殊的计数器,它常被用来限制某个操作可以被执行的频次。这个模式的实质其实是限制对一个公共API执行访问请求的次数限制。我们使用incr命令提供该模式的两种实现。这里我们假设需要解决的问题是:对每个IP,限制对某API的调用次数最高位10次每秒。
##模式:Rate limiter 1
对该模式一个相对简单和直接的实现,请见如下代码:





1
2
3
4
5
6
7
8
9
10
11
12
13
FUNCTION LIMIT_API_CALL(ip)
ts = CURRENT_UNIX_TIME()
keyname = ip+":"+ts
current = GET(keyname)
IF current != NULL AND current > 10 THEN
    ERROR "too many requests per second"
ELSE
    MULTI
        INCR(keyname,1)
        EXPIRE(keyname,10)
    EXEC
    PERFORM_API_CALL()
END

简单来说,我们对每个IP的每一秒都有一个计数器,但每个计数器都有一个额外的设置:它们都将被设置一个10秒的过期时间。这可以使得当时间已经不是当前秒时(此时该计数器也无效了),能够让redis自动移除它。
需要注意的是,这里我们使用multiexec命令来确保对每个API调用既执行了incr也同时能够执行expire命令。
multi命令用于标识一个命令集被包含在一个事务块中,exec保证该事务块命令集执行的原子性。
##模式:Rate limiter 2
另外的一种实现是采用单一的计数器,但是为了避免race condition(竞态条件),它也更复杂。我们来看几种不同的变体:





1
2
3
4
5
6
7
8
9
10
11
FUNCTION LIMIT_API_CALL(ip):
current = GET(ip)
IF current != NULL AND current > 10 THEN
    ERROR "too many requests per second"
ELSE
    value = INCR(ip)
    IF value == 1 THEN
        EXPIRE(value,1)
    END
    PERFORM_API_CALL()
END

该计数器在当前秒内第一次请求被执行时创建,但它只能存活一秒。如果在当前秒内,发送超过10次请求,那么该计数器将超过10。否则它将失效并从0开始重新计数。
在上面的代码中,存在一个race condition。如果因为某个原因,上面的代码只执行了incr命令,却没有执行expire命令,那么这个key将会被泄漏,直到我们再次遇到相同的ip(备注,如果这里没有辅助的删除该key的措施,那么该key将永不过期,也将每次都发生错误,详情可见本人之前一篇文章)。
这种问题也不难处理,可以将incr命令以及另外的expire命令打包到一个lua脚本里,该脚本可以用eval命令提交给redis执行(该方式只在redis版本大于等于2.6之后才能支持)。





1
2
3
4
5
local current
current = redis.call("incr",KEYS[1])
if tonumber(current) == 1 then
    redis.call("expire",KEYS[1],1)
end

当然,也有另一种方式来解决这个问题而不需要动用lua脚本,但需要用redis的list数据结构来替代计数器。这种实现方式将会更复杂,并使用更高级的特性。但它有一个好处是记住调用当前API的每个客户端的IP。这种方式可能很有用也可能没用,这取决于应用需求。





1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
FUNCTION LIMIT_API_CALL(ip)
current = LLEN(ip)
IF current > 10 THEN
    ERROR "too many requests per second"
ELSE
    IF EXISTS(ip) == FALSE
        MULTI
            RPUSH(ip,ip)
            EXPIRE(ip,1)
        EXEC
    ELSE
        RPUSHX(ip,ip)
    END
    PERFORM_API_CALL()
END

rpushx命令只在key存在时才会将值加入list
仍然需要注意的是,这里也存在一个race condition(但这却不会产生太大的影响)。问题是:exists可能返回false,但在我们执行multi/exec块内的创建list的代码之前,该list可能已被其他客户端创建。然而,在这个race condition发生时,将仅仅只是丢失一个API调用,所以rate limiting仍然工作得很好。
这里产生race condition不会有大问题的原因在于,else分支使用的rpushx,它不会导致if not than init的问题,并且expire命令将在创建list的时候以原子的形式捆绑执行。不会产生key泄漏,导致永不失效的情况产生。
https://github.com/hamsterready/redis-ratelimit/blob/master/src/main/java/com/sentaca/redis/ratelimit/RateLimitService.java

https://en.wikipedia.org/wiki/Little%27s_law
http://web.mit.edu/sgraves/www/papers/Little's%20Law-Published.pdf
利特尔法则英语:Little's law),基于等候理论,由约翰·利特尔在1954年提出。利特尔法则可用于一个稳定的、非占先式的系统中。其内容为:
在一个稳定的系统 L中,长期的平均顾客人数,等于长期的有效抵达率,λ,乘以顾客在这个系统中平均的等待时间,W;或者,我们可以用一个代数式来表达:L = λW。
利特尔法则可用来确定在途存货的数量。此法则认为,系统中的平均存货等于存货单位离开系统的比率(亦即平均需求率)与存货单位在系统中平均时间的乘积。
In queueing theory, a discipline within the mathematical theory of probabilityLittle's resulttheoremlemmalaw or formula[1][2] is a theorem by John Little which states:
The long-term average number of customers in a stable system L is equal to the long-term average effective arrival rate, λ, multiplied by the (Palm‑)average time a customer spends in the system, W; or expressed algebraically: L = λW.
Although it looks intuitively reasonable, it is quite a remarkable result, as the relationship is "not influenced by the arrival process distribution, the service distribution, the service order, or practically anything else."[3]
The result applies to any system, and particularly, it applies to systems within systems.[4] So in a bank, the customer line might be one subsystem, and each of the tellers another subsystem, and Little's result could be applied to each one, as well as the whole thing. The only requirements are that the system is stable and non-preemptive; this rules out transition states such as initial startup or shutdown.

http://calvin1978.blogcn.com/articles/ratelimiter.html

1.各种目的

1. 保护每个服务节点。
2. 保护服务集群背后的资源,比如数据库。
3. 避免单个调用者过度使用服务,影响其他调用者。

2. 各种设定维度

2.1. 节点级别 vs 集群级别

如果以保护每个服务节点为目的,可以简单的在本地做节点级别的限流。
但如果以保护服务集群背后的资源为目的,就需要做集群级别的限流。
集群级别的一个做法是使用Redis,Memcached之类做一个集群级别的计数器。但额外多一次访问Redis的消耗,代价有点大。
而另一个做法是把集群限流总数分摊到每个节点上,但一是不够精准,二是如果使用Docker动态缩扩容,需要动态更新这个分摊数。

2.2 客户端 vs 服务端

当以避免单个调用者过度使用服务为目的,可以针对客户端设定限流。
此时限流可以在客户端实现,节约了网络往返,但同样有调用者的节点 or 集群之惑。
也可以在服务端实现,让所有限流逻辑集中于一处发生。

2.3 服务级别 vs 方法级别

可以对消耗特别大的方法专门配置,比如复杂的查询,昂贵的写操作。
然后其他方法使用统一的值,或者配一个所有方法加起来的总和。

3. 各种触发条件

触发条件的设定,难点在于服务的容量,受着本服务节点的能力,背后的资源的能力,下游服务的响应的多重约束。

3.1 静态配置固定值

当然,这个固定值可以被动态更新。

3.2 根据预设规则触发

规则的条件可以是服务平均时延,可以是背后数据库的CPU情况等。
比如平时不限流,当服务时延大于100ms,则触发限流500 QPS。
还可以是多级条件,比如>100ms 限流500 QPS, >200ms 限流200 QPS。

3.3 全动态自动增减调控

这个诱人的想法,永远存在于老板的心里。

4. 各种处理

4.1 立刻返回拒绝错误

由客户端进行降级处理。

4.2 进行短暂的等待

短暂等待,期待有容量空余,直到超时。

4.3 触发服务降级,调用服务端的降级方法

与拒绝服务由客户端降级处理相比,服务端的降级方法,走服务端的简单路径与预设值,则代表了服务端这边的态度和逻辑,各有适用的场景,等下一篇《服务降级》 再详述。 
不知道有哪些服务治理框架,比较完整的实现了,全动态自动调控之外的种种需求?

Read full article from 架构必备:Rate limiting 的作用和常见方式 - 互联网技术和架构

Labels

Review (572) System Design (334) System Design - Review (198) Java (189) Coding (75) Interview-System Design (65) Interview (63) Book Notes (59) Coding - Review (59) to-do (45) Linux (43) Knowledge (39) Interview-Java (35) Knowledge - Review (32) Database (31) Design Patterns (31) Big Data (29) Product Architecture (28) MultiThread (27) Soft Skills (27) Concurrency (26) Cracking Code Interview (26) Miscs (25) Distributed (24) OOD Design (24) Google (23) Career (22) Interview - Review (21) Java - Code (21) Operating System (21) Interview Q&A (20) System Design - Practice (20) Tips (19) Algorithm (17) Company - Facebook (17) Security (17) How to Ace Interview (16) Brain Teaser (14) Linux - Shell (14) Redis (14) Testing (14) Tools (14) Code Quality (13) Search (13) Spark (13) Spring (13) Company - LinkedIn (12) How to (12) Interview-Database (12) Interview-Operating System (12) Solr (12) Architecture Principles (11) Resource (10) Amazon (9) Cache (9) Git (9) Interview - MultiThread (9) Scalability (9) Trouble Shooting (9) Web Dev (9) Architecture Model (8) Better Programmer (8) Cassandra (8) Company - Uber (8) Java67 (8) Math (8) OO Design principles (8) SOLID (8) Design (7) Interview Corner (7) JVM (7) Java Basics (7) Kafka (7) Mac (7) Machine Learning (7) NoSQL (7) C++ (6) Chrome (6) File System (6) Highscalability (6) How to Better (6) Network (6) Restful (6) CareerCup (5) Code Review (5) Hash (5) How to Interview (5) JDK Source Code (5) JavaScript (5) Leetcode (5) Must Known (5) Python (5)

Popular Posts