Ezio's Blog
Posts Categories Tags Music Mood About
Ezio's Blog· Light
☰ Menu
Posts Categories Tags Music Mood About
Expand all Back to top Go to bottom

Spring Webflux

Author: Ezio Date: August 25, 2022  11:04:18 Category: Spring

什么是webflux

Spring Webflux 是 Spring5 中添加的新模块,用于web开发,功能和 SpringMVC 相似,Webflux 使用的是响应式编程方式。

Webflux 的特点:

  • 异步非阻塞:Webflux是一个异步非阻塞的Web框架,它能够充分利用多核CPU的硬件资源去处理大量的并发请求。
  • 响应式函数编程:以函数式编程(lambda)为基础,以Reactor(实现Reactive-stream)为核心。
  • 可运行在支持NIO的容器上,支持 Netty、Undertow、Servlet容器(前提是容器要支持Servlet3.1,因为非阻塞IO是使用了Servlet3.1的特性)。
  • 不能使接口的响应时间缩短,它仅仅能够提升吞吐量和伸缩性。

Spring Webflux 和 Spring MVC 的比较:

  • 两个框架都可以使用注解方式,都可以运行在 Tomet 等容器中
  • SpringMVC 采用命令式编程,Webflux 采用异步响应式编程

Webflux 工作流程

  • 容器reactor-netty:即基于netty实现的符合reactor标准的容器,Spring Boot默认使用它。其对应的关键核心接口是HttpHandler,webflux中对应的重要实现类是:WebHttpHandlerBuilder,它是整个webflux程序的入口。
  • Webfilter:过滤器
  • DispatcherHandler:核心处理器,协调如下三个核心组件工作
    • HandleMapping:存储请求URI和处理器的对应关系
    • HandlerAdapter:封装了主要处理逻辑,处理结果封装成HandlerResult
    • HandlerResultHandler:针对上一步结果的处理器
  • WebExceptionHandler:整个流程中抛出的任何异常,都会被它捕获,“真”全局异常处理

Webflux 应用场景

  1. 特别适合在IO密集型的服务中,比如微服务网关。IO 密集型包括:磁盘IO密集型, 网络IO密集型,微服务网关就属于网络 IO 密集型,使用异步非阻塞式编程模型,能够显著地提升网关对下游服务转发的吞吐量,在微服务架构中,SpringMVC 和 SpringWebflux可以混合使用。
  2. **WebFlux 不是 Spring MVC 的替代方案。**虽然 WebFlux 也可以被运行在 Servlet 容器上(需是 Servlet 3.1+ 以上的容器),但是 WebFlux 主要还是应用在异步非阻塞编程模型,而 Spring MVC 是同步阻塞的,如果你目前在 Spring MVC 框架中大量使用非同步方案,那么,WebFlux 更适合的,否则,还是使用 Spring MVC 更合适。

什么是响应式编程

响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。本质上是对数据流或某种变化所作出的反应,但是这个变化什么时候发生是未知的,所以他是一种基于异步、回调的方式在处理问题。

响应式编程基于Reactor(Reactor 是一个运行在 Java8 之上的响应式框架)的思想,当你做一个带有一定延迟的才能够返回的io操作时,不会阻塞,而是立刻返回一个流,并且订阅这个流,当这个流上产生了返回数据,可以立刻得到通知并调用回调函数处理数据。

电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似”=B1+C1”的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化。

响应式传播核心特点之一:变化传播:一个单元格变化之后,会像多米诺骨牌一样,导致直接和间接引用它的其他单元格均发生相应变化。

Reactive Streams

Reactive Streams,翻译为反应式流,是针对于响应式编程所提出的一套规范。对于Java程序员来说,Reactive Streams为我们提供了Java中的 Reactive Programming 的通用API。

Reactive Streams 非常类似于 JPA 或 JDBC。两者都是API规范,实际使用时需要使用API对应的具体实现。例如,从JDBC规范中,有DataSource接口,而Oracle JDBC实现了DataSource接口。Microsoft的SQL Server JDBC实现也实现了DataSource接口。

背压:由消费者控制生产者的生产速度,以解决生产者生产的速度远大于消费者消费的速度时所造成的消息的积压

Reactive Streams API中仅仅包含了如下四个接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//发布者
public interface Publisher<T> {
//接收订阅者,数据的流向者
public void subscribe(Subscriber<?super T>s);
}

//订阅者
public interface Subscriber<T> {
//订阅成功后触发,并且表明可以开始接收订阅数据了
public void onSubscribe(Subscription s);

//获取接受数据的下一项
public void onNext(T t);

//在发布者或订阅遇到错误时触发
public void onError(Throwable t);

//接受完所有的数据后触发
public void onComplete();
}

//订阅关系
public interface Subscription {
//背压 请求数据个数,解决数据拥堵
public void request(long n);

//取消订阅
public void cancel();
}

//处理器,表示一个处理阶段,它既是订阅者也是发布者,并且遵守两者的契约,消费和下发数据
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {

}

Reactor

Reactor 就是 Reactive Streams 的实现。Reactor 有两个核心类: Flux<T> 和 Mono<T>,这两个类都实现 Publisher 接口。

Flux 代表的是 0-N 个元素的响应式序列,而 Mono 代表的是 0-1 个的元素的结果。

Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:元素值,错误信号,完成信号;错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者。

三种信号的特点:

  • 错误信号和完成信号都是终止信号,不能共存
  • 如果没有发送任何元素值,而是直接发送错误或者完成信号,表示是空数据流
  • 如果没有错误信号,也没有完成信号,表示是无限数据流

Moon 和 Flux 的区别, 其实就是 one / more 的区别. many one = more , 同时也可以分解。

Flux 类中常用的静态方法

  • Just() 可以指定序列中包含的全部元素。创建出来的Flux序列在发布这些元素之后会自动结束
1
2
// just() 
Flux.just("Hello", "World").subscribe(System.out::println);
  • fromArray() 通过一个数组创建Flux对象
1
2
Integer[] array = new Integer[]{1, 2, 3};
Flux.fromArray(array).subscribe(System.out::println);
  • fromIterable() 通过Iterable对象创建Flux对象
1
2
List<Integer> list = Arrays.asList(new Integer[]{1, 2, 3});
Flux.fromIterable(list).subscribe(System.out::println);
  • fromStream() 通过Stream对象创建Flux对象
1
Flux.fromStream(list.stream()).subscribe(System.out::println);
  • empty() 创建一个不包含任何元素,只发布结束消息的序列
1
Flux.empty().subscribe(System.out::println);
  • range() 创建包含从start起始到count个数量的Integer对象的序列
1
Flux.range(1, 10).subscribe(System.out::println);
  • interval() 创建一个包含了从0开始递增的Long对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间
1
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
  • never() 创建一个不包含任务消息通知的序列
1
Flux.never().subscribe(System.out::println);
  • generate() 创建复杂逻辑的序列,generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
    • stateSupplier 用来提供初始的状态对象, 状态对象会作为generator使用的第一个参数传入,类似初始值
    • 序列的产生是通过调用所提供的的 SynchronousSink 对象的next(),complete()和error(Throwable)方法来完成的
1
2
3
4
5
Flux.generate(() -> 0, (i, sink) -> {
sink.next("2*" + i + "=" + 2 * i);
if (i == 9) sink.complete();
return i + 1;
}).subscribe(System.out::println);
  • create() 与generate()类似,不同之处在于所使用的是 SynchronousSink 变为了 FluxSink 对象。
1
2
3
4
5
// FluxSink支持同步和异步的消息产生,并且可以在一次调用中产生多个元素。
Flux.create(sink -> {
for (int i = 0; i < 4; i++) sink.next(i);
sink.complete();
}).subscribe(System.out::println);

Mono 类中常用的静态方法

  • just() 可以指定序列中包含的全部元素。
1
Mono.just("hello world").subscribe(System.out::println);
  • empty() 创建一个不包含任何元素。
1
Mono.empty().subscribe(System.out::println);
  • error(Throwable error) 创建一个只包含错误消息的序列。
1
Mono.error(new RuntimeException()).subscribe(System.out::println);
  • fromCallable()、 fromCompletionStage()、 fromFuture()、 fromRunnable() 和 fromSupplier():

分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。

1
2
3
4
Mono.fromCallable(() -> "callback function").subscribe(System.out::println);
Mono.fromFuture(CompletableFuture.completedFuture("from future")).subscribe(System.out::println);
Mono.fromRunnable(() -> System.out.println(Thread.currentThread().getName())).subscribe();
Mono.fromSupplier(() -> new Date().toString()).subscribe(System.out::println);
  • delay(Duration duration) 创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
1
Mono.delay(Duration.ofMillis(2)).map(String::valueOf).subscribe(System.out::println);
  • ignoreElements(Publisher source) 创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素(删除它们),只产生结束消息。
1
2
Mono<String> mono = Mono.just("hello world");
Mono.ignoreElements(mono).subscribe(System.out::println);
  • justOrEmpty(Optional<? extends T> data) 和 **justOrEmpty(T data) 从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
1
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);

操作符

  • buffer() 和 bufferTimeout()

这两个操作符的作用是把当前流中的元素收集到集合中,并把集合对象作为流中的新元素。

在进行收集时可以指定不同的条件:所包含的元素的最大数量或收集的时间间隔。方法buffer()仅使用一个条件,而bufferTimeout()可以同时指定两个条件。

指定时间间隔时可以使用Duration对象或毫秒数,即使用bufferMillis()或bufferTimeoutMillis()两个方法。

除了元素数量和时间间隔外,还可以通过 bufferUntil 和 bufferWhile 操作符来进行收集。这两个操作符的参数时表示每个集合中的元素索要满足的条件的Predicate对象。

bufferUntil会一直收集直到Predicate返回true。

使得Predicate返回true的那个元素可以选择添加到当前集合或下一个集合中;bufferWhile则只有当Predicate返回true时才会收集。一旦为false,会立即开始下一次收集。

1
2
3
4
Flux.range(1, 100).buffer(20).subscribe(System.out::println);
Flux.intervalMillis(100).bufferMillis(1001).take(2).toStream().forEach(System.out::println);
Flux.range(1, 10).bufferUntil(i -> i%2 == 0).subscribe(System.out::println);
Flux.range(1, 10).bufferWhile(i -> i%2 == 0).subscribe(System.out::println);
  • filter() 对流中包含的元素进行过滤,只留下满足Predicate指定条件的元素。
1
Flux.range(1, 10).filter(i -> i%2 == 0).subscribe(System.out::println);
  • zipWith() 把当前流中的元素与另一个流中的元素按照一对一的方式进行合并。在合并时可以不做任何处理,由此得到的是一个元素类型为Tuple2的流;也可以通过一个BiFunction函数对合并的元素进行处理,所得到的流的元素类型为该函数的返回值。
1
2
Flux.just("a", "b").zipWith(Flux.just("c", "d")).subscribe(System.out::println);
Flux.just("a", "b").zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2)).subscribe(System.out::println);
  • take() 从当前流中提取元素
    • take(long n),take(Duration timespan)和takeMillis(long timespan):按照指定的数量或时间间隔来提取
    • takeLast(long n):提取流中的最后N个元素
    • takeUntil(Predicate<? super T> predicate) :提取元素直到Predicate返回true
    • takeWhile(Predicate<? super T> continuePredicate):当Predicate返回true时才进行提取
    • takeUntilOther(Publisher<?> other):提取元素知道另外一个流开始产生元素
1
2
3
4
Flux.range(1, 1000).take(10).subscribe(System.out::println);
Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);
Flux.range(1, 1000).takeWhile(i -> i < 10).subscribe(System.out::println);
Flux.range(1, 1000).takeUntil(i -> i == 10).subscribe(System.out::println);
  • reduce() 和 reduceWith()

reduce 和 reduceWith 操作符对流中包含的所有元素进行累积操作,得到一个包含计算结果的 Mono 序列。累积操作是通过一个 BiFunction 来表示的。在操作时可以指定一个初始值。如果没有初始值,则序列的第一个元素作为初始值。

1
2
Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);
Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);
  • merge() 和 mergeSequential()

merge 和 mergeSequential 操作符用来把多个流合并成一个 Flux 序列。不同之处在于 merge 按照所有流中元素的实际产生顺序来合并,而 mergeSequential 则按照所有流被订阅的顺序,以流为单位进行合并。

1
2
Flux.merge(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5)).toStream().forEach(System.out::println);
Flux.mergeSequential(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5)).toStream().forEach(System.out::println);
  • flatMap() 和 flatMapSequential()

flatMap 和 flatMapSequential 操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。一个按照实际产生顺序,一个按照被订阅的顺序。

1
2
3
4
Flux.just(5, 10)
.flatMap(x -> Flux.interval(Duration.of(x * 2, ChronoUnit.SECONDS)).take(x))
.toStream()
.forEach(System.out::println);
  • concatMap()

concatMap 操作符的作用也是把流中的每个元素转换成一个流,再把所有流进行合并。与 flatMap 不同的是,concatMap 会根据原始流中的元素顺序依次把转换之后的流进行合并;与 flatMapSequential 不同的是,concatMap 对转换之后的流的订阅是动态进行的,而 flatMapSequential 在合并之前就已经订阅了所有的流。

1
2
3
4
Flux.just(5, 10)
.concatMap(x -> Flux.interval(Duration.of(x * 2, ChronoUnit.SECONDS)).take(x))
.toStream()
.forEach(System.out::println);
  • combineLatest()

combineLatest 操作符把所有流中的最新产生的元素合并成一个新的元素,作为返回结果流中的元素。只要其中任何一个流中产生了新的元素,合并操作就会被执行一次,结果流中就会产生新的元素。

1
2
3
4
5
6
Flux.combineLatest(
Arrays::toString,
Flux.interval(Duration.ofMillis(1000)).take(5),
Flux.interval(Duration.ofSeconds(3)).take(5))
.toStream()
.forEach(System.out::println);

Author: Ezio

Permalink: https://ezioy.cn/2022/08/25/Spring-Webflux/

License: Copyright (c) 2019 CC-BY-NC-4.0 LICENSE

Slogan: Nothing is true,Everything is permitted

Tag(s): # Webflux # 响应式编程
back · home
Kafka入门 Spring Gateway
Ezio © 2019 - 2026 | Powered by Hexo & Chic | 访客数量:   浏览次数: | 渝公网安备50011302222043 | 渝ICP备2023013933号-1