异步编程入门之RxJs(一)
-
Promise
的特点是无论有没有人关心它的执行结果,它都会立即开始执行,并且你没有机会取消这次执行。显然,在某些情况下这么做是浪费的甚至错误的 - 以电商为例,如果某商户的订单不允许取消,你还会去买吗?
- 如果你发起了一个
Ajax
请求,然后用户导航到了另一个路由,显然,你这个请求如果还没有完成就应该被取消,而不应该发出去 - 使用
Promise
,你做不到,不是因为实现方面的原因,而是因为它在概念层(接口定义上)就无法支持取消 - 由于
Promise
只会承载一个值,因此当我们要处理的是一个集合的时候就比较困难了。比如对于一个随机数列(总数未知),如果我们要借助Web API
检查每个数字的有效性,然后对前一百个有效数字进行求和,那么用Promise
写就比较麻烦了
1.2 Observable
Observable Observable
1.3 ReactiveX宝石图
- 中间的带箭头的线就像传送带,用来表示数据序列,这个数据序列被称为“流”。上方的流叫做输入流,下方的流叫做输出流。输入流可能有多个,但是输出流只会有一个(不过,流中的每个数据项也可以是别的流)
- 数据序列上的每个圆圈表示一个数据项,圆圈的位置表示数据出现的先后顺序,但是一般不会表示精确的时间比例,比如在一毫秒内接连出现的两个数据之间仍然会有较大的距离。只有少数涉及到时间的操作,其宝石图才会表现出精确的时间比例
- 圆圈的最后,通常会有一条竖线或者一个叉号。竖线表示这个流正常终止了,也就是说不会再有更多的数据提供出来了。而叉号表示这个流抛出错误导致异常中止了。还有一种流,既没有竖线也没有叉号,这种叫做无尽流,比如一个由所有自然数组成的流就不会主动终止。但是要注意,无尽流仍然是可以处理的,因为需要多少项是由消费者决定的。你可以把这个“智能”传送带理解为由下一个工位“叫号”的,没“叫号”下一项数据就不会过来
- 中间的大方框表示一个操作,也就是 operator —— 一个函数,比如这个图中的操作就是把输入流中的条目乘以十后放入输出流中。
- 看懂了宝石图,就能很形象的理解各种操作符了
二、Rxjs介绍
2.1 介绍
-
RxJS
是ReactiveX
编程理念的JavaScript
版本。ReactiveX
来自微软,它是一种针对异步数据 流的编程。简单来说,它将一切数据,包括HTTP
请求,DOM
事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你能以同步编程的方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能 -
RxJS
是一种针对异步数据流编程工具,或者叫响应式扩展编程;可不管如何解释RxJS
其目 标就是异步编程,Angular
引入RxJS
为了就是让异步可控、更简单。
目前常见的异步编程的几种方法:
Promise Rxjs
2.2 Promise和RxJS处理异步对比
// promise异步处理 // Promise 处理异步 getPromiseData() { return new Promise(resolve = >{ setTimeout(() = >{ resolve('---promise timeout---'); }, 2000); }); // 使用 getPromiseData().then(d=>console.log(d))
// RxJS 处理异步: getRxjsData() { return new Observable(observer = >{ setTimeout(() = >{ observer.next('observable timeout'); }, 2000); }); } // 使用 getRxjsData().subscribe(d=>console.log(d))
从上面列子可以看到 RxJS
和 Promise
的基本用法非常类似,除了一些关键词不同。 Promise
里面用的是 then()
和 resolve()
,而 RxJS
里面用的是 next()
和 subscribe()
Rxjs
相比 Promise
要强大很多。 比如 Rxjs
中可以中途撤回、 Rxjs
可以发射多个值、 Rxjs
提供了多种工具函数等等
2.3 Rxjs基本概念
-
Observable
(可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。 -
Observer
(观察者): 一个回调函数的集合,它知道如何去监听由Observable
提供的值。 -
Subscription
(订阅): 表示Observable
的执行,主要用于取消Observable
的执行。 -
Operators
(操作符): 采用函数式编程风格的纯函数 (pure function
),使用像map
、filter
、concat
、flatMap
等这样的操作符来处理集合。Subject
(主体): 相当于EventEmitter
,并且是将值或事件多路推送给多个Observer
的唯一方式。 -
Schedulers
(调度器):用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如setTimeout
或requestAnimationFrame
或其他
2.4 Rxjs使用场景
- 涉及复杂的时序操作:在游戏的某一关卡中,连续按下上上下下左右左右baba,每次点按间隔不超过0.4ms,就发送信息到服务器
A
- 涉及复杂的条件处理:用户输入时,每输入一个字符,发送信息给服务器
A
,如果服务器A
返回的数据有问题,则请求服务器B
,如果用户输入’fuck’则停止上述操作并请求服务器c
- 涉及复杂的状态管理:早上的时候每隔
10
秒检查一次用户信息,晚上的时候每隔5秒检查一次用户信息,检测到变更后响应式更新所有视图 - 真要用
Rx
建议还是Angular
环境下
三、基础入门
3.1 典型的写法
of(1,2,3).pipe( filter(item=>item % 2 === 1), map(item=>item * 3), ).subscribe(item=> console.log(item))
-
of
称为创建器,用来创建流,它返回一个Observable
类型的对象,filter
和 map 称为操作符(operator
),用来对条目进行处理。这些操作符被当作Observable
对象的pipe
方法的参数传进去 -
Observable
对象的subscribe
方法表示消费者要订阅这个流,当流中出现数据时,传给subscribe
方法的回调函数就会被调用,并且把这个数据传进去。这个回调函数可能被调用很多次,取决于这个流中有多少条数据 - 注意,
Observable
必须被subscribe
之后才会开始生产数据。如果没人subscribe
它,那就什么都不会做
3.2 简单创建器
RxJS
3.2.1 of – 单一值转为流
它接收任意多个参数,参数可以是任意类型,然后它会把这些参数逐个放入流中
3.2.2 from – 数组转为流
它接受一个数组型参数,数组中可以有任意数据,然后把数组的每个元素逐个放入流中
3.2.3 range – 范围转为流
它接受两个数字型参数,一个起点,一个终点,然后按 1
递增,把中间的每个数字(含边界值)放入流中
3.2.4 fromPromise – Promise 转为流
- 接受一个
Promise
,当这个Promise
有了输出时,就把这个输出放入流中。 - 要注意的是,当
Promise
作为参数传给fromPromise
时,这个Promise
就开始执行了,你没有机会防止它被执行。 - 如果你需要这个
Promise
被消费时才执行,那就要改用接下来要讲的defer
创建器
3.2.5 defer – 惰性创建流
defer
3.2.6 timer – 定时器流
- 它有两个数字型的参数,第一个是首次等待时间,第二个是重复间隔时间。从图上可以看出,它实际上是个无尽流 —— 没有终止线。因此它会按照预定的规则往流中不断重复发出数据。
- 要注意,虽然名字有相关性,但它不是
setTimeout
的等价物,事实上它的行为更像是setInterval
3.2.7 interval – 定时器流
- 它和
timer
唯一的差别是它只接受一个参数。事实上,它就是一个语法糖,相当于timer(1000, 1000)
,也就是说初始等待时间和间隔时间是一样的。 - 如果需求确实是
interval
的语义,那么就优先使用这个语法糖,毕竟,从行为上它和setInterval
几乎是一样的
3.3 Subject – 主体对象
它和创建器不同,创建器是供直接调用的函数,而 Subject
则是一个实现了 observable
接口的类。也就是说,你要先把它 new
出来(假设实例叫 subject
),然后你就可以通过程序控制的方式往流里手动放数据了。它的典型用法是用来管理事件,比如当用户点击了某个按钮时,你希望发出一个事件,那么就可以调用 subject.next(someValue)
来把事件内容放进流中
Subject
3.4 合并创建器
我们不但可以直接创建流,还可以对多个现有的流进行不同形式的合并,创建一个新的流。常见的合并方式有三种:并联、串联、拉链
3.4.1 merge – 并联
- 从图上我们可以看到两个流中的内容被合并到了一个流中。只要任何一个流中出现了值就会立刻被输出,哪怕其中一个流是完全空的也不影响结果 —— 等同于原始流。
- 这种工作方式非常像电路中的并联行为,因此我称其为并联创建器。
- 并联在什么情况下起作用呢?举个例子吧:有一个列表需要每隔
5
秒钟定时刷新一次,但是一旦用户按了搜索按钮,就必须立即刷新,而不能等待5
秒间隔。这时候就可以用一个定时器流和一个自定义的用户操作流(subject
)merge
在一起。这样,无论哪个流中出现了数据,都会进行刷新
3.4.2 concat – 串联
Web API
3.4.3 zip – 拉链
-
zip
的直译就是拉链,事实上,有些压缩软件的图标就是一个带拉链的钥匙包。拉链的特点是两边各有一个“齿”,两者会啮合在一起。这里的zip
操作也是如此。
从图上我们可以看到,两个输入流中分别出现了一些数据,当仅仅输入流 A
中出现了数据时,输出流中什么都没有,因为它还在等另一个“齿”。当输出流 B
中出现了数据时,两个“齿”都凑齐了,于是对这两个齿执行中间定义的运算(取 A
的形状, B
的颜色,并合成为输出数据)
zip
3.5 操作符
RxJS
有很多操作符,事实上比创建器还要多一些,但是我们并不需要一一讲解,因为它们中的很大一部分都是函数式编程中的标配,比如 map
、 reduce
、 filter
等
3.5.1 retry – 失败时重试
401 retry
这里我为什么一直在强调失败时重试呢?因为还有一个操作符负责成功时重试
3.5.2 repeat – 成功时重试
- 除了重复的条件之外,
repeat
的行为几乎和retry
一模一样。 -
repeat
很少会单独用,一般会组合上delay
操作,以提供暂停时间,否则就容易 DoS 了服务器
3.5.3 delay – 延迟
- 这才是真正的
setTimeout
的等价操作。它接受一个毫秒数(图中是20
毫秒),每当它从输入流中读取一个数据之后,会先等待20
毫秒,然后再放到输出流中。 - 可以看到,输入流和输出流内容是完全一样的,只是时机上,输出流中的每个条目都恰好比输入流晚
20
毫秒出现
3.5.4 toArray – 收集为数组
- 事实上,你几乎可以把它看做是
from
的逆运算。from
把数组打散了逐个放进流中,而toArray
恰好相反,把流中的内容收集到一个数组中 —— 直到这个流结束。 - 这个操作符几乎总是放在最后一步,因为
RxJS
的各种operator
本身就可以对流中的数据进行很多类似数组的操作,比如查找最小值、最大值、过滤等。所以通常会先使用各种 operator 对数据流进行处理,等到要脱离RxJS
的体系时,再转换成数组传出去
3.5.5 debounceTime – 防抖
underscore/lodash debounceTime
3.5.6 switchMap – 切换成另一个流
- 有时候,我们会希望根据一个立即数发起一个远程查询,并且把这个异步取回的结果放进流中。比如,流中是一些学生的
id
,每过来一个id
,你要发起一个Ajax
请求来根据这个id
获取这个学生的详情,并且把详情放进输出流中。 - 注意,这是一个异步操作,所以你没法用普通的
map
来实现,否则映射出来的结果就会是一个个Observable
对象。 -
switchMap
就是用来解决这个问题的。它在回调函数中接受从输入流中传来的数据,并转换成一个新的Observable
对象(新的流,每个流中包括三个值,每个值都等于输入值的十倍),switchMap
会订阅这个Observable
对象,并把它的值放入输出流中。注意图中竖线的位置 —— 只有当所有新的流都结束时,输出流才会结束
四、进一步使用
4.1 Rxjs unsubscribe 取消订阅
Promise
的创建之后,动作是无法撤回的。 Observable
不一样,动作可以通过 unsbscribe()
方法中途撤回,而且 Observable
在内部做了智能的处理.
Promise 创建之后动作无法撤回
let promise = new Promise(resolve = >{ setTimeout(() = >{ resolve('---promise timeout---'); }, 2000); }); promise.then(value = >console.log(value));
Rxjs 可以通过 unsubscribe() 可以撤回 subscribe 的动作
let stream = new Observable(observer = >{ let timeout = setTimeout(() = >{ clearTimeout(timeout); observer.next('observable timeout'); }, 2000); }); let disposable = stream.subscribe(value = >console.log(value)); setTimeout(() = >{ //取消执行 disposable.unsubscribe(); }, 1000);
4.2 Rxjs 订阅后多次执行
如果我们想让异步里面的方法多次执行,比如下面代码。
这一点 Promise
是做不到的,对于 Promise
来说,最终结果要么 resolve
(兑现)、要么 reject
(拒绝),而且都只能触发一次。如果在同一个 Promise
对象上多次调用 resolve
方法, 则会抛异常。而 Observable
不一样,它可以不断地触发下一个值,就像 next()
这个方法的 名字所暗示的那样。
// promise let promise = new Promise(resolve = >{ setInterval(() = >{ resolve('---promise setInterval---'); }, 2000); }); promise.then(value = >console.log(value));
// Rxjs let stream = new Observable < number > (observer = >{ let count = 0; setInterval(() = >{ observer.next(count++); }, 1000); }); stream.subscribe(value = >console.log("Observable>" + value));
五、RXJS 实例操作符小结
常用的实例方法
-
pip
:功能类似于let
操作符 -
map
:转换输出的数据 -
pluck
:提取属性值并输出 -
do
:不做数据格式化,可用于调试 -
filter
:用于过滤一些Observable
-
take
:表示取几条数据 -
takeWhile
:满足什么条件时开始取数据 -
skip
:表示跳过多少条数据后开始取 -
distinctUntilChanged
:比较输入的Observable
计算得出的值当前与后最后的值是否相等使用===
判断 -
scan
:功能有点类似于reduce
这个方法,可用于累加数据同时可以使用startWith
的数据用途scan
的初始值,最后返回累加的数据 -
delay
:表示Observable
延时多久开始处理订阅数据 -
toArray
:把输出值格式化成数据形式 -
toMap
:给当前的输出取个名字或标签 -
expand
:实现递归 -
forkJoin
:类似于Promise.all
,只有数据全部返回且状态为complete
时,表示成功处理了请求,否则失败 -
let
:这个操作符可以获取完整的输入Observable
对象,做相应的处理后返回新的Observable
对象 -
catch
:用于Observable
处理数据异常的处理 -
combineLatest
:用于组且各个输入的Observable
,并获取和返回各个Observable
最新的数据 -
merge
:用于把两个Observable
合成一个处理