exec: add UnsetNulls to ResetInternalBatch #40593

Merged (3 commits) on Sep 10, 2019
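
This change makes MemBatch.ResetInternalBatch unset the null bitmaps on all of the batch's columns, so that null state written by a downstream consumer cannot leak into the next batch an operator produces. To accommodate this, the ordered aggregator only resets its scratch batch when it is safe to do so and copies overflow results into a separate unsafeBatch, the hash joiner splits Next into a thin wrapper plus nextInternal so the reset happens once per external call, the merge joiner drops its now-redundant manual UnsetNulls loop, and runTests gains a verifySelAndNullResets subtest covering both selection vectors and nulls.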
1 change: 1 addition & 0 deletions pkg/col/coldata/batch.go
@@ -173,6 +173,7 @@ func (m *MemBatch) Reset(types []coltypes.T, length int) {
 func (m *MemBatch) ResetInternalBatch() {
 	m.SetSelection(false)
 	for _, v := range m.b {
+		v.Nulls().UnsetNulls()
 		if v.Type() == coltypes.Bytes {
 			v.Bytes().Reset()
 		}
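
For illustration, here is a minimal sketch of the behavior this one-line change gives callers (not part of the PR; it assumes the coldata.NewMemBatch constructor and the pkg/col import paths): null state that a consumer scribbles onto a reused batch is wiped the next time the owning operator resets it.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/col/coldata"
	"github.com/cockroachdb/cockroach/pkg/col/coltypes"
)

func main() {
	// Sketch only: a downstream consumer marks every value in column 0 as null
	// on a batch that the producing operator intends to reuse.
	b := coldata.NewMemBatch([]coltypes.T{coltypes.Int64})
	b.ColVec(0).Nulls().SetNulls()
	fmt.Println(b.ColVec(0).MaybeHasNulls()) // true

	// With this change, ResetInternalBatch clears the null bitmaps as well as
	// the selection vector, so the operator's next iteration starts clean.
	b.ResetInternalBatch()
	fmt.Println(b.ColVec(0).MaybeHasNulls()) // false
}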
51 changes: 48 additions & 3 deletions pkg/sql/exec/aggregator.go
@@ -97,13 +97,23 @@ type orderedAggregator struct {
 	// function operators write directly to this output batch.
 	scratch struct {
 		coldata.Batch
+		// shouldResetInternalBatch keeps track of whether the scratch.Batch should
+		// be reset. It is false in cases where we have overflow results still to
+		// return and therefore do not want to modify the batch.
+		shouldResetInternalBatch bool
 		// resumeIdx is the index at which the aggregation functions should start
 		// writing to on the next iteration of Next().
 		resumeIdx int
 		// outputSize is col.BatchSize by default.
 		outputSize int
 	}
 
+	// unsafeBatch is a coldata.Batch returned when only a subset of the
+	// scratch.Batch results is returned (i.e. work needs to be resumed on the
+	// next Next call). The values to return are copied into this batch to protect
+	// against downstream modification of the internal batch.
+	unsafeBatch coldata.Batch
+
 	// groupCol is the slice that aggregateFuncs use to determine whether a value
 	// is part of the current aggregation group. See aggregateFunc.Init for more
 	// information.
@@ -246,6 +256,7 @@ func (a *orderedAggregator) initWithBatchSize(inputSize, outputSize int) {
 		vec := a.scratch.ColVec(i)
 		a.aggregateFuncs[i].Init(a.groupCol, vec)
 	}
+	a.unsafeBatch = coldata.NewMemBatchWithSize(a.outputTypes, outputSize)
 	a.scratch.outputSize = outputSize
 }
 
@@ -254,7 +265,10 @@ func (a *orderedAggregator) Init() {
 }
 
 func (a *orderedAggregator) Next(ctx context.Context) coldata.Batch {
-	a.scratch.ResetInternalBatch()
+	if a.scratch.shouldResetInternalBatch {
+		a.scratch.ResetInternalBatch()
+		a.scratch.shouldResetInternalBatch = false
+	}
 	if a.done {
 		a.scratch.SetLength(0)
 		return a.scratch
@@ -282,7 +296,21 @@ func (a *orderedAggregator) Next(ctx context.Context) coldata.Batch {
 		if a.scratch.resumeIdx >= a.scratch.outputSize {
 			// We still have overflow output values.
 			a.scratch.SetLength(uint16(a.scratch.outputSize))
-			return a.scratch
+			for i := 0; i < len(a.outputTypes); i++ {
+				a.unsafeBatch.ColVec(i).Copy(
+					coldata.CopySliceArgs{
+						SliceArgs: coldata.SliceArgs{
+							Src:         a.scratch.ColVec(i),
+							ColType:     a.outputTypes[i],
+							SrcStartIdx: 0,
+							SrcEndIdx:   uint64(a.scratch.Length()),
+						},
+					},
+				)
+			}
+			a.unsafeBatch.SetLength(a.scratch.Length())
+			a.scratch.shouldResetInternalBatch = false
+			return a.unsafeBatch
 		}
 	}
 
@@ -318,13 +346,30 @@ func (a *orderedAggregator) Next(ctx context.Context) coldata.Batch {
 		copy(a.groupCol, zeroBoolColumn)
 	}
 
+	batchToReturn := a.scratch.Batch
 	if a.scratch.resumeIdx > a.scratch.outputSize {
 		a.scratch.SetLength(uint16(a.scratch.outputSize))
+		for i := 0; i < len(a.outputTypes); i++ {
+			a.unsafeBatch.ColVec(i).Copy(
+				coldata.CopySliceArgs{
+					SliceArgs: coldata.SliceArgs{
+						Src:         a.scratch.ColVec(i),
+						ColType:     a.outputTypes[i],
+						SrcStartIdx: 0,
+						SrcEndIdx:   uint64(a.scratch.Length()),
+					},
+				},
+			)
+		}
+		a.unsafeBatch.SetLength(a.scratch.Length())
+		batchToReturn = a.unsafeBatch
+		a.scratch.shouldResetInternalBatch = false
 	} else {
 		a.scratch.SetLength(uint16(a.scratch.resumeIdx))
+		a.scratch.shouldResetInternalBatch = true
 	}
 
-	return a.scratch
+	return batchToReturn
 }
 
 // reset resets the orderedAggregator for another run. Primarily used for
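
The copy-out into unsafeBatch, shown twice above, is easier to read in isolation. A standalone sketch (not part of the PR; it assumes coldata.NewMemBatch and a single Int64 column) of copying the rows to hand out into a separate batch so the scratch batch can keep being mutated:

package main

import (
	"github.com/cockroachdb/cockroach/pkg/col/coldata"
	"github.com/cockroachdb/cockroach/pkg/col/coltypes"
)

func main() {
	// Sketch only: src plays the role of the aggregator's scratch batch and dst
	// the role of unsafeBatch.
	src := coldata.NewMemBatch([]coltypes.T{coltypes.Int64})
	dst := coldata.NewMemBatch([]coltypes.T{coltypes.Int64})
	n := uint16(5)
	src.SetLength(n)

	// Copy the first n rows of column 0 and mirror the length, exactly as the
	// aggregator does for each output column before returning unsafeBatch.
	dst.ColVec(0).Copy(
		coldata.CopySliceArgs{
			SliceArgs: coldata.SliceArgs{
				Src:         src.ColVec(0),
				ColType:     coltypes.Int64,
				SrcStartIdx: 0,
				SrcEndIdx:   uint64(n),
			},
		},
	)
	dst.SetLength(n)
}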
8 changes: 6 additions & 2 deletions pkg/sql/exec/hashjoiner.go
@@ -249,16 +249,20 @@ func (hj *hashJoinEqOp) Init() {
 
 func (hj *hashJoinEqOp) Next(ctx context.Context) coldata.Batch {
 	hj.prober.batch.ResetInternalBatch()
+	return hj.nextInternal(ctx)
+}
+
+func (hj *hashJoinEqOp) nextInternal(ctx context.Context) coldata.Batch {
 	switch hj.runningState {
 	case hjBuilding:
 		hj.build(ctx)
-		return hj.Next(ctx)
+		return hj.nextInternal(ctx)
 	case hjProbing:
 		hj.prober.exec(ctx)
 
 		if hj.prober.batch.Length() == 0 && hj.builder.spec.outer {
 			hj.initEmitting()
-			return hj.Next(ctx)
+			return hj.nextInternal(ctx)
 		}
 		return hj.prober.batch
 	case hjEmittingUnmatched:
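
The Next/nextInternal split keeps the ResetInternalBatch call (which now also clears null bitmaps) to exactly one execution per external Next call: the state machine's internal re-entries, after building or after switching to emitting unmatched rows, go through nextInternal and therefore skip the reset.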
5 changes: 0 additions & 5 deletions pkg/sql/exec/mergejoiner_tmpl.go
@@ -1244,11 +1244,6 @@ func (o *mergeJoin_JOIN_TYPE_STRING_FILTER_INFO_STRINGOp) Next(ctx context.Context) coldata.Batch {
 	if o.needToResetOutput {
 		o.needToResetOutput = false
 		o.output.ResetInternalBatch()
-		for _, vec := range o.output.ColVecs() {
-			// We only need to explicitly reset nulls since the values will be
-			// copied over and the correct length will be set.
-			vec.Nulls().UnsetNulls()
-		}
 	}
 	o.initProberState(ctx)
 
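
With ResetInternalBatch now unsetting nulls itself (see the pkg/col/coldata/batch.go change above), the merge joiner's explicit per-vector UnsetNulls loop becomes redundant and is removed.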
91 changes: 64 additions & 27 deletions pkg/sql/exec/utils_test.go
@@ -64,6 +64,17 @@ var orderedVerifier verifier = (*opTestOutput).Verify
 // error if they aren't equal by set comparison (irrespective of order).
 var unorderedVerifier verifier = (*opTestOutput).VerifyAnyOrder
 
+// maybeHasNulls is a helper function that returns whether any of the columns in b
+// (maybe) have nulls.
+func maybeHasNulls(b coldata.Batch) bool {
+	for i := 0; i < b.Width(); i++ {
+		if b.ColVec(i).MaybeHasNulls() {
+			return true
+		}
+	}
+	return false
+}
+
 // runTests is a helper that automatically runs your tests with varied batch
 // sizes and with and without a random selection vector.
 // tups is the set of input tuples.
@@ -89,34 +100,60 @@ func runTests(
 		}
 	})
 
-	t.Run("verifySelResets", func(t *testing.T) {
-		// Verify that all operators have an unset selection vector even if an
-		// operator later in the chain sets one. This test ensures that operators
-		// that "own their own batches", such as any operator that has to reshape
-		// its output, always reset their selection vectors before returning a fresh
-		// batch.
-		inputSources := make([]Operator, len(tups))
-		for i, tup := range tups {
-			inputSources[i] = newOpTestInput(1 /* batchSize */, tup)
-		}
-		op, err := constructor(inputSources)
-		if err != nil {
-			t.Fatal(err)
+	t.Run("verifySelAndNullResets", func(t *testing.T) {
+		// This test ensures that operators that "own their own batches", such as
+		// any operator that has to reshape its output, are not affected by
+		// downstream modification of batches.
+		// We run the main loop twice: once to determine what the operator would
+		// output on its second Next call (we need the first call to Next to get a
+		// reference to a batch to modify), and a second time to modify the batch
+		// and verify that this does not change the operator output.
+		var secondBatchHasSelection, secondBatchHasNulls bool
+		for round := 0; round < 2; round++ {
+			inputSources := make([]Operator, len(tups))
+			for i, tup := range tups {
+				inputSources[i] = newOpTestInput(1 /* batchSize */, tup)
+			}
+			op, err := constructor(inputSources)
+			if err != nil {
+				t.Fatal(err)
+			}
+			op.Init()
+			ctx := context.Background()
+			b := op.Next(ctx)
+			if round == 1 {
+				if secondBatchHasSelection {
+					b.SetSelection(false)
+				} else {
+					b.SetSelection(true)
+				}
+				if secondBatchHasNulls {
+					// ResetInternalBatch will throw away the null information.
+					b.ResetInternalBatch()
+				} else {
+					for i := 0; i < b.Width(); i++ {
+						b.ColVec(i).Nulls().SetNulls()
+					}
+				}
+			}
+			b = op.Next(ctx)
+			if round == 0 {
+				secondBatchHasSelection = b.Selection() != nil
+				secondBatchHasNulls = maybeHasNulls(b)
+			}
+			if round == 1 {
+				if secondBatchHasSelection {
+					assert.NotNil(t, b.Selection())
+				} else {
+					assert.Nil(t, b.Selection())
+				}
+				if secondBatchHasNulls {
+					assert.True(t, maybeHasNulls(b))
+				} else {
+					assert.False(t, maybeHasNulls(b))
+				}
+			}
+		}
-		op.Init()
-		ctx := context.Background()
-		b := op.Next(ctx)
-		if b.Selection() != nil {
-			// We're testing an operator that needs to set a selection vector for some
-			// reason already, so we can't test the condition we're looking for.
-			return
-		}
-		// Set the selection vector by hand.
-		b.SetSelection(true)
-		b = op.Next(ctx)
-		// Make sure that the next time we call the operator, it has an empty
-		// selection vector.
-		assert.Nil(t, b.Selection())
 	})
 }
 