RxSwift高阶函数skip解读
skip
的作用:跳过 Observable 中头 n 个元素,只关注后面的元素。
skip
的简单使用:
Observable.of(1, 2, 3, 4, 5, 6) .skip(2) .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) 跳过前n个,输出剩余的元素:3 4 5 6复制代码
虽然这里主要是研究 skip
函数,但是调用者是 of
函数的返回值,所以 of
函数也不能省掉。
先看 of
函数的实现:
public static func of(_ elements: Element ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable{return ObservableSequence(elements: elements, scheduler: scheduler) }复制代码
这里只是传入了几个元素和默认的调度者,创建了一个 ObservableSequence
序列的实例,它就是 skip
的调用者。
skip
函数的实现:
public func skip(_ count: Int) -> Observable{return SkipCount(source: self.asObservable(), count: count) }复制代码
SkipCount
保存了调用者 ObservableSequence
序列和需要跳过的 count
。
final private class SkipCount: Producer{let source: Observablelet count: Int init(source: Observable, count: Int) { self.source = sourceself.count = count } override func run(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {let sink = SkipCountSink(parent: self, observer: observer, cancel: cancel)let subscription = self.source.subscribe(sink)return (sink: sink, subscription: subscription) } }复制代码
然后就开始了 subscribe()
订阅信号。
此时, SkipCount
就会调用父类 Producer
中的 subscribe
函数,这个函数已经是老朋友了,它会根据所在的线程分别调用子类的 run
,并创建一个销毁者 SinkDisposer
,把订阅信号时创建的 匿名观察者
和这个销毁者 SinkDisposer
一起通过 run
函数来传参过去。然后在子类 SkipCount
的 run
中让源序列( self.source
== ObservableSequence
)去订阅信号,并把 携带了 匿名观察者
的 SkipCountSink
传了过去。
下一步动动脚指头也知道,轮到 ObservableSequence
去父类的 subscribe
请安了,都是亲兄弟,也必然会像 SkipCount
一样回到 run
函数中:
final private class ObservableSequence: Producer{ override func run(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)let subscription = sink.run()return (sink: sink, subscription: subscription) } }复制代码
ObservableSequence
中的 run
函数的实现就和 RxSwift核心逻辑 中的比较吻合了。都是创建了一个 Sink
的子类 ObservableSequenceSink
,然后调用 run
。 需要注意的是: ObservableSequenceSink
初始化用的 observer
是 SkipCountSink
。
ObservableSequenceSink
的 run
:
final private class ObservableSequenceSink: Sinkwhere Sequence.Element == Observer.Element { typealias Parent = ObservableSequencefunc run() -> Disposable {return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse invar mutableIterator = iteratorif let next = mutableIterator.next() { self.forwardOn(.next(next)) recurse(mutableIterator) }else { self.forwardOn(.completed) self.dispose() } } } }复制代码
这里 _parent
的调度者会带你去它家一顿唠嗑,最后还是会递归回调 action闭包
。这闭包里的代码不难看出,会去父类 Sink
的 forwardOn
,然后就是 Sink._observer.on(event)
,这个 _observer
不就是我们刚刚重点加粗的那个 SkipCountSink
么!
skip
的套路近在眼前:
final private class SkipCountSink: Sink, ObserverType { typealias Element = Observer.Element typealias Parent = SkipCountlet parent: Parent var remaining: Int init(parent: Parent, observer: Observer, cancel: Cancelable) { self.parent = parent self.remaining = parent.count super.init(observer: observer, cancel: cancel) } func on(_ event: Event) { switch event {case .next(let value): if self.remaining <= 0 { self.forwardOn(.next(value)) }else { self.remaining -= 1 }case .error: self.forwardOn(event) self.dispose()case .completed: self.forwardOn(event) self.dispose() } } }复制代码
soga,原来我们 skip(n)
几次,这里就在 else
中就放空几次。剩余的会正常 self._observer.on(event)
,去回调 subscribe
中的闭包。
skipWhile
skipWhile
的作用:跳过 Observable 中头几个元素,直到元素的判定为否
示例:
Observable.of(1, 2, 3, 4, 5, 6) .skipWhile { $0 < 4 } .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) 跳过满足{$0 < 4}条件的元素,输出剩余的元素:4 5 6复制代码
同样的套路:
final private class SkipWhileSink: Sink, ObserverType { typealias Element = Observer.Element typealias Parent = SkipWhilefileprivate let _parent: Parent fileprivate var _running = falsefunc on(_ event: Event) { switch event {case .next(let value):if !self._running {do { self._running = try !self._parent._predicate(value) } catch let e { self.forwardOn(.error(e)) self.dispose()return} }if self._running { self.forwardOn(.next(value)) }case .error, .completed: self.forwardOn(event) self.dispose() } } }复制代码
关键部分还是 on
函数中的实现, .next
下,用 SkipWhile
保存的闭包 _predicate
来判断当前元素是否满足条件。直到 self._running
标记为 true
后才会执行 forwardOn
去响应订阅。