RxSwift特征序列

任何序列都可以用 Observable
描述,创建序列 -> 订阅序列 -> 信号发送 -> 信号接收。

Observable.create { (observer) -> Disposable inobserver.onNext("信号1")return Disposables.create()
}.subscribe(onNext: { (val) inprint("信号接收区:\(val)")
}).disposed(by: disposeBag)复制代码

Observable
是通用序列的描述符,调用 .onNext
.onError
onCompleted
来发送信号,通用性强,但针对特殊需求可能会觉得繁琐,因此 RxSwift
还提供了一组特征序列,是 Observable
序列的变种,它能够帮助我们更准确的描述序列。即 Single
Completable
Maybe
Driver
Signal
ControlEvent

二、Single

1、定义

单元素序列,信号只发送一次,响应信号或错误信号。

Single.create { (single) -> Disposable insingle(.success("假装我是一个正儿八经的数据"))
            //single(.error(NSError.init(domain: "网络出现错误", code: 101, userInfo:["name":"hibo"])))return Disposables.create()
        }.subscribe(onSuccess: { (val) inprint("Single:\(val)")
        }) { (error) inprint("SingleError:\(error)")
        }.disposed(by: disposeBag)复制代码
  • sinngle(.success(data))
    -> onSuccess
    发送响应元素到成功观察者

  • sinngle(.error(error))
    -> error
    发送错误元素到错误观察者

响应元素和错误元素分开处理,此时我们可以联想到应用中的网络请求,成功数据用来渲染,错误数则据弹出提示框。

2、源码探索

2.1、Single定义

/// Sequence containing exactly 1 element
public enum SingleTrait { }
/// Represents a push style sequence containing 1 element.
public typealias Single= PrimitiveSequencepublic enum SingleEvent{
    /// One and only sequence element is produced. (underlying observable sequence emits: `.next(Element)`, `.completed`)case success(Element)
    
    /// Sequence terminated with an error. (underlying observable sequence emits: `.error(Error)`)case error(Swift.Error)
}复制代码

定位到 Single.swift
文件,首先能看到 Single
PrimitiveSequence
结构体类型的别名, SingleEvent
是事件枚举,有 success
error
两个成员变量。

2.2、 create
创建序列。代码如下(此处代码标记为:one:):

extension PrimitiveSequenceType where TraitType == SingleTrait {
    public typealias SingleObserver = (SingleEvent) -> Void
          //代码省略若干行
    public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single{let source = Observable.create { observer inreturn subscribe { event inswitch event {case .success(let element):
                    observer.on(.next(element))
                    observer.on(.completed)case .error(let error):
                    observer.on(.error(error))
                }
            }
        }        return PrimitiveSequence(raw: source)
    }
}复制代码

首先看参数是一个带 Disposable
类型返回值的闭包,交由外部(业务层)实现,内部调用向外传入一个 SingleObserver
闭包,以上写法不太好理解,我们可以换一种写法:

public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single{let source = Observable.create { observer in// 1、内部实现一个闭包,用来接收外界传入的SingleEvent信号,接着做进一步的信号发送let block = { (event:SingleEvent) -> Void inswitch event {case .success(let element):
                observer.on(.next(element))
                observer.on(.completed)case .error(let error):
                observer.on(.error(error))
            }
        }
        // 2、调用外部实现的闭包方法,向外部发送内部实现的闭包方法做连接作用let disposable = subscribe(block)//3、返回值Disposable对象 return disposable
    }return PrimitiveSequence(raw: source)//4、创建PrimitiveSequence对象并保存Observable序列对象
}复制代码
  • 内部实现一个闭包 block
    ,用来接收外界传入的 SingleEvent
    信号,接着做进一步的信号发送

  • 调用外部实现的闭包方法,将内部实现的闭包 block
    发送出去,起连接作用

  • 创建 PrimitiveSequence
    对象并保存 Observable
    序列对象 source
    ,返回 PrimitiveSequence
    对象

create
方法内部实际上实现了一个 Observable
序列,由此可见 Single
序列是对 Observable
序列的封装, Disposable
对象通过闭包交由业务层创建, Single
序列在实现上,方式方法与 Observable
保持一致,此处可称一绝。当前我们只探索 Single
的信号是如何完成传递, Observable
序列的信号传递流程在 《Swift核心源码探索》
中有详细介绍。
2.3、订阅序列

也是在同 PrimitiveSequenceType
扩展中定义,代码如下(此处代码标记为:two:):

public func subscribe(onSuccess: ((ElementType) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil) -> Disposable {#if DEBUG let callStack = Hooks.recordCallStackOnError ? Thread.callStackSymbols : []#elselet callStack = [String]()#endifreturn self.primitiveSequence.subscribe { event inswitch event {case .success(let element):
            onSuccess?(element)case .error(let error):if let onError = onError {
                onError(error)
            } else {
                Hooks.defaultErrorHandler(callStack, error)
            }
        }
    }
}复制代码

方法中先调用了 self.primitiveSequence
方法,返回了 self
,方法是在遵循 PrimitiveSequenceType
协议的 PrimitiveSequence
的扩展中,为了保证协议的一致性。代码如下:

extension PrimitiveSequence: PrimitiveSequenceType {
    /// Additional constraints
    public typealias TraitType = Trait
    /// Sequence element typepublic typealias ElementType = Element

    // Converts `self` to primitive sequence.
    ///
    /// - returns: Observable sequence that represents `self`.
    public var primitiveSequence: PrimitiveSequence{return self
    }
}复制代码

紧接着调用另一个 subscribe
方法,代码如下(此处代码标记为:three:):

public func subscribe(_ observer: @escaping (SingleEvent) -> Void) -> Disposable {
    var stopped = falsereturn self.primitiveSequence.asObservable().subscribe { event inif stopped { return }
        stopped = true
        switch event {case .next(let element):
            observer(.success(element))case .error(let error):
            observer(.error(error))case .completed:
            rxFatalErrorInDebug("Singles can't emit a completion event")
        }
    }
}复制代码
  • self.primitiveSequence -> asObservable() -> subscribe

  • 此处截断了 completed
    信号的向上传递,因此 Single
    序列只能收到响应信号和错误信号

该段代码也调用了 self.primitiveSequence
方法,接着调用 asObservable()
方法,查看代码发现此处是为了获取 source
对象,即 Observable
可观察序列。

再查看 subscribe
的方法(此处标记为代码:four:):

public func subscribe(_ on: @escaping (Event) -> Void)
    -> Disposable {let observer = AnonymousObserver { e inon(e)
        }return self.asObservable().subscribe(observer)
}复制代码
  • 代码创建了一个观察者,当前观察者将会收到发送过来的消息,并由此通过闭包一层层传到业务层。 :four: -> :three: -> :two: -> :one: ->业务层
  • 当前self指向的是:one:处创建并保存的 Observable
    类型的 source
    对象,因此该处 subscribe
    所调用的即是 Produce
    类中的 subscribe
    方法,在方法内部创建了 sink
    对象,来触发创建序列时实现的闭包,即代码:one:处所 create
    后的闭包

  • 此时就到了业务层,通过create内部实现的闭包 single
    向内部发送消息,再有 observer
    调用 on
    来向观察者发送信号

  • 信号发送不做赘述,最终会到达:four:处代码的观察者,此时再由闭包一层层向上传递,直到业务层的监听闭包

总结:

序列的产生,订阅,发送,接收还是由 Observable
来实现的, Single
只是对 Observable
做了封装,去除了 onCompleted
的消息监听及消息发送。

具体的Observable序列产生到观察流程见 《Swift核心源码探索》

三、Completable

只能产生 completed
事件和 error
事件,没有序列元素值产生。

Completable.create { (completable) -> Disposable incompletable(.completed)
    //completable(.error(NSError.init(domain: "出现异常", code: 101, userInfo: nil)))return Disposables.create()
}.subscribe(onCompleted: {print("Completable")
}) { (error) inprint(error)
}.disposed(by: disposeBag)复制代码
  • 应用场景,只关心任务是否完成,不关心不需要结果
  • Competable.swift
    下,在 PrimitiveSequenceType
    扩展中实现了序列的创建,订阅,即信号转发

定义如下:

/// Sequence containing 0 elements
public enum CompletableTrait { }
/// Represents a push style sequence containing 0 elements.
public typealias Completable = PrimitiveSequencepublic enum CompletableEvent {
    /// Sequence terminated with an error. (underlying observable sequence emits: `.error(Error)`)case error(Swift.Error)
    
    /// Sequence completed successfully.case completed
}复制代码

同样 Completable
类也是 PrimitiveSequence
的别名,并声明一个枚举包含, error
completed
成员变量,限定了事件产生类型。都是对 Observable
序列的封装,源码此处不做探索说明,和 Single
一致,只是在订阅阶段对 .next
事件做了拦截。

四、Maybe

Single
序列相似,发出一个元素或一个 completed
事件或 error
事件。

Maybe.create { (maybe) -> Disposable inmaybe(.success("element"))
        //maybe(.completed)
        //maybe(.error(NSError.init(domain: "出现异常", code: 101, userInfo: nil)))return Disposables.create()
    }.subscribe(onSuccess: { (val) inprint(val)
    }, onError: { (error) inprint("error:\(error)")
    }) {print("completed")
    }.disposed(by: disposeBag)复制代码

在开发中,如果一个业务有时候需要一个元素,有时候只需要知道处理完成的时候,可以使用该 Maybe
,解决不确定需求问题。源码探索略,同上。

五、Driver

老司机开车永远不会出错,因此 Driver
序列不会产生 error
事件,并一定在主线程中监听,会想新订阅者发送,上次发送出的元素,主要为简化 UI
层的代码。下面看看为什么会有 Driver
序列。
有一个需求:

  • 搜索框中每次输入一个文本,获取一次网络请求,成功后渲染 UI

先实现一个简单的 UI

let tf = UITextField.init(frame: CGRect(x: 100, y: 100, width: 200, height: 40))
tf.borderStyle = .roundedRect
tf.placeholder = "请输入"self.view.addSubview(tf)let label1 = UILabel.init(frame: CGRect(x: 100, y: 160, width: 200, height: 40))
label1.backgroundColor = .groupTableViewBackground
label1.textAlignment = .center
self.view.addSubview(label1)let label2 = UILabel.init(frame: CGRect(x: 100, y: 210, width: 200, height: 40))
label2.backgroundColor = .groupTableViewBackground
label2.textAlignment = .center
self.view.addSubview(label2)复制代码

创建了一个 textfield
,两个 label
用来展示。下面在来实现一个网络请求,返回一个 Single
序列:

func network(text:String) -> Single{return Single.create(subscribe: { (single) -> Disposable inif text == "1234"{
            single(.error(NSError.init(domain: "出现错误", code: 101, userInfo: nil)))
        }
        DispatchQueue.global().async {print("请求网络")
            single(.success(text))
        }return Disposables.create()
    })
}复制代码

网络请求为耗时操作,因此我们在异步中来完成,直接发送序列,假装我们请求了一次网络。

再实现 textfield
输入序列的监听,并调取网络请求方法:

let result = tf.rx.text.orEmpty.skip(1)
                .flatMap{return self.network(text: $0)
                        .observeOn(MainScheduler.instance)
                        .catchErrorJustReturn("网络请求失败")
            }.share(replay: 1, scope: .whileConnected)
//网络请求将发送多次请求
result.subscribe(onNext: { (val) inprint("订阅一:\(val) 线程:\(Thread.current)")
}).disposed(by: disposeBag)

result.subscribe(onNext: { (val) inprint("订阅二:\(val) 线程:\(Thread.current)")
}).disposed(by: disposeBag)

result.map{"\(($0 as! String).count)"}.bind(to: label1.rx.text).disposed(by: disposeBag)
result.map{"\($0)"}.bind(to: label2.rx.text).disposed(by: disposeBag)复制代码

代码介绍

  • flatMap

  • observeOn
    选择在哪个线程执行

  • catchErrorJustReturn
    错误处理,将 onError
    事件转为 onNext
    事件

  • share
    为多个观察者共享资源,网络请求只发送呢一次,否则多个订阅将会触发多个请求

Driver实现:

let result = tf.rx.text.orEmpty
    .asDriver()
    .flatMap {return self.network(text: $0).asDriver(onErrorJustReturn: "网络请求失败")
    }
result.map{"长度:\(($0 as! String).count)"}
        .drive(label1.rx.text).disposed(by: disposeBag)
result.map{"\($0)"}.drive(label2.rx.text).disposed(by: disposeBag)复制代码
  • asDriver()
    将序列转换为 Driver
    序列

  • map
    重新组合并生成新的序列

  • driver
    将元素在主线程中绑定到 label1
    label2

相比非 driver
下的代码实现, Driver
序列省去了线程的设置, share
数据共享设置。

源码探索

断点查看 asDriver()
方法:

extension ControlProperty {
    /// Converts `ControlProperty` to `Driver` trait.
    ///
    /// `ControlProperty` already can't fail, so no special case needs to be handled.
    public func asDriver() -> Driver{
        return self.asDriver { _ -> Driverin
            #if DEBUG
                rxFatalError("Somehow driver received error from a source that shouldn't fail.")
            #else
                return Driver.empty()
            #endif
        }
    }
}复制代码

ControlProperty
的扩展方法,返回了一个 Driver

类, Driver
SharedSequence
的别名,用来描述不同类型的序列,最后又调用了 asDriver
方法,而该方法在 ObservableConvertibleType
的扩展中,一直追踪会发现很多类都是继承自 ObservableConvertibleType
下。

extension ObservableConvertibleType {
    public func asDriver(onErrorRecover: @escaping (_ error: Swift.Error) -> Driver) -> Driver{let source = self
            .asObservable()
            .observeOn(DriverSharingStrategy.scheduler)
            .catchError { error inonErrorRecover(error).asObservable()
            }return Driver(source)
    }
}复制代码

如上代码也设置了 observerOn
方法,来指定线程,继续深入能够发现 DriverSharingStrategy.scheduler
内部指定的就是主线程,印证了上面所说的 Driver
的执行是在主线程的。下面再看flatMap方法的实现:

public func flatMap(_ selector: @escaping (E) throws -> O)
    -> Observable{return FlatMap(source: self.asObservable(), selector: selector)
}复制代码

业务层实现闭包,在闭包中调用了网络请求方法,并向 FlatMap
中传入业务层实现的闭包…… 当前篇幅过程,源码分析需要另起篇幅。

六、Signal

Driver
相似,不会产生 error
事件,在主线程执行,但不会像 Driver
一样会给新观察者发送上一次发送的元素。

使用如下:

let event : Driver= button.rx.tap.asDriver()
event.drive(onNext: {print("yahibo")
    event.drive(onNext: {print("yahibo1")
    }).disposed(by: self.disposeBag)
}).disposed(by: disposeBag)复制代码

运行打印,发现在点击后重新订阅的观察者,会直接收到点击事件,这是我们业务不允许的。下面再看 Signal
序列:

let event : Signal= button.rx.tap.asSignal()
event.emit(onNext: {print("yahibo")
    event.emit(onNext: {print("yahibo1")
    }).disposed(by: self.disposeBag)
}).disposed(by: disposeBag)复制代码

运行结果,每一个新序列都会在点击后触发。

七、ControlEvent

专门用于描述 UI
控件所产生的事件,不会产生 error
事件,在主线程中监听。代码如下:

1、监听点击事件

let event : ControlEvent= button.rx.tap.asControlEvent()
event.bind(onNext: {print("controllerEvent")
}).disposed(by: disposeBag)复制代码

2、监听点击事件并绑定数据到其他UI

let event : ControlEvent= button.rx.tap.asControlEvent()
event.map{"yahibo"}.bind(to: label1.rx.text).disposed(by: disposeBag)复制代码

总结:

以上序列都是基于 Observable
的,是对其更高层的封装,针对不同应用场景设计,简化不同场景下序列的使用流程。