Spark原理|SparkSQL DatasourceV2 之 Multiple Catalog

[实习] 阿里巴巴计算平台事业部 2021 应届生实习生招聘 (大数据计算、云计算平台、产品等等)
面向人群:海内外院校2021届毕业生,毕业时间2020年11月-2021年10月
阿里巴巴计算平台事业部是阿里巴巴支撑所有计算服务的大中台,为整个集团提供了包括批计算、流计算、机器学习等各种计算服务。面对着双十一的各项挑战,复杂的业务场景,飞速的业务增长,高并发的大促洪峰,让我们一直贴近业务,解决难题,不断进行技术创新。
我们的产品主要是,基于Hadoop、Kubernetes和Flink等开源生态,结合阿里巴巴电商业务场景,研发阿里巴巴新一代大数据计算平台,包括计算引擎、SQL引擎、分布式存储、资源调度等核心技术,统一支持批计算、流计算、机器学习等计算需求,支持阿里集团所有实时和离线核心业务(搜索,推荐,双11大屏,机器学习后端计算),同时在阿里云上提供大数据计算服务。
如果你对开源大数据感兴趣,这里有最好的土壤。这里有 Apache Flink 的核心开发和创始团队,多位Flink PMC & Committer, Calcite PMC & Commiter, Hadoop PMC & Committer、Hive PMC & Committer 以及Hbase PMC & Committer,加入我们可以跟大神面对面交流哦。
招聘岗位:分布式计算、云计算平台、大数据处理、SQL 引擎、存储、云产品运营

工作地点:杭州、上海、北京

简历投递:

jiangjie.qj@alibaba-inc.com


SparkSQL DatasourceV2作为Spark2.3引入的特性,在Spark 3.0 preview(2019/12/23)版本中又有了新的改进以更好的支持各类数据源。本文将从catalog角度,介绍新的数据源如何和Spark DatasourceV2进行集成。

问题

SparkSQL是Spark的一个子模块,主要功能是用于处理结构化数据,目前在大数据OLAP领域已经有了广泛的应用。Iceberg作为一个通用的表格式,也已经在数据湖的解决方案中逐渐展现了它的优势。
那该如何将这2者相结合,使得应用SparkSQL + Iceberg可以和SparkSQL + Hive一样方便,如,基于SQL直接访问数据或进行DDL操作:

select c1 from iceberg_db.t;

drop table iceberg_db.t;

SparkSQL 基本原理

先来看下SparkSQL处理SQL的基本流程:
如上图所示,在提交SQL后,spark内部会经历语法解析生成逻辑计划,解析逻辑计划,优化逻辑计划,生成执行计划,执行。在解析逻辑计划的过程中,引入了catalog,它的作用是来判断SQL中引用的数据库,表,列,函数等是否存在。

在Spark + Hive的解决方案中,基于 ExternalCatalog
接口,实现了 HiveExternalCatalog
,该类中的Hive客户端和Hive的metastore进行交互,从而能解析SQL中的库表列是否存在,并能基于Hive客户端进行Hive表的DDL操作,比如create table, drop table等。

Multiple Catalog解析

那Spark + Iceberg是否只需要实现 ExternalCatalog
接口,就能基于SQL直接访问数据或进行DDL操作吗?答案是肯定的,但是,由于解析SQL过程中只能支持一种catalog,如果要实现Hive table joion Iceberg table该怎么办,如:

select * 

from iceberg_db.t1 

join hive_db.t2 

  on t1.k1 = t2.k1;

为了更为通用的解决这类问题,在Spark 3.0 preview版本中引入了multiple catalog功能,该功能对于catalog做了如下变化:

  • CatalogPlugin
    CatalogManager
    CatalogPlugin
    
spark.sql.catalog.=

这里
设置为需要的CatalogName,
设置为具体的实现类,如,

spark.sql.catalog.iceberg_catalog = org.apache.iceberg.spark.SparkCatalog

接口定义如下:

public interface CatalogPlugin {


void initialize(String name, CaseInsensitiveStringMap options);
String name(); }

  • 增加了 TableCatalog
    的接口,该接口继承自 CatalogPlugin
    ,定义了相关的方法用来解析SQL中的元数据,如,tableExists,还定义了一系列方法进行DDL操作,如,createTable,alterTable,dropTable,接口定义如下,

public interface TableCatalog extends CatalogPlugin {
Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException;
Table loadTable(Identifier ident) throws NoSuchTableException;
default void invalidateTable(Identifier ident) { }
default boolean tableExists(Identifier ident) { try { return loadTable(ident) != null; } catch (NoSuchTableException e) { return false; } }
Table createTable( Identifier ident, StructType schema, Transform[] partitions, Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException;
Table alterTable( Identifier ident, TableChange... changes) throws NoSuchTableException;
boolean dropTable(Identifier ident);
void renameTable(Identifier oldIdent, Identifier newIdent) throws NoSuchTableException, TableAlreadyExistsException; }

  • CatalogManager
    CatalogPlugin
    CatalogManager
    

在解析过程中,根据catalogName从 CatalogManager
获取具体的 CatalogPlugin
实现, V2SessionCatalog
是为了兼容之前的catalog的实现机制,而 CustomerCatalog
是自定义的CatalogPlugin实现。同时,CatalogManager还会管理当前的Catalog/Namespace,相关方法如下:

def currentNamespace: Array[String]

def setCurrentNamespace(namespace: Array[String]): Unit

def currentCatalog: CatalogPlugin

def setCurrentCatalog(catalogName: String): Unit

  • 命名结构变更为 [.][.]*
    ,对于表名,原本只支持2层的命名结构, databaseName.tableName
    ,但是在业界流行的数据库中(如 MySQL,PostgreSQL
    ),已经支持3层的命名结构, database.schema.table
    。而在multiple catalog实现过程中,引入了Namespace概念,使得SparkSQL能支持多层命名结构,如, catalog.ns1.ns2.table
  • 由于引入了catalog和namespace概念,SparkSQL还增加相关命令支持catalog/namespace的管理,如,

   CREATE/DROP/SHOW NAMESPACES

   USE .

除了multiple catalog以外,SparkSQL DatasourceV2还重构生成了 SupportsRead
/ SupportsWrite
等接口,用来支持数据源的各类操作,由于篇幅有限,就不在本文中具体展开。

基于 Spark 3.0 preview使用Iceberg + SparkSQL

在Spark DatasourceV2增加了multiple catalog等功能后,回到我们想要查询的SQL,实现步骤如下:

  1. 在Iceberg侧对 CatalogPlugin/TableCatalog/SupportsRead
    等接口进行实现,实现类名如: org.apache.iceberg.spark.SparkCatalog
  2. 在spark的配置文件中设置:
spark.sql.catalog.iceberg_catalog = org.apache.iceberg.spark.SparkCatalog
  1. 基于配置的catalogName,调整SQL如下,就可以进行基于SQL的跨数据源查询了。
select * 

from iceberg_catalog.ns1.t1

join hive_db.t2 on t1.k1 = t2.k1;

  1. 除了跨数据源数据分析以外,现在还可以对Iceberg的表进行DDL操作了,如,

create table iceberg_catalog.t1 ......

drop table iceberg_catalog.t1

总结

Spark 3.0 preview在DatasourceV2的功能方面较Spark2.4做了比较大的改动,Multiple Catalog作为比较重要的新增功能,使得新的数据源能很便捷的和SparkSQL进行整合,提供元数据相关服务。除了Multiple Catalog以外,还增加了诸如 SupportsRead
SupportsWrite
SupportsPushDownFilters
等一系列接口增强对数据源的整合。Iceberg作为新兴的表格式,能很好的利用DatasourceV2的新功能,结合SparkSQL构建数据湖解决方案。目前Iceberg开源代码还未针对新的DatasourceV2特性进行更新,在我们内部项目中已经对这块整合进行了相关实践,并计划贡献给社区,使得基于Iceberg的数据湖解决方案能更完善。