Tuesday, March 27, 2018

Reactive



https://springcamp.cn/spring-boot/reactive-program-1/
Reactive编程是一种通过将智能路由和事件消费组合起来改变行为的微架构风格

Reactive编程背后的基本思想是指 表示随时间变化的值的特定数据类型。处理这些随时间变化的值的方法里本身也包含着随时间变化的值。

可以简单的把程序当作 是一个 电子表格,把变量当作是表格里的单元格。当表格里的某些单元格的值变化后,如果其它的单元格引用了这些单元格的值,那么其它的单元格的值也会跟着改变。这跟函数式编程是一样的。


函数式编程往往代表着高性能、并发、异步、非阻塞IO。开始时我们可以不必非用函数式编程,Reactive模型可以很自然的处理这些问题

外部服务调用

当今的后端系统很多都是RESTful的,底层协议是阻塞和同步的。并且服务之间会相互调用,必须要等到第一个请求调用 完成后才能调用下一个请求。客户端可能在 服务端处理完成之前就放弃请求了。因此外部服务调用,特别是需要调用多个服务 才能完成 处理的时候,是一个需要去优化的场景。

高并发的消息消费

高并发的消息处理是企业应用的常见场景,Reactive模式非常适合处理消息(事件可以很容易的转换为消息)。

对异步调用进行抽象

Reactive编程可以让我们不用关心调用是同步的还是异步的,单纯的异步编程是很繁琐的,Reactive模式可以简化异步编程。

Ruby Event-Machine

Event-Machine是一个并发编程的抽象。可以 让Ruby用一个单线程来处理高并发的请求。

Actor Model

与面向对象编程类似,Actor Model是计算机科学的一个重要 研究方向,早在七十年代就有了。Actor是计算的一个抽象,可以用于并发系统。Actor之间相互发送消息,因此在某种意义上也是反应式的。Actor和Reactive在概念上有很高的重合度。区别往往在实现层面(例如Akka可以用于进程间通信,是这个框架的显著特点)。

Deferred results (Futures)

Java 1.5引入了很多新特性,包括Doug Lea 的 “java.util.concurrent”,其中有deferred result这个概念,被封装成Future。这是对异步编程进行抽象的一个很好的例子,可以非异步的形式来开发异步程序。在简单的并发任务场景下,Future是很好用的,但是当这些任务相互依赖的时候,会陷入“nested callback hell”。而Reactive可以避免这种情况的出现。

Map-reduce and fork-join

将并行处理进行抽象是很有意义的,这样的例子很多。Java编程中最近有了Map-reduce 和 fork-join。Map-reduce用于Hadoop,fork-join是jdk1.7之后的版本自带的功能。这2个技术与Deferred results类似,不能应对复杂的组合调用场景。

Coroutines

coroutine可以相互之间传递控制权,不用由调用者统一协调,可以简化并发编程。可以用coroutine来实现Reactive编程。Fibers和Generators都属于coroutine技术。

内置了Reactive的特性,包含了构建HTTP服务和客户端的工具。Spring构建于Reactor技术之上,但是用户可以自由选择底层是使用Reactor还是RxJava。服务器支持omcat, Jetty, Netty 和 Undertow。

Ratpack

一组构建高性能web应用的工具,Spring Boot可以直接使用该框架。

Akka

一个实现Actor模型的开发框架,可以在Scala和Java中使用,是第三代的Reactive技术。

Reactive技术的热度在不断提升,是因为该技术可以帮助我们节省服务器资源,可以用少量的线程支持更高的负载。Reactive、非阻塞、异步提供了很好的解决问题的方法

通过 https://start.spring.io 来建立一个空的项目并添加Reactor Core依赖。
Reactive由一系列事件以及发布和订阅这些事件的2个参与方组成的一个序列。我们也可以 称之为stream

我们会使用Reactor库,把publisher称为 Flux(实现了Reactive Streams的Publisher接口),在RxJava库中的名称是Observable ,代表的是类似的概念。(Reactor2.0中的名称为Stream,很容易跟Java 8 的 Streams混淆,因此我们只使用Reactor 3.0中的新定义)。
Flux 是POJO类型的事件序列的一个Publisher,例如 Flux<T> 是类型 T 的一个Publisher。Flux 有一系列从不同的来源创建示例的静态方法。例如从数组创建 Flux :

Flux<String> flux = Flux.just("red", "white", "blue");

我们创建了一个Flux,现在我们开始用它做一些事情。实际上只有2件事可以做:操作(转换或与其它序列组合)和订阅。

单值序列

我们经常遇到的序列往往只有一个元素,或者是没有元素,例如通过id查找记录。在Reactor中Mono表示单值Flux或空Flux。Mono的API与Flux类似,但是更简洁,因为不是所有的操作对单值序列有意义。RxJava中类似的 类型叫Single,空序列叫Completable。在Reactor中空序列是Mono<Void>

操作符

Flux的绝大部分方法都是操作。在这儿我们不会把所有的方法都讲一遍(可以看javadoc),我们只需要弄明白操作是什么,可以做什么。 例如,可以用log()将Flux的内部事件显示出来,或者用map()进行转换:

Flux<String> flux = Flux.just("red", "white", "blue");

Flux<String> upper = flux
  .log()
  .map(String::toUpperCase);

这段代码将输入的字符串转换成大写,非常简单明了。同时很有意思的一点是(时刻注意,虽然刚开始不太习惯),数据并没有开始处理。什么都 不会显示,因为什么都没有发生(可以自己运行一下代码),调用Flux的操作符仅仅是建立了一个执行计划。操作符实现的逻辑只有当数据开始流动时才会执行,当某一方订阅这个Flux的时候。

要让数据流生效,我们需要用subscribe()方法来订阅Flux,这些方法会回溯我们之前定义的操作链,请求publisher 产生数据。在我们的简单示例中,字符串集合会被遍历进行处理。在更复杂的场景中,可能是从文件系统读取文件,或者从数据库中读取数据,或者是调用一个http服务。
开始调用subscribe():

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
.subscribe();


可以看到当subscribe()没有参数时, 会请求 publisher 发送所有的数据 - 只有一个request并且是 “unbounded”。我们还可以看到发布的每一项的回调(onNext()),结束的回调(onComplete()),以及原始订阅的回调(onSubscribe())。如果需要,我们还可以用Flux的doOn*()方法来监听这些 事件的回调。
subscribe()方法是重载的,有很多变体。其中一个重要 且常用的形式是带回调参数。第一个参数是 Consumer ,用于每一个数据项的 回调,还可以增加一个可选的 Consumer 用于错误处理,以及一个 序列完成后执行的 Runnable 。
例如,为每一个数据项增加回调:

Flux.just("red", "white", "blue")
    .log()
    .map(String::toUpperCase)
.subscribe(System.out::println);

我们可以通过多种方法控制数据流使它变成 “bounded” 。用于控制的 内部接口是从 Subscriber 获取到的 Subscription 。与前面简单调用 subscribe() 等价的复杂形式是:

.subscribe(new Subscriber<String>() {

    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE);
    }
    @Override
    public void onNext(String t) {
        System.out.println(t);
    }
    @Override
    public void onError(Throwable t) {
    }
    @Override
    public void onComplete() {
    }

});

想要控制数据流为一次消费2个数据项,可以更加智能的使用Subscription :

.subscribe(new Subscriber<String>() {

    private long count = 0;
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(2);
    }

    @Override
    public void onNext(String t) {
        count++;
        if (count>=2) {
            count = 0;
            subscription.request(2);
        }
     }
...

这个 Subscriber 每次会打包2个数据项。这个场景很普遍,因此我们会考虑把实现提取到一个专门的类中以方便使用。

线程、调度和后台处理

上面的示例中有一个有趣的特点是所有的log方法都是在主线程中执行的,即 subscribe() 调用者的线程。这是一个关键点:Reactor以尽可能少的线程来实现高性能。过去5年我们习惯于使用 多线程、线程池和异步处理来提升系统性能。对于这种新的思路 可能会比较诧异。但是事实是:即使是JVM这种专门对线程处理做过优化的技术,线程 切换的成本也是很高的。在单个线程上进行计算总是要快的多。Reactor给了我们进行异步编程的方法,并且假设我们知道我们在做什么。
Flux提供了一些方法来控制线程的边界。例如,可以使用 Flux.subscribeOn() 配置一个订阅在后台 线程中进行处理:

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
  .subscribeOn(Schedulers.parallel())
.subscribe(null, 2);

可以看到订阅和所有的处理都在 “parallel-1-1” 这个后台线程中。单线程对于CPU密集型的处理来说是没问题的。然而如果是IO密集型的处理就可能会阻塞。在这个场景中,我们希望处理尽可能的完成不至于阻塞调用方。一个线程池仍会提供很大的帮助,我们可以用 Schedulers.parallel() 获取线程池。将单个数据项的处理拆分到独立的线程中进行处理,我们需要把它放到独立的发布方中,每个发布方都在后台线程中请求执行结果。一种方法是调用 flatMap() 操作,会把数据项映射到一个 Publisher 并返回一个新类型的序列:

Flux.just("red", "white", "blue")
  .log()
  .flatMap(value ->
     Mono.just(value.toUpperCase())
       .subscribeOn(Schedulers.parallel()),
     2)
.subscribe(value -> {
  log.info("Consumed: " + value);
})

注意 flatMap() 把数据项放入一个子 publisher ,这样可以控制每个子项的订阅而不是整个序列的订阅。Reactor内部的默认行为可以尽可能长的挂起在一个线程上,因此如果需要特定的 数据项在后台线程中处理,必须要明确的指明。事实上这是一系列 强制进行并行计算的方法中的一种。

现在是多个线程在进行处理,并且 flatMap() 中的 批量参数保证只要可能每次都会处理2个数据项。Reactor会让自己尽可能的聪明,预先从 Publisher 中提取数据项,并且估算订阅方的等待时间。
Flux 还有一个 publishOn() 方法的作用 类似,只不过控制的是发布方的行为:

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
  .subscribeOn(Schedulers.newParallel("sub"))
  .publishOn(Schedulers.newParallel("pub"), 2)
.subscribe(value -> {
    log.info("Consumed: " + value);
});



注意 订阅方的回调(内容为 “Consumed: …​”) 执行在发布方线程 pub-1-1 上。如果把 subscribeOn() 方法去掉,会发现所有的数据项的处理都在线程 pub-1-1 上。这再一次说明 Reactor 使用尽可能少的线程 - 如果没有明确的指定要切换线程,下一个调用会在当前调用的线程上执行。

提取器:有副作用的订阅者

另一种订阅序列的方式是调用 Mono.block() 或 Mono.toFuture() 或 Flux.toStream() (这些是提取器方法,将 Reactive 类型转换为阻塞类型)。Flux 还有 collectList() 和 collectMap() 将 Flux 转换成 Mono。他们并没有真正的订阅序列,但是他们会抛弃控制订阅单个数据项的能力。
警告: 一个黄金规则是“永远不要调用提取器”。当然有一些例外,例如在测试程序中需要能够通过阻塞来汇总结果。
这些方法用于将 Reactive 转换为阻塞模式,当我们需要适配一个老式的API,例如Spring MVC的时候。在调用 Mono.block() 的时候,我们放弃了 Reactive Streams 所有优势。这是 Reactive Streams 和 Java 8 Streams 的关键区别 - Java Stream只有 “all or nothing” 的订阅模式,等同于 Mono.block()。当然 subscribe() 也会阻塞调用线程,因此与转换方法一样危险,但是有 足够的控制手段 - 可以用 subscribeOn() 防止阻塞,也可以通过背压来将数据项进行溢出并且定时的决定是否继续处理。


https://www.slideshare.net/InfoQ/servlet-vs-reactive-stacks-in-five-use-cases
https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html
The term "reactive" refers to programming models that are built around reacting to change — network component reacting to I/O events, UI controller reacting to mouse events, etc. In that sense non-blocking is reactive because instead of being blocked we are now in the mode of reacting to notifications as operations complete or data becomes available.


There is also another important mechanism that we on the Spring team associate with "reactive" and that is non-blocking back pressure. In synchronous, imperative code, blocking calls serve as a natural form of back pressure that forces the caller to wait. In non-blocking code it becomes important to control the rate of events so that a fast producer does not overwhelm its destination.
http://www.reactive-streams.org/


http://www.baeldung.com/reactor-core
Essentially, Reactive Streams is a specification for asynchronous stream processing.
In other words, a system where lots of events are being produced and consumed asynchronously. Think about a stream of thousands of stock updates per second coming into a financial application, and for it to have to respond to those updates in a timely manner.
One of the main goals of this is to address the problem of back pressure. If we have a producer which is emitting events to a consumer faster than it can process them, then eventually the consumer will be overwhelmed with events, running out of system resources. Backpressure means that our consumer should be able to tell the producer how much data to send in order to prevent this, and this is what is laid out in the specification.
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
  .log()
  .subscribe(elements::add);

The core difference is that Reactive is a push model, whereas the Java 8 Streams are a pull model. In reactive approach. events are pushed to the subscribers as they come in. 
With Reactive we could have an infinite stream coming in from an external resource, with multiple subscribers attached and removed on an ad hoc basis. We can also do things like combine streams, throttle streams and apply backpressure, which we will cover next.
Backpressure is when a downstream can tell an upstream to send it fewer data in order to prevent it from being overwhelmed.

Flux.just(1, 2, 3, 4)
  .log()
  .map(i -> i * 2)
  .subscribeOn(Schedulers.parallel())
  .subscribe(elements::add);
http://www.baeldung.com/spring-5-functional-web
RouterFunction<ServerResponse> routingFunction() {
    return route(path("/"), req -> ok().build());
}
RouterFunction router = route(POST("/login"), req -> req
  .body(toFormData())
  .map(MultiValueMap::toSingleValueMap)
  .filter(formData -> "baeldung".equals(formData.get("user")))
  .filter(formData -> "you_know_what_to_do".equals(formData.get("token")))
  .flatMap(formData -> ok().body(Mono.just("welcome back!"), String.class))
  .switchIfEmpty(ServerResponse.badRequest().build()))
http://www.ruanyifeng.com/blog/2017/05/server-sent_events.html

另外还有一个概念就是SSE。就是Server-sent events的缩写。这是个什么鬼。其实就是个概念。或者是一个标准。就是把数据从web服务端传输到客户端的一种做法。顾名思义:服务端发送给客户端的事件。神奇吧。
通过传统的webmvc的方式来实现reactive效果
严格地说,HTTP 协议无法做到服务器主动推送信息。但是,有一种变通方法,就是服务器向客户端声明,接下来要发送的是流信息(streaming)。
也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭连接,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。本质上,这种通信就是以流信息的方式,完成一次用时很长的下载。
SSE 就是利用这种机制,使用流信息向浏览器推送信息。它基于 HTTP 协议,目前除了 IE/Edge,其他浏览器都支持。

SSE 与 WebSocket 作用相似,都是建立浏览器与服务器之间的通信渠道,然后服务器向浏览器推送信息。
总体来说,WebSocket 更强大和灵活。因为它是全双工通道,可以双向通信;SSE 是单向通道,只能服务器向浏览器发送,因为流信息本质上就是下载。如果浏览器向服务器发送信息,就变成了另一次 HTTP 请求。

  • SSE 使用 HTTP 协议,现有的服务器软件都支持。WebSocket 是一个独立协议。
  • SSE 属于轻量级,使用简单;WebSocket 协议相对复杂。
  • SSE 默认支持断线重连,WebSocket 需要自己实现。
  • SSE 一般只用来传送文本,二进制数据需要编码后传送,WebSocket 默认支持传送二进制数据。
  • SSE 支持自定义发送的消息类型。


Labels

Review (572) System Design (334) System Design - Review (198) Java (189) Coding (75) Interview-System Design (65) Interview (63) Book Notes (59) Coding - Review (59) to-do (45) Linux (43) Knowledge (39) Interview-Java (35) Knowledge - Review (32) Database (31) Design Patterns (31) Big Data (29) Product Architecture (28) MultiThread (27) Soft Skills (27) Concurrency (26) Cracking Code Interview (26) Miscs (25) Distributed (24) OOD Design (24) Google (23) Career (22) Interview - Review (21) Java - Code (21) Operating System (21) Interview Q&A (20) System Design - Practice (20) Tips (19) Algorithm (17) Company - Facebook (17) Security (17) How to Ace Interview (16) Brain Teaser (14) Linux - Shell (14) Redis (14) Testing (14) Tools (14) Code Quality (13) Search (13) Spark (13) Spring (13) Company - LinkedIn (12) How to (12) Interview-Database (12) Interview-Operating System (12) Solr (12) Architecture Principles (11) Resource (10) Amazon (9) Cache (9) Git (9) Interview - MultiThread (9) Scalability (9) Trouble Shooting (9) Web Dev (9) Architecture Model (8) Better Programmer (8) Cassandra (8) Company - Uber (8) Java67 (8) Math (8) OO Design principles (8) SOLID (8) Design (7) Interview Corner (7) JVM (7) Java Basics (7) Kafka (7) Mac (7) Machine Learning (7) NoSQL (7) C++ (6) Chrome (6) File System (6) Highscalability (6) How to Better (6) Network (6) Restful (6) CareerCup (5) Code Review (5) Hash (5) How to Interview (5) JDK Source Code (5) JavaScript (5) Leetcode (5) Must Known (5) Python (5)

Popular Posts