Skip to content

Commit

Permalink
Merge #36857
Browse files Browse the repository at this point in the history
36857: exec: plumb through contexts as an argument to Next r=yuzefovich a=yuzefovich

We need to have access to contexts from the operators (one use case
is cancellation checking, probably there are others as well). There
appears to be two ways of how to go about it - either embedding it
into the operators or passing through as an argument. This commit
implements the latter.

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
craig[bot] and yuzefovich committed Apr 19, 2019
2 parents 6eefbba + 4da8168 commit 80c5b09
Show file tree
Hide file tree
Showing 43 changed files with 285 additions and 170 deletions.
4 changes: 2 additions & 2 deletions pkg/sql/distsqlrun/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func (s *colBatchScan) Init() {
}
}

func (s *colBatchScan) Next() coldata.Batch {
bat, err := s.rf.NextBatch(s.ctx)
func (s *colBatchScan) Next(ctx context.Context) coldata.Batch {
bat, err := s.rf.NextBatch(ctx)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/colbatch_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func BenchmarkColBatchScan(b *testing.B) {
b.Fatal(err)
}
for {
bat := tr.Next()
bat := tr.Next(ctx)
if err != nil {
b.Fatal(err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/distsqlrun/columnarizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type columnarizer struct {
accumulatedMeta []ProducerMetadata
}

var _ exec.Operator = &columnarizer{}

// newColumnarizer returns a new columnarizer.
func newColumnarizer(flowCtx *FlowCtx, processorID int32, input RowSource) (*columnarizer, error) {
c := &columnarizer{
Expand Down Expand Up @@ -70,7 +72,7 @@ func (c *columnarizer) Init() {
c.input.Start(context.TODO())
}

func (c *columnarizer) Next() coldata.Batch {
func (c *columnarizer) Next(context.Context) coldata.Batch {
// Buffer up n rows.
nRows := uint16(0)
columnTypes := c.OutputTypes()
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/columnarizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func BenchmarkColumnarize(b *testing.B) {
for i := 0; i < b.N; i++ {
foundRows := 0
for {
batch := c.Next()
batch := c.Next(ctx)
if batch.Length() == 0 {
break
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/distsqlrun/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func newMaterializer(
flowCtx,
processorID,
output,
nil,
nil, /* memMonitor */
ProcStateOpts{
TrailingMetaCallback: func(ctx context.Context) []ProducerMetadata {
var trailingMeta []ProducerMetadata
Expand All @@ -121,14 +121,15 @@ func newMaterializer(

func (m *materializer) Start(ctx context.Context) context.Context {
m.input.Init()
m.Ctx = ctx
return ctx
}

// nextBatch saves the next batch from input in m.batch. For internal use only.
// The purpose of having this function is to not create an anonymous function
// on every call to Next().
func (m *materializer) nextBatch() {
m.batch = m.input.Next()
m.batch = m.input.Next(m.Ctx)
}

func (m *materializer) Next() (sqlbase.EncDatumRow, *ProducerMetadata) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/distsqlrun/vectorized_error_propagation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,12 @@ func (e *testNonVectorizedErrorEmitter) Init() {
}

// Next is part of exec.Operator interface.
func (e *testNonVectorizedErrorEmitter) Next() coldata.Batch {
func (e *testNonVectorizedErrorEmitter) Next(ctx context.Context) coldata.Batch {
if !e.emitBatch {
e.emitBatch = true
panic(errors.New("An error from distsqlrun package"))
}

e.emitBatch = false
return e.input.Next()
return e.input.Next(ctx)
}
6 changes: 4 additions & 2 deletions pkg/sql/exec/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package exec

import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/exec/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/exec/types"
Expand Down Expand Up @@ -232,7 +234,7 @@ func (a *orderedAggregator) Init() {
a.initWithBatchSize(coldata.BatchSize, coldata.BatchSize)
}

func (a *orderedAggregator) Next() coldata.Batch {
func (a *orderedAggregator) Next(ctx context.Context) coldata.Batch {
if a.done {
a.scratch.SetLength(0)
return a.scratch
Expand All @@ -257,7 +259,7 @@ func (a *orderedAggregator) Next() coldata.Batch {
}

for a.scratch.resumeIdx < a.scratch.outputSize {
batch := a.input.Next()
batch := a.input.Next(ctx)
for i, fn := range a.aggregateFuncs {
fn.Compute(batch, a.aggCols[i])
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/exec/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package exec

import (
"context"
"fmt"
"testing"

Expand Down Expand Up @@ -434,6 +435,7 @@ func TestAggregatorRandomCountSum(t *testing.T) {
// This test sums and counts random inputs, keeping track of the expected
// results to make sure the aggregations are correct.
rng, _ := randutil.NewPseudoRand()
ctx := context.Background()
for _, groupSize := range []int{1, 2, coldata.BatchSize / 4, coldata.BatchSize / 2} {
for _, numInputBatches := range []int{1, 2, 64} {
for _, agg := range aggTypes {
Expand Down Expand Up @@ -472,7 +474,7 @@ func TestAggregatorRandomCountSum(t *testing.T) {
// Exhaust aggregator until all batches have been read.
i := 0
tupleIdx := 0
for b := a.Next(); b.Length() != 0; b = a.Next() {
for b := a.Next(ctx); b.Length() != 0; b = a.Next(ctx) {
countCol := b.ColVec(0).Int64()
sumCol := b.ColVec(1).Int64()
for j := uint16(0); j < b.Length(); j++ {
Expand Down Expand Up @@ -507,6 +509,7 @@ func TestAggregatorRandomCountSum(t *testing.T) {

func BenchmarkAggregator(b *testing.B) {
rng, _ := randutil.NewPseudoRand()
ctx := context.Background()

for _, aggFn := range []distsqlpb.AggregatorSpec_Func{
distsqlpb.AggregatorSpec_ANY_NOT_NULL,
Expand Down Expand Up @@ -572,7 +575,7 @@ func BenchmarkAggregator(b *testing.B) {
source.reset()
// Exhaust aggregator until all batches have been read.
foundTuples := 0
for b := a.Next(); b.Length() != 0; b = a.Next() {
for b := a.Next(ctx); b.Length() != 0; b = a.Next(ctx) {
foundTuples += int(b.Length())
}
if foundTuples != nTuples/groupSize {
Expand Down
10 changes: 7 additions & 3 deletions pkg/sql/exec/bool_vec_to_sel.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

package exec

import "github.com/cockroachdb/cockroach/pkg/sql/exec/coldata"
import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/exec/coldata"
)

// boolVecToSelOp transforms a boolean column into a selection vector by adding
// an index to the selection for each true value in the boolean column.
Expand All @@ -30,11 +34,11 @@ var _ Operator = &boolVecToSelOp{}

var zeroBoolVec = make([]bool, coldata.BatchSize)

func (p *boolVecToSelOp) Next() coldata.Batch {
func (p *boolVecToSelOp) Next(ctx context.Context) coldata.Batch {
// Loop until we have non-zero amount of output to return, or our input's been
// exhausted.
for {
batch := p.input.Next()
batch := p.input.Next(ctx)
if batch.Length() == 0 {
return batch
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/exec/coalescer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package exec

import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/exec/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/exec/types"
)
Expand Down Expand Up @@ -46,7 +48,7 @@ func (p *coalescerOp) Init() {
p.buffer = coldata.NewMemBatch(p.inputTypes)
}

func (p *coalescerOp) Next() coldata.Batch {
func (p *coalescerOp) Next(ctx context.Context) coldata.Batch {
tempBatch := p.group
p.group = p.buffer

Expand All @@ -55,7 +57,7 @@ func (p *coalescerOp) Next() coldata.Batch {

for p.group.Length() < coldata.BatchSize {
leftover := coldata.BatchSize - p.group.Length()
batch := p.input.Next()
batch := p.input.Next(ctx)
batchSize := batch.Length()

if batchSize == 0 {
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/exec/coalescer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package exec

import (
"context"
"fmt"
"testing"

Expand Down Expand Up @@ -72,6 +73,7 @@ func TestCoalescer(t *testing.T) {
}

func BenchmarkCoalescer(b *testing.B) {
ctx := context.Background()
// The input operator to the coalescer returns a batch of random size from [1,
// col.BatchSize) each time.
nCols := 4
Expand Down Expand Up @@ -105,7 +107,7 @@ func BenchmarkCoalescer(b *testing.B) {
co.Init()

for i := 0; i < nBatches; i++ {
co.Next()
co.Next(ctx)
}
}
})
Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/exec/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package exec

import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/exec/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/exec/types"
)
Expand All @@ -31,6 +33,8 @@ type countOp struct {
count int64
}

var _ Operator = &countOp{}

// NewCountOp returns a new count operator that counts the rows in its input.
func NewCountOp(input Operator) Operator {
c := &countOp{
Expand All @@ -48,13 +52,13 @@ func (c *countOp) Init() {
c.done = false
}

func (c *countOp) Next() coldata.Batch {
func (c *countOp) Next(ctx context.Context) coldata.Batch {
if c.done {
c.internalBatch.SetLength(0)
return c.internalBatch
}
for {
bat := c.input.Next()
bat := c.input.Next(ctx)
length := bat.Length()
if length == 0 {
c.done = true
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/exec/deselector.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package exec

import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/exec/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/exec/types"
)
Expand Down Expand Up @@ -46,8 +48,8 @@ func (p *deselectorOp) Init() {
p.output = coldata.NewMemBatch(p.inputTypes)
}

func (p *deselectorOp) Next() coldata.Batch {
batch := p.input.Next()
func (p *deselectorOp) Next(ctx context.Context) coldata.Batch {
batch := p.input.Next(ctx)
if batch.Selection() == nil {
return batch
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/exec/deselector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package exec

import (
"context"
"fmt"
"testing"

Expand Down Expand Up @@ -76,6 +77,7 @@ func TestDeselector(t *testing.T) {

func BenchmarkDeselector(b *testing.B) {
rng, _ := randutil.NewPseudoRand()
ctx := context.Background()

nCols := 1
inputTypes := make([]types.T, nCols)
Expand Down Expand Up @@ -109,7 +111,7 @@ func BenchmarkDeselector(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
input.resetBatchesToReturn(nBatches)
for b := op.Next(); b.Length() != 0; b = op.Next() {
for b := op.Next(ctx); b.Length() != 0; b = op.Next(ctx) {
}
// We don't need to reset the deselector because it doesn't keep any
// state. We do, however, want to keep its already allocated memory
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/exec/distinct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package exec

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/sql/exec/coldata"
Expand Down Expand Up @@ -87,6 +88,7 @@ func TestSortedDistinct(t *testing.T) {

func BenchmarkSortedDistinct(b *testing.B) {
rng, _ := randutil.NewPseudoRand()
ctx := context.Background()

batch := coldata.NewMemBatch([]types.T{types.Int64, types.Int64, types.Int64})
aCol := batch.ColVec(1).Int64()
Expand Down Expand Up @@ -116,6 +118,6 @@ func BenchmarkSortedDistinct(b *testing.B) {
// don't count the artificial zeroOp'd column in the throughput
b.SetBytes(int64(8 * coldata.BatchSize * 3))
for i := 0; i < b.N; i++ {
distinct.Next()
distinct.Next(ctx)
}
}
5 changes: 3 additions & 2 deletions pkg/sql/exec/distinct_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package exec

import (
"bytes"
"context"

"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/sql/exec/coldata"
Expand Down Expand Up @@ -168,8 +169,8 @@ func (p *sortedDistinct_TYPEOp) reset() {
}
}

func (p *sortedDistinct_TYPEOp) Next() coldata.Batch {
batch := p.input.Next()
func (p *sortedDistinct_TYPEOp) Next(ctx context.Context) coldata.Batch {
batch := p.input.Next(ctx)
if batch.Length() == 0 {
return batch
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/exec/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package exec

import (
"bufio"
"context"
"fmt"
"runtime/debug"
"strings"
Expand Down Expand Up @@ -94,12 +95,12 @@ func (e *TestVectorizedErrorEmitter) Init() {
}

// Next is part of Operator interface.
func (e *TestVectorizedErrorEmitter) Next() coldata.Batch {
func (e *TestVectorizedErrorEmitter) Next(ctx context.Context) coldata.Batch {
if !e.emitBatch {
e.emitBatch = true
panic(errors.New("a panic from exec package"))
}

e.emitBatch = false
return e.input.Next()
return e.input.Next(ctx)
}
1 change: 1 addition & 0 deletions pkg/sql/exec/execgen/cmd/execgen/like_ops_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ package exec
import (
"bytes"
"context"
"regexp"
"github.com/cockroachdb/cockroach/pkg/sql/exec/coldata"
Expand Down
Loading

0 comments on commit 80c5b09

Please sign in to comment.