Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: refactor Join and Limit's ResolveIndices #45831

Merged
merged 4 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion planner/core/issuetest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ go_test(
data = glob(["testdata/**"]),
flaky = True,
race = "on",
shard_count = 7,
shard_count = 8,
deps = [
"//parser",
"//planner",
Expand Down
10 changes: 10 additions & 0 deletions planner/core/issuetest/planner_issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,13 @@ func TestIssue45036(t *testing.T) {
" └─TableReader_9 10000.00 root partition:all data:TableRangeScan_8",
" └─TableRangeScan_8 10000.00 cop[tikv] table:s range:[1,100000], keep order:false, stats:pseudo"))
}

func TestIssue45758(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("CREATE TABLE tb1 (cid INT, code INT, class VARCHAR(10))")
tk.MustExec("CREATE TABLE tb2 (cid INT, code INT, class VARCHAR(10))")
// result ok
tk.MustExec("UPDATE tb1, (SELECT code AS cid, code, MAX(class) AS class FROM tb2 GROUP BY code) tb3 SET tb1.cid = tb3.cid, tb1.code = tb3.code, tb1.class = tb3.class")
}
92 changes: 72 additions & 20 deletions planner/core/resolve_indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,25 @@ func (p *PhysicalHashJoin) ResolveIndicesItself() (err error) {
shallowColSlice := make([]*expression.Column, p.schema.Len())
copy(shallowColSlice, p.schema.Columns)
p.schema = expression.NewSchema(shallowColSlice...)
for i := 0; i < colsNeedResolving; i++ {
newCol, err := p.schema.Columns[i].ResolveIndices(mergedSchema)
if err != nil {
return err
foundCnt := 0
// The two column sets are all ordered. And the colsNeedResolving is the subset of the mergedSchema.
// So we can just move forward j if there's no matching is found.
// We don't use the normal ResolvIndices here since there might be duplicate columns in the schema.
// e.g. The schema of child_0 is [col0, col0, col1]
// ResolveIndices will only resolve all col0 reference of the current plan to the first col0.
for i, j := 0, 0; i < colsNeedResolving && j < len(mergedSchema.Columns); {
if !p.schema.Columns[i].Equal(nil, mergedSchema.Columns[j]) {
j++
continue
}
p.schema.Columns[i] = newCol.(*expression.Column)
p.schema.Columns[i] = p.schema.Columns[i].Clone().(*expression.Column)
p.schema.Columns[i].Index = j
i++
j++
foundCnt++
}
if foundCnt < colsNeedResolving {
return errors.Errorf("Some columns of %v cannot find the reference from its child(ren)", p.ExplainID().String())
}

return
Expand Down Expand Up @@ -213,12 +226,25 @@ func (p *PhysicalMergeJoin) ResolveIndices() (err error) {
shallowColSlice := make([]*expression.Column, p.schema.Len())
copy(shallowColSlice, p.schema.Columns)
p.schema = expression.NewSchema(shallowColSlice...)
for i := 0; i < colsNeedResolving; i++ {
newCol, err := p.schema.Columns[i].ResolveIndices(mergedSchema)
if err != nil {
return err
foundCnt := 0
// The two column sets are all ordered. And the colsNeedResolving is the subset of the mergedSchema.
Copy link
Contributor

@AilinKid AilinKid Feb 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this fix
join-schema(xx, col1, col1, col2)
+--child1
+--child2 (col1,col1,col2)

1: If join's col1 and col1 leverage normal resolve index to do the column index location, it will both redirect to child2's first col1.

2: And join's inline projection will reversely mark lused nad rused from join itself back to its children, a kind of flag mark array, and fill these cols into join chunk result when onMatch or not.

3: So if both come on, the join output chunk result will miss one column size, since filling result could't see the second col2 from flag array rused here

4: that's why we redirect the second same col1 in the join schema to the second same col1 in the right child.

5: but we need to notice one case here, we assume that two column sets are all ordered. That means the join schema must occur ordered from the merged schema.
One bad case is about:

join schema 里面是 [xx, col2, col1] 下面 limit 承载了 schema [col2 ,col1], 孩子是 sort,其孩子是 proj[col2, col1], datasource是[col1, col2].   
1: proj 留下 replace map 后消除了,limit 承载了 join 的 schema 即使使用 replace 自己替换自己看起来暂时ok,

2: pushTopN down,将 limit 和 sort 整合成 topN,topN 没有 schema!也就是这里回丢失 limit 的 schema。然后 join 直接到孩子 datasource 的 schema[col1, col2].

3: 然后现在就是 join [xx, col2, col1],右侧孩子 topN 无schema,datasouce[col1, col2]

总结:join schema 的出现,不再根据左右孩子出现的顺序具有一定的偏序关系。遍历的时候可能需要一定的向后寻找才能找到

conclusion: The appearance of join schema no longer has a certain partial order relationship based on the order in which the left and right children appear. When traversing, you may need to search backwards to find it.

// So we can just move forward j if there's no matching is found.
// We don't use the normal ResolvIndices here since there might be duplicate columns in the schema.
// e.g. The schema of child_0 is [col0, col0, col1]
// ResolveIndices will only resolve all col0 reference of the current plan to the first col0.
for i, j := 0, 0; i < colsNeedResolving && j < len(mergedSchema.Columns); {
if !p.schema.Columns[i].Equal(nil, mergedSchema.Columns[j]) {
j++
continue
}
p.schema.Columns[i] = newCol.(*expression.Column)
p.schema.Columns[i] = p.schema.Columns[i].Clone().(*expression.Column)
p.schema.Columns[i].Index = j
i++
j++
foundCnt++
}
if foundCnt < colsNeedResolving {
return errors.Errorf("Some columns of %v cannot find the reference from its child(ren)", p.ExplainID().String())
}
return
}
Expand Down Expand Up @@ -296,12 +322,25 @@ func (p *PhysicalIndexJoin) ResolveIndices() (err error) {
shallowColSlice := make([]*expression.Column, p.schema.Len())
copy(shallowColSlice, p.schema.Columns)
p.schema = expression.NewSchema(shallowColSlice...)
for i := 0; i < colsNeedResolving; i++ {
newCol, err := p.schema.Columns[i].ResolveIndices(mergedSchema)
if err != nil {
return err
foundCnt := 0
// The two column sets are all ordered. And the colsNeedResolving is the subset of the mergedSchema.
// So we can just move forward j if there's no matching is found.
// We don't use the normal ResolvIndices here since there might be duplicate columns in the schema.
// e.g. The schema of child_0 is [col0, col0, col1]
// ResolveIndices will only resolve all col0 reference of the current plan to the first col0.
for i, j := 0, 0; i < colsNeedResolving && j < len(mergedSchema.Columns); {
if !p.schema.Columns[i].Equal(nil, mergedSchema.Columns[j]) {
j++
continue
}
p.schema.Columns[i] = newCol.(*expression.Column)
p.schema.Columns[i] = p.schema.Columns[i].Clone().(*expression.Column)
p.schema.Columns[i].Index = j
i++
j++
foundCnt++
}
if foundCnt < colsNeedResolving {
return errors.Errorf("Some columns of %v cannot find the reference from its child(ren)", p.ExplainID().String())
}

return
Expand Down Expand Up @@ -687,12 +726,25 @@ func (p *PhysicalLimit) ResolveIndices() (err error) {
shallowColSlice := make([]*expression.Column, p.schema.Len())
copy(shallowColSlice, p.schema.Columns)
p.schema = expression.NewSchema(shallowColSlice...)
for i, col := range p.schema.Columns {
newCol, err := col.ResolveIndices(p.children[0].Schema())
if err != nil {
return err
foundCnt := 0
// The two column sets are all ordered. And the colsNeedResolving is the subset of the mergedSchema.
// So we can just move forward j if there's no matching is found.
// We don't use the normal ResolvIndices here since there might be duplicate columns in the schema.
// e.g. The schema of child_0 is [col0, col0, col1]
// ResolveIndices will only resolve all col0 reference of the current plan to the first col0.
for i, j := 0, 0; i < p.schema.Len() && j < p.children[0].Schema().Len(); {
if !p.schema.Columns[i].Equal(nil, p.children[0].Schema().Columns[j]) {
j++
continue
}
p.schema.Columns[i] = newCol.(*expression.Column)
p.schema.Columns[i] = p.schema.Columns[i].Clone().(*expression.Column)
p.schema.Columns[i].Index = j
i++
j++
foundCnt++
}
if foundCnt < p.schema.Len() {
return errors.Errorf("Some columns of %v cannot find the reference from its child(ren)", p.ExplainID().String())
}
return
}
Expand Down