RxSwift高阶函数skip解读

skip 的作用:跳过 Observable 中头 n 个元素,只关注后面的元素。

skip 的简单使用:

Observable.of(1, 2, 3, 4, 5, 6)
        .skip(2)
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)

跳过前n个,输出剩余的元素:3 4 5 6复制代码

虽然这里主要是研究 skip 函数,但是调用者是 of 函数的返回值,所以 of 函数也不能省掉。

先看 of 函数的实现:

public static func of(_ elements: Element ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable{return ObservableSequence(elements: elements, scheduler: scheduler)
}复制代码

这里只是传入了几个元素和默认的调度者,创建了一个 ObservableSequence 序列的实例,它就是 skip 的调用者。

skip 函数的实现:

public func skip(_ count: Int) -> Observable{return SkipCount(source: self.asObservable(), count: count)
}复制代码

SkipCount 保存了调用者 ObservableSequence 序列和需要跳过的 count

final private class SkipCount: Producer{let source: Observablelet count: Int
    
    init(source: Observable, count: Int) {
        self.source = sourceself.count = count
    }
    
    override func run(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {let sink = SkipCountSink(parent: self, observer: observer, cancel: cancel)let subscription = self.source.subscribe(sink)return (sink: sink, subscription: subscription)
    }
}复制代码

然后就开始了 subscribe() 订阅信号。

此时, SkipCount 就会调用父类 Producer 中的 subscribe 函数,这个函数已经是老朋友了,它会根据所在的线程分别调用子类的 run ,并创建一个销毁者 SinkDisposer ,把订阅信号时创建的 匿名观察者 和这个销毁者 SinkDisposer 一起通过 run 函数来传参过去。然后在子类 SkipCountrun 中让源序列( self.source == ObservableSequence )去订阅信号,并把 携带了 匿名观察者SkipCountSink 传了过去。

下一步动动脚指头也知道,轮到 ObservableSequence 去父类的 subscribe 请安了,都是亲兄弟,也必然会像 SkipCount 一样回到 run 函数中:

final private class ObservableSequence: Producer{
    override func run(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)let subscription = sink.run()return (sink: sink, subscription: subscription)
    }
}复制代码

ObservableSequence 中的 run 函数的实现就和 RxSwift核心逻辑 中的比较吻合了。都是创建了一个 Sink 的子类 ObservableSequenceSink ,然后调用 run 需要注意的是: ObservableSequenceSink 初始化用的 observerSkipCountSink

ObservableSequenceSinkrun

final private class ObservableSequenceSink: Sinkwhere Sequence.Element == Observer.Element {
    typealias Parent = ObservableSequencefunc run() -> Disposable {return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse invar mutableIterator = iteratorif let next = mutableIterator.next() {
                self.forwardOn(.next(next))
                recurse(mutableIterator)
            }else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
}复制代码

这里 _parent 的调度者会带你去它家一顿唠嗑,最后还是会递归回调 action闭包 。这闭包里的代码不难看出,会去父类 SinkforwardOn ,然后就是 Sink._observer.on(event) ,这个 _observer 不就是我们刚刚重点加粗的那个 SkipCountSink 么!

skip 的套路近在眼前:

final private class SkipCountSink: Sink, ObserverType {
    typealias Element = Observer.Element 
    typealias Parent = SkipCountlet parent: Parent
    var remaining: Int
    
    init(parent: Parent, observer: Observer, cancel: Cancelable) {
        self.parent = parent
        self.remaining = parent.count
        super.init(observer: observer, cancel: cancel)
    }
    
    func on(_ event: Event) {
        switch event {case .next(let value):            if self.remaining <= 0 {
                self.forwardOn(.next(value))
            }else {
                self.remaining -= 1
            }case .error:
            self.forwardOn(event)
            self.dispose()case .completed:
            self.forwardOn(event)
            self.dispose()
        }
    }
}复制代码

soga,原来我们 skip(n) 几次,这里就在 else 中就放空几次。剩余的会正常 self._observer.on(event) ,去回调 subscribe 中的闭包。

skipWhile

skipWhile 的作用:跳过 Observable 中头几个元素,直到元素的判定为否

示例:

Observable.of(1, 2, 3, 4, 5, 6)
        .skipWhile { $0 < 4 }
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
        
跳过满足{$0 < 4}条件的元素,输出剩余的元素:4 5 6复制代码

同样的套路:

final private class SkipWhileSink: Sink, ObserverType {
    typealias Element = Observer.Element 
    typealias Parent = SkipWhilefileprivate let _parent: Parent
    fileprivate var _running = falsefunc on(_ event: Event) {
        switch event {case .next(let value):if !self._running {do {
                    self._running = try !self._parent._predicate(value)
                } catch let e {
                    self.forwardOn(.error(e))
                    self.dispose()return}
            }if self._running {
                self.forwardOn(.next(value))
            }case .error, .completed:
            self.forwardOn(event)
            self.dispose()
        }
    }
}复制代码

关键部分还是 on 函数中的实现, .next 下,用 SkipWhile 保存的闭包 _predicate 来判断当前元素是否满足条件。直到 self._running 标记为 true 后才会执行 forwardOn 去响应订阅。