Iceberg集成|Iceberg在基于Flink的流式数据入库场景中的应用
导言
本文以流式数据入库的场景为基础,介绍引入Iceberg作为落地格式和嵌入Flink sink的收益,并分析了现有实现的框架和要点。
应用场景
流式数据入库,是大数据和数据湖的典型应用场景。上游的流式数据,如日志,或增量修改,通过数据总线,经过必要的处理后,汇聚并存储于数据湖,供下游的应用(如报表或者商业智能分析)使用。
上述的应用场景通常有如下的痛点,需要整个流程不断的优化:
- 支持流式数据写入,并保证端到端的不重不丢(即exactly-once);
- 尽量减少中间环节,能支持更实时(甚至是T+0)的读取或导出,给下游提供更实时更准确的基础数据;
- 支持ACID,避免脏读等错误发生;
- 支持修改已落地的数据。虽然大数据和数据湖长于处理静态的、或者缓慢变化的数据,即读多写少的场景,但方便的修改功能可以提升用户体验,避免用户因为极少的修改,手动更换整个数据文件,甚至是重新导出;
- 支持修改表结构,如增加或者变更列;而且变更不要引起数据的重新组织。
引入Iceberg作为Flink sink
为了解决上述痛点,我们引入了Iceberg作为数据落地的格式。Iceberg支持ACID事务、修改和删除、独立于计算引擎、支持表结构和分区方式动态变更等特性,很好的满足我们的需求。
同时,为了支持流式数据的写入,我们引入Flink作为流式处理框架,并将Iceberg作为Flink sink。
下文主要介绍Flink Iceberg sink的实现框架和要点。但在这之前,需要先介绍一些实现中用到的Flink基本概念。
Flink基本概念
从Flink的角度如何理解”流”和”批”
有界 | 开始 | 结束 | 处理时机 | 处理要求SLA | 处理顺序是否重要 | |
---|---|---|---|---|---|---|
流 | 否 | 有 | 否 | 实时,尽快 | 低延迟, 不重不丢即exactly-once | 是 |
批 | 是 | 有 | 有 | 可以等到全部收到后再处理 | 高吞吐 | 否 |
Flink使用DataFrame API来统一的处理流和批数据。
Stream, Transformation和Operator
一个Flink程序由 stream
和 transformation
组成:
-
Stream
: Transformation之间的中间结果数据; -
Transformation
:对(一个或多个)输入stream进行操作,输出(一个或多个)结果stream。
当Flink程序执行时,其被映射成 Streaming Dataflow
,由如下的部分组成:
- Source (operator):接收外部输入给Flink;
- Transformation (operator):中间对stream做的任何操作;
- Sink (operator):Flink输出给外部。
下图为Flink官网的示例,展示了一个以Kafka作为输入Source,经过中间两个transformation,最终通过sink输出到Flink之外的过程。
State, Checkpoint and Snapshot
Flink依靠checkpoint和基于snapshot的恢复机制,保证程序state的一致性,实现容错。
Checkpoint是对分布式的数据流,以及所有operator的state,打snapshot的过程。
State
一个operator的state,即它包含的所有用于恢复当前状态的信息,可分为两类:
- 系统state:如operator中对数据的缓存。
- 用户自定义state:和用户逻辑相关,可以利用Flink提供的managed state,如ValueState、ListState,来存储。
State的存储位置,可以分为:
- Local:内存,或者本地磁盘
- State backend:远端的持久化存储,如HDFS。
如下图所示:
Checkpoint
Flink做checkpoint的过程如下:
- Checkpoint coordinator首先发送barrier给source。
- Source做snapshot,完成后向coordinator确认。
- Source向下游发送barrier。
- 下游operator收到所有上游的barrier后,做snapshot,完成后向coordinator确认。
-
继续
往下游发送barrier,直到sink。 - Sink通知coordinator自己完成checkpoint。
- Coordinator确认本周期snapshot做完。
如下图所示:
Barrier
Barrier是Flink做分布式snapshot的重要概念。它作为一个系统标记,被插入到数据流中,随真实数据一起,按照数据流的方向,从上游向下游传递。
由于每个barrier唯一对应checkpoint id,所以数据流中的record实际被barrier分组,如下图所示,barrier n和barrier n-1之间的record,属于checkpoint n。
Barrier的作用是在分布式的数据流中,将operator的多个输入流按照checkpoint对齐(align),如下图所示:
Flink Iceberg sink
了解了上述Flink的基本概念,这些概念又是如何被应用和映射到Flink Iceberg sink当中的呢?
总体框架
如图,Flink Iceberg sink有两个主要模块和两个辅助模块组成:
模块 | 类型 | 功能 | 多个并行 |
---|---|---|---|
Writer | StreamOperator | 累积数据,生成DataFile | 是 |
Committer | SinkFunction | 把DataFile填入manifest file,并commit给Iceberg | 否,唯一 |
SinkAppender | 辅助辅助 | 把Writer和Committer接入DataStream | – |
AvroSerializer | 辅助 | 把输入转化为Avro IndexedRecord,输出给writer | – |
实现要点
Writer
- 在当前的实现中,Java的Map作为每条记录,输入给writer。内部逻辑先将其转化为作为中间格式的Avro IndexedRecord,而后通过Iceberg里的Parquet相关API,累积的写入DataFile。
- 使用Avro作为中间格式是一个临时方案,为简化适配,并最大限度的利用现有逻辑。但长期来看,使用中间格式会影响处理效率,社区也在试图通过ISSUE-870来去掉Avro,进而使用Iceberg内建的数据类型作为输入,同时也需要加入一个到Flink内建数据类型的转换器。
- 在做checkpoint的过程中,发送writer自己的barrier到下游的committer之前,关闭单个Parquet文件,构建DataFile,并发送DataFile的信息给下游。
Committer
- 全局唯一的Committer在收到上游所有writer的barrier以后,将收到的DataFile的信息填入manifest file,并使用ListState把manifest file作为用户自定义的state,保存于snapshot中。
- 当checkpoint完成以后,通过merge append将manifest file提交给Iceberg。Iceberg内部通过后续的一系列操作完成commit。最终让新加入的数据对其他的读任务可见。
试用Flink Iceberg sink
社区上https://github.com/apache/incubator-iceberg/pull/856提供了可以试用的原型代码。下载该patch放入master分支,编译并构建即可。如下的程序展示了如何将该sink嵌入到Flink数据流中:
// Configurate catalog org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration(); hadoopConf.set( org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS.varname, META_STORE_URIS); hadoopConf.set( org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname, META_STORE_WAREHOUSE); Catalog icebergCatalog = new HiveCatalog(hadoopConf); // Create Iceberg table Schema schema = new Schema( ... ); PartitionSpec partitionSpec = builderFor(schema)... TableIdentifier tableIdentifier = TableIdentifier.of(DATABASE_NAME, TABLE_NAME); // If needed, check the existence of table by loadTable() and drop it // before creating it icebergCatalog.createTable(tableIdentifier, schema, partitionSpec); // Obtain an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Enable checkpointing env.enableCheckpointing(...); // Add Source DataStream<Map> dataStream = env.addSource(source, typeInformation); // Configure Ieberg sink Configuration conf = new Configuration(); conf.setString( org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname, META_STORE_URIS); conf.setString(IcebergConnectorConstant.DATABASE, DATABASE_NAME); conf.setString(IcebergConnectorConstant.TABLE, TABLE_NAME); // Append Iceberg sink to data stream IcebergSinkAppender<Map> appender = new IcebergSinkAppender<Map>(conf, "test") .withSerializer(MapAvroSerializer.getInstance()) .withWriterParallelism(1); appender.append(dataStream); // Trigger the execution env.execute("Sink Test");
后续规划
Flink Iceberg sink有很多需要完善的地方,例如:上文中提到的去掉Avro作为中间格式;以及在各种失败的情况下是否仍能保证端到端的exactly-once;按固定时长做checkpoint,在高低峰时生成不同大小的DataFile,是否对后续读不友好等。这些问题都在我们的后续规划中,也会全数贡献给社区。
参考
[1] Iceberg官网:https://iceberg.apache.org/
[2] Flink 1.10文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/
[3] Neflix提供的Flink Iceberg connector原型:https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg
[4] Flink Iceberg sink设计文档:https://docs.google.com/document/d/19M-sP6FlTVm7BV7MM4Om1n_MVo1xCy7GyDl_9ZAjVNQ/edit?usp=sharing
[5] Flink容错机制(checkpoint) https://www.cnblogs.com/starzy/p/11439988.html
腾讯大数据诚招计算、存储、消息中间件、调度、中台等各方向的大数据研发工程师,请私信或联系jerryshao@tencent.com
。