Flink实践|Flink TopN 实践
TopN无论是在离线还是在实时计算中都是比较常见的功能,不同于离线计算中的TopN,实时数据是持续不断的,这样就给TopN的计算带来很大的困难,因为要持续在内存中维持一个TopN的数据结构,当有新数据来的时候,更新这个数据结构。参考网上提出的思路,基于Flink进行了一次TopN实践。
基于Flink实现TopN关键知识点:
-
TopN 采用小根堆数据结构,新的数据来了,只需要和根节点比较,小,则丢弃,大,插入小根堆,此次采用TreeMap来替代小根堆数据结构;
-
当Watermark越过Window边界的时候,Window的数据触发计算先发到下游,接着Watermark才发送到下游;
-
用MapState保存TopN数据结构,key为Window;
-
Namespace相同的情况下,每一个key只有1个Timer。

假如有依次有A,B,C,A流入WindowOperator, 那么每一个数据来了,首先Window assign,接着进行agg获取新的acc更新到State里面,最后注册Timer。当Watermark流入到Window Operator的时候,Timer小于Watermark都将触发毁掉,触发计算,并发送数据到下游。

接着按Window进行keyBy,接受到一条消息之后的处理流程为:
@Override public void processElement(Row value, Context ctx, Collector out) throws Exception { try { long timstamp = (Long) value.getField(timeIndex); TreeMap topNMap = null; if (treeMapMapState != null) { if (treeMapMapState.contains(timstamp)) { //获取该window对应的treeMap topNMap = treeMapMapState.get(timstamp); } } if (topNMap == null) { topNMap = Maps.newTreeMap(); } add(value, topNMap);//判断该值是否要插入treeMap treeMapMapState.put(timstamp, topNMap);//更新treeMapd到state里面去 ctx.timerService().registerEventTimeTimer(timstamp + 1);//注册timer,相key的timer只会有一个 } catch (Exception e) { LOG.error("process {} failed", value, e); } }
Watermark在WindowOperator所有低于该Watermark的Window触发完成以后被发送到下游,当KeyedProcessOperator接受到Watermark之后,说明之前Window的所有数据已经流入KeyedProcessOperator,接着在KeyedProcessOperatorr中注册的低于Watermark的Timer都会被触发,最后会调用onTimer方法。
@Override public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception { long key = ctx.getCurrentKey(); try { if (treeMapMapState.contains(key)) { TreeMap treeMap = treeMapMapState.get(key);//获取该window对应的treeMap,代表一个widnow下的topN数据 LOG.error("*********************** \n"); LOG.info("top 10 {}", treeMap); LOG.error("*********************** \n"); String result = (String) treeMap.values().stream().collect(Collectors.joining(",")); treeMapMapState.remove(key); Row row = new Row(2); row.setField(0, redisKey); row.setField(1, result); out.collect(row); } } catch (Exception e) { LOG.error("onTimer failed for windowEnd {}", key, e); } }
这里需要注意的一点是,如果使用的是ProcessTime,那么KeyedProcessOperator注册Timer的时候,会和处理时间进行比较,如果低于系统处理时间就会立即触发onTimer方法,那么触发的时候其实是没有收集完整上一个窗口的数据的。
大家工作学习遇到HBase技术问题,把问题发布到HBase技术社区论坛 http://hbase.group ,欢迎大家论坛上面提问留言讨论。想了解更多HBase技术关注HBase技术社区公众号(微信号: hbasegroup ),非常欢迎大家积极投稿。