DevilKing's blog

冷灯看剑,剑上几分功名?炉香无需计苍生,纵一穿烟逝,万丈云埋,孤阳还照古陵

0%

Spring Reactive Programing

原文链接

FRP的程序,程序的主要逻辑不需要再关注底层的操作是怎么被调度的,而仅仅关心一个一个具体的操作应该做什么,互相配合完成系统目标。这个角度来说,FRP的方式是声明式的;而声明式的代码相对传统的过程式代码有更好的可读性和可维护性。

GoF在设计模式里面特别声明了我们需要考虑优先使用组合而不是继承,不幸的是这一忠告从来就没有被人们认真对待; FRP的思维方式完全不提继承的事儿,但是封装依然是必要的;组合则被提到了首要的位置,因为函数式编程的主要复用方式就是组合

传统的Spring MVC框架工作机制如下

  1. DispatcherServelet 会搜索WebApplicationContext来查找DI容器中注册的Controller以处理进来的HTTP请求
  2. 本地化解析的Bean在这一过程中也会被一并查找并关联起来以便后续渲染View的时候使用来本地化View中的显示内容
  3. 主题解析的Bean则被用来关联后续要使用的View模板,以进行CSS渲染等额外处理
  4. 如果HTTP请求包含多部分媒体内容,那么请求会被封装在一个MultipartHttpServeletRequest中处理
  5. Dispatcher会搜索对应的Handler,找到之后,handler对应的controller以及其前置处理、后续处理会被按照顺序依次处理以准备模型返回,或者被用于后续View渲染
  6. 如果一个模型被返回,对应的View就会被渲染并返回响应的HTTP消息 整体的处理逻辑是一个线性的同步处理逻辑。

传统的Sping MVC框架的接口都定义在 org.springframework.web.servlet包中,而支持响应式编程的Web框架被命名为WebFlux,对应的接口和注解放在一个新的Java包中:org.springframework.web.reactive。它是全异步、非阻塞的

Mono或者Flux对象概念有些类似于Java8中的CompletableFuture,自身支持类似的lambda表达式组合来实现流式操作。这两个类型本质上实现了Reactive Stream中的Publish的概念,可以认为它是流的发布者。

FluxMono的不同是,它本身会产生0到N个事件输出到流中;然后才最终完成或者报错。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# netty
HttpHandler handler = ...
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
HttpServer.create(host, port).newHandler(adapter).block();


#jetty
HttpHandler handler = ...
Servlet servlet = new JettyHttpHandlerAdapter(handler);

Server server = new Server();
ServletContextHandler contextHandler = new ServletContextHandler(server, "");
contextHandler.addServlet(new ServletHolder(servlet), "/");
contextHandler.start();

ServerConnector connector = new ServerConnector(server);
connector.setHost(host);
connector.setPort(port);
server.addConnector(connector);
server.start();

将返回值的处理与路由进行分离

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class PersonHandler {
private final PersonRepository repository;

public PersonHandler(PersonRepository repository) {
this.repository = repository;
}

public Mono<ServerResponse> listPeople(ServerRequest request) {
Flux<Person> people = repository.allPeople();
return ServerResponse.ok().contentType(APPLICATION_JSON).body(people, Person.class);
}

public Mono<ServerResponse> createPerson(ServerRequest request) {
Mono<Person> person = request.bodyToMono(Person.class);
return ServerResponse.ok().build(repository.savePerson(person));
}

public Mono<ServerResponse> getPerson(ServerRequest request) {
int personId = Integer.valueOf(request.pathVariable("id"));
Mono<ServerResponse> notFound = ServerResponse.notFound().build();
Mono<Person> personMono = this.repository.getPerson(personId);
return personMono
.flatMap(person -> ServerResponse.ok().contentType(APPLICATION_JSON).body(fromObject(person)))
.switchIfEmpty(notFound);
}
}


# router

PersonRepository repository = ...
PersonHandler handler = new PersonHandler(repository);

RouterFunction<ServerResponse> personRoute =
route(GET("/person/{id}").and(accept(APPLICATION_JSON)), handler::getPerson)
.andRoute(GET("/person").and(accept(APPLICATION_JSON)), handler::listPeople)
.andRoute(POST("/person").and(contentType(APPLICATION_JSON)), handler::createPerson);