Saturday, August 22, 2015

Guava RateLimiter



Let's say that we want to limit the rate of execution of the doSomeLimitedOperation() to 2 times per second.

We can create a RateLimiter instance using its create() factory method:

1
RateLimiter rateLimiter = RateLimiter.create(2);

Next, in order to get an execution permit from the RateLimiter, we need to call the acquire() method:

1
rateLimiter.acquire(1);
http://xiaobaoqiu.github.io/blog/2015/07/02/ratelimiter/
过去一段时间的利用不足意味着有过剩的资源是可以利用的.这种情况下,RateLimiter应该加把劲(speed up for a while)将这些过剩的资源利用起来.比如在向网络中发生数据的场景(限流),过去一段时间的利用不足可能意味着网卡缓冲区是空的,这种场景下,我们是可以加速发送来将这些过程的资源利用起来.
另一方面,过去一段时间的利用不足可能意味着处理请求的服务器对即将到来的请求是准备不足的(less ready for future requests),比如因为很长一段时间没有请求当前服务器的cache是陈旧的,进而导致即将到来的请求会触发一个昂贵的操作(比如重新刷新全量的缓存).
为了处理这种情况,RateLimiter中增加了一个维度的信息,就是过去一段时间的利用不足(past underutilization),代码中使用storedPermits变量表示.当没有利用不足这个变量为0,最大能达到maxStoredPermits(maxStoredPermits表示完全没有利用).
1.过去剩余的令牌(stored permits, 可能没有)
2.现有的令牌(fresh permits,当前这段时间还没用完的令牌)
我们将通过一个例子来解释它是如何工作的:
对一个每秒产生一个令牌的RateLimiter,每有一个没有使用令牌的一秒,我们就将storedPermits加1,如果RateLimiter在10秒都没有使用,则storedPermits变成10.0.这个时候,一个请求到来并请求三个令牌(acquire(3)),我们将从storedPermits中的令牌为其服务,storedPermits变为7.0.这个请求之后立马又有一个请求到来并请求10个令牌,我们将从storedPermits剩余的7个令牌给这个请求,剩下还需要三个令牌,我们将从RateLimiter新产生的令牌中获取.我们已经知道,RateLimiter每秒新产生1个令牌,就是说上面这个请求还需要的3个请求就要求其等待3秒.
想象一个RateLimiter每秒产生一个令牌,现在完全没有使用(处于初始状态),限制一个昂贵的请求acquire(100)过来.如果我们选择让这个请求等待100秒再允许其执行,这显然很荒谬.我们为什么什么也不做而只是傻傻的等待100秒,一个更好的做法是允许这个请求立即执行(和acquire(1)没有区别),然后将随后到来的请求推迟到正确的时间点.这种策略,我们允许这个昂贵的任务立即执行,并将随后到来的请求推迟100秒.这种策略就是让任务的执行和等待同时进行.
一个重要的结论:RateLimiter不会记最后一个请求,而是即下一个请求允许执行的时间.这也可以很直白的告诉我们到达下一个调度时间点的时间间隔.然后定一个一段时间未使用的Ratelimiter也很简单:下一个调度时间点已经过去,这个时间点和现在时间的差就是Ratelimiter多久没有被使用,我们会将这一段时间翻译成storedPermits.所有,如果每秒钟产生一个令牌(rate==1),并且正好每秒来一个请求,那么storedPermits就不会增长.
另外,对于storedPermits的使用,RateLimiter存在两种策略,二者区别主要体现在使用storedPermits时候需要等待的时间。这个逻辑由storedPermitsToWaitTime函数实现:
1
2
3
4
5
6
7
8
9
/**
 * Translates a specified portion of our currently stored permits which we want to
 * spend/acquire, into a throttling time. Conceptually, this evaluates the integral
 * of the underlying function we use, for the range of
 * [(storedPermits - permitsToTake), storedPermits].
 *
 * <p>This always holds: {@code 0 <= permitsToTake <= storedPermits}
 */
abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);
存在两种策略就是为了应对我们上面讲到的,存在资源使用不足大致分为两种情况: (1).资源确实使用不足,这些剩余的资源我们私海可以使用的; (2).提供资源的服务过去还没准备好,比如服务刚启动等;
为此,RateLimiter实际上由两种实现策略,其实现分别见SmoothBursty和SmoothWarmingUp。二者主要的区别就是storedPermitsToWaitTime实现以及maxPermits数量的计算。
SmoothBursty使用storedPermits不需要额外等待时间。并且默认maxBurstSeconds未1,因此maxPermits为permitsPerSecond,即最多可以存储1秒的剩余令牌,比如QPS=5,则maxPermits=5.
https://segmentfault.com/a/1190000012875897
从另一方面讲,RateLimiter通过限制后面请求的等待时间,来支持一定程度的突发请求(预消费)。
但是某些情况下并不需要这种突发请求处理能力,如某IM厂商提供消息推送接口,但推送接口有严格的频率限制(600次/30秒),在调用该IM厂商推送接口时便不能预消费,否则,则可能出现推送频率超出限制而失败

Guava有两种限流模式,一种为稳定模式(SmoothBursty:令牌生成速度恒定),一种为渐进模式(SmoothWarmingUp:令牌生成速度缓慢提升直到维持在一个稳定值)
两种模式实现思路类似,主要区别在等待时间的计算上

根据令牌桶算法,桶中的令牌是持续生成存放的,有请求时需要先从桶中拿到令牌才能开始执行,谁来持续生成令牌存放呢?
一种解法是,开启一个定时任务,由定时任务持续生成令牌。这样的问题在于会极大的消耗系统资源,如,某接口需要分别对每个用户做访问频率限制,假设系统中存在6W用户,则至多需要开启6W个定时任务来维持每个桶中的令牌数,这样的开销是巨大的。
另一种解法则是延迟计算,如上resync函数。该函数会在每次获取令牌之前调用,其实现思路为,若当前时间晚于nextFreeTicketMicros,则计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据。这样一来,只需要在获取令牌时计算一次即可。
void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
}
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  resync(nowMicros);
  long returnValue = nextFreeTicketMicros; // 返回的是上次计算的nextFreeTicketMicros
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 可以消费的令牌数
  double freshPermits = requiredPermits - storedPermitsToSpend; // 还需要的令牌数
  long waitMicros =
      storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
          + (long) (freshPermits * stableIntervalMicros); // 根据freshPermits计算需要等待的时间

  this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 本次计算的nextFreeTicketMicros不返回
  this.storedPermits -= storedPermitsToSpend;
  return returnValue;
}
该函数用于获取requiredPermits个令牌,并返回需要等待到的时间点
其中,storedPermitsToSpend为桶中可以消费的令牌数,freshPermits为还需要的(需要补充的)令牌数,根据该值计算需要等待的时间,追加并更新到nextFreeTicketMicros
需要注意的是,该函数的返回是更新前的(上次请求计算的)nextFreeTicketMicros,而不是本次更新的nextFreeTicketMicros,通俗来讲,本次请求需要为上次请求的预消费行为埋单,这也是RateLimiter可以预消费(处理突发)的原理所在。若需要禁止预消费,则修改此处返回更新后的nextFreeTicketMicros值。
SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
  super(stopwatch);
  this.maxBurstSeconds = maxBurstSeconds; // 最大存储maxBurstSeconds秒生成的令牌
}

@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  double oldMaxPermits = this.maxPermits;
  maxPermits = maxBurstSeconds * permitsPerSecond; // 计算最大存储令牌数
  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    // if we don't special-case this, we would get storedPermits == NaN, below
    storedPermits = maxPermits;
  } else {
    storedPermits =
        (oldMaxPermits == 0.0)
            ? 0.0 // initial state
            : storedPermits * maxPermits / oldMaxPermits;
  }
}
@CanIgnoreReturnValue
public double acquire() {
  return acquire(1);
}

@CanIgnoreReturnValue
public double acquire(int permits) {
  long microsToWait = reserve(permits);
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

final long reserve(int permits) {
  checkPermits(permits);
  synchronized (mutex()) {
    return reserveAndGetWaitLength(permits, stopwatch.readMicros());
  }
}
acquire函数主要用于获取permits个令牌,并计算需要等待多长时间,进而挂起等待,并将该值返回



  // Can't be initialized in the constructor because mocks don't call the constructor.
  private volatile Object mutexDoNotUseDirectly;

  private Object mutex() {
    Object mutex = mutexDoNotUseDirectly;
    if (mutex == null) {
      synchronized (this) {
        mutex = mutexDoNotUseDirectly;
        if (mutex == null) {
          mutexDoNotUseDirectly = mutex = new Object();
        }
      }
    }
    return mutex;

  }
https://github.com/jabley/rate-limit
RateLimiter - discovering Google Guava | NoBlogDefFound
rate limiter distributes permits at a configurable rate. Each acquire() blocks if necessary until a permit is available [...] Rate limiters are often used to restrict the rate at which some physical or logical resource is accessed
Basically this small utility class can be used e.g. to limit the number of requests per second your API wishes to handle or to throttle your own client code, avoiding denial of service of someone else's API if we are hitting it too often.

@WebFilter(urlPatterns=Array("/*"))
class RateLimiterFilter extends Filter {
    val limiter = RateLimiter.create(100)
    def init(filterConfig: FilterConfig) {}
    def doFilter(request: ServletRequest, response: ServletResponse, chain: FilterChain) {
        if(limiter.tryAcquire()) {
            chain.doFilter(request, response)
        } else {
            response.asInstanceOf[HttpServletResponse].sendError(SC_TOO_MANY_REQUESTS)
        }
    }
    def destroy() {}
}
Another self-descriptive sample. This time we limit our API to handle not more than 100 requests per second (of course RateLimiter is thread safe). All HTTP requests that come through our filter are subject to rate limiting. If we cannot handle incoming request, we send HTTP 429 - Too Many Requests error code (not yet available in servlet spec). Alternatively you may wish to block the client for a while instead of eagerly rejecting it. That's fairly straightforward as well:
1
2
3
4
def doFilter(request: ServletRequest, response: ServletResponse, chain: FilterChain) {
    limiter.acquire()
    chain.doFilter(request, response)
}
limiter.acquire() will block as long as it's needed to keep desired 100 requests per second limit. Yet another alternative is to use tryAcquire() with timeout (blocking up to given amount of time). Blocking approach is better if you want to avoid sending errors to the client. However under high load it's easy to imagine almost all HTTP threads blocked waiting for RateLimiter, eventually causing servlet container to reject connections. So dropping of clients can be only partially avoided.
This filter is a good starting point to build more sophisticated solutions. Map of rate limiters by IP or user name are good examples.

warm-up functionality - if RateLimiter was completely idle for a long time, it will gradually increase allowed frequency over configured time up to configured maximum value instead of allowing maximum frequency from the very beginning
https://java.awsblog.com/post/Tx3VAYQIZ3Q0ZVW/Rate-Limited-Scans-in-Amazon-DynamoDB
do {
    // Let the rate limiter wait until our desired throughput "recharges"
    rateLimiter.acquire(permitsToConsume);
     
    // Do the scan
    ScanRequest scan = new ScanRequest()
        .withTableName("ProductCatalog")
        .withLimit(100)
        .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
        .withExclusiveStartKey(exclusiveStartKey);
    ScanResult result = dynamodb.scan(scan);
    exclusiveStartKey = result.getLastEvaluatedKey();
     
    // Account for the rest of the throughput we consumed,
    // now that we know how much that scan request cost
    double consumedCapacity = result.getConsumedCapacity().getCapacityUnits();
    permitsToConsume = (int)(consumedCapacity - 1.0);
    if(permitsToConsume <= 0) {
        permitsToConsume = 1;
    }
     
    // Process results here
    processYourResults(result);
     
} while (exclusiveStartKey  != null);


Guava不仅仅在集合、缓存、异步回调等方面功能强大,而且还给我们封装好了限流的API!
Guava RateLimiter基于令牌桶算法,我们只需要告诉RateLimiter系统限制的QPS是多少,那么RateLimiter将以这个速度往桶里面放入令牌,然后请求的时候,通过tryAcquire()方法向RateLimiter获取许可(令牌)。
Read full article from RateLimiter - discovering Google Guava | NoBlogDefFound

Labels

Review (572) System Design (334) System Design - Review (198) Java (189) Coding (75) Interview-System Design (65) Interview (63) Book Notes (59) Coding - Review (59) to-do (45) Linux (43) Knowledge (39) Interview-Java (35) Knowledge - Review (32) Database (31) Design Patterns (31) Big Data (29) Product Architecture (28) MultiThread (27) Soft Skills (27) Concurrency (26) Cracking Code Interview (26) Miscs (25) Distributed (24) OOD Design (24) Google (23) Career (22) Interview - Review (21) Java - Code (21) Operating System (21) Interview Q&A (20) System Design - Practice (20) Tips (19) Algorithm (17) Company - Facebook (17) Security (17) How to Ace Interview (16) Brain Teaser (14) Linux - Shell (14) Redis (14) Testing (14) Tools (14) Code Quality (13) Search (13) Spark (13) Spring (13) Company - LinkedIn (12) How to (12) Interview-Database (12) Interview-Operating System (12) Solr (12) Architecture Principles (11) Resource (10) Amazon (9) Cache (9) Git (9) Interview - MultiThread (9) Scalability (9) Trouble Shooting (9) Web Dev (9) Architecture Model (8) Better Programmer (8) Cassandra (8) Company - Uber (8) Java67 (8) Math (8) OO Design principles (8) SOLID (8) Design (7) Interview Corner (7) JVM (7) Java Basics (7) Kafka (7) Mac (7) Machine Learning (7) NoSQL (7) C++ (6) Chrome (6) File System (6) Highscalability (6) How to Better (6) Network (6) Restful (6) CareerCup (5) Code Review (5) Hash (5) How to Interview (5) JDK Source Code (5) JavaScript (5) Leetcode (5) Must Known (5) Python (5)

Popular Posts