Flink: 两个递归彻底搞懂operator chain

    private List createChain(

            Integer startNodeId,

            Integer currentNodeId,

            Map<Integer, byte[]> hashes,

            List<Map<Integer, byte[]>> legacyHashes,

            int chainIndex,

            Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {


if (!builtVertices.contains(startNodeId)) { //chain 的出边 List transitiveOutEdges = new ArrayList(); //能够chain在一起的边 List chainableOutputs = new ArrayList(); //不能够chain一起的边 List nonChainableOutputs = new ArrayList();
for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) { //isChainable判断是否能够chain在一起 if (isChainable(outEdge, streamGraph)) { chainableOutputs.add(outEdge); } else { nonChainableOutputs.add(outEdge); } }
for (StreamEdge chainable : chainableOutputs) { //能够chain在一起那么遍历下一个节点 transitiveOutEdges.addAll( createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes)); }
for (StreamEdge nonChainable : nonChainableOutputs) { transitiveOutEdges.add(nonChainable); //以不能chain在一起的节点为起始点重新开始往下遍历 createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes); }
List<Tuple2<byte[], byte[]>> operatorHashes = chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList());
byte[] primaryHashBytes = hashes.get(currentNodeId);
for (Map<Integer, byte[]> legacyHash : legacyHashes) { operatorHashes.add(new Tuple2(primaryHashBytes, legacyHash.get(currentNodeId))); }
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs)); chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs)); chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
//如果currentNodeId=startNodeId 那么就说明是一个chain的起点,则需要创建jobVertix //不是则表示是chain的一部分,只需要创建StreamConfig即可 StreamConfig config = currentNodeId.equals(startNodeId) ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes) : new StreamConfig(new Configuration());
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
if (currentNodeId.equals(startNodeId)) { config.setChainStart(); //起始chain config.setChainIndex(0); config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName()); config.setOutEdgesInOrder(transitiveOutEdges); config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges()); //连接边 for (StreamEdge edge : transitiveOutEdges) { connect(startNodeId, edge); }
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else { //currentNodeId属于chain的一部分 Map chainedConfs = chainedConfigs.get(startNodeId);
if (chainedConfs == null) { chainedConfigs.put(startNodeId, new HashMap()); } config.setChainIndex(chainIndex); StreamNode node = streamGraph.getStreamNode(currentNodeId); config.setOperatorName(node.getOperatorName()); chainedConfigs.get(startNodeId).put(currentNodeId, config); }
config.setOperatorID(new OperatorID(primaryHashBytes));
if (chainableOutputs.isEmpty()) { config.setChainEnd(); } //返回chain的出边 return transitiveOutEdges;
} else { return new ArrayList(); }

}