RxSwift-Subject即攻也守
在掌握前面序列以还有观察者的前提下,我们今天来看一个非常特殊的类型- Subject
.为什么说它特殊呢?原因很简单:** Subject
既可以做序列,也可以做观察者!**正是因为这一特性,所以在实际开发中被大量运用。下面我们一起来解读一下这个特殊的 Subject
即攻也守的原理
首先我们来看看: SubjectType
的原理!
public protocol SubjectType : ObservableType { // 关联了观察者类型,具备这个类型的能力 associatedtype SubjectObserverType : ObserverType func asObserver() -> SubjectObserverType } 复制代码
-
SubjectType
首先就是继承了ObservableType
,具有序列特性 - 关联了观察者类型,具备这个类型的能力
- 下面我们通过一个具体类型来感受一下
subject
// 1:初始化序列 let publishSub = PublishSubject() // 2:发送响应序列 publishSub.onNext(1) // 3:订阅序列 publishSub.subscribe { print("订阅到了:",$0)} .disposed(by: disposbag) // 再次发送响应 publishSub.onNext(2) publishSub.onNext(3) 复制代码
- 很明显能够订阅信号(序列最基本的能力)
- 能够发送响应,又是观察者的能力
- 查看底层源码分析
订阅响应流程
public override func subscribe -> Disposable { self._lock.lock() let subscription = self._synchronized_subscribe(observer) self._lock.unlock() return subscription } func _synchronized_subscribe -> Disposable { // 省略不必要的代码 let key = self._observers.insert(observer.on) return SubscriptionDisposable(owner: self, key: key) } 复制代码
-
self._observers.insert(observer.on):
通过一个集合添加进去所有的订阅事件,很明显在合适的地方一次性全部执行 - 其中也返回这次订阅的销毁者,方便执行善后工作:
synchronizedUnsubscribe
->self._observers.removeKey(disposeKey)
mutating func removeKey(_ key: BagKey) -> T? { if _key0 == key { _key0 = nil let value = _value0! _value0 = nil return value } if let existingObject = _dictionary?.removeValue(forKey: key) { return existingObject } for i in 0 ..< _pairs.count where _pairs[i].key == key { let value = _pairs[i].value _pairs.remove(at: i) return value } return nil } 复制代码
- 便利通过
key
获取响应bag
中的value,执行集合移除 - 因为没有相应持有关系,达到自动释放销毁
发送信号流程
public func on(_ event: Event) { dispatch(self._synchronized_on(event), event) } 复制代码
- 这个地方估计大家看起来麻烦恶心一点,但是你用心看不难体会
- 这里主要调用了
dispatch
函数,传了两个参数:self._synchronized_on(event)
和event
- 查看
dispatch
函数源码
func dispatch(_ bag: Bag) { bag._value0?(event) if bag._onlyFastPath { return } let pairs = bag._pairs for i in 0 ..< pairs.count { pairs[i].value(event) } if let dictionary = bag._dictionary { for element in dictionary.values { element(event) } } } 复制代码
-
bag._value0?(event)
首先执行事件的回调 - 判断
bag._onlyFastPath
的情况,默认会开启快速通道! - 如果是开启慢速通道,需要从刚刚添加进
bag
包裹里面的匹配对挨个进行pairs[i].value(event)
,外界事件回调,然后拿回外界封装的闭包的闭包调用:element(event)
func _synchronized_on(_ event: Event) -> Observers { self._lock.lock(); defer { self._lock.unlock() } switch event { case .next: if self._isDisposed || self._stopped { return Observers() } return self._observers case .completed, .error: if self._stoppedEvent == nil { self._stoppedEvent = event self._stopped = true let observers = self._observers self._observers.removeAll() return observers } return Observers() } } 复制代码
- 这里如果
self._isDisposed || self._stopped
成立就会返回一个空的集合,也就没有序列的响应 - 在
.completed, .error
都会改变状态self._stopped = true
,也就是说序列完成或者错误之后都无法再次响应了 - 在
.completed, .error
还会移除添加在集合里面的内容
其实如果你对前面序列的流程掌握了,这个 subject
的流程也不再话下,只是 subject
把订阅流程和响应流程都内部实现,所以也就没有必要引入 sink
各种Subject
PublishSubject
可以不需要初始来进行初始化(也就是可以为空),并且它只会向订阅者发送在订阅之后才接收到的元素。
// PublishSubject // 1:初始化序列 let publishSub = PublishSubject() //初始化一个PublishSubject 装着Int类型的序列 // 2:发送响应序列 publishSub.onNext(1) // 3:订阅序列 publishSub.subscribe { print("订阅到了:",$0)} .disposed(by: disposbag) // 再次发送响应 publishSub.onNext(2) publishSub.onNext(3) 复制代码
-
信号:1
是无法被订阅的,只接受订阅之后的响应
BehaviorSubject
通过一个默认初始值来创建,当订阅者订阅 BehaviorSubject
时,会收到订阅后 Subject
上一个发出的 Event
,如果还没有收到任何数据,会发出一个 默认值
。之后就和 PublishSubject
一样,正常接收新的事件。
和 publish
稍微不同就是 behavior
这个家伙有个存储功能: 存储上一次的信号
// BehaviorSubject // 1:创建序列 let behaviorSub = BehaviorSubject.init(value: 100) // 2:发送信号 behaviorSub.onNext(2) behaviorSub.onNext(3) // 3:订阅序列 behaviorSub.subscribe{ print("订阅到了:",$0)} .disposed(by: disposbag) // 再次发送 behaviorSub.onNext(4) behaviorSub.onNext(5) // 再次订阅 behaviorSub.subscribe{ print("订阅到了:",$0)} .disposed(by: disposbag) 复制代码
- 当没有信号的时候,会默认发送
信号:100
- 只能储存一个信号:
信号2
会被信号3
覆盖 - 订阅信号之前能够储存信号
// 初始化 public init(value: Element) { self._element = value } // 事件响应 func _synchronized_on(_ event: Event) -> Observers { switch event { case .next(let element): self._element = element case .error, .completed: self._stoppedEvent = event } return self._observers } 复制代码
publish
ReplaySubject
ReplaySubject
发送源 Observable
的所有事件无论 observer
什么时候开始订阅。
// ReplaySubject // 1:创建序列 let replaySub = ReplaySubject.create(bufferSize: 2) // let replaySub = ReplaySubject.createUnbounded() // 2:发送信号 replaySub.onNext(1) replaySub.onNext(2) replaySub.onNext(3) replaySub.onNext(4) // 3:订阅序列 replaySub.subscribe{ print("订阅到了:",$0)} .disposed(by: disposbag) // 再次发送 replaySub.onNext(7) replaySub.onNext(8) replaySub.onNext(9) 复制代码
bufferSize BehaviorSubject
AsyncSubject
AsyncSubject
只发送由源 Observable
发送的最后一个事件,并且只在源 Observable
完成之后。(如果源 Observable
没有发送任何值, AsyncSubject
也不会发送任何值。)
// AsyncSubject // 1:创建序列 let asynSub = AsyncSubject.init() // 2:发送信号 asynSub.onNext(1) asynSub.onNext(2) // 3:订阅序列 asynSub.subscribe{ print("订阅到了:",$0)} .disposed(by: disposbag) // 再次发送 asynSub.onNext(3) asynSub.onNext(4) // asynSub.onError(NSError.init(domain: "lgcooci", code: 10086, userInfo: nil)) asynSub.onCompleted() 复制代码
- 我们普通序列发送回来,都不会响应!直到完成序列响应
func _synchronized_on(_ event: Event) -> (Observers, Event) { switch event { case .next(let element): self._lastElement = element return (Observers(), .completed) case .error: self._stoppedEvent = event let observers = self._observers self._observers.removeAll() return (observers, event) case .completed: let observers = self._observers self._observers.removeAll() if let lastElement = self._lastElement { self._stoppedEvent = .next(lastElement) return (observers, .next(lastElement)) } else { self._stoppedEvent = event return (observers, .completed) } } } 复制代码
- 可以很清晰的看出,普通Next事件都是,元素的替换,根本没有响应出来 *
complete
事件发送到时候,就会把最新保存的self._lastElement
当成事件值传出去,响应.next(lastElement)
- 如果没有保存事件就发送完成事件:
.completed
-
error
事件会移空整个响应集合:self._observers.removeAll()
Variable
Variable
废弃了,这里贴出代码以供大家遇到老版本! 由于这个 Variable
的灵活性所以在开发里面应用非常之多!
// Variable : 5.0已经废弃(BehaviorSubject 替换) - 这里板书 大家可以了解一下 // 1:创建序列 let variableSub = Variable.init(1) // 2:发送信号 variableSub.value = 100 variableSub.value = 10 // 3:订阅信号 }) variableSub.asObservable().subscribe{ print("订阅到了:",$0)} .disposed(by: disposbag) // 再次发送 variableSub.value = 1000 复制代码
BehaviorRelay
Variable behaviorR.accept(20)
let behaviorRelay = BehaviorRelay(value: 100) behaviorRelay.subscribe(onNext: { (num) in print(num) .disposed(by: disposbag) print("打印:\(behaviorRelay.value)") behaviorRelay.accept(1000) 复制代码
Subject
在实际开发中,应用非常的广泛!平时很多时候都会在惆怅选择什么序列更合适,那么聪明的你一定要掌握底层的原理,并不说你背下特色就能真正开发的,因为如果后面一旦发生了 BUG
,你根本无法解决。作为 iOS
中高级发开人员**一定要知其然,而知其所以然!**碌碌无为的应用层开发毕竟走不长远! 就问此时此刻还有谁?45度仰望天空,该死!我这无处安放的魅力!