RxSwift-dispose源码解析

任何对象都有生命周期,有创建就要销毁。 OC 中有 initdeallocswiftinitdeinitRxSwift 也不例外, RxSwiftcreatedispose 。下面就看看 dispose 是如何管理序列销毁的。

一篇没有配图的文章是没有灵魂的

通过前面了解了 RxSwift 的两种清除方式:

  • 订阅产生的可清除资源( Disposable )对象,调用 dispose 方法清除
  • 通过清除包 DisposeBag 清除,在作用域结束后被释放,也可以在需要的时候置空释放

无论哪种方式,最终都是调用 dispose() 方法来释放。

下面看一段序列由创建到销毁:

//创建一个序列
let ob = Observable.create { (observer) -> Disposable in
        observer.onNext("msg")
        return Disposables.create{
            print("被销毁")
        }
    }
//订阅序列
let dis = ob.subscribe(onNext: { (val) in
        print(val)
    }){
        print("销毁了")
}
//销毁订阅
dis.dispose()
复制代码

此处代码,我们调用了 dispose 方法来销毁对象,销毁者 dis 其实和创建序列时返回的 Disposables 对象并不是一个对象。

disposable 对象创建代码:

extension Disposables {
    public static func create(with dispose: @escaping () -> Void) -> Cancelable {
        return AnonymousDisposable(disposeAction: dispose)
    }
}
复制代码
AnonymousDisposable
fileprivate final class AnonymousDisposable : DisposeBase, Cancelable {
    public typealias DisposeAction = () -> Void

    private var _isDisposed = AtomicInt(0)
    private var _disposeAction: DisposeAction?

    public var isDisposed: Bool {
        return isFlagSet(&self._isDisposed, 1)
    }

    fileprivate init(disposeAction: @escaping DisposeAction) {
        self._disposeAction = disposeAction
        super.init()
    }
    fileprivate func dispose() {
        if fetchOr(&self._isDisposed, 1) == 0 {
            if let action = self._disposeAction {
                self._disposeAction = nil
                action()
            }
        }
    }
}
复制代码
  • 初始化保存业务层,创建销毁者对象实现的闭包
  • dispose() 为销毁方法,该方法调用后就销毁了传入的闭包对象
  • 判断实列是否被释放,未释放就执行执行对保存闭包的置空操作
  • action() 通知业务层释放监听闭包,通知完成出{}作用域 action 即被释放

再看看订阅者中的销毁者的创建(代码标记为:one:):

public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
    -> Disposable {
        let disposable: Disposable
        
        if let disposed = onDisposed {
            disposable = Disposables.create(with: disposed)
        }
        else {
            disposable = Disposables.create()
        }
        
        #if DEBUG
            let synchronizationTracker = SynchronizationTracker()
        #endif
        
        let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
        
        let observer = AnonymousObserver { event in
            
            #if DEBUG
                synchronizationTracker.register(synchronizationErrorMessage: .default)
                defer { synchronizationTracker.unregister() }
            #endif
            
            switch event {
            case .next(let value):
                onNext?(value)
            case .error(let error):
                if let onError = onError {
                    onError(error)
                }
                else {
                    Hooks.defaultErrorHandler(callStack, error)
                }
                disposable.dispose()
            case .completed:
                onCompleted?()
                disposable.dispose()
            }
        }
        return Disposables.create(
            self.asObservable().subscribe(observer),
            disposable
        )
}
复制代码
  • onDisposed 外部实现的销毁者闭包,实现即传入闭包,否则直接调用 create() 创建,目的是对外发出销毁通知
  • 最后又创建了一个销毁者对象并返回,可在业务层做销毁操作,该销毁对象保存了 self.asObservable().subscribe(observer) 返回的销毁者,和当前方法中创建的销毁者。 what???搞毛啊,这么多销毁者,怕我还不够迷糊吗!!

销毁者 Disposables 有多种扩展,为满足不同需求:

  1. 需要在序列创建处观察销毁情况;
  2. 前面几篇文章有讲到,在调用 onErroronCompleted 方法会销毁我们的序列,在业务层省去了开发人员去销毁序列的步骤;
  3. 根据不同业务需求,需要满足开发人员销毁序列的功能。

因此以上出现了这么多的销毁者,最终销毁者还是被同类销毁者(不同扩展)所管理。看一下最后一个销毁者内部做了哪些事情:

extension Disposables {
    public static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Cancelable {
        return BinaryDisposable(disposable1, disposable2)
    }
}
复制代码
  • 收集前面所创建的销毁者到 BinaryDisposable 对象中
private final class BinaryDisposable : DisposeBase, Cancelable {

    private var _isDisposed = AtomicInt(0)

    // state
    private var _disposable1: Disposable?
    private var _disposable2: Disposable?

    /// - returns: Was resource disposed.
    var isDisposed: Bool {
        return isFlagSet(&self._isDisposed, 1)
    }

    init(_ disposable1: Disposable, _ disposable2: Disposable) {
        self._disposable1 = disposable1
        self._disposable2 = disposable2
        super.init()
    }

    func dispose() {
        if fetchOr(&self._isDisposed, 1) == 0 {
            self._disposable1?.dispose()
            self._disposable2?.dispose()
            self._disposable1 = nil
            self._disposable2 = nil
        }
    }
}
复制代码
  • 继承了 Disposable 协议,并实现了协议方法 dispose()

该类中实现了 dispose() 方法,该方法即是外部订阅后调用的 dispose() 方法,销毁所有创建序列时产生的销毁者,销毁之前各自掉用各自的 dispose 方法,来销毁外界保留的闭包对象,并向业务层发送销毁通知。

下面看一下代码:one:处 self.asObservable().subscribe(observer) 的销毁者是如何产生的:

class Producer : Observable {
    override init() {
        super.init()
    }

    override func subscribe(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }
}
复制代码

又是我们熟悉的 Producer ,熟悉的 sink ,在该处创建了一个 SinkDisposer 对象,应该又是一个销毁者:

fileprivate final class SinkDisposer: Cancelable {}
复制代码
  • 继承自 Cancelable -> Disposable ,并实现了 dispose() 方法
  • 销毁者去向 run -> AnonymousObservableSink -> Sink
  • Sink 继承自 Disposable ,实现了 dispose() 方法,内部调用了外部传入的销毁者即 SinkDisposer 对象

setSinkAndSubscription 该方法传入了 sink 销毁者和业务层创建序列时创建的销毁者。代码如下:

func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
    self._sink = sink
    self._subscription = subscription

    let previousState = fetchOr(&self._state, DisposeState.sinkAndSubscriptionSet.rawValue)
    if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
        rxFatalError("Sink and subscription were already set")
    }

    if (previousState & DisposeState.disposed.rawValue) != 0 {
        sink.dispose()
        subscription.dispose()
        self._sink = nil
        self._subscription = nil
    }
}
复制代码

保存了两个销毁者( Producer 创建的销毁者、业务层创建的销毁者),这个地方其他的没干就是想销毁这两个销毁者:

sink.dispose()
subscription.dispose()
self._sink = nil
self._subscription = nil
复制代码

置空前都调用了 dispose() ,这里面置空销毁者,并向业务层发送了销毁通知。

sink 主要用来连接序列,触发序列闭包,向观察者发送消息。而 sink 清空所有内部销毁者后并置空,序列和订阅者就失去联系。

系统销毁:

  • AnonymousObservableSink -> Sink -> dispose() -> SinkDisposer -> dispose()

外部销毁:

  • BinaryDisposable -> AnonymousObservableSink -> Sink -> dispose() -> SinkDisposer -> dispose()

无论是系统销毁,还是外部调用 dispose 销毁最终所有产生的销毁者都会被销毁释放。到此我们发现我们所销毁的居然是外部对应的监听闭包,内部创建的 Disposable 的子类对象。

sink 连接了序列和订阅者, sink 本身是 Disposable 对象,因此被销毁后断开了序列和订阅者之间的联系。