RxSwift-map源码解析
map
操作符为每一个序列元素提供转换,并返回到原序列。

看一段代码示例:
Observable.of(1,2,3,4,5,6) .subscribe(onNext: { (val) in print(val) }).disposed(by: disposeBag) 复制代码
输出:1,2,3,4,5,6
Observable.of(1,2,3,4,5,6) .map{ $0+10 } .subscribe(onNext: { (val) in print(val) }).disposed(by: disposeBag) 复制代码
输出:11,12,13,14,15,16
of map
那么 map
是如何给序列重新设置新值的呢?闭包就像加工零件的数控机床,设定好加工程序 $0+10
就会对 of
中的每一个元素加工产出新的零件,看一下 map
源码都做了哪些事情:
extension ObservableType { public func map(_ transform: @escaping (E) throws -> R) -> Observable { return self.asObservable().composeMap(transform) } } 复制代码
transform asObservable()
首先看到 map
函数是一个带闭包参数的 ObservableType
的扩展函数,内部调用了 composeMap
并传入了外部的闭包以便内部调用。
由前边的源码探索经验可猜测,该处闭包会被保留在内部,在订阅时被使用,那么根据断点一步步探索,看看外界的闭包最终会保留在何处。 composeMap
所在类:
public class Observable : ObservableType { /// Type of elements in sequence. public typealias E = Element // 此处代码有省略 internal func composeMap(_ transform: @escaping (Element) throws -> R) -> Observable { return _map(source: self, transform: transform) } } 复制代码
-
source
向_map
函数传入了self
即为当前的序列对象 -
transform
一路追踪的外部闭包
是 ObservableType
的子类 Observable
实现了 composeMap
方法,返回 Observable
类型的对象,在内部调用了 _map
方法:
internal func _map(source: Observable, transform: @escaping (Element) throws -> R) -> Observable { return Map(source: source, transform: transform) } 复制代码
还是向 Map
内部传入序列,及业务层闭包,一直强调序列和业务层闭包,主要由于结构复杂,以免被遗忘,后续和订阅难以被联系在一起。继续查看 Map
类:
final private class Map: Producer { typealias Transform = (SourceType) throws -> ResultType private let _source: Observable private let _transform: Transform init(source: Observable, transform: @escaping Transform) { self._source = source self._transform = transform #if TRACE_RESOURCES _ = increment(&_numberOfMapOperators) #endif } override func composeMap(_ selector: @escaping (ResultType) throws -> R) -> Observable { let originalSelector = self._transform return Map(source: self._source, transform: { (s: SourceType) throws -> R in let r: ResultType = try originalSelector(s) return try selector(r) }) } override func run(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == ResultType { let sink = MapSink(transform: self._transform, observer: observer, cancel: cancel) let subscription = self._source.subscribe(sink) return (sink: sink, subscription: subscription) } } 复制代码
- 继承自
Producer
,在 《RxSwift核心源码探索》 中我们已经很熟悉了,继承自Observable
,提供了连接序列和观察者的方法对象sink
,及发送序列元素到观察者,再返回到订阅,这里不再叙述。 -
Map
中保留了源序列及业务层闭包方法 - 此处
run
方法会在父类Producer
类中方法调用,父类指针指向子类对象
继续断点运行就到达了我们的订阅,该处方法和 《RxSwift核心源码探索》 中的订阅方法为同一方法:
extension ObservableType { //业务层订阅调用 public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable { let disposable: Disposable if let disposed = onDisposed { disposable = Disposables.create(with: disposed) } else { disposable = Disposables.create() } #if DEBUG let synchronizationTracker = SynchronizationTracker() #endif let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : [] let observer = AnonymousObserver { event in #if DEBUG synchronizationTracker.register(synchronizationErrorMessage: .default) defer { synchronizationTracker.unregister() } #endif switch event { case .next(let value): onNext?(value) case .error(let error): if let onError = onError { onError(error) } else { Hooks.defaultErrorHandler(callStack, error) } disposable.dispose() case .completed: onCompleted?() disposable.dispose() } } return Disposables.create( self.asObservable().subscribe(observer), disposable ) } } 复制代码
self.asObservable().subscribe(observer)
此处调用的则是 Producer
中的 subscribe
方法,看一下该处方法:
class Producer : Observable { override init() { super.init() } override func subscribe(_ observer: O) -> Disposable where O.E == Element { if !CurrentThreadScheduler.isScheduleRequired { // The returned disposable needs to release all references once it was disposed. let disposer = SinkDisposer() let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } else { return CurrentThreadScheduler.instance.schedule(()) { _ in let disposer = SinkDisposer() let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } } } } 复制代码
此方法很熟悉,主要看一下内部 self.run
方法调用,此处继承链和 《RxSwift核心源码探索》 中的继承链不同,继承链如下:
-
RxSwift
核心源码探索中Producer
的子类是AnonymousObservable
,run
方法在此类实现 -
Map
源码中Producer的子类是Map
,run
方法在该处被实现
此处如果不太清楚可以追溯上文查看。上面有 Map
类的完整代码,此处只查看调用方法代码:
override func run(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == ResultType { let sink = MapSink(transform: self._transform, observer: observer, cancel: cancel) let subscription = self._source.subscribe(sink) return (sink: sink, subscription: subscription) } 复制代码
- 调用了
MapSink
方法此处和 《RxSwift核心源码探索》 中的AnnonymousObservableSink
类似 -
self._source
此处为订阅时保存的闭包 -
.subscribe(sink)Producer
类的方法,传入sink
用来调用sink
中的on
方法
类似于 《RxSwift核心源码探索》 中的 Sink
,功能是一样的, MapSink
中保留的是观察者, Map
中保留的为可观察序列 Observable
,通过 Observable
来触发观察者的方法调用。 subscribe
方法中调用的
-
sinkAndSubscription = self.run(observer, cancel: disposer)
final private class ObservableSequence: Producer{ fileprivate let _elements: S fileprivate let _scheduler: ImmediateSchedulerType init(elements: S, scheduler: ImmediateSchedulerType) { self._elements = elements self._scheduler = scheduler } override func run(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E { let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel) let subscription = sink.run() return (sink: sink, subscription: subscription) } } 复制代码
是继承自 Producer
的方法,内部创建了 ObservableSequenceSink
对象并传入了当前 Observable
对象和 observer
对象,最终调用了 run()
方法,此处猜测内部为变量序列并调用观察者闭包方法,向外界发送消息。代码如下:
final private class ObservableSequenceSink: Sink where S.Iterator.Element == O.E { typealias Parent = ObservableSequenceprivate let _parent: Parent init(parent: Parent, observer: O, cancel: Cancelable) { self._parent = parent super.init(observer: observer, cancel: cancel) } func run() -> Disposable { return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in var mutableIterator = iterator if let next = mutableIterator.next() { self.forwardOn(.next(next)) recurse(mutableIterator) } else { self.forwardOn(.completed) self.dispose() } } } } 复制代码
- 注意此类继承自
Sink
,由此可知会调用Sink
中的forwardOn
方法
_elements
是由 of
创建时保留的序列集合,此处对序列元素进行遍历,并调用 forwardOn
方法发送元素。 forwardOn
:
class Sink : Disposable { fileprivate let _observer: O fileprivate let _cancel: Cancelable fileprivate var _disposed = AtomicInt(0) #if DEBUG fileprivate let _synchronizationTracker = SynchronizationTracker() #endif init(observer: O, cancel: Cancelable) { #if TRACE_RESOURCES _ = Resources.incrementTotal() #endif self._observer = observer self._cancel = cancel } final func forwardOn(_ event: Event) { #if DEBUG self._synchronizationTracker.register(synchronizationErrorMessage: .default) defer { self._synchronizationTracker.unregister() } #endif if isFlagSet(&self._disposed, 1) { return } self._observer.on(event) } } 复制代码
-
_observer
是上面出入的MapSink
对象
清楚看到在此处调用了 sink
的 on
方法, self._observer.on(event)
。继续追踪 MapSink
类的 on
方法:
final private class MapSink: Sink, ObserverType { typealias Transform = (SourceType) throws -> ResultType typealias ResultType = O.E typealias Element = SourceType private let _transform: Transform init(transform: @escaping Transform, observer: O, cancel: Cancelable) { self._transform = transform super.init(observer: observer, cancel: cancel) } func on(_ event: Event) { switch event { case .next(let element): do { let mappedElement = try self._transform(element) self.forwardOn(.next(mappedElement)) } catch let e { self.forwardOn(.error(e)) self.dispose() } case .error(let error): self.forwardOn(.error(error)) self.dispose() case .completed: self.forwardOn(.completed) self.dispose() } } } 复制代码
到此处就很熟悉了,此处 on
和 《RxSwift核心源码探索》 中不同:
- 《RxSwift核心源码探索》 中此处有业务层
onNext
来触发 -
Map
中是通过设定好的of
序列直接触发
元素处理代码:
do { let mappedElement = try self._transform(element) self.forwardOn(.next(mappedElement)) } 复制代码
-
let mappedElement = try self._transform(element)
调用外界闭包获取新值 -
self.forwardOn(.next(mappedElement))
通过forwardOn
将新值发送至订阅者
最终会调用 ObserverBase中的on
方法,再调用观察者 observer
的 onCore
方法,向观察者发送元素。在由观察者调用业务层订阅时实现的闭包将序列元素发送到了业务层,到此 map
就完成了对源序列的修改。
总结:
实际上 map
就是对 sink
做了一层封装,根据业务层的 map
设置在 ObservableSequenceSink
中处理了序列元素再发送至 ofrwardOn
直至 Observer
对象,由此完成了对元素的加工处理。
RxSwift
源码比较绕,复杂的逻辑带来的是高效的开发,高效的运行,因此对 RxSwfit
源码我们还需要进一步探索理解。