Checkpoint对齐机制源码分析
2009 年 1 月 1 日
private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception { //barrierId表示当前批次的checkpointId final long barrierId = receivedBarrier.getId(); // 如果是单输入流 则直接触发checkpoint if (totalNumberOfInputChannels == 1) { if (barrierId > currentCheckpointId) { // new checkpoint currentCheckpointId = barrierId; notifyCheckpoint(receivedBarrier); } return; } //多输入流的处理,numBarriersReceived表示已接收到的 //当前批次checkpointId 的channel 个数 //numBarriersReceived >0 表示正在对齐过程中 if (numBarriersReceived > 0) { // this is only true if some alignment is already progress and was not canceled if (barrierId == currentCheckpointId) { // regular case onBarrier(channelIndex); } else if (barrierId > currentCheckpointId) { // 如果到来的barrierId也就是checkpointId 大于当前正在 //发生对齐机制的checkpointId ,那么会取消当前的checkpoint(比喻说超时导致) // 并且重置blockedChannels状态 重置numBarriersReceived为0 //然后开启下一次(barrierId) checkpoint对齐机制 LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " + "Skipping current checkpoint.", inputGate.getOwningTaskName(), barrierId, currentCheckpointId);
notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId)); releaseBlocksAndResetBarriers(); beginNewAlignment(barrierId, channelIndex); } else { // ignore trailing barrier from an earlier checkpoint (obsolete now) return; } } else if (barrierId > currentCheckpointId) { //numBarriersReceived==0 开启一次新的chechpoint //将对应的blockedChannels置为阻塞状态true beginNewAlignment(barrierId, channelIndex); } else { // either the current checkpoint was canceled (numBarriers == 0) or // this barrier is from an old subsumed checkpoint return; }
// check if we have all barriers - since canceled checkpoints always have zero barriers // this can only happen on a non canceled checkpoint if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) { // actually trigger checkpoint if (LOG.isDebugEnabled()) { LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.", inputGate.getOwningTaskName(), receivedBarrier.getId(), receivedBarrier.getTimestamp()); } //对齐完成 将缓存的数据(BufferBlocker中的数据)插入到消费队列中 //被消费 ,然后触发checkpoint releaseBlocksAndResetBarriers(); notifyCheckpoint(receivedBarrier); } }