Iceberg实践|基于Apache Iceberg打造T+0实时数仓
导语
大数据处理技术现今已广泛应用于各个行业,为业务解决海量存储和海量分析的需求。但数据量的爆发式增长,对数据处理能力提出了更大的挑战,同时对时效性也提出了更高的要求。业务通常已不再满足滞后的分析结果,希望看到更实时的数据,从而在第一时间做出判断和决策。典型的场景如电商大促和金融风控等,基于延迟数据的分析结果已经失去了价值。
为了同时满足大数据量和高时效性的双重要求,实时数仓和在线交互式(ad-hoc)分析技术,及相应的基础组件应运而生,并快速发展。其中包括通用计算引擎(如Spark和Flink),交互式分析系统(如Presto,Druid和ClickHouse),数据湖框架(如Iceberg,Hudi和Delta Lake),和底层存储(如Ozone)。
本文主要介绍基于Iceberg的特性,通过Spark和Flink,如何打造T+0实时数仓,以及相应功能在Iceberg社区的进展。
离线和实时数仓
传统的离线数仓可以通过Hive加HDFS搭建。借助Hive成熟和稳定的能力,以及丰富的上下游生态,构造数据处理和分析平台。它通常遇到如下痛点:
• 流批混合的作业难以基于同一套基础组件搭建; • 难以保证端到端的”有且仅有一次“和”强一致“的语义; • 流批衔接,即流式数据落地,通常环节多,流程长,时效性差; • 难以保证ACID事务和读写分离,导致下游出现脏读等错误;如果通过外部逻辑实现ACID事务和强一致性,会进一步加长整个流程; • 已写入的数据很难修正,或者只能以数据文件甚至整个分区这种较大的粒度进行操作,费时费力; • 数据落地和处理过程难以实现端到端的增量处理等。
针对上述离线数仓的痛点,随着流式计算引擎的发展,越来越多的公司引入实时数仓,或者实时和离线融合的数据分析平台,以求达到秒级的实时响应。
基于Iceberg打造实时数仓
Iceberg最近已经顺利毕业,晋升为Apache顶级项目。它作为新兴的数据湖框架之一,开创性的抽象出”表格式“(table format)这一中间层,既独立于上层的计算引擎(如Spark和Flink)和查询引擎(如Hive和Presto),也和下层的文件格式(如Parquet,ORC和Avro)相互解耦。
同时,Iceberg还提供了许多额外的能力:
• ACID事务; • 时间旅行(time travel),以访问之前版本的数据; • 完备的自定义类型、分区方式和操作的抽象; • 列和分区方式可以进化,而且进化对用户无感,即无需重新组织或变更数据文件; • 隐式分区,使SQL不用针对分区方式特殊优化; • 面向云存储的优化等;
上述的抽象和能力使得Iceberg在流批衔接和实时数仓中可以发挥核心作用。
总体框架
如下图所示:
使用Flink流式处理引擎消费数据总线,借助ACID事务的能力强一致的导入Iceberg;读写分离使交互式查询引擎可以第一时间读取正确的数据;Row-level update和delete可以通过Spark对数据进行修正;增量消费使得已落地的数据可以进一步的返回流式处理引擎中,并只处理和向后传递变化的数据;Iceberg中的数据也可以同时被报表系统消费和进一步处理。
ACID事务
Iceberg实现了ACID事务机制,使得边写边读成为可能,从而数据可以更快的被下游消费到。ACID事务机制保证下游只能看到已commit的snapshot所包含的数据,而不用担心读到部分或者未commit的数据。业务因此可以省去大量的用于保证ACID事务和失败恢复的逻辑。
如上图所示,虚线框代表即将被生成的snapshot,其中包含新写入但尚未commit的数据;实线框表示已经被commit的snapshot,下游可以访问最新的snapshot(S3)或者之前的snapshot(如S2等)中的数据。由ACID事务衍生出的row-level update和delete的能力将在后文中介绍。
Flink写入和读取Iceberg
Flink应用从总线中消费流式数据后,可以通过Flink sink汇聚并落地Iceberg,后续也可以通过Flink source继续消费Iceberg中的数据。Flink的checkpoint机制和Iceberg的ACID事务特性是保证端到端的“有且仅有一次“语义的关键。
关于Flink sink写入Iceberg,可以分为相互解耦的两个层次来实现:
• 相对底层的DataStream实现:实现SinkFunction和checkpoint相关接口。接入DataStream中,即可实现落地Iceberg。 • 相对高层的Table和SQL实现:按照对insert、delele和update的不同要求,实现StreamTableSink相关接口(AppendStreamTableSource,UpsertStreamTableSource或RetractStreamTableSource),以支持Table和SQL等高级语义和操作。但最终的写入和commit操作还是通过上述的底层来实现。此处还需要处理Flink的schema以及类型到Iceberg的schema以及类型的转换和映射。
关于Flink source读取Iceberg,也可以分为相互解耦的两个层次来实现:
• 相对底层的DataStream实现:实现SourceFunctin和checkpoint相关接口。接入DataStream中,即可被Flink引擎驱动发送ScanTask来读取Iceberg中的数据。 • 相对高层的Table和SQL实现:实现StreamTableSource,并辅助以ProjectableTableSource以支持projection从Flink下推到Iceberg,以及FilterableTableSource以支持表达式下推到Iceberg,从而过滤不满足条件的行。
社区正在积极推进 Flink Iceberg sink [1] 的开发和合入。同时Flink sink和source也有 不同的实现 [2] 。但目标都是希望Iceberg能够成为首个在社区官方支持Flink的数据湖框架。
Flink Iceberg sink后续改进主要包括:在现有只能append写入的基础上,增加update和delete语义,使延迟数据可以得到正确的处理。由于整个流程不可避免的出现数据延迟到达的情况,而落地通常使用“事件时间(event time)”来聚合和分区,因此如何把延迟到达的数据合入正确的分区是需要解决的问题。实现此功能后,CDC(Change Data Capture)的场景即可被全面支持了。但这仍然依赖于 row-level delete的实现 [3] 。
基于Spark进行数据修正
支持对已入库数据的修正,同时保证ACID的事务特性,是现代数仓的基本能力。Iceberg的重要特性之一就是在row-level这一细粒度下的update和delete的能力。
Row-level update和delete通常有Copy-on-Write和Merge-on-Read两种方案。其中Copy-on-Write把生成新数据文件的压力集中于写入的时候,适合对读有较高要求的场景;而Merge-on-Read把合并最终结果的压力放在读取的时候,适合于快速写入的场景。
我们在内部已经实现了基于Copy-on-Write的方式。同时也将Iceberg作为Spark 3.0的V2 Data Source和multi-catalog,和Spark进行了集成,用户可以方便的通过Spark SQL进行update、delete和merge into等DML操作,以及建表删表等DDL操作。
我们作为社区中spark-3分支的维护者,正在积极推进相关功能的开发和合入,让更多的人受益。
增量消费Iceberg中的数据
流式数据落地数仓以后,还可以通过增量消费的方式回到流式处理引擎当中,继续向下游传递,做进一步的处理。而且针对数据延迟到达的情况,增量消费也为下游提供了仅获取变化数据的方式(而非全部数据),提高了信息传递的效率。增量消费,配合Flink sink写入支持update和delete语义,使端到端支持增量处理,可以进一步降低整个流程的延迟。Iceberg可以方便的基于snapshot的历史实现增量消费。
基于Spark,指定开始和结束snapshot-id的 incremental scan [4] 已经合入社区,以此为基础,基于micro-batch的 Spark Structured Streaming Read [5] 也已经实现。
Flink也有类似的增量消费的实现。
数据和元数据的压缩合并
为了提高读取时job planning的效率,小文件的压缩合并(compaction)是数仓日常维护中的重要任务,特别是流式数据直接落地,和基于Merge-on-Read实现row-level update和delete的功能,更加剧了小文件的产生。这里要注意的是压缩合并的对象既包括数据文件,也包括元数据文件。
压缩合并可以分为三个级别:
• Minor compaction:仅合并元数据文件(rewrite manifest),不操作数据文件; • Major compaction:合并元数据和数据文件。未来还需要处理数据文件和Merge-on-Read产生的delete文件的合并; • Optimization:合并元数据和数据文件的同时,清理过期的snapshot以及这些snapshot对应的元数据和数据文件。
对于天生就是小文件的元数据,Iceberg可以自动的通过MergeAppend进行合并;也可以通过RewriteManifests手动发起合并,但它使用起来不是很方便。社区已经开发出 对应的Spark Action [6] ,依靠外部的Spark计算资源,方便的进行元数据的合并。
而对于数据文件的合并,社区也正在积极推进 相应Spark Action [7] 的开发和合入。
总结
随着数据量的持续增大,和业务对时效性的严苛要求,实时数仓的作用愈发的重要。而Iceberg凭借ACID事务、时间旅行和优秀的抽象等特性,以及对Spark和Flink等计算引擎接入的广泛支持,作为实时数仓的核心组件,可以缩短导入流程,方便数据变更,加速数据读取。
参考
[1] Iceberg作为Flink sink:https://github.com/apache/iceberg/pull/856
[2] https://github.com/generic-datalake/iceberg-pro
[3] Row-level delete的里程碑任务分解:https://github.com/apache/iceberg/milestone/4
[4] 基于Spark实现incremental scan:https://github.com/apache/iceberg/pull/829
[5] Spark Structured Streaming读取Iceberg:https://github.com/apache/iceberg/pull/796
[6] 用于合并元数据的Spark action:https://github.com/apache/iceberg/pull/875
[7] 用于合并数据文件的Spark action:https://github.com/apache/iceberg/pull/1083