Skip to content
mengshi edited this page Jan 24, 2014 · 17 revisions

执行器概况

    TDDL的执行器,根据执行计划,到指定的数据节点进行查询操作,并在指定节点上完成相应的聚合操作,最终返回一个ResultSet给用户。

   

执行器架构



 

    执行器分为两个部分,Cursor和Handler。

    Cusor是一种装饰器,主要方法为next(),每一种Cursor只完成单一的一项功能。例如,Limit Cursor用于从底层Cursor取出指定条数的记录,Column Alias Cursor用于将从底层取出来的数据进行别名替换。Cursor以Cursor树的形式存在。

    Handler用于将执行计划翻译成Cursor树。对应于执行计划中的节点类型,用于查询的Handler也分为三种

  • MergeHandler
  • JoinHandler
  • QueryHandler

Handler



 

QueryHandlerCommon

    此为所有用于查询的Handler的基类,包含各个Handler在完成各自特有的查询功能后的一些共用的后续处理操作,这些操作包括并按照以下顺序进行:

  1. 调用子类的doQuery(),方法,生成该执行计划节点的基础Cursor,如Join节点会生成相应策略的JoinCursor,Merge节点会生成MergeCursor,Query节点会生成对应存储的QueryCursor。
  2. 对result filter进行处理,生成一个ResultFilterCursor,在next过程中,过滤掉不符合result filter的数据。
  3. 对聚合函数和group进行处理,如果数据对于group by无序,则先调用order by逻辑,生成相应策略的OrderByCursor对数据进行排序,最终生成AggregateCursor进行group by和聚合函数的操作。
  4. 处理having filter,在聚合之后对数据进行过滤,逻辑与对result filter的处理相同。
  5. 处理distinct,如果数据对于distinct列无序,则先调用order by逻辑,生成相应策略的OrderByCursor对数据进行排序,最终生成DistinctCursor进行去重操作。
  6. 处理执行计划中显式指定的order by,生成相应策略的OrderByCursor对数据进行排序。
  7. 处理执行计划中的limit条件,生成LimitCursor,过滤掉前面不符合要求的数据,并在返回要求的数据条数后停止。
  8. 处理列的投影与别名,将select中不存在的列过滤掉,并完成结果中列的名字替换为别名,表的名字替换为别名,生成ColumnAliasCursor。

QueryHandler

    QueryHandler用于执行计划中Query节点的翻译。

    QueryHandler按照以下逻辑对Query节点进行翻译:

  1. 如果Query节点包含子节点,则先调用执行器中的其他Handler对子节点进行处理。
  2. 判断Query中的KeyFilter是等值查询还是范围查询,如果是等值查询,则生成对应存储的QueryCursor,如果是范围查询,则生成对应存储的RangeCursor。
  3. 调用QueryHandlerCommon中的逻辑对生成的Cursor进行进一步的包装。

JoinHandler

IndexNestedLoopJoinHandler/NestedLoopJoinHandler

    这两种Join策略对底层数据无要求,因此直接调用执行器,对左右子节点进行处理,生成两个子Cursor,然后将两个子Cursor包装成IndexNestedLoopJoinCursor或者NestedLoopJoinCursor。

    最后调用QueryHandlerCommon中的逻辑对生成的Cursor进行进一步包装。

SortMergeJoinHandler

    SortMergeJoin要求左右两边的数据均按照Join列有序,因此可能需要对底层的数据进行二次处理后才能进行Join。按照以下顺序进行操作:

  1. 调用执行器,对左右子节点进行处理,并生成两个子Cursor
  2. 判断两个子Cursor的数据是否按照Join列有序,如果无序,则调用order by逻辑,对子Cursor进行包装,使其数据按照Join列有序
  3. 将符合要求的两个子Cursor生成一个SortMergeJoinCursor
  4. 调用QueryHandlerCommon中的逻辑对生成的Cursor进行进一步的包装。

MergeHandler

    MergeHandler对执行计划中的Merge节点进行翻译,操作如下:

  1. 调用执行器,将子节点生成多个子Cursor。这里MergeHandler会根据执行计划的要求,选择串行执行,或者并行执行。
  2. 如果子节点为insert/delete/update/replace,则生成一个AffectRowsCursor,将各个子节点的affect rows进行合并。
  3. 如果Merge为一个Union操作,则调用order by逻辑,保证数据有序,并进行去重操作。
  4. 生成一个MergeCursor,对子Cursor的数据进行一个简单的合并。
  5. 调用QueryHandlerCommon中的逻辑对生成的Cursor进行进一步的包装。

    在并行执行时,用户可以选择每个查询单独使用一个线程池,或者一个Datasource公用一个线程池。

Cursors

Query Cursor

Column Alias Cursor

Merge Cursor

Merge Sorted Cursor

Temp Table Cursor

Limit Cursor

Index Nested Loop Cursor

Sort Merge Cursor

Aggregate Cursor

Range Cursor

Distinct Cursor

Value Filter Cursor

Value Mapping Cursor

Order By

    在目前的TDDL5版本中,排序操作被大量使用,如Group By,Distinct,Join等操作中。排序是TDDL5的一个核心组件。

    在TDDL5的执行器中,会根据底层Cursor数据的顺序来确定所使用的排序策略,这些策略包括

  1. 当所要求的顺序和已有的顺序完全一致时,直接返回底层Cursor。
  2. 当所要求的顺序和已有的顺序完全相反时,使用ReverseCursor对底层数据进行反转(前提是底层存储支持prev操作)。
  3. 当底层为多个Cursor的Merge,并且每一个子Cursor的数据顺序都与所要求的顺序一致时,使用MergeSortedCursors进行归并排序。
  4. 若非上述三种情况,则使用临时表对数据进行排序。
    1、2两种策略比较简单,不再赘述。这里主要说明下3、4两种策略。

Merge Sorted

临时表

    在TDDL5中,如果出现了order by无法下推,也无法做有序归并的情况时,会使用临时表来对数据进行排序。

    TDDL5使用Berkeley DB JAVA Edition(以下简称BDB)来作为临时表。BDB是一个嵌入式Key-Value数据库,使用B树作为存储,因此只要将要排序的数据按照排序字段作为key,存到BDB的表中,取出的数据即按照排序字段有序。

    在TDDL5中,临时表在TempTableCursor中实现。这个Cursor会将底层的Cursor中的数据全部取出,并存在一个BDB临时表中。在调用该Cursor的next()方法时,数据会从临时表中依次取出。

    使用BDB作为临时表有以下好处:

  1. 无需自行实现复杂的B树结构,BDB久经验证,更为安全
  2. BDB可以自己管理内存,当数据量较小的情况下,数据会全部保存在内存中,当数据量比较大的时候,数据会自动被写到磁盘上,进行外部排序,不至于将内存撑爆。
  3. 数据量亦可以通过参数进行限制,如临时表最大存储多少数据,多大的数据会被写到磁盘等。
  4. 临时表的创建删除均有BDB管理。
    TDDL5亦留有临时表的接口,用户可以自由选择其他存储来作为临时表。
    注意,由于在数据量比较大的情况下,临时表还是有不小的开销的,因此默认情况下TDDL5是关闭了临时表,在需要临时表的时候,TDDL5会抛出异常进行提示,请检查您的SQL,确定该查询使用临时表是否符合您的预期,若符合预期,可以通过添加hint的方式,或者设置Datasource的connection properties来开启临时表(方式详见admin guide)

Group By

Scalar函数

Aggregate函数

Having

Limit

Join

Index Nested Loop Join

    由IndexNestedLoopJoinMgetImplCursor完成。

    传统IndexNestedLoopJoin算法如下:

 

function indexNestedLoop(relation left, relation right, attribute a)
    var relation output
var row left_row
var list right_matched_rows
var attribute key
foreach left_row in left
key := left_row.a
right_matched_rows := right.skipTo(key) //right cursor use index to find the matched set
add cross product of left_row and right_matched_rows to output
return output
    算法的思路简单说,就是以左表为驱动表,遍历左表的每一条数据,从中拿出join列的值,然后用该值到右表去做一个索引查询,由此得到join的结果。
    具体几个存储的例子:
    如果右表是以HBase为代表的Key-Value存储,则join列需要为右表的row key,对于左表的每一个值,需要到hbase中做按照row key的查询。
    如果右表是MySQL、OceanBase等支持SQL的存储,则没有右表join列是否为索引的限制,因为右表的skipTo操作时以SQL的形式完成的,具体如何来查到相应的数据,底层的存储可以独立完成。当然,右表如果没有索引,Join的效率会比较低下。
    不过传统的Index Nested Loop Join在跨机join中存在一个缺陷,因为对于左表的每条记录,都要到右表去查,会导致网络交互过多。因此在TDDL5中,我们对Index Nested Loop Join做了一点改进。
    在TDDL5中,我们不再一条一条的去右表查数据,而是从左表读出一批数据,然后再到右表去批量的查询,这一批数据中,落到同一分库上的数据会被同时发往右表。这样可以极大的减少网络交互的次数,提升效率。改进后的算法如下:
function indexNestedLoop(relation left, relation right, attribute a)
    var relation output
var row left_row var list left_sub_rows var map right_matched_rows //a map ,which key is value of join columns and value is the row var set keys foreach left_row in left
left_sub_rows.add(left_row) keys.add(left_row.a)

if left_sub_rows.size > buffer_size
right_matched_rows := right.mget(keys) //right cursor use index to find the matched set
do a normal hash join between left_sub_rows and right_matched_rows, and add result to output
left_sub_rows.clear()
keys.clear() return output
    以右表为MySQL举例说明改进后的Index Nested Loop Join的执行方式:
    查询为:select * from left join right on id=id and left.c1=10
    在左表执行一个查询语句,如select * from left where c1=10;
    遍历左表的结果集,从中取出若干条记录来,如3条记录(目前实际默认为20条,用户可以通过配置修改此数值):
   
id c1 c2
1 10 100
2 10 110
3 10 120
    然后收集这一个子集中join列的值为一个set,{1,2,3};
    把这一个join列的值的set发往右表左一个批量查询,假设{1},{2,3}分别发往两个分库db1和db2,则MySQL的实现是,在两个分库上分别执行如下sql:
    db1:select * from right where id=1;
    db2:select * from right where id in (2,3);

 

    遍历右边的结果集,以join列的值为key,转换成一个map,假设如下:
id id c3 c4
2 2 20 210
2 20 240
3 3 30 111
1 1 10 333
1 10 222
    最后将左表的子集和右表的map做一个简单的Hash Join,就能很容易的得出join结果。
    值得注意的是,由于Index Nested Loop Join是左表驱动,因此它只能完成inner join与left outter join,无法实现right outter join。同时,Index Nested Loop Join的结果继承了左表的顺序。

 

Nested Loop Join

    在一些情况下,比如右表是一个很复杂但最终结果很少的查询,如果还使用Index Nested Loop Join,会导致右表的查询被反复计算,导致开销增大,因此,在有些情况下,Optimizer会选择使用Nested Loop Join。

    Nested Loop Join的思路在于,右表的数据较小,可以缓存在内存中,每次遍历左表的同时去遍历内存中的右表,最终得出结果。

    Nested Loop Join算法如下:

   

  For each tuple r in R
     For each tuple s in S
        If r and s satisfy the join condition
           Then output the tuple <r,s>
    在TDDL5中,Nested Loop Join由BlockNestedLoopJoinCursor实现。

 

    出于和Index Nested Loop Join同样的左表驱动的原因,Nested Loop Join也无法实现right outter join,同时,结果的顺序也继承了左表的顺序。

 

Sort Merge Join

    Nested Loop Join/Index Nested Loop Join由于是左表驱动的,因此无法做right outter join和full outter join。并且,如果左右表有一定的顺序,这两种join策略也无法利用这个顺序使join性能提升,因此TDDL5还实现了Sort Merge Join。

 

    Sort Merge Join的算法如下,可参考wiki http://en.wikipedia.org/wiki/Sort-merge_join:

 function sortMerge(relation left, relation right, attribute a)
     var relation output
     var list left_sorted := sort(left, a) // Relation left sorted on attribute a
     var list right_sorted := sort(right, a)
     var attribute left_key, right_key
     var set left_subset, right_subset // These sets discarded except where join predicate is satisfied
     advance(left_subset, left_sorted, left_key, a)
     advance(right_subset, right_sorted, right_key, a)
     while not empty(left_subset) and not empty(right_subset)
         if left_key = right_key // Join predicate satisfied
             add cross product of left_subset and right_subset to output
             advance(left_subset, left_sorted, left_key, a)
             advance(right_subset, right_sorted, right_key, a)
         else if left_key < right_key
            advance(left_subset, left_sorted, left_key, a)
         else // left_key > right_key
            advance(right_subset, right_sorted, right_key, a)
     return output
 
 // Remove tuples from sorted to subset until the sorted[1].a value changes
 function advance(subset out, sorted inout, key out, a in)
     key := sorted[1].a
     subset := emptySet
     while not empty(sorted) and sorted[1].a = key
         insert sorted[1] into subset
         remove sorted[1]
    Sort Merge Join的核心在于左右两表均按照join列有序,Join的过程就是一个有序的归并过程。如果下面存储支持流式,那么数据是一个流式的返回,Sort Merge Join的过程客户端几乎不会缓存任何数据,能够极大的节省内存空间。同时,Sort Merge Join在底层数据天然有序的情况下,无需对数据进行二次排序,效率非常高。
    当然,如果底层数据存储时并非是所期望的顺序,但是优化器又因为某些原因(如right join等)指定了Sort Merge Join,那么TDDL5的执行器在执行时,会先对数据进行排序,然后再执行Join操作。
    Sort Merge Join在TDDL5中由SortMergeJoinCursor实现。
 
 

 

 

Clone this wiki locally