Learning Flink from Scratch: Real-Time Top 5 Best Sellers (Case Study)
As promised in the previous article, today we look at how to compute a real-time Top 5 best-seller ranking with Flink. This case study builds on the previous post; if you are not familiar with it, please read "Learning Flink from Scratch: Joining Two Streams" first.
The code for this case study is available at https://github.com/dafei1288/flink_casestudy
In the previous article we already joined the two streams; the result combines the exchange rate and the order price into the final settlement price. The data structure of the joined stream is as follows:

Tuple9
f0 order timestamp (Long)
f1 product category (String)
f2 product sub-category (Integer)
f3 currency (String)
f4 price (Integer)
f5 rate timestamp (Long)
f6 currency (String)
f7 exchange rate (Integer)
f8 final price (Integer)
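Spelled out with its type parameters, the joined stream would be declared roughly like this (a sketch; the parameter order follows the field list above, and the sample values are purely illustrative):

// Type of the joined stream, one element per matched order/rate pair:
// Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer>
// A single element might look like this (illustrative values only; f8 = f4 * f7):
Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer> sample =
        Tuple9.of(1540000000000L, "book", 1, "USD", 100, 1540000000000L, "USD", 7, 700);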
We start from this joined stream. The first step is to redefine the stream's event time; here we simply use the order timestamp as the event time:
DataStream<Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer>> joinedTimedStream =
        joinedStream.assignTimestampsAndWatermarks(
                new AscendingTimestampExtractor<Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer> element) {
                        return element.f0; // use the order timestamp as event time
                    }
                });
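The AscendingTimestampExtractor above assumes the order timestamps arrive in strictly increasing order. If the joined records could arrive slightly out of order, a bounded-out-of-orderness extractor (already used for the rate stream in the full code at the end) would be a drop-in alternative; a minimal sketch, with the 1-second tolerance chosen arbitrarily:

// Alternative sketch: tolerate up to 1 second of out-of-order events (the delay is illustrative).
joinedStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer>>(Time.seconds(1)) {
            @Override
            public long extractTimestamp(Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer> element) {
                return element.f0; // still use the order timestamp as event time
            }
        });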
In this case we do not use an explicit product ID to identify a product. Instead we use two fields, product category and product sub-category, and concatenate them to form the grouping key. This design also gives us a chance to see how a KeySelector is used.
DataStream<OrderView> windowedData = joinedTimedStream
        .keyBy(new KeySelector<Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer>, String>() {
            @Override
            public String getKey(Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer> value) throws Exception {
                return value.f1 + value.f2; // concatenate category and sub-category as the key
            }
        })
        .timeWindow(Time.seconds(30), Time.seconds(10))
        .aggregate(new SumAgg(), new WindowResultFunction());
Here we reference the diagram from the previous article once more, to reinforce how the stream is transformed.
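In code terms, the transformation chain can be summarized like this (a rough sketch of the intermediate types; the names match the full code at the end of the article):

// joinedTimedStream : DataStream<Tuple9<...>>                          joined, timestamped stream
//   .keyBy(KeySelector)   -> KeyedStream<Tuple9<...>, String>          keyed by category + sub-category
//   .timeWindow(30s, 10s) -> WindowedStream<Tuple9<...>, String, TimeWindow>   sliding window
//   .aggregate(SumAgg, WindowResultFunction) -> DataStream<OrderView>  one OrderView per key per window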
At the same time we need to aggregate the data. Here we do not measure how hot a product is by counting orders; instead we use the aggregated final price, i.e. the total sales amount. Next we define how this aggregation is computed; only a simple sum is performed here:
public static class SumAgg implements AggregateFunction<Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer>, Long, Long> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer> value, Long acc) {
        return acc + value.f8; // accumulate the final price (f8)
    }

    @Override
    public Long getResult(Long acc) {
        return acc;
    }

    @Override
    public Long merge(Long acc1, Long acc2) {
        return acc1 + acc2;
    }
}
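If you preferred to rank by order count instead of sales amount (the alternative mentioned above), only the add step would change; a minimal sketch, where the class name CountAgg is ours and not part of the case code:

// Hypothetical count-based variant: each order contributes 1 instead of its final price.
public static class CountAgg implements AggregateFunction<Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer>, Long, Long> {
    @Override public Long createAccumulator() { return 0L; }
    @Override public Long add(Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer> value, Long acc) { return acc + 1; }
    @Override public Long getResult(Long acc) { return acc; }
    @Override public Long merge(Long a, Long b) { return a + b; }
}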
Then we define how the window result is emitted:
/** Emits the result of each window. */
// Type parameters: IN, OUT, KEY, W extends Window
public static class WindowResultFunction implements WindowFunction<Long, OrderView, String, TimeWindow> {
    @Override
    public void apply(
            String key,                     // key of the window (category + sub-category)
            TimeWindow window,              // the window
            Iterable<Long> aggregateResult, // result of the aggregate function
            Collector<OrderView> collector  // output type is OrderView
    ) throws Exception {
        Long count = aggregateResult.iterator().next();
        collector.collect(OrderView.of(key, window.getEnd(), count));
    }
}
public static class OrderView {
    public String itemId;  // product id (here: category + sub-category)
    public long windowEnd; // timestamp of the window end
    public long allsum;    // total sales amount of the product in the window

    public static OrderView of(String itemId, long windowEnd, long allsum) {
        OrderView result = new OrderView();
        result.itemId = itemId;
        result.windowEnd = windowEnd;
        result.allsum = allsum;
        return result;
    }

    @Override
    public String toString() {
        return "OrderView{" +
                "itemId='" + itemId + '\'' +
                ", windowEnd=" + windowEnd +
                ", allsum=" + allsum +
                '}';
    }
}
After the steps above we have aggregated per-window data. All that is left is to group it by window end time and pick the top five. We use a ProcessFunction to implement a custom TopN function, TopNHot, which computes the top 5 products and formats the ranking into a string for downstream output. Because OrderView is a Flink POJO (public fields, default constructor), we can key the stream directly by the field name "windowEnd".
DataStream<String> topNHots = windowedData
        .keyBy("windowEnd")
        .process(new TopNHot(5));
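As a side note, the same grouping could be expressed with an explicit KeySelector instead of the field-expression form; in that case the key seen by the process function is a Long rather than a Tuple, so TopNHot would have to be declared as KeyedProcessFunction<Long, OrderView, String>. This is only a sketch of the alternative, not what the case code does:

// Alternative sketch: key by the windowEnd field with a KeySelector.
// Requires TopNHot to extend KeyedProcessFunction<Long, OrderView, String>.
DataStream<String> topNHots = windowedData
        .keyBy(new KeySelector<OrderView, Long>() {
            @Override
            public Long getKey(OrderView value) throws Exception {
                return value.windowEnd;
            }
        })
        .process(new TopNHot(5));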
ProcessFunction is a low-level API provided by Flink for implementing more advanced functionality. Its main addition is timers (supporting either event time or processing time). In this case we use a timer to decide when we have "collected" all per-product totals for a given window. Because the watermark progresses globally, in processElement we register a timer for windowEnd + 1 every time we receive an OrderView (Flink automatically ignores repeated registrations for the same timestamp). When the windowEnd + 1 timer fires, it means the watermark has reached windowEnd + 1, i.e. all per-product window results for that windowEnd have arrived. In onTimer() we sort the collected products by their totals, take the top N, and format the ranking into a string for output.
We also use ListState to store every OrderView we receive, so that the state is not lost and stays consistent if a failure occurs. ListState is a state API provided by Flink that resembles the Java List interface; it is integrated with the framework's checkpoint mechanism and automatically provides exactly-once guarantees.
public static class TopNHot extends KeyedProcessFunction<Tuple, OrderView, String> {

    private final int topSize;

    public TopNHot(int topSize) {
        this.topSize = topSize;
    }

    // Stores the products and their totals until all data for a window has arrived,
    // at which point the TopN computation is triggered.
    private ListState<OrderView> orderState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Register the state.
        ListStateDescriptor<OrderView> itemsStateDesc = new ListStateDescriptor<>(
                "orderState-state", OrderView.class);
        orderState = getRuntimeContext().getListState(itemsStateDesc);
    }

    @Override
    public void processElement(
            OrderView input, Context context, Collector<String> collector) throws Exception {
        // Save every record into state.
        orderState.add(input);
        // Register an event-time timer for windowEnd + 1; when it fires,
        // all product data belonging to this windowEnd has been received.
        context.timerService().registerEventTimeTimer(input.windowEnd + 1);
    }

    @Override
    public void onTimer(
            long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Collect all product totals received for this window.
        List<OrderView> allItems = new ArrayList<>();
        orderState.get().forEach(it -> allItems.add(it));
        // Clear the state early to free up space.
        orderState.clear();
        // Sort by sales amount, descending.
        allItems.sort((x1, x2) -> Long.compare(x2.allsum, x1.allsum));
        // Format the ranking into a String for printing.
        StringBuilder result = new StringBuilder();
        result.append("====================================\n");
        result.append("time: ").append(new Timestamp(timestamp - 1)).append("\n");
        for (int i = 0; i < topSize && i < allItems.size(); i++) {
            OrderView currentItem = allItems.get(i);
            // e.g. No1: itemId=12224 sales=2413
            result.append("No").append(i + 1).append(":")
                    .append(" itemId=").append(currentItem.itemId)
                    .append(" sales=").append(currentItem.allsum)
                    .append("\n");
        }
        result.append("====================================\n\n");
        out.collect(result.toString());
    }
}
Here is the complete code:
package cn.flinkhub.topndemo;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.sql.Timestamp;
import java.util.*;

public class App {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Map<String, String> properties = new HashMap<>();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "test");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        // properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("topicOrder", "order");
        properties.put("topicRate", "rate");
        ParameterTool parameterTool = ParameterTool.fromMap(properties);

        FlinkKafkaConsumer010<Tuple3<Long, String, Integer>> consumer010Rate = new FlinkKafkaConsumer010<>(
                parameterTool.getRequired("topicRate"),
                new DeserializationSchema<Tuple3<Long, String, Integer>>() {
                    @Override
                    public TypeInformation<Tuple3<Long, String, Integer>> getProducedType() {
                        return TypeInformation.of(new TypeHint<Tuple3<Long, String, Integer>>() {});
                    }

                    @Override
                    public Tuple3<Long, String, Integer> deserialize(byte[] message) throws IOException {
                        // message format: timestamp,currency,rate
                        String[] res = new String(message).split(",");
                        Long timestamp = Long.valueOf(res[0]);
                        String dm = res[1];
                        Integer value = Integer.valueOf(res[2]);
                        return Tuple3.of(timestamp, dm, value);
                    }

                    @Override
                    public boolean isEndOfStream(Tuple3<Long, String, Integer> nextElement) {
                        return false;
                    }
                },
                parameterTool.getProperties());

        FlinkKafkaConsumer010<Tuple5<Long, String, Integer, String, Integer>> consumer010Order = new FlinkKafkaConsumer010<>(
                parameterTool.getRequired("topicOrder"),
                new DeserializationSchema<Tuple5<Long, String, Integer, String, Integer>>() {
                    @Override
                    public TypeInformation<Tuple5<Long, String, Integer, String, Integer>> getProducedType() {
                        return TypeInformation.of(new TypeHint<Tuple5<Long, String, Integer, String, Integer>>() {});
                    }

                    @Override
                    public Tuple5<Long, String, Integer, String, Integer> deserialize(byte[] message) throws IOException {
                        // message format: %d,%s,%d,%s,%d -> timestamp,category,sub-category,currency,price
                        String[] res = new String(message).split(",");
                        Long timestamp = Long.valueOf(res[0]);
                        String catlog = res[1];
                        Integer subcat = Integer.valueOf(res[2]);
                        String dm = res[3];
                        Integer value = Integer.valueOf(res[4]);
                        return Tuple5.of(timestamp, catlog, subcat, dm, value);
                    }

                    @Override
                    public boolean isEndOfStream(Tuple5<Long, String, Integer, String, Integer> nextElement) {
                        return false;
                    }
                },
                parameterTool.getProperties());

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStream<Tuple3<Long, String, Integer>> rateStream = env.addSource(consumer010Rate);
        DataStream<Tuple5<Long, String, Integer, String, Integer>> oraderStream = env.addSource(consumer010Order);

        long delay = 1000;
        DataStream<Tuple3<Long, String, Integer>> rateTimedStream = rateStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Long, String, Integer>>(Time.milliseconds(delay)) {
                    @Override
                    public long extractTimestamp(Tuple3<Long, String, Integer> element) {
                        return element.f0;
                    }
                });

        DataStream<Tuple5<Long, String, Integer, String, Integer>> oraderTimedStream = oraderStream.assignTimestampsAndWatermarks(
                new AscendingTimestampExtractor<Tuple5<Long, String, Integer, String, Integer>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple5<Long, String, Integer, String, Integer> value) {
                        return value.f0;
                    }
                });

        DataStream<Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer>> joinedStream =
                oraderTimedStream.join(rateTimedStream)
                        .where(new KeySelector<Tuple5<Long, String, Integer, String, Integer>, String>() {
                            @Override
                            public String getKey(Tuple5<Long, String, Integer, String, Integer> value) throws Exception {
                                return value.f3;
                            }
                        })
                        .equalTo(new KeySelector<Tuple3<Long, String, Integer>, String>() {
                            @Override
                            public String getKey(Tuple3<Long, String, Integer> value) throws Exception {
                                return value.f1;
                            }
                        })
                        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                        .apply(new JoinFunction<Tuple5<Long, String, Integer, String, Integer>, Tuple3<Long, String, Integer>, Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer>>() {
                            @Override
                            public Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer> join(
                                    Tuple5<Long, String, Integer, String, Integer> first,
                                    Tuple3<Long, String, Integer> second) throws Exception {
                                Integer res = second.f2 * first.f4; // final price = exchange rate * price
                                return Tuple9.of(first.f0, first.f1, first.f2, first.f3, first.f4,
                                        second.f0, second.f1, second.f2, res);
                            }
                        });

        DataStream<Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer>> joinedTimedStream =
                joinedStream.assignTimestampsAndWatermarks(
                        new AscendingTimestampExtractor<Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer>>() {
                            @Override
                            public long extractAscendingTimestamp(Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer> element) {
                                return element.f0;
                            }
                        });

        DataStream<OrderView> windowedData = joinedTimedStream
                .keyBy(new KeySelector<Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer>, String>() {
                    @Override
                    public String getKey(Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer> value) throws Exception {
                        return value.f1 + value.f2;
                    }
                })
                .timeWindow(Time.seconds(30), Time.seconds(10))
                .aggregate(new SumAgg(), new WindowResultFunction());

        DataStream<String> topNHots = windowedData
                .keyBy("windowEnd")
                .process(new TopNHot(5));

        topNHots.print();

        env.execute("done!");
    }

    public static class SumAgg implements AggregateFunction<Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer>, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Tuple9<Long, String, Integer, String, Integer, Long, String, Integer, Integer> value, Long acc) {
            return acc + value.f8;
        }

        @Override
        public Long getResult(Long acc) {
            return acc;
        }

        @Override
        public Long merge(Long acc1, Long acc2) {
            return acc1 + acc2;
        }
    }

    /** Emits the result of each window. */
    // Type parameters: IN, OUT, KEY, W extends Window
    public static class WindowResultFunction implements WindowFunction<Long, OrderView, String, TimeWindow> {
        @Override
        public void apply(
                String key,                     // key of the window (category + sub-category)
                TimeWindow window,              // the window
                Iterable<Long> aggregateResult, // result of the aggregate function
                Collector<OrderView> collector  // output type is OrderView
        ) throws Exception {
            Long count = aggregateResult.iterator().next();
            collector.collect(OrderView.of(key, window.getEnd(), count));
        }
    }

    public static class OrderView {
        public String itemId;  // product id (category + sub-category)
        public long windowEnd; // timestamp of the window end
        public long allsum;    // total sales amount of the product in the window

        public static OrderView of(String itemId, long windowEnd, long allsum) {
            OrderView result = new OrderView();
            result.itemId = itemId;
            result.windowEnd = windowEnd;
            result.allsum = allsum;
            return result;
        }

        @Override
        public String toString() {
            return "OrderView{" +
                    "itemId='" + itemId + '\'' +
                    ", windowEnd=" + windowEnd +
                    ", allsum=" + allsum +
                    '}';
        }
    }

    public static class TopNHot extends KeyedProcessFunction<Tuple, OrderView, String> {

        private final int topSize;

        public TopNHot(int topSize) {
            this.topSize = topSize;
        }

        // Stores the products and their totals until all data for a window has arrived,
        // at which point the TopN computation is triggered.
        private ListState<OrderView> orderState;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Register the state.
            ListStateDescriptor<OrderView> itemsStateDesc = new ListStateDescriptor<>(
                    "orderState-state", OrderView.class);
            orderState = getRuntimeContext().getListState(itemsStateDesc);
        }

        @Override
        public void processElement(
                OrderView input, Context context, Collector<String> collector) throws Exception {
            // Save every record into state.
            orderState.add(input);
            // Register an event-time timer for windowEnd + 1; when it fires,
            // all product data belonging to this windowEnd has been received.
            context.timerService().registerEventTimeTimer(input.windowEnd + 1);
        }

        @Override
        public void onTimer(
                long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Collect all product totals received for this window.
            List<OrderView> allItems = new ArrayList<>();
            orderState.get().forEach(it -> allItems.add(it));
            // Clear the state early to free up space.
            orderState.clear();
            // Sort by sales amount, descending.
            allItems.sort((x1, x2) -> Long.compare(x2.allsum, x1.allsum));
            // Format the ranking into a String for printing.
            StringBuilder result = new StringBuilder();
            result.append("====================================\n");
            result.append("time: ").append(new Timestamp(timestamp - 1)).append("\n");
            for (int i = 0; i < topSize && i < allItems.size(); i++) {
                OrderView currentItem = allItems.get(i);
                // e.g. No1: itemId=12224 sales=2413
                result.append("No").append(i + 1).append(":")
                        .append(" itemId=").append(currentItem.itemId)
                        .append(" sales=").append(currentItem.allsum)
                        .append("\n");
            }
            result.append("====================================\n\n");
            out.collect(result.toString());
        }
    }
}
That's it. Now run the job and look at the printed result.
Reference:
http://wuchong.me/blog/2018/11/07/use-flink-calculate-hot-items/