-
Notifications
You must be signed in to change notification settings - Fork 17
Tddl_Executor
TDDL的执行器,根据执行计划,到指定的数据节点进行查询操作,并在指定节点上完成相应的聚合操作,最终返回一个ResultSet给用户。
执行器分为两个部分,Cursor和Handler。
Cusor是一种装饰器,主要方法为next(),每一种Cursor只完成单一的一项功能。例如,Limit Cursor用于从底层Cursor取出指定条数的记录,Column Alias Cursor用于将从底层取出来的数据进行别名替换。Cursor以Cursor树的形式存在。
Handler用于将执行计划翻译成Cursor树。对应于执行计划中的节点类型,用于查询的Handler也分为三种
- MergeHandler
- JoinHandler
- QueryHandler
此为所有用于查询的Handler的基类,包含各个Handler在完成各自特有的查询功能后的一些共用的后续处理操作,这些操作包括并按照以下顺序进行:
- 调用子类的doQuery(),方法,生成该执行计划节点的基础Cursor,如Join节点会生成相应策略的JoinCursor,Merge节点会生成MergeCursor,Query节点会生成对应存储的QueryCursor。
- 对result filter进行处理,生成一个ResultFilterCursor,在next过程中,过滤掉不符合result filter的数据。
- 对聚合函数和group进行处理,如果数据对于group by无序,则先调用order by逻辑,生成相应策略的OrderByCursor对数据进行排序,最终生成AggregateCursor进行group by和聚合函数的操作。
- 处理having filter,在聚合之后对数据进行过滤,逻辑与对result filter的处理相同。
- 处理distinct,如果数据对于distinct列无序,则先调用order by逻辑,生成相应策略的OrderByCursor对数据进行排序,最终生成DistinctCursor进行去重操作。
- 处理执行计划中显式指定的order by,生成相应策略的OrderByCursor对数据进行排序。
- 处理执行计划中的limit条件,生成LimitCursor,过滤掉前面不符合要求的数据,并在返回要求的数据条数后停止。
- 处理列的投影与别名,将select中不存在的列过滤掉,并完成结果中列的名字替换为别名,表的名字替换为别名,生成ColumnAliasCursor。
QueryHandler用于执行计划中Query节点的翻译。
QueryHandler按照以下逻辑对Query节点进行翻译:
- 如果Query节点包含子节点,则先调用执行器中的其他Handler对子节点进行处理。
- 判断Query中的KeyFilter是等值查询还是范围查询,如果是等值查询,则生成对应存储的QueryCursor,如果是范围查询,则生成对应存储的RangeCursor。
- 调用QueryHandlerCommon中的逻辑对生成的Cursor进行进一步的包装。
这两种Join策略对底层数据无要求,因此直接调用执行器,对左右子节点进行处理,生成两个子Cursor,然后将两个子Cursor包装成IndexNestedLoopJoinCursor或者NestedLoopJoinCursor。
最后调用QueryHandlerCommon中的逻辑对生成的Cursor进行进一步包装。
SortMergeJoin要求左右两边的数据均按照Join列有序,因此可能需要对底层的数据进行二次处理后才能进行Join。按照以下顺序进行操作:
- 调用执行器,对左右子节点进行处理,并生成两个子Cursor
- 判断两个子Cursor的数据是否按照Join列有序,如果无序,则调用order by逻辑,对子Cursor进行包装,使其数据按照Join列有序
- 将符合要求的两个子Cursor生成一个SortMergeJoinCursor
- 调用QueryHandlerCommon中的逻辑对生成的Cursor进行进一步的包装。
MergeHandler对执行计划中的Merge节点进行翻译,操作如下:
- 调用执行器,将子节点生成多个子Cursor。这里MergeHandler会根据执行计划的要求,选择串行执行,或者并行执行。
- 如果子节点为insert/delete/update/replace,则生成一个AffectRowsCursor,将各个子节点的affect rows进行合并。
- 如果Merge为一个Union操作,则调用order by逻辑,保证数据有序,并进行去重操作。
- 生成一个MergeCursor,对子Cursor的数据进行一个简单的合并。
- 调用QueryHandlerCommon中的逻辑对生成的Cursor进行进一步的包装。
在并行执行时,用户可以选择每个查询单独使用一个线程池,或者一个Datasource公用一个线程池。
在目前的TDDL5版本中,排序操作被大量使用,如Group By,Distinct,Join等操作中。排序是TDDL5的一个核心组件。
在TDDL5的执行器中,会根据底层Cursor数据的顺序来确定所使用的排序策略,这些策略包括
- 当所要求的顺序和已有的顺序完全一致时,直接返回底层Cursor。
- 当所要求的顺序和已有的顺序完全相反时,使用ReverseCursor对底层数据进行反转(前提是底层存储支持prev操作)。
- 当底层为多个Cursor的Merge,并且每一个子Cursor的数据顺序都与所要求的顺序一致时,使用MergeSortedCursors进行归并排序。
- 若非上述三种情况,则使用临时表对数据进行排序。
当子Cursor的顺序均为所需要的顺序,则使用MergeSortedCursor对子Cursor的数据进行有序归并。
Merge Sorted是TDDL5最为推荐的排序逻辑。由于order by条件多数情况下可以下推到存储上执行,所以客户端仅需要对数据做一个有序归并即可,开销很小,并且不占用内存。
在Merge Sorted中,核心是对不同数据类型的数据进行比较。
在TDDL5中,数据类型被抽象为了DataType类,数据在进行比较时,会调用相应类型的DataType接口的compare方法进行比较,对于不同的数据类型,比较的方式有所不同:
- 数值类型,例如integer,long之类的,直接进行数值的比较。
- 日期类型,如datetime,time,timestamp等,转换成long之后进行比较。
- string类型,进行忽略大小写的比较。注意,不同存储在对string进行order by操作时,依据可能不同。例如,MySQL中,对string类型进行order by,默认是不区分大小写的,但是可以同过配置,使之区分大小写。如果MySQL中区分了大小写,那么它的排序结果会和TDDL5期待的结果不一致,最终可能会导致结果的错误。目前版本的TDDL5还无法识别这种情况,因此在TDDL5中,如果需要对string字段进行order by操作,请务必让MySQL不区分大小写。
- byte[]类型,byte数组的比较参考了MySQL的比较方式,详细请见数据类型相关章节。
在TDDL5中,如果出现了order by无法下推,也无法做有序归并的情况时,会使用临时表来对数据进行排序。
TDDL5使用Berkeley DB JAVA Edition(以下简称BDB)来作为临时表。BDB是一个嵌入式Key-Value数据库,使用B树作为存储,因此只要将要排序的数据按照排序字段作为key,存到BDB的表中,取出的数据即按照排序字段有序。
在TDDL5中,临时表在TempTableCursor中实现。这个Cursor会将底层的Cursor中的数据全部取出,并存在一个BDB临时表中。在调用该Cursor的next()方法时,数据会从临时表中依次取出。
使用BDB作为临时表有以下好处:
- 无需自行实现复杂的B树结构,BDB久经验证,更为安全
- BDB可以自己管理内存,当数据量较小的情况下,数据会全部保存在内存中,当数据量比较大的时候,数据会自动被写到磁盘上,进行外部排序,不至于将内存撑爆。
- 数据量亦可以通过参数进行限制,如临时表最大存储多少数据,多大的数据会被写到磁盘等。
- 临时表的创建删除均有BDB管理。
函数分为两类:
- scalar函数,例如数学运算,日期计算,now等函数,特点是有多少数据,就会生成多少记录
- aggregate函数,例如count,sum,max,min等,特点是在一个group内,只会生成一条记录
- serverMap,此接口用于数据节点上的运算,例如,Count函数的serverMap实现就是简单的加1操作。
- serverReduce,此接口由于Merge节点上的运算,例如,Count函数的serverReduce实现就是将serverMap的结果进行相加。
对于每一条记录,scalar函数都会返回一条结果。由于scalar函数不涉及聚合逻辑,因此scalar函数的serverMap和serverReduce函数功能相同。实现一个scalar函数,只要实现ScalarFunction的compute接口即可。
对于每一个group by分组,aggregate函数只会返回一个结果。因此aggregate函数的处理与group by的处理在一起。
AggregateCursor会将底层数据进行分组,然后将一个分组的每条数据依次传给每一个Aggregate函数。
如果当前节点为数据节点,则将记录传给aggregate函数的serverMap接口,如count函数,对于每一条记录在serverMap中进行+1操作,当本组数据全部处理完毕,AggregateCursor将每个聚合函数的结果写到结果集中,返回给Merge节点继续处理。
如果当前节点为Merge节点,则将数据节点返回的记录传给aggregate函数的serverReduce接口,如count函数,将每个数据节点返回的结果再一次进行汇总,同一个group的数据汇总完成后,将最后的结果写入结果集返回。
对于avg函数,会做一个特殊处理,会将数据节点的avg展开为count/sum,在merge节点的时候合并为avg。
TDDL5目前支持的聚合函数列表:
- count
- sum
- max
- min
- avg
TDDL5中目前distinct的处理方式为排序。
对于底层的数据,handler会调用order by逻辑保证底层数据对于distinct逻辑有序。DistinctCursor会对有序的数据进行去重操作。
having的处理在group by聚合之后。
处理方式很简单,在AggregateCursor之上用ValueFilterCursor包装一下即可。
limit由LimitFromToCursor实现。
简单的跳过一定数目的数据,并在返回需要条数的数据之后,丢弃剩余的数据。
由IndexNestedLoopJoinMgetImplCursor完成。
传统IndexNestedLoopJoin算法如下:
function indexNestedLoop(relation left, relation right, attribute a) var relation output算法的思路简单说,就是以左表为驱动表,遍历左表的每一条数据,从中拿出join列的值,然后用该值到右表去做一个索引查询,由此得到join的结果。
var row left_row
var list right_matched_rows
var attribute key
foreach left_row in left
key := left_row.a
right_matched_rows := right.skipTo(key) //right cursor use index to find the matched set
add cross product of left_row and right_matched_rows to output
return output
function indexNestedLoop(relation left, relation right, attribute a) var relation output以右表为MySQL举例说明改进后的Index Nested Loop Join的执行方式:
var row left_row var list left_sub_rows var map right_matched_rows //a map ,which key is value of join columns and value is the row var set keys foreach left_row in left
left_sub_rows.add(left_row) keys.add(left_row.a)
if left_sub_rows.size > buffer_size
right_matched_rows := right.mget(keys) //right cursor use index to find the matched set
do a normal hash join between left_sub_rows and right_matched_rows, and add result to output
left_sub_rows.clear()
keys.clear() return output
id | c1 | c2 |
1 | 10 | 100 |
2 | 10 | 110 |
3 | 10 | 120 |
db1:select * from right where id=1; db2:select * from right where id in (2,3);
id | id | c3 | c4 |
2 | 2 | 20 | 210 |
2 | 20 | 240 | |
3 | 3 | 30 | 111 |
1 | 1 | 10 | 333 |
1 | 10 | 222 |
在一些情况下,比如右表是一个很复杂但最终结果很少的查询,如果还使用Index Nested Loop Join,会导致右表的查询被反复计算,导致开销增大,因此,在有些情况下,Optimizer会选择使用Nested Loop Join。
Nested Loop Join的思路在于,右表的数据较小,可以缓存在内存中,每次遍历左表的同时去遍历内存中的右表,最终得出结果。
Nested Loop Join算法如下:
For each tuple r in R For each tuple s in S If r and s satisfy the join condition Then output the tuple <r,s>在TDDL5中,Nested Loop Join由BlockNestedLoopJoinCursor实现。
出于和Index Nested Loop Join同样的左表驱动的原因,Nested Loop Join也无法实现right outter join,同时,结果的顺序也继承了左表的顺序。
Nested Loop Join/Index Nested Loop Join由于是左表驱动的,因此无法做right outter join和full outter join。并且,如果左右表有一定的顺序,这两种join策略也无法利用这个顺序使join性能提升,因此TDDL5还实现了Sort Merge Join。
Sort Merge Join的算法如下,可参考wiki http://en.wikipedia.org/wiki/Sort-merge_join:
function sortMerge(relation left, relation right, attribute a) var relation output var list left_sorted := sort(left, a) // Relation left sorted on attribute a var list right_sorted := sort(right, a) var attribute left_key, right_key var set left_subset, right_subset // These sets discarded except where join predicate is satisfied advance(left_subset, left_sorted, left_key, a) advance(right_subset, right_sorted, right_key, a) while not empty(left_subset) and not empty(right_subset) if left_key = right_key // Join predicate satisfied add cross product of left_subset and right_subset to output advance(left_subset, left_sorted, left_key, a) advance(right_subset, right_sorted, right_key, a) else if left_key < right_key advance(left_subset, left_sorted, left_key, a) else // left_key > right_key advance(right_subset, right_sorted, right_key, a) return output
// Remove tuples from sorted to subset until the sorted[1].a value changes function advance(subset out, sorted inout, key out, a in) key := sorted[1].a subset := emptySet while not empty(sorted) and sorted[1].a = key insert sorted[1] into subset remove sorted[1]Sort Merge Join的核心在于左右两表均按照join列有序,Join的过程就是一个有序的归并过程。如果下面存储支持流式,那么数据是一个流式的返回,Sort Merge Join的过程客户端几乎不会缓存任何数据,能够极大的节省内存空间。同时,Sort Merge Join在底层数据天然有序的情况下,无需对数据进行二次排序,效率非常高。