Flink Internals | Timer Registration and Watermark Triggering in Flink
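In Flink, both WindowOperator and KeyedProcessOperator hold an object of a concrete InternalTimerService implementation. Through this object, users can register event-time and processing-time timers. When the Watermark passes an event-time timer (or the system clock passes a processing-time timer), a callback is invoked to perform some action. This article focuses on KeyedProcessOperator; WindowOperator works in much the same way and is not covered in detail.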
When a StreamTask is scheduled for execution, its lifecycle looks like this:

    -- invoke()
          |
          +----> Create basic utils (config, etc) and load the chain of operators
          +----> operators.setup()
          +----> task specific init()
          +----> initialize-operator-states()
          +----> open-operators()
          +----> run()
          +----> close-operators()
          +----> dispose-operators()
          +----> common cleanup
          +----> task specific cleanup()
The open() method of KeyedProcessOperator is called during the open-operators() phase of the StreamTask:
    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector(output);

        // Create and start an InternalTimerService for this operator;
        // time can be accessed through this InternalTimerService.
        InternalTimerService internalTimerService =
                getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);

        TimerService timerService = new SimpleTimerService(internalTimerService);

        context = new ContextImpl(userFunction, timerService);
        onTimerContext = new OnTimerContextImpl(userFunction, timerService);
    }
The StreamTask then calls run() to start the computation:
    @Override
    protected void run() throws Exception {
        // cache processor reference on the stack, to make the code more JIT friendly
        // run() uses inputProcessor to read messages from the input gate;
        // a message may be a regular record or a watermark.
        final StreamInputProcessor inputProcessor = this.inputProcessor;

        while (running && inputProcessor.processInput()) {
            // all the work happens in the "processInput" method
        }

        // the input is finished, notify non-head operators
        if (running) {
            synchronized (getCheckpointLock()) {
                OneInputStreamOperator headOperator = getHeadOperator();
                for (StreamOperator operator : operatorChain.getAllOperatorsTopologySorted()) {
                    if (operator.getOperatorID().equals(headOperator.getOperatorID())) {
                        continue;
                    }
                    Preconditions.checkState(operator instanceof OneInputStreamOperator);
                    ((OneInputStreamOperator) operator).endInput();
                }
            }
        }
    }
The relevant branches of StreamInputProcessor's processInput() method look like this (excerpt, with the branches arranged as one if/else chain):

    if (recordOrMark.isWatermark()) {
        // handle watermark
        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
        continue;
    } else if (recordOrMark.isStreamStatus()) {
        // ... (branch body and remaining branches omitted)
    } else {
        // now we can do the actual processing
        StreamRecord record = recordOrMark.asRecord();
        synchronized (lock) {
            numRecordsIn.inc();
            streamOperator.setKeyContextElement1(record);
            // Regular record processing: this eventually calls the user function's
            // processElement. For KeyedProcessOperator, that is the processElement
            // of the user-defined KeyedProcessFunction.
            streamOperator.processElement(record);
        }
        return true;
    }
Now let's follow how a Watermark is processed. The call eventually reaches the processWatermark method of AbstractStreamOperator:
    public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(mark); // step 1: process the watermark
        }
        output.emitWatermark(mark); // step 2: forward the watermark downstream
    }
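The ordering of the two steps matters: anything a timer callback emits reaches the downstream operator before the watermark that triggered it. The following is a minimal plain-Java sketch of that ordering, with no Flink dependency. `WatermarkOrderSketch` and all of its members are hypothetical names made up for illustration; they are not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical stand-in for an operator; not a Flink class. */
class WatermarkOrderSketch {
    /** Everything "sent downstream", in emission order. */
    final List<String> output = new ArrayList<>();

    /** Pending event-time timers, ordered by timestamp. */
    private final TreeSet<Long> timers = new TreeSet<>();

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    void processWatermark(long watermark) {
        // Step 1: fire every timer whose timestamp is <= the watermark.
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long timestamp = timers.pollFirst();
            output.add("record-from-timer@" + timestamp);
        }
        // Step 2: only then forward the watermark itself.
        output.add("watermark@" + watermark);
    }

    public static void main(String[] args) {
        WatermarkOrderSketch op = new WatermarkOrderSketch();
        op.registerEventTimeTimer(5L);
        op.registerEventTimeTimer(20L);
        op.processWatermark(10L);
        // prints [record-from-timer@5, watermark@10]
        System.out.println(op.output);
    }
}
```

The timer at 20 stays pending because the watermark has only reached 10. It would fire on a later watermark of 20 or more.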
So how exactly is the Watermark processed? Next, look at the advanceWatermark method of InternalTimeServiceManager:
    public void advanceWatermark(Watermark watermark) throws Exception {
        // Every InternalTimerService previously created via getInternalTimerService
        // must process this Watermark.
        for (HeapInternalTimerService service : timerServices.values()) {
            service.advanceWatermark(watermark.getTimestamp());
        }
    }
Looking at HeapInternalTimerService, we can see that every timer whose timestamp is less than or equal to the Watermark has its callback fired:
    public void advanceWatermark(long time) throws Exception {
        currentWatermark = time; // update the current watermark

        InternalTimer timer;

        // Take every timer at or below the Watermark and fire its callback.
        while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            Set<InternalTimer> timerSet = getEventTimeTimerSetForTimer(timer);
            timerSet.remove(timer);
            eventTimeTimersQueue.remove();

            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onEventTime(timer); // triggerTarget is the concrete operator object
        }
    }
Here triggerTarget is the concrete operator instance. It is passed to HeapInternalTimerService during open(), through the getInternalTimerService method of InternalTimeServiceManager.
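The queue-draining loop can be modeled in a few lines of plain Java. This is only a sketch under simplifying assumptions: a single namespace, string keys, and no state backend. `EventTimeTimerSketch`, `Timer`, and `Target` are hypothetical names for illustration and do not correspond to Flink's actual classes.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

/** Hypothetical, simplified model of an event-time timer service; not Flink code. */
class EventTimeTimerSketch {

    /** A timer is identified by its key and timestamp. */
    static final class Timer implements Comparable<Timer> {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public int compareTo(Timer other) {
            return Long.compare(timestamp, other.timestamp);
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Timer
                    && ((Timer) o).timestamp == timestamp
                    && ((Timer) o).key.equals(key);
        }

        @Override
        public int hashCode() {
            return 31 * key.hashCode() + Long.hashCode(timestamp);
        }
    }

    /** Callback target, playing the role of the operator (triggerTarget). */
    interface Target {
        void onEventTime(String key, long timestamp);
    }

    private final PriorityQueue<Timer> queue = new PriorityQueue<>();
    private final Set<Timer> registered = new HashSet<>();
    private final Target target;

    EventTimeTimerSketch(Target target) {
        this.target = target;
    }

    void registerEventTimeTimer(String key, long timestamp) {
        Timer timer = new Timer(key, timestamp);
        // The set rejects a timer that is already registered for this key and timestamp.
        if (registered.add(timer)) {
            queue.add(timer);
        }
    }

    void advanceWatermark(long time) {
        Timer timer;
        // Drain the head of the queue while it is at or below the watermark.
        while ((timer = queue.peek()) != null && timer.timestamp <= time) {
            queue.poll();
            registered.remove(timer);
            target.onEventTime(timer.key, timer.timestamp);
        }
    }

    public static void main(String[] args) {
        List<String> fired = new ArrayList<>();
        EventTimeTimerSketch service =
                new EventTimeTimerSketch((key, ts) -> fired.add(key + "@" + ts));
        service.registerEventTimeTimer("a", 10L);
        service.registerEventTimeTimer("a", 10L); // duplicate, ignored
        service.registerEventTimeTimer("b", 15L);
        service.registerEventTimeTimer("c", 30L);
        service.advanceWatermark(15L);
        // prints [a@10, b@15]
        System.out.println(fired);
    }
}
```

The comparison is inclusive (`<=`), so a timer registered exactly at the watermark timestamp fires. A priority queue keeps the earliest timer at the head, so the loop can stop at the first timer that is still in the future.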
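Next, look at onEventTime in KeyedProcessOperator, which calls the onTimer method of the user-implemented KeyedProcessFunction to do the actual work. For windows, onEventTime or onProcessingTime is called in the same way: the data stored in state for the given key and window is passed to the WindowFunction for computation, and the result is sent to the downstream node: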
    @Override
    public void onEventTime(InternalTimer timer) throws Exception {
        collector.setAbsoluteTimestamp(timer.getTimestamp());
        invokeUserFunction(TimeDomain.EVENT_TIME, timer);
    }

    private void invokeUserFunction(
            TimeDomain timeDomain,
            InternalTimer timer) throws Exception {
        onTimerContext.timeDomain = timeDomain;
        onTimerContext.timer = timer;
        // This is the onTimer method implemented by the user.
        userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
        onTimerContext.timeDomain = null;
        onTimerContext.timer = null;
    }
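So far we have seen how a Watermark triggers timers. One question remains: how are timers registered? WindowOperator and KeyedProcessOperator hold a timerService, directly or indirectly, and timers are registered through that timerService object.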
    /**
     * Interface for working with time and timers.
     */
    @PublicEvolving
    public interface TimerService {

        /** Returns the current processing time. */
        long currentProcessingTime();

        /** Returns the current event-time watermark. */
        long currentWatermark();

        /**
         * Registers a timer to be fired when processing time passes the given time.
         *
         * <p>Timers can internally be scoped to keys and/or windows. When you set a timer
         * in a keyed context, such as in an operation on
         * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then that context
         * will also be active when you receive the timer notification.
         */
        void registerProcessingTimeTimer(long time);

        /**
         * Registers a timer to be fired when the event time watermark passes the given time.
         *
         * <p>Timers can internally be scoped to keys and/or windows. When you set a timer
         * in a keyed context, such as in an operation on
         * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then that context
         * will also be active when you receive the timer notification.
         */
        void registerEventTimeTimer(long time);
    }
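KeyedProcessOperator passes the timerService object indirectly (through the context) to the KeyedProcessFunction, so users can register timers and access time at the function level. Two points are worth noting: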
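1. Within the same namespace, each key has at most one timer per timestamp. Registering a timer again for the same key and timestamp has no additional effect.
2. If the TimeCharacteristic is processing time and the timer's timestamp is earlier than the current system processing time, the callback fires immediately.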
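Both points can be illustrated with a small plain-Java sketch. It assumes that a processing-time timer is scheduled with a delay of `max(0, timestamp - now)`, which is one simple way to obtain the "fires immediately" behavior described above. `ProcessingTimeTimerSketch` and its methods are hypothetical names for illustration, not Flink classes, and the sketch ignores namespaces and the locking a real operator would need.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of processing-time timers; not Flink code. */
class ProcessingTimeTimerSketch implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    /** Identifiers ("key@timestamp") of timers that have been registered. */
    private final Set<String> registered = ConcurrentHashMap.newKeySet();

    /** Returns true if a new timer was scheduled, false if it was a duplicate. */
    boolean registerProcessingTimeTimer(String key, long time, Runnable callback) {
        String id = key + "@" + time;
        if (!registered.add(id)) {
            return false; // point 1: same key and timestamp, nothing to do
        }
        // Point 2: a timestamp in the past yields a delay of zero.
        long delay = Math.max(0L, time - System.currentTimeMillis());
        scheduler.schedule(callback, delay, TimeUnit.MILLISECONDS);
        return true;
    }

    @Override
    public void close() {
        scheduler.shutdownNow(); // cancels timers that have not fired yet
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch pastTimerFired = new CountDownLatch(1);
        try (ProcessingTimeTimerSketch timers = new ProcessingTimeTimerSketch()) {
            long now = System.currentTimeMillis();
            long inOneHour = now + 3_600_000L;

            boolean first = timers.registerProcessingTimeTimer("a", inOneHour, () -> { });
            boolean duplicate = timers.registerProcessingTimeTimer("a", inOneHour, () -> { });

            // Registered one minute in the past, so it should fire right away.
            timers.registerProcessingTimeTimer("a", now - 60_000L, pastTimerFired::countDown);
            boolean firedPromptly = pastTimerFired.await(5, TimeUnit.SECONDS);

            System.out.println("first=" + first
                    + " duplicate=" + duplicate
                    + " firedPromptly=" + firedPromptly);
        }
    }
}
```

The second registration for key "a" at the same timestamp is rejected, while a different timestamp for the same key is accepted as a separate timer.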