Merge #102397
102397: colexec: fix top K sort in some edge cases r=yuzefovich a=yuzefovich

This commit fixes two bugs with the top K sort when it spills to disk.

First, we forgot to unset a few fields when resetting the operator. As
a result, if the operator is reused and spills to disk twice, we could
incorrectly skip exporting some buffered tuples on the second run.
However, this bug is benign because we never reuse the top K sorter
multiple times. (We can reuse the disk-backed sort for other disk-backed
operators as part of the "fallback strategy", but we only use the
general sort there, never the top K sort. Thus, this bug can only be
visible in our internal tests, where each test case can be run as
"reused after resetting".)

Next, it fixes a rare but user-visible bug when a top K sort with a
partial ordering spills to disk. Quite a few conditions need to be met
for the bug to be triggered, so I struggled to come up with an
end-to-end regression test.

In particular, the top K sort tracks the "first unprocessed tuple" in
the current input batch so that it can correctly export buffered tuples
when spilling to disk. After we have consumed the initial K tuples to
fill up the topK batch, we proceed to process the remaining tuples in
the input batch. If we have a partial ordering and we find that a new
"group" begins within the current input batch (meaning that a new value
is encountered on an already-ordered column), then we can stop the
iteration early. This is where the problem lies: we forgot to update the
"first unprocessed tuple" index when we short-circuited. As a result, if
we spill to disk at this point (e.g. because we don't have enough memory
budget to allocate the output batch, or to update the accounting because
the tuples after the initial K are larger), we can see the tuples after
the initial K twice. In other words, some tuples can be included twice
in the result if spilling to disk happens at just the wrong moment.
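The mechanism above can be reduced to a resumable loop whose progress lives in a struct field. A standalone Go sketch, assuming invented names (`processor`, `firstUnprocessedIdx`), of the pre-fix and post-fix loop shapes:

```go
package main

import "fmt"

// processor mimics how the sorter tracks progress through its input
// batch so that, on a disk spill, unprocessed tuples can be exported
// exactly once. All names here are illustrative, not CockroachDB's.
type processor struct {
	firstUnprocessedIdx int
	processed           []int
}

// processBuggy copies the pre-fix pattern: the loop runs on a local
// index and only records progress after a full pass, so an early exit
// (a new "group" starting mid-batch) loses the current position.
func (p *processor) processBuggy(batch []int, groupBoundary int) {
	for i := p.firstUnprocessedIdx; i < len(batch); i++ {
		if batch[i] == groupBoundary {
			return // progress NOT recorded: these tuples get seen again
		}
		p.processed = append(p.processed, batch[i])
	}
	p.firstUnprocessedIdx = len(batch)
}

// processFixed mirrors the fix: the loop advances the struct field
// itself, so the position is correct no matter where the loop exits.
func (p *processor) processFixed(batch []int, groupBoundary int) {
	for ; p.firstUnprocessedIdx < len(batch); p.firstUnprocessedIdx++ {
		if batch[p.firstUnprocessedIdx] == groupBoundary {
			return
		}
		p.processed = append(p.processed, batch[p.firstUnprocessedIdx])
	}
}

func main() {
	batch := []int{1, 2, 9, 3}
	buggy := &processor{}
	buggy.processBuggy(batch, 9)
	fmt.Println(buggy.firstUnprocessedIdx) // 0: a spill now re-reads 1 and 2
	fixed := &processor{}
	fixed.processFixed(batch, 9)
	fmt.Println(fixed.firstUnprocessedIdx) // 2: a spill resumes exactly at 9
}
```

In the buggy variant, exporting buffered tuples after the early return would start from index 0 and emit the already-processed tuples a second time, which is the duplication the commit describes.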

Fixes: #101264.

Release note (bug fix): Previously, CockroachDB could produce incorrect
results when evaluating queries with an ORDER BY clause in rare
circumstances, and this is now fixed. In particular, some rows could be
duplicated if all of the following conditions were met:
- the query had a LIMIT clause
- the sort operation had to spill to disk (meaning that the LIMIT number
of rows used up a non-trivial amount of memory, i.e. the rows were
"wide")
- the ORDER BY clause contained multiple columns AND the ordering on a
prefix of those columns was already provided by the index.

The bug has been present since at least version 22.1.

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
craig[bot] and yuzefovich committed May 5, 2023
2 parents cb14c18 + 50aed24 commit eb12435
Showing 3 changed files with 22 additions and 16 deletions.
26 changes: 14 additions & 12 deletions pkg/sql/colexec/sorttopk.eg.go

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions pkg/sql/colexec/sorttopk.go
@@ -189,6 +189,8 @@ func (t *topKSorter) Reset(ctx context.Context) {
 	t.firstUnprocessedTupleIdx = 0
 	t.topK.ResetInternalBatch()
 	t.emitted = 0
+	t.exportedFromTopK = 0
+	t.exportedFromBatch = 0
 	if t.hasPartialOrder {
 		t.orderState.distincter.(colexecop.Resetter).Reset(t.Ctx)
 	}
10 changes: 6 additions & 4 deletions pkg/sql/colexec/sorttopk_tmpl.go
@@ -70,11 +70,11 @@ func processGroupsInBatch(
 func processBatch(
 	t *topKSorter, groupId int, sel []int, partialOrder bool, useSel bool,
 ) (groupDone bool) {
-	for i := t.firstUnprocessedTupleIdx; i < t.inputBatch.Length(); i++ {
+	for ; t.firstUnprocessedTupleIdx < t.inputBatch.Length(); t.firstUnprocessedTupleIdx++ {
 		if useSel {
-			idx := sel[i]
+			idx := sel[t.firstUnprocessedTupleIdx]
 		} else {
-			idx := i
+			idx := t.firstUnprocessedTupleIdx
 		}
 		if partialOrder {
 			// If this is a distinct group, we have already found the top K input,
@@ -98,7 +98,6 @@ func processBatch(
 			heap.Fix(t.heaper, 0)
 		}
 	}
-	t.firstUnprocessedTupleIdx = t.inputBatch.Length()
 	return false
 }

@@ -141,6 +140,9 @@ func spool(t *topKSorter, partialOrder bool) {
 			groupId = processGroupsInBatch(t, fromLength, groupId, sel, false)
 		}
 	}
+	// Importantly, the memory limit can only be reached _after_ the tuples
+	// are appended to topK, so all fromLength tuples are considered
+	// "processed".
 	t.firstUnprocessedTupleIdx = fromLength
 	t.topK.AppendTuples(t.inputBatch, 0 /* startIdx */, fromLength)
 	remainingRows -= uint64(fromLength)
