• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

spark sql多维分析优化——提高读取文件的并行度

SPARK diligentman 3周前 (08-29) 26次浏览

这次分享多维分析优化的另一种情况

本文大纲

1、描述问题背景

2、讲一下解决思路

3、解决办法(spark sql处理parquet row group原理及分区原理,参数测试,解决方案)

4、效果

 

1、描述问题

代码如下:

select netease_user, if(campaign_id is null, 'all', campaign_id) as campaign_id, if(spec_id is null, 'all', spec_id) as spec_id, if(app_bundle is null, 'all', app_bundle) as app_bundle, if(render_name is null, 'all', render_name) as render_name, platform, sum(bidfloor) as success_bidfloor, count(distinct clk_request_id) as click_pv, count(distinct exp_deviceid) as exp_uv, count(distinct exp_request_id) as exp_pv, count(distinct clk_deviceid) as click_uv, round(sum(case when winprice<0 then 0 else winprice end)/1000, 4) as costfrom(select distinct nvl(netease_user , 'true') as netease_user, nvl(render_name , 'null') as render_name, platform, nvl(campaign_id, 'null') as campaign_id, nvl(spec_id, 'null') as spec_id, nvl(app_bundle , 'null') as app_bundle, clk_request_id, exp_deviceid, exp_request_id, clk_deviceid, winprice, bidfloorfrom table_a where day = '20190815' and platform is not null) tmpgroup by netease_user, campaign_id, spec_id, app_bundle, render_name, platformgrouping sets( ( netease_user, platform),
( netease_user, platform, campaign_id), ( netease_user, platform, spec_id), ( netease_user, platform,app_bundle), ( netease_user, platform,render_name),
( netease_user, platform,campaign_id, spec_id), ( netease_user, platform,campaign_id, app_bundle), ( netease_user, platform,campaign_id, render_name), ( netease_user, platform, spec_id, app_bundle), ( netease_user, platform, spec_id, render_name), ( netease_user, platform, app_bundle, render_name), ( netease_user, platform, campaign_id, spec_id, app_bundle), ( netease_user, platform, spec_id, app_bundle, render_name), ( netease_user, platform, campaign_id, app_bundle, render_name), ( netease_user, platform, campaign_id, spec_id, render_name), ( netease_user, campaign_id, spec_id, app_bundle, render_name, platform));

 

基础表数据量有几百万,并不算很大。

但是运行时长还是挺长的:

spark sql多维分析优化——提高读取文件的并行度

 

需要60分钟左右。

来看一下日志:

spark sql多维分析优化——提高读取文件的并行度

第二个job比较慢,一定就是expand 慢了:

spark sql多维分析优化——提高读取文件的并行度

从上面可以看到,数据过滤后是582w,经过两次expand 后,变成了4.6个亿,4.6个亿的量本来不算大,但因为只有2个task在处理,就显的异常的慢

 

2、解决思路

解决多维分析的办法一般是:把逻辑拆开,分别计算指标,然后再 join 起来,这个也是上一篇【spark sql多维分析优化——细节是魔鬼】用到的一个办法。但这个办法有个缺点就是如果指标比较多的情况下,代码会写的很长,数据也会被多加载几遍。

对于这次案例来说,不用拆代码,因为5亿左右的量并不算很大,我们只用把task给扩展一下,从2个扩展到20个应该就能很快处理完了。

该怎么扩展呢?

首先我们先简化一下代码:

spark sql多维分析优化——提高读取文件的并行度

这里的distinct 是没必要的,从对业务的了解以及日志的数据来看,distinct 并没使数据大量减少,并且由于distinct引起了shuffle,也会占用一部分时间,因此可以把distinct去掉。

去掉distinct后,expand  操作就会被合并到Job 1 中,这样以来我们只要在读取文件时增加task, 让每个task处理更少的数据,就能提高效率。

 

3、解决办法及遇到的问题

该怎么提高读取文件的并行度呢?

基础表 table_a 存储格式为parquet,我们首先要了解spark sql 是怎么来处理parquet文件的。

3.1 spark sql分区方式(parquet)

spark 通过FileSourceScanExec 来处理hdfs文件:

/** 基础表table_a不为分桶表,读取数据的分区方式走此方法*/private def createNonBucketedReadRDD(      readFile: (PartitionedFile) => Iterator[InternalRow],      selectedPartitions: Seq[PartitionDirectory],      fsRelation: HadoopFsRelation): RDD[InternalRow] = {    /**defaultMaxSplitBytes 即为spark.sql.files.maxPartitionBytes 参数,默认为128M*/    val defaultMaxSplitBytes =      fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes     /**openCostInBytes 即为spark.sql.files.openCostInBytes 参数,默认为4M*/    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes    /**defaultParallelism  并行度参数 即 spark.default.parallelism */    val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum    val bytesPerCore = totalBytes / defaultParallelism        /**分片方法的计算公式*/    /**openCostInBytes与bytesPerCore取最大,然后再与defaultMaxSplitBytes取最小*/    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))       logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +      s"open cost is considered as scanning $openCostInBytes bytes.")        /**遍历文件*/    val splitFiles = selectedPartitions.flatMap { partition =>      partition.files.flatMap { file =>        val blockLocations = getBlockLocations(file)           /**判断文件是否支持分割,parquet可分割*/          if (fsRelation.fileFormat.isSplitable(            fsRelation.sparkSession, fsRelation.options, file.getPath)) {          /**依据分片大小maxSplitBytes计算要多少分区来处理数据*/          (0L until file.getLen by maxSplitBytes).map { offset =>            val remaining = file.getLen - offset            /**假如剩余量不足,那么该文件剩余的作为一个分区*/            val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining            val hosts = getBlockHosts(blockLocations, offset, size)            PartitionedFile(              partition.values, file.getPath.toUri.toString, offset, size, hosts)          }        } else {          /**判断文件是否支持分割,如果不能分割,一个文件一个partition*/          val hosts = getBlockHosts(blockLocations, 0, file.getLen)          Seq(PartitionedFile(            partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))        }      }    }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)    .....

如果想要增加分区,即task 数量,就要降低最终分片 maxSplitBytes的值,可以通过降低spark.sql.files.maxPartitionBytes 的值来降低 maxSplitBytes 的值

3.2 参数测试及问题

spark.sql.files.maxPartitionBytes 参数默认为128M,生成了四个分区:

spark sql多维分析优化——提高读取文件的并行度

 

table_a 在hdfs 20190815日的数据情况:

205.2 M  part-00000-30ceee1e-2ed6-4239-8a6b-45fc6cbf1ef6.c000
205.2 M  part-00001-30ceee1e-2ed6-4239-8a6b-45fc6cbf1ef6.c000
3.8 M    part-00002-30ceee1e-2ed6-4239-8a6b-45fc6cbf1ef6.c000

 

共三个数据文件,如果设置参数 spark.sql.files.maxPartitionBytes为64M,会把数据分8个块:

##part-00000  四块range: 0-67108864  ; range: 67108864-134217728;  range: 134217728-201326592range: 201326592-215189723

 

##part-00001  四块range: 0-67108864  ; range: 67108864-134217728;  range: 134217728-201326592range: 201326592-215167669
##part-00002  一块range: 0-4002630

启动7个task:  

理论上有6个task分别负责每个64M的块数据,然后最后一个task负责part-00000,part-00001剩余的不足64M的两个块以及part-00002。

然而事实是:

spark sql多维分析优化——提高读取文件的并行度

分区数确实增加了,由四个增加到了7个,但是新增的3个却没处理什么数据,大部分的数据还是4个partition在处理,所以还是很慢~~~~

task数增加了,但是数据并没有均分到每个task,为什么呢?

仔细研究了一下parquet 文件的结构:

spark sql多维分析优化——提高读取文件的并行度

 

parquet 文件的数据是以row group 存储,一个parquet 文件可能只含有一个row group,也有可能含有多个row group  ,row group  的大小 主要由parquet.block.size 决定。

spark 在处理parquet 文件时,一个row group 只能由一个task 来处理,在hdfs 中一个row group 可能横跨hdfs block ,那么spark是怎么保证一个task只处理一个 row group 的呢?

static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMetadataFilter filter) {  List<RowGroup> rowGroups = metaData.getRow_groups();  List<RowGroup> newRowGroups = new ArrayList<RowGroup>();  for (RowGroup rowGroup : rowGroups) {    long totalSize = 0;    long startIndex = getOffset(rowGroup.getColumns().get(0));    for (ColumnChunk col : rowGroup.getColumns()) {      totalSize += col.getMeta_data().getTotal_compressed_size();    }    /**计算row group中点*/    long midPoint = startIndex + totalSize / 2;    /**谁拥有这个row group的中点,谁就可以处理这个row group*/    if (filter.contains(midPoint)) {      newRowGroups.add(rowGroup);    }  }  metaData.setRow_groups(newRowGroups);  return metaData;}

这就导致并不是所有task 都能够分到数据。

检查table_a发现,生成table_a时,parquet.block.size  用的默认值128M ,这样就导致一个row group 有128M 的大小。

parquet.block.size 是可以依据实际使用情况来调优的,对于做多维分析表,可以设置稍小一点。

最终 经过调试设置parquet.block.size 为16M ;设置spark.sql.files.maxPartitionBytes为16M

 

 

4、效果

修改参数后:

spark sql多维分析优化——提高读取文件的并行度

spark sql多维分析优化——提高读取文件的并行度

读取hdfs文件时,并行了22个task,并且每个task处理数据均匀。

 

 

spark sql多维分析优化——提高读取文件的并行度

2分40秒就能完成,有没有棒棒哒?

 


程序员灯塔 , 版权所有
转载请注明原文链接:https://www.wangt.cc/2019/08/143c70d3ef/
喜欢 (0)