SparkSQL Catalyst - 基于代价优化(CBO)

达芬奇密码2018-07-19 12:31
CBO基本原理
文章《 瞄一眼,带你走进SparkSQL的世界 》主要讲解了SparkSQL优化器核心组件 - Catalyst的整个工作流程,重点说明了基于规则的优化策略(CRO)。CRO是一种经验式、启发式的优化思路,优化规则都已经预先定义好,只需要将SQL往这些规则上套就可以。 说白了,CRO就像是一个经验丰富的老司机,基本套路全都知道
然而世界上有一种东西叫做 - 不按套路来,与其说它不按套路来,倒不如说它本身就没有什么套路而言。最典型的莫过于复杂Join算子,对于这些复杂的Join来说,通常有两个对优化相当重要的问题需要决定:
1. 该Join应该选择哪种策略来执行?BroadcastJoin or ShuffleHashJoin or SortMergeJoin?不同的执行策略对系统的资源要求不同,执行效率也有天壤之别,同一个SQL,选择到合适的策略执行可能只需要几秒钟,而如果没有选择到合适的执行策略就可能会导致系统OOM。
2. 对于雪花模型或者星型模型来讲,多表Join应该选择什么样的顺序执行?不同的Join顺序意味着不同的执行效率,比如A join B join C,A、B表都很大,C表很小,那A join B很显然需要大量的系统资源来运算,执行时间肯定不会短。而如果使用A join C join B的执行顺序,因为C表很小,所以A join C会很快得到结果,而且结果集会很小,再使用小的结果集 join B,结果必然也会很小。
首先来看第一个问题,当前SparkSQL会让用户指定参数'spark.sql.autoBroadcastJoinThreshold’来决定是否采用BroadcastJoin策略,简单来说,它会选择参与Join的两表中的小表大小与该值进行对比,如果小表大小小于该配置值,就将此表进行广播;否则采用SortMergeJoin策略。对于SparkSQL采取的方式,有两个问题需要深入分析:
  • 参数'spark.sql.autoBroadcastJoinThreshold’指定的是表的大小(size),而不是条数。这样完全合理吗?我们知道Join算子会将两表中具有相同key的记录合并在一起,因此Join的复杂度只与两表的数据条数有关,而与表大小(size)没有直接的关系。这样一想,其实参数'spark.sql.autoBroadcastJoinThreshold’应该更多地是考虑广播的代价,而不是Join本身的代价。
  • 之前Catalyst文章中我们讲到谓词下推规则,Catalyst会将很多过滤条件下推到Join之前,因此参与Join的两表大小并不应该是两张原始表的大小,而是经过过滤后的表数据大小。因此,单纯的知道原始表大小还远远不够,Join优化还需要评估过滤后表数据大小以及表数据条数。
对于第二个问题,也面临和第一个问题同样的两点缺陷,举个简单的例子:
  • SQL:select * from A , B , C where A.id = B.b_id and A.id = C.c_id and C.c_id > 100
  • 假设:A、B、C总纪录大小分别是100m,40m,60m,C.c_id > 100过滤后C的总纪录数会降到10m
上述SQL是一个典型的A join B join C的多Join示例,很显然,A肯定在最前面,现在问题是B和C的Join顺序,是A join B join C还是A join C join B。对于上面的示例,优化器会有两种最基本的选择,第一就是按照用户手写的Join顺序执行,即按照‘A.id = B.b_id and A.id = C.c_id ’顺序, 得到的执行顺序是A join B join C。第二是按照A、B 、C三表的原始大小进行组织排序,原始表小的先Join,原始表大的后Join,适用这种规则得到的顺序依然是A join B join C,因为B的记录大小小于C的记录大小。
同样的道理,第一个缺陷很明显,记录大小并不能精确作为Join代价的计算依据,而应该是记录条数。第二就是对于过滤条件的忽略,上述示例中C经过过滤后的大小降到10m,明显小于B的40m,因此实际上应该执行的顺序为A join C join B,与上述得到的结果刚好相反。
可见,基于规则的优化策略并不适合复杂Join的优化,此时就需要另一种优化策略 - 基于代价优化(CBO)。基于代价优化策略实际只做两件事,就是解决上文中提到的两个问题:
  1. 解决参与Join的数据表大小并不能完全作为计算Join代价依据的问题,而应该加入数据记录条数这个维度
  2. 解决Join代价计算应该考虑谓词下推(等)条件的影响,不能仅仅关注原始表的大小。这个问题看似简单,实际上很复杂,需要优化器将逻辑执行计划树上的每一个节点的<数据量,数据条数>都评估出来,这样的话,评估Join的执行策略、执行顺序就不再以原始表大小作为依据,而是真实参与Join的两个数据集的大小条数作为依据。
CBO实现思路
经过上文的解释,可以明确CBO的本质就是计算LogionPlan每个节点的输出数据大小与数据条数,作为复杂Join算子的代价计算依据。逻辑执行计划树的叶子节点是原始数据,往上会经过各种过滤条件以及其他函数的限制,父节点依赖于子节点。整个过程可以变换为子节点经过特定计算评估父节点的输出,计算出来之后父节点将会作为上一层的子节点往上计算。因此,CBO可以分解为两步:
  1. 一次性计算出原始数据的相关数据
  2. 再对每类节点制定一种对应的评估规则就可以自下往上评估出所有节点的代价值
一次性计算出原始表的相关数据
这个操作是CBO最基础的一项工作,在计算之前,我们需要明确“相关数据”是什么?这里给出核心的统计信息如下:
  • estimatedSize: 每个LogicalPlan节点输出数据大小(解压)
  • rowCount: 每个LogicalPlan节点输出数据总条数
  • basicStats: 基本列信息,包括列类型、Max、Min、number of nulls, number of distinct values, max column length, average column length等
  • Histograms: Histograms of columns, i.e., equi-width histogram (for numeric and string types) and equi-height histogram (only for numeric types).
至于为什么要统计这么多数据,下文会讲到。现在再来看如何进行统计,有两种比较可行的方案:
1. 打开所有表扫描一遍,这样最简单,而且统计信息准确,缺点是对于大表来说代价比较大。hive和impala目前都采用的这种方式:
(1)hive统计原始表命令:analyse table ***
(2)impala统计原始表明了:compute stats ***
2. 针对一些大表,扫描一遍代价太大,可以采用采样(sample)的方式统计计算
代价评估规则&计算所有节点统计信息
代价评估规则意思是说在当前子节点统计信息的基础上,计算父节点相关统计信息的一套规则。 对于不同谓词节点,评估规则必然不一样,比如fliter、group by、limit等等的评估规则不同。 假如现在已经知道表C的基本统计信息,对于SQL: select * from A , B , C where A.id = B.b_id and A.id = C.c_id and C.c_id > N  这个条件,如何评估经过C.c_id > N过滤后的基本统计信息。我们来看看:
1. 假设当前已知C列的最小值c_id.Min、最大值c_id.Max以及总行数c_id.Distinct,如下图所示:

2. 现在分别有三种情况需要说明,其一是N小于c_id.Min,其二是N大于c_id.Max,其三是N介于c_id.Min和c_id.Max之间。前两种场景是第三种场景的特殊情况,这里简单的针对第三种场景说明。如下图所示:

在C.c_id > N过滤条件下,c_id.Min会增大到N,c_id.Max保持不变。而过滤后总行数c_id.distinct(after filter) = (c_id.Max - N) / (c_id.Max - c_id.Min) * c_id.distinct(before filter)
当然,上述计算只是示意性计算,真实算法会复杂很多。另外,如果大家对group by 、limit等谓词的评估规则比较感兴趣的话,可以阅读 SparkSQL CBO设计文档 ,在此不再赘述。
至此,通过各种评估规则就可以计算出语法树中所有节点的基本统计信息,当然最重要的是参与Join的数据集节点的统计信息。最后只需要根据这些统计信息选择最优的Join算法以及Join顺序,最终得到最优的物理执行计划。
Hive - CBO优化效果
Hive本身没有去从头实现一个SQL优化器,而是借助于Apache Calcite ,Calcite是一个开源的、基于CBO的企业级SQL查询优化框架,目前包括Hive、Phoniex、Kylin以及Flink等项目都使用了Calcite作为其执行优化器,这也很好理解,执行优化器本来就可以抽象成一个系统模块,并没有必要花费大量时间去重复造轮子。
hortonworks曾经对Hive的CBO特性做了相关的测试,测试结果认为CBO至少对查询有三个重要的影响:Join ordering optimization、Bushy join support以及Join simplification,本文只简单介绍一下Join ordering optimization,有兴趣的同学可以继续阅读这篇文章来更多地了解其他两个重要影响。(下面数据以及示意图也来自于该篇文章,特此注明)
hortonworks对TPCDS的部分Query进行了研究,发现对于大部分星型\雪花模型,都存在多Join问题,这些Join顺序如果组织不好,性能就会很差,如果组织得当,性能就会很好。比如Query Q3:
select
    dt.d_year,
    item.i_brand_id brand_id,
    item.i_brand brand,
    sum(ss_ext_sales_price) sum_agg
from
    date_dim dt,
    store_sales,
    item
where
    dt.d_date_sk = store_sales.ss_sold_date_sk
and store_sales.ss_item_sk = item.i_item_sk
and item.i_manufact_id =436
and dt.d_moy =12
groupby dt.d_year , item.i_brand , item.i_brand_id
order by dt.d_year , sum_agg desc , brand_id
limit 10
上述Query涉及到3张表,一张事实表store_sales(数据量大)和两张维度表(数据量小),三表之间的关系如下图所示:

这里就涉及上文提到的Join顺序问题,从原始表来看,date_dime有73049条记录,而item有462000条记录。很显然,如果没有其他暗示的话,Join顺序必然是store_sales join date_time join item。但是,where条件中还带有两个条件,CBO会根据过滤条件对过滤后的数据进行评估,结果如下:
Table
Cardinality
Cardinality after filter 
Selectivity
date_dim
73,049
6200
8.5%
item
462,000
484
0.1%
根据上表所示,过滤后的数据量item明显比date_dim小的多,剧情反转的有点快。于是乎,经过CBO之后Join顺序就变成了store_sales join item join date_time,为了进一步确认,可以在开启CBO前后分别记录该SQL的执行计划,如下图所示:

左图是未开启CBO特性时Q3的执行计划,store_sales先与date_dim进行join,join后的中间结果数据集有140亿条。而再看右图,store_sales先于item进行join,中间结果只有8200w条。很显然,后者执行效率会更高,实践出真知,来看看两者的实际执行时间:
Test Query Response Time(seconds) Intermediate Rows CPU(seconds)
Q3 CBO OFF 255 13,987,506,884 51,967
Q3 CBO ON 142 86,217,653 35,036
上图很明显的看出Q3在CBO的优化下性能将近提升了1倍,与此同时,CPU资源使用率也降低了一半左右。不得不说,TPCDS中有很多相似的Query,有兴趣的同学可以深入进一步深入了解。
Impala - CBO优化效果
和Hive优化的原理相同,也是针对复杂join的执行顺序、Join的执行策略选择优化等方面进行的优化,本人使用TPC-DS对Impala在开启CBO特性前后的部分Query进行了性能测试,测试结果如下图所示:

参考资料:

3. Enhancing Spark SQL Optimizer with Reliable Statistics : http://www.slideshare.net/JenAman/enhancing-spark-sql-optimizer-with-reliable-statistics
4. Cost-based Optimizer framework : https://issues.apache.org/jira/browse/SPARK-16026

本文来自网易实践者社区,经作者范欣欣授权发布。