Thursday, July 9, 2015

RxJava - reactive programming



https://iamzhangyi.github.io/2018/02/01/thinking-rule-of-rp
响应式编程(Reactive Programming)的本质是异步非阻塞的高响应式处理,最核心思想则为Everything is stream,即针对流进行处理,这是其根本。从这个角度讲,我们可以将响应式编程的设计思想视为Stream-Oriented Design,即面向流的设计。
正如面向对象设计以对象为基本设计要素,函数式编程思想以函数为基本设计要素,响应式编程则应该以流为基本设计要素。这带来设计思想上根本的变化,包括:
  • 以流作为建模的元素
  • 流存在松耦合的上下游关系
  • 以流为重用的单位
  • 对流进行转换、运算、合并与拆分

http://blog.danlew.net/2016/01/25/rxjavas-repeatwhen-and-retrywhen-explained/

http://blog.freeside.co/2015/01/29/simple-background-polling-with-rxjava/
Observable.interval(30, TimeUnit.SECONDS)
That will return an Observable<Long> that emits a value every 30 seconds. The values are clock ticks starting at zero.

Observable.interval(30, TimeUnit.SECONDS)
          .map((tick) -> messageService.getRecentMessages())
Now we have an Observable<List<Message>>. So far so good. However, that map operation is doing blocking I/O so we should shift it onto an appropriate scheduler (yes, Retrofit can return rx.Observable so we could handle this in a non-blocking way but I’ll skip that for the purposes of this discussion).

Observable.interval(30, TimeUnit.SECONDS, Schedulers.io())
          .map(tick -> messageService.getRecentMessages())


Reactive programming is a paradigm that revolves around the propagation of change.
If a program propagates all the changes that modify its data to all the interested parties, then this program can be called reactive.

Reactive programming is a programming paradigm based on the concept of an asynchronous data flow.
http://www.reactivemanifesto.org/

We can become modular if our system is event-driven/message-driven.

A key concept of reactive programming is events. Events can be awaited, can trigger procedures, and can trigger other events.

Easy concurrency to better use their server's power
Easy conditional asynchronous execution

The push approach works on subscription and reaction: the consumer subscribes to new values' emissions; the producer pushes these new values when they are available, and notifies the consumer. At this point, the consumer, well, consumes them.

The producer can now signal that there is no more data available: the onCompleted() event
The producer can now signal that an error occurred: the onError() event
RxJava Observables can be composed instead of nested, saving the developer from the callback hell

The Observer pattern is a behavioral pattern and it provides a way to bind objects in a one-to-many dependency: when one object changes, all the objects depending on it are notified and updated automatically.

Comparing the iterator pattern and the RxJava Observable
Instances of the RxJava Observable class behave somewhat like asynchronous iterators, which notify that there is a next value their subscribers/consumers by themselves. In fact, the Observable class adds to the classic Observer pattern two things available in the Iterable type.

  • The ability to signal the consumer that there is no more data available. Instead of calling the hasNext() method, we can attach a subscriber to listen for a 'OnCompleted' notification.
  • The ability to signal the subscriber that an error has occurred. Instead of try-catching an error, we can attach an error listener to the Observable instance.

Reactive Programming with RxJava: Creating Asynchronous, Event-Based Applications

Observable<Data> memory = ...;  
Observable<Data> disk = ...;  
Observable<Data> network = ...;

// Retrieve the first source with data
Observable<Data> source = Observable  
  .concat(memory, disk, network)
  .first();

first() vs. takeFirst()
As an alternative to using first() for this pattern, you could also use takeFirst().

The difference between the two calls is that first() will throw a NoSuchElementException if none of the sources emits valid data, whereas takeFirst() will simply complete without exception.

How to wait for async Observable to complete
https://groups.google.com/forum/#!topic/rxjava/jMrjk5YeyLI
.toList()
.toBlocking()
.single();

Observable
.timer(10, TimeUnit.SECONDS)
.toBlocking()
.last();    // Wait for observable to complete. Last item discarded.

The computation Scheduler instance is your real choice for doing background work—computations or processing thus its name.

The IO scheduler is reserved for blocking IO operations. Use it for requests to servers, reading from files and sockets, and other similar blocking tasks. Note that its thread pool is unbounded; if its workers are not unsubscribed, the pool will grow indefinitely.

it is expensive to create new threads each time, so in most cases, the computation and the IO Scheduler instances should be used.
http://stackoverflow.com/questions/29566777/does-spring-make-the-securitycontext-available-to-the-thread-executing-a-hystrix
 I'm having is that when the HystrixCommand is executed it is run in a separate thread from a Hystrix thread pool and when my code tries to access the spring security context it is not available on the new thread.
You should be able to get the ApplicationContext in your bean by the normal means. I can see two ways to pass the authentication object: 1) as a parameter to your method, or 2) run hystrix with Semaphore isolation rather than on a separate thread.
@HystrixCommand(fallbackMethod = "updateThingFallback", commandProperties = {
        @HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE")
})
https://yongjhih.gitbooks.io/feed/content/RxJava.html?utm_campaign=CodeBaku&utm_medium=web&utm_source=CodeBaku_10
有效解決重複的 loop 增進效能,維持同個 loop
Observable<Integer> getAgeObs(List<User> users) {
    return Observable.from(users).map(user -> user.getAge());
}

// 如果你堅持一定要傳遞 List
List<Integer> getAgeList(List<User> users) {
    return getAgeObs(users).toList().toBlocking().single();
}

提前打斷迴圈的能力,避免不必要的過濾與轉換
拉平巢狀 callback 增加易讀性
如何導入套用與改變撰寫

既有長時間存取的函式改成 Observable
Observable<File> downloadObs(String url) {
    return Observable.defer(() -> Observable.just(download(url)));
}
downloadObs(url).subscribeOn(Schedulers.io()).subscribe(file -> {
  System.out.println(file);
});

Labels

Review (572) System Design (334) System Design - Review (198) Java (189) Coding (75) Interview-System Design (65) Interview (63) Book Notes (59) Coding - Review (59) to-do (45) Linux (43) Knowledge (39) Interview-Java (35) Knowledge - Review (32) Database (31) Design Patterns (31) Big Data (29) Product Architecture (28) MultiThread (27) Soft Skills (27) Concurrency (26) Cracking Code Interview (26) Miscs (25) Distributed (24) OOD Design (24) Google (23) Career (22) Interview - Review (21) Java - Code (21) Operating System (21) Interview Q&A (20) System Design - Practice (20) Tips (19) Algorithm (17) Company - Facebook (17) Security (17) How to Ace Interview (16) Brain Teaser (14) Linux - Shell (14) Redis (14) Testing (14) Tools (14) Code Quality (13) Search (13) Spark (13) Spring (13) Company - LinkedIn (12) How to (12) Interview-Database (12) Interview-Operating System (12) Solr (12) Architecture Principles (11) Resource (10) Amazon (9) Cache (9) Git (9) Interview - MultiThread (9) Scalability (9) Trouble Shooting (9) Web Dev (9) Architecture Model (8) Better Programmer (8) Cassandra (8) Company - Uber (8) Java67 (8) Math (8) OO Design principles (8) SOLID (8) Design (7) Interview Corner (7) JVM (7) Java Basics (7) Kafka (7) Mac (7) Machine Learning (7) NoSQL (7) C++ (6) Chrome (6) File System (6) Highscalability (6) How to Better (6) Network (6) Restful (6) CareerCup (5) Code Review (5) Hash (5) How to Interview (5) JDK Source Code (5) JavaScript (5) Leetcode (5) Must Known (5) Python (5)

Popular Posts