技术资讯 | Spark SQL小文件问题在OPPO的解决方案

2.1  不含有Shuffle算子的简单静态分区SQL 

这样的SQL比较简单,主要是filter上游表一部分数据写入到下游表,或者是两张表简单UNION起来的任务,这种任务的分区数目主要是由读取文件时Partition数目决定的。

  • 因为从Spark 2.4以来,对Hive orc表和parquet支持已经很不错了,为了加快运行速率,我们开启了将Hive orc/parquet表自动转为DataSource的参数。 对于这种DataSource表的类型,partition数目主要是 由如下 三个参数控制 其关系。

    其关系如下图所示,因此可以通过调整这三个参数来输入数据的分片进行调整:        

  • 而非DataSource表,使用CombineInputFormat来读取数据,因此主要是通过MR参数来进行分片调整:

    mapreduce.input.fileinputformat.split.minsize

虽然我们可以通过调整输入数据的分片来对最终文件数量进行调整,但是这样的调整是不稳定的,上游数据大小发生一些轻微的变化,就可能带来参数的重新适配。

为了简单粗暴的解决这个问题,我们对这样的SQL加了repartition的hint,引入了新的shuffle,保证文件数量是一个固定值。

2.2  带有Shuffle算子的静态分区任务  

在ISSUE SPARK-9858中,引入了一个新的参数:

spark.sql.adaptive.shuffle.targetPostShuffleInputSize

后期基于spark adaptive又对这个参数做了进一步增强,可以动态的调整partition数量,尽可能保证每个task处理 targetPostShuffleInputSize 大小的数据,因此这个参数我们也可以用来在一定程度上控制生成的文件数量。

2.3   动态分区任务  

动态分区任务因为存在着分区这一变量,单纯调整rdd这边的partition数目很难把控整体的文件数量。

在hive里,我们可以通过设置 hive.optimize.sort.dynamic.partition 来缓解动态分区产生文件过多导致任务执行时task节点经常oom的状况。这样的参数会引入新的的shuffle,来对数据进行重排序,将相同的partition分给同一个task处理,从而避免了一个task同时持有多个文件句柄。

因此,我们可以借助这样的思想,使用distribute by语句来修改sql,从而控制文件数量。一般而言,假设我们想对于每个分区生成不超过N个文件,则可以在SQL末尾增加DISTRIBUTE BY [动态分区列],ceil(rand() * N)。

03

自研可合并文件的commitProtocol方案

综上种种,每个方法都存在一定的弊端,众多规则也在实际使用过程中对业务方造成很大 困扰。

因此我们产生了想在spark这边实现和hive类似的小文件合并机制。在几个可能的方案选型中,我们最终选择了: 重写 spark.sql.sources.commitProtocolClass 方法。

一方面,该方案对Spark代码无侵入,便于Spark源码的维护,另一方面,该方案对业务方使用友好,可以动态通过set命令设置,如果出现问题回滚也十分方便。 业务方在使用过程中,只需要简单设置:

spark.sql.sources.commitProtocolClass 即可控制是否开启小文件合并。

在开启小文件合并参数后,我们会在commit阶段拿到生成的所有文件,引入两个新的job来对这些文件进行处理。首先我们在第一个job获取到所有大小小于 spark.compact.smallfile.size 的文件,在查找完成后按照 spark.compact.size 参数值对组合文件,并在第二个job中对这些文件进行合并。

投稿 | 大数据平台

编辑 | sea

排版 | sea

往期推荐

在看点一下 大家都知道