java reactor 响应式编程

java响应式编程。类似订阅发布模式,或者观察者模式。通过异步提高并行性能。

Reactor

  • Flux<T> 是一个标准的Publisher,Flux表示的是包含0到N个元素的异步序列,可以通过一个完成信号或错误信号终止。
    • 这3种类型的信号转化为对一个下游订阅者的onNext,onComplete,onError3个方法的调用
  • Mono<T> 是一个特殊的Publisher,0或者1个元素的异步序列,可以使用onComplete信号或onError信号来终止。Flux和Mono之间可以进行转换。对一个Flux序列进行计数操作,得到的结果是一个Mono对象。把两个Mono序列合并在一起,得到的是一个Flux对象。
    • subscribe() 订阅并且触发
  • Disposable 基于lambda的订阅方法都返回一个Disposable类型,通过调用它的dispose()来取消这个订阅

响应式与传统编程最大的区别是: 响应式中的方法调用是在构造一个流以及处理流中数据的逻辑,当流中产生了数据(发布,订阅),才会执行构造好的逻辑. 传统编程则是直接执行逻辑获取结果.

map: 同步转换流中的元素
flatMap: 异步转换流中的元素为新的流,再把所有流中的元素进行合并。
flatMapMany: 转换Mono中的元素为Flux(1转多)
concat: 将多个流连接在一起组成一个流(按顺序订阅)
merge: 将多个流合并在一起,同时订阅流
zip: 压缩多个流中的元素
zipWhen: zipWhen(Function<T,Mono<? extends T2>> rightGenerator, BiFunction<T,T2,O> combinator),Wait for the result from this mono, use it to create a second mono via the provided rightGenerator function and combine both results into an arbitrary O object, as defined by the provided combinator function
then: 流完成后执行
doOnNext: 流中产生数据时执行
doOnError: 发送错误时执行
doOnCancel: 流被取消时执行
onErrorContinue: 流发生错误时,继续处理数据而不是终止整个流
defaultIfEmpty: 当流为空时,使用默认值
switchIfEmpty: 当流为空时,切换为另外一个流
as: 将流作为参数,转为另外一个结果。官方例子:flux.as(Mono::from).subscribe() 将flux通过Mono.from函数转化为mono
collectList(): 例子:换成以List<Integer>为对象的Mono Mono<List<Integer>> mono = Flux.range(1,5).collectList();
blockFirst,blockLast:阻塞至第一个或者最后一个值处理完成
cache: 当前的流将会被缓存起来,用于之后的操作
publishOn 和 subscribeOn: 这两个方法的作用是指定执行 Reactive Streaming 的 Scheduler(线程池),publishOn 影响在其之后的 operator 执行的线程池,而 subscribeOn 则会从源头影响整个执行过程

Flux:
just():可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。
fromArray(),fromIterable()和 fromStream():可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。
empty():创建一个不包含任何元素,只发布结束消息的序列。
error(Throwable error):创建一个只包含错误消息的序列。
never():创建一个不包含任何消息通知的序列。
range(int start, int count):创建包含从 start 起始的 count 个数量的 Integer 对象的序列。
interval(Duration period)和 interval(Duration delay, Duration period):创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
intervalMillis(long period)和 intervalMillis(long delay, long period):与 interval()方法的作用相同,只不过该方法通过毫秒数来指定时间间隔和延迟时间。

Mono

concatMap 操作符的作用也是把流中的每个元素转换成一个流,再把所有流进行合并。与 flatMap 不同的是,concatMap 会根据原始流中的元素顺序依次把转换之后的流进行合并;与 flatMapSequential 不同的是,concatMap 对转换之后的流的订阅是动态进行的,而 flatMapSequential 在合并之前就已经订阅了所有的流。concatMap字面上可以看出这是concat跟map两个单词的拼写,在实际功能上也确实如此,concatMap将传入的数据进行了转换,转换后的数据流拼接起来作为一个新的publisher

zipWith zipWith 操作符把当前流中的元素与另外一个流中的元素按照一对一的方式进行合并。也就是轮流各取一个参数合并成二元组、三元组…。a b c d + 1 2 3→[a 1] [b 2] [c 3]
take 系列操作符用来从当前流中提取元素。
reduce 和 reduceWith 操作符对流中包含的所有元素进行累积操作,得到一个包含计算结果的 Mono 序列。
filter 对流中包含的元素进行过滤
merge 和 mergeSequential 操作符用来把多个流合并成一个 Flux 序列。
window 操作符的作用类似于 buffer,所不同的是 window 操作符是把当前流中的元素收集到另外的 Flux 序列中,因此返回值类型是 Flux<flux>
subscribe() 处理正常和错误消息

WebFlux

Spring WebFlux集成的是Reactor。
WebFlux 应用中,所有数据都应该以Mono、Flux的形式表示,这样才能带来最好的性能和高吞吐量

Vert.x

rabbitmq

https://mvnrepository.com/ 中查找相关依赖

demo:https://github.com/reactor/reactor-rabbitmq

1
2
3
4
5
6
7
8
9
10
11
12
13
<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.2.8.RELEASE</version>
</dependency>

<!-- https://mvnrepository.com/artifact/io.projectreactor.rabbitmq/reactor-rabbitmq -->
<dependency>
<groupId>io.projectreactor.rabbitmq</groupId>
<artifactId>reactor-rabbitmq</artifactId>
<version>1.4.3.RELEASE</version>
</dependency>

日志配置文件,不要配置错了不然打印不出来可以用System.out.println()试试。

其他概念

  • 函数式编程:没有共享的可变数据,将方法和函数即代码传递给其他方法的能力,就像数学函数。最简单的说“就它是一种使用函数进行编程的方式”

参考