colexec: fix coalescing logic in the spilling queue
In a recently merged change to the spilling queue we introduced a bug in
the coalescing logic: we incorrectly computed the index of the "tail"
batch when it occupies the last slot of the in-memory buffer. This could
lead to multiple problems (modifying a batch that is already full,
modifying an already dequeued batch, etc.). This is now fixed.
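
For illustration only, a minimal sketch of the wraparound issue on a
hypothetical, simplified ring buffer; `sketch`, `batch`, `ringBuffer`,
`curTailIdx`, and `lastEnqueuedIdx` are placeholder names, not the actual
spilling queue types:

```go
package sketch

// batch stands in for coldata.Batch in this simplified illustration.
type batch struct{}

// ringBuffer is a hypothetical, minimal circular buffer of batches that
// mirrors the role of the spilling queue's in-memory items slice:
// curTailIdx points at the slot where the next batch will be enqueued.
type ringBuffer struct {
	items      []*batch
	curHeadIdx int // next slot to dequeue from
	curTailIdx int // next slot to enqueue into
}

// lastEnqueuedIdx returns the index of the most recently enqueued batch,
// i.e. the "tail" batch that coalescing copies new tuples into.
func (r *ringBuffer) lastEnqueuedIdx() int {
	idx := r.curTailIdx - 1
	if idx < 0 {
		// Buggy version: idx = 0, which points at the head of the buffer
		// once the tail has wrapped around. Fixed version: wrap to the
		// last slot instead.
		idx = len(r.items) - 1
	}
	return idx
}
```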

Additionally, this commit switches to using `Copy` instead of `Append`
in that coalescing logic (since we know that there is enough capacity
for all of the tuples we're appending) and sets the in-memory item to
`nil` once it has been dequeued.
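
Continuing the simplified sketch above (again with placeholder names; the
real dequeue also handles batches spilled to disk), clearing the slot on
dequeue might look like this:

```go
// dequeue pops the batch at the head of the ring (assuming the buffer is
// non-empty) and clears the slot so the queue no longer references the
// returned batch.
func (r *ringBuffer) dequeue() *batch {
	b := r.items[r.curHeadIdx]
	// Setting the slot to nil keeps later coalescing from touching an
	// already dequeued batch and lets it be garbage collected.
	r.items[r.curHeadIdx] = nil
	r.curHeadIdx++
	if r.curHeadIdx == len(r.items) {
		r.curHeadIdx = 0
	}
	return b
}
```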

Release note: None (no stable release with this bug)
yuzefovich committed Jan 15, 2021
1 parent 46f5cca commit 9be814e
Showing 2 changed files with 12 additions and 12 deletions.
2 changes: 0 additions & 2 deletions pkg/sql/colexec/routers_test.go
@@ -31,7 +31,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/colcontainerutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -861,7 +860,6 @@ func TestHashRouterOneOutput(t *testing.T) {

func TestHashRouterRandom(t *testing.T) {
	defer leaktest.AfterTest(t)()
-	skip.WithIssue(t, 58956, "flaky test")
	defer log.Scope(t).Close(t)
	ctx := context.Background()

22 changes: 12 additions & 10 deletions pkg/sql/colexec/spilling_queue.go
@@ -238,11 +238,11 @@ func (q *spillingQueue) enqueue(ctx context.Context, batch coldata.Batch) error

	alreadyCopied := 0
	if q.numInMemoryItems > 0 {
-		// If we have already enqueued at least one batch, let's try to append
-		// as many tuples to it as it has the capacity for.
+		// If we have already enqueued at least one batch, let's try to copy
+		// as many tuples into it as it has the capacity for.
		tailBatchIdx := q.curTailIdx - 1
		if tailBatchIdx < 0 {
-			tailBatchIdx = 0
+			tailBatchIdx = len(q.items) - 1
		}
		tailBatch := q.items[tailBatchIdx]
		if l, c := tailBatch.Length(), tailBatch.Capacity(); l < c {
@@ -252,13 +252,15 @@
			}
			q.unlimitedAllocator.PerformOperation(tailBatch.ColVecs(), func() {
				for i := range q.typs {
-					tailBatch.ColVec(i).Append(
-						coldata.SliceArgs{
-							Src:         batch.ColVec(i),
-							Sel:         batch.Selection(),
-							DestIdx:     l,
-							SrcStartIdx: 0,
-							SrcEndIdx:   alreadyCopied,
+					tailBatch.ColVec(i).Copy(
+						coldata.CopySliceArgs{
+							SliceArgs: coldata.SliceArgs{
+								Src:         batch.ColVec(i),
+								Sel:         batch.Selection(),
+								DestIdx:     l,
+								SrcStartIdx: 0,
+								SrcEndIdx:   alreadyCopied,
+							},
						},
					)
				}
Expand Down
