Skip to content

Commit

Permalink
exec: Handle NULLS in TopK sorter
Browse files Browse the repository at this point in the history
This commit fixes NULLs in the TopK sorter by avoiding use
of the vec copy method, which has a bug. Instead, we add
a set method to the vec comparator, and use the templatized
comparator to perform the sets that the TopK sorter needs.

To facilitate this, we add an UnsetNull method to the Nulls
object. However, use of this method results in HasNull()
maybe returning true even if the vector doesn't have nulls.
This behavior already occurs when selection vectors are used.

Release note (bug fix): Fix null handling in the vectorized
TopK Sorter
  • Loading branch information
rohany committed Jul 3, 2019
1 parent 6364489 commit 2665a78
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 22 deletions.
10 changes: 10 additions & 0 deletions pkg/sql/exec/coldata/nulls.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ func (n *Nulls) SetNull(i uint16) {
n.SetNull64(uint64(i))
}

// UnsetNull unsets the ith value of the column.
func (n *Nulls) UnsetNull(i uint16) {
n.UnsetNull64(uint64(i))
}

// SetNullRange sets all the values in [start, end) to null.
func (n *Nulls) SetNullRange(start uint64, end uint64) {
if start >= end {
Expand Down Expand Up @@ -146,6 +151,11 @@ func (n *Nulls) SetNull64(i uint64) {
n.nulls[i/8] &= flippedBitMask[i%8]
}

// UnsetNull64 unsets the ith values of the column.
func (n *Nulls) UnsetNull64(i uint64) {
n.nulls[i/8] |= ^flippedBitMask[i%8]
}

// Extend extends the nulls vector with the next toAppend values from src,
// starting at srcStartIdx.
func (n *Nulls) Extend(src *Nulls, destStartIdx uint64, srcStartIdx uint16, toAppend uint16) {
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/exec/coldata/nulls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,18 @@ func TestSetAndUnsetNulls(t *testing.T) {
for i := uint16(0); i < BatchSize; i++ {
require.True(t, n.NullAt(i))
}

for i := uint16(0); i < BatchSize; i += 3 {
n.UnsetNull(i)
}
for i := uint16(0); i < BatchSize; i++ {
if i%3 == 0 {
require.False(t, n.NullAt(i))
} else {
require.True(t, n.NullAt(i))
}
}

n.UnsetNulls()
for i := uint16(0); i < BatchSize; i++ {
require.False(t, n.NullAt(i))
Expand Down
27 changes: 7 additions & 20 deletions pkg/sql/exec/sorttopk.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,9 @@ type topKSorter struct {
func (t *topKSorter) Init() {
t.input.Init()
t.topK = coldata.NewMemBatchWithSize(t.inputTypes, int(t.k))
t.comparators = make([]vecComparator, len(t.orderingCols))
for i := range t.orderingCols {
typ := t.inputTypes[t.orderingCols[i].ColIdx]
// one vec for output batch and one for current input batch
t.comparators = make([]vecComparator, len(t.inputTypes))
for i := range t.inputTypes {
typ := t.inputTypes[i]
t.comparators[i] = GetVecComparator(typ, 2)
}
t.output = coldata.NewMemBatchWithSize(t.inputTypes, coldata.BatchSize)
Expand Down Expand Up @@ -161,18 +160,7 @@ func (t *topKSorter) spool(ctx context.Context) {
maxIdx := t.heap[0]
if t.compareRow(inputVecIdx, topKVecIdx, idx, maxIdx) < 0 {
for j := range t.inputTypes {
// TODO(solon): Make this copy more efficient, perhaps by adding a
// copy method to the vecComparator interface. This would avoid
// needing to switch on the column type every time.
t.topK.ColVec(j).Copy(
coldata.CopyArgs{
ColType: t.inputTypes[j],
Src: inputBatch.ColVec(j),
DestIdx: uint64(maxIdx),
SrcStartIdx: uint64(idx),
SrcEndIdx: uint64(idx + 1),
},
)
t.comparators[j].set(inputVecIdx, topKVecIdx, idx, maxIdx)
}
heap.Fix(t, 0)
}
Expand Down Expand Up @@ -221,7 +209,7 @@ func (t *topKSorter) emit() coldata.Batch {
func (t *topKSorter) compareRow(vecIdx1, vecIdx2 int, rowIdx1, rowIdx2 uint16) int {
for i := range t.orderingCols {
info := t.orderingCols[i]
res := t.comparators[i].compare(vecIdx1, vecIdx2, rowIdx1, rowIdx2)
res := t.comparators[info.ColIdx].compare(vecIdx1, vecIdx2, rowIdx1, rowIdx2)
if res != 0 {
switch d := info.Direction; d {
case distsqlpb.Ordering_Column_ASC:
Expand All @@ -237,9 +225,8 @@ func (t *topKSorter) compareRow(vecIdx1, vecIdx2 int, rowIdx1, rowIdx2 uint16) i
}

func (t *topKSorter) updateComparators(vecIdx int, batch coldata.Batch) {
for i := range t.orderingCols {
vec := batch.ColVec(int(t.orderingCols[i].ColIdx))
t.comparators[i].setVec(vecIdx, vec)
for i := range t.inputTypes {
t.comparators[i].setVec(vecIdx, batch.ColVec(i))
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/exec/sorttopk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func TestTopKSorter(t *testing.T) {
},
{
name: "nulls",
tuples: tuples{{1}, {2}, {3}, {4}, {5}, {6}, {7}, {nil}},
expected: tuples{{nil}, {1}, {2}},
tuples: tuples{{1}, {2}, {nil}, {3}, {4}, {5}, {6}, {7}, {nil}},
expected: tuples{{nil}, {nil}, {1}},
typ: []types.T{types.Int64},
ordCols: []distsqlpb.Ordering_Column{{ColIdx: 0}},
k: 3,
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/exec/vec_comparators.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ type vecComparator interface {
// 0, or 1.
compare(vecIdx1, vecIdx2 int, valIdx1, valIdx2 uint16) int

// set sets the value of the vector at dstVecIdx at index dstIdx to the value
// at the vector at srcVecIdx at index srcIdx.
set(srcVecIdx, dstVecIdx int, srcIdx, dstIdx uint16)

// setVec updates the vector at idx.
setVec(idx int, vec coldata.Vec)
}
9 changes: 9 additions & 0 deletions pkg/sql/exec/vec_comparators_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,15 @@ func (c *_TYPEVecComparator) setVec(idx int, vec coldata.Vec) {
c.nulls[idx] = vec.Nulls()
}

func (c *_TYPEVecComparator) set(srcVecIdx, dstVecIdx int, srcIdx, dstIdx uint16) {
if c.nulls[srcVecIdx].HasNulls() && c.nulls[srcVecIdx].NullAt(srcIdx) {
c.nulls[dstVecIdx].SetNull(dstIdx)
} else {
c.nulls[dstVecIdx].UnsetNull(dstIdx)
c.vecs[dstVecIdx][dstIdx] = c.vecs[srcVecIdx][srcIdx]
}
}

// {{end}}

func GetVecComparator(t types.T, numVecs int) vecComparator {
Expand Down

0 comments on commit 2665a78

Please sign in to comment.