# share

## **Reactive Programming 概览**

2017年9月，Spring Framework 5发布了其GA版本，这是自2013年12月以来的又一个大版本升级。除了一些人们期待已久的改进，最令人兴奋的新特性是它提供了完整的端到端响应式编程的支持。这是一种不同于Servlet的全新的编程范式和技术栈，它基于异步非阻塞的特性，能够借助EventLoop以少量线程应对高并发的访问，对微服务架构也颇有助益。不夸张的说，Spring 5使得Java世界拥有了Node.js那样骨骼惊奇的神器。

2018年3月1号，Spring Boot 2.0如约发布，也是一个大版本升级。

## 新版本工具之间关系的梳理

Reactive Programming 在过去早已有之，并不是什么新鲜事物。但是在最近几年，它似乎有着越来越流行的趋势。近期，Java 技术圈围绕着 Reactive Programming 这一主题，推出了许许多多的新版本工具，让人感到眼花缭乱。本文首先选择其中的几项重点更新内容，梳理一下它们之间的关系.

### Spring Boot 2.0

Spring Boot 2.0 are now offering first-class support for developing reactive applications, via auto-configuration and starter-POMs\[1]. 一如既往体现着 Spring Boot 配置简便的特点，只需几处简单配置就可以开发 reactive 应用了。

围绕 reactive 主题，主要的更新点有：

* 基于 Spring Framework 5（包括新模块：WebFlux）构建
* 集成 Netty 作为默认的 web 服务器，支持 reactive 应用
* WebFlux 默认运行在 Netty 上

### Spring Framework 5

New spring-webflux module, an alternative to spring-webmvc built on a reactive foundation, intended for use in an event-loop execution model\[2]. 最重要的更新是新增了 WebFlux 模块，支持基于事件循环的执行模型。

主要的更新点有：

* 依赖：最低 Java 8，支持 Java 9
* 提供许多支持 reactive 的基础设施
* 提供面向 Netty 等运行时环境的适配器
* 新增 WebFlux 模块（集成的是Reactor 3.x）

### Java 9 Reactive Stream

在 Java 8 时代，Reactive Stream API 就已经存在，只不过那时它是单独的一个jar包，可用maven引入：

```
<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams</artifactId>
</dependency>
```

而在 Java 9 时代，Reactive Stream 被正式集成到了 Java 的 API 中。In Java 9, Reactive Streams is officially part of the Java API\[3].

主要的更新点有：

* 提供 Reactive Stream API（java.util.concurrent.Flow）\[4]

Java 9 的 Reactive Stream API 只是一套接口，约定了 Reactive 编程的一套规范，并没有具体的实现。而实现了这个接口的产品有：RxJava、Reactor、akka 等，而 Spring WebFlux 中集成的是 Reactor 3.x。

所以目前 Spring Framework 5.x 提供了两大开发栈：

1. Spring WebFlux：基于 Reactive Stream API，需要运行在 servlet 3.1+ 容器（Tomcat 8）或 Netty 上，这些容器支持 NIO、Reactor 模式
2. Spring MVC：基于传统的 Servlet API，运行在传统的 Servlet 容器上，one-request-per-thread，同步阻塞 IO

下图为两大开发栈的对比：

![](https://3887784244-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-Ld4L6SN74eVvyP4OOqn%2F-Ld4L7VB8_xIt-fFMc_l%2F-Ld4L8iQsg80jNkJtyAU%2Fwebb.png?generation=1555941605657205\&alt=media)注意：目前 JDBC 仍然不支持 Reactive 异步调用模式，如果在代码里调用 MySQL 数据库，线程仍会阻塞等待数据返回。

从这个图就可以看出对支持Spring 5的Spring Boot 2.0来说，新加入的响应式技术栈是其主打核心特性。具体来说，Spring Boot 2支持的响应式技术栈包括如下：

* Spring Framework 5提供的非阻塞web框架Spring Webflux；
* 遵循响应式流规范的兄弟项目Reactor；
* 支持异步I/O的Netty、Undertow等框架，以及基于Servlet 3.1+的容器（如Tomcat 8.0.23+和Jetty 9.0.4+）；
* 支持响应式的数据访问Spring Data Reactive Repositories；
* 支持响应式的安全访问控制Spring Security Reactive；
* 等。

响应式编程是关于异步的事件驱动的需要少量线程的垂直扩展而非水平扩展的无阻塞应用

## 什么是 Reactive Programming

想要搞清楚 Reactive 这种编程范式到底是什么，我们得先回顾 IO 的历史，需要了解一下过去发生了什么，基于什么原因产生了这种编程范式。我们首先从几个基本的 IO 模型说起。

### Reactor 模式

上面的代码展示的其实就是最简易的 Reactor 模式。下面我们看看 Doug Lea 大爷对 Reactor 模式的抽象、概括和总结。他把 Reactor 模式分为三种类型，分别是：Basic version, Multi-threaded version 和 Multi-reactors version\[14]\[15]。

#### Basic version (Single threaded)

![](https://github.com/ZhongyangMA/images/raw/master/webflux-streaming-demo/reactor1.png)

如图所示为 Reactor 模式的单线程版本，所有的 I/O 操作都在同一个 NIO 线程上面完成。\
NIO 线程即是 Reactor、Acceptor 负责监听多路 socket 并 Accept 新连接，又负责分派、处理请求。对于一些小容量应用场景，可以使用单线程模型，但是对于高负载、大并发的应用却不合适。实际当中基本不会采用单线程模型。

#### Multi-threaded version

![](https://github.com/ZhongyangMA/images/raw/master/webflux-streaming-demo/reactor2.png)

图为 Reactor 模式的多线程版本。\
多线程模型中有一个专门的线程负责监听和处理所有的客户端连接，网络 IO 操作由一个 NIO 线程池负责，它包含一个任务队列和 N 个可用的线程，由这些 NIO 线程负责消息的读取、解码、编码和发送。1个 NIO 线程可以同时处理 N 条链路，但是1个链路只对应1个 NIO 线程，防止发生并发操作问题。\
在绝大多数场景下，Reactor 多线程模型都可以满足性能需求。但是在更高并发连接的场景（如百万客户端并发连接），它的性能似乎不能满足需求，于是便出现了下面的多 Reactor（主从 Reactor）模型。

#### Multi-reactors version

![](https://github.com/ZhongyangMA/images/raw/master/webflux-streaming-demo/reactor3.png)

如图所示为主从 Reactor 模型。\
这种模型是将 Reactor 分成两部分，mainReactor 负责监听 server socket、accept 新连接，并将建立的 socket 分派给 subReactor；subReactor 负责多路分离已连接的 socket，读写网络数据；而对业务处理的功能，交给 worker 线程池来完成。

### 1 响应式宣言 <a href="#id-1-xiang-ying-shi-xuan-yan" id="id-1-xiang-ying-shi-xuan-yan"></a>

<https://www.reactivemanifesto.org/>和[敏捷宣言](http://agilemanifesto.org/)一样，说起响应式编程，必先提到响应式宣言。

We want systems that are Responsive, Resilient, Elastic and Message Driven. We call these Reactive Systems. -[The Reactive Manifesto](http://www.reactivemanifesto.org/)

![](https://3887784244-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-Ld4L6SN74eVvyP4OOqn%2F-Ld4L7VB8_xIt-fFMc_l%2F-Ld4L8i_iycQcVBU44uY%2Fhh.png?generation=1555941612026521\&alt=media)

不知道是不是为了向敏捷宣言致敬，响应式宣言中也包含了4组关键词：

* Responsive: 可响应的。要求系统尽可能做到在任何时候都能及时响应。
* Resilient: 可恢复的。要求系统即使出错了，也能保持可响应性。
* Elastic: 可伸缩的。要求系统在各种负载下都能保持可响应性。
* Message Driven: 消息驱动的。要求系统通过异步消息连接各个组件。

可以看到，对于任何一个响应式系统，首先要保证的就是可响应性，否则就称不上是响应式系统。

引子：被誉为“中国大数据第一人”的涂子沛先生在其成名作《数据之巅》里提到，摩尔定律、社交媒体、数据挖掘是大数据的三大成因。IBM的研究称，整个人类文明所获得的全部数据中，有90%是过去两年内产生的。在此背景下，包括NoSQL，Hadoop, Spark, Storm, Kylin在内的大批新技术应运而生。其中以[RxJava](https://github.com/ReactiveX/RxJava)和[Reactor](http://projectreactor.io/)为代表的响应式（Reactive）编程技术针对的就是经典的大数据4V定义（Volume，Variety，Velocity，Value）中的Velocity，即高并发问题，而在Spring 5中，也引入了响应式编程的支持。在接下来会围绕响应式编程分享首先从Spring 5谈起。

### Reactive Programming 的核心概念

在前文相关信息的基础上，这一部分我们将进一步抽象、总结一下 Reactive Programming 的一些核心概念\[16]\[17]。

#### Reactive Programming 和 Reactive Streams

Reactive programming 是一种编程范式，它已经存在很久了，并不是什么新的东西。就像 面向过程编程、面向对象编程、函数式编程等，它只是另外一种编程范式。

而 Reactive Streams 指的是一套规范，对于 Java 开发者来讲，Reactive Streams 具体来说就是一套 API。Reactive Streams 给我们提供了一套通用的 API，让我们可以使用 Java 进行 Reactive Programming。

#### Reactive Programming 的基本要素

响应式宣言（Reactive Manifesto）\[16] 描述了响应式系统（reactive systems）应该具备的四个关键属性：Responsive（灵敏的）、Resilient（可故障恢复的）、Elastic（可伸缩的）、Message Driven（消息驱动的）。

。

![](https://github.com/ZhongyangMA/images/raw/master/webflux-streaming-demo/manifesto.png)

* **Responsive（灵敏的）**

  ：只要有可能，系统就会及时响应。灵敏性是系统可用性的基石，除此之外，灵敏性也意味着系统的问题可以被快速地探测和解决。具有灵敏性的系统关注做出快速和一致的响应，提供可靠和一致的服务质量。
* **Resilient（可故障恢复的）**

  ：在出现故障时，系统仍然可以保持响应。一个不具可恢复性的系统一旦出现故障，就会变得无法正常响应。可恢复性可以通过复制、围控、隔离和委派等方式实现。在可恢复性的系统中，故障被包含在每个组件中，各组件之间相互隔离，从而允许系统的某些部分出故障并且在不连累整个系统的前提下进行恢复。
* **Elastic（可伸缩的）**

  ：在不同的工作负载下，系统保持响应。系统可以根据输入的工作负载，动态地增加或减少系统使用的资源。这意味着系统在设计上可以通过分片、复制等途径来动态申请系统资源并进行负载均衡，从而去中心化，避免节点瓶颈。
* **Message Driven（消息驱动的）**

  ：响应式系统依赖异步消息传递机制，从而在组件之间建立边界，这些边界可以保证组件之间的松耦合、隔离性、位置透明性，还提供了以消息的形式把故障委派出去的手段。

前三种属性（Responsive, Resilient, Elastic）更多的是跟你的架构选型有关，我们可以很容易理解像 microservices、Docker 和 Kubernetes 这样的技术对建立响应式系统的重要性。作为系统的开发者，我们最感兴趣的还是最后一点 Message Driven。Reactive Manifesto 提到了 Message Driven 的三个主要方面：failures at messages, back-pressure, and non-blocking。

**Failures at messages**：在 Reactive 编程中，我们通常需要处理流式的信息，我们最不希望看到的是突然抛出一个异常，然后处理过程终止了。理想的解决办法是我们记下这个错误，然后开始执行某种重试或恢复的逻辑。在 Reactive Streams 中，异常是一等公民，异常不会被粗鲁地抛出，错误处理是正式建立在 Reactive Streams API 规范之内的。

**Back-pressure**：有没有听过这个谚语：“Drinking from the firehose”？John Thompson 在他的 What Are Reactive Streams in Java? \[17] 一文中给出了非常生动的比喻，请看下图。

![](https://github.com/ZhongyangMA/images/raw/master/webflux-streaming-demo/back-pressure.png)

Back-pressure，中文一般翻译成“背压”、“回压”，意思是当消费端的消费能力跟不上生产端的生产速度时，消息流下游的消费方对上游的生产方说：“我喝饱了，请你慢点”。在 Reactive 的世界里，我们希望下游的消费方可以有某种机制按需请求一定数量的消息来消费（这类似消息队列中的 pull 的概念）。而不是上游把大量的消息一股脑灌给下游消费方，然后阻塞式等待，throttling(节流) is done programmatically rather than blocking threads。

**Non-blocking**：非阻塞对于一个响应式系统的重要性，也许是我们开发者最关心的一点了。John Thompson 用 Node.js Server 来举例，用它来和传统的 Java 多线程服务相对比。

Node.js Server 有着一个非阻塞的事件循环，访问请求以一种非阻塞的方式被处理，线程不会因为等待别的处理过程而卡住。

![](https://github.com/ZhongyangMA/images/raw/master/webflux-streaming-demo/nodejs-server.png)

与之相对的是典型的 Java 多线程服务，并发性依靠多线程来获得。

![](https://github.com/ZhongyangMA/images/raw/master/webflux-streaming-demo/multithreads-server.png)

John Thompson 把这两种方式比喻成了超级高速公路和有着许多红绿灯的城市道路：“With a single thread event loop, your process is cruising quickly along on a super highway. In a Multi-threaded server, your process is stuck on city streets in stop and go traffic.”

#### Reactive Stream API 介绍

让我们来简要看一下 Reactive Stream API。它只提供了四个接口。

```
Publisher：是元素（消息）序列的提供者，根据它的订阅者的需求，来发布这些元素（消息）。
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
Subscriber：当通过 Publisher.subscribe(Subscriber) 注册后，它将通过 Subscriber.onSubscribe(Subscription) 来接收消息。
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
Subscription：代表了消息从 Publisher 到 Subscriber 的一个一对一的生命周期。
public interface Subscription {
    public void request(long n);
    public void cancel();
}
Processor：继承了 Publisher 和 Subscriber，用于转换发布者到订阅者之间管道中的元素。Processor<T,R> 订阅类型为 T 的数据元素，接收并转换为类型为 R 的数据，然后发布变换后的数据。
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
```

![](https://3887784244-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-Ld4L6SN74eVvyP4OOqn%2F-Ld4L7VB8_xIt-fFMc_l%2F-Ld4L8id9WJVKfQDuayv%2F111111d.png?generation=1555941611999391\&alt=media)

下图显示了发布者和订阅者之间的典型交互顺序。

![](https://github.com/ZhongyangMA/images/raw/master/webflux-streaming-demo/publisher-subscriber.png)

## Spring Boot 2.0 构建 WebFlux 应用

### Reactor 框架简介

Reactive Streams API 只是一套接口，并没有具体的实现。Reactor 项目是它的具体实现之一，并且也是 Spring Framework 5 中新增模块 WebFlux 默认集成的框架，Spring Framework 5 的响应式编程模型主要依赖 Reactor \[18]\[19]。下面我们来简要介绍一下 Reactor 框架。

> Reactor 是一个用于JVM的完全非阻塞的响应式编程框架，具备高效的需求管理（即对 “背压（backpressure）”的控制）能力。它与 Java 8 函数式 API 直接集成，比如 CompletableFuture， Stream， 以及 Duration。它提供了异步序列 API Flux（用于\[N]个元素）和 Mono（用于 \[0|1]个元素），并完全遵循和实现了“响应式扩展规范”（Reactive Extensions Specification）。
>
> Reactor与Spring是兄弟项目，侧重于Server端的响应式编程，主要 artifact 是 reactor-core，这是一个基于 Java 8 的实现了响应式流规范 （Reactive Streams specification）的响应式库。
>
> RxJava 库是 JVM 上反应式编程的先驱，也是反应式流规范的基础。RxJava 2 在 RxJava 的基础上做了很多的更新。不过 RxJava 库也有其不足的地方。RxJava 产生于反应式流规范之前，虽然可以和反应式流的接口进行转换，但是由于底层实现的原因，使用起来并不是很直观。RxJava 2 在设计和实现时考虑到了与规范的整合，不过为了保持与 RxJava 的兼容性，很多地方在使用时也并不直观。Reactor 则是完全基于反应式流规范设计和实现的库，没有 RxJava 那样的历史包袱，在使用上更加的直观易懂。Reactor 也是 Spring 5 中反应式编程的基础。学习和掌握 Reactor 可以更好地理解 Spring 5 中的相关概念。

Reactor 有两种模型，Flux 和 Mono，提供了非阻塞、支持回压机制的异步流处理能力。当数据生成缓慢时，整个流自然进入推送模式；而当生产高峰来临数据生产速度加快时，整个流又进入了拉取模式。Flux 可以触发 0 到多个事件，用于异步地处理流式的信息；Mono 至多可以触发一个事件，通常用于在异步任务完成时发出通知。Flux 和 Mono 之间可以相互转换。对一个 Flux 序列进行计数操作，得到的结果是一个 Mono对象，把两个 Mono 序列合并在一起，得到的是一个 Flux 对象\[20]\[21]。

#### 创建 Flux 序列的方式

```
通过 Flux 类的静态方法，例如：
List<String> words = Arrays.asList("aa","bb","cc","dd");
Flux<String> listWords = Flux.fromIterable(words);    //从集合获取
Flux<String> justWords = Flux.just("Hello","World");  //指定序列中包含的全部元素
listWords.subscribe(System.out::println);
justWords.subscribe(System.out::println);
使用 generate() 方法生成 Flux 序列：
generate() 方法通过同步和逐一的方式来产生 Flux 序列，通过调用 SynchronousSink 对象的 next()，complete() 和 error(Throwable) 方法来完成。
Flux.generate(sink -> {
    sink.next("Hello");   //通过 next()方法产生一个简单的值，至多调用一次
    sink.complete();      //然后通过 complete()方法来结束该序列
}).subscribe(System.out::println);
使用 create() 方法生成 Flux 序列：
与 generate() 方法的不同之处在于所使用的是 FluxSink 对象。FluxSink 支持同步和异步的消息产生，并且可以在一次调用中产生多个元素。
Flux.create(sink -> {
    for (int i = 0; i < 10; i++) {
        sink.next(i);
    }
    sink.complete();
}).subscribe(System.out::println);
```

#### 创建 Mono 的方式

```
// Mono 的创建方式与 Flux 类似。除了相同的几个静态方法之外，Mono 还有一些独有的静态方法。例如：
Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);
```

#### 流的操作函数

```
Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);  // 只输出满足filter条件的元素
Flux.just("a", "b").zipWith(Flux.just("c", "d")).subscribe(System.out::println);  // zipWith 操作符把当前流中的元素与另外一个流中的元素按照一对一的方式进行合并。
// reduce 和 reduceWith 操作符对流中包含的所有元素进行累积操作，得到一个包含计算结果的 Mono 序列。
Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);
Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);
etc.
```

#### 消息处理

```
当需要处理 Flux 或 Mono 中的消息时，可以通过 subscribe 方法来添加相应的订阅逻辑。例如：
通过 subscribe()方法处理正常和错误消息：
Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .subscribe(System.out::println, System.err::println);

出现错误时返回默认值0：
Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .onErrorReturn(0)
        .subscribe(System.out::println);

出现错误时使用另外的流：
Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .switchOnError(Mono.just(0))
        .subscribe(System.out::println);

使用 retry 操作符进行重试：
Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .retry(1)
        .subscribe(System.out::println);
```

以上只是简略地列举了一些 Reactor 的操作函数，更详细的介绍请见官方文档\[22]。

### WebFlux 模块简介

WebFlux 是 Spring Framework 5 的一个新模块，包含了响应式 HTTP 和 WebSocket 的支持，另外在上层服务端支持两种不同的编程模型：第一种是 Spring MVC 中使用的基于 Java 注解的方式，第二种是基于 Java 8 的 lambda 表达式的函数式编程模型。这两种编程模型只是在代码编写方式上存在不同，它们运行在同样的反应式底层架构之上，因此在运行时是相同的。

![](https://github.com/ZhongyangMA/images/raw/master/webflux-streaming-demo/two-route-forms.png)

> WebFlux 模块的名称是 spring-webflux，名称中的 Flux 来源于 Reactor 中的类 Flux。该模块中包含了对反应式 HTTP、服务器推送事件和 WebSocket 的客户端和服务器端的支持。

#### 代码示例

```
1. 首先，引入 parent ：
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

2. 引入 WebFlux 依赖：
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

3. 入口文件跟普通的 Spring Boot 项目一样：
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

4. 创建 RouterFunction 风格的路由：
@Configuration
public class Routes {
    @Bean
    public RouterFunction<?> routerFunction() {
        return route(
            GET("/api/city").and(accept(MediaType.APPLICATION_JSON)), cityService:: findAllCity
        ).and(route(
            GET("/api/user/{id}").and(accept(MediaType.APPLICATION_JSON)), cityService:: findCityById)
        );
    }
}

5. 创建 Spring MVC 注解风格的路由：
@RestController
public class ReactiveController {
    @GetMapping("/hello_world")
    public Mono<String> sayHelloWorld() {
        return Mono.just("Hello World");
    }
}
```

Reactive Controller 操作的是异步的 ServerHttpRequest 和 ServerHttpResponse，而不再是 Spring MVC 里的 HttpServletRequest 和 HttpServletResponse。Mono 和 Flux 是异步的，当流中的数据没有就绪时，方法也能立即返回（返回的是对象引用）。当数据就绪后，web server 会扫描到这个就绪事件。\[23]\[24]

> ### 特点：
>
> 1、响应式和非阻塞并不是总能让应用跑的更快，况且将代码构建为非阻塞的执行方式本身还会带来少量的成本。但是在类似于WEB应用这样的**高并发、少计算且I/O密集的应用中**，响应式和非阻塞往往能够发挥出价值。尤其是微服务应用中，**网络I/O比较多**的情况下，效果会更加惊人
>
> 2、针对MongoDB驱动我们可以得出以下两个结论：
>
> * 相对于同步驱动来说，异步驱动在性能方面略胜一筹；
> * 在应对大量客户端线程的情况下，异步驱动能够以少量而稳定的连接数应对，意味着更少的内存消耗（每个连接消耗stack size的内存空间，默认情况下stack size为10M）
>
> 3、服务器推送事件（Server-Sent Events，SSE）允许服务器端不断地推送数据到客户端。相对于 WebSocket 而言，服务器推送事件只支持服务器端到客户端的单向数据传递。虽然功能较弱，但优势在于 SSE 在已有的 HTTP 协议上使用简单易懂的文本格式来表示传输的数据。作为 W3C 的推荐规范，SSE 在浏览器端的支持也比较广泛，除了 IE 之外的其他浏览器都提供了支持。在 IE 上也可以使用 polyfill 库来提供支持。在服务器端来说，SSE 是一个不断产生新数据的流，非常适合于用反应式流来表示。在 WebFlux 中创建 SSE 的服务器端是非常简单的。只需要返回的对象的类型是 Flux\<ServerSentEvent>，就会被自动按照 SSE 规范要求的格式来发送响应。
>
> ### 不支持：
>
> Spring Data Reactive Repositories 不支持 MySQL，进一步也不支持 MySQL 事务。所以用了 Reactivey 原来的 spring 事务管理就不好用了。jdbc jpa 的事务是基于阻塞 IO 模型的，如果 Spring Data Reactive 没有升级 IO 模型去支持 JDBC，生产上的应用只能使用不强依赖事务的。也可以使用透明的事务管理，即每次操作的时候以回调形式去传递数据库连接 connection。
>
> Spring Data Reactive Repositories 目前支持 Mongo、Cassandra、Redis、Couchbase
>
> 我们先看看这张图。Spring Boot 2.0 这里有两条不同的线分别是：\
> Spring Web MVC -> Spring Data\
> Spring WebFlux -> Spring Data Reactive
>
> 所以这里问题的答案是，如果使用 Spring Data Reactive ，原来的 Spring 针对 Spring Data （JDBC等）的事务管理肯定不起作用了。因为原来的 Spring 事务管理（Spring Data JPA）都是基于 ThreadLocal 传递事务的，其本质是基于 阻塞 IO 模型，不是异步的。但 Reactive 是要求异步的，不同线程里面 ThreadLocal 肯定取不到值了。自然，我们得想想如何在使用 Reactive 编程是做到事务，有一种方式是 回调 方式，一直传递 conn ：newTransaction(conn ->{})
>
> 因为每次操作数据库也是异步的，所以 connection 在 Reactive 编程中无法靠 ThreadLocal 传递了，只能放在参数上面传递。虽然会有一定的代码侵入行。进一步，也可以 kotlin 协程，去做到透明的事务管理，即把 conn 放到 协程的局部变量中去。那 Spring Data Reactive Repositories 不支持 MySQL，进一步也不支持 MySQL 事务，怎么办？
>
> 答案是，这个问题其实和第一个问题也相关。 为啥不支持 MySQL，即 JDBC 不支持。大家可以看到 JDBC 是所属 Spring Data 的。所以可以等待 Spring Data Reactive Repositories 升级 IO 模型，去支持 MySQL。也可以和上面也讲到了，如何使用 Reactive 编程支持事务。
>
> 如果应用只能使用不强依赖数据事务，依旧使用 MySQL

## 参考文献

\[1] Spring Boot 2.0 Release Notes:

<https://github.com/spring-projects/spring-boot/wiki/Spring-Boot-2.0-Release-Notes>

\[2] What's New in Spring Framework 5.x:

\[\[\[\[[https://github.com/spring-projects/spring-framework/wiki/What's-New-in-Spring-Framework-5.x\]\](https://github.com/spring-projects/spring-framework/wiki/What's-New-in-Spring-Framework-5.x\](https://github.com/spring-projects/spring-framework/wiki/What's-New-in-Spring-Framework-5.x\](https://github.com/spring-projects/spring-framework/wiki/What's-New-in-Spring-Framework-5.x\](https://github.com/spring-projects/spring-framework/wiki/What's-New-in-Spring-Framework-5.x)\](https://github.com/spring-projects/spring-framework/wiki/What's-New-in-Spring-Framework-5.x\](https://github.com/spring-projects/spring-framework/wiki/What's-New-in-Spring-Framework-5.x\](https://github.com/spring-projects/spring-framework/wiki/What's-New-in-Spring-Framework-5.x\](https://github.com/spring-projects/spring-framework/wiki/What's-New-in-Spring-Framework-5.x)))\\](https://github.com/spring-projects/spring-framework/wiki/What's-New-in-Spring-Framework-5.x]]\(https://github.com/spring-projects/spring-framework/wiki/What's-New-in-Spring-Framework-5.x]\(https://github.com/spring-projects/spring-framework/wiki/What's-New-in-Spring-Framework-5.x]\(https://github.com/spring-projects/spring-framework/wiki/What's-New-in-Spring-Framework-5.x]\(https://github.com/spring-projects/spring-framework/wiki/What's-New-in-Spring-Framework-5.x\)]\(https://github.com/spring-projects/spring-framework/wiki/What's-New-in-Spring-Framework-5.x]\(https://github.com/spring-projects/spring-framework/wiki/What's-New-in-Spring-Framework-5.x]\(https://github.com/spring-projects/spring-framework/wiki/What's-New-in-Spring-Framework-5.x]\(https://github.com/spring-projects/spring-framework/wiki/What's-New-in-Spring-Framework-5.x\)\)\)/))

\[3] Reactive Programming vs. Reactive stream:

<https://www.tuicool.com/articles/UZfMB3M>

\[4] Reactive Programming with JDK 9 Flow API:

<https://community.oracle.com/docs/DOC-1006738>

\[5] Socket IO 模型:

<https://blog.csdn.net/qq_23100787/article/details/60751795>

\[6] Java I/O 模型从 BIO 到 NIO 和 Reactor 模式:

<http://www.jasongj.com/java/nio_reactor/>

\[7] Java NIO、BIO、AIO 全面剖析:

<https://blog.csdn.net/jiaomingliang/article/details/47684713>

\[8] Java NIO：浅析I/O模型:

<http://www.cnblogs.com/dolphin0520/p/3916526.html>

\[9] Java IO：BIO和NIO区别及各自应用场景:

<https://blog.csdn.net/jiyiqinlovexx/article/details/51204726>

\[10] Java NIO：NIO概述:

<http://www.cnblogs.com/dolphin0520/p/3919162.html>

\[11] JAVA nio selector 入门:

<http://www.open-open.com/lib/view/open1452776436495.html>

\[12] Java NIO Socket通信:

<http://www.open-open.com/lib/view/open1343002247880.html>

\[13] Java NIO 代码示例:

<https://blog.csdn.net/u013967628/article/details/77841327>

\[14] Scalable IO in Java (Doug Lea):

<http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf>

\[15] Netty（RPC高性能之道）原理剖析:

<https://blog.csdn.net/zhiguozhu/article/details/50517551>

\[16] The Reactive Manifesto:

<https://www.reactivemanifesto.org/>

\[17] What Are Reactive Streams in Java:

<https://dzone.com/articles/what-are-reactive-streams-in-java>

\[18] Project Reactor 官网:

<http://projectreactor.io/>

\[19] Reactor 实例解析:

<http://www.open-open.com/lib/view/open1482286087274.html>

\[20] 使用 Reactor 进行反应式编程:

<https://www.ibm.com/developerworks/cn/java/j-cn-with-reactor-response-encode/index.html>

\[21] Java ProjectReactor 框架之 Flux 篇:

<https://www.jianshu.com/p/8cb66e912641>

\[22] Reactor 3 Reference Guide:

<http://projectreactor.io/docs/core/release/reference/docs/index.html>

\[23] 聊聊 Spring Boot 2.0 的 WebFlux:

<https://www.bysocket.com/?p=1987>

\[24] 使用 Spring 5 的 WebFlux 开发反应式 Web 应用:

<https://www.ibm.com/developerworks/cn/java/spring5-webflux-reactive/index.html>

\[25] **Reactive Programming 概览 :**

[https://github.com/ZhongyangMA/webflux-streaming-demo/wiki/反应式编程概览（中文版）](https://github.com/ZhongyangMA/webflux-streaming-demo/wiki/%E5%8F%8D%E5%BA%94%E5%BC%8F%E7%BC%96%E7%A8%8B%E6%A6%82%E8%A7%88%EF%BC%88%E4%B8%AD%E6%96%87%E7%89%88%EF%BC%89)
