
Spark SQL multi-dimensional analysis optimization: the devil is in the details


This post shares a case study of optimizing a multi-dimensional analysis job.

Outline of this post:

  1. Business background

  2. How Spark SQL handles count distinct

  3. How Spark SQL handles grouping sets

  4. The optimization process and its effect

  5. Summary

1. Business background

The SQL first:

select
    if(req_netease_user is null, 'all', req_netease_user) as netease_user,
    if(campaign_id is null, 'all', campaign_id) as campaign_id,
    if(spec_id is null, 'all', spec_id) as spec_id,
    if(app_bundle is null, 'all', app_bundle) as app_bundle,
    if(render_name is null, 'all', render_name) as render_name,
    if(platform is null, 'all', platform) as platform,
    count(distinct request_id) as bid_request_num,
    count(distinct deviceid) as bid_request_uv,
    count(distinct case when bid_response_nbr=10001 then bid_response_id else null end) as offer_num,
    count(distinct case when bid_response_nbr=10001 then deviceid else null end) as offer_uv,
    dt
from (
    select
        distinct dt,
        if(req_netease_user is null, 'null', req_netease_user) as req_netease_user,
        if(render_name is null, 'null', render_name) as render_name,
        if(platform is null, 'null', platform) as platform,
        if(campaign_id is null, 'null', campaign_id) as campaign_id,
        if(spec_id is null, 'null', spec_id) as spec_id,
        if(app_bundle is null, 'null', app_bundle) as app_bundle,
        request_id,
        bid_response_nbr,
        bid_response_id,
        deviceid
    from table_a
    where dt = '2019-08-11' and request_id is not null
) tmp
group by dt, req_netease_user, campaign_id, spec_id, app_bundle, render_name, platform
grouping sets(
    (dt),
    (dt, req_netease_user), (dt, campaign_id), (dt, spec_id), (dt, app_bundle), (dt, render_name), (dt, platform),
    (dt, req_netease_user, campaign_id), (dt, req_netease_user, spec_id), (dt, req_netease_user, app_bundle), (dt, req_netease_user, render_name), (dt, req_netease_user, platform),
    (dt, req_netease_user, campaign_id, spec_id), (dt, req_netease_user, campaign_id, app_bundle), (dt, req_netease_user, campaign_id, render_name), (dt, req_netease_user, campaign_id, platform),
    (dt, req_netease_user, campaign_id, spec_id, app_bundle), (dt, req_netease_user, campaign_id, spec_id, render_name), (dt, req_netease_user, campaign_id, spec_id, platform),
    (dt, req_netease_user, campaign_id, spec_id, app_bundle, render_name), (dt, req_netease_user, campaign_id, spec_id, app_bundle, platform),
    (dt, req_netease_user, campaign_id, spec_id, app_bundle, render_name, platform)
)

The logic is not complicated; it is just slow. The run time is shown below:

(screenshot: the job's run time)

It takes about 5 hours to run.

This is an ad-bidding business table: every bid request (request_id) produces one row, so a single day adds up to a very large volume (billions of rows). On top of that, we have to build 22 combinations out of 7 dimensions and, for each combination, compute count(distinct request_id), count(distinct deviceid), count(distinct case when bid_response_nbr=10001 then bid_response_id else null end) and count(distinct case when bid_response_nbr=10001 then deviceid else null end). All I can say is: what a merciless requirement.

2. Spark SQL's optimization of count distinct

In Hive, we usually optimize count distinct like this:

-- before optimization
select count(distinct id) from table_a

-- after optimization
select
    count(id)
from (
    select
        id
    from table_a
    group by id
) tmp

Hive tends to run a global aggregation like this in a single reducer, which ends up as data skew; leaving other factors aside, our usual fix is to group by first and then count.

With Spark SQL this seems less of a worry, because Spark optimizes count distinct on its own:

explain
select
    count(distinct id),
    count(distinct name)
from table_a

The physical plan is as follows:

== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(if ((gid#147005 = 2)) table_a.`id`#147007 else null), count(if ((gid#147005 = 1)) table_a.`name`#147006 else null)])
+- Exchange SinglePartition
   +- *(2) HashAggregate(keys=[], functions=[partial_count(if ((gid#147005 = 2)) table_a.`id`#147007 else null), partial_count(if ((gid#147005 = 1)) table_a.`name`#147006 else null)])
      +- *(2) HashAggregate(keys=[table_a.`name`#147006, table_a.`id`#147007, gid#147005], functions=[])
         +- Exchange(coordinator id: 387101114) hashpartitioning(table_a.`name`#147006, table_a.`id`#147007, gid#147005, 4096), coordinator[target post-shuffle partition size: 67108864]
            +- *(1) HashAggregate(keys=[table_a.`name`#147006, table_a.`id`#147007, gid#147005], functions=[])
               +- *(1) Expand [List(name#146984, null, 1), List(null, id#146979, 2)], [table_a.`name`#147006, table_a.`id`#147007, gid#147005]
                  +- *(1) Project [id#146979, name#146984]
                     +- *(1) FileScan parquet table_a

The plan shows that Spark handles count distinct with an Expand operator. How exactly is the data expanded? See the figure below:

(figure: how the rows are expanded)

After the expand, a HashAggregate keyed by id, name (and gid), i.e. a group by, deduplicates the rows; after that a plain count(id) and count(name) is enough. The data is handled divide-and-conquer style, which alleviates the skew to some extent.
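To make the rewrite concrete, here is a hand-written SQL sketch that is roughly what the Expand-based plan computes (illustrative only; Spark does this internally via the Expand operator rather than a union all, and table_a, id and name are the same toy columns as above):

select
    count(if(gid = 2, id, null))   as cnt_distinct_id,
    count(if(gid = 1, name, null)) as cnt_distinct_name
from (
    select name, id, gid
    from (
        -- the "expand": one copy of every row per distinct aggregate, tagged with gid
        select name, null as id, 1 as gid from table_a
        union all
        select null as name, id, 2 as gid from table_a
    ) expanded
    group by name, id, gid        -- deduplicate within each gid
) deduped

Because count() ignores nulls, counting the non-null values per gid after the dedup yields the two distinct counts.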

For reference, here is part of the relevant Spark source (the RewriteDistinctAggregates optimizer rule), which is handy to read alongside the plan:

def rewrite(a: Aggregate): Aggregate = {
    // collect all aggregate expressions
    val aggExpressions = a.aggregateExpressions.flatMap { e =>
      e.collect {
        case ae: AggregateExpression => ae
      }
    }
    // pick out the aggregate expressions that use distinct
    val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy { e =>
        val unfoldableChildren = e.aggregateFunction.children.filter(!_.foldable).toSet
        if (unfoldableChildren.nonEmpty) {
          // Only expand the unfoldable children
          unfoldableChildren
        } else {
          e.aggregateFunction.children.take(1).toSet
        }
    }
    // when there is more than one distinct aggregate expression, expand
    if (distinctAggGroups.size > 1) {
      // create the gid attribute
      val gid = AttributeReference("gid", IntegerType, nullable = false)()
      val groupByMap = a.groupingExpressions.collect {
        case ne: NamedExpression => ne -> ne.toAttribute
        case e => e -> AttributeReference(e.sql, e.dataType, e.nullable)()
      }
      val groupByAttrs = groupByMap.map(_._2)
      ....

      // build the Expand operator
      val expand = Expand(
        regularAggProjection ++ distinctAggProjections,
        groupByAttrs ++ distinctAggChildAttrs ++ Seq(gid) ++ regularAggChildAttrMap.map(_._2),
        a.child)
      .....
  }

3. How Spark SQL handles grouping sets

grouping sets, rollup and cube are the constructs used for multi-dimensional analysis:

  • grouping sets runs a group by for every grouping set listed: group by A, B grouping sets (A, B) is equivalent to group by A union group by B. Each set can itself contain several columns, e.g. group by A, B, C grouping sets ((A, B), (A, C)). (See the sketch right after this list.)

  • rollup creates a grouping set for each hierarchical level of the listed expressions: group by A, B, C with rollup first groups by (A, B, C), then (A, B), then (A), and finally the whole table (the grand total).

  • cube creates a grouping set for every possible combination of the listed expressions: first (A, B, C), then (A, B), (A, C), (A), (B, C), (B), (C), and finally the whole table.
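As a quick illustration of the first bullet (a sketch on a hypothetical table t with columns A and B):

select A, B, count(1) from t group by A, B grouping sets ((A), (B))

-- produces the same result set as:
select A,    null as B, count(1) from t group by A
union all
select null as A,    B, count(1) from t group by B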

Let's look at how Spark handles grouping sets:

explain
select
    count(1)
from table_a
group by netease_user, platform
grouping sets (netease_user, platform)

The physical plan is as follows:

== Physical Plan ==
*(2) HashAggregate(keys=[id#147356, name#147357, spark_grouping_id#147353], functions=[count(1)])
+- Exchange(coordinator id: 1061978911) hashpartitioning(id#147356, name#147357, spark_grouping_id#147353, 4096), coordinator[target post-shuffle partition size: 67108864]
   +- *(1) HashAggregate(keys=[id#147356, name#147357, spark_grouping_id#147353], functions=[partial_count(1)])
      +- *(1) Expand [List(id#147354, null, 1), List(null, name#147355, 2)], [id#147356, name#147357, spark_grouping_id#147353]
         +- *(1) Project [id#147341 AS id#147354, name#147346 AS name#147355]
            +- *(1) FileScan parquet table_a

grouping sets is likewise implemented with Expand: one copy of every row per grouping set.
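A small aside on reading these plans (this matches the plans shown here, but is worth double-checking against your Spark version): spark_grouping_id is a bitmask over the group-by columns, with the first column as the most significant bit and a bit set for every column that is nulled out in that copy of the row. In the plan above, List(id, null, 1) keeps id and nulls out the second column (binary 01 = 1), while List(null, name, 2) nulls out the first (binary 10 = 2). In the big plan of section 4.1 below, the (dt)-only grouping set nulls the other six dimensions, giving 0111111 = 63.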

 

4. The optimization process

4.1 Locating the problem

Knowing how count distinct and grouping sets work, we can already guess where the time goes. Still, let's look at the physical plan:

== Physical Plan ==
+- *(4) HashAggregate(keys=[dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85], functions=[count(if ((gid#151 = 4)) tmp.`request_id`#155 else null), count(if ((gid#151 = 3)) tmp.`deviceid`#154 else null), count(if ((gid#151 = 1)) CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`bid_response_id` ELSE CAST(NULL AS STRING) END#152 else null), count(if ((gid#151 = 2)) CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`deviceid` ELSE CAST(NULL AS STRING) END#153 else null)], output=[netease_user#140, campaign_id#141, spec_id#142, app_bundle#143, render_name#144, platform#145, bid_request_num#146L, bid_request_uv#147L, offer_num#148L, offer_uv#149L, dt#150])
   +- Exchange(coordinator id: 697663456) hashpartitioning(dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, 888), coordinator[target post-shuffle partition size: 536870912]
      +- *(3) HashAggregate(keys=[dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85], functions=[partial_count(if ((gid#151 = 4)) tmp.`request_id`#155 else null), partial_count(if ((gid#151 = 3)) tmp.`deviceid`#154 else null), partial_count(if ((gid#151 = 1)) CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`bid_response_id` ELSE CAST(NULL AS STRING) END#152 else null), partial_count(if ((gid#151 = 2)) CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`deviceid` ELSE CAST(NULL AS STRING) END#153 else null)], output=[dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, count#160L, count#161L, count#162L, count#163L])
         +- *(3) HashAggregate(keys=[dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`bid_response_id` ELSE CAST(NULL AS STRING) END#152, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`deviceid` ELSE CAST(NULL AS STRING) END#153, tmp.`deviceid`#154, tmp.`request_id`#155, gid#151], functions=[], output=[dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`bid_response_id` ELSE CAST(NULL AS STRING) END#152, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`deviceid` ELSE CAST(NULL AS STRING) END#153, tmp.`deviceid`#154, tmp.`request_id`#155, gid#151])
            +- Exchange(coordinator id: 92102096) hashpartitioning(dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`bid_response_id` ELSE CAST(NULL AS STRING) END#152, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`deviceid` ELSE CAST(NULL AS STRING) END#153, tmp.`deviceid`#154, tmp.`request_id`#155, gid#151, 888), coordinator[target post-shuffle partition size: 536870912]
               +- *(2) HashAggregate(keys=[dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`bid_response_id` ELSE CAST(NULL AS STRING) END#152, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`deviceid` ELSE CAST(NULL AS STRING) END#153, tmp.`deviceid`#154, tmp.`request_id`#155, gid#151], functions=[], output=[dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`bid_response_id` ELSE CAST(NULL AS STRING) END#152, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`deviceid` ELSE CAST(NULL AS STRING) END#153, tmp.`deviceid`#154, tmp.`request_id`#155, gid#151])
                  +- *(2) Expand [
                        List(dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, CASE WHEN (bid_response_nbr#71L = 10001) THEN bid_response_id#66 ELSE null END, null, null, null, 1),
                        List(dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, null, CASE WHEN (bid_response_nbr#71L = 10001) THEN deviceid#57 ELSE null END, null, null, 2),
                        List(dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, null, null, deviceid#57, null, 3),
                        List(dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, null, null, null, request_id#56, 4)],
                        [dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`bid_response_id` ELSE CAST(NULL AS STRING) END#152, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`deviceid` ELSE CAST(NULL AS STRING) END#153, tmp.`deviceid`#154, tmp.`request_id`#155, gid#151]
                     +- *(2) Expand [
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, null, null, null, null, null, null, 63),
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, null, null, null, null, null, 31),
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, null, campaign_id#39, null, null, null, null, 47),
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, null, null, spec_id#40, null, null, null, 55),
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, null, null, null, app_bundle#41, null, null, 59),
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, null, null, null, null, render_name#37, null, 61),
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, null, null, null, null, null, platform#38, 62),
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, null, null, null, null, 15),
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, null, spec_id#40, null, null, null, 23),
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, null, null, app_bundle#41, null, null, 27),
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, null, null, null, render_name#37, null, 29),
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, null, null, null, null, platform#38, 30),
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, spec_id#40, null, null, null, 7),
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, null, app_bundle#41, null, null, 11),
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, null, null, render_name#37, null, 13),
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, null, null, null, platform#38, 14),
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, spec_id#40, app_bundle#41, null, null, 3),
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, spec_id#40, null, render_name#37, null, 5),
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, spec_id#40, null, null, platform#38, 6),
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, spec_id#40, app_bundle#41, render_name#37, null, 1),
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, spec_id#40, app_bundle#41, null, platform#38, 2),
                           List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, spec_id#40, app_bundle#41, render_name#37, platform#38, 0)],
                           [request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85]
                        +- *(2) HashAggregate(keys=[dt#80, req_netease_user#36, render_name#37, platform#38, campaign_id#39, spec_id#40, app_bundle#41, request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57], functions=[], output=[request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, spec_id#40, app_bundle#41, render_name#37, platform#38])
                           +- Exchange(coordinator id: 1960388256) hashpartitioning(dt#80, req_netease_user#36, render_name#37, platform#38, campaign_id#39, spec_id#40, app_bundle#41, request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, 888), coordinator[target post-shuffle partition size: 536870912]
                              +- *(1) HashAggregate(keys=[dt#80, req_netease_user#36, render_name#37, platform#38, campaign_id#39, spec_id#40, app_bundle#41, request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57], functions=[], output=[dt#80, req_netease_user#36, render_name#37, platform#38, campaign_id#39, spec_id#40, app_bundle#41, request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57])
                                 +- *(1) Project [dt#80, if (isnull(req_netease_user#60)) null else req_netease_user#60 AS req_netease_user#36, if (isnull(render_name#61)) null else render_name#61 AS render_name#37, if (isnull(platform#62)) null else platform#62 AS platform#38, if (isnull(campaign_id#68)) null else campaign_id#68 AS campaign_id#39, if (isnull(spec_id#63)) null else spec_id#63 AS spec_id#40, if (isnull(app_bundle#58)) null else app_bundle#58 AS app_bundle#41, request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57]
                                    +- *(1) Filter isnotnull(request_id#56)
                                       +- *(1) FileScan parquet table_a


The plan shows two Expand operators in total. The first comes from grouping sets: with 22 combinations, the data is multiplied by 22. The second comes from the four count distinct metrics, multiplying the data by another 4 on top of that. In other words, the data volume ends up 88 times larger.

Let's look at the logs:

(screenshot: the job list)

There are four jobs in total; the slowest are Job 1 and Job 2.

(screenshots: stage metrics for Job 0 and Job 1)

Job 0 reads table_a and, after filtering, produces 1,977,861,971 rows;

Job 1, after the two expand operations, has 174,051,853,448 rows: an 88-fold increase.
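As a sanity check, the numbers line up exactly: 1,977,861,971 × 88 = 174,051,853,448.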

However, the number of partitions stays the same during the expand, while the data volume balloons.

4.2 The optimization plan

The root cause of the slowness is still data skew, only this time it is not a single skewed partition but the entire stage that is overloaded.

There are two main approaches:

1. Increase the number of partitions during the expand

For this data set, a rough estimate says we would need about 444 × 88 = 39,072 partitions for the job to run unimpeded. Launching that many tasks at once would strain cluster resources and starve other jobs, and since the data grows day by day, the number is hard to keep under control.
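For completeness, trying this route would simply mean raising the shuffle parallelism before running the query, e.g. (a sketch only; the value is the rough estimate above, and since the plan also shows an exchange coordinator with a target post-shuffle partition size, adaptive settings may adjust the effective number):

set spark.sql.shuffle.partitions = 39072;   -- 444 * 88 from the estimate above; not adopted here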

2. Reduce the amount of data going into the expand

From the business side:

The finest granularity is request_id, followed by bid_response_id. Since request_id is the primary key (one row per request), there is no need for the distinct keyword when counting it.

The other two metrics, count(distinct case when bid_response_nbr=10001 then bid_response_id else null end) and count(distinct case when bid_response_nbr=10001 then deviceid else null end), can also be reworked.

From the SQL structure side:

The metrics can be split apart, computed in two separate passes, and then joined back together.

The overall principle: filter out as much data as possible, and feed as little data as possible into the expand:

-- rewritten sql
with query_base as (
    select
        dt,
        if(req_netease_user is null, 'null', req_netease_user) as req_netease_user,
        if(render_name is null, 'null', render_name) as render_name,
        if(platform is null, 'null', platform) as platform,
        if(campaign_id is null, 'null', campaign_id) as campaign_id,
        if(spec_id is null, 'null', spec_id) as spec_id,
        if(app_bundle is null, 'null', app_bundle) as app_bundle,
        request_id,
        if(bid_response_nbr = 10001, 1, 0) as is_nbr,
        bid_response_id,
        deviceid
    from table_a
    where dt = '2019-08-11' and request_id is not null
)
select
    request_num.netease_user,
    request_num.campaign_id,
    request_num.spec_id,
    request_num.app_bundle,
    request_num.render_name,
    request_num.platform,
    bid_request_num,
    bid_request_uv,
    offer_num,
    offer_uv,
    request_num.dt
from (
    select
        dt,
        nvl(req_netease_user, 'all') as netease_user,
        nvl(campaign_id, 'all') as campaign_id,
        nvl(spec_id, 'all') as spec_id,
        nvl(app_bundle, 'all') as app_bundle,
        nvl(render_name, 'all') as render_name,
        nvl(platform, 'all') as platform,
        sum(request_num) as bid_request_num,
        count(distinct deviceid) as bid_request_uv,
        count(distinct case when is_nbr = 1 then deviceid else null end) as offer_uv
    from (
        select
            dt,
            req_netease_user,
            campaign_id,
            spec_id,
            app_bundle,
            render_name,
            platform,
            deviceid,
            count(request_id) as request_num,
            max(is_nbr) as is_nbr
        from query_base
        group by dt, req_netease_user, campaign_id, spec_id, app_bundle, render_name, platform, deviceid
    ) tmp
    group by dt, req_netease_user, campaign_id, spec_id, app_bundle, render_name, platform
    grouping sets(
        (dt),
        (dt, req_netease_user), (dt, campaign_id), (dt, spec_id), (dt, app_bundle), (dt, render_name), (dt, platform),
        (dt, req_netease_user, campaign_id), (dt, req_netease_user, spec_id), (dt, req_netease_user, app_bundle), (dt, req_netease_user, render_name), (dt, req_netease_user, platform),
        (dt, req_netease_user, campaign_id, spec_id), (dt, req_netease_user, campaign_id, app_bundle), (dt, req_netease_user, campaign_id, render_name), (dt, req_netease_user, campaign_id, platform),
        (dt, req_netease_user, campaign_id, spec_id, app_bundle), (dt, req_netease_user, campaign_id, spec_id, render_name), (dt, req_netease_user, campaign_id, spec_id, platform),
        (dt, req_netease_user, campaign_id, spec_id, app_bundle, render_name), (dt, req_netease_user, campaign_id, spec_id, app_bundle, platform),
        (dt, req_netease_user, campaign_id, spec_id, app_bundle, render_name, platform)
    )
) request_num
join (
    select
        dt,
        nvl(req_netease_user, 'all') as netease_user,
        nvl(campaign_id, 'all') as campaign_id,
        nvl(spec_id, 'all') as spec_id,
        nvl(app_bundle, 'all') as app_bundle,
        nvl(render_name, 'all') as render_name,
        nvl(platform, 'all') as platform,
        count(distinct bid_response_id) as offer_num
    from (
        select
            dt,
            req_netease_user,
            campaign_id,
            spec_id,
            app_bundle,
            render_name,
            platform,
            bid_response_id
        from query_base
        where is_nbr = 1
    ) tmp
    group by dt, req_netease_user, campaign_id, spec_id, app_bundle, render_name, platform
    grouping sets(
        (dt),
        (dt, req_netease_user), (dt, campaign_id), (dt, spec_id), (dt, app_bundle), (dt, render_name), (dt, platform),
        (dt, req_netease_user, campaign_id), (dt, req_netease_user, spec_id), (dt, req_netease_user, app_bundle), (dt, req_netease_user, render_name), (dt, req_netease_user, platform),
        (dt, req_netease_user, campaign_id, spec_id), (dt, req_netease_user, campaign_id, app_bundle), (dt, req_netease_user, campaign_id, render_name), (dt, req_netease_user, campaign_id, platform),
        (dt, req_netease_user, campaign_id, spec_id, app_bundle), (dt, req_netease_user, campaign_id, spec_id, render_name), (dt, req_netease_user, campaign_id, spec_id, platform),
        (dt, req_netease_user, campaign_id, spec_id, app_bundle, render_name), (dt, req_netease_user, campaign_id, spec_id, app_bundle, platform),
        (dt, req_netease_user, campaign_id, spec_id, app_bundle, render_name, platform)
    )
) request_uv
on  request_num.netease_user = request_uv.netease_user
and request_num.render_name  = request_uv.render_name
and request_num.campaign_id  = request_uv.campaign_id
and request_num.spec_id      = request_uv.spec_id
and request_num.app_bundle   = request_uv.app_bundle
and request_num.platform     = request_uv.platform;

4.3 Results

(screenshot: run time after the optimization)

After the optimization, the job takes only 5 minutes. Excellent!

5. Summary

Overall, the expand approach suits multi-dimensional analysis with a small number of dimension combinations: it reads the source data only once, but the data is then inflated n-fold downstream.
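A rough rule of thumb that falls out of this case: rows entering the heaviest stage are roughly input rows × (number of grouping sets) × (number of distinct-aggregate groups). Here that factor was 22 × 4 = 88, so anything that shrinks either factor, or the input itself, pays off many times over.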

 

