Apache Flink在58同城实时特征挖掘中的应用实践

背景

58作为国内最大的生活信息服务平台,涵盖了本地服务、招聘、房产、二手车、二手物品等多条业务线。

随着移动互联网人口红利的消失,精细化运营成为必然趋势,业务线越来越注重用户体验、客户效果以及变现效率。 用户画像作为整个推荐系统的物料,对于提升用户体验、客户效果以及变现效率起着至关重要的作用。 而用户画像的实时性则是画像体系建设的关键一环。 目前在二手车个性化推荐场景下,用户画像从产生到更新大概需要分钟级别左右,业务线对于分钟级别的特征更新不能忍受,升级实时特征挖掘计算框架成为必要。

本文主要介绍58商业工程团队在实时特征挖掘计算框架的优化实践,并阐述在升级过程中,对新技术落地的实践效果和心得体会。

特征挖掘平台计算框架介绍

在介绍上述之前,先简要介绍一下58商业特征挖掘平台的整体挖掘流程。

 

图1.1: 58商业特征挖掘平台(醍醐)挖掘流程

如上图所示,整个醍醐特征挖掘平台对外屏蔽了异构数据和异构计算,开发者只需要实现Importer和Operator模块即可在平台完成整个的实时和离线特征挖掘,而无需关心具体的底层实现和计算。 其中Importer模块主要实现数据的解析,将其转换为Behavior,Operator模块封装了用户的特征挖掘计算逻辑。

可以看到,目前我们的实时挖掘框架使用的Spark Streaming,考虑到数据量和资源的限制,设置的batch批次为1min,在一些场景下,业务对于min级别的特征延迟更新不能忍受。

实时计算框架升级

目前业界实时计算框架主要基于 Spark Streaming Storm Flink 三种框架,下面对这三种框架进行 对比。

•实时计算框架对比

图1.2: 实时计算框架对比

如图1.2是三种实时计算框架的对比情况,主要性能指标如下:

(1) 处理模型: flink和storm都是纯流式处理模型,理论上是无限流模式,对数据进行逐条处理,其中flink会把批处理作为流处理的一个特例来处理,从而将无限流转为有限流模式;而spark是纯批处理模型,理论上是有限流模式,相反的,spark认为流处理是批处理的一个特例,从处理模型来看flink和storm天然支持纯流式数据。

(2) 延迟性: 理论上flink和storm可以提供ms级的延迟处理,spark可以提供s级到min级别延迟处理,不过具体延迟多少由处理的数据量大小、计算逻辑的复杂性、资源分配等因素共同决定,不能一概而论。

(3) 吞吐量: 在吞吐量方面,spark和flink都是针对大数据量场景和设定的,其吞吐量相当,而storm相对来说吞吐量会比较低,当数据量很大时,storm的处理性能会受到很大影响。

(4) 容错: 容错性方面,storm最低,主要是基于Records Ack 来进行容错,而spark和flink都实现了checkpoint机制,其中spark 是基于RDD的全量checkpoint,spark自带的checkpoint到hdfs机制相对来说会比较鸡肋,实际生产中一般不建议开启,因为会影响系统的性能; flink在checkpoint方面做了优化,基于增量checkpoint,同时checkpoint是异步进行的,实时生产中一般是设置checkpoint到RockDB中,这点相对来说性能上会比spark要好。

(5) 可靠性保证: 要想实现全链路的exactly once,需要同时实现传输、实时计算框架、写入db各个环节的exactly once,这里的可靠性保证仅仅指的是实时计算框架本身的可靠性。 其中storm可靠性级别为at least once,而spark和flink可以做到 exactly once。

(6) 状态管理和窗口函数: 状态管理和窗口函数对比具体见图1.2这里不再详述。

(7) 反压机制: 对于流式计算来说,在实际生产过程中经常遇到短期的数据尖峰情况,基于这种case,spark和flink实现了反压机制。 其中spark的反压主要是通过后端task的执行情况、调度时间等来使用pid控制器计算一个最大offset,进而来调整Spark Streaming从kafka拉取数据的速度; 与 Spark Streaming 的反压不同的是,Flink 反压是 jobmanager 针对每一个 task 每 50ms 触发 100 次 Thread.getStackTrace() 调用,求出阻塞的占比。

过程如下图所示:

图1.3: flink反压图示

综上所述,基于时效性、吞吐量、容错及可靠性等综合考虑,本次迭代采用flink代替Spark Streaming框架进行升级。

升级之后整体醍醐挖掘框架如图1.4 所示:

图1. 4 醍醐计算框架图

可以看到,对于整个离线计算框架我们选用spark来进行离线特征挖掘工作,实时计算框架升级之后既支持spark streaming批量数据挖掘的方式也支持无限流模式的实时挖掘,最终挖掘后的结果写入Storage中,用户在使用的时候基于实际情况进行选择即可。 本次升级迭代使得整个醍醐挖掘框架支持的实时计算场景覆盖度得到了提升。

•flink整体架构

如下图1.5展示了flink 在yarn集群模式下提交的架构图:

图1.5: flink集群模式下架构图(备注: 此图参考自云栖社区)

当flink 集群启动后,首先会启动一个 JobManger 和一个或多个的 TaskManager。 由 Client 提交任务给 JobManager,JobManager 再调度任务到各个TaskManager去执行,然后 TaskManager 将心跳和统计信息汇报给 JobManager。 TaskManager 之间以流的形式进行数据的传输。 上述三者均为独立的 JVM 进程。

1)Client 为提交 Job 的客户端,可以是运行在任何机器上(与 JobManager 环境连通即可)。 提交 Job 后,Client 可以结束进程(Streaming的任务),也可以不结束并等待结果返回。

2)JobManager 主要负责调度 Job 并协调 Task 做 checkpoint。 从 Client 处接收到 Job 和 JAR 包等资源后,会生成优化后的执行计划,并以 Task 的单元调度到各个 TaskManager 去执行。

3)TaskManager 在启动的时候就设置好了槽位数(Slot),每个 slot 能启动一个 或者多个Task,Task 为线程。 从 JobManager 处接收需要部署的 Task,部署启动后,与自己的上游建立 Netty 连接,接收数据并处理。

•flink的三层图结构

flink总共提供了三种图的抽象,具体的三层图结构如下图所示:

图1.6: flink三层图结构(备注: 此图参考自云栖社区)

上图中给出了flink各个图的工作原理和转换过程。

1)  用户提交的 main 函数中 flink 会把每一个算子 transform 成一个对流的 转换,并注册到 执行环境中生成 StreamGraph

2) 基于StreamGraph生成JobGraph,数据从上一个operator流到下一个operator的过程中,上游作为生产者提供了IntermediateDataSet,而下游作为消费者需要JobEdge。 事实上,JobEdge是一个通信管道,连接了上游生产的dataset和下游的JobVertex节点。

3)  JobGraph 转换到 ExecutionGraph 的过程中,主要发生了以下转变:

a)  加入了并行度的概念,成为真正可调度的图结构

b)  生成了与 JobVertex 对应的 ExecutionJobVertex ExecutionVertex ,与 IntermediateDataSet 对应的 IntermediateResult IntermediateResultPartition 等,并行将通过这些类实现

4) ExecutionGraph已经可以用于调度任务。 我们可以看到,flink根据该图生成了一一对应的Task,每个task对应一个ExecutionGraph的一个Execution。 Task用InputGate、InputChannel和ResultPartition对应了上面图中的IntermediateResult和ExecutionEdge。

1.7 展示了 58 二手车业务线基于 flink 框架生成的 chain 执行图。

 

图1.7: 二手车业务chain链路执行图

从图中可知,整个框架总共消费了三个kafka topic数据源,经过Map、Filter、FlatMap之后,三条数据流进行了UNION操作合并为一个dataStream从而变为一个chain,然后经过其他运算最终通过执行keyBy+map算子输出到终端db中。

•写入终端db选择

对于特征挖掘场景, db 必须支持高 qps 的写入和读取,同时存储容量够大,基于 比较 redis hbase wtable 几种存储系统进行技术选型,对比如下:

综上所述,我们采用58自研的wtable来作为写入的存储查询系统。 WTable是58架构平台部研发的分布式kv、klist系统,具有高可用、高性能、存储容量大、自动扩容和容灾等特点,数据实时落地磁盘或SSD,能够高效、稳定地支持线上服务访问。

•写入端到端问题处理

实现端到端Exactly Once语义,需要满足如下两种约束:

1) 写入的db支持幂等运算,即多次写入不影响最终结果。

2) 写入db支持事务操作,即将数据更新和消费kafka数据在一个事务内提交,保证数据更新成功之后,再持续消费kafka数据。

目前flink 结合 kafka  0.11及其高版本使用二阶段提交语义来实现了数据写回kafka的Exactly Once语义支持,经过评估我们并未选用上述方式。

原因如下:

如图1.8所示,统计了wtable写入的tp99延迟在26.1ms,同时我们设置的wtable client超时时间为50ms,对于每次写入加一次超时重试,这样我们的写入失败率可以控制在千万分之一以内,对于整个的特征挖掘场景来说,这样的写入失败率是可以忍受的,因此采用这种方式来解决端到端的数据写入问题。

 

图1.8: wtable耗时时间分布

•任务监控

 

图1.9: flink任务监控信息

从读取数据的qps、延迟、cpu使用占比、内存使用占比方面来进行监控。 其中设置的数据延迟报警为连续三次积压超过2000条会自动给开发者发送短信报警,从qps可以看出我们的1s内的消息数至少是2000条,因此只要延迟超过1s就会触发报警。

•性能评估

为便于统计和分析整个事实计算链路过程中各个环节的时间消耗,分别在flume传输环节、数据挖掘flink计算环节、以及最终写入db环节进行埋点统计各个环节的耗时分布。

 

图1.10: 实时计算全链路各环节延迟统计

 

图1.11: 画像生产tp99分布

基于图1.10和1.11可以看到,目前58场景下采用新框架,整个环节从数据生产到实际写入db的tp99延迟在8s,其中flink框架可以稳定在1s内完成整个的计算过程,优化效果比较明显,主要的瓶颈在于flume传输环节,并且每天上午10: 00到下午3: 00时段flume的传输延迟尤其比较大,这是由58的业务特点和公司flume传输架构决定的,为此我们推动TEG的同学调整了商业的数据传输链路,单独搭建了传输通道,避免业务高峰期其他业务对于我们的影响,flume传输通道优化后的延时情况如图1.12所示:

图1.12: 优化前后各个环节延迟对比

从图1.12可以看到优化之后商业二手车数据从产生到最终作用于用户tp99延迟只有4s,这对于用户体验、客户效果、变现效率的提升是显而易见的。

•flink实践过程中遇到的一些问题总结

1) 初始化问题

对于所有task的文件初始化操作都可以通过重写open方法来完成,flink中open方法只会在每个task启动时初始化一次,这样保证了初始化的执行效率。

图1.13: task 初始化

2) 序列化问题

flink框架自身采用的序列化方法有别于java序列化方式,所有需要shuffer或者IO传输的接口都需要实现其序列化方式,否则会抛序列化异常。

3) 空指针问题

图1.14: flink空指针异常

如图1.14对于所有DataStream的操作,必须保证每个算子都不为空,否则会导致框架捕获不到错误而抛异常,经过debug发现flink 1.6版本未捕获该异常,如图

图1.15: flink 代码debug

4) 算子问题

Spark 自带reduceByKey算子,该算子首先会在每个节点本地进行一次聚合,然后把相同key的数据放到同一个节点中进行聚合,最终输出一个结果。 Flink中没有现成的算子,官方推荐使用keyBy+reduce来实现reduceByKey功能,在实际场景中测试发现,因为flink本身就会自带状态操作,并且是无限流模式,这导致其输出会把中间状态也输出,如图1.16所示

图1.16: flink keyBy+reduce输出中间状态

当我们需要与db中的历史特征进行merge时,就会发生错误的情况,导致db更新结果不正确。 解决办法: 通过keyBy+map来实现,充分利用其无限流模式,对于每条数据直接与db进行merge,然后更新,避免输出中间结果。

5) 分布式缓存失效

在Flink 1.6 Yarn模式提交任务下,使用分布式缓存会失效。 如下图所示:

图1.17: yarn集群上分布式缓存失效

咨询了社区的相关同学,这应该是flink的一个bug,后续会给社区发一封邮件进行追踪。 解决办法: 在open方法里将文件copy到task所在的机器中,然后来使用。 这样相对来说,没那么优雅,一台机器节点如果有多个task运行,会存放多份数据,不过由于每台机器最多会同时运行4个task,因此并不会造成太大影响。

总结和展望

以上就是58商业工程平台团队在实时特征挖掘计算框架所做的优化工作,至此我们的实时特征挖掘框架同时具备了批处理和纯流式处理的能力,扩展了我们实时挖掘框架的能力。

后续将在以下几个方面继续进行深入的探索和优化:

(1)flink端到端Exactly Once 语义实现;

(2)推动flink计算框架应用到其他商业其他场景下。

参考文献

https://yq.aliyun.com/articles/180418?spm=a2c4e.11153940.0.0.43fa211e3UH1SL  《阿里云—云栖社区》 行者武松

作者简介

林鹏,商业产品技术部工程团队大数据架构师。 负责商业dmp平台的建设和开发,对大数据周边技术有相关研究。