Java Reactor Reactive Programming

Java reactive programming resembles the publish-subscribe (or observer) pattern: it improves throughput by processing data asynchronously.

Reactor

  • Flux<T> is a standard Publisher. It represents an asynchronous sequence of 0 to N elements, which can be terminated by a completion signal or an error signal.
    • These three kinds of signals translate into calls to a downstream subscriber's onNext, onComplete, and onError methods.
  • Mono<T> is a specialized Publisher: an asynchronous sequence of 0 or 1 element, terminated by an onComplete or onError signal. Flux and Mono can be converted into each other: counting the elements of a Flux yields a Mono, while merging two Mono sequences yields a Flux.
    • subscribe() subscribes and triggers the stream.
  • Disposable: the lambda-based subscribe variants return a Disposable; call its dispose() to cancel the subscription.
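A minimal sketch of these types, assuming reactor-core is on the classpath (the class name is illustrative):

```java
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BasicsDemo {

    public static long countFlux() {
        // Converting between the two types: counting a Flux yields a Mono<Long>
        Mono<Long> count = Flux.just("a", "b", "c").count();
        return count.block();
    }

    public static void main(String[] args) {
        // A lambda-based subscribe returns a Disposable that can cancel the subscription
        Disposable d = Flux.just(1, 2, 3).subscribe(System.out::println);
        d.dispose();
        System.out.println(countFlux());
    }
}
```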

The biggest difference between reactive and imperative programming: in reactive code, method calls assemble a stream plus the logic for processing its data, and that logic runs only when the stream actually produces data (publish/subscribe). Imperative code executes the logic immediately and returns a result.
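This laziness can be demonstrated directly; nothing runs at assembly time (a sketch assuming reactor-core; class and field names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class LazyDemo {
    public static boolean emptyBeforeSubscribe;

    public static List<Integer> run() {
        List<Integer> seen = new ArrayList<>();
        // Assembly only: this builds the pipeline but executes nothing
        Flux<Integer> pipeline = Flux.just(1, 2, 3).map(i -> i * 10).doOnNext(seen::add);
        emptyBeforeSubscribe = seen.isEmpty(); // still true here
        pipeline.subscribe(); // the constructed logic runs only now
        return seen;
    }
}
```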

map: synchronously transforms each element of the stream
flatMap: asynchronously transforms each element into a new stream, then merges the elements of all inner streams
flatMapMany: transforms a Mono element into a Flux (one-to-many)
concat: concatenates multiple streams into one (subscribes to them in order)
merge: merges multiple streams, subscribing to them simultaneously
zip: combines elements from multiple streams pairwise
zipWhen: zipWhen(Function<T,Mono<? extends T2>> rightGenerator, BiFunction<T,T2,O> combinator) waits for the result of this Mono, uses it to create a second Mono via the provided rightGenerator function, and combines both results into an arbitrary O object, as defined by the provided combinator function
then: runs after the stream completes
doOnNext: runs when the stream emits an element
doOnError: runs when an error is emitted
doOnCancel: runs when the stream is cancelled
onErrorContinue: on error, keeps processing data instead of terminating the whole stream
defaultIfEmpty: supplies a default value when the stream is empty
switchIfEmpty: switches to another stream when the stream is empty
as: passes the stream to a function and returns its result. Official example: flux.as(Mono::from).subscribe() converts the Flux into a Mono via Mono.from
collectList(): collects the elements into a Mono of a List, e.g. Mono<List<Integer>> mono = Flux.range(1, 5).collectList();
blockFirst, blockLast: block until the first or last value has been processed
cache: caches the current stream for later operations
publishOn and subscribeOn: both specify the Scheduler (thread pool) on which the Reactive Streams pipeline executes; publishOn affects the operators downstream of it, while subscribeOn affects the whole chain starting from the source
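Two of the operators above, sketched under the assumption that reactor-core is on the classpath (block() is used here only to surface results from a plain method):

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class OperatorDemo {

    public static List<String> zipDemo() {
        // zip pairs elements one-to-one and stops at the shorter stream
        return Flux.zip(Flux.just("a", "b", "c", "d"), Flux.just("1", "2", "3"))
                .map(t -> t.getT1() + t.getT2())
                .collectList()
                .block();
    }

    public static String emptyDemo() {
        // defaultIfEmpty supplies a fallback when the stream completes without a value
        return Mono.<String>empty().defaultIfEmpty("fallback").block();
    }
}
```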

Flux:
just(): specifies all the elements of the sequence; the resulting Flux completes automatically after publishing them.
fromArray(), fromIterable(), and fromStream(): create a Flux from an array, an Iterable, or a Stream.
empty(): creates a sequence with no elements that only publishes the completion signal.
error(Throwable error): creates a sequence containing only an error signal.
never(): creates a sequence that emits no signals at all.
range(int start, int count): creates a sequence of count Integer objects starting from start.
interval(Duration period) and interval(Duration delay, Duration period): create a sequence of Long values increasing from 0, emitted at the specified interval; besides the interval, a delay before the first element can also be specified.
intervalMillis(long period) and intervalMillis(long delay, long period): same as interval(), except the interval and delay are given in milliseconds (deprecated in later Reactor 3 releases).
defer(): the actual source is decided only at subscription time.
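defer() is easy to misread: unlike just(), its supplier runs once per subscriber, not once at assembly time (a sketch assuming reactor-core; the counter is illustrative):

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class DeferDemo {
    static final AtomicInteger counter = new AtomicInteger();

    public static int subscribeTwice() {
        // The supplier is re-evaluated at every subscription
        Mono<Integer> mono = Mono.defer(() -> Mono.just(counter.incrementAndGet()));
        mono.block();         // first subscription: counter becomes 1
        return mono.block();  // second subscription: counter becomes 2
    }
}
```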

Mono

The concatMap operator also transforms each element of the stream into a stream and merges the results. Unlike flatMap, concatMap merges the transformed streams strictly in the order of the original elements; unlike flatMapSequential, concatMap subscribes to the transformed streams lazily, one at a time, whereas flatMapSequential subscribes to all of them before merging. As the name (concat + map) suggests, concatMap maps the incoming data and concatenates the resulting streams into a new publisher.
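The ordering guarantee can be sketched as follows: even when the inner streams are delayed, concatMap preserves the original element order because it subscribes to them one at a time (assuming reactor-core on the classpath):

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class ConcatMapDemo {

    public static List<Integer> run() {
        // Each element maps to a delayed inner stream; concatMap waits for one
        // inner stream to complete before subscribing to the next
        return Flux.just(3, 1, 2)
                .concatMap(i -> Flux.just(i, i * 10).delayElements(Duration.ofMillis(10)))
                .collectList()
                .block();
    }
}
```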

zipWith: combines the elements of the current stream with another stream one-to-one, taking one element from each in turn to form pairs, triples, and so on. a b c d + 1 2 3 → [a 1] [b 2] [c 3]
take family: extracts elements from the current stream.
reduce and reduceWith: accumulate all elements of the stream, producing a Mono holding the result.
filter: filters the elements of the stream.
merge and mergeSequential: merge multiple streams into one Flux sequence.
window: similar to buffer, except that window collects the elements into nested Flux sequences, so the return type is Flux<Flux<T>>.
subscribe(): handles both normal and error signals.
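reduce and window, sketched (assuming reactor-core; each window is flattened with collectList just to make the result easy to inspect):

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class ReduceWindowDemo {

    public static int sum() {
        // reduce folds all elements into a single Mono result
        return Flux.range(1, 5).reduce(0, Integer::sum).block();
    }

    public static List<List<Integer>> windows() {
        // window slices the stream into nested Flux sequences of size 2
        return Flux.range(1, 5)
                .window(2)
                .flatMap(Flux::collectList)
                .collectList()
                .block();
    }
}
```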

WebFlux

Spring WebFlux integrates Reactor.
In a WebFlux application, all data should be expressed as Mono or Flux; that is what yields the best performance and highest throughput.

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.CountDownLatch;
import reactor.core.publisher.Flux;

public class Test {

    public static void create() {
        Flux.create(sink -> {
            sink.next("create");
            sink.next("create1");
            sink.complete();
        }).subscribe(System.out::println);
    }

    public static void test1() {
        Flux<String> requestBody = Flux.create(sink -> {
            sink.next("info");
            sink.next("info");
            sink.complete();
        });

        Flux<String> decodedRequest = requestBody.map(data -> data + "tech"); // synchronous transformation
        decodedRequest.doOnNext(s -> { // side effect performed on each onNext
            System.out.println(s);
        }).subscribe(); // subscribe and trigger the stream; overloads also take one consumer for values and another for errors
        // Equivalent chained form: requestBody.map(data -> data + "tech").doOnNext(System.out::println).subscribe();
        // Or: requestBody.map(data -> data + "tech").subscribe(System.out::println);
        // Or: requestBody.map(data -> data + "tech").subscribe(System.out::println, System.err::println, () -> System.out.println("complete"));

        Flux.just(1, 2, 3, 4, 5, 6) // specify the elements
            .map(i -> i + 1) // synchronous transformation; can also change the type; equivalent to .map(i -> { return i + 1; })
            .subscribe(System.out::print, System.err::println,
                () -> System.out.println("\ncomplete"), subscription -> {
                    System.out.println("subscription happened");
                    subscription.request(10); // request 10 elements; if fewer than 6 are requested (or this line is removed), complete never fires
                });
    }

    public static void test2() throws InterruptedException {
        Flux.range(0, 100).subscribe(System.out::println);

        Flux.interval(Duration.of(500, ChronoUnit.MILLIS))
            .subscribe(System.out::println);
        // interval runs on a timer thread; block with a CountDownLatch so the program does not exit early
        CountDownLatch latch = new CountDownLatch(1);
        latch.await();
    }
}

File Upload and Download

@RestController
@Slf4j
public class FileController {

    @PostMapping(value = "/test", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    public Mono<String> test(@RequestPart("file") FilePart filePart) throws IOException {
        Path tempFile = Files.createTempFile("test", filePart.filename());
        filePart.transferTo(tempFile.toFile());
        System.out.println(tempFile.toString());
        return Mono.just(filePart.filename()); // non-JSON response, for testing
    }
}

/**
 * Reading the ServerHttpRequest body stream in an annotated WebFlux controller.
 */
@Autowired
private ServerCodecConfigurer serverCodecConfigurer = null;

@PostMapping("/connect")
public Mono<Void> connect(final ServerHttpRequest request, final ServerHttpResponse response) {
    log.debug("connect...");
    final ResolvableType reqDataType = ResolvableType.forClass(byte[].class);
    return response.writeWith(serverCodecConfigurer.getReaders().stream()
            .filter(reader -> reader.canRead(reqDataType, MediaType.ALL))
            .findFirst()
            .orElseThrow(() -> new IllegalStateException("No Data"))
            .readMono(reqDataType, request, Collections.emptyMap()) // change this to read() (see the analysis below)
            .cast(byte[].class)
            .map(bytes -> {
                try {
                    final String reqBody = new String(bytes, StandardCharsets.UTF_8);
                    log.info("reqBody => \n {}", reqBody);
                    // TODO: business logic goes here

                    final NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(new UnpooledByteBufAllocator(false));
                    return nettyDataBufferFactory.wrap("ok".getBytes(StandardCharsets.UTF_8));
                } catch (Throwable ex) {
                    log.warn("connect-exp: {}", ex.getMessage());
                    return null;
                }
            })
    );
}
// Source analysis of reading the body: readMono

// getBody()
is100ContinueExpected:251, HttpUtil (io.netty.handler.codec.http) // checks for 100-continue
receiveObject:303, HttpServerOperations (reactor.netty.http.server)
receive:240, ChannelOperations (reactor.netty.channel)
getBody:184, ReactorServerHttpRequest (org.springframework.http.server.reactive)
readMono:105, DecoderHttpMessageReader (org.springframework.http.codec)

// decodeToMono
@Override
public Mono<T> decodeToMono(Publisher<DataBuffer> input, ResolvableType elementType,
        @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

    return DataBufferUtils.join(input, this.maxInMemorySize) // private int maxInMemorySize = 256 * 1024 bytes; uploading a file larger than this fails
            .map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints)); // join the buffers, then decode
}

// Stack trace of the read operation
updateCount:82, LimitedDataBufferList (org.springframework.core.io.buffer)
add:57, LimitedDataBufferList (org.springframework.core.io.buffer)
accept:-1, 1314754700 (org.springframework.core.io.buffer.DataBufferUtils$$Lambda$607)
onNext:111, MonoCollect$CollectSubscriber (reactor.core.publisher)
onNext:114, FluxMap$MapSubscriber (reactor.core.publisher)
onNext:192, FluxPeek$PeekSubscriber (reactor.core.publisher)
onNext:114, FluxMap$MapSubscriber (reactor.core.publisher)
onInboundNext:346, FluxReceive (reactor.netty.channel)
onInboundNext:358, ChannelOperations (reactor.netty.channel)
onInboundNext:500, HttpServerOperations (reactor.netty.http.server)
channelRead:96, ChannelOperationsHandler (reactor.netty.channel)
invokeChannelRead:379, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:365, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:357, AbstractChannelHandlerContext (io.netty.channel)
channelRead:235, HttpTrafficHandler (reactor.netty.http.server)
invokeChannelRead:379, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:365, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:357, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:436, CombinedChannelDuplexHandler$DelegatingChannelHandlerContext (io.netty.channel)
fireChannelRead:324, ByteToMessageDecoder (io.netty.handler.codec)
fireChannelRead:311, ByteToMessageDecoder (io.netty.handler.codec)
callDecode:432, ByteToMessageDecoder (io.netty.handler.codec)
channelRead:276, ByteToMessageDecoder (io.netty.handler.codec)
channelRead:251, CombinedChannelDuplexHandler (io.netty.channel)
invokeChannelRead:379, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:365, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:357, AbstractChannelHandlerContext (io.netty.channel)
channelRead:1410, DefaultChannelPipeline$HeadContext (io.netty.channel)
invokeChannelRead:379, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:365, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:919, DefaultChannelPipeline (io.netty.channel)
read:166, AbstractNioByteChannel$NioByteUnsafe (io.netty.channel.nio)
processSelectedKey:719, NioEventLoop (io.netty.channel.nio)
processSelectedKeysOptimized:655, NioEventLoop (io.netty.channel.nio)
processSelectedKeys:581, NioEventLoop (io.netty.channel.nio)
run:493, NioEventLoop (io.netty.channel.nio)
run:989, SingleThreadEventExecutor$4 (io.netty.util.concurrent)
run:74, ThreadExecutorMap$2 (io.netty.util.internal)
run:30, FastThreadLocalRunnable (io.netty.util.concurrent)
run:748, Thread (java.lang)


lambda$invoke$0:172, InvocableHandlerMethod (org.springframework.web.reactive.result.method)
apply:-1, 484869674 (org.springframework.web.reactive.result.method.InvocableHandlerMethod$$Lambda$589)
onNext:118, MonoFlatMap$FlatMapMain (reactor.core.publisher)
complete:1782, Operators$MonoSubscriber (reactor.core.publisher)
signal:247, MonoZip$ZipCoordinator (reactor.core.publisher)
onNext:329, MonoZip$ZipInner (reactor.core.publisher)
onNext:173, MonoPeekTerminal$MonoTerminalPeekSubscriber (reactor.core.publisher)
request:2344, Operators$ScalarSubscription (reactor.core.publisher)
request:132, MonoPeekTerminal$MonoTerminalPeekSubscriber (reactor.core.publisher)
onSubscribe:318, MonoZip$ZipInner (reactor.core.publisher)
onSubscribe:145, MonoPeekTerminal$MonoTerminalPeekSubscriber (reactor.core.publisher)
subscribe:54, MonoJust (reactor.core.publisher)
subscribe:4252, Mono (reactor.core.publisher)
subscribe:128, MonoZip (reactor.core.publisher)
subscribe:64, InternalMonoOperator (reactor.core.publisher)
subscribe:52, MonoDefer (reactor.core.publisher)
drain:153, MonoIgnoreThen$ThenIgnoreMain (reactor.core.publisher)
subscribe:56, MonoIgnoreThen (reactor.core.publisher)
subscribe:64, InternalMonoOperator (reactor.core.publisher)
onNext:150, MonoFlatMap$FlatMapMain (reactor.core.publisher)
onNext:67, FluxSwitchIfEmpty$SwitchIfEmptySubscriber (reactor.core.publisher)
onNext:76, MonoNext$NextSubscriber (reactor.core.publisher)
innerNext:274, FluxConcatMap$ConcatMapImmediate (reactor.core.publisher)
onNext:851, FluxConcatMap$ConcatMapInner (reactor.core.publisher)
onNext:121, FluxMapFuseable$MapFuseableSubscriber (reactor.core.publisher)
onNext:173, MonoPeekTerminal$MonoTerminalPeekSubscriber (reactor.core.publisher)
request:2344, Operators$ScalarSubscription (reactor.core.publisher)
request:132, MonoPeekTerminal$MonoTerminalPeekSubscriber (reactor.core.publisher)
request:162, FluxMapFuseable$MapFuseableSubscriber (reactor.core.publisher)
set:2152, Operators$MultiSubscriptionSubscriber (reactor.core.publisher)
onSubscribe:2026, Operators$MultiSubscriptionSubscriber (reactor.core.publisher)
onSubscribe:90, FluxMapFuseable$MapFuseableSubscriber (reactor.core.publisher)
onSubscribe:145, MonoPeekTerminal$MonoTerminalPeekSubscriber (reactor.core.publisher)
subscribe:54, MonoJust (reactor.core.publisher)
subscribe:4252, Mono (reactor.core.publisher)
drain:441, FluxConcatMap$ConcatMapImmediate (reactor.core.publisher)
onSubscribe:211, FluxConcatMap$ConcatMapImmediate (reactor.core.publisher)
subscribe:161, FluxIterable (reactor.core.publisher)
subscribe:86, FluxIterable (reactor.core.publisher)
subscribe:64, InternalMonoOperator (reactor.core.publisher)
subscribe:52, MonoDefer (reactor.core.publisher)
subscribe:40, MonoSentinelOperator (com.alibaba.csp.sentinel.adapter.reactor)
subscribe:64, InternalMonoOperator (reactor.core.publisher)
subscribe:52, MonoDefer (reactor.core.publisher)
subscribe:4252, Mono (reactor.core.publisher)
drain:172, MonoIgnoreThen$ThenIgnoreMain (reactor.core.publisher)
subscribe:56, MonoIgnoreThen (reactor.core.publisher)
subscribe:64, InternalMonoOperator (reactor.core.publisher)
onStateChange:65, HttpServerHandle (reactor.netty.http.server)
onStateChange:518, ReactorNetty$CompositeConnectionObserver (reactor.netty)
onStateChange:278, TcpServerBind$ChildObserver (reactor.netty.tcp)
onInboundNext:475, HttpServerOperations (reactor.netty.http.server)
channelRead:96, ChannelOperationsHandler (reactor.netty.channel)
invokeChannelRead:379, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:365, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:357, AbstractChannelHandlerContext (io.netty.channel)
channelRead:191, HttpTrafficHandler (reactor.netty.http.server)
invokeChannelRead:379, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:365, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:357, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:436, CombinedChannelDuplexHandler$DelegatingChannelHandlerContext (io.netty.channel)
fireChannelRead:324, ByteToMessageDecoder (io.netty.handler.codec)
fireChannelRead:311, ByteToMessageDecoder (io.netty.handler.codec)
callDecode:432, ByteToMessageDecoder (io.netty.handler.codec)
channelRead:276, ByteToMessageDecoder (io.netty.handler.codec)
channelRead:251, CombinedChannelDuplexHandler (io.netty.channel)
invokeChannelRead:379, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:365, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:357, AbstractChannelHandlerContext (io.netty.channel)
channelRead:1410, DefaultChannelPipeline$HeadContext (io.netty.channel)
invokeChannelRead:379, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:365, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:919, DefaultChannelPipeline (io.netty.channel)
read:166, AbstractNioByteChannel$NioByteUnsafe (io.netty.channel.nio)
processSelectedKey:719, NioEventLoop (io.netty.channel.nio)
processSelectedKeysOptimized:655, NioEventLoop (io.netty.channel.nio)
processSelectedKeys:581, NioEventLoop (io.netty.channel.nio)
run:493, NioEventLoop (io.netty.channel.nio)
run:989, SingleThreadEventExecutor$4 (io.netty.util.concurrent)
run:74, ThreadExecutorMap$2 (io.netty.util.internal)
run:30, FastThreadLocalRunnable (io.netty.util.concurrent)
run:748, Thread (java.lang)


public abstract class AbstractNioByteChannel extends AbstractNioChannel {

    protected class NioByteUnsafe extends AbstractNioUnsafe {
        @Override
        public final void read() {
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) {
                clearReadPending();
                return;
            }
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }

                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }
}

public class InvocableHandlerMethod extends HandlerMethod {

    public Mono<HandlerResult> invoke(
            ServerWebExchange exchange, BindingContext bindingContext, Object... providedArgs) {

        return getMethodArgumentValues(exchange, bindingContext, providedArgs).flatMap(args -> {
            Object value;
            try {
                ReflectionUtils.makeAccessible(getBridgedMethod());
                Method method = getBridgedMethod();
                if (KotlinDetector.isKotlinReflectPresent() &&
                        KotlinDetector.isKotlinType(method.getDeclaringClass()) &&
                        CoroutinesUtils.isSuspendingFunction(method)) {
                    value = CoroutinesUtils.invokeSuspendingFunction(method, getBean(), args);
                }
                else {
                    value = method.invoke(getBean(), args);
                }
            }
            catch (IllegalArgumentException ex) {
                assertTargetBean(getBridgedMethod(), getBean(), args);
                String text = (ex.getMessage() != null ? ex.getMessage() : "Illegal argument");
                return Mono.error(new IllegalStateException(formatInvokeError(text, args), ex));
            }
            catch (InvocationTargetException ex) {
                return Mono.error(ex.getTargetException());
            }
            catch (Throwable ex) {
                // Unlikely to ever get here, but it must be handled...
                return Mono.error(new IllegalStateException(formatInvokeError("Invocation failure", args), ex));
            }

            HttpStatus status = getResponseStatus();
            if (status != null) {
                exchange.getResponse().setStatusCode(status);
            }

            MethodParameter returnType = getReturnType();
            ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(returnType.getParameterType());
            boolean asyncVoid = isAsyncVoidReturnType(returnType, adapter);
            if ((value == null || asyncVoid) && isResponseHandled(args, exchange)) {
                return (asyncVoid ? Mono.from(adapter.toPublisher(value)) : Mono.empty());
            }

            HandlerResult result = new HandlerResult(this, value, returnType, bindingContext);
            return Mono.just(result);
        });
    }
}

So reading an uploaded file from the body this way makes WebFlux throw an error; changing .readMono to read() avoids it. The result also has to be written to the response: without the write, the transformation inside map never runs. The same holds for the other approaches found online: the Mono must be returned, otherwise downstream processing never reaches the flatMap. An open question: how can the content be read manually instead of obtaining it through map?

/**
 * Caches the body content so it can be read again downstream.
 */
public Mono<Void> readBody(ServerWebExchange exchange,
                           GatewayFilterChain chain,
                           GatewayContext gatewayContext) {
    log.info("readJsonBody start");
    // Parse the exchange and return a brand-new Mono (one that caches the body)
    Mono<Void> monoData = DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
        // Allocate a byte array the size of the buffer
        byte[] bytes = new byte[dataBuffer.readableByteCount()];
        // Copy the buffer contents into bytes
        dataBuffer.read(bytes);
        // Release the buffer
        DataBufferUtils.release(dataBuffer);
        // Create a new buffer and write the data into it
        Flux<DataBuffer> cachedFlux = Flux.defer(() -> {
            DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
            DataBufferUtils.retain(buffer);
            return Mono.just(buffer);
        });
        // The original request body has been consumed, so a new request must be supplied
        ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
            @Override
            public Flux<DataBuffer> getBody() {
                return cachedFlux;
            }
        };
        // Build a new exchange carrying the cached data
        ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();
        Mono<Void> mono = ServerRequest.create(mutatedExchange, MESSAGE_READERS)
                .bodyToMono(String.class)
                .doOnNext(objectValue -> {
                    log.info("resolve success postBody is:{}", objectValue);
                    gatewayContext.setCacheBody(objectValue);
                }).then(chain.filter(mutatedExchange));
        return mono;
    });
    return monoData;
}

With Spring MVC, by contrast, there are several ways to obtain the request, for example:

@PostMapping
@ResponseBody
public Result save() {
    // Pull the request out of the request-bound thread-local holder
    HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
    return Result.ok();
}

@PostMapping
public String save(HttpServletRequest request, HttpServletResponse response) {
    // request and response are injected as handler-method arguments
    return "ok";
}

@Autowired
private HttpServletRequest request;
@Autowired
private HttpServletResponse response;

@PostMapping
public String save() {
    // request and response are injected as request-scoped proxies into the fields above
    return "ok";
}

Vert.x

rabbitmq

Search https://mvnrepository.com/ for the relevant dependencies.

demo: https://github.com/reactor/reactor-rabbitmq

<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.2.8.RELEASE</version>
</dependency>

<!-- https://mvnrepository.com/artifact/io.projectreactor.rabbitmq/reactor-rabbitmq -->
<dependency>
    <groupId>io.projectreactor.rabbitmq</groupId>
    <artifactId>reactor-rabbitmq</artifactId>
    <version>1.4.3.RELEASE</version>
</dependency>

Be careful with the logging configuration file: if it is misconfigured, nothing gets printed. When in doubt, fall back to System.out.println() to verify.

Other Concepts

  • Functional programming: no shared mutable data, plus the ability to pass methods and functions (that is, code) to other methods, just like mathematical functions. Put most simply, "it is a style of programming with functions."
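A minimal plain-Java illustration of both points, passing behavior as a value without touching shared mutable state (class and method names are illustrative):

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class FunctionalDemo {

    // The function f is treated as data: callers hand code to this method as an argument
    public static List<Integer> applyAll(List<Integer> xs, Function<Integer, Integer> f) {
        // No shared state is mutated; a new list is produced, like a mathematical mapping
        return xs.stream().map(f).collect(Collectors.toList());
    }
}
```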

References