Monday, March 7, 2016

Asynchronous Programming



https://www.javacodegeeks.com/2013/03/deferredresult-asynchronous-processing-in-spring-mvc.html
DeferredResult is a container for possibly not-yet-finished computation that will be available in future. Spring MVC uses it to represent asynchronous computation and take advantage of Servlet 3.0 AsyncContext asynchronous request handling. 

Spring MVC (using Servlet 3.0 capabilities) will hold on with the response, keeping idle HTTP connection. HTTP worker thread is no longer used, but HTTP connection is still open. Later some other thread will resolve DeferredResult by assigning some value to it. Spring MVC will immediately pick up this event and send response (“HTTP response is: 42” in this example) to the browser, finishing request processing.

Standard java.util.concurrent.Future does not allow registering callbacks when computation is done – so you either need to devote one thread to block until future is done or use one thread to poll several futures periodically. However the latter option consumes more CPU and introduces latency. But superior ListenableFuture<V> from Guava seems like a good fit? True, but Spring doesn’t have a dependency on Guava, thankfully bridging these two APIs is pretty straightforward.


https://dzone.com/articles/spring-and-servlet-30-asynchronous-processing
https://www.byteslounge.com/tutorials/asynchronous-servlets-in-java
  @Override
  protected void doGet(HttpServletRequest request, 
      HttpServletResponse response) throws ServletException, IOException {

    final long startTime = System.nanoTime();
    final AsyncContext asyncContext = request.startAsync(request, response);

    new Thread() {

      @Override
      public void run() {
        try {
          ServletResponse response = asyncContext.getResponse();
          response.setContentType("text/plain");
          PrintWriter out = response.getWriter();
          Thread.sleep(2000);
          out.print("Work completed. Time elapsed: " + (System.nanoTime() - startTime));
          out.flush();
          asyncContext.complete();
        } catch (IOException | InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
    }.start();

  }


异步化,高并发大杀器 原创: 咖啡拿铁 
  • 异步:异步的意思就是不需要主动等待结果的返回,而是通过其他手段比如,状态通知,回调函数等。
  • 阻塞:是指结果返回之前,当前线程被挂起,不做任何事
  • 非阻塞:是指结果在返回之前,线程可以做一些其他事,不会被挂起。
一般我们的业务中有两处比较耗时:
  • cpu: cpu耗时指的是我们的一般的业务处理逻辑,比如一些数据的运算,对象的序列化。这些异步化是不能解决的,得需要靠一些算法的优化,或者一些高性能框架。
  • iowait: io耗时就像我们上面说的,一般发生在网络调用,文件传输中等等,这个时候线程一般会挂起阻塞。而我们的异步化通常用于解决这部分的问题。

上面说了异步化是用于解决IO阻塞的问题,而我们一般项目中可以使用异步化如下:
  • servlet异步化,springmvc异步化
  • rpc调用如(dubbo,thrift),http调用异步化
  • 数据库调用,缓存调用异步化
实现serlvet的关键在于http采取了长连接,也就是当请求打过来的时候就算有返回也不会关闭,因为可能还会有数据,直到返回关闭指令。 AsyncContext ac=req.startAsync(); 用于获取异步上下文,后续我们通过这个异步上下文进行回调返回数据,有点像我们买衣服的时候,给老板一个电话,而这个上下文也是一个电话,当有衣服到的时候,也就是当有数据准备好的时候就可以打电话发送数据了。 ac.complete(); 用来进行长链接的关闭。
使用起来比较简单。以前我们同步的模式的Controller是返回额ModelAndView,而异步模式直接生成一个defrredResult(支持我们超时扩展)即可保存上下文

注意: 在serlvet异步化中有个问题是filter的后置结果处理,没法使用,对于我们一些打点,结果统计直接使用serlvet异步是没法用的。在springmvc中就很好的解决了这个问题,springmvc采用了一个比较取巧的方式通过请求转发,能让请求再次过滤器。但是又引入了新的一个问题那就是过滤器会处理两次,这里可以通过SpringMVC源码中自身判断的方法,我们可以在filter中使用下面这句话来进行判断是不是属于springmvc转发过来的请求,从而不处理filter的前置事件,只处理后置事件:
  1. Object asyncManagerAttr = servletRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE);
  2. return asyncManagerAttr instanceof WebAsyncManager

我们一般远程调用使用rpc或者http。对于rpc来说一般thrift,http,motan等支持都异步调用,其内部原理也都是采用事件驱动的NIO模型,对于http来说一般的apachehttpclient和okhttp也都提供了异步调用
http://my.oschina.net/ainilife/blog/631366
https://github.com/ainilife/zebra-dao
zebra-dao目前支持两种方式的异步API:回调和Future接口方式,对于每一个异步接口,必须要有相应的同步方法定义,因为其实所有的异步接口最后还是在线程池中调用的同步接口。
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface UserMapper {
    /**
    * Normal synchronization dao method.
    */
    public UserEntity findUserById(@Param("userId"int userId);
    /**
    * Asynchronous callback method. Return void and only one
    * callback method required.
    */
    public void findUserById(@Param("userId"int userId, AsyncDaoCallback<UserEntity> callback);
     
    /**
    * Asynchronous future method. Return future and must have the 
    * same params as synchronization method.
    */
    @TargetMethod(name = "findUserById")
    public Future<UserEntity> findUserById1(@Param("userId"int userId);
}
业务如果使用的是回调接口,那么在使用的时候必须定义回调方法,在回调方法中,通常做的是把结果放到服务框架的异步回调方法中,这样才能做到一个服务的全部异步化。在下面的例子的回调方法中,隐去了使用服务异步调用接口的具体实现,仅仅展示如何定义回调方法。
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
dao.findUserById(1new AsyncDaoCallback<UserEntity>() {
    @Override
    public void onSuccess(UserEntity user) {
        System.out.println(user);
        //another asynchronous invoke in the asynchronous invoke
        dao.findUserById(2new AsyncDaoCallback<UserEntity>() {
            @Override
            public void onSuccess(UserEntity user) {
                System.out.println(user);
            }
            @Override
            public void onException(Exception e) {
            }
        });
        //synchronization invoke in the  asynchronous invoke
        UserEntity entity = dao.findUserById(3);
        System.out.println(entity);
    }
    @Override
    public void onException(Exception e) {
    }
});
对于使用的Future接口,就和使用Future一样,没有额外特别之处。
?
1
2
3
Future<UserEntity> future = dao.findUserById1(1);
UserEntity user = future.get();
System.out.println(user);
http://calvin1978.blogcn.com/articles/async.html
1. 先认清自己的需求,不要一开始就站到某个技术上,再倒叙自己的需求。
2. 然后从浅入深,分析可以用什么样的技术满足该层次的需求,依然避免一下就冲到某个技术上,比如Akka入门到放弃。
3. 最后找出一条相对平稳的技术路线,能够贯穿各个层次的需求,而且最好不那么考验使用者的智商。

1. 需求篇

1.1 通过并行的远程调用,缩短总的响应延时。

1.1.1 远程调用分类
服务处理的过程中,总会包含如下几类的远程调用:
服务层:RESTful,RPC框架
数据层:JDBC,Memcached,Redis,ZooKeeper...
消息层:Kafka,RabbitMQ...
有些客户端已经支持异步,比如大部分的RPC框架、RESTful Client、SpyMemcached、ZooKeeper、Kafka 等。 在异步接口之中,又分返回Future 与 用户自定义的CallBack类两种风格。
但有些客户端还是同步的,比如JDBC,Jedis,在设计方案时需要把它们也考虑进去。
1.1.2 并行执行的机会有两种
一是服务提供者没有提供批量接口,比如商品查询,只能以不同的ID分次调用(题外话,始终服务方提供批量接口会更好些)。
一是两个调用之间没有依赖关系,包括参数依赖(一个调用的参数依赖另一个调用的返回结果),时间依赖(一个调用必需在另一个调用成功后执行)。
1.1.3 并行调用的复杂度分两个层次
一是简单并行,把所有可并行的服务一下子都撒出去,然后等待所有的异步调用返回,简单的Future.get()就够用。
还有一种场景是,先调一个异步接口,然后做一些同步的事情,再回头Future.get()拿之前异步调用的结果,这也达到了节省总响应时间的效果。
二是调用编排,比如一开始并行A、B、C、D服务,一旦A与B返回则根据结果调用E,一旦C返回则调用F,最后组装D、E、F的结果返回给客户端。
如果还是简单的并行,没办法做到最高效的调度,必须有一种机制,支持这种多条异步调用链分头并进的场景。此时就需要或Akka,或RxJava,或JDK8的CompletableFuture与Guava的ListenenableFuture,基于Actor,Rx or Callback机制来编排了。再后来,发现其实依然使用Future.get()多起几条线程也一样能做,不要先被某个技术迷住。

1.2 希望能用少量的固定线程,处理海量的并发请求。

这个概念并不陌生,NIO就是这个思路。
但是即使你用了Netty的NIO 或 Servlet3.0的异步Servlet 或,也只是解决了传输层面用少量传输线程处理海量并发的传输,并在入口层的编程模式上提供了异步化的可能性。
如果你的服务线程里存在阻塞的远程调用,那线程还是会等待在远程调用上而无法处理海量请求,即使异步化如Future.get(),也依然是等待。
所以,你必须有办法在阻塞等待时把线程给交回去,比如服务线程里采用全异步化的Callback模式,或引入Akka的Actor模式,或基于Quasar引入纤程协程的概念。

1.3 小结

在我看来 , 客户端简单并行->客户端并行编排->服务端少量线程处理海量请求。对于大部分普通项目,是由浅入深三个层次的需求。

2.第一层,简单并行实现篇

2.1 最简单写法

最简单就是并发的调用一堆返回Future的异步接口,再一个个Get回来,最慢的那个会阻塞住其他:
Future<Product> productFuture = productService.query(id);
Future<Long> stockFuture = stockService.query(id);
.......
Product product = productFuture.get();
Long stock = stockFuture.get();
HttpClient有一个HttpAsyncClient 的子项目提供Http的异步访问:
HttpGet request1 = new HttpGet("http://www.apache.org/");
Future<HttpResponse> future = httpclient.execute(request1, null);
HttpResponse response1 = future.get();
Spring的RestTemplate同样有AsyncRestTemplate:
Future<ResponseEntity<String>> futureEntity = template.getForEntity("http://www.apache.org/" , String.class);
ResponseEntity<String> entity = futureEntity.get();
其他类似的不一一列举。

2. 异步接口只有Callback形式时

但如果异步接口只能设置自定义Callback函数,不返回Future呢? 比如Thrift就是这样。
Callback函数在这种并行调用场景里并不好用,因此建议还是要转换回Future接口使用。
转换的方法很简单,自己实现一个默认的Callback类,里面包含一个Future,然后实现Callback接口所定义的onSucess()和onFail()函数,将结果或异常赋值到Future中。
DefaultFutureCallback<Long> callback=new DefaultFutureCallback<Long>();
myService.getPrice(1L, callback);
String result = callback.getFuture().get();
至于Future类的实现,随便抄个HttpClient的BasicFuture就好了。
题外话,Future.get() 接口只声明了ExecutionException一种异常,如果你原来的callback函数里有其他不是Runtime Exception的,就要裹在ExecutionException里了,用户还要自己getCause()把它找出来,唯一不方便的地方。

3. 对付同步接口

同步接口如JDBC与Jedis,只能在调用它们的地方实现Callable接口,异步的跑在另一个线程池里来完成并行调度。但这其实引入了线程调度的消耗,不得而为之,不可滥用。
Future<Product> future = executor.submit(new Callable<Product>(){
  @Override
  public Product call() throws Exception {
    return productDao.query(id);
  }
});
题外话,executor中返回的Future的实现类叫FutureTask,它能以“Callable 或 Runnable+预设结果(因为Runnalbe自身没结果)“作为参数构建。executor.submit() 接受“Callable 或 Runnable+预设结果” 做参数,内部构建这个FutureTask。但FutureTask本身又是个Runnable,网上有些例子让大家自己在外部把Callable构造成FutureTask,再以Runnable的身份传给executor,其实不好,也没必要。
如果是JDK8,用Lambda就可以写得短一些,只要一行,和平时同步写法也差不多了。
Future<Product> future = executor.submit(()->productDao.query(id));
最后,同步改异步后,原来存在ThreadLocal中的东西如TraceId就没有了,要用一个Context之类的在进出的时候复制。

Labels

Review (572) System Design (334) System Design - Review (198) Java (189) Coding (75) Interview-System Design (65) Interview (63) Book Notes (59) Coding - Review (59) to-do (45) Linux (43) Knowledge (39) Interview-Java (35) Knowledge - Review (32) Database (31) Design Patterns (31) Big Data (29) Product Architecture (28) MultiThread (27) Soft Skills (27) Concurrency (26) Cracking Code Interview (26) Miscs (25) Distributed (24) OOD Design (24) Google (23) Career (22) Interview - Review (21) Java - Code (21) Operating System (21) Interview Q&A (20) System Design - Practice (20) Tips (19) Algorithm (17) Company - Facebook (17) Security (17) How to Ace Interview (16) Brain Teaser (14) Linux - Shell (14) Redis (14) Testing (14) Tools (14) Code Quality (13) Search (13) Spark (13) Spring (13) Company - LinkedIn (12) How to (12) Interview-Database (12) Interview-Operating System (12) Solr (12) Architecture Principles (11) Resource (10) Amazon (9) Cache (9) Git (9) Interview - MultiThread (9) Scalability (9) Trouble Shooting (9) Web Dev (9) Architecture Model (8) Better Programmer (8) Cassandra (8) Company - Uber (8) Java67 (8) Math (8) OO Design principles (8) SOLID (8) Design (7) Interview Corner (7) JVM (7) Java Basics (7) Kafka (7) Mac (7) Machine Learning (7) NoSQL (7) C++ (6) Chrome (6) File System (6) Highscalability (6) How to Better (6) Network (6) Restful (6) CareerCup (5) Code Review (5) Hash (5) How to Interview (5) JDK Source Code (5) JavaScript (5) Leetcode (5) Must Known (5) Python (5)

Popular Posts