渊源

参考博客:http://www.open-open.com/lib/view/open1482286087274.html

关于回压

回压是RS规范和Reactor主要关注点之一(如果还有其它关注点的话)。回压的原理是说,在一个推送场景里,生产者的生产速度比消费者的消费速度快,消费者会向生产者发出信号说“嘿,慢一点,我处理不过来了。”生产者可以借机控制数据生成的速度,而不是抛弃数据或者冒着产生级联错误的风险继续生成数据。

你也许会想,在Mono里为什么也需要回压:什么样的消费者会被一个单独的触发事件压垮?答案是“应该不会有这样的消费者”。不过,在Mono和CompletableFuture工作原理之间仍然有一个关键的不同点。后者只有推送:如果你持有一个Future的引用,那么说明一个异步任务已经在执行了。另一方面,回压的Flux或Mono会启动延迟的拉取-推送迭代:

  1. 延迟是因为在调用subscribe()方法之前不会发生任何事情

  2. 拉取是因为在订阅和发出请求时,Subscriber会向上游发出信号,准备拉取下一个数据块

  3. 接下来生产者向消费者推送数据,这些数据在消费者的请求范围之内

对Mono来说,subscribe()方法就相当于一个按钮,按下它就等于说“我准备好接收数据了”。Flux也有一个类似的按钮,不过它是request(n)方法,这个方法是subscribe()的一般化用法。

Mono作为一个Publisher,它往往代表着一个耗费资源的任务(在IO、延迟等方面),意识到这点是理解回压的关键:如果不对其进行订阅,你就不需要为之付出任何代价。因为Mono经常跟具有回压的Flux一起被编排到一个响应式链上,来自多个异步数据源的结果有可能被组合到一起,这种按需触发的能力是避免阻塞的关键。

我们可以使用回压来区分Mono的不同使用场景,相比上述的例子,Mono有另外一个常见的使用场景:把Flux的数据异步地聚合到Mono里。reduce和hasElement可以消费Flux里的每一个元素,再把这些数据以某种形式聚合起来(分别是reduce函数的调用结果和一个boolean值),作为一个Mono对外暴露数据。在这种情况下,使用Long.MAX_VALUE向上游发出回压信号,上游会以完全推送的方式工作。

关于回压另一个有意思的话题是它如何对存储在内存里的流的对象数量进行限制。作为一个Publisher,数据源很有可能出现生成数据缓慢的问题,而来自下游的请求超出了可用数据项。在这种情况下,整个流很自然地进入到推送模式,消费者会在有新数据到达时收到通知。当生产高峰来临,或者在生产速度加快的情况下,整个流又回到了拉取模式。在以上两种情况下,最多有N项数据(request()请求的数据量)会被保留在内存里。

你可以对内存的使用情况进行更精确的推算,把N项数据跟每项数据需要消耗的内存W结合起来:这样你就可以推算出最多需要消耗W*N的内存。实际上,Reactor在大多数情况下会根据N来做出优化:根据情况创建内部队列,并应用预取策略,每次自动请求75%的数据量。

Reactor的操作有时候会根据它们所代表的语义和调用者的期望来改变回压信号。例如对于操作buffer(10):下游请求N项数据,而这个操作会向上游请求10N的数据量,这样就可以填满缓冲区,为订阅者提供足够的数据。这通常被称为“主动式回压”,开发人员可以充分利用这种特性,例如在微批次场景里,可以显式地告诉Reactor该如何从一个输入源切换到一个输出地。

跟Spring的关系

Reactor是Spring整个生态系统的基础,特别是Spring 5(通过Spring Web Reactive)和Spring Data “kay”(跟spring-data-commons 2.0相对应的)。

这两个项目的响应式版本是非常有用的,我们因此可以开发出完全响应式的Web应用:异步地处理请求,一直到数据库,最后异步地返回结果。Spring应用因此可以更有效地利用资源,避免为每个请求单独分配一个线程,还要等待I/O阻塞。

Reactor将被用于未来Spring应用的内部响应式核心组件,以及这些Spring组件暴露出来的API。一般情况下,它们可以处理RS Publisher,不过大多数时候它们要面对的是Flux/Mono,需要用到Reactor的丰富特性。当然,你也可以自行选择其它响应式框架,Reactor提供了可以用来适配其它Reactor类型和RxJava类型甚至简单的RS类型的钩子接口。

目前,你可以通过Spring Boot 2.0.0.BUILD-SNAPSHOT和spring-boot-starter-web-reactive依赖项(可以在start.spring.io上生成一个这样的项目)来体验Spring Web Reactive:

Last updated