Skip to content
mengshi edited this page Jan 24, 2014 · 17 revisions

执行器概况

    TDDL的执行器,根据执行计划,到指定的数据节点进行查询操作,并在指定节点上完成相应的聚合操作,最终返回一个ResultSet给用户。

   

执行器架构



 

    执行器分为两个部分,Cursor和Handler。

    Cusor是一种装饰器,主要方法为next(),每一种Cursor只完成单一的一项功能。例如,Limit Cursor用于从底层Cursor取出指定条数的记录,Column Alias Cursor用于将从底层取出来的数据进行别名替换。Cursor以Cursor树的形式存在。

    Handler用于将执行计划翻译成Cursor树。对应于执行计划中的节点类型,用于查询的Handler也分为三种

  • MergeHandler
  • JoinHandler
  • QueryHandler

Handler



 

QueryHandlerCommon

    此为所有用于查询的Handler的基类,包含各个Handler在完成各自特有的查询功能后的一些共用的后续处理操作,这些操作包括并按照以下顺序进行:

  1. 调用子类的doQuery(),方法,生成该执行计划节点的基础Cursor,如Join节点会生成相应策略的JoinCursor,Merge节点会生成MergeCursor,Query节点会生成对应存储的QueryCursor。
  2. 对result filter进行处理,生成一个ResultFilterCursor,在next过程中,过滤掉不符合result filter的数据。
  3. 对聚合函数和group进行处理,如果数据对于group by无序,则先调用order by逻辑,生成相应策略的OrderByCursor对数据进行排序,最终生成AggregateCursor进行group by和聚合函数的操作。
  4. 处理having filter,在聚合之后对数据进行过滤,逻辑与对result filter的处理相同。
  5. 处理distinct,如果数据对于distinct列无序,则先调用order by逻辑,生成相应策略的OrderByCursor对数据进行排序,最终生成DistinctCursor进行去重操作。
  6. 处理执行计划中显式指定的order by,生成相应策略的OrderByCursor对数据进行排序。
  7. 处理执行计划中的limit条件,生成LimitCursor,过滤掉前面不符合要求的数据,并在返回要求的数据条数后停止。
  8. 处理列的投影与别名,将select中不存在的列过滤掉,并完成结果中列的名字替换为别名,表的名字替换为别名,生成ColumnAliasCursor。

QueryHandler

    QueryHandler用于执行计划中Query节点的翻译。

    QueryHandler按照以下逻辑对Query节点进行翻译:

  1. 如果Query节点包含子节点,则先调用执行器中的其他Handler对子节点进行处理。
  2. 判断Query中的KeyFilter是等值查询还是范围查询,如果是等值查询,则生成对应存储的QueryCursor,如果是范围查询,则生成对应存储的RangeCursor。
  3. 调用QueryHandlerCommon中的逻辑对生成的Cursor进行进一步的包装。

JoinHandler

IndexNestedLoopJoinHandler/NestedLoopJoinHandler

    这两种Join策略对底层数据无要求,因此直接调用执行器,对左右子节点进行处理,生成两个子Cursor,然后将两个子Cursor包装成IndexNestedLoopJoinCursor或者NestedLoopJoinCursor。

    最后调用QueryHandlerCommon中的逻辑对生成的Cursor进行进一步包装。

SortMergeJoinHandler

    SortMergeJoin要求左右两边的数据均按照Join列有序,因此可能需要对底层的数据进行二次处理后才能进行Join。按照以下顺序进行操作:

  1. 调用执行器,对左右子节点进行处理,并生成两个子Cursor
  2. 判断两个子Cursor的数据是否按照Join列有序,如果无序,则调用order by逻辑,对子Cursor进行包装,使其数据按照Join列有序
  3. 将符合要求的两个子Cursor生成一个SortMergeJoinCursor
  4. 调用QueryHandlerCommon中的逻辑对生成的Cursor进行进一步的包装。

MergeHandler

    MergeHandler对执行计划中的Merge节点进行翻译,操作如下:

  1. 调用执行器,将子节点生成多个子Cursor。这里MergeHandler会根据执行计划的要求,选择串行执行,或者并行执行。
  2. 如果子节点为insert/delete/update/replace,则生成一个AffectRowsCursor,将各个子节点的affect rows进行合并。
  3. 如果Merge为一个Union操作,则调用order by逻辑,保证数据有序,并进行去重操作。
  4. 生成一个MergeCursor,对子Cursor的数据进行一个简单的合并。
  5. 调用QueryHandlerCommon中的逻辑对生成的Cursor进行进一步的包装。

    在并行执行时,用户可以选择每个查询单独使用一个线程池,或者一个Datasource公用一个线程池。

Cursors

Query Cursor

Column Alias Cursor

Merge Cursor

Merge Sorted Cursor

Temp Table Cursor

Limit Cursor

Index Nested Loop Cursor

Sort Merge Cursor

Aggregate Cursor

Range Cursor

Distinct Cursor

Value Filter Cursor

Value Mapping Cursor

Order By

Group By

Scalar函数

Aggregate函数

Having

Limit

Join

Index Nested Loop Join

    由IndexNestedLoopJoinCursor完成。

    传统IndexNestedLoopJoin算法如下:

 

function indexNestedLoop(relation left, relation right, attribute a)
    var relation output
var row left_row
var list right_matched_rows
var attribute key
foreach left_row in left
key := left_row.a
right_matched_rows := right.skipTo(key) //right cursor use index to find the matched set
add cross product of left_row and right_matched_rows to output
return output
    算法的思路简单说,就是以左表为驱动表,遍历左表的每一条数据,从中拿出join列的值,然后用该值到右表去做一个索引查询,由此得到join的结果。
    具体几个存储的例子:
    如果右表是以HBase为代表的Key-Value存储,则join列需要为右表的row key,对于左表的每一个值,需要到hbase中做按照row key的查询。
    如果右表是MySQL、OceanBase等支持SQL的存储,则没有右表join列是否为索引的限制,因为右表的skipTo操作时以SQL的形式完成的,具体如何来查到相应的数据,底层的存储可以独立完成。当然,右表如果没有索引,Join的效率会比较低下。
    不过传统的Index Nested Loop Join在跨机join中存在一个缺陷,因为对于左表的每条记录,都要到右表去查,会导致网络交互过多。因此在TDDL5中,我们对Index Nested Loop Join做了一点改进。
    在TDDL5中,我们不再一条一条的去右表查数据,而是从左表读出一批数据,然后再到右表去批量的查询,这一批数据中,落到同一分库上的数据会被同时发往右表。这样可以极大的减少网络交互的次数,提升效率。改进后的算法如下:
function indexNestedLoop(relation left, relation right, attribute a)
    var relation output
var row left_row var list left_sub_rows var map right_matched_rows //a map ,which key is value of join columns and value is the row var set keys foreach left_row in left
left_sub_rows.add(left_row) keys.add(left_row.a)

if left_sub_rows.size > buffer_size
right_matched_rows := right.mget(keys) //right cursor use index to find the matched set
do a normal hash join between left_sub_rows and right_matched_rows, and add result to output
left_sub_rows.clear()
keys.clear() return output
    以右表为MySQL举例说明改进后的Index Nested Loop Join的执行方式:
    查询为:select * from left join right on id=id and left.c1=10
    在左表执行一个查询语句,如select * from left where c1=10;
    遍历左表的结果集,从中取出若干条记录来,如3条记录(目前实际默认为20条,用户可以通过配置修改此数值):
   
id c1 c2
1 10 100
2 10 110
3 10 120
    然后收集这一个子集中join列的值为一个set,{1,2,3};
    把这一个join列的值的set发往右表左一个批量查询,假设{1},{2,3}分别发往两个分库db1和db2,则MySQL的实现是,在两个分库上分别执行如下sql:
    db1:select * from right where id=1;
    db2:select * from right where id in (2,3);

 

    遍历右边的结果集,以join列的值为key,转换成一个map,假设如下:
id id c3 c4
2 2 20 210
2 20 240
3 3 30 111
1 1 10 333
1 10 222
    最后将左表的子集和右表的map做一个简单的Hash Join,就能很容易的得出join结果。

 

Nested Loop Join

Sort Merge Join

http://en.wikipedia.org/wiki/Sort-merge_join

 function sortMerge(relation left, relation right, attribute a)
     var relation output
     var list left_sorted := sort(left, a) // Relation left sorted on attribute a
     var list right_sorted := sort(right, a)
     var attribute left_key, right_key
     var set left_subset, right_subset // These sets discarded except where join predicate is satisfied
     advance(left_subset, left_sorted, left_key, a)
     advance(right_subset, right_sorted, right_key, a)
     while not empty(left_subset) and not empty(right_subset)
         if left_key = right_key // Join predicate satisfied
             add cross product of left_subset and right_subset to output
             advance(left_subset, left_sorted, left_key, a)
             advance(right_subset, right_sorted, right_key, a)
         else if left_key < right_key
            advance(left_subset, left_sorted, left_key, a)
         else // left_key > right_key
            advance(right_subset, right_sorted, right_key, a)
     return output
 
 // Remove tuples from sorted to subset until the sorted[1].a value changes
 function advance(subset out, sorted inout, key out, a in)
     key := sorted[1].a
     subset := emptySet
     while not empty(sorted) and sorted[1].a = key
         insert sorted[1] into subset
         remove sorted[1]
 

 

 

Clone this wiki locally