Flink Internals | Flink Timer Registration and Watermark Triggering

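In Flink, both WindowOperator and KeyedProcessOperator hold an object of a concrete InternalTimerService implementation. Through this object, users can register event-time and processing-time timers. When the Watermark (for event time) or the system clock (for processing time) passes one of these timers, a callback is invoked to perform some action. This article focuses on KeyedProcessOperator; WindowOperator works in much the same way, so it is not covered in detail.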

When a StreamTask is scheduled and executed, its lifecycle is as follows:
*  -- invoke()
*        |
*        +----> Create basic utils (config, etc) and load the chain of operators
*        +----> operators.setup()
*        +----> task specific init()
*        +----> initialize-operator-states()
*        +----> open-operators()
*        +----> run()
*        +----> close-operators()
*        +----> dispose-operators()
*        +----> common cleanup
*        +----> task specific cleanup()

KeyedProcessOperator's open method is called during the open-operators() phase of the StreamTask:

@Override
public void open() throws Exception {
   super.open();
   collector = new TimestampedCollector(output);
   // Build and start the InternalTimerService for this operator; it gives access to time and timers
   InternalTimerService internalTimerService =
         getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);

   TimerService timerService = new SimpleTimerService(internalTimerService);

   context = new ContextImpl(userFunction, timerService);
   onTimerContext = new OnTimerContextImpl(userFunction, timerService);
}

The StreamTask then calls run() to start the computation:

@Override
protected void run() throws Exception {
   // cache processor reference on the stack, to make the code more JIT friendly
   // run() uses inputProcessor to read elements from the input gate; an element can be a regular record or a watermark
   final StreamInputProcessor inputProcessor = this.inputProcessor;

   while (running && inputProcessor.processInput()) {
      // all the work happens in the "processInput" method
   }

   // the input is finished, notify non-head operators
   if (running) {
      synchronized (getCheckpointLock()) {
         OneInputStreamOperator headOperator = getHeadOperator();
         for (StreamOperator operator : operatorChain.getAllOperatorsTopologySorted()) {
            if (operator.getOperatorID().equals(headOperator.getOperatorID())) {
               continue;
            }

            Preconditions.checkState(operator instanceof OneInputStreamOperator);
            ((OneInputStreamOperator) operator).endInput();
         }
      }
   }
}

The relevant excerpt from StreamInputProcessor's processInput() method:

else {
      // now we can do the actual processing
      StreamRecord record = recordOrMark.asRecord();
      synchronized (lock) {
         numRecordsIn.inc();
         streamOperator.setKeyContextElement1(record);
         // Regular record: this eventually calls processElement of the user function;
         // for KeyedProcessOperator, that is the user-defined KeyedProcessFunction's processElement
         streamOperator.processElement(record);
      }
   }

   return true;
} else if (recordOrMark.isWatermark()) {
   // handle watermark
   statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel); // process the watermark
   continue;
} else if (recordOrMark.isStreamStatus()) {

Now let's look at how a Watermark is processed. The call eventually reaches AbstractStreamOperator's processWatermark method:

public void processWatermark(Watermark mark) throws Exception {
   if (timeServiceManager != null) {
      timeServiceManager.advanceWatermark(mark); // step 1: process the watermark (fire due timers)
   }
   output.emitWatermark(mark); // step 2: forward the watermark downstream
}
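
The ordering of these two steps matters: any records emitted by timer callbacks reach downstream before the watermark that triggered them. The following is a minimal sketch of that ordering in plain Java. `WatermarkOrderingSketch` and its `output` list are hypothetical stand-ins invented for illustration, not Flink classes, and the timer store is a plain sorted set rather than Flink's actual data structure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Simplified, hypothetical model of an operator; not Flink's AbstractStreamOperator. */
class WatermarkOrderingSketch {
    // Pending event-time timer timestamps, kept sorted.
    private final TreeSet<Long> eventTimeTimers = new TreeSet<>();
    // Everything the downstream operator would see, in order.
    final List<String> output = new ArrayList<>();

    void registerEventTimeTimer(long time) {
        eventTimeTimers.add(time);
    }

    void processWatermark(long watermark) {
        // Step 1: fire every timer whose timestamp is <= the watermark.
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.first() <= watermark) {
            long ts = eventTimeTimers.pollFirst();
            output.add("record-from-timer@" + ts);
        }
        // Step 2: only now forward the watermark downstream.
        output.add("watermark@" + watermark);
    }

    public static void main(String[] args) {
        WatermarkOrderingSketch op = new WatermarkOrderingSketch();
        op.registerEventTimeTimer(5L);
        op.registerEventTimeTimer(20L);
        op.processWatermark(10L);
        // prints [record-from-timer@5, watermark@10]
        System.out.println(op.output);
    }
}
```

The timer at 20 stays pending until a later watermark of at least 20 arrives.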

So how exactly is the Watermark handled? Next, look at InternalTimeServiceManager's advanceWatermark method:

public void advanceWatermark(Watermark watermark) throws Exception {
   // Every InternalTimerService previously created via getInternalTimerService must process this Watermark
   for (HeapInternalTimerService service : timerServices.values()) {
      service.advanceWatermark(watermark.getTimestamp());
   }
}

Looking at HeapInternalTimerService, we can see that every timer whose timestamp is less than or equal to the Watermark has its callback triggered:

public void advanceWatermark(long time) throws Exception {
   currentWatermark = time; // update the current watermark

   InternalTimer timer;
   // Take every timer at or below the Watermark off the queue and trigger its callback.
   while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {

      Set<InternalTimer> timerSet = getEventTimeTimerSetForTimer(timer);
      timerSet.remove(timer);
      eventTimeTimersQueue.remove();

      keyContext.setCurrentKey(timer.getKey());
      triggerTarget.onEventTime(timer); // triggerTarget is the concrete operator object
   }
}

Here triggerTarget is the concrete operator instance. It is passed to HeapInternalTimerService during open() through InternalTimeServiceManager's getInternalTimerService method.
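
To make the mechanics concrete, here is a minimal sketch of the same loop in plain Java, under simplifying assumptions. `MiniTimerService`, its nested `Timer` class, and its `Triggerable` interface are hypothetical names made up for this example and only mirror the roles of the Flink classes shown above. Keys are plain strings, and the per-key timer sets and state backend are left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Simplified, hypothetical model of a heap-based timer service; not Flink's HeapInternalTimerService. */
class MiniTimerService {

    /** A timer carries the key it was registered under and its timestamp. */
    static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    /** Stand-in for the operator that receives callbacks (the role of triggerTarget). */
    interface Triggerable {
        void onEventTime(String currentKey, Timer timer);
    }

    // Timers ordered by timestamp, so peek() always returns the earliest one.
    private final PriorityQueue<Timer> eventTimeTimersQueue =
            new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));
    private final Triggerable triggerTarget;
    private long currentWatermark = Long.MIN_VALUE;

    MiniTimerService(Triggerable triggerTarget) {
        this.triggerTarget = triggerTarget;
    }

    void registerEventTimeTimer(String key, long timestamp) {
        eventTimeTimersQueue.add(new Timer(key, timestamp));
    }

    void advanceWatermark(long time) {
        currentWatermark = time;
        Timer timer;
        // Fire timers in timestamp order until the earliest remaining one is beyond the watermark.
        while ((timer = eventTimeTimersQueue.peek()) != null && timer.timestamp <= time) {
            eventTimeTimersQueue.remove();
            // The key is restored before the callback, so keyed state would resolve to the right key.
            triggerTarget.onEventTime(timer.key, timer);
        }
    }

    long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        List<String> fired = new ArrayList<>();
        MiniTimerService service =
                new MiniTimerService((key, timer) -> fired.add(key + "@" + timer.timestamp));
        service.registerEventTimeTimer("a", 30L);
        service.registerEventTimeTimer("b", 10L);
        service.registerEventTimeTimer("a", 20L);
        service.advanceWatermark(20L);
        // prints [b@10, a@20]
        System.out.println(fired);
    }
}
```

Because the queue is ordered by timestamp, the loop can stop at the first timer that is still in the future instead of scanning every registered timer.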

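Next, look at KeyedProcessOperator's onEventTime, which calls the onTimer method of the user's KeyedProcessFunction so it can do its actual work. Windows behave the same way: WindowOperator's onEventTime or onProcessingTime takes the data held in state for the given key and window, passes it to the WindowFunction for computation, and sends the result to the downstream node: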

@Override
public void onEventTime(InternalTimer timer) throws Exception {
   collector.setAbsoluteTimestamp(timer.getTimestamp());
   invokeUserFunction(TimeDomain.EVENT_TIME, timer);
}
private void invokeUserFunction(
      TimeDomain timeDomain,
      InternalTimer timer) throws Exception {
   onTimerContext.timeDomain = timeDomain;
   onTimerContext.timer = timer;
   userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector); // the user-implemented onTimer method mentioned above
   onTimerContext.timeDomain = null;
   onTimerContext.timer = null;
}

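So far we have seen how a Watermark triggers timers. That leaves another question: how are timers registered? WindowOperator and KeyedProcessOperator hold a timerService, directly or indirectly, and timers are registered through that object.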

/**
 * Interface for working with time and timers.
 */
@PublicEvolving
public interface TimerService {

   /** Returns the current processing time. */
   long currentProcessingTime();

   /** Returns the current event-time watermark. */
   long currentWatermark();

   /**
    * Registers a timer to be fired when processing time passes the given time.
    *
    * <p>Timers can internally be scoped to keys and/or windows. When you set a timer
    * in a keyed context, such as in an operation on
    * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then that context
    * will also be active when you receive the timer notification.
    */
   void registerProcessingTimeTimer(long time);

   /**
    * Registers a timer to be fired when the event time watermark passes the given time.
    *
    * <p>Timers can internally be scoped to keys and/or windows. When you set a timer
    * in a keyed context, such as in an operation on
    * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then that context
    * will also be active when you receive the timer notification.
    */
   void registerEventTimeTimer(long time);
}

KeyedProcessOperator passes the timerService object indirectly to the KeyedProcessFunction, so users can register and access timers at the function level. Two points are worth noting:

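1. Within the same namespace, each key has at most one timer per timestamp. Registering the same timestamp again for that key does not create a second timer.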

2. If the TimeCharacteristic is ProcessingTime, registering a timer whose time is earlier than the current system processing time triggers the callback immediately.
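
Both points can be illustrated with a small model in plain Java. This is a sketch under stated assumptions, not Flink code. `TimerNotesSketch`, `TimerId`, and `processingTimeDelay` are hypothetical names. The sketch assumes a timer is identified by key, namespace, and timestamp together, and that a processing-time timer in the past is scheduled with a delay of zero.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Simplified, hypothetical model of the two notes above; not part of Flink. */
class TimerNotesSketch {

    /** Assumption: a timer is identified by key, namespace, and timestamp together. */
    static final class TimerId {
        final String key;
        final String namespace;
        final long timestamp;

        TimerId(String key, String namespace, long timestamp) {
            this.key = key;
            this.namespace = namespace;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof TimerId)) return false;
            TimerId other = (TimerId) o;
            return timestamp == other.timestamp
                    && key.equals(other.key)
                    && namespace.equals(other.namespace);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, namespace, timestamp);
        }
    }

    private final Set<TimerId> timers = new HashSet<>();

    /** Returns true if a new timer was added, false if an identical one already existed. */
    boolean register(String key, String namespace, long timestamp) {
        return timers.add(new TimerId(key, namespace, timestamp));
    }

    int pendingTimers() {
        return timers.size();
    }

    /** Assumption: a processing-time timer whose time has already passed gets zero delay. */
    static long processingTimeDelay(long timerTimestamp, long now) {
        return Math.max(timerTimestamp - now, 0L);
    }

    public static void main(String[] args) {
        TimerNotesSketch sketch = new TimerNotesSketch();
        System.out.println(sketch.register("user-1", "ns", 1000L)); // prints true
        System.out.println(sketch.register("user-1", "ns", 1000L)); // prints false (duplicate)
        System.out.println(sketch.register("user-1", "ns", 2000L)); // prints true (new timestamp)
        System.out.println(sketch.pendingTimers());                 // prints 2
        System.out.println(processingTimeDelay(900L, 1000L));       // prints 0
        System.out.println(processingTimeDelay(1500L, 1000L));      // prints 500
    }
}
```

In practice this means a function can re-register the same timestamp on every element without piling up duplicate callbacks.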
