大数据量下Spark性能优化的一些方法

一般我们在写SQL的时候通常使用Join算子来进行关联表,这方面的查询也一般是最常见的。下文仅对Join的流程进行简述,以及提供相关的优化方法

Spark SQL相关理论

如下图所示,在分析不同类型的Join具体执行之前,先介绍Join执行的基本框架,框架中的一些概念和定义是在不同的SQL场景中使用的。
在Spark SQL中Join的实现都基于一个基本的流程,根据角色的不同,参与Join的两张表分别被称为"流式表"和"构建表",
不同表的角色在Spark SQL中会通过一定的策略进行设定,通常来讲,系统会*默认大表为流式表,将小表设定为构建表*。流式表的迭代器为StreamIter,构建表的迭代器为BuildIter。通过遍历StreamIter中的每条记录,然后在BuildIter中查找相匹配的记录,这个查找过程被称为Build过程,每次Build操作的结果为一条JoinedRow(A,B),其中A来自StreamIter,B来自BuildIter,这个过程为Builder操作,而如果B来自StreamIter,A来自builder,则为buildLeft操作
复制代码

对于LeftOuter,RightOuter,LeftSemi,RightSemi,他们的build类型是确定的,即LeftOuter,LeftSemi为BuildRight类型,RightOuter,RightSemi为BuildLeft类型

在具体的Join实现层面,Spark SQl提供了BoradcastJoinExec,ShuffleHashJoinExec和SortMergeJoinExec三种机制。

最常见的Join方式SortMergeJoinExec(业务使用Join遇到最多的情况)

SortMergeJoinExec是Join查询的主要实现方式,Hash系列的Join实现中都是将一侧的数据完全加载到内存中,这对于一定大小的表来说比较适用,然而当两个表数据量都非常大的时候,无论使用哪种方法都会对计算内存造成很大压力,此时Spark会采用SortMergeExec进行Join操作。其物理执行计划和最终执行计划如下所示:

select name,score from student join exam on student.id = exam_student_id
复制代码

SortMergeJoin的实现方式并不用将一侧的数据全部加载后进行Join操作,其前提条件是需要在Join操作前将数据排序,为了让两条记录链接到一起,需要将具有相同Key记录分发到同一个分区,因此一般会进行一次Shuffle操作(即物理执行计划中的Exchange节点),根据Key分区,将连接到一起的记录分发到同一个分区内,这样在后续的Shuffle阶段就可以将两个表中具有相同Key记录分到同一个分区处理.

经过ExChange节点操作之后,分别对两个表中每个分区里的数据按照key进行排序(图1中的SortExec节点)
,然后在此基础上进行sort排序,在遍历流式表,对于每条记录而言,都采用顺序查找的方式从构建查找表中查找对应的记录,由于排序的特性,每次处理完一条记录后只需要从上一次结束的位置开始查找,SortMergeJoinExec执行时就能够避免大量无用的操作,对于提升性能很有帮助,具体原理如下:

对于查找数据匹配的核心类SortMergeScanner,在SortMergeJoinScanner的构造参数中会传入StreamedTable迭代器和BufferTable的迭代器(BufferTable),因为二者是已经排序好的,所以只需要不断以动迭代器,得到新的数据进行比较即可

  • 对于SortMergeExec的性能优化

    • 预排序Join

    在Shuffle之前,Map阶段会按照key的hash值对数据进行重分区,相同的key被分到同一个分区内,不同Mapper中相同分区的数据会被Shuffle到同一个Reducer。ReDucer会对来自不同Mapper的数据进行排序,然后对排序的数据进行Join

这种机制的不同之处是,当Reducer数量较少时,会造成Reducer处理的数据量比较大。所以可以把数据排序提前到Mapper阶段,Map阶段会按照key的hash值对数据重新分区并按照key进行排序,Recuder只需要对来自不同Mapper的数据进行归并排序.mergeSpill将所有insertRecord中的小文件进行合并,每次从spilled文件中取出一个属于当前partition的最小值并写入文件中,如果没有当前partition的数据,则换到下一个partition,直到所有数据被取出

def joinShuffleWrite(Iterator<Product2<K,V>> records){
   while(records.hasNext())
     sorter.insertRecord(record.next())
     
   end while
   mergeSpills()
}

def insertRecord(Object record){
	if(meomryBuffer.size() >= threshold){
    sortAndSpill(meomoryBuffer)
  }
  //TODO add record to memory
}

def mergeSpills(){
  while( currentPartitionId!=null){
    if(record!=null){
      //TODO wirte record to output file
    }else{
    if(has next Partition){
    	currentPartitionId = next Partition
    }else{
      currentPartitionId = null
    }
    
    }
  }
}
复制代码
  • 多维分析的Union方式
    社区的多维分析采用Expand方式,一次读入数据,读入的每条数据会生成多条(2^n,n为维度大小)

    select A,B,sum(C) from myTable group by A,B with cube
    复制代码

    当维度比较大或者初始数据量比较大的时候,Shuffle的性能会非常差,某些资源有限的情况下还会出现oom的问题,所以在Spark-19175。提出了以下优化

/**
 * Splits [[Aggregate]] on [[Expand]], which has large number of projections,
 * into various [[Aggregate]]s.
 */
object SplitAggregateWithExpand extends Rule[LogicalPlan] {
  /**
   * Split [[Expand]] operator to a number of [[Expand]] operators
   */
  private def splitExpand(expand: Expand): Seq[Expand] = {
    val len = expand.projections.length
    val allProjections = expand.projections
    Seq.tabulate(len)(
      i => Expand(Seq(allProjections(i)), expand.output, expand.child)
    )
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case a @ Aggregate(_, _, e @ Expand(projections, _, _)) =>
      if (SQLConf.get.groupingWithUnion && projections.length > 1) {
        val expands = splitExpand(e)
        val aggregates: Seq[Aggregate] = Seq.tabulate(expands.length)(
          i => Aggregate(a.groupingExpressions, a.aggregateExpressions, expands(i))
        )
        Union(aggregates)
      } else {
        a
      }
  }
}
复制代码

业务配置实施调优

Spark-Submit参数调优
参考 : https://help.aliyun.com/document_detail/28124.html
复制代码
小文件合并

Spark SQL将数据出入Hive表的时候,文件数目与Reduce数目相同,当Reducer数目比较多的时候,这种机制会造成小文件过多的情况,当读取大量小文件构成的Hive数据时,文件的打开和关闭使得Spark SQL效率很差。

- 只合并碎片文件
	如果设置的碎片阈值是128M,那么只会将该表/分区内小于该阈值的文件进行合并,
    同时如果碎片文件数量小于一定阈值,将不会触发合并,这里主要考虑的是合并任务存在一定性能开销,因此允许系统中存在一定量的小文件。
    
- 分区数量及合并方式
	定义了一些规则用于计算输出文件数量及合并方式的选择,获取任务的最大并发度 maxConcurrency 用于计算数据的分块大小,
    再根据数据碎片文件的总大小选择合并(coalesce/repartition)方式。
    
    maxConcurrency的计算: https://developer.aliyun.com/article/740597
    a.开启dynamicAllocation
	maxConcurrency = spark.dynamicAllocation.maxExecutors * spark.executor.cores

	b.未开启 dynamicAllocation
	maxConcurrency = spark.executor.instances * spark.executor.cores
    
    
   场景1:最大并发度100,碎片文件数据100,碎片文件总大小100M,如果使用 coalesce(1),
   将会只会有1个线程去读/写数据,改为 repartition(1),则会有100个并发读,一个线程顺序写。性能相差100X。

   场景2:最大并发度100,碎片文件数量10000,碎片文件总大小100G,
   如果使用 repartition(200),将会导致100G的数据发生 shuffle,改为 coalesce(200),则能在保持相同并发的情况下避免 200G数据的IO。

   场景3:最大并发度200,碎片文件数量10000,碎片文件总大小50G,
   如果使用 coalesce(100),会保存出100个500M文件,但是会浪费一半的计算性能,改为 coalesce(200),合并耗时会下降为原来的50%。
   
复制代码
开启Executor动态调整

在开启了Executor动态调整(spark.dymaicAllcation.enabled = true),可以让用户免于繁琐的Executor预估。除此之外还可以开启spark.shuffle.service.enabled,用来保留被移除Executor产生的Shuffle文件,而保留的Shuffle文件可用于Stage失败的恢复

SparkSQl业务层面调优(来自<<Spark SQL内核剖析>>一书)
多表Join顺序调整

多表Join操作在实际业务中非常常见,主要用来整合多张数据表的信息以支持进一步的分析。
在多表Join的场景中,数据表的顺序对性能影响比较大。
实际上,多表Join一直都是数据库中基于代价优化机制(Cost-Based Optim ization,简称CBO)的重点针对对象。

考虑到Spark SQL中的CBO尚未完全成熟,不能对SQL中Join的顺序做智能的调整,所以在写SQL时需要从业务层面加以判断。
如图11.17所示,用户需要将数据表A(30亿条)、数据表B(120亿条)和数据表C(1亿条)通过Join操作连接起来。原有的写法是表A和表B先进行Join操作,然后与表C再进行Join操作。经过观察,表A与表B执行Join操作之后得到的结果有25亿条数据,所以最终是这25亿条数据与1亿条数据的表C进行Join操作。

经过测试,如果表A与表C先进行Join操作,如图11.18所示,Join的中间结果只有5千万条数据,然后与表B再进行Join操作。这种方式可以大大减少参与Shuffl e的数据量,提升Join的执行速度,性能上得到了大约40%的提升。 总的来讲,多个表进行Join操作时,总会有一个最佳的执行顺序,从业务层面进行调整需要对数据的分布情况有大致的了解。

数据复用

“复用”是数据处理系统中常用的优化手段,主要包括数据复用和计算复用两个方面。

  • 计算复用指的是重用相同的操作逻辑,减少CPU的计算代价。

  • 数据复用比较好理解,如果读取同一份数据的两个任务之间没有依赖关系,就可以想办法合并任务逻辑,使得只需要读取一次数据,减少IO代价。

在业务中,数据复用出现的场景比较多,绝大部分以Union操作为主。例如,同一条SQL语句模板,用户程序中输入参数,每条SQL语句执行后得到一个结果,最后将结果合并起来。此外,即使在单条SQL语句中,用户的SQL语句也可能用Union划分成多个子查询逻辑。 这里举例说明,简单的SQL写法如下,用户需要分别选取key为20170802和20170803的数据进行进一步处理(注:这里省略了复杂的处理逻辑,仅用value替代)。这条SQL语句在Spark SQL中执行时,数据表myTable会被读取两次,当myTable数据量非常大时,对性能有着很大的影响。

上述SQL语句可以在业务层针对性地优化为下面的写法,将筛选数据的逻辑整合在一起。对于进一步的复杂操作,可以用casewhen语句来支持。这样,Spark SQL只需要读取数据表myTable一次,减少了IO代价,特别是在存储与计算分离的架构下,能够减少数据传输过程中对网络的占用。

目前,Spark SQL在系统层面还缺少这些优化的逻辑,因此需要在业务层面进行更多的调整优化。实际上,复用技术的各种场景已经有了很多研究工作,包括MapReduce环境下的MRShare和Stubby等,都可以借鉴在Spark SQL中。

Window函数执行性能优化

一般情况下,当Spark集群某个节点的Task执行非常缓慢甚至出现OOM时,可以适当地增加该节点内存,以提高执行效率。
然而,在实际应用中,还存在一类比较特殊的情况。例如,某数据分析业务需要统计每个区域的前若干名用户,SQL语句中会涉及row_number这样的Window函数。

在Spark执行过程中发现,存在某个分区(partition)的数据量达到1亿多条,总共有大约6.5GB的数据量。在一定的范围内(注:TDW设置在10GB以内)无论如何增加Executor的内存,该分区对应的Task最终都会发生内存溢出,导致SQL最终失败。 实际上,这个问题和Shuffl e读写过程中内存的使用方式有关。

回顾一下,Shuffle写的过程中有一个阈值(threshold),超过了这个阈值之后,会将数据文件排序后spill到磁盘上形成文件。如图所示,假设threshold为4,那么数据相应地写成4个文件。而在Shuffl e读阶段会针对每个数据文件构造一个reader(参见UnsafeSorterSpillReader),

每个reader会申请1MB的内存空间作为缓冲区。问题就出在这里,
WindowExec执行计划中数据结构默认设置的阈值是4096,那么1亿条数据会对应产生24000多个文件,需要申请24GB左右的内存空间,导致了内存溢出的问题。 解决方法比较简单,将参数spark.sql.windowExec.buffer.spill.threshold直接调大。这里关于阈值的设置有一个设置范围需要估算一下,假设Executor实际JVM堆内存大小为M(GB),处理的数据条数为n,大小为size(GB),那么threshold的设置需要大致满足以下两个约束条件:(1)不超过Executor内存,即size∗threshold/n<M;(2)读取Shuffl e文件的缓冲区不超过Executor内存,即n/(1024∗threshold)<M。针对上述例子,如果Executor内存设置为M=4,得到25000<threshold<6100000+,在此范围内选择一个数值即可。

从某种程度上看,这种案例可以归纳为一大类。虽然Spark系统中有统一的内存管理功能,通过MemoryManager实现对一系列MemoryConsumer有效的管理。然而,仍然有一些对象属于“黑户”状态,并没有实现MemoryConsumer接口。面对这类问题时,需要对Spark执行原理有较深入的了解。

数据倾斜处理

在分布式环境中,数据倾斜一直以来都是“痛点”。然而,根据“二八原则”,实际业务中的数据一般都难以均匀分布,

例如某个ID的用户活动特别频繁、某个时间段系统登录的人数特别多等。 具体到SQL语句中,数据倾斜出现的算子主要是Aggregation和Join,其中Aggregation中因为有着Partial机制,问题并不突出。

这里以Join为例,当某个或某些key的数据量远大于其他时,处理这些key的任务运行时间远大于处理其他key的任务运行时间,从而拖慢整个Join的执行时间。 在TDW平台建设的过程中,数据倾斜也是经常碰到的一类问题,在业务层面一般通过以下几种方法来优化或规避。

  • 无关数据过滤:

    经过运维和开发人员的观察,发现实际情况中大约50%的数据倾斜都是业务无关的数据导致的,具体分为两种情形:

    • 大量的null数据没有过滤,参与了Join的执行(注:TDW中根据业务需求没有跳过null);

    • 存在“脏数据”,不满足原有的数据类型,经过内在的逻辑处理往往会得到null等相同结果。

    这两种情况导致的数据倾斜处理起来比较简单,直接排查后过滤这些无关的数据即可。

    • 小表广播:如果参加Join操作的两个表是大小表,则可以采用BroadcastJoin的方式,将小表广播到大表所在的Executor上,避免数据倾斜的出现。社区Spark从2.2版本之后支持在SQL中通过添加Hint的方式强制采用BroadcastJoin。例如,下面的SQL语句在执行中会将小表t1广播到大表t2所在的Executor上。这种方式处理的情况有限。

      此外需要注意,对于外连接,基表不能被广播,因此左外连接中左表不可以是小表,右外连接中右表不可以是小表。

倾斜数据分离

例如参加Join操作的两个表分别为t1和t2,有数据倾斜的表为t1。

可以将t1的数据分为两部分t11和t12,t11中不包含数据倾斜的数据,t12中只包含数据倾斜的数据。数据表t11和t12分别与t2进行Join操作,然后将结果合并,对应的SQL写法如下。

  • t11与t2的Join操作不存在数据倾斜的问题。
  • 由于数据表t12通常不会很大,所以t12与t2的Join操作可以采用第二种方法执行BroadcastJoin。这样处理之后,数据表t1与数据表t2的Join操作就能够处理数据倾斜的情况了。

数据打散

主要思想在于分散倾斜数据。举一个简单的例子,如图所示。

假设表A和表B都有id、value字段,需要对表A和表B按照id进行Join操作,即“A.id=B.id”。此时,因为id都为a,
所有的数据会在一个task上进行关联操作,这样就出现了数据倾斜。

大数据量的情况下这个task将拖慢整个应用的执行效率。数据打散的处理方法就是将大表(A)中的id加上后缀(“id_0”-“id_2”),起到“打散”的作用。为了结果正确,小表B中的id需要将每条数据都“复制”多份。此时再执行join操作,将会产生3个task,每个task只需要关联一条数据,起到了分散的作用。具体到SQL写法,每个数据表的代码如下。

经过处理之后,再使用new_id作为聚合条件。需要注意的是,这里的rand函数的效果不一定非常均匀,后缀数量可以根据实际业务的数据分步来权衡。处理数据倾斜时,有必要先用count(∗)查看数据的分步情况。此外,可以实现一个UDF函数用于专门生成0到n的数组,便于添加后缀。