Spark Sql之count distinct
 
- 学习内容
- spark 对count(distinct)的优化
- 数据膨胀原理
- 
- distinct数据膨胀
- grouping sets数据膨胀
- 开个坑
 
 
- distinct源码
- spark sql grouping sets
- 优化思路
- 
- 1、增加 expand的过程中partition 的数量
- 2、缩减expand 的数据量
 
- 参考
 
 
学习内容
 
spark 对count(distinct)的优化
 
 
 先说结论:spark sql和hive不一样,spark对count(distinct)做了group by优化
 
 
 
 在hive中count().
 hive往往只用一个 reduce 来处理全局聚合函数,最后导致数据倾斜;在不考虑其它因素的情况下,我们的优化方案是先 group by 再 count 。
 
 
select count(distinct id) from table_a 
select 
  count(id)
from
(
    select 
        id
    from table_a group by id
) tmp
 
 
 在使用spark sql 时,不用担心这个问题,因为 spark 对count distinct 做了优化:
 
 
explain 
select 
    count(distinct id),
    count(distinct name) 
from table_a
 
== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(if ((gid#147005 = 2)) table_a.`id`#147007 else null), count(if ((gid#147005 = 1)) table_a.`name`#147006 else null)])
+- Exchange SinglePartition
   +- *(2) HashAggregate(keys=[], functions=[partial_count(if ((gid#147005 = 2)) table_a.`id`#147007 else null), partial_count(if ((gid#147005 = 1)) table_a.`name`#147006 else null)])
      +- *(2) HashAggregate(keys=[table_a.`name`#147006, table_a.`id`#147007, gid#147005], functions=[])
         +- Exchange(coordinator id: 387101114) hashpartitioning(table_a.`name`#147006, table_a.`id`#147007, gid#147005, 4096), coordinator[target post-shuffle partition size: 67108864]
            +- *(1) HashAggregate(keys=[table_a.`name`#147006, table_a.`id`#147007, gid#147005], functions=[])
               +- *(1) Expand [List(name#146984, null, 1), List(null, id#146979, 2)], [table_a.`name`#147006, table_a.`id`#147007, gid#147005]
                  +- *(1) Project [id#146979, name#146984]
                     +- *(1) FileScan parquet table_a
 
数据膨胀原理
 
 
 从上述执行计划可以看到,expand,那为什么为产生数据膨胀呐?
 
 
 
 distinct算子在处理过程中是将distinct后的字段和group by字段共同作为key传入reduce,导致shuffle前map阶段没有预聚合,同时shuffle时网络传输数据量过大消耗增加,对reduce处理时负载也增大
 
 
 
 distinct算子在处理过程中会将原有数据膨胀,有N个DISTINCT关键字数据就会在map端膨胀N倍,同时对shuffle和reduce的长尾影响(原因1)也会扩大N
 
 

 
 
 expand 之后,再以id、name 为 key 进行HashAggregate 也就是 group by ,这样以来,就相当于去重了。后面直接计算count (id) 、 count(name) 就可以,把数据分而治之。 在一定程度上缓解了数据倾斜。
 
 
distinct数据膨胀
 
 val sql:String =
    s"""
       |select
       |  count(distinct sha1),
       |  count(distinct task_id),
       |  count(distinct task_type)
       |from tmp
       |""".stripMargin
    val df2: DataFrame = session.sql(sql)
    df2.show()
    df2.explain(true)
 

 
grouping sets数据膨胀
 
    val sql1:String =
      s"""
         |select
         |  count(sha1),
         |  count(task_id),
         |  count(task_type)
         |from (
         |select sha1,task_id,task_type
         |from tmp
         |group by grouping sets(sha1, task_id, task_type)
         |)
         |""".stripMargin
    val df22: DataFrame = session.sql(sql1)
    df22.explain(true)
    df22.show()
 

 
开个坑
 
 
 在spark sql里面小数据量的话,count(distinct)和gruop by的执行时间是差不多的,
 但是我看到有篇文章介绍的是大数据量的distinct和group by的对比,说的是大数据量的话无法在内存里HashAggregate也就是group by,两者的执行时间的差距还是很大的。具体的还没测试。。。
 
 
distinct源码
 
def rewrite(a: Aggregate): Aggregate = {
    
    val aggExpressions = a.aggregateExpressions.flatMap { e =>
      e.collect {
        case ae: AggregateExpression => ae
      }
    }
    
    val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy { e =>
        val unfoldableChildren = e.aggregateFunction.children.filter(!_.foldable).toSet
        if (unfoldableChildren.nonEmpty) {
          
          unfoldableChildren
        } else {        
          e.aggregateFunction.children.take(1).toSet
        }
    }
    
    if (distinctAggGroups.size > 1) {
      
      val gid = AttributeReference("gid", IntegerType, nullable = false)()
      val groupByMap = a.groupingExpressions.collect {
        case ne: NamedExpression => ne -> ne.toAttribute
        case e => e -> AttributeReference(e.sql, e.dataType, e.nullable)()
      }
      val groupByAttrs = groupByMap.map(_._2)
      ....     
      }
      
      val expand = Expand(
        regularAggProjection ++ distinctAggProjections,
        groupByAttrs ++ distinctAggChildAttrs ++ Seq(gid) ++ regularAggChildAttrMap.map(_._2),
        a.child)        
        .....
  }
 
 
 重点代码:
 //todo 当有多个distinct聚合表达式时,进行expand
 if (distinctAggGroups.size > 1) { expand }
 
 
spark sql grouping sets
 
 
 grouping sets 、rollup 、cube 是用来处理多维分析的函数:
 
 
 
 grouping sets:对分组集中指定的组表达式的每个子集执行group by,group by A,B grouping sets(A,B)就等价于 group by A union group by B,其中A和B也可以是一个集合,比如group by A,B,C grouping sets((A,B),(A,C))。
 
 
 
 rollup:在指定表达式的每个层次级别创建分组集。group by A,B,C with rollup首先会对(A、B、C)进行group by,然后对(A、B)进行group by,然后是(A)进行group by,最后对全表进行group by操作。
 
 
 
 cube : 为指定表达式集的每个可能组合创建分组集。首先会对(A、B、C)进行group by,然后依次是(A、B),(A、C),(A),(B、C),(B),©,最后对全表进行group by操作。
 
 
前文也说了,grouping sets也是利用expand的方式
 
优化思路
 
 
 上文我们基本可以了解到了,是由于expand导致的慢,优化方向可以朝着减少distinct关键的出现的次数,减少数据膨胀方向入手
 
 
1、增加 expand的过程中partition 的数量
 
 
 但是这样有一个弊端:同时启动太多task 会造成集群资源紧张,也会导致其它任务没有资源。并且数据是 逐日增加的,总体上不好控制。
 
 
2、缩减expand 的数据量
 
 
 从sql结构上:
 可以把计算的指标拆开,分两次计算,然后再 join。
 总体的处理原则就是,让过滤掉的数据尽量的多,expand 时的数据尽量少:
 
 
参考
 
参考博客