DevilKing's blog

冷灯看剑,剑上几分功名?炉香无需计苍生,纵一穿烟逝,万丈云埋,孤阳还照古陵

0%

Reactive Programing

原文链接

因为无状态是微服务的潜在要求也是最重要的一个要求之一; 函数式编程相较于其他编程范式更强调无副作用的编程思维,和微服务的基本要求自然契合的很好

函数式编程的基本思路是将程序的执行看作是一堆函数的组合处理和求值过程;纯粹的函数式编程要求数据是不可变的, 同样的数值输入在流经同样的函数处理的时候必须得到确定的输出,不容许有预料之外的副作用产生。(数据的不变性)

流+函数式->响应式

1
2
3
4
5
6
7
8
9
10
11
12
13
requestStream.subscribe(function(requestUrl) {
// execute the request
var responseStream = Rx.Observable.create(function (observer) {
jQuery.getJSON(requestUrl)
.done(function(response) { observer.onNext(response); })
.fail(function(jqXHR, status, error) { observer.onError(error); })
.always(function() { observer.onCompleted(); });
});

responseStream.subscribe(function(response) {
// do something with the response
});
}

处理这些响应数据的代码放在第二个subscribe中

Java8采用stream()接口来引入

RxJava扩展观察者模式

1
2
3
4
5
6
7
import io.reactivex.*;

public class HelloWorld {
public static void main(String[] args) {
Flowable.just("Hello world").subscribe(System.out::println);
}
}

数据流传递给方法

使用并发的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Flowable.range(1, 10)
.flatMap(v ->
Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w)
).blockingSubscribe(System.out::println);


Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(v -> v * v)
.sequential()
.blockingSubscribe(System.out::println);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Assume the record values represent lines of text.  For the sake of this example, you can ignore
// whatever may be stored in the record keys.
KStream<String, String> textLines = ...;

KStream<String, Long> wordCounts = textLines
// Split each text line, by whitespace, into words. The text lines are the record
// values, i.e. you can ignore whatever data is in the record keys and thus invoke
// `flatMapValues` instead of the more generic `flatMap`.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
// Group the stream by word to ensure the key of the record is the word.
.groupBy((key, word) -> word)
// Count the occurrences of each word (record key).
// This will change the stream type from `KGroupedStream<String, String>` to
// `KTable<String, Long>` (word -> count).
.count()
// Convert the `KTable<String, Long>` into a `KStream<String, Long>`.
.toStream();