Apache Hudi架构设计和基本概念

Apache Hudi是一个Data Lakes的开源方案,Hudi是Hadoop Updates and Incrementals的简写,它是由Uber开发并开源的Data Lakes解决方案。Hudi具有如下基本特性/能力:

  • Hudi能够摄入(Ingest)和管理(Manage)基于HDFS之上的大型分析数据集,主要目的是高效的减少入库延时。
  • Hudi基于Spark来对HDFS上的数据进行更新、插入、删除等。
  • Hudi在HDFS数据集上提供如下流原语:插入更新(如何改变数据集);增量拉取(如何获取变更的数据)。
  • Hudi可以对HDFS上的parquet格式数据进行插入/更新操作。
  • Hudi通过自定义InputFormat与Hadoop生态系统(Spark、Hive、Parquet)集成。
  • Hudi通过Savepoint来实现数据恢复。
  • 目前,Hudi支持Spark 2.x版本,建议使用2.4.4+版本的Spark。

基本架构

与Kudu相比,Kudu是一个支持OLTP workload的数据存储系统,而Hudi的设计目标是基于Hadoop兼容的文件系统(如HDFS、S3等),重度依赖Spark的数据处理能力来实现增量处理和丰富的查询能力,Hudi支持Incremental Pulling而Kudu不支持。

Hudi能够整合Batch和Streaming处理的能力,这是通过利用Spark自身支持的基本能力来实现的。一个数据处理Pipeline通常由Source、Processing、Sink三个部分组成,Hudi可以作为Source、Sink,它把数据存储到分布式文件系统(如HDFS)中。

Apache Hudi在大数据应用场景中,所处的位置,如下图所示:

从上图中可见,Hudi能够与Hive、Spark、Presto这类处理引擎一起工作。Hudi有自己的数据表,通过将Hudi的Bundle整合进Hive、Spark、Presto等这类引擎中,使得这些引擎可以查询Hudi表数据,从而具备Hudi所提供的Snapshot Query、Incremental Query、Read Optimized Query的能力。

下面,先从Apache Hudi中提出的几个概念开始,来了解Hudi的设计:

Timeline

Hudi内部对每个表都维护了一个Timeline,这个Timeline是由一组作用在某个表上的Instant对象组成。Instant表示在某个时间点对表进行操作的,从而达到某一个状态的表示,所以Instant包含Instant Action,Instant Time和Instant State这三个内容,它们的含义如下所示:

  • Instant Action:对Hudi表执行的操作类型,目前包括COMMITS、CLEANS、DELTA_COMMIT、COMPACTION、ROLLBACK、SAVEPOINT这6种操作类型。
  • Instant Time:表示一个时间戳,这个时间戳必须是按照Instant Action开始执行的时间顺序单调递增的。
  • Instant State:表示在指定的时间点(Instant Time)对Hudi表执行操作(Instant Action)后,表所处的状态,目前包括REQUESTED(已调度但未初始化)、INFLIGHT(当前正在执行)、COMPLETED(操作执行完成)这3种状态。

下面,根据官网给出的一个例子来理解一下Timeline,如下图所示:

根据上图,说明如下:

  • 例子场景是,在10:00~10.20之间,要对一个Hudi表执行Upsert操作,操作的频率大约是5分钟执行一次。
  • 每次操作执行完成,会看到对应这个Hudi表的Timeline上,有一系列的COMMIT元数据生成。
  • 当满足一定条件时,会在指定的时刻对这些COMMIT进行CLEANS和COMPACTION操作,这两个操作都是在后台完成,其中在10:05之后执行了一次CLEANS操作,10:10之后执行了一次COMPACTION操作。

我们看到,从数据生成到最终到达Hudi系统,可能存在延迟,如图中数据大约在07:00、08:00、09:00时生成,数据到达大约延迟了分别3、2、1小时多,最终生成COMMIT的时间才是Upsert的时间。对于数据到达时间(Arrival Time)和事件时间(Event Time)相关的数据延迟性(Latency)和完整性(Completeness)的权衡,Hudi可以将数据Upsert到更早时间的Buckets或Folders下面。通过使用Timeline来管理,当增量查询10:00之后的最新数据时,可以非常高效的找到10:00之后发生过更新的文件,而不必根据延迟时间再去扫描更早时间的文件,比如这里,就不需要扫描7:00、8:00或9:00这些时刻对应的文件(Buckets)。

文件及索引

Hudi将表组织成HDFS上某个指定目录(basepath)下的目录结构,表被分成多个分区,分区是以目录的形式存在,每个目录下面会存在属于该分区的多个文件,类似Hive表,每个Hudi表分区通过一个分区路径(partitionpath)来唯一标识。在每个分区下面,通过文件分组(File Group)的方式来组织,每个分组对应一个唯一的文件ID。每个文件分组中包含多个文件分片(File Slice),每个文件分片包含一个Base文件(*.parquet),这个文件是在执行COMMIT/COMPACTION操作的时候生成的,同时还生成了几个日志文件(*.log.*),日志文件中包含了从该Base文件生成以后执行的插入/更新操作。

Hudi采用MVCC设计,当执行COMPACTION操作时,会合并日志文件和Base文件,生成新的文件分片。CLEANS操作会清理掉不用的/旧的文件分片,释放存储空间。

Hudi会通过记录Key与分区Path组成Hoodie Key,即Record Key+Partition Path,通过将Hoodie Key映射到前面提到的文件ID,具体其实是映射到file_group/file_id,这就是Hudi的索引。一旦记录的第一个版本被写入文件中,对应的Hoodie Key就不会再改变了。

Hudi表类型

Hudi具有两种类型的表:

  • Copy-On-Write表

使用专门的列式文件格式存储数据,例如Parquet格式。更新时保存多版本,并且在写的过程中通过异步的Merge来实现重写(Rewrite)数据文件。

Copy-On-Write表只包含列式格式的Base文件,每次执行COMMIT操作会生成新版本的Base文件,最终执行COMPACTION操作时还是会生成列式格式的Base文件。所以,Copy-On-Write表存在写放大的问题,因为每次有更新操作都会重写(Rewrite)整个Base文件。

通过官网给出的一个例子,来说明写入Copy-On-Write表,并进行查询操作的基本流程,如下图所示:

上图中,每次执行INSERT或UPDATE操作,都会在Timeline上生成一个的COMMIT,同时对应着一个文件分片(File Slice)。如果是INSERT操作则生成文件分组的第一个新的文件分片,如果是UPDATE操作则会生成一个新版本的文件分片。

写入过程中可以进行查询,如果查询COMMIT为10:10之前的数据,则会首先查询Timeline上最新的COMMIT,通过过滤掉只会小于10:10的数据查询出来,即把文件ID为1、2、3且版本为10:05的文件分片查询出来。

  • Merge-On-Read表

使用列式和行式文件格式混合的方式来存储数据,列式文件格式比如Parquet,行式文件格式比如Avro。更新时写入到增量(Delta)文件中,之后通过同步或异步的COMPACTION操作,生成新版本的列式格式文件。

Merge-On-Read表存在列式格式的Base文件,也存在行式格式的增量(Delta)文件,新到达的更新都会写到增量日志文件中,根据实际情况进行COMPACTION操作来将增量文件合并到Base文件上。通常,需要有效的控制增量日志文件的大小,来平衡读放大和写放大的影响。

Merge-On-Read表可以支持Snapshot Query和Read Optimized Query,下面的例子展示了Merge-On-Read表读写的基本流程,如下图所示:

上图中,每个文件分组都对应一个增量日志文件(Delta Log File)。COMPACTION操作在后台定时执行,会把对应的增量日志文件合并到文件分组的Base文件中,生成新版本的Base文件。

对于查询10:10之后的数据的Read Optimized Query,只能查询到10:05及其之前的数据,看不到之后的数据,查询结果只包含版本为10:05、文件ID为1、2、3的文件;但是Snapshot Query是可以查询到10:05之后的数据的。

Hudi查询类型

Hudi支持三种查询类型:

  • Snapshot Query

只能查询到给定COMMIT或COMPACTION后的最新快照数据。对于Copy-On-Write表,Snapshot Query能够查询到,已经存在的列式格式文件(Parquet文件);对于Merge-On-Read表,Snapshot Query能够查询到,通过合并已存在的Base文件和增量日志文件得到的数据。

  • Incremental Query

只能查询到最新写入Hudi表的数据,也就是给定的COMMIT/COMPACTION之后的最新数据。

  • Read Optimized Query

只能查询到给定的COMMIT/COMPACTION之前所限定范围的最新数据。也就是说,只能看到列式格式Base文件中的最新数据。

查询引擎支持能力矩阵

基于Hudi表和Hudi Bundle,外部的其他查询引擎可以非常方便的查询Hudi表,比如Hive、Spark SQL、Presto等。Hudi支持在Copy-On-Write表和Merge-On-Read表两种类型的表,同时支持基于Hudi表的Snapshot Query、Incremental Query、Read Optimized Query的能力。下面是Hudi支持的外部查询引擎支持查询的能力矩阵:

  • Copy-On-Write表

  • Merge-On-Read表

参考链接

本文基于 署名-非商业性使用-相同方式共享 4.0 许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。