达达集团实时计算任务SQL化实践

03

增加的功能及改进

开发DFL的过程中,根据一些业务相关的需求及简化数据开发人员使用DFL的需要,我们在原生FSL的基础上进行了大量的改进和扩展的工作,下面介绍一些我们在DFL上做的工作。

3.1 Flink HA模式下,SESSION模式提交任务超时

为了Flink任务有较好的容错性,我们为Flink集群配置了基于ZooKeper的HA。出于任务管理和维护的需要,我们的一些Flink任务使用了session模式,在将这些任务迁移到DFL后,发现提交任务时,会报超时的错误。查阅Flink的官方文档也没有发现线索。后面经过我们的探索,发现了在YARN session模式下,配置了HA时,进行任务提交需要指定high-availability.cluster-id。添加了如下代码后,SESSION模式下,任务可以正常提交了。

3.2 Kafka支持使用SQL关键字作为JSON的字段名

当在Flink中使用了SQL关键字作字段名时,即使将字段名用反引号包起来,依然会报如下的错误:

这个是Flink的bug,已经在1.10.1中作了修复,详见这个issue:https://issues.apache.org/jira/browse/FLINK-16526。我们使用的版本为Flink 1.6.2,无法使用这个修复。我们的做法是支持将Kafka中JSON的字段名和引用这个JSON字段的列名作解耦,即在Flink SQL中使用指定的列名引用该JSON字段,而用于JSON解析的还是原始的JSON字段名。具体来说,我们在元数据系统中,支持为Kafka类型的表注册一个可选的sourceName。如果注册了sourceName,Flink Stream  SQL将使用sourceName去JSON中解析对应的字段。

3.3 元数据整合

DFL上线后,通过添加必要的功能,使用纯SQL开发已经满足我们的很多实时任务开发的需求。但是在DFL运行一段时间后,我们注意到了管理各种上下游存储的信息给我们的数据开发人员带来的困扰。我们线上使用的存储系统包括了Kafka、HBase、ElasticSearch、Redis和MySQL(之后又引入了ClickHouse)。这些数据源基本都是异构的,连接及用户信息各异,而且在不同的任务中使用相同的数据源,每次都需要使用CREATE TABLE

() WITH ()的语法将字段信息和连接信息重复填写。针对这个问题,受Hive元数据的启发,我们决定开发自己的实时元数据管理系统对这些实时数据源进行管理。我们的元数据管理系统的架构如下图所示。

元数据管理系统开发完成后,我们将Flink Stream SQL和元数据管理系统进行了深度集成。通过引入USE TABLE AS WITH ()的语法,我们的数据开发人员只需要将数据源在元数据管理系统中进行注册 ,之后在Flink Stream SQL中引用注册后的表就无需再填写任何连接信息,而且如果需要引用所有的字段的话,也无需再填写字段信息。如果不想要引用所有的子段,有两种办法可以做到。第一种方法是在USE TABLE的WITH里面使用columns表达需要引用的字段,第二种方法是在元数据系统里注册一张只包含了要引用的字段的表。

3.4 Redis hash/set数据类型的支持

FSL已经内置了对Redis作为sink table和side table的支持,但是FSL只支持Redis的String类型的数据,而我们的场景会使用到Redis的hash和set类型的数据,因此我们需要添加对Redis这两种数据类型的支持。首先介绍一下将Redis中的数据映射到Flink中的表的方法,在我们的Redis的key中包含了两部分的内容(使用”:”分隔),两部分分别为固定的keyPrefix和由一到多个字段的值使用”:”拼接的primaryKey,其中keyPrefix模拟表的概念,也方便Redis中存储的内容的管理。对String类型的数据,Redis的key会在上面介绍的key的基础上拼接上字段名称(使用”:”作为分隔符),并以字段的值作为该key对应的value写入Redis中;对Hash类型的数据,Redis的完整的key就为上面介绍的key,hash的key则由用户指定的字段的值使用”:”拼接而成,类似的,hash的value由用户指定的字段的值拼接而成。除了Redis hash和set数据类型的支持之外,我们还为Redis增加了setnx和hsetnx以及TTL的功能。

3.5 ClickHouse sink的支持

FSL内置了对Kafka、MySQL、Redis、Elasticsearch和HBbase等数据源作为目标表的支持,但是我们在使用的过程中也遇到了一些新的数据源作为目标写入端的要求,为此我们开发了新的sink插件来支持这种需求。我们开发和维护的sink插件包括了ClickHouse和HdfsFile。下面以ClickHouse的sink为例介绍一下我们在这方面所做的一些工作。

对于ClickHouse,我们开发了实现了RichSinkFunction和CheckpointedFunction的ClickhouseSink。通过实现CheckpointedFunction并在snapshotState()方法中将数据刷写到ClickHouse来确保数据不会丢失。为了处理不同的输入数据类型,我们提供接口ClickhouseMapper用于将输入数据映射为org.apache.flink.types.Row类型的数据。ClickhouseMapper的定义如下。

不同于通常情况下由用户提供sink表的schema的方式,我们通过执行DESC

的方式从ClickHouse获取表的schema。为了处理ClickHouse中的特殊数据类型,例如nullable(String),Int32等,我们使用正则表达式提取出实际的类型进行写入,相关的代码如下。

为了写入数据的过程不阻塞正常的数据处理流程,我们使用了将数据写入任务放入线程池的方式。同时为了在Flink任务失败的情况下不发生数据丢失的情况,在snapshotState()方法中等待线程池中的任务完成。

3.6 BINLOG表达的简化

为了处理线上数据的更新,我们采用了阿里巴巴开源的Canal采集MySQL binlog并发送到Kafka的方式。由于binlog特殊的数据组织形式,处理binlog的数据需要做很多繁杂的工作,例如从binlog的columnValues或者updatedValues字段中使用udf取出实际增加或者更新的字段。由于我们将Flink Stream SQL和元数据系统进行了对接,因此我们可以拿到MySQL表的schema信息,从而我们可以提供语法封装来帮助数据开发人员减少这种重复性的SQL表达。为此,我们引入一种新的SQL语法:USE BINLOG TABLE,这种语法的格式如下。

我们会将这种语法展开为如下的内容。

04

应用

在DFL上线后,由于可以使用纯SQL进行开发,符合数据开发同学的开发习惯,而且我们提供了很多的语法封装,加上元数据管理带来的便利,数据开发同学逐步将一些实时计算任务迁移到了DFL上,这为部门带来了极大的效率提升。截止到目前,DFL已经应用到了达达集团的各个数据应用系统中,系统中运行的实时计算任务已经达到70多个,涵盖达达快送、京东到家的各个业务及流量模块,而且实时计算任务数量和SQL化占比还在稳步增加中。随着大数据部门的计算基础设施开放,现在我们的实时计算能力也在集团其它部门中得到了越来越广泛的应用。

05

未来规划

当前Flink的社区版本已经发展到了1.10,Flink Table/SQL本身已经支持了DFL提供的多数功能,出于降低维护组件复杂度的考虑,我们计划后续引入Flink 1.10,并逐步推广Flink 1.10的使用,以期最后将所有的任务都迁移到最新的Flink版本上。

公司内部在逐步推广私有云的使用,考虑到社区在Flink on K8s上的进展,我们后续在引入新版本的Flink时,将尝试在公司的私有云上进行部署。

2024年四月
M T W T F S S
« Jan    
1234567
891011121314
15161718192021
22232425262728
2930