技术资讯 | Spark sql优化案例分享

01

引言

首先我们从 两种计算引擎的基本架构图来分析Spark/MapReduce的性能

1.1 M apReduce

 

处理效率低效 :

  • Map/Reduce任务中间结果写磁盘,多个MR之间通过HDFS交换数据; 任务调度和启动开销大;

  • 一条SQL语句经常被拆分成多个Application,数据在多个Application之间只能通过读写HDFS交换;

  • 无法充分利用内存;

1.2 Spark

1.2.1高 (比MapReduce快几倍到几十倍) 

  • 内存计算引擎,提供Cache机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的IO开销,另外为了解决纯内存计算带来的数据可靠性,引入了Checkpoint机制;

  • DAG引擎,减少多次计算之间中间结果写到HDFS的开销;

  • Executor使用线程池模型来减少task启动开销,shuffle过程中避免 不必要的sort操作以及减少磁盘IO操作。

02

影响Spark任务快慢的因素

2.1因子

  • 数据量 (GB级 vs TB级)

  • 数据组织形式 (存储结构,压缩算法,数据Schema[Map等复杂结构])

  • 小文件 (很多的KB级,10M级)

  • 内存 (内存偏少,大量溢写)

  • Core个数 (并发量小,shuffle等待)

  • 算法 (聚合因子,过滤条件,SQL组织形式 )

  • 发生大量的Shuffle (可用Broadcast替换)

  • 是否使用缓存 (将经常访问的数据缓存在Executor内存中) 

2.2.Spark基本流程图

 

  • Driver (AppMaster) 负责DAG,Task调度

  • Yarn负责计算资源分配

  • Worker上的Executor负责计算

Worker节点才是计算执行的地方,所以性能优化重点将研究Executor的优化。

2.3Executor

建议不要自己配置Executor个数,使用动态分配模式:

概念:

根据当前的负载动态的增加或者删除Executor,这样做的 好处 在于:

在业务组的队列资源 (vcore, memory) 资源恒定额情况下,能更好的均衡各个业务的对资源的占用,也就是对于一个计算量较小的任务不用占用太多资源。而对于一个计算量较大的任务,也能从集群中获取相对较多的资源。

而采用指定模式,则会导致任务在获取足够多 (可通过参数设置比例) 的Executor之前一直处于等待状态,而这通常会浪费计算资源。

Executor动态分配模型

 

ExecutorAllocationManager内部会定时根据工作负载计算所需的Executor数量:

  • 如果任务对Executor需求数量大于之前向集群管理器申请的Executor数量,那么向Yarn申请添加Executor;

  • 如果任务对Executor需求数量小于之前向集群管理器申请的Executor数量,那么向Yarn申请取消部分Executor;

  • ExecutorAllocationManager内部还会定时向Yarn申请移除 (杀死) 过期的Executor;

2.4Core

spark.executor.cores

建议:

  • 建议executor的cpu core数量设置为2 ~ 4个比较合适;

  • 在队列有大量任务提交的情况下,还要更少,以免影响其他用户提交的任务因申请不到cpu (vcore) 资源而卡住。

2.5Memory

spark.executor.memory

建议:

每个Executor的每个core分配的内存设置4g较为合适。

用户设置该值的时候需要考虑如下影响因子:

  • 自己使用的executor-memory * num-executor所使用的资源不能超过所提交队列的阈值;

  • 在队列资源共用的模式下,所申请的资源还要更小,以免申请不到资源或者阻塞其他用户的任务;

  • 用户申请的executor-momory不能超过yarn设置的最大值,当前设置的最大值为60g;

Spark内存区分存储内存&计算内存

 

spark.storage.memoryFraction

控制存储内存在整个内存中的比例

StorageMemory:

RDD cache, RDD Broadcast等等内容

根据应用的不同可自己动态调整,但通常情况下不需要调整。使用默认值即可,下图展示的是Storage内存与Execution的 内存动态调节机制

 

2.6 Shuffle并行度

spark.default.parallelism

该参数用于设置每个stage的默认task数量,这个参数极为重要,如果不设置可能会直接影响你的任务性能。 (只有在处理RDD时才会起作用,对Spark SQL无效)

建议:

500 ~ 1000较为合适,通常情况设置为executor-memory * num-executor 乘积的2~3倍较为合适;

spark.sql.shuffle.partitions

用于配置join 或聚合操作shuffle数据时使用的分区数 (对sparks SQL专用的设置)

2.7 存储结构

目前HADOOP中常用的数据存储结构包括:

  • Text (行式存储)

  • CSV (行式存储)

  • RCFile (列式存储)

  • ORC (列式存储)

  • Parquet (列式存储)

目前Spark默认存储的格式为Parquet。 下图展示的是相同数据以不同存储结构存储,存储文件的Size对比:

 

2.8 列式存储的好处

  • 查询的时候不需要扫描全部的数据,而只需要读取每次查询涉及的列。 这样可以将I/O消耗降低N倍,另外可以保存每一列的统计信息 (minmax、sum等) ,实现部分的谓词下推;

  • 由于每一列的成员都是同构的,可以针对不同的数据类型使用更高效的数据压缩算法,进一步减小I/O;

  • 由于每一列的成员的同构性,可以使用更加适合CPU pipeline的编码方式,减小CPU的缓存失效;

  • 由于列式存储数据量更小,Spark的Task读取数据的时间更短,不光节省计算资源,还节省存储资源。

 

列式存储的向量化操作,相对于行式存储一行一行的操作,列式存储可做到 一个batch的操作,这样的操作方式极大的 提升了运算性能

 

2.9 压缩方式(gzip, bzip2, lzo, snappy)

案例1:

Snappy压缩前后比例为3:1

 

对于Spark任务来说,压缩的数据带来的好处是显而易见的:

  • 大幅节省内存

  • 大幅节省磁盘

  • 大幅节省数据读取时间

各种压缩格式特性

 

Spark设置压缩格式 (例如设置snappy压缩)

各种压缩格式性能对比

 

2.10 输入小文件

案例1:

 

  • 小文件太多,导致每个task读取的数据量较小,计算的时间很短;

  • 执行的时间不足以弥补JVM启动的时间。

调节每个task任务的输入数据大小

2.11 Shuffle&&输出小文件

某个表的hdfs文件如下:

输出表的分区下有20个小文件。

由于集群中NameNode节点需要维护文件的元数据信息,太多的输出小文件会给集群的NameNode带来巨大的压力;

控制task个数

方案1:

使用hint将会使得输入数据进行重新Repartition,调节最终task的个数以及输出文件的个数。

通过重分区将减少或者增大分区数量以达到增加或减少task的数量,从而增大或者减少Task输出的文件个数。

2.12 SQL自身-下推 (PushDownPredicate)

 

上图中:

  • 方式1从磁盘中读取出所有的数据,在内存中过滤;

  • 方式2,3将过滤从内存中下推到磁盘,在扫描磁盘的数据的时候就过滤掉数据。

概念:

所谓下推是指将过滤尽可能地下沉到数据源端,从而避免从磁盘读取不必要数据。

下推与不下推性能对比 (DataBricks官方)

 

通过上图可知,60%的下推比不下推的性能提高了2~18倍;

2.12.1谓词下推的限制:

  • 只有operator 包含的所有expression都是确定性的时候才可以下推, 比如 rand 表达式等;

  • Filter 的字段必须要在group by 的维度字段里面,举个例子:

1)下面的聚合是 可以谓词下推 的:

2)下面的聚合是 不可以谓词下 推的:

案例 1:

Partition是String类型,此处是Int类型,导致下推失败。

 

案例 2:

Regexp是非确定的,导致下推失败

 

2.13 SQL自身-广播

Spark:

对该变量进行广播。广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。

这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率;

可适当调整广播变量大小的阈值,使得稍微大一些的数据也能被广播:

 

2.14 SQL自身-缓存

将数据缓存在内存中,需要 遵循的原则为:

  • 数据重复使用

  • 重新生成这部分数据的代价昂贵

SQL语句:

权衡cache与否的代价,不cache则多次使用同一份数据都需要重新计算一次。

Cache则只会计算一次,但是会占用executor的内存资源,那是否应该cache就是把计算RDD,从hdfs上获取数据的时间资源与缓存数据的内存资源之间进行权衡。

 

  rdd1,rdd2不需要缓存

rdd可以缓存,rdd1,rdd2不需要缓存

2.15 表结构嵌套字段(Map,Array )

案例1:

 

这张表是业务用户的表结构。

用户行为数据以Json形式上报,由于表的结构实在太过复杂。Column字段存在大量的Map结构,分析层面很难通过简单的SQL语句来分析这行数据,只能以读取HDFS,在代码层面来做数据分析。

案例2:

Spark SQL 处理嵌套类型数据时,存在以下问题:

1)读取大量不必要的数据:

对于嵌套数据类型的字段,如下图中的Map 类型的people 字段,往往只需要读取其中的子字段,如people.age。

却需要将整个Map 类型的people 字段全部读取出来然后抽取出people.age 字段。这会引入大量的无意义的IO 开销。如果是几百个Key,这也就意味着IO 被放大了几十至几百倍;

2)无法进行向量化读取:

而向量化读能极大的提升性能。Spark 不支持包含嵌套数据类型的向量化读取,这极大地影响了包含嵌套数据类型的查询性能;

3)不支持 Filter 下推:

Spark 不支持嵌套类型字段上的Filter 的下推;

4)重复计算:

JSON 字段,在Spark SQL 中以String 类型存在,严格来说不算嵌套数据类型。不过实践中也常用于保存不固定的多个字段,在查询时通过JSON Path 抽取目标子字段。

而大型JSON 字符串的字段抽取非常消耗CPU。对于热点表,频繁重复抽取相同子字段非常浪费资源。

2.16 Distribute By / Cluster By

好处:

在Join的过程中,未使用Distribute By 的两个表,将各自的数据随机分配到executor上,当join的时候就会出现,跨executor的数据需要互相迁移进行匹配的情况,从而引起更大量的Shuffle。

但如果我们将join的连接字段通过Distribute By 重分区,神奇的一幕将发生:需要互相匹配的数据都会在相同的executor上,从而避免跨executor的数据迁移,所有的匹配都在统一个executor上进行,这样就大大减少了Shuffle。

 

使用方法:

 

投稿 | 大数据平台

编辑 | sea

排版 | sea

往期推荐

在看点一下 大家都知道