Checkpoint对齐机制源码分析

  private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {

    //barrierId表示当前批次的checkpointId

    final long barrierId = receivedBarrier.getId();

    // 如果是单输入流 则直接触发checkpoint

    if (totalNumberOfInputChannels == 1) {

      if (barrierId > currentCheckpointId) {

        // new checkpoint

        currentCheckpointId = barrierId;

        notifyCheckpoint(receivedBarrier);

      }

      return;

    }

    //多输入流的处理,numBarriersReceived表示已接收到的

     //当前批次checkpointId 的channel 个数

     //numBarriersReceived >0 表示正在对齐过程中

    if (numBarriersReceived > 0) {

      // this is only true if some alignment is already progress and was not canceled

      if (barrierId == currentCheckpointId) {

        // regular case

        onBarrier(channelIndex);

      }

      else if (barrierId > currentCheckpointId) {

        // 如果到来的barrierId也就是checkpointId 大于当前正在

        //发生对齐机制的checkpointId ,那么会取消当前的checkpoint(比喻说超时导致)

        // 并且重置blockedChannels状态 重置numBarriersReceived为0

        //然后开启下一次(barrierId) checkpoint对齐机制

        LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +

            "Skipping current checkpoint.",

          inputGate.getOwningTaskName(),

          barrierId,

          currentCheckpointId);


notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId)); releaseBlocksAndResetBarriers(); beginNewAlignment(barrierId, channelIndex); } else { // ignore trailing barrier from an earlier checkpoint (obsolete now) return; } } else if (barrierId > currentCheckpointId) { //numBarriersReceived==0 开启一次新的chechpoint //将对应的blockedChannels置为阻塞状态true beginNewAlignment(barrierId, channelIndex); } else { // either the current checkpoint was canceled (numBarriers == 0) or // this barrier is from an old subsumed checkpoint return; }
// check if we have all barriers - since canceled checkpoints always have zero barriers // this can only happen on a non canceled checkpoint if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) { // actually trigger checkpoint if (LOG.isDebugEnabled()) { LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.", inputGate.getOwningTaskName(), receivedBarrier.getId(), receivedBarrier.getTimestamp()); } //对齐完成 将缓存的数据(BufferBlocker中的数据)插入到消费队列中 //被消费 ,然后触发checkpoint releaseBlocksAndResetBarriers(); notifyCheckpoint(receivedBarrier); } }