文章目录
- 一、概述
- 1、Spring体系定位
- 2、Spring MVC和WebFlux差异
- 二、入门
- 1、依赖
- 2、ReactorHttpHandlerAdapter(main启动)
- 3、DispatcherHandler(SpringWebFlux启动)
- 4、WebFilter
- 三、DispatcherHandler理解
- 1、handle
前置知识(很重要):Reactive Streams&Reactor Core
一、概述
官网地址
1、Spring体系定位
我们不难发现整个响应式编程和传统的Servlet API属于两套完全不同但是又并行的技术栈,所以如果老项目想要改造为响应式编程,项目改造的成本很大
2、Spring MVC和WebFlux差异
1)技术栈体系上:
在声明接口处理上,如果是使用注解的形式,那么两边可以复用。具体到业务代码的处理写法上,必定涉及到不小的改造。
2)执行流程上:
二、入门
1、依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
</dependencies>
2、ReactorHttpHandlerAdapter(main启动)
服务端代码:
@SneakyThrows
public static void main(String[] args) {
HttpHandler handler = (ServerHttpRequest request, ServerHttpResponse response) -> {
URI uri = request.getURI();
System.out.println(Thread.currentThread() + "接收请求:" + uri);
request.getHeaders(); //获取请求头
//数据的发布者:Mono<DataBuffer>、Flux<DataBuffer>
// 1、创建响应数据的DataBuffer
DataBufferFactory factory = response.bufferFactory();
// 2、数据写入Buffer
DataBuffer buffer = factory.wrap((uri + " ==> Hand success").getBytes());
// 3、构建DataBuffer的发布者
return response.writeWith(Mono.just(buffer));
};
// 启动一个服务器,监听8080端口,接受数据,拿到数据交给 HttpHandler 进行请求处理
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
// 使用HttpServer启动Netty服务器
HttpServer.create()
.host("localhost")
.port(8080)
.handle(adapter)
.bindNow();
System.out.println("start success");
System.in.read();
}
浏览器请求:
控制台返回:
3、DispatcherHandler(SpringWebFlux启动)
服务端代码,注意点:
- 涉及Http调用时,不要再使用RestTemplate发起,而是需要替换为WebClient处理
- 如果接口需要支持服务端推送的效果,需要设置produces为MediaType.TEXT_EVENT_STREAM_VALUE(text/event-stream)
public class TestController {
@RequestMapping("/test")
public String test() {
System.out.println("http request test");
return "test";
}
@GetMapping(value = "/test2", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Tuple2<Long, String>> test2() {
WebClient webClient = WebClient.create("http://localhost:20888");
return webClient.post()
.uri("test")
.retrieve()
.bodyToFlux(String.class)
.delayElements(Duration.ofSeconds(1)).index();
}
@GetMapping(value = "/test3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Tuple2<Long, Integer>> test3() {
return Flux.range(1, 3)
.delayElements(Duration.ofSeconds(1))
.timestamp();
}
}
访问接口:
http://127.0.0.1:20888/test2
浏览器效果:
控制台:
访问接口:
http://127.0.0.1:20888/test3
4、WebFilter
由于webFlux使用的是Netty服务器,所以无法再使用Tomcat中Servlet规范下的Filter,此时我们需要借助Spring-web的org.springframework.web.server.WebFilter来处理我们的Filter逻辑
@Component
public class TestWebFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return Mono.defer(() -> {
ServerHttpRequest request = exchange.getRequest();
System.out.println("TestWebFilter: " + request.getPath());
return chain.filter(exchange);
});
}
}
三、DispatcherHandler理解
1、handle
类比Spring MVC中DispatcherServlet#doDispatch方法
Spring WebFlux底提供了NettyWebServer,程序启动时,会调用NettyWebServer的startHttpServer(),并且给HttpServer绑定一个ReactorHttpHandlerAdapter,它包装了一个HttpHandler,ReactorHttpHandlerAdapter从Reactor-Netty-Http收到Request和Response对象后,就会传递给HttpHandler进行处理,HttpHandler链路依次为:
- ReactorHttpHandlerAdapter内部实际会委托DelayedInitializationHttpHandler进行处理
- 再次向下委托到HttpWebHandlerAdapter,会它负责将Request和Response包装生成一个ServerWebExchange对象,后面Handler接收到的都是ServerWebExchange对象
- 再次委托ExceptionHandlingWebHandler(可作用于异常隔离)
- 再次委托FilteringWebHandler(内部有一个chain用于存放我们创建的WebFilter)
- 最后,委托了DispatcherHandler,DispatcherHandler会根据当前请求,找到Controller中对应的方法。从而得到Mono或Flux
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return createNotFoundError();
}
// 跨域问题处理
if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
return handlePreFlight(exchange);
}
//拿到所有的 handlerMappings
return Flux.fromIterable(this.handlerMappings)
// 找每一个mapping看谁能处理请求
.concatMap(mapping -> mapping.getHandler(exchange))
// 直接触发获取元素; 拿到流的第一个元素; 找到第一个能处理这个请求的handlerAdapter
.next()
// 如果没拿到这个元素,则响应404错误;
.switchIfEmpty(createNotFoundError())
// 异常处理,一旦前面发生异常,调用处理异常
.onErrorResume(ex -> handleDispatchError(exchange, ex))
// 调用方法处理请求,得到响应结果
.flatMap(handler -> handleRequestWith(exchange, handler));
}