【kudu pk parquet】runtime filter实践

阿凡达2018-07-10 12:31

已经有好一阵子没有写博文了,今天给大家带来一篇最近一段时间开发相关的文章:在impala和kudu上支持runtime filter。

大家搜索下实践者社区,可以发现前面已经有好几位同学写了这个主题的博文(都是我们组的^_^),说明这个功能在数据库领域的重要性,所以,嘿嘿,再敲一遍黑板:“重点,必考题!”
附上年初测试kudu时候的博文《 【大数据之数据仓库】kudu性能测试报告分析》作为背景。
背景准备
为了生动、立体的给大家展示runtime filter功能,这里就以一个具体的sql例子来讲解。
  • 表结构:
       create table orders
       (
           o_orderkey      bigint,        ->   主键,也是分区键(分布式数据库用于数据分片)
           o_custkey       bigint,        ->   外键,同customer.c_custkey
           o_orderstatus   string, 
           o_totalprice    double, 
           o_orderdate     string, 
           o_orderpriority string, 
           o_clerk         string, 
           o_shippriority  bigint, 
           o_comment       string
       )
       create table customer
       (
           c_custkey       bigint,        ->   主键,也是分区键
           c_name          string, 
           c_address       string, 
           c_nationkey     bigint, 
           c_phone         string, 
           c_acctbal       double, 
           c_mktsegment    string, 
           c_comment       string
       )
  • 测试sql:
select c.*  from orders o join customer c on c.c_custkey = o.o_custkey where o_orderkey = 1125;
我们用业界的TPC-H工具生成1TB的测试数据,使用上面的sql语句来测试orders和customer两表关联。
很多对数据库熟悉的同学会说,简单:从orders表里用“o_orderkey = 1125”条件过滤出o_custkey字段,再用“返回的o_orderkey值”作为条件到customer表里过滤出全部字段。
对,完全正确!!
可是,那是受过专业数据库知识培训的你的大脑的优化器的优化结果(比人工智能还智能,哈哈),但对于数据库计算引擎,则未必会如你般冰雪聪明。
不信?找你熟悉的数据库,用上面的表结构和sql语句,伪造几条数据测试一下^_^
那么,计算引擎是如何处理的呢?
以MPP的impala为例,简短讲就是先扫描(读)一张表,比如这里的orders表,把扫描结果保存到hash数据结构里,然后再扫描另一张表,比如这里的customer表,把扫描的结果到前面的hash数据结构里找(关联字段),匹配的就是关联到的结果。
修改前测试
我们以impala计算引擎对接kudu存储引擎为例,拿修改之前的版本测试:
  • 在impala-shell执行sql:
    
  • 查看profile:
       F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
       PLAN-ROOT SINK
       |  mem-estimate=0B mem-reservation=0B
       |
       04:EXCHANGE [UNPARTITIONED]
       |  mem-estimate=0B mem-reservation=0B
       |  tuple-ids=1,0 row-size=251B cardinality=2
       |
       F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
       02:HASH JOIN [INNER JOIN, BROADCAST]            ->  以hashJoin的方式两表关联
       |  hash predicates: c.c_custkey = o.o_custkey   ->  用“c.c_custkey = o.o_custkey”条件关联
       |  mem-estimate=9B mem-reservation=136.00MB
       |  tuple-ids=1,0 row-size=251B cardinality=2
       |
       |--03:EXCHANGE [BROADCAST]
       |  |  mem-estimate=0B mem-reservation=0B
       |  |  tuple-ids=0 row-size=8B cardinality=1
       |  |
       |  F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
       |  00:SCAN KUDU [kudu_1000g.orders o]             ->  扫描orders表
       |     kudu predicates: o_orderkey = 1125          ->  用“o_orderkey = 1125”条件过滤
       |     mem-estimate=0B mem-reservation=0B
       |     tuple-ids=0 row-size=8B cardinality=1
       |
       01:SCAN KUDU [kudu_1000g.customer c]              ->  扫描customer表
          mem-estimate=0B mem-reservation=0B             ->  没有过滤条件,返回全部数据
          tuple-ids=1 row-size=243B cardinality=150000000
      profile中已经做了一些批注,概括一下就是:
a.用“o_orderkey = 1125”条件扫描orders表,把返回的结果放入hash数据结构中;
        b.再全表扫描customer表,返回所有的数据,返回过程中逐批同前面hash数据结构中的数据进行匹配,匹配成功的保存到结果集合中。
  • 查看plan:
    
    显而易见,plan中显示的数据更加直观,并且把耗时长的节点都标记成了红色。
    我们可以看到,左边红色扫描customer表,活生生把全表(总共1.5亿条记录)的全部字段都扫描上来了,磁盘扫描开销、网络传输开销,还有大数据集合关联带来的CPU计算开销,所以耗时很长,达到了37秒钟。
修改后测试
在前面,我们有提到更聪明、更高效的方法,那是否可以实现呢?答案是肯定的,我们确实把扫描orders表的返回结果应用到了扫描customer表的扫描节点中,作为 动态谓词下发了。
术语:
  1. 谓词,就是filter或者过滤器,条件表达式;
  2. 静态则表示的是来自于sql语句本身,动态即运行过程中产生,也即runtime;
  3. 动态谓词就是runtime filter。
我们拿修改之后版本测试:
  • 在impala-shell执行sql:
    
  • 查看profile:
       F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
       PLAN-ROOT SINK
       |  mem-estimate=0B mem-reservation=0B
       |
       04:EXCHANGE [UNPARTITIONED]
       |  mem-estimate=0B mem-reservation=0B
       |  tuple-ids=1,0 row-size=251B cardinality=2
       |
       F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
       02:HASH JOIN [INNER JOIN, BROADCAST]            ->  以hashJoin的方式两表关联
       |  hash predicates: c.c_custkey = o.o_custkey   ->  用“c.c_custkey = o.o_custkey”条件关联
       |  runtime filters: RF000 <- o.o_custkey        ->  这里生成了1个runtime filter
       |  mem-estimate=9B mem-reservation=136.00MB
       |  tuple-ids=1,0 row-size=251B cardinality=2
       |
       |--03:EXCHANGE [BROADCAST]
       |  |  mem-estimate=0B mem-reservation=0B
       |  |  tuple-ids=0 row-size=8B cardinality=1
       |  |
       |  F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
       |  00:SCAN KUDU [kudu_1000g.orders o]            ->  扫描orders表
       |     kudu predicates: o_orderkey = 1125         ->  用“o_orderkey = 1125”条件过滤
       |     mem-estimate=0B mem-reservation=0B
       |     tuple-ids=0 row-size=8B cardinality=1
       |
       01:SCAN KUDU [kudu_1000g.customer c]             ->  扫描customer表
          runtime filters: RF000 -> c.c_custkey         ->  这里应用了1个runtime filter
          mem-estimate=0B mem-reservation=0B
          tuple-ids=1 row-size=243B cardinality=150000000
  • 查看plan:
    
    跟修改前比较,可以发现左边红色部分只返回了1条记录。
  
所以,总结上面的对比可以发现性能:43.08秒对2.04秒,足足提升了20倍!

相关分享
  1. 对于runtime filter,我们需要明白谁产生和谁使用的关系,前者仅由关联节点生成,而后者仅由扫描节点使用,两者都属于计算引擎。其中扫描节点在使用runtime filter上有两种方式,一种是把runtime filter直接推送到存储引擎,离数据最近,理论上效果肯定是最佳的,我们选择的正是这种方式;还有一种是在扫描节点上过滤,把远端数据全部读取过来进行本地过滤,可以减少流入上层关联节点的数据量,比如parquet就是这种方式。这里有必要说明下parquet的特殊之处,它可以选择采用hdfs的short circuit,简短的理解:作为分布式文件系统的hdfs,它的数据文件是以block文件块的形式组织起来的,而parquet的数据是放在一个个的block上,在impala和hdfs配对部署的前提下,当impala把需要扫描block文件块的计算任务分配到block文件块所在的impala节点上,那这个impala计算节点就可以直接通过操作系统的文件系统读block文件块,省去了hdfs分布式文件系统的中间层传输开销;
  2. runtime filter的类型可以有很多种:包括min/max(范围区间,或者大于、小于)、in list(数组)、bloom filter(布隆过滤器)、equality(等值)等,但是在目前的impala里仅支持bloom filter,这是万金油,最方便实现,后续我们可以考虑引入其他的类型,降低存储引擎扫描时候的计算量(节约CPU计算时间)。从kudu官方来看,一直建议使用min/max或者in list的方式进行下推,估计同修改的工作量有关,因为它目前的通信协议是不支持bloom filter这种谓词下发,而且两边(impala和 kudu)的bloom filter算法也是不一样的;
  3. 分布式计算引擎,对扫描返回的数据做重分布(repartition或者shuffle)后,会生成一个统一的runtime filter,这个工作由coordinator集中merge再分发给各个计算节点,并且在左子树上,只要关联字段一样,它会一直推送到最底层的扫描节点;同一个列,多份runtime filter、多种谓词,通过merge的方式进行合并,比如bloom filter + range组合,range + range组合等等;
  4. 通常数据扫描节点在启动扫描以后,就不会再更新过滤器,也即不会再下发新的谓词,因为本身这个过程就已经比较复杂。但是我们的修改,可以支持在扫描过程的中间(mid-scan),把新的runtime filter下发下去,并且在kudu存储引擎层进行直接应用,这对于缩小返回的数据集非常有帮助;
  5. 最后一个是关于runtime filter应用于裁剪数据分片,这个意义也比较大,决定着响应时间。可以分两步:第一步是针对分区键,比较容易理解,就是启动扫描或者扫描的中间,把不需要扫描的数据分片直接跳过,有同学可能会说,关联键不一定是分区键哦,是的,这时,我们就需要第二步,针对非分区键的索引(俗称二级索引),实现上可以有多种方案,比如针对分片的min/max或者bitmap等,但是工作量都不小呢:(;
写在最后
说了半天,很多同学是否被runtime filter、过滤器、谓词、条件表达式给搞迷糊了?其实是同一个概念,用中间过程产生的数据构造出一个条件,并且应用于下一个阶段,对于数据计算效率的提升非常有帮助,这就是runtime filter存在的价值。

本文来自网易实践者社区,经作者何李夫授权发布。