Dynamic Filtering

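This section describes the dynamic filtering feature of openLooKeng. Dynamic filtering is designed for highly selective joins, in which most rows read from the probe-side table are discarded because they do not satisfy the join condition. At query run time, openLooKeng uses the join condition and the data read from the build side to generate dynamic filter conditions, and applies them as additional filters during the table scan of the probe-side table. This reduces the amount of probe-side data that reaches the join and cuts both I/O and network transfer.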
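The mechanism can be illustrated with a minimal Python sketch. It is not openLooKeng code: the function names are hypothetical, and an exact set of build-side keys stands in for the filter (comparable to the HASHSET type; a Bloom filter would also let a few non-matching keys through).

```python
from typing import Dict, Iterable, List, Set, Tuple

Row = Tuple  # rows are plain tuples in this sketch


def build_dynamic_filter(build_rows: Iterable[Row], key: int) -> Set:
    """Collect the distinct join-key values read from the build side."""
    return {row[key] for row in build_rows}


def scan_probe(probe_rows: Iterable[Row], key: int, dynamic_filter: Set) -> List[Row]:
    """Probe-side table scan: drop rows whose key cannot match any build row."""
    return [row for row in probe_rows if row[key] in dynamic_filter]


def hash_join(probe_rows: Iterable[Row], probe_key: int,
              build_rows: Iterable[Row], build_key: int) -> List[Row]:
    """Inner hash join: hash the build side, then look up each probe row."""
    table: Dict = {}
    for b in build_rows:
        table.setdefault(b[build_key], []).append(b)
    return [p + b for p in probe_rows for b in table.get(p[probe_key], [])]


probe = [(i, f"row{i}") for i in range(1000)]  # large probe-side table
build = [(7, "x"), (42, "y")]                   # small, selective build side

df = build_dynamic_filter(build, 0)
scanned = scan_probe(probe, 0, df)
print(len(probe), "->", len(scanned))  # 1000 -> 2
# Filtering before the join does not change the inner-join result.
assert hash_join(scanned, 0, build, 0) == hash_join(probe, 0, build, 0)
```

Only 2 of the 1000 probe rows reach the join, which is the I/O and network saving the feature aims for.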

Applicable Scenarios

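openLooKeng dynamic filtering mainly targets highly selective joins, covering both partition pruning on partitioned tables and row filtering on non-partitioned tables. It currently supports inner join, semi join, and right join, and works with the Hive connector, the DC connector, and the Memory connector.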
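Why these join types? A probe row with no build-side match contributes nothing to an inner join, a semi join (as used for IN subqueries), or a right join, so dropping it early is safe; a left join, by contrast, must still output such a row padded with NULLs. The Python sketch below checks this on toy data. It assumes, as in Presto-derived engines, that the left input is the probe side and the right input is the build side; the `join` and `dynamic_filter` helpers are hypothetical and exist only for illustration.

```python
from typing import List, Optional, Tuple

Row = Tuple[Optional[int], Optional[str]]


def join(probe: List[Row], build: List[Row], kind: str) -> list:
    """Join on column 0; `probe` is the left input and `build` the right input."""
    out = []
    matched = set()  # indexes of build rows that found a partner
    for p in probe:
        hits = [i for i, b in enumerate(build) if b[0] == p[0]]
        matched.update(hits)
        if kind == "semi":  # models WHERE key IN (SELECT key FROM build)
            if hits:
                out.append(p)
            continue
        out.extend(p + build[i] for i in hits)
        if kind == "left" and not hits:
            out.append(p + (None, None))  # LEFT JOIN keeps unmatched probe rows
    if kind == "right":  # RIGHT JOIN keeps unmatched build rows
        out.extend((None, None) + b for i, b in enumerate(build) if i not in matched)
    return sorted(out, key=repr)


def dynamic_filter(probe: List[Row], build: List[Row]) -> List[Row]:
    """Keep only probe rows whose key appears on the build side."""
    keys = {b[0] for b in build}
    return [p for p in probe if p[0] in keys]


probe = [(1, "a"), (2, "b"), (3, "c")]
build = [(2, "x"), (4, "y")]
for kind in ("inner", "semi", "right", "left"):
    unchanged = join(probe, build, kind) == join(dynamic_filter(probe, build), build, kind)
    print(kind, unchanged)  # inner, semi and right print True; left prints False
```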

Usage

openLooKeng dynamic filtering depends on the distributed cache component; configure it as described in the State Store section.

Configure the following parameters in /etc/config.properties:

enable-dynamic-filtering=true
dynamic-filtering-data-type=BLOOM_FILTER
dynamic-filtering-max-per-driver-size=100MB
dynamic-filtering-max-per-driver-row-count=10000
dynamic-filtering-bloom-filter-fpp=0.1

The properties are described below:

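  • enable-dynamic-filtering: whether to enable dynamic filtering.
  • dynamic-filtering-wait-time: maximum time to wait for dynamic filter conditions to be generated. Default: 1s.
  • dynamic-filtering-data-type: the dynamic filter type, either BLOOM_FILTER or HASHSET. Default: BLOOM_FILTER.
  • dynamic-filtering-max-size: maximum size of each dynamic filter. If the estimated size exceeds this value, the cost-based optimizer does not generate the corresponding dynamic filter. Default: 1000000.
  • dynamic-filtering-max-per-driver-size: maximum amount of data each driver can collect. Default: 1MB.
  • dynamic-filtering-max-per-driver-row-count: maximum number of rows each driver can collect. Default: 10000.
  • dynamic-filtering-bloom-filter-fpp: false positive probability (FPP) of the Bloom filter used for dynamic filtering. Default: 0.1.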
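With the BLOOM_FILTER type, dynamic-filtering-bloom-filter-fpp trades filter size for accuracy: a Bloom filter never rejects a key that is present, but lets through roughly an FPP fraction of absent keys, which the join then discards. HASHSET stores the exact keys instead, so it has no false positives but grows with the number of distinct keys. The sketch below is a textbook Bloom filter sized with the standard formulas; it is an illustration under that assumption, and openLooKeng's own implementation may size and hash differently.

```python
import hashlib
import math


class BloomFilter:
    """Textbook Bloom filter sized for `expected` keys at false-positive rate `fpp`."""

    def __init__(self, expected: int, fpp: float):
        # Standard sizing: m = -n ln p / (ln 2)^2 bits, k = (m / n) ln 2 hash functions.
        self.m = max(1, math.ceil(-expected * math.log(fpp) / math.log(2) ** 2))
        self.k = max(1, round(self.m / expected * math.log(2)))
        self.bits = bytearray((self.m + 7) // 8)

    def _positions(self, value):
        digest = hashlib.sha256(repr(value).encode()).digest()
        h1 = int.from_bytes(digest[:8], "little")
        h2 = int.from_bytes(digest[8:16], "little") | 1
        # Double hashing: the i-th bit position is (h1 + i * h2) mod m.
        return [(h1 + i * h2) % self.m for i in range(self.k)]

    def add(self, value) -> None:
        for pos in self._positions(value):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, value) -> bool:
        return all(self.bits[pos // 8] >> (pos % 8) & 1 for pos in self._positions(value))


# Sized for 10,000 keys at FPP 0.1 (the defaults listed above, used here for illustration).
bf = BloomFilter(expected=10_000, fpp=0.1)
for key in range(10_000):
    bf.add(key)
print(bf.m, bf.k, len(bf.bits))  # 47926 3 5991  (about 6 KB)
assert all(bf.might_contain(key) for key in range(10_000))  # no false negatives
fp = sum(bf.might_contain(key) for key in range(10_000, 20_000)) / 10_000
print(f"observed false-positive rate: {fp:.3f}")  # close to 0.1
```

Lowering the FPP makes the filter larger (for example, 0.01 roughly doubles m), while raising it lets more non-matching probe rows through the scan.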

When using the Hive connector, modify catalog/hive.properties as follows:

hive.dynamic-filter-partition-filtering=true
hive.dynamic-filtering-row-filtering-threshold=5000

The properties are described below:

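  • hive.dynamic-filter-partition-filtering: use dynamic filter conditions to filter out partitions in advance based on partition values. Default: false.
  • hive.dynamic-filtering-row-filtering-threshold: row filtering is applied only if the size of the dynamic filter is below this threshold. Default: 2000.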
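The interaction of the two Hive properties can be sketched as follows. This is a hypothetical Python model, not the Hive connector's code: the function name, its arguments, and the rule that row filtering is skipped for a filter on the partition column are simplifications chosen for illustration. Partition pruning discards whole partitions whose value is not in the filter, while row filtering checks individual rows and is used only when the filter is smaller than the threshold.

```python
from typing import Dict, List, Set

Row = Dict[str, object]


def scan_hive_table(partitions: Dict[str, List[Row]], partition_column: str,
                    filter_column: str, dynamic_filter: Set,
                    partition_filtering: bool = False,
                    row_filtering_threshold: int = 2000) -> List[Row]:
    """Hypothetical model of a Hive table scan that applies one dynamic filter."""
    out: List[Row] = []
    on_partition_column = filter_column == partition_column
    # Row filtering is applied only when the filter is below the size threshold.
    row_filtering = (not on_partition_column
                     and len(dynamic_filter) < row_filtering_threshold)
    for value, rows in partitions.items():
        # Partition pruning: skip a whole partition whose value cannot match.
        if partition_filtering and on_partition_column and value not in dynamic_filter:
            continue
        for row in rows:
            if row_filtering and row[filter_column] not in dynamic_filter:
                continue
            out.append(row)
    return out


by_year = {  # hypothetical table partitioned by year
    "2019": [{"id": 1, "year": "2019"}],
    "2020": [{"id": 2, "year": "2020"}],
    "2021": [{"id": 3, "year": "2021"}],
}
print(len(scan_hive_table(by_year, "year", "year", {"2020"}, partition_filtering=True)))  # 1
print(len(scan_hive_table(by_year, "year", "year", {"2020"})))                             # 3
print(len(scan_hive_table(by_year, "year", "id", {2})))                                    # 1
print(len(scan_hive_table(by_year, "year", "id", {2}, row_filtering_threshold=1)))         # 3
```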

Execution Plan

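The following examples show how SQL statements apply dynamic filter conditions, which are marked as dynamicFilter on the probe-side table scan in the execution plan. Use the explain command to check whether dynamic filtering is applied, or open the live plan in the Web UI to see whether the running query applies it.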

create table table1 (id integer, year varchar);
create table table2 (id integer, total integer);
insert into table1 values (1, '2019'), (2, '2020'), (3, '2021');
insert into table2 values (1, 100), (2, 200);

Inner join:

explain select t1.id, t1.year from table1 t1, table2 t2 where t1.id = t2.id and t2.total = 200;
Query Plan
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Output[id, year]
│   Layout: [id:integer, year:varchar]
│   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
└─ RemoteExchange[GATHER]
   │   Layout: [year:varchar, id:integer]
   │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
   └─ InnerJoin[("id" = "id_0")][$hashvalue, $hashvalue_9]
      │   Layout: [id:integer, year:varchar]
      │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
      │   Distribution: PARTITIONED
      │   dynamicFilterAssignments = {id_0 -> 238}
      ├─ RemoteExchange[REPARTITION][$hashvalue]
      │  │   Layout: [id:integer, year:varchar, $hashvalue:bigint]
      │  │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
      │  └─ ScanFilterProject[table = memory:0, filterPredicate = true, dynamicFilter = {238 -> "id"}]
      │         Layout: [id:integer, year:varchar, $hashvalue_8:bigint]
      │         Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
      │         $hashvalue_8 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("id"), 0))
      │         year := 1
      │         id := 0
      └─ LocalExchange[HASH][$hashvalue_9] ("id_0")
         │   Layout: [id_0:integer, $hashvalue_9:bigint]
         │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
         └─ RemoteExchange[REPARTITION][$hashvalue_10]
            │   Layout: [id_0:integer, $hashvalue_10:bigint]
            │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
            └─ ScanFilterProject[table = memory:5, filterPredicate = ("total" = 200)]
                   Layout: [id_0:integer, $hashvalue_11:bigint]
                   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
                   $hashvalue_11 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("id_0"), 0))
                   total := 1
                   id_0 := 0
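In this plan the join assigns the filter built from its build-side key id_0 (read from table2) the ID 238, and the probe-side scan of table1 applies filter 238 to column id. Given the rows inserted above, the Python sketch below traces which values end up in that filter and which table1 rows survive the scan. It only mimics the data flow with in-memory lists and is not how openLooKeng executes the query.

```python
table1 = [(1, "2019"), (2, "2020"), (3, "2021")]  # (id, year)
table2 = [(1, 100), (2, 200)]                     # (id, total)

# Build side: ScanFilterProject on table2 with filterPredicate ("total" = 200).
build = [row for row in table2 if row[1] == 200]
filter_238 = {id_0 for id_0, _ in build}  # dynamicFilterAssignments = {id_0 -> 238}

# Probe side: ScanFilterProject on table1 with dynamicFilter = {238 -> "id"}.
probe = [row for row in table1 if row[0] in filter_238]

# InnerJoin[("id" = "id_0")] on the already-filtered probe rows.
result = [(t1_id, year) for t1_id, year in probe for id_0, _ in build if t1_id == id_0]
print(filter_238, len(table1), "->", len(probe), result)
# {2} 3 -> 1 [(2, '2020')]
```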

Semi join:

explain select * from table1 where id in (select id from table2);
Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Output[id, year]
│   Layout: [id:integer, year:varchar]
│   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
└─ RemoteExchange[GATHER]
   │   Layout: [id:integer, year:varchar]
   │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
   └─ FilterProject[filterPredicate = "expr_6"]
      │   Layout: [id:integer, year:varchar]
      │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}/{rows: ? (?), cpu: ?, memory: ?, network: ?}
      └─ Project[]
         │   Layout: [id:integer, year:varchar, expr_6:boolean]
         │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
         └─ SemiJoin[id = id_1][$hashvalue, $hashvalue_16]
            │   Layout: [id:integer, year:varchar, $hashvalue:bigint, expr_6:boolean]
            │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
            │   Distribution: PARTITIONED
            │   dynamicFilterId: 279
            ├─ RemoteExchange[REPARTITION][$hashvalue]
            │  │   Layout: [id:integer, year:varchar, $hashvalue:bigint]
            │  │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
            │  └─ ScanFilterProject[table = memory:0, filterPredicate = true, dynamicFilter = {279 -> "id"}]
            │         Layout: [id:integer, year:varchar, $hashvalue_15:bigint]
            │         Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
            │         $hashvalue_15 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("id"), 0))
            │         year := 1
            │         id := 0
            └─ LocalExchange[SINGLE] ()
               │   Layout: [id_1:integer, $hashvalue_16:bigint]
               │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
               └─ RemoteExchange[REPARTITION - REPLICATE NULLS AND ANY][$hashvalue_17]
                  │   Layout: [id_1:integer, $hashvalue_17:bigint]
                  │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
                  └─ ScanProject[table = memory:5]
                         Layout: [id_1:integer, $hashvalue_18:bigint]
                         Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
                         $hashvalue_18 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("id_1"), 0))
                         id_1 := 0
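When reading EXPLAIN output, a dynamic filter is in effect when an ID produced by a join node (dynamicFilterAssignments for the inner join, dynamicFilterId for the semi join) reappears in a dynamicFilter entry on a table scan. A small script can automate that check. The Python sketch below is a hypothetical helper: its regular expressions are written only against the plan text shown in this section and may need adjusting for other versions or plan nodes.

```python
import re
from typing import Dict, Optional

# Patterns match the plan text in this section (an assumption, not a documented format).
JOIN_ASSIGNMENTS = re.compile(r"dynamicFilterAssignments = \{([^}]*)\}")  # {id_0 -> 238}
JOIN_FILTER_ID = re.compile(r"dynamicFilterId: (\d+)")                   # 279
SCAN_FILTER = re.compile(r"dynamicFilter = \{([^}]*)\}")                 # {238 -> "id"}


def applied_filters(plan: str) -> Dict[str, Optional[str]]:
    """Map each filter ID produced by a join to the scan column it filters (None if unused)."""
    produced = set(JOIN_FILTER_ID.findall(plan))
    for body in JOIN_ASSIGNMENTS.findall(plan):
        produced.update(re.findall(r"-> (\d+)", body))
    consumed: Dict[str, str] = {}
    for body in SCAN_FILTER.findall(plan):
        consumed.update(re.findall(r'(\d+) -> "([^"]+)"', body))
    return {fid: consumed.get(fid) for fid in sorted(produced)}


INNER_PLAN = """
dynamicFilterAssignments = {id_0 -> 238}
ScanFilterProject[table = memory:0, filterPredicate = true, dynamicFilter = {238 -> "id"}]
"""
SEMI_PLAN = """
dynamicFilterId: 279
ScanFilterProject[table = memory:0, filterPredicate = true, dynamicFilter = {279 -> "id"}]
"""
print(applied_filters(INNER_PLAN))  # {'238': 'id'}
print(applied_filters(SEMI_PLAN))   # {'279': 'id'}
```

A produced ID that maps to None would indicate a filter generated by the join but not pushed to any scan.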