RxSwift-Observable序列的创建方式
2011 年 3 月 10 日
序列在RxSwift的世界里面是非常重要,平时开发过程用好序列的创建,能够给开发带来事半功倍的效果!这个篇章总结了常用的序列创建方式
1:emty
首先来一个空的序列 – 本来序列事件是 Int类型
的,这里调用 emty函数
没有序列,只能 complete
print("********emty********") let emtyOb = Observable.empty() _ = emtyOb.subscribe(onNext: { (number) in print("订阅:",number) }, onError: { (error) in print("error:",error) }, onCompleted: { print("完成回调") }) { print("释放回调") } 复制代码
- 这种方式不常用,但是我们以点及面展开分析
- 通过源码解析查看
override func subscribe(_ observer: O) -> Disposable where O.E == Element { observer.on(.completed) return Disposables.create() } 复制代码
- 很明显在订阅的时候,直接
observer.on(.completed)
发送了完成信号,非常简洁

2: just
- 单个信号序列创建
- 该方法通过传入一个默认值来初始化,构建一个只有一个元素的
Observable
队列,订阅完信息自动complete
。 - 下面的样例,我们显示地标注出了
Observable
的类型为Observable
,即指定了这个Observable
所发出的事件携带的数据类型必须是String 类型
的
print("********just********") //MARK: just // 单个信号序列创建 let array = ["LG_Cooci","LG_Kody"] Observable.just(array) .subscribe { (event) in print(event) }.disposed(by: disposeBag) _ = Observable.just(array).subscribe(onNext: { (number) in print("订阅:",number) }, onError: { (error) in print("error:",error) }, onCompleted: { print("完成回调") }) { print("释放回调") } 复制代码
- 感觉有点数据便利的感觉
- 这个序列在平时开发里面还是应用挺多的,看看底层源码
override func subscribe(_ observer: O) -> Disposable where O.E == Element { observer.on(.next(self._element)) observer.on(.completed) return Disposables.create() } 复制代码
-
observer.on(.next(self._element))
常规订阅之后就会发送.next
事件 - 之后就会自动发送完成事件,跟我们效果完全吻合

3:of
- 此方法创建一个新的可观察实例,该实例具有可变数量的元素。
- 该方法可以接受可变数量的参数(必需要是同类型的)
print("********of********") //MARK: of // 多个元素 - 针对序列处理 Observable.of("LG_Cooci","LG_Kody") .subscribe { (event) in print(event) }.disposed(by: disposeBag) // 字典 Observable.of(["name":"LG_Cooci","age":18]) .subscribe { (event) in print(event) }.disposed(by: disposeBag) // 数组 Observable.of(["LG_Cooci","LG_Kody"]) .subscribe { (event) in print(event) }.disposed(by: disposeBag) 复制代码
- 无论
字典
,数组
,多个元素
都是正常使用 - 底层源码的结构也是中规中矩
- 初始化保存调度环境和传入的元素
- 订阅流程也是利用
sink
,然后通过mutableIterator
迭代器处理发送

4:from
- 将可选序列转换为可观察序列。
- 从集合中获取序列:数组,集合,set 获取序列 – 有可选项处理 – 更安全
print("********from********") // MARK: from Observable.from(optional: ["LG_Cooci","LG_Kody"]) .subscribe { (event) in print(event) }.disposed(by: disposeBag) 复制代码
-
self._optional = optional
底层初始化可选项保存 - 订阅流程判断是否匹配我们的可选项
- 发送
observer.on(.next(element))
序列 * 随即自动observer.on(.completed)
完成序列发送

5:deferred
- 返回一个可观察序列,该序列在新观察者订阅时调用指定的工厂函数。
- 这里有一个需求:动态序列 – 根据外界的标识 – 动态输出
- 使用
deferred()
方法延迟Observable序列
的初始化,通过传入的block
来实现Observable序列
的初始化并且返回。
print("********defer********") //MARK: defer var isOdd = true _ = Observable.deferred { () -> Observable in // 这里设计我们的序列 isOdd = !isOdd if isOdd { return Observable.of(1,3,5,7,9) } return Observable.of(0,2,4,6,8) } .subscribe { (event) in print(event) } 复制代码
-
self._observableFactory = observableFactory
初始化保存了这段工厂闭包
func run() -> Disposable { do { let result = try self._observableFactory() return result.subscribe(self) } catch let e { self.forwardOn(.error(e)) self.dispose() return Disposables.create() } } 复制代码
sink

6:rang
- 使用指定的调度程序生成并发送观察者消息,生成指定范围内的可观察整数序列。
print("********rang********") //MARK: rang Observable.range(start: 2, count: 5) .subscribe { (event) in print(event) }.disposed(by: disposeBag) // 底层源码 init(start: E, count: E, scheduler: ImmediateSchedulerType) { self._start = start self._count = count self._scheduler = scheduler } override func run(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E { let sink = RangeSink(parent: self, observer: observer, cancel: cancel) let subscription = sink.run() return (sink: sink, subscription: subscription) } 复制代码
- 保存序列中第一个整数的值。
- 保存要生成的顺序整数的数目。
- 保存调度环境
if i < self._parent._count { self.forwardOn(.next(self._parent._start + i)) recurse(i + 1) } else { self.forwardOn(.completed) self.dispose() } 复制代码
- 根据之前保存的信息,数据的状态也不断攀升,然后递归到规定的要求
7:generate
- 通过运行产生序列元素的状态驱动循环,使用指定的调度程序运行循环,发送观察者消息,从而生成一个可观察序列。
- 该方法创建一个只有当提供的所有的判断条件都为
true
的时候,才会给出动作的Observable
序列。 - 初始值给定 然后
判断条件1
再判断条件2
会一直递归下去
,直到条件1或者条件2不满足
- 类似 数组遍历循环
- -参数一
initialState
: 初始状态。 - -参数二
condition
:终止生成的条件(返回“false”时)。 - -参数三
iterate
:迭代步骤函数。 - -参数四 调度器:用来运行生成器循环的调度器,默认:
CurrentThreadScheduler.instance
。 - -返回:生成的序列。
print("********generate********") //MARK: generate Observable.generate(initialState: 0,// 初始值 condition: { $0 < 10}, // 条件1 iterate: { $0 + 2 }) // 条件2 +2 .subscribe { (event) in print(event) }.disposed(by: disposeBag) // 数组遍历 let arr = ["LG_Cooci_1","LG_Cooci_2","LG_Cooci_3","LG_Cooci_4","LG_Cooci_5","LG_Cooci_6","LG_Cooci_7","LG_Cooci_8","LG_Cooci_9","LG_Cooci_10"] Observable.generate(initialState: 0,// 初始值 condition: { $0 < arr.count}, // 条件1 iterate: { $0 + 1 }) // 条件2 +2 .subscribe(onNext: { print("遍历arr:",arr[$0]) }) .disposed(by: disposeBag) 复制代码

8:timer
- 返回一个可观察序列,该序列使用指定的调度程序运行计时器,在指定的初始相对到期时间过后定期生成一个值。
- 第一次参数:第一次响应距离现在的时间
- 第二个参数:时间间隔
- 第三个参数:线程
print("********timer********") //MARK: timer Observable.timer(5, period: 2, scheduler: MainScheduler.instance) .subscribe { (event) in print(event) } .disposed(by: disposeBag) // 因为没有指定期限period,故认定为一次性 Observable.timer(1, scheduler: MainScheduler.instance) .subscribe { (event) in print("111111111 \(event)") } //.disposed(by: disposeBag) 复制代码
- 状态码的不断攀升,间隔时间不断发送响应

9:interval
- 返回一个可观察序列,该序列在每个周期之后生成一个值,使用指定的调度程序运行计时器并发送观察者消息。
print("********interval********") //MARK: interval // 定时器 Observable.interval(1, scheduler: MainScheduler.instance) .subscribe { (event) in print(event) } //.disposed(by: disposeBag) 复制代码

9:repeatElement
- 使用指定的调度程序发送观察者消息,生成无限重复给定元素的可观察序列。
print("********repeatElement********") //MARK: repeatElement Observable.repeatElement(5) .subscribe { (event) in // print("订阅:",event) } .disposed(by: disposeBag) 复制代码

10:error
- 返回一个以“error”结束的可观察序列。
- 这个序列平时在开发也比较常见,请求网络失败也会发送失败信号!
print("********error********") //MARK: error // 对消费者发出一个错误信号 Observable.error(NSError.init(domain: "lgerror", code: 10086, userInfo: ["reason":"unknow"])) .subscribe { (event) in print("订阅:",event) } .disposed(by: disposeBag) 复制代码

11:never
- 该方法创建一个永远不会发出
Event
(也不会终止)的Observable
序列。 - 这种类型的响应源 在测试或者在组合操作符中禁用确切的源非常有用
print("********never********") //MARK: never Observable.never() .subscribe { (event) in print("走你",event) } .disposed(by: disposeBag) print("********never********") 复制代码

12:create()
- 该方法接受一个 闭包形式的参数,任务是对每一个过来的订阅进行处理。
- 下面是一个简单的样例。为方便演示,这里增加了订阅相关代码
- 这也是序列创建的一般方式,应用非常之多
let observable = Observable.create{observer in //对订阅者发出了.next事件,且携带了一个数据"hangge.com" observer.onNext("hangge.com") //对订阅者发出了.completed事件 observer.onCompleted() //因为一个订阅行为会有一个Disposable类型的返回值,所以在结尾一定要returen一个Disposable return Disposables.create() } //订阅测试 observable.subscribe { print($0) } 复制代码
序列的创建也是学习 RxSwift
的根基,有很多时候我遇到很多的 BUG
,说白了就是根基没有掌握好!现在我们的已经 RxSwift
第十节课上完了,很多同学对 RxSwift
有了一个比较深入的了解,当然这里我还是以博客的形式写出来,就是希望大家能够及时回来看看,我知道,肯定会有很多同学回来继续加深基础的! 就问此时此刻还有谁?45度仰望天空,该死!我这无处安放的魅力!