异步编程入门之RxJs(一)

  • Promise 的特点是无论有没有人关心它的执行结果,它都会立即开始执行,并且你没有机会取消这次执行。显然,在某些情况下这么做是浪费的甚至错误的
  • 以电商为例,如果某商户的订单不允许取消,你还会去买吗?
  • 如果你发起了一个 Ajax 请求,然后用户导航到了另一个路由,显然,你这个请求如果还没有完成就应该被取消,而不应该发出去
  • 使用 Promise ,你做不到,不是因为实现方面的原因,而是因为它在概念层(接口定义上)就无法支持取消
  • 由于 Promise 只会承载一个值,因此当我们要处理的是一个集合的时候就比较困难了。比如对于一个随机数列(总数未知),如果我们要借助 Web API 检查每个数字的有效性,然后对前一百个有效数字进行求和,那么用 Promise 写就比较麻烦了

1.2 Observable

Observable
Observable

1.3 ReactiveX宝石图

  • 中间的带箭头的线就像传送带,用来表示数据序列,这个数据序列被称为“流”。上方的流叫做输入流,下方的流叫做输出流。输入流可能有多个,但是输出流只会有一个(不过,流中的每个数据项也可以是别的流)
  • 数据序列上的每个圆圈表示一个数据项,圆圈的位置表示数据出现的先后顺序,但是一般不会表示精确的时间比例,比如在一毫秒内接连出现的两个数据之间仍然会有较大的距离。只有少数涉及到时间的操作,其宝石图才会表现出精确的时间比例
  • 圆圈的最后,通常会有一条竖线或者一个叉号。竖线表示这个流正常终止了,也就是说不会再有更多的数据提供出来了。而叉号表示这个流抛出错误导致异常中止了。还有一种流,既没有竖线也没有叉号,这种叫做无尽流,比如一个由所有自然数组成的流就不会主动终止。但是要注意,无尽流仍然是可以处理的,因为需要多少项是由消费者决定的。你可以把这个“智能”传送带理解为由下一个工位“叫号”的,没“叫号”下一项数据就不会过来
  • 中间的大方框表示一个操作,也就是 operator —— 一个函数,比如这个图中的操作就是把输入流中的条目乘以十后放入输出流中。
  • 看懂了宝石图,就能很形象的理解各种操作符了

二、Rxjs介绍

2.1 介绍

  • RxJSReactiveX 编程理念的 JavaScript 版本。 ReactiveX 来自微软,它是一种针对异步数据 流的编程。简单来说,它将一切数据,包括 HTTP 请求, DOM 事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你能以同步编程的方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能
  • RxJS 是一种针对异步数据流编程工具,或者叫响应式扩展编程;可不管如何解释 RxJS 其目 标就是异步编程, Angular 引入 RxJS 为了就是让异步可控、更简单。

目前常见的异步编程的几种方法:

Promise
Rxjs

2.2 Promise和RxJS处理异步对比

// promise异步处理
// Promise 处理异步
  getPromiseData() {
    return new Promise(resolve = >{
    setTimeout(() = >{
        resolve('---promise timeout---');
    },
    2000);
  });
  
// 使用

getPromiseData().then(d=>console.log(d))

// RxJS 处理异步:
getRxjsData() {
   return new Observable(observer = >{
      setTimeout(() = >{
        observer.next('observable timeout');
      },
      2000);
  }); 
}
// 使用
getRxjsData().subscribe(d=>console.log(d))

从上面列子可以看到 RxJSPromise 的基本用法非常类似,除了一些关键词不同。 Promise 里面用的是 then()resolve() ,而 RxJS 里面用的是 next()subscribe()
Rxjs 相比 Promise 要强大很多。 比如 Rxjs 中可以中途撤回、 Rxjs 可以发射多个值、 Rxjs 提供了多种工具函数等等

2.3 Rxjs基本概念

  • Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
  • Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
  • Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
  • Operators (操作符): 采用函数式编程风格的纯函数 ( pure function ),使用像 mapfilterconcatflatMap 等这样的操作符来处理集合。
    Subject (主体): 相当于 EventEmitter ,并且是将值或事件多路推送给多个 Observer 的唯一方式。
  • Schedulers (调度器):用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeoutrequestAnimationFrame 或其他

2.4 Rxjs使用场景

  • 涉及复杂的时序操作:在游戏的某一关卡中,连续按下上上下下左右左右baba,每次点按间隔不超过0.4ms,就发送信息到服务器 A
  • 涉及复杂的条件处理:用户输入时,每输入一个字符,发送信息给服务器 A ,如果服务器 A 返回的数据有问题,则请求服务器 B ,如果用户输入’fuck’则停止上述操作并请求服务器 c
  • 涉及复杂的状态管理:早上的时候每隔 10 秒检查一次用户信息,晚上的时候每隔5秒检查一次用户信息,检测到变更后响应式更新所有视图
  • 真要用 Rx 建议还是 Angular 环境下

三、基础入门

3.1 典型的写法

of(1,2,3).pipe(
  filter(item=>item % 2 === 1),
  map(item=>item * 3),
).subscribe(item=> console.log(item))

  • of 称为创建器,用来创建流,它返回一个 Observable 类型的对象, filter 和 map 称为操作符( operator ),用来对条目进行处理。这些操作符被当作 Observable 对象的 pipe 方法的参数传进去
  • Observable 对象的 subscribe 方法表示消费者要订阅这个流,当流中出现数据时,传给 subscribe 方法的回调函数就会被调用,并且把这个数据传进去。这个回调函数可能被调用很多次,取决于这个流中有多少条数据
  • 注意, Observable 必须被 subscribe 之后才会开始生产数据。如果没人 subscribe 它,那就什么都不会做

3.2 简单创建器

RxJS

3.2.1 of – 单一值转为流

它接收任意多个参数,参数可以是任意类型,然后它会把这些参数逐个放入流中

3.2.2 from – 数组转为流

它接受一个数组型参数,数组中可以有任意数据,然后把数组的每个元素逐个放入流中

3.2.3 range – 范围转为流

它接受两个数字型参数,一个起点,一个终点,然后按 1 递增,把中间的每个数字(含边界值)放入流中

3.2.4 fromPromise – Promise 转为流

  • 接受一个 Promise ,当这个 Promise 有了输出时,就把这个输出放入流中。
  • 要注意的是,当 Promise 作为参数传给 fromPromise 时,这个 Promise 就开始执行了,你没有机会防止它被执行。
  • 如果你需要这个 Promise 被消费时才执行,那就要改用接下来要讲的 defer 创建器

3.2.5 defer – 惰性创建流

defer

3.2.6 timer – 定时器流

  • 它有两个数字型的参数,第一个是首次等待时间,第二个是重复间隔时间。从图上可以看出,它实际上是个无尽流 —— 没有终止线。因此它会按照预定的规则往流中不断重复发出数据。
  • 要注意,虽然名字有相关性,但它不是 setTimeout 的等价物,事实上它的行为更像是 setInterval

3.2.7 interval – 定时器流

  • 它和 timer 唯一的差别是它只接受一个参数。事实上,它就是一个语法糖,相当于 timer(1000, 1000) ,也就是说初始等待时间和间隔时间是一样的。
  • 如果需求确实是 interval 的语义,那么就优先使用这个语法糖,毕竟,从行为上它和 setInterval 几乎是一样的

3.3 Subject – 主体对象

它和创建器不同,创建器是供直接调用的函数,而 Subject 则是一个实现了 observable 接口的类。也就是说,你要先把它 new 出来(假设实例叫 subject ),然后你就可以通过程序控制的方式往流里手动放数据了。它的典型用法是用来管理事件,比如当用户点击了某个按钮时,你希望发出一个事件,那么就可以调用 subject.next(someValue) 来把事件内容放进流中

Subject

3.4 合并创建器

我们不但可以直接创建流,还可以对多个现有的流进行不同形式的合并,创建一个新的流。常见的合并方式有三种:并联、串联、拉链

3.4.1 merge – 并联

  • 从图上我们可以看到两个流中的内容被合并到了一个流中。只要任何一个流中出现了值就会立刻被输出,哪怕其中一个流是完全空的也不影响结果 —— 等同于原始流。
  • 这种工作方式非常像电路中的并联行为,因此我称其为并联创建器。
  • 并联在什么情况下起作用呢?举个例子吧:有一个列表需要每隔 5 秒钟定时刷新一次,但是一旦用户按了搜索按钮,就必须立即刷新,而不能等待 5 秒间隔。这时候就可以用一个定时器流和一个自定义的用户操作流( subjectmerge 在一起。这样,无论哪个流中出现了数据,都会进行刷新

3.4.2 concat – 串联

Web API

3.4.3 zip – 拉链

  • zip 的直译就是拉链,事实上,有些压缩软件的图标就是一个带拉链的钥匙包。拉链的特点是两边各有一个“齿”,两者会啮合在一起。这里的 zip 操作也是如此。

从图上我们可以看到,两个输入流中分别出现了一些数据,当仅仅输入流 A 中出现了数据时,输出流中什么都没有,因为它还在等另一个“齿”。当输出流 B 中出现了数据时,两个“齿”都凑齐了,于是对这两个齿执行中间定义的运算(取 A 的形状, B 的颜色,并合成为输出数据)

zip

3.5 操作符

RxJS 有很多操作符,事实上比创建器还要多一些,但是我们并不需要一一讲解,因为它们中的很大一部分都是函数式编程中的标配,比如 mapreducefilter

3.5.1 retry – 失败时重试

401
retry

这里我为什么一直在强调失败时重试呢?因为还有一个操作符负责成功时重试

3.5.2 repeat – 成功时重试

  • 除了重复的条件之外, repeat 的行为几乎和 retry 一模一样。
  • repeat 很少会单独用,一般会组合上 delay 操作,以提供暂停时间,否则就容易 DoS 了服务器

3.5.3 delay – 延迟

  • 这才是真正的 setTimeout 的等价操作。它接受一个毫秒数(图中是 20 毫秒),每当它从输入流中读取一个数据之后,会先等待 20 毫秒,然后再放到输出流中。
  • 可以看到,输入流和输出流内容是完全一样的,只是时机上,输出流中的每个条目都恰好比输入流晚 20 毫秒出现

3.5.4 toArray – 收集为数组

  • 事实上,你几乎可以把它看做是 from 的逆运算。 from 把数组打散了逐个放进流中,而 toArray 恰好相反,把流中的内容收集到一个数组中 —— 直到这个流结束。
  • 这个操作符几乎总是放在最后一步,因为 RxJS 的各种 operator 本身就可以对流中的数据进行很多类似数组的操作,比如查找最小值、最大值、过滤等。所以通常会先使用各种 operator 对数据流进行处理,等到要脱离 RxJS 的体系时,再转换成数组传出去

3.5.5 debounceTime – 防抖

underscore/lodash
debounceTime

3.5.6 switchMap – 切换成另一个流

  • 有时候,我们会希望根据一个立即数发起一个远程查询,并且把这个异步取回的结果放进流中。比如,流中是一些学生的 id ,每过来一个 id ,你要发起一个 Ajax 请求来根据这个 id 获取这个学生的详情,并且把详情放进输出流中。
  • 注意,这是一个异步操作,所以你没法用普通的 map 来实现,否则映射出来的结果就会是一个个 Observable 对象。
  • switchMap 就是用来解决这个问题的。它在回调函数中接受从输入流中传来的数据,并转换成一个新的 Observable 对象(新的流,每个流中包括三个值,每个值都等于输入值的十倍), switchMap 会订阅这个 Observable 对象,并把它的值放入输出流中。注意图中竖线的位置 —— 只有当所有新的流都结束时,输出流才会结束

四、进一步使用

4.1 Rxjs unsubscribe 取消订阅

Promise 的创建之后,动作是无法撤回的。 Observable 不一样,动作可以通过 unsbscribe() 方法中途撤回,而且 Observable 在内部做了智能的处理.

Promise 创建之后动作无法撤回

let promise = new Promise(resolve = >{
    setTimeout(() = >{
        resolve('---promise timeout---');
    },
    2000);
});
promise.then(value = >console.log(value));

Rxjs 可以通过 unsubscribe() 可以撤回 subscribe 的动作

let stream = new Observable(observer = >{
    let timeout = setTimeout(() = >{
        clearTimeout(timeout);
        observer.next('observable timeout');
    },
    2000);
});
let disposable = stream.subscribe(value = >console.log(value));
setTimeout(() = >{
    //取消执行 disposable.unsubscribe();
},
1000);

4.2 Rxjs 订阅后多次执行

如果我们想让异步里面的方法多次执行,比如下面代码。

这一点 Promise 是做不到的,对于 Promise 来说,最终结果要么 resolve (兑现)、要么 reject (拒绝),而且都只能触发一次。如果在同一个 Promise 对象上多次调用 resolve 方法, 则会抛异常。而 Observable 不一样,它可以不断地触发下一个值,就像 next() 这个方法的 名字所暗示的那样。

// promise
let promise = new Promise(resolve = >{
    setInterval(() = >{
        resolve('---promise setInterval---');
    },
    2000);
});
promise.then(value = >console.log(value));

// Rxjs

let stream = new Observable < number > (observer = >{
    let count = 0;
    setInterval(() = >{
        observer.next(count++);
    },
    1000);
});
stream.subscribe(value = >console.log("Observable>" + value));

五、RXJS 实例操作符小结

常用的实例方法

  • pip :功能类似于 let 操作符
  • map :转换输出的数据
  • pluck :提取属性值并输出
  • do :不做数据格式化,可用于调试
  • filter :用于过滤一些 Observable
  • take :表示取几条数据
  • takeWhile :满足什么条件时开始取数据
  • skip :表示跳过多少条数据后开始取
  • distinctUntilChanged :比较输入的 Observable 计算得出的值当前与后最后的值是否相等使用 === 判断
  • scan :功能有点类似于 reduce 这个方法,可用于累加数据同时可以使用 startWith 的数据用途 scan 的初始值,最后返回累加的数据
  • delay :表示 Observable 延时多久开始处理订阅数据
  • toArray :把输出值格式化成数据形式
  • toMap :给当前的输出取个名字或标签
  • expand :实现递归
  • forkJoin :类似于 Promise.all ,只有数据全部返回且状态为 complete 时,表示成功处理了请求,否则失败
  • let :这个操作符可以获取完整的输入 Observable 对象,做相应的处理后返回新的 Observable 对象
  • catch :用于 Observable 处理数据异常的处理
  • combineLatest :用于组且各个输入的 Observable ,并获取和返回各个 Observable 最新的数据
  • merge :用于把两个 Observable 合成一个处理