Flink实践|Flink TopN 实践

TopN无论是在离线还是在实时计算中都是比较常见的功能,不同于离线计算中的TopN,实时数据是持续不断的,这样就给TopN的计算带来很大的困难,因为要持续在内存中维持一个TopN的数据结构,当有新数据来的时候,更新这个数据结构。参考网上提出的思路,基于Flink进行了一次TopN实践。

基于Flink实现TopN关键知识点:

  • TopN 采用小根堆数据结构,新的数据来了,只需要和根节点比较,小,则丢弃,大,插入小根堆,此次采用TreeMap来替代小根堆数据结构;

  • 当Watermark越过Window边界的时候,Window的数据触发计算先发到下游,接着Watermark才发送到下游;

  • 用MapState保存TopN数据结构,key为Window;

  • Namespace相同的情况下,每一个key只有1个Timer。

假如有依次有A,B,C,A流入WindowOperator, 那么每一个数据来了,首先Window assign,接着进行agg获取新的acc更新到State里面,最后注册Timer。当Watermark流入到Window Operator的时候,Timer小于Watermark都将触发毁掉,触发计算,并发送数据到下游。

接着按Window进行keyBy,接受到一条消息之后的处理流程为:

@Override
public void processElement(Row value, Context ctx, Collector out) throws Exception {
 try {
  long timstamp = (Long) value.getField(timeIndex);
  TreeMap topNMap = null;
  if (treeMapMapState != null) {
   if (treeMapMapState.contains(timstamp)) { //获取该window对应的treeMap
    topNMap = treeMapMapState.get(timstamp);
   }
  }
  if (topNMap == null) {
   topNMap = Maps.newTreeMap();
  }

  add(value, topNMap);//判断该值是否要插入treeMap

  treeMapMapState.put(timstamp, topNMap);//更新treeMapd到state里面去

  ctx.timerService().registerEventTimeTimer(timstamp + 1);//注册timer,相key的timer只会有一个
 } catch (Exception e) {
  LOG.error("process {} failed", value, e);
 }
}

Watermark在WindowOperator所有低于该Watermark的Window触发完成以后被发送到下游,当KeyedProcessOperator接受到Watermark之后,说明之前Window的所有数据已经流入KeyedProcessOperator,接着在KeyedProcessOperatorr中注册的低于Watermark的Timer都会被触发,最后会调用onTimer方法。

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
 long key = ctx.getCurrentKey();
 try {
  if (treeMapMapState.contains(key)) {
   TreeMap treeMap = treeMapMapState.get(key);//获取该window对应的treeMap,代表一个widnow下的topN数据
   LOG.error("*********************** \n");
   LOG.info("top 10 {}", treeMap);
   LOG.error("*********************** \n");
   String result = (String) treeMap.values().stream().collect(Collectors.joining(","));
   treeMapMapState.remove(key);
   Row row = new Row(2);
   row.setField(0, redisKey);
   row.setField(1, result);
   out.collect(row);
  }
 } catch (Exception e) {
  LOG.error("onTimer failed for windowEnd {}", key, e);
 }
}

这里需要注意的一点是,如果使用的是ProcessTime,那么KeyedProcessOperator注册Timer的时候,会和处理时间进行比较,如果低于系统处理时间就会立即触发onTimer方法,那么触发的时候其实是没有收集完整上一个窗口的数据的。

大家工作学习遇到HBase技术问题,把问题发布到HBase技术社区论坛 http://hbase.group ,欢迎大家论坛上面提问留言讨论。想了解更多HBase技术关注HBase技术社区公众号(微信号: hbasegroup ),非常欢迎大家积极投稿。