RxSwift特征序列
任何序列都可以用 Observable
描述,创建序列 -> 订阅序列 -> 信号发送 -> 信号接收。
Observable.create { (observer) -> Disposable inobserver.onNext("信号1")return Disposables.create() }.subscribe(onNext: { (val) inprint("信号接收区:\(val)") }).disposed(by: disposeBag)复制代码
Observable
是通用序列的描述符,调用 .onNext
, .onError
, onCompleted
来发送信号,通用性强,但针对特殊需求可能会觉得繁琐,因此 RxSwift
还提供了一组特征序列,是 Observable
序列的变种,它能够帮助我们更准确的描述序列。即 Single
、 Completable
、 Maybe
、 Driver
、 Signal
、 ControlEvent
。
二、Single
1、定义
单元素序列,信号只发送一次,响应信号或错误信号。
Single.create { (single) -> Disposable insingle(.success("假装我是一个正儿八经的数据")) //single(.error(NSError.init(domain: "网络出现错误", code: 101, userInfo:["name":"hibo"])))return Disposables.create() }.subscribe(onSuccess: { (val) inprint("Single:\(val)") }) { (error) inprint("SingleError:\(error)") }.disposed(by: disposeBag)复制代码
-
sinngle(.success(data))
->onSuccess
发送响应元素到成功观察者 -
sinngle(.error(error))
->error
发送错误元素到错误观察者
响应元素和错误元素分开处理,此时我们可以联想到应用中的网络请求,成功数据用来渲染,错误数则据弹出提示框。
2、源码探索
2.1、Single定义
/// Sequence containing exactly 1 element public enum SingleTrait { } /// Represents a push style sequence containing 1 element. public typealias Single= PrimitiveSequencepublic enum SingleEvent{ /// One and only sequence element is produced. (underlying observable sequence emits: `.next(Element)`, `.completed`)case success(Element) /// Sequence terminated with an error. (underlying observable sequence emits: `.error(Error)`)case error(Swift.Error) }复制代码
定位到 Single.swift
文件,首先能看到 Single
是 PrimitiveSequence
结构体类型的别名, SingleEvent
是事件枚举,有 success
和 error
两个成员变量。
2.2、 create
创建序列。代码如下(此处代码标记为:one:):
extension PrimitiveSequenceType where TraitType == SingleTrait { public typealias SingleObserver = (SingleEvent) -> Void //代码省略若干行 public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single{let source = Observable.create { observer inreturn subscribe { event inswitch event {case .success(let element): observer.on(.next(element)) observer.on(.completed)case .error(let error): observer.on(.error(error)) } } } return PrimitiveSequence(raw: source) } }复制代码
首先看参数是一个带 Disposable
类型返回值的闭包,交由外部(业务层)实现,内部调用向外传入一个 SingleObserver
闭包,以上写法不太好理解,我们可以换一种写法:
public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single{let source = Observable.create { observer in// 1、内部实现一个闭包,用来接收外界传入的SingleEvent信号,接着做进一步的信号发送let block = { (event:SingleEvent) -> Void inswitch event {case .success(let element): observer.on(.next(element)) observer.on(.completed)case .error(let error): observer.on(.error(error)) } } // 2、调用外部实现的闭包方法,向外部发送内部实现的闭包方法做连接作用let disposable = subscribe(block)//3、返回值Disposable对象 return disposable }return PrimitiveSequence(raw: source)//4、创建PrimitiveSequence对象并保存Observable序列对象 }复制代码
-
内部实现一个闭包
block
,用来接收外界传入的SingleEvent
信号,接着做进一步的信号发送 -
调用外部实现的闭包方法,将内部实现的闭包
block
发送出去,起连接作用 -
创建
PrimitiveSequence
对象并保存Observable
序列对象source
,返回PrimitiveSequence
对象
在 create
方法内部实际上实现了一个 Observable
序列,由此可见 Single
序列是对 Observable
序列的封装, Disposable
对象通过闭包交由业务层创建, Single
序列在实现上,方式方法与 Observable
保持一致,此处可称一绝。当前我们只探索 Single
的信号是如何完成传递, Observable
序列的信号传递流程在 《Swift核心源码探索》
中有详细介绍。
2.3、订阅序列
也是在同 PrimitiveSequenceType
扩展中定义,代码如下(此处代码标记为:two:):
public func subscribe(onSuccess: ((ElementType) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil) -> Disposable {#if DEBUG let callStack = Hooks.recordCallStackOnError ? Thread.callStackSymbols : []#elselet callStack = [String]()#endifreturn self.primitiveSequence.subscribe { event inswitch event {case .success(let element): onSuccess?(element)case .error(let error):if let onError = onError { onError(error) } else { Hooks.defaultErrorHandler(callStack, error) } } } }复制代码
方法中先调用了 self.primitiveSequence
方法,返回了 self
,方法是在遵循 PrimitiveSequenceType
协议的 PrimitiveSequence
的扩展中,为了保证协议的一致性。代码如下:
extension PrimitiveSequence: PrimitiveSequenceType { /// Additional constraints public typealias TraitType = Trait /// Sequence element typepublic typealias ElementType = Element // Converts `self` to primitive sequence. /// /// - returns: Observable sequence that represents `self`. public var primitiveSequence: PrimitiveSequence{return self } }复制代码
紧接着调用另一个 subscribe
方法,代码如下(此处代码标记为:three:):
public func subscribe(_ observer: @escaping (SingleEvent) -> Void) -> Disposable { var stopped = falsereturn self.primitiveSequence.asObservable().subscribe { event inif stopped { return } stopped = true switch event {case .next(let element): observer(.success(element))case .error(let error): observer(.error(error))case .completed: rxFatalErrorInDebug("Singles can't emit a completion event") } } }复制代码
-
self.primitiveSequence -> asObservable() -> subscribe
-
此处截断了
completed
信号的向上传递,因此Single
序列只能收到响应信号和错误信号
该段代码也调用了 self.primitiveSequence
方法,接着调用 asObservable()
方法,查看代码发现此处是为了获取 source
对象,即 Observable
可观察序列。
再查看 subscribe
的方法(此处标记为代码:four:):
public func subscribe(_ on: @escaping (Event) -> Void) -> Disposable {let observer = AnonymousObserver { e inon(e) }return self.asObservable().subscribe(observer) }复制代码
- 代码创建了一个观察者,当前观察者将会收到发送过来的消息,并由此通过闭包一层层传到业务层。 :four: -> :three: -> :two: -> :one: ->业务层
-
当前self指向的是:one:处创建并保存的
Observable
类型的source
对象,因此该处subscribe
所调用的即是Produce
类中的subscribe
方法,在方法内部创建了sink
对象,来触发创建序列时实现的闭包,即代码:one:处所create
后的闭包 -
此时就到了业务层,通过create内部实现的闭包
single
向内部发送消息,再有observer
调用on
来向观察者发送信号 - 信号发送不做赘述,最终会到达:four:处代码的观察者,此时再由闭包一层层向上传递,直到业务层的监听闭包
总结:
序列的产生,订阅,发送,接收还是由 Observable
来实现的, Single
只是对 Observable
做了封装,去除了 onCompleted
的消息监听及消息发送。
具体的Observable序列产生到观察流程见 《Swift核心源码探索》
三、Completable
只能产生 completed
事件和 error
事件,没有序列元素值产生。
Completable.create { (completable) -> Disposable incompletable(.completed) //completable(.error(NSError.init(domain: "出现异常", code: 101, userInfo: nil)))return Disposables.create() }.subscribe(onCompleted: {print("Completable") }) { (error) inprint(error) }.disposed(by: disposeBag)复制代码
- 应用场景,只关心任务是否完成,不关心不需要结果
-
在
Competable.swift
下,在PrimitiveSequenceType
扩展中实现了序列的创建,订阅,即信号转发
定义如下:
/// Sequence containing 0 elements public enum CompletableTrait { } /// Represents a push style sequence containing 0 elements. public typealias Completable = PrimitiveSequencepublic enum CompletableEvent { /// Sequence terminated with an error. (underlying observable sequence emits: `.error(Error)`)case error(Swift.Error) /// Sequence completed successfully.case completed }复制代码
同样 Completable
类也是 PrimitiveSequence
的别名,并声明一个枚举包含, error
和 completed
成员变量,限定了事件产生类型。都是对 Observable
序列的封装,源码此处不做探索说明,和 Single
一致,只是在订阅阶段对 .next
事件做了拦截。
四、Maybe
和 Single
序列相似,发出一个元素或一个 completed
事件或 error
事件。
Maybe.create { (maybe) -> Disposable inmaybe(.success("element")) //maybe(.completed) //maybe(.error(NSError.init(domain: "出现异常", code: 101, userInfo: nil)))return Disposables.create() }.subscribe(onSuccess: { (val) inprint(val) }, onError: { (error) inprint("error:\(error)") }) {print("completed") }.disposed(by: disposeBag)复制代码
在开发中,如果一个业务有时候需要一个元素,有时候只需要知道处理完成的时候,可以使用该 Maybe
,解决不确定需求问题。源码探索略,同上。
五、Driver
老司机开车永远不会出错,因此 Driver
序列不会产生 error
事件,并一定在主线程中监听,会想新订阅者发送,上次发送出的元素,主要为简化 UI
层的代码。下面看看为什么会有 Driver
序列。
有一个需求:
-
搜索框中每次输入一个文本,获取一次网络请求,成功后渲染
UI
先实现一个简单的 UI
:
let tf = UITextField.init(frame: CGRect(x: 100, y: 100, width: 200, height: 40)) tf.borderStyle = .roundedRect tf.placeholder = "请输入"self.view.addSubview(tf)let label1 = UILabel.init(frame: CGRect(x: 100, y: 160, width: 200, height: 40)) label1.backgroundColor = .groupTableViewBackground label1.textAlignment = .center self.view.addSubview(label1)let label2 = UILabel.init(frame: CGRect(x: 100, y: 210, width: 200, height: 40)) label2.backgroundColor = .groupTableViewBackground label2.textAlignment = .center self.view.addSubview(label2)复制代码
创建了一个 textfield
,两个 label
用来展示。下面在来实现一个网络请求,返回一个 Single
序列:
func network(text:String) -> Single{return Single.create(subscribe: { (single) -> Disposable inif text == "1234"{ single(.error(NSError.init(domain: "出现错误", code: 101, userInfo: nil))) } DispatchQueue.global().async {print("请求网络") single(.success(text)) }return Disposables.create() }) }复制代码
网络请求为耗时操作,因此我们在异步中来完成,直接发送序列,假装我们请求了一次网络。
再实现 textfield
输入序列的监听,并调取网络请求方法:
let result = tf.rx.text.orEmpty.skip(1) .flatMap{return self.network(text: $0) .observeOn(MainScheduler.instance) .catchErrorJustReturn("网络请求失败") }.share(replay: 1, scope: .whileConnected) //网络请求将发送多次请求 result.subscribe(onNext: { (val) inprint("订阅一:\(val) 线程:\(Thread.current)") }).disposed(by: disposeBag) result.subscribe(onNext: { (val) inprint("订阅二:\(val) 线程:\(Thread.current)") }).disposed(by: disposeBag) result.map{"\(($0 as! String).count)"}.bind(to: label1.rx.text).disposed(by: disposeBag) result.map{"\($0)"}.bind(to: label2.rx.text).disposed(by: disposeBag)复制代码
代码介绍
-
flatMap
-
observeOn
选择在哪个线程执行 -
catchErrorJustReturn
错误处理,将onError
事件转为onNext
事件 -
share
为多个观察者共享资源,网络请求只发送呢一次,否则多个订阅将会触发多个请求
Driver实现:
let result = tf.rx.text.orEmpty .asDriver() .flatMap {return self.network(text: $0).asDriver(onErrorJustReturn: "网络请求失败") } result.map{"长度:\(($0 as! String).count)"} .drive(label1.rx.text).disposed(by: disposeBag) result.map{"\($0)"}.drive(label2.rx.text).disposed(by: disposeBag)复制代码
-
asDriver()
将序列转换为Driver
序列 -
map
重新组合并生成新的序列 -
driver
将元素在主线程中绑定到label1
和label2
上
相比非 driver
下的代码实现, Driver
序列省去了线程的设置, share
数据共享设置。
源码探索
断点查看 asDriver()
方法:
extension ControlProperty { /// Converts `ControlProperty` to `Driver` trait. /// /// `ControlProperty` already can't fail, so no special case needs to be handled. public func asDriver() -> Driver{ return self.asDriver { _ -> Driverin #if DEBUG rxFatalError("Somehow driver received error from a source that shouldn't fail.") #else return Driver.empty() #endif } } }复制代码
是 ControlProperty
的扩展方法,返回了一个 Driver
类, Driver
是 SharedSequence
的别名,用来描述不同类型的序列,最后又调用了 asDriver
方法,而该方法在 ObservableConvertibleType
的扩展中,一直追踪会发现很多类都是继承自 ObservableConvertibleType
下。
extension ObservableConvertibleType { public func asDriver(onErrorRecover: @escaping (_ error: Swift.Error) -> Driver) -> Driver{let source = self .asObservable() .observeOn(DriverSharingStrategy.scheduler) .catchError { error inonErrorRecover(error).asObservable() }return Driver(source) } }复制代码
如上代码也设置了 observerOn
方法,来指定线程,继续深入能够发现 DriverSharingStrategy.scheduler
内部指定的就是主线程,印证了上面所说的 Driver
的执行是在主线程的。下面再看flatMap方法的实现:
public func flatMap(_ selector: @escaping (E) throws -> O) -> Observable{return FlatMap(source: self.asObservable(), selector: selector) }复制代码
业务层实现闭包,在闭包中调用了网络请求方法,并向 FlatMap
中传入业务层实现的闭包…… 当前篇幅过程,源码分析需要另起篇幅。
六、Signal
与 Driver
相似,不会产生 error
事件,在主线程执行,但不会像 Driver
一样会给新观察者发送上一次发送的元素。
使用如下:
let event : Driver= button.rx.tap.asDriver() event.drive(onNext: {print("yahibo") event.drive(onNext: {print("yahibo1") }).disposed(by: self.disposeBag) }).disposed(by: disposeBag)复制代码
运行打印,发现在点击后重新订阅的观察者,会直接收到点击事件,这是我们业务不允许的。下面再看 Signal
序列:
let event : Signal= button.rx.tap.asSignal() event.emit(onNext: {print("yahibo") event.emit(onNext: {print("yahibo1") }).disposed(by: self.disposeBag) }).disposed(by: disposeBag)复制代码
运行结果,每一个新序列都会在点击后触发。
七、ControlEvent
专门用于描述 UI
控件所产生的事件,不会产生 error
事件,在主线程中监听。代码如下:
1、监听点击事件
let event : ControlEvent= button.rx.tap.asControlEvent() event.bind(onNext: {print("controllerEvent") }).disposed(by: disposeBag)复制代码
2、监听点击事件并绑定数据到其他UI
let event : ControlEvent= button.rx.tap.asControlEvent() event.map{"yahibo"}.bind(to: label1.rx.text).disposed(by: disposeBag)复制代码
总结:
以上序列都是基于 Observable
的,是对其更高层的封装,针对不同应用场景设计,简化不同场景下序列的使用流程。