使用Reactor响应式编程

介绍

响应式编程

响应式编程不同于我们熟悉的命令式编程,我们熟悉的命令式编程即 代码就是一行接一行的指令,按照它们的顺序一次一条地出现。一个任务被执行,程序就需要等到它执行完了,才能执行下一个任务。每一步,数据都需要完全获取到了才能被处理,因此它需要作为一个整体来处理 。但是所谓的响应式编程 是函数式和声明式的。响应式流处理数据时只要数据是可用的就进行处理,而不是需要将数据作为一个整体进行提供。事实上,输入数据可以是无穷的(例如,一个地点的实时温度数据的恒定流)。 如下通过一个例子来描述响应式编程和命令式编程的差别:

:chestnut::某地发生火灾,附近有一个水池,我们需要利用水池中的水来灭火。

首先我们将这一系列步骤进行任务抽象:

  1. 取到水池中的水。
  2. 把水运送到火灾地进行灭火。

那么命令式编程,我们把一池水都看成一个整体,那个首先我们需要将一池子的水全部放入救火车中,全部放完后才能拉着这一池子水赶往火灾地进行灭火。这也符合上面对命令式编程的描述。 一个任务被执行,程序就需要等到它执行完了,才能执行下一个任务。每一步,数据都需要完全获取到了才能被处理,因此它需要作为一个整体来处理

但是响应式编程就不一样了,响应式编程并不要求我们把一池子水看成一个整体,而是一系列(无穷的水滴),我们的做法就像拉一根很长的水管,一端连着水池,一端在火灾地。我们使用抽水机把水源源不断的输送到火灾地进行灭火,而不需要命令式编程那样必须一个任务一个任务串行。即: 响应式流处理数据时只要数据是可用的就进行处理,而不是需要将数据作为一个整体进行提供。事实上,输入数据可以是无穷的

通过上述的例子,可以清晰的分辨响应式编程和传统的命令式编程。

Reactor

Reactor是基于响应式流的第四代响应式库规范,用于在JVM上构建非阻塞应用程序。Reactor 工程实现了响应式流的规范,它提供由响应式流组成的函数式 API。正如你将在后面看到的,Reactor 是 Spring 5 响应式编程模型的基础。关于响应式流在这里简要介绍下:

响应式流的规范可以通过四个接口定义来概括:Publisher,Subscriber,Subscription 和 Processor。

  • Publisher:数据生产者
  • Subscriber:数据订阅者
  • Subscription:数据载体
  • Processor:对生产者的数据进行特定处理,并传给Subscriber。

关于响应式流的具体规范可以看 这里

回头看Reactor中,存在两个核心概念:Mono和Flux。

Flux 表示零个、一个或多个(可能是无限个)数据项的管道。

Mono 特定用于已知的数据返回项不多于一个的响应式类型。

使用弹珠图来描述二者:

Flux:

Mono:

Spring Boot中使用Reactor

添加依赖



    io.projectreactor
    reactor-core



    io.projectreactor
    reactor-test
    test

Reactor使用示例

Flux和Mono的操作方法有很多,我们大致的将他们的所有操作分为四类:

  • 创建操作
  • 联合操作
  • 传输操作
  • 逻辑处理操作

创建操作

使用just()方法并传入元素来创建Flux:

@Test
public void 创建一个Flux并且输出() {
  Flux flux = Flux.just("1", "2", "3", "4", "5");
  flux.subscribe(f -> System.out.println("Here's some number: " + f));
}

我们可以传入数组,集合,Stream类来创建Flux:

@Test
public void 从数组中创建一个集合() {
    String[] strs = {"1", "2", "3"};
    Flux flux = Flux.fromArray(strs);
    StepVerifier.create(flux)
      .expectNext("1")
      .expectNext("2")
      .expectNext("3")
      .verifyComplete();

    List strList = new ArrayList();
    strList.add("1");
    strList.add("2");
    strList.add("3");
    Flux.fromIterable(strList);

    Flux.fromStream(strList.stream());
}

指定一个范围来创建Flux:

@Test
public void 提供范围生成一个Flux() {
    Flux flux = Flux.range(0, 3);
    StepVerifier.create(flux)
      .expectNext(0)
      .expectNext(1)
      .expectNext(2)
      .verifyComplete();

    //:sun_with_face:来个附加操作:interval方法设置Flux发送数据的频率,这里设置每一秒发送一次。
    //:first_quarter_moon_with_face:take方法表示限制条目数量,在这里我们设定Flux最多发送三条数据。
    Flux flux1 = Flux.interval(Duration.ofSeconds(1L)).take(3L);
    StepVerifier.create(flux1)
      .expectNext(0L)
      .expectNext(1L)
      .expectNext(2L)
      .verifyComplete();
}

联合操作

Flux提供了多种联合操作,来结合多个Flux流进行操作:

merge操作:

@Test
public void merge多个Flux() {
    Flux flux = Flux.range(0, 3).delayElements(Duration.ofMillis(500));
    Flux flux1 = Flux.range(3,2).delaySubscription(Duration.ofMillis(250))
      .delayElements(Duration.ofMillis(500));
    //:wave:使用mergeWith方法来结合两个Flux流,mergeWith方法不能保证合并后的流中元素的顺序
    //:wave:所以上面操作我们使用delaySubscription和delayElements来保证元素的顺序
    //delaySubscription:指定时间延迟发送  delayElements:发送元素的时间间隔 
    Flux flux2 = flux.mergeWith(flux1);
    flux2.subscribe(f -> System.out.println("Here's some number: " + f));
    StepVerifier.create(flux2)
      .expectNext(0)
      .expectNext(3)
      .expectNext(1)
      .expectNext(4)
      .expectNext(2)
      .verifyComplete();
}

图解上述代码:

zip操作:

@Test
public void 合并多个Flux() {
  Flux flux = Flux.range(0, 3).delayElements(Duration.ofMillis(500));
  Flux flux1 = Flux.range(3, 2).delaySubscription(Duration.ofMillis(250))
    .delayElements(Duration.ofMillis(500));
    //:wave:zip操作将合并两个Flux流,并且生成一个Tuple2对象,Tuple2中包含两个流中同顺序的元素各一个。
  Flux<Tuple2> flux3 = Flux.zip(flux, flux1);
  flux3.take(3).subscribe(f -> System.out.println(f.toString()));
  StepVerifier.create(flux3)
    .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
    .expectNextMatches(t -> t.getT1() == 1 && t.getT2() == 4)
    .verifyComplete();
}

图解上述代码:

zip配合指定逻辑操作:

@Test
    public void 合并多个Flux() {
        Flux flux = Flux.range(0, 3).delayElements(Duration.ofMillis(500));
        Flux flux1 = Flux.range(3, 2).delaySubscription(Duration.ofMillis(250))
          .delayElements(Duration.ofMillis(500));
                //:wave:在zip操作中传入指定的逻辑操作,返回一个操作结果Flux
        Flux flux4 = Flux.zip(flux, flux1, (x, y) -> x + y);
        flux4.take(3).subscribe(f -> System.out.println(f.toString()));
        StepVerifier.create(flux4)
                .expectNext(3)
                .expectNext(5)
                .verifyComplete();
    }

图解上述代码:

first操作:

@Test
    public void 只获取最先发布的Flux() {
        Flux flux = Flux.range(0, 3).delayElements(Duration.ofMillis(500));
        Flux flux1 = Flux.range(3, 2).delaySubscription(Duration.ofMillis(250))
          .delayElements(Duration.ofMillis(500));
        //first操作只会使用最先发布元素的那个流
        Flux flux2 = Flux.first(flux, flux1);
        StepVerifier.create(flux2)
                .expectNext(0)
                .expectNext(1)
                .expectNext(2)
                .verifyComplete();
    }

图解上述操作:

转换&过滤操作

skip操作

@Test
public void 过滤Flux中的数据() {
  //:wave:skip操作,跳过指定数量的元素
  Flux flux = Flux.range(0, 10).skip(8);
  StepVerifier.create(flux)
    .expectNext(8)
    .expectNext(9)
    .verifyComplete();
}

图解上述操作:

@Test
public void 过滤Flux中的数据() {
  //:wave:在skip方法中传入是个时间段,表示跳过这个时间段内输出的元素
  //:wave:搭配delayElements方法,每个100毫秒输出一次
  //:wave:所以这个测试只会得到7,8,9
  Flux flux1 = Flux.range(0, 10).delayElements(Duration.ofMillis(100))
    .skip(Duration.ofMillis(800));
  StepVerifier.create(flux1)
    .expectNext(7)
    .expectNext(8)
    .expectNext(9)
    .verifyComplete();
}

图解上述方法:

take操作

@Test
public void 过滤Flux中的数据() {
  //:wave:take操作与skip相反,表示获取指定数量的前几个元素
  Flux flux2 = Flux.range(0, 10).delayElements(Duration.ofMillis(100))
    .take(Duration.ofMillis(350));
  StepVerifier.create(flux2)
    .expectNext(0)
    .expectNext(1)
    .expectNext(2)
    .verifyComplete();
}

图解上述方法:

@Test
public void take() {
    //:wave:take方法支持传入一个时间段,表示只取这个时间段内发布的元素
    //:wave:下面操作中我们规定一秒发布一个元素,取3.5秒内的元素
    //:wave:所以最后只能得到前三个元素
    Flux nationalParkFlux = Flux.just(
        "Yellowstone", "Yosemite", "Grand Canyon","Zion", "Grand Teton")
        .delayElements(Duration.ofSeconds(1))
        .take(Duration.ofMillis(3500));
    
    StepVerifier.create(nationalParkFlux)
        .expectNext("Yellowstone", "Yosemite", "Grand Canyon")
        .verifyComplete();
}

图解上述方法:

filter操作

@Test
public void 过滤Flux中的数据() {
  //:wave:filter方法规定一个条件,只拿取符合条件的元素
  //:wave:下面操作中,我们只拿取小于2的元素
  Flux flux3 = Flux.range(0, 10).filter(n -> n < 2);
  StepVerifier.create(flux3)
    .expectNext(0)
    .expectNext(1)
    .verifyComplete();
}

图解上述方法:

distinct操作

ja@Test
public void 过滤Flux中的数据() {
  //:wave:distinct方法用于元素去重
  Flux flux4 = Flux.just(1, 2, 3, 3, 4, 5, 5).distinct();
  StepVerifier.create(flux4)
    .expectNext(1)
    .expectNext(2)
    .expectNext(3)
    .expectNext(4)
    .expectNext(5)
    .verifyComplete();
}

图解上述方法:

map操作

@Test
public void 映射Flux() {
    //:wave:map方法,将元素转换成指定的另一种数据
  //:wave:下面操作中我们传入一个匿名的转换类,指定了我们将字符串转换为数字
  Flux flux = Flux.just("1", "2", "3")
    .map(Integer::valueOf);
  StepVerifier.create(flux)
    .expectNext(1)
    .expectNext(2)
    .expectNext(3)
    .verifyComplete();
}

图解如上方法:

flatMap操作

flatMap() 将每个对象映射到一个新的 Mono 或 Flux,最后这些新的Mono或者Flux会被压成(合成)一个新的Flux。

@Test
public void 映射Flux() {
  //:wave:如下的flatMap方法将传入的每个元素都转成一个Mono
  //:wave:随后在Mono里面传入一个map转换逻辑(String->Integer)
  //:wave:使用subscribeOn来做了一个异步处理
  //:wave:最终会形成一个新的Flux,包含来转换后的元素,但是由于异步,不能保证顺序
  Flux flux1 = Flux.just("1", "2", "3", "4")
    .flatMap(m -> Mono.just(m).map(c -> Integer.valueOf(c))
             .subscribeOn(Schedulers.parallel()));
  List list = Stream.of(1, 2, 3, 4).collect(Collectors.toList());
  StepVerifier.create(flux1)
    .expectNextMatches(list::contains)
    .expectNextMatches(list::contains)
    .expectNextMatches(list::contains)
    .expectNextMatches(list::contains)
    .verifyComplete();
}

图解上述代码:

buffer操作

@Test
public void 缓冲Flux() {
  Flux flux = Flux.just(1, 2, 3, 4, 5, 6);
  //:wave:buffer方法起到一个缓冲的作用
  //:wave:我们在buffer中指定一个数字,只有buffer被充满时或者没有剩余元素时,才会发布出去
  //:wave:因为你有了缓存,所以发布出去的是一个元素集合
  Flux<List> listFlux = flux.buffer(3);
  StepVerifier.create(listFlux)
    .expectNext(Arrays.asList(1, 2, 3))
    .expectNext(Arrays.asList(4, 5, 6))
    .verifyComplete();

  //:wave:运行下面的代码,查看buffer是如何工作的
  Flux.just("apple", "orange", "banana", "kiwi", "strawberry")
    .buffer(3)
    .flatMap(x ->
             Flux.fromIterable(x)
             .map(y -> y.toUpperCase())
             .subscribeOn(Schedulers.parallel())
             .log()
            ).subscribe();
}

图解上述方法:

collectList操作

@Test
public void 缓冲Flux() {
  Flux flux1 = Flux.range(1, 6);
  //:wave:collectList方法用于将含有多个元素的Flux转换为含有一个元素列表的Mono
  Mono<List> mono2 = flux1.collectList();
  StepVerifier.create(mono2)
    .expectNext(Arrays.asList(1, 2, 3, 4, 5, 6))
    .verifyComplete();
}

图解上述方法:

collectMap操作

@Test
public void 缓冲Flux() {
    //:wave:collectMap方法用于将含有多个元素的Flux转换为含有一个Map的Mono
    //:wave:collectMap方法中传入的是生成键的逻辑
    Flux flux2 = Flux.range(1, 6);
    Mono<Map> mapMono = flux2.collectMap(f -> String.valueOf(f + "i"));
    StepVerifier.create(mapMono)
      .expectNextMatches(m -> m.size() == 6
                         && m.get("1i").equals(1)
                         && m.get("2i").equals(2)
                         && m.get("3i").equals(3))
      .verifyComplete();
}

图解上述方法:

逻辑操作

@Test
public void Flux的逻辑操作() {
    //:wave:有时你只需要知道 Mono 或 Flux 发布的条目是否符合某些条件。all() 和 any() 操作将执行这样的逻辑。
    Flux flux = Flux.just(1, 2, 3, 4, 5, 6);
    //:wave:any方法,只要任何一个元素符合要求,即返回true
    Mono mono = flux.any(f -> f < 0);
    StepVerifier.create(mono)
            .expectNext(false)
            .verifyComplete();
        //:wave:all方法,所有元素符合要求,即返回true
    Mono mono1 = flux.all(f -> f > 0);
    StepVerifier.create(mono1)
            .expectNext(true)
            .verifyComplete();
}

图解上述方法:

总结

本文主要介绍了响应式编程的基本概念,并用一个例子来说明响应式编程和命令式编程的差别。介绍了响应式流模型的实现库Reactor,并且解释了Reactor中的一些响应式流概念。使用SpringBoot引入Reactor库来进行Reactor开发,最后演示了Reactor的一些常见操作。

本文示例代码地址: https://gitee.com/jeker8chen/react-demo.git

关注笔者公众号,推送各类原创/优质技术文章 :arrow_down: