Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: implement the execution part of the outer hash join #12882

Merged
merged 23 commits into from
Nov 12, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,95 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor)
e.ctx.GetSessionVars().StmtCtx.MemTracker = t
return e
}
func prepare4OuterJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor) *HashJoinExec {
SunRunAway marked this conversation as resolved.
Show resolved Hide resolved
SunRunAway marked this conversation as resolved.
Show resolved Hide resolved
// reverse
innerExec, outerExec = outerExec, innerExec
cols0 := testCase.columns()
cols1 := testCase.columns()
joinSchema := expression.NewSchema(cols0...)
joinSchema.Append(cols1...)
joinKeys := make([]*expression.Column, 0, len(testCase.keyIdx))
for _, keyIdx := range testCase.keyIdx {
joinKeys = append(joinKeys, cols0[keyIdx])
}
e := &HashJoinExec{
baseExecutor: newBaseExecutor(testCase.ctx, joinSchema, stringutil.StringerStr("HashJoin"), innerExec, outerExec),
concurrency: uint(testCase.concurrency),
joinType: 2, // 1 for LeftOutersJoin, 2 for RightOuterJoin
isOuterJoin: true,
innerKeys: joinKeys,
outerKeys: joinKeys,
innerExec: innerExec,
outerExec: outerExec,
innerEstCount: float64(testCase.rows),
outerHashJoin: true,
}

defaultValues := make([]types.Datum, e.innerExec.Schema().Len())
lhsTypes, rhsTypes := retTypes(innerExec), retTypes(outerExec)
e.joiners = make([]joiner, e.concurrency)
for i := uint(0); i < e.concurrency; i++ {
e.joiners[i] = newJoiner(testCase.ctx, e.joinType, true, defaultValues,
nil, lhsTypes, rhsTypes)
}
return e
}
func benchmarkOuterHashJoinExecWithCase(b *testing.B, casTest *hashJoinTestCase) {
SunRunAway marked this conversation as resolved.
Show resolved Hide resolved
opt := mockDataSourceParameters{
schema: expression.NewSchema(casTest.columns()...),
rows: casTest.rows,
ctx: casTest.ctx,
genDataFunc: func(row int, typ *types.FieldType) interface{} {
switch typ.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
return int64(row)
case mysql.TypeVarString:
return rawData
default:
panic("not implement")
}
},
}
opt.rows = 5
dataSource1 := buildMockDataSource(opt)
opt.rows = 10
dataSource2 := buildMockDataSource(opt)

b.ResetTimer()
println("b.n = ", b.N)
for i := 0; i < 1 && (b.N < 10); i++ {
b.StopTimer()
exec := prepare4OuterJoin(casTest, dataSource1, dataSource2)
tmpCtx := context.Background()
chk := newFirstChunk(exec)
dataSource1.prepareChunks()
dataSource2.prepareChunks()

b.StartTimer()
if err := exec.Open(tmpCtx); err != nil {
b.Fatal(err)
}
var numOfChk = 0
for {
if err := exec.Next(tmpCtx, chk); err != nil {
b.Fatal(err)
}
fmt.Printf("row num = %d\n", chk.NumRows())
for k := 0; k < chk.NumRows(); k++ {
println(chk.GetRow(k).GetInt64(0), " ", len(chk.GetRow(k).GetString(1)), " ", chk.GetRow(k).GetInt64(2), " ", len(chk.GetRow(k).GetString(3)))
}
if chk.NumRows() == 0 {
break
}
numOfChk++
}
println("num of chunks = ", numOfChk)
if err := exec.Close(); err != nil {
b.Fatal(err)
}
b.StopTimer()
}
}
func benchmarkHashJoinExecWithCase(b *testing.B, casTest *hashJoinTestCase) {
opt := mockDataSourceParameters{
schema: expression.NewSchema(casTest.columns()...),
Expand Down Expand Up @@ -676,6 +764,19 @@ func BenchmarkHashJoinExec(b *testing.B) {
})
}

func BenchmarkOuterHashJoinExec(b *testing.B) {
b.ReportAllocs()
cas := defaultHashJoinTestCase()
cas.rows = 10
cas.concurrency = 1
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkOuterHashJoinExecWithCase(b, cas)
})
cas.keyIdx = []int{0}
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkOuterHashJoinExecWithCase(b, cas)
})
}
func benchmarkBuildHashTableForList(b *testing.B, casTest *hashJoinTestCase) {
opt := mockDataSourceParameters{
schema: expression.NewSchema(casTest.columns()...),
Expand Down
17 changes: 12 additions & 5 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,12 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
joinType: v.JoinType,
isOuterJoin: v.JoinType.IsOuterJoin(),
innerEstCount: v.Children()[v.InnerChildIdx].StatsCount(),
outerHashJoin: v.OuterHashJoin,
fzhedu marked this conversation as resolved.
Show resolved Hide resolved
}
// reverse the inner and the outer
if e.outerHashJoin {
v.InnerChildIdx = 1 - v.InnerChildIdx
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, we'd better move these lines to the plan building phase?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be not.
The plan building phase just make choose whether to adopt the outer hash join, and the executing phase reverses the inner and the outer internally. This way does not need to change a lot of code in the plan build phase.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Execute phase better not change the physical plan.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other members may not think so according to previous daily meeting when discussing the solutions.
On the other hand, the current way is implemented and tested. If taking to way to change physical plans at builing plan phase, it needs to rewrite a lot of code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does it need to rewrite a lot of code?
Only the *LogicalJoin.getHashJoin and the PhysicalHashJoin.GetCost function of HashJoin may be affected?

Copy link
Contributor Author

@fzhedu fzhedu Nov 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides, buildHashJoin here and explain()
In other words, it needs to rewrite the PR #12883 , as well as the function here.

v.LeftConditions, v.RightConditions = v.RightConditions, v.LeftConditions
SunRunAway marked this conversation as resolved.
Show resolved Hide resolved
}

defaultValues := v.DefaultValues
Expand All @@ -1016,9 +1022,6 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
e.outerFilter = v.RightConditions
e.innerKeys = v.LeftJoinKeys
e.outerKeys = v.RightJoinKeys
if defaultValues == nil {
defaultValues = make([]types.Datum, e.innerExec.Schema().Len())
}
} else {
XuHuaiyu marked this conversation as resolved.
Show resolved Hide resolved
if len(v.RightConditions) > 0 {
b.err = errors.Annotate(ErrBuildExecutor, "join's inner condition should be empty")
Expand All @@ -1029,13 +1032,17 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
e.outerFilter = v.LeftConditions
e.innerKeys = v.RightJoinKeys
e.outerKeys = v.LeftJoinKeys
if defaultValues == nil {
}
if defaultValues == nil {
if e.outerHashJoin {
fzhedu marked this conversation as resolved.
Show resolved Hide resolved
defaultValues = make([]types.Datum, e.outerExec.Schema().Len())
} else {
defaultValues = make([]types.Datum, e.innerExec.Schema().Len())
}
}
e.joiners = make([]joiner, e.concurrency)
for i := uint(0); i < e.concurrency; i++ {
e.joiners[i] = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues,
e.joiners[i] = newJoiner(b.ctx, v.JoinType, (v.InnerChildIdx == 0 && !e.outerHashJoin) || (v.InnerChildIdx == 1 && e.outerHashJoin), defaultValues,
v.OtherConditions, lhsTypes, rhsTypes)
}
executorCountHashJoinExec.Inc()
Expand Down
2 changes: 2 additions & 0 deletions executor/explainfor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ func (s *testSuite) TestExplainFor(c *C) {
func (s *testSuite) TestIssue11124(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists kankan1")
tk.MustExec("drop table if exists kankan2")
tk.MustExec("create table kankan1(id int, name text);")
tk.MustExec("create table kankan2(id int, h1 text);")
tk.MustExec("insert into kankan1 values(1, 'a'), (2, 'a');")
Expand Down
30 changes: 28 additions & 2 deletions executor/hash_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContex
return c
}

// GetMatchedRows get matched rows from probeRow. It can be called
// GetMatchedRowsAndIds get matched rows and IDs from probeRow. It can be called
// in multiple goroutines while each goroutine should keep its own
// h and buf.
func (c *hashRowContainer) GetMatchedRows(probeRow chunk.Row, hCtx *hashContext) (matched []chunk.Row, err error) {
func (c *hashRowContainer) GetMatchedRowsAndIds(probeRow chunk.Row, hCtx *hashContext) (matched []chunk.Row, matchedIds []chunk.RowPtr, err error) {
fzhedu marked this conversation as resolved.
Show resolved Hide resolved
fzhedu marked this conversation as resolved.
Show resolved Hide resolved
hasNull, key, err := c.getJoinKeyFromChkRow(c.sc, probeRow, hCtx)
if err != nil || hasNull {
return
Expand All @@ -144,6 +144,7 @@ func (c *hashRowContainer) GetMatchedRows(probeRow chunk.Row, hCtx *hashContext)
}
matched = make([]chunk.Row, 0, len(innerPtrs))
var matchedRow chunk.Row
matchedIds = make([]chunk.RowPtr, 0, len(innerPtrs))
for _, ptr := range innerPtrs {
if c.alreadySpilled() {
matchedRow, err = c.recordsInDisk.GetRow(ptr)
Expand All @@ -162,6 +163,7 @@ func (c *hashRowContainer) GetMatchedRows(probeRow chunk.Row, hCtx *hashContext)
continue
}
matched = append(matched, matchedRow)
matchedIds = append(matchedIds, ptr)
}
return
}
Expand Down Expand Up @@ -237,6 +239,30 @@ func (c *hashRowContainer) PutChunk(chk *chunk.Chunk) error {
}
return nil
}
func (c *hashRowContainer) PutChunkSelected(chk *chunk.Chunk, selected []bool) error {
SunRunAway marked this conversation as resolved.
Show resolved Hide resolved
chkIdx := uint32(c.records.NumChunks())
numRows := chk.NumRows()

c.records.Add(chk)
SunRunAway marked this conversation as resolved.
Show resolved Hide resolved
c.hCtx.initHash(numRows)

hCtx := c.hCtx
for _, colIdx := range c.hCtx.keyColIdx {
err := codec.HashChunkColumns(c.sc, hCtx.hashVals, chk, hCtx.allTypes[colIdx], colIdx, hCtx.buf, hCtx.hasNull)
SunRunAway marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Trace(err)
}
}
for i := 0; i < numRows; i++ {
if !selected[i] || c.hCtx.hasNull[i] {
continue
}
key := c.hCtx.hashVals[i].Sum64()
rowPtr := chunk.RowPtr{ChkIdx: chkIdx, RowIdx: uint32(i)}
c.hashTable.Put(key, rowPtr)
}
return nil
}

// getJoinKeyFromChkRow fetches join keys from row and calculate the hash value.
func (*hashRowContainer) getJoinKeyFromChkRow(sc *stmtctx.StatementContext, row chunk.Row, hCtx *hashContext) (hasNull bool, key uint64, err error) {
Expand Down
2 changes: 1 addition & 1 deletion executor/hash_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (s *pkgTestSuite) testHashRowContainer(c *C, hashFunc func() hash.Hash64, s
}
probeCtx.hasNull = make([]bool, 1)
probeCtx.hashVals = append(hCtx.hashVals, hashFunc())
matched, err := rowContainer.GetMatchedRows(probeRow, probeCtx)
matched, _, err := rowContainer.GetMatchedRowsAndIds(probeRow, probeCtx)
c.Assert(err, IsNil)
c.Assert(len(matched), Equals, 2)
c.Assert(matched[0].GetDatumRow(colTypes), DeepEquals, chk0.GetRow(1).GetDatumRow(colTypes))
Expand Down
Loading