diff --git a/Makefile b/Makefile index 8f04be4f8fe3..96f657daf9c2 100644 --- a/Makefile +++ b/Makefile @@ -803,6 +803,9 @@ EXECGEN_TARGETS = \ pkg/sql/colexec/mergejoiner_leftsemi.eg.go \ pkg/sql/colexec/mergejoiner_rightouter.eg.go \ pkg/sql/colexec/min_max_agg.eg.go \ + pkg/sql/colexec/overloads_test_utils.eg.go \ + pkg/sql/colexec/rank.eg.go \ + pkg/sql/colexec/row_number.eg.go \ pkg/sql/colexec/projection_ops.eg.go \ pkg/sql/colexec/quicksort.eg.go \ pkg/sql/colexec/rowstovec.eg.go \ @@ -812,10 +815,7 @@ EXECGEN_TARGETS = \ pkg/sql/colexec/sum_agg.eg.go \ pkg/sql/colexec/tuples_differ.eg.go \ pkg/sql/colexec/vec_comparators.eg.go \ - pkg/sql/colexec/vecbuiltins/rank.eg.go \ - pkg/sql/colexec/vecbuiltins/row_number.eg.go \ - pkg/sql/colexec/zerocolumns.eg.go \ - pkg/sql/colexec/overloads_test_utils.eg.go + pkg/sql/colexec/zerocolumns.eg.go execgen-exclusions = $(addprefix -not -path ,$(EXECGEN_TARGETS)) @@ -1485,14 +1485,14 @@ pkg/sql/colexec/mergejoiner_leftsemi.eg.go: pkg/sql/colexec/mergejoiner_tmpl.go pkg/sql/colexec/mergejoiner_rightouter.eg.go: pkg/sql/colexec/mergejoiner_tmpl.go pkg/sql/colexec/min_max_agg.eg.go: pkg/sql/colexec/min_max_agg_tmpl.go pkg/sql/colexec/quicksort.eg.go: pkg/sql/colexec/quicksort_tmpl.go +pkg/sql/colexec/rank.eg.go: pkg/sql/colexec/rank_tmpl.go +pkg/sql/colexec/row_number.eg.go: pkg/sql/colexec/row_number_tmpl.go pkg/sql/colexec/rowstovec.eg.go: pkg/sql/colexec/rowstovec_tmpl.go pkg/sql/colexec/select_in.eg.go: pkg/sql/colexec/select_in_tmpl.go pkg/sql/colexec/sort.eg.go: pkg/sql/colexec/sort_tmpl.go pkg/sql/colexec/sum_agg.eg.go: pkg/sql/colexec/sum_agg_tmpl.go pkg/sql/colexec/tuples_differ.eg.go: pkg/sql/colexec/tuples_differ_tmpl.go pkg/sql/colexec/vec_comparators.eg.go: pkg/sql/colexec/vec_comparators_tmpl.go -pkg/sql/colexec/vecbuiltins/rank.eg.go: pkg/sql/colexec/vecbuiltins/rank_tmpl.go -pkg/sql/colexec/vecbuiltins/row_number.eg.go: pkg/sql/colexec/vecbuiltins/row_number_tmpl.go pkg/sql/colexec/zerocolumns.eg.go: pkg/sql/colexec/zerocolumns_tmpl.go $(EXECGEN_TARGETS): bin/execgen diff --git a/pkg/sql/colexec/.gitignore b/pkg/sql/colexec/.gitignore index 9e153d3ac780..2c91adf276b7 100644 --- a/pkg/sql/colexec/.gitignore +++ b/pkg/sql/colexec/.gitignore @@ -15,8 +15,11 @@ mergejoiner_leftouter.eg.go mergejoiner_leftsemi.eg.go mergejoiner_rightouter.eg.go min_max_agg.eg.go +overloads_test_utils.eg.go projection_ops.eg.go quicksort.eg.go +rank.eg.go +row_number.eg.go rowstovec.eg.go selection_ops.eg.go select_in.eg.go @@ -24,7 +27,4 @@ sort.eg.go sum_agg.eg.go tuples_differ.eg.go vec_comparators.eg.go -vecbuiltins/rank.eg.go -vecbuiltins/row_number.eg.go zerocolumns.eg.go -overloads_test_utils.eg.go diff --git a/pkg/sql/colexec/case.go b/pkg/sql/colexec/case.go index 6b121f688880..98398d302633 100644 --- a/pkg/sql/colexec/case.go +++ b/pkg/sql/colexec/case.go @@ -17,7 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coltypes" "github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror" - "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" ) type caseOp struct { @@ -40,7 +40,7 @@ func (c *caseOp) ChildCount() int { return 1 + len(c.caseOps) + 1 } -func (c *caseOp) Child(nth int) execinfrapb.OpNode { +func (c *caseOp) Child(nth int) execinfra.OpNode { if nth == 0 { return c.buffer.input } else if nth < len(c.caseOps)+1 { diff --git a/pkg/sql/row/cfetcher.go b/pkg/sql/colexec/cfetcher.go similarity index 95% rename from pkg/sql/row/cfetcher.go rename to pkg/sql/colexec/cfetcher.go index fa117b761b1d..28c9ba18f1e3 100644 --- a/pkg/sql/row/cfetcher.go +++ b/pkg/sql/colexec/cfetcher.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package row +package colexec import ( "bytes" @@ -23,9 +23,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/colencoding" - "github.com/cockroachdb/cockroach/pkg/sql/colexec" "github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror" "github.com/cockroachdb/cockroach/pkg/sql/colexec/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/scrub" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" @@ -36,8 +36,6 @@ import ( "github.com/cockroachdb/errors" ) -// TODO(yuzefovich): move CFetcher into sql/colexec. - // Only unique secondary indexes have extra columns to decode (namely the // primary index columns). func cHasExtraCols(table *cTableInfo) bool { @@ -145,10 +143,10 @@ func (m colIdxMap) get(c sqlbase.ColumnID) (int, bool) { return 0, false } -// CFetcher handles fetching kvs and forming table rows for an +// cFetcher handles fetching kvs and forming table rows for an // arbitrary number of tables. // Usage: -// var rf CFetcher +// var rf cFetcher // err := rf.Init(..) // // Handle err // err := rf.StartScan(..) @@ -162,7 +160,7 @@ func (m colIdxMap) get(c sqlbase.ColumnID) (int, bool) { // } // // Process res.colBatch // } -type CFetcher struct { +type cFetcher struct { // table is the table that's configured for fetching. table cTableInfo @@ -194,7 +192,7 @@ type CFetcher struct { traceKV bool // fetcher is the underlying fetcher that provides KVs. - fetcher kvFetcher + fetcher *row.KVFetcher // machine contains fields that get updated during the run of the fetcher. machine struct { @@ -231,16 +229,16 @@ type CFetcher struct { } // estimatedStaticMemoryUsage is the best guess about how much memory the - // CFetcher will use. + // cFetcher will use. estimatedStaticMemoryUsage int } // Init sets up a Fetcher for a given table and index. If we are using a // non-primary index, tables.ValNeededForCol can only refer to columns in the // index. -func (rf *CFetcher) Init( +func (rf *cFetcher) Init( reverse, - returnRangeInfo bool, isCheck bool, tables ...FetcherTableArgs, + returnRangeInfo bool, isCheck bool, tables ...row.FetcherTableArgs, ) error { if len(tables) == 0 { return errors.AssertionFailedf("no tables to fetch from") @@ -290,7 +288,7 @@ func (rf *CFetcher) Init( } rf.machine.batch = coldata.NewMemBatch(typs) rf.machine.colvecs = rf.machine.batch.ColVecs() - rf.estimatedStaticMemoryUsage = colexec.EstimateBatchSizeBytes(typs, coldata.BatchSize) + rf.estimatedStaticMemoryUsage = EstimateBatchSizeBytes(typs, coldata.BatchSize) var err error @@ -422,7 +420,7 @@ func (rf *CFetcher) Init( // StartScan initializes and starts the key-value scan. Can be used multiple // times. -func (rf *CFetcher) StartScan( +func (rf *cFetcher) StartScan( ctx context.Context, txn *client.Txn, spans roachpb.Spans, @@ -450,12 +448,14 @@ func (rf *CFetcher) StartScan( firstBatchLimit++ } - f, err := makeKVBatchFetcher(txn, spans, rf.reverse, limitBatches, firstBatchLimit, rf.returnRangeInfo) + f, err := row.NewKVFetcher( + txn, spans, rf.reverse, limitBatches, firstBatchLimit, rf.returnRangeInfo, + ) if err != nil { return err } + rf.fetcher = f rf.machine.lastRowPrefix = nil - rf.fetcher = newKVFetcher(&f) rf.machine.state[0] = stateInitFetch return nil } @@ -550,7 +550,7 @@ const debugState = false // index used; columns that are not needed (as per neededCols) are empty. The // Batch should not be modified and is only valid until the next call. // When there are no more rows, the Batch.Length is 0. -func (rf *CFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) { +func (rf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) { for { if debugState { log.Infof(ctx, "State %s", rf.machine.state[0]) @@ -559,7 +559,7 @@ func (rf *CFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) { case stateInvalid: return nil, errors.New("invalid fetcher state") case stateInitFetch: - moreKeys, kv, newSpan, err := rf.fetcher.nextKV(ctx) + moreKeys, kv, newSpan, err := rf.fetcher.NextKV(ctx) if err != nil { return nil, execerror.NewStorageError(err) } @@ -568,7 +568,7 @@ func (rf *CFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) { continue } if newSpan { - rf.machine.curSpan = rf.fetcher.span + rf.machine.curSpan = rf.fetcher.Span // TODO(jordan): parse the logical longest common prefix of the span // into a buffer. The logical longest common prefix is the longest // common prefix that contains only full key components. For example, @@ -661,7 +661,7 @@ func (rf *CFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) { rf.machine.state[0] = stateFetchNextKVWithUnfinishedRow case stateSeekPrefix: for { - moreRows, kv, _, err := rf.fetcher.nextKV(ctx) + moreRows, kv, _, err := rf.fetcher.NextKV(ctx) if err != nil { return nil, execerror.NewStorageError(err) } @@ -684,7 +684,7 @@ func (rf *CFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) { rf.shiftState() case stateFetchNextKVWithUnfinishedRow: - moreKVs, kv, _, err := rf.fetcher.nextKV(ctx) + moreKVs, kv, _, err := rf.fetcher.NextKV(ctx) if err != nil { return nil, execerror.NewStorageError(err) } @@ -775,20 +775,20 @@ func (rf *CFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) { // shiftState shifts the state queue to the left, removing the first element and // clearing the last element. -func (rf *CFetcher) shiftState() { +func (rf *cFetcher) shiftState() { copy(rf.machine.state[:2], rf.machine.state[1:]) rf.machine.state[2] = stateInvalid } -func (rf *CFetcher) pushState(state fetcherState) { +func (rf *cFetcher) pushState(state fetcherState) { copy(rf.machine.state[1:], rf.machine.state[:2]) rf.machine.state[0] = state } // getDatumAt returns the converted datum object at the given (colIdx, rowIdx). // This function is meant for tracing and should not be used in hot paths. -func (rf *CFetcher) getDatumAt(colIdx int, rowIdx uint16, typ types.T) tree.Datum { - return colexec.PhysicalTypeColElemToDatum(rf.machine.colvecs[colIdx], rowIdx, rf.table.da, typ) +func (rf *cFetcher) getDatumAt(colIdx int, rowIdx uint16, typ types.T) tree.Datum { + return PhysicalTypeColElemToDatum(rf.machine.colvecs[colIdx], rowIdx, rf.table.da, typ) } // processValue processes the state machine's current value component, setting @@ -796,7 +796,7 @@ func (rf *CFetcher) getDatumAt(colIdx int, rowIdx uint16, typ types.T) tree.Datu // is found in the current value component. // If debugStrings is true, returns pretty printed key and value // information in prettyKey/prettyValue (otherwise they are empty strings). -func (rf *CFetcher) processValue( +func (rf *cFetcher) processValue( ctx context.Context, familyID sqlbase.FamilyID, ) (prettyKey string, prettyValue string, err error) { table := &rf.table @@ -931,7 +931,7 @@ func (rf *CFetcher) processValue( // processValueSingle processes the given value (of column // family.DefaultColumnID), setting values in table.row accordingly. The key is // only used for logging. -func (rf *CFetcher) processValueSingle( +func (rf *cFetcher) processValueSingle( ctx context.Context, table *cTableInfo, family *sqlbase.ColumnFamilyDescriptor, @@ -980,7 +980,7 @@ func (rf *CFetcher) processValueSingle( if rf.traceKV { prettyValue = rf.getDatumAt(idx, rf.machine.rowIdx, *typ).String() } - if debugRowFetch { + if row.DebugRowFetch { log.Infof(ctx, "Scan %s -> %v", rf.machine.nextKV.Key, "?") } return prettyKey, prettyValue, nil @@ -989,13 +989,13 @@ func (rf *CFetcher) processValueSingle( // No need to unmarshal the column value. Either the column was part of // the index key or it isn't needed. - if debugRowFetch { + if row.DebugRowFetch { log.Infof(ctx, "Scan %s -> [%d] (skipped)", rf.machine.nextKV.Key, colID) } return "", "", nil } -func (rf *CFetcher) processValueBytes( +func (rf *cFetcher) processValueBytes( ctx context.Context, table *cTableInfo, valueBytes []byte, prettyKeyPrefix string, ) (prettyKey string, prettyValue string, err error) { prettyKey = prettyKeyPrefix @@ -1038,7 +1038,7 @@ func (rf *CFetcher) processValueBytes( return "", "", err } valueBytes = valueBytes[len:] - if debugRowFetch { + if row.DebugRowFetch { log.Infof(ctx, "Scan %s -> [%d] (skipped)", rf.machine.nextKV.Key, colID) } continue @@ -1082,13 +1082,13 @@ func (rf *CFetcher) processValueBytes( // processValueTuple processes the given values (of columns family.ColumnIDs), // setting values in the rf.row accordingly. The key is only used for logging. -func (rf *CFetcher) processValueTuple( +func (rf *cFetcher) processValueTuple( ctx context.Context, table *cTableInfo, tupleBytes []byte, prettyKeyPrefix string, ) (prettyKey string, prettyValue string, err error) { return rf.processValueBytes(ctx, table, tupleBytes, prettyKeyPrefix) } -func (rf *CFetcher) fillNulls() error { +func (rf *cFetcher) fillNulls() error { table := &rf.table if rf.machine.remainingValueColsByIdx.Empty() { return nil @@ -1120,18 +1120,18 @@ func (rf *CFetcher) fillNulls() error { // GetRangesInfo returns information about the ranges where the rows came from. // The RangeInfo's are deduped and not ordered. -func (rf *CFetcher) GetRangesInfo() []roachpb.RangeInfo { - f := rf.fetcher.kvBatchFetcher +func (rf *cFetcher) GetRangesInfo() []roachpb.RangeInfo { + f := rf.fetcher if f == nil { // Not yet initialized. return nil } - return f.getRangesInfo() + return rf.fetcher.GetRangesInfo() } // getCurrentColumnFamilyID returns the column family id of the key in // rf.machine.nextKV.Key. -func (rf *CFetcher) getCurrentColumnFamilyID() (sqlbase.FamilyID, error) { +func (rf *cFetcher) getCurrentColumnFamilyID() (sqlbase.FamilyID, error) { // If the table only has 1 column family, and its ID is 0, we know that the // key has to be the 0th column family. if rf.table.maxColumnFamilyID == 0 { @@ -1150,7 +1150,7 @@ func (rf *CFetcher) getCurrentColumnFamilyID() (sqlbase.FamilyID, error) { } // EstimateStaticMemoryUsage estimates how much memory is pre-allocated by the -// CFetcher. -func (rf *CFetcher) EstimateStaticMemoryUsage() int { +// cFetcher. +func (rf *cFetcher) EstimateStaticMemoryUsage() int { return rf.estimatedStaticMemoryUsage } diff --git a/pkg/sql/row/cfetcher_test.go b/pkg/sql/colexec/cfetcher_test.go similarity index 94% rename from pkg/sql/row/cfetcher_test.go rename to pkg/sql/colexec/cfetcher_test.go index 4bbb9d9db0c0..5f03fc243e02 100644 --- a/pkg/sql/row/cfetcher_test.go +++ b/pkg/sql/colexec/cfetcher_test.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package row +package colexec import ( "testing" @@ -19,7 +19,7 @@ import ( func TestCFetcherUninitialized(t *testing.T) { // Regression test for #36570: make sure it's okay to call GetRangesInfo even // before the fetcher was fully initialized. - var fetcher CFetcher + var fetcher cFetcher assert.Nil(t, fetcher.GetRangesInfo()) } diff --git a/pkg/sql/colflow/colbatch_scan.go b/pkg/sql/colexec/colbatch_scan.go similarity index 93% rename from pkg/sql/colflow/colbatch_scan.go rename to pkg/sql/colexec/colbatch_scan.go index 39b05484fae7..54d6d8e84db5 100644 --- a/pkg/sql/colflow/colbatch_scan.go +++ b/pkg/sql/colexec/colbatch_scan.go @@ -8,14 +8,13 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package colflow +package colexec import ( "context" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/sql/colexec" "github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -33,16 +32,13 @@ import ( // should get rid off table readers entirely. We will have to be careful about // propagating the metadata though. -// TODO(yuzefovich): once row.CFetcher is moved into colexec, move colBatchScan -// there as well. - // colBatchScan is the exec.Operator implementation of TableReader. It reads a table // from kv, presenting it as coldata.Batches via the exec.Operator interface. type colBatchScan struct { - colexec.ZeroInputNode + ZeroInputNode spans roachpb.Spans flowCtx *execinfra.FlowCtx - rf *row.CFetcher + rf *cFetcher limitHint int64 ctx context.Context // maxResults is non-zero if there is a limit on the total number of rows @@ -52,7 +48,7 @@ type colBatchScan struct { init bool } -var _ colexec.StaticMemoryOperator = &colBatchScan{} +var _ StaticMemoryOperator = &colBatchScan{} func (s *colBatchScan) EstimateStaticMemoryUsage() int { return s.rf.EstimateStaticMemoryUsage() @@ -127,7 +123,7 @@ func newColBatchScan( neededColumns := helper.NeededColumns() columnIdxMap := spec.Table.ColumnIdxMapWithMutations(returnMutations) - fetcher := row.CFetcher{} + fetcher := cFetcher{} if _, _, err := initCRowFetcher( &fetcher, &spec.Table, int(spec.IndexIdx), columnIdxMap, spec.Reverse, neededColumns, spec.IsCheck, spec.Visibility, @@ -149,9 +145,9 @@ func newColBatchScan( }, nil } -// initCRowFetcher initializes a row.CFetcher. See initRowFetcher. +// initCRowFetcher initializes a row.cFetcher. See initRowFetcher. func initCRowFetcher( - fetcher *row.CFetcher, + fetcher *cFetcher, desc *sqlbase.TableDescriptor, indexIdx int, colIdxMap map[sqlbase.ColumnID]int, diff --git a/pkg/sql/colflow/columnarizer.go b/pkg/sql/colexec/columnarizer.go similarity index 75% rename from pkg/sql/colflow/columnarizer.go rename to pkg/sql/colexec/columnarizer.go index 39c95161f94f..289906f4fc5e 100644 --- a/pkg/sql/colflow/columnarizer.go +++ b/pkg/sql/colexec/columnarizer.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package colflow +package colexec import ( "context" @@ -16,7 +16,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coltypes" - "github.com/cockroachdb/cockroach/pkg/sql/colexec" "github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror" "github.com/cockroachdb/cockroach/pkg/sql/colexec/typeconv" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" @@ -24,15 +23,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" ) -// TODO(yuzefovich): once row.CFetcher is moved into colexec, move Columnarizer -// there as well. - -// Columnarizer turns ane execinfra.RowSource input into an colexec.Operator -// output, by reading the input in chunks of size coldata.BatchSize and -// converting each chunk into a coldata.Batch column by column. +// Columnarizer turns an execinfra.RowSource input into an Operator output, by +// reading the input in chunks of size coldata.BatchSize and converting each +// chunk into a coldata.Batch column by column. type Columnarizer struct { execinfra.ProcessorBase - colexec.NonExplainable + NonExplainable input execinfra.RowSource da sqlbase.DatumAlloc @@ -44,7 +40,7 @@ type Columnarizer struct { typs []coltypes.T } -var _ colexec.StaticMemoryOperator = &Columnarizer{} +var _ StaticMemoryOperator = &Columnarizer{} // NewColumnarizer returns a new Columnarizer. func NewColumnarizer( @@ -72,13 +68,13 @@ func NewColumnarizer( return c, err } -// EstimateStaticMemoryUsage is part of the exec.StaticMemoryOperator +// EstimateStaticMemoryUsage is part of the StaticMemoryOperator // interface. func (c *Columnarizer) EstimateStaticMemoryUsage() int { - return colexec.EstimateBatchSizeBytes(c.typs, coldata.BatchSize) + return EstimateBatchSizeBytes(c.typs, coldata.BatchSize) } -// Init is part of the colexec.Operator interface. +// Init is part of the Operator interface. func (c *Columnarizer) Init() { c.batch = coldata.NewMemBatch(c.typs) c.buffered = make(sqlbase.EncDatumRows, coldata.BatchSize) @@ -89,7 +85,7 @@ func (c *Columnarizer) Init() { c.input.Start(c.ctx) } -// Next is part of the colexec.Operator interface. +// Next is part of the Operator interface. func (c *Columnarizer) Next(context.Context) coldata.Batch { c.batch.ResetInternalBatch() // Buffer up n rows. @@ -113,7 +109,7 @@ func (c *Columnarizer) Next(context.Context) coldata.Batch { // Write each column into the output batch. for idx, ct := range columnTypes { - err := colexec.EncDatumRowsToColVec(c.buffered[:nRows], c.batch.ColVec(idx), idx, &ct, &c.da) + err := EncDatumRowsToColVec(c.buffered[:nRows], c.batch.ColVec(idx), idx, &ct, &c.da) if err != nil { execerror.VectorizedInternalPanic(err) } @@ -123,13 +119,13 @@ func (c *Columnarizer) Next(context.Context) coldata.Batch { // Run is part of the execinfra.Processor interface. // -// columnarizers are not expected to be Run, so we prohibit calling this method +// Columnarizers are not expected to be Run, so we prohibit calling this method // on them. func (c *Columnarizer) Run(context.Context) { execerror.VectorizedInternalPanic("Columnarizer should not be Run") } -var _ colexec.Operator = &Columnarizer{} +var _ Operator = &Columnarizer{} var _ execinfrapb.MetadataSource = &Columnarizer{} // DrainMeta is part of the MetadataSource interface. @@ -140,21 +136,21 @@ func (c *Columnarizer) DrainMeta(ctx context.Context) []execinfrapb.ProducerMeta return c.accumulatedMeta } -// ChildCount is part of the colexec.Operator interface. +// ChildCount is part of the Operator interface. func (c *Columnarizer) ChildCount() int { - if _, ok := c.input.(execinfrapb.OpNode); ok { + if _, ok := c.input.(execinfra.OpNode); ok { return 1 } return 0 } -// Child is part of the colexec.Operator interface. -func (c *Columnarizer) Child(nth int) execinfrapb.OpNode { +// Child is part of the Operator interface. +func (c *Columnarizer) Child(nth int) execinfra.OpNode { if nth == 0 { - if n, ok := c.input.(execinfrapb.OpNode); ok { + if n, ok := c.input.(execinfra.OpNode); ok { return n } - execerror.VectorizedInternalPanic("input to Columnarizer is not an execinfrapb.OpNode") + execerror.VectorizedInternalPanic("input to Columnarizer is not an execinfra.OpNode") } execerror.VectorizedInternalPanic(fmt.Sprintf("invalid index %d", nth)) // This code is unreachable, but the compiler cannot infer that. diff --git a/pkg/sql/colflow/columnarizer_test.go b/pkg/sql/colexec/columnarizer_test.go similarity index 99% rename from pkg/sql/colflow/columnarizer_test.go rename to pkg/sql/colexec/columnarizer_test.go index b2a59b237914..398b19347437 100644 --- a/pkg/sql/colflow/columnarizer_test.go +++ b/pkg/sql/colexec/columnarizer_test.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package colflow +package colexec import ( "context" diff --git a/pkg/sql/colexec/execgen/cmd/execgen/rank_gen.go b/pkg/sql/colexec/execgen/cmd/execgen/rank_gen.go index baed37749b77..357ae3df283e 100644 --- a/pkg/sql/colexec/execgen/cmd/execgen/rank_gen.go +++ b/pkg/sql/colexec/execgen/cmd/execgen/rank_gen.go @@ -59,7 +59,7 @@ var ( ) func genRankOps(wr io.Writer) error { - d, err := ioutil.ReadFile("pkg/sql/colexec/vecbuiltins/rank_tmpl.go") + d, err := ioutil.ReadFile("pkg/sql/colexec/rank_tmpl.go") if err != nil { return err } diff --git a/pkg/sql/colexec/execgen/cmd/execgen/row_number_gen.go b/pkg/sql/colexec/execgen/cmd/execgen/row_number_gen.go index 1268c22c23ed..f753b86e5e72 100644 --- a/pkg/sql/colexec/execgen/cmd/execgen/row_number_gen.go +++ b/pkg/sql/colexec/execgen/cmd/execgen/row_number_gen.go @@ -23,7 +23,7 @@ type rowNumberTmplInfo struct { } func genRowNumberOp(wr io.Writer) error { - d, err := ioutil.ReadFile("pkg/sql/colexec/vecbuiltins/row_number_tmpl.go") + d, err := ioutil.ReadFile("pkg/sql/colexec/row_number_tmpl.go") if err != nil { return err } diff --git a/pkg/sql/colflow/execplan.go b/pkg/sql/colexec/execplan.go similarity index 88% rename from pkg/sql/colflow/execplan.go rename to pkg/sql/colexec/execplan.go index 2eb182f639ee..c9f8ba0b9d42 100644 --- a/pkg/sql/colflow/execplan.go +++ b/pkg/sql/colexec/execplan.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package colflow +package colexec import ( "context" @@ -18,9 +18,7 @@ import ( "strings" "github.com/cockroachdb/cockroach/pkg/col/coltypes" - "github.com/cockroachdb/cockroach/pkg/sql/colexec" "github.com/cockroachdb/cockroach/pkg/sql/colexec/typeconv" - "github.com/cockroachdb/cockroach/pkg/sql/colexec/vecbuiltins" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -31,7 +29,7 @@ import ( "github.com/cockroachdb/errors" ) -func checkNumIn(inputs []colexec.Operator, numIn int) error { +func checkNumIn(inputs []Operator, numIn int) error { if len(inputs) != numIn { return errors.Errorf("expected %d input(s), got %d", numIn, len(inputs)) } @@ -43,7 +41,7 @@ func checkNumIn(inputs []colexec.Operator, numIn int) error { func wrapRowSource( ctx context.Context, flowCtx *execinfra.FlowCtx, - input colexec.Operator, + input Operator, inputTypes []types.T, newToWrap func(execinfra.RowSource) (execinfra.RowSource, error), ) (*Columnarizer, error) { @@ -87,7 +85,7 @@ func wrapRowSource( // NewColOperatorResult is a helper struct that encompasses all of the return // values of NewColOperator call. type NewColOperatorResult struct { - Op colexec.Operator + Op Operator ColumnTypes []types.T MemUsage int MetadataSources []execinfrapb.MetadataSource @@ -99,7 +97,7 @@ func NewColOperator( ctx context.Context, flowCtx *execinfra.FlowCtx, spec *execinfrapb.ProcessorSpec, - inputs []colexec.Operator, + inputs []Operator, ) (result NewColOperatorResult, err error) { log.VEventf(ctx, 2, "planning col operator for spec %q", spec) @@ -117,7 +115,7 @@ func NewColOperator( if err := checkNumIn(inputs, 1); err != nil { return result, err } - result.Op, result.IsStreaming = colexec.NewNoop(inputs[0]), true + result.Op, result.IsStreaming = NewNoop(inputs[0]), true result.ColumnTypes = spec.Input[0].ColumnTypes case core.TableReader != nil: if err := checkNumIn(inputs, 0); err != nil { @@ -145,7 +143,7 @@ func NewColOperator( // However, some of the long-running operators (for example, sorter) are // still responsible for doing the cancellation check on their own while // performing long operations. - result.Op = colexec.NewCancelChecker(result.Op) + result.Op = NewCancelChecker(result.Op) returnMutations := core.TableReader.Visibility == execinfrapb.ScanVisibility_PUBLIC_AND_NOT_PUBLIC result.ColumnTypes = core.TableReader.Table.ColumnTypesWithMutations(returnMutations) case core.Aggregator != nil: @@ -163,7 +161,7 @@ func NewColOperator( // TODO(solon): The distsql plan for this case includes a TableReader, so // we end up creating an orphaned colBatchScan. We should avoid that. // Ideally the optimizer would not plan a scan in this unusual case. - result.Op, result.IsStreaming, err = colexec.NewSingleTupleNoInputOp(), true, nil + result.Op, result.IsStreaming, err = NewSingleTupleNoInputOp(), true, nil // We make ColumnTypes non-nil so that sanity check doesn't panic. result.ColumnTypes = make([]types.T, 0) break @@ -173,7 +171,7 @@ func NewColOperator( aggSpec.Aggregations[0].FilterColIdx == nil && aggSpec.Aggregations[0].Func == execinfrapb.AggregatorSpec_COUNT_ROWS && !aggSpec.Aggregations[0].Distinct { - result.Op, result.IsStreaming, err = colexec.NewCountOp(inputs[0]), true, nil + result.Op, result.IsStreaming, err = NewCountOp(inputs[0]), true, nil result.ColumnTypes = []types.T{*types.Int} break } @@ -242,11 +240,11 @@ func NewColOperator( return result, err } if needHash { - result.Op, err = colexec.NewHashAggregator( + result.Op, err = NewHashAggregator( inputs[0], typs, aggFns, aggSpec.GroupCols, aggCols, execinfrapb.IsScalarAggregate(aggSpec), ) } else { - result.Op, err = colexec.NewOrderedAggregator( + result.Op, err = NewOrderedAggregator( inputs[0], typs, aggFns, aggSpec.GroupCols, aggCols, execinfrapb.IsScalarAggregate(aggSpec), ) result.IsStreaming = true @@ -278,7 +276,7 @@ func NewColOperator( if err != nil { return result, err } - result.Op, err = colexec.NewOrderedDistinct(inputs[0], core.Distinct.OrderedColumns, typs) + result.Op, err = NewOrderedDistinct(inputs[0], core.Distinct.OrderedColumns, typs) result.IsStreaming = true case core.Ordinality != nil: @@ -286,7 +284,7 @@ func NewColOperator( return result, err } result.ColumnTypes = append(spec.Input[0].ColumnTypes, *types.Int) - result.Op, result.IsStreaming = colexec.NewOrdinalityOp(inputs[0]), true + result.Op, result.IsStreaming = NewOrdinalityOp(inputs[0]), true case core.HashJoiner != nil: if err := checkNumIn(inputs, 2); err != nil { @@ -345,7 +343,7 @@ func NewColOperator( ) } - result.Op, err = colexec.NewEqHashJoinerOp( + result.Op, err = NewEqHashJoinerOp( inputs[0], inputs[1], core.HashJoiner.LeftEqColumns, @@ -430,7 +428,7 @@ func NewColOperator( onExpr *execinfrapb.Expression filterPlanning *filterPlanningState filterOnlyOnLeft bool - filterConstructor func(colexec.Operator) (colexec.Operator, error) + filterConstructor func(Operator) (Operator, error) ) if !core.MergeJoiner.OnExpr.Empty() { // At the moment, we want to be on the conservative side and not run @@ -450,7 +448,7 @@ func NewColOperator( ) case sqlbase.JoinType_LEFT_SEMI, sqlbase.JoinType_LEFT_ANTI: filterOnlyOnLeft = filterPlanning.isFilterOnlyOnLeft(*onExpr) - filterConstructor = func(op colexec.Operator) (colexec.Operator, error) { + filterConstructor = func(op Operator) (Operator, error) { r := NewColOperatorResult{ Op: op, ColumnTypes: append(spec.Input[0].ColumnTypes, spec.Input[1].ColumnTypes...), @@ -466,7 +464,7 @@ func NewColOperator( } } - result.Op, err = colexec.NewMergeJoinOp( + result.Op, err = NewMergeJoinOp( core.MergeJoiner.Type, inputs[0], inputs[1], @@ -553,16 +551,16 @@ func NewColOperator( if matchLen > 0 { // The input is already partially ordered. Use a chunks sorter to avoid // loading all the rows into memory. - result.Op, err = colexec.NewSortChunks(input, inputTypes, orderingCols, int(matchLen)) + result.Op, err = NewSortChunks(input, inputTypes, orderingCols, int(matchLen)) } else if post.Limit != 0 && post.Filter.Empty() && post.Limit+post.Offset < math.MaxUint16 { // There is a limit specified with no post-process filter, so we know // exactly how many rows the sorter should output. Choose a top K sorter, // which uses a heap to avoid storing more rows than necessary. k := uint16(post.Limit + post.Offset) - result.Op, result.IsStreaming = colexec.NewTopKSorter(input, inputTypes, orderingCols, k), true + result.Op, result.IsStreaming = NewTopKSorter(input, inputTypes, orderingCols, k), true } else { // No optimizations possible. Default to the standard sort operator. - result.Op, err = colexec.NewSorter(input, inputTypes, orderingCols) + result.Op, err = NewSorter(input, inputTypes, orderingCols) } result.ColumnTypes = spec.Input[0].ColumnTypes @@ -595,11 +593,11 @@ func NewColOperator( // TODO(yuzefovich): add support for hashing partitioner (probably by // leveraging hash routers once we can distribute). The decision about // which kind of partitioner to use should come from the optimizer. - input, err = colexec.NewWindowSortingPartitioner(input, typs, core.Windower.PartitionBy, wf.Ordering.Columns, int(wf.OutputColIdx)) + input, err = NewWindowSortingPartitioner(input, typs, core.Windower.PartitionBy, wf.Ordering.Columns, int(wf.OutputColIdx)) tempPartitionColOffset, partitionColIdx = 1, int(wf.OutputColIdx) } else { if len(wf.Ordering.Columns) > 0 { - input, err = colexec.NewSorter(input, typs, wf.Ordering.Columns) + input, err = NewSorter(input, typs, wf.Ordering.Columns) } // TODO(yuzefovich): when both PARTITION BY and ORDER BY clauses are // omitted, the window function operator is actually streaming. @@ -614,11 +612,11 @@ func NewColOperator( } switch *wf.Func.WindowFunc { case execinfrapb.WindowerSpec_ROW_NUMBER: - result.Op = vecbuiltins.NewRowNumberOperator(input, int(wf.OutputColIdx)+tempPartitionColOffset, partitionColIdx) + result.Op = NewRowNumberOperator(input, int(wf.OutputColIdx)+tempPartitionColOffset, partitionColIdx) case execinfrapb.WindowerSpec_RANK: - result.Op, err = vecbuiltins.NewRankOperator(input, typs, false /* dense */, orderingCols, int(wf.OutputColIdx)+tempPartitionColOffset, partitionColIdx) + result.Op, err = NewRankOperator(input, typs, false /* dense */, orderingCols, int(wf.OutputColIdx)+tempPartitionColOffset, partitionColIdx) case execinfrapb.WindowerSpec_DENSE_RANK: - result.Op, err = vecbuiltins.NewRankOperator(input, typs, true /* dense */, orderingCols, int(wf.OutputColIdx)+tempPartitionColOffset, partitionColIdx) + result.Op, err = NewRankOperator(input, typs, true /* dense */, orderingCols, int(wf.OutputColIdx)+tempPartitionColOffset, partitionColIdx) default: return result, errors.Newf("window function %s is not supported", wf.String()) } @@ -631,7 +629,7 @@ func NewColOperator( projection = append(projection, i) } projection = append(projection, wf.OutputColIdx+1) - result.Op = colexec.NewSimpleProjectOp(result.Op, int(wf.OutputColIdx+1), projection) + result.Op = NewSimpleProjectOp(result.Op, int(wf.OutputColIdx+1), projection) } result.ColumnTypes = append(spec.Input[0].ColumnTypes, *types.Int) @@ -646,7 +644,7 @@ func NewColOperator( // After constructing the base operator, calculate the memory usage // of the operator. - if sMemOp, ok := result.Op.(colexec.StaticMemoryOperator); ok { + if sMemOp, ok := result.Op.(StaticMemoryOperator); ok { result.MemUsage += sMemOp.EstimateStaticMemoryUsage() } @@ -662,7 +660,7 @@ func NewColOperator( } } if post.Projection { - result.Op = colexec.NewSimpleProjectOp(result.Op, len(result.ColumnTypes), post.OutputColumns) + result.Op = NewSimpleProjectOp(result.Op, len(result.ColumnTypes), post.OutputColumns) // Update output ColumnTypes. newTypes := make([]types.T, 0, len(post.OutputColumns)) for _, j := range post.OutputColumns { @@ -693,7 +691,7 @@ func NewColOperator( result.MemUsage += renderMem renderedCols = append(renderedCols, uint32(outputIdx)) } - result.Op = colexec.NewSimpleProjectOp(result.Op, len(result.ColumnTypes), renderedCols) + result.Op = NewSimpleProjectOp(result.Op, len(result.ColumnTypes), renderedCols) newTypes := make([]types.T, 0, len(renderedCols)) for _, j := range renderedCols { newTypes = append(newTypes, result.ColumnTypes[j]) @@ -701,10 +699,10 @@ func NewColOperator( result.ColumnTypes = newTypes } if post.Offset != 0 { - result.Op = colexec.NewOffsetOp(result.Op, post.Offset) + result.Op = NewOffsetOp(result.Op, post.Offset) } if post.Limit != 0 { - result.Op = colexec.NewLimitOp(result.Op, post.Limit) + result.Op = NewLimitOp(result.Op, post.Limit) } return result, err } @@ -855,7 +853,7 @@ func (p *filterPlanningState) projectOutExtraCols( for i := 0; i < len(rightOutCols)-p.extraRightOutCols; i++ { projection = append(projection, uint32(i+len(leftOutCols))) } - result.Op = colexec.NewSimpleProjectOp(result.Op, len(leftOutCols)+len(rightOutCols), projection) + result.Op = NewSimpleProjectOp(result.Op, len(leftOutCols)+len(rightOutCols), projection) } } @@ -873,7 +871,7 @@ func (r *NewColOperatorResult) planFilterExpr( if helper.Expr == tree.DNull { // The filter expression is tree.DNull meaning that it is always false, so // we put a zero operator. - r.Op = colexec.NewZeroOp(r.Op) + r.Op = NewZeroOp(r.Op) return nil } var filterColumnTypes []types.T @@ -889,19 +887,19 @@ func (r *NewColOperatorResult) planFilterExpr( for i := range r.ColumnTypes { outputColumns = append(outputColumns, uint32(i)) } - r.Op = colexec.NewSimpleProjectOp(r.Op, len(filterColumnTypes), outputColumns) + r.Op = NewSimpleProjectOp(r.Op, len(filterColumnTypes), outputColumns) } return nil } func planSelectionOperators( - ctx *tree.EvalContext, expr tree.TypedExpr, columnTypes []types.T, input colexec.Operator, -) (op colexec.Operator, resultIdx int, ct []types.T, memUsed int, err error) { + ctx *tree.EvalContext, expr tree.TypedExpr, columnTypes []types.T, input Operator, +) (op Operator, resultIdx int, ct []types.T, memUsed int, err error) { switch t := expr.(type) { case *tree.IndexedVar: - return colexec.NewBoolVecToSelOp(input, t.Idx), -1, columnTypes, memUsed, nil + return NewBoolVecToSelOp(input, t.Idx), -1, columnTypes, memUsed, nil case *tree.AndExpr: - var leftOp, rightOp colexec.Operator + var leftOp, rightOp Operator var memUsedLeft, memUsedRight int leftOp, _, ct, memUsedLeft, err = planSelectionOperators(ctx, t.TypedLeft(), columnTypes, input) if err != nil { @@ -915,11 +913,11 @@ func planSelectionOperators( // statement. Since CASE statements don't have a selection form, plan a // projection and then convert the resulting boolean to a selection vector. op, resultIdx, ct, memUsed, err = planProjectionOperators(ctx, expr, columnTypes, input) - op = colexec.NewBoolVecToSelOp(op, resultIdx) + op = NewBoolVecToSelOp(op, resultIdx) return op, resultIdx, ct, memUsed, err case *tree.CaseExpr: op, resultIdx, ct, memUsed, err = planProjectionOperators(ctx, expr, columnTypes, input) - op = colexec.NewBoolVecToSelOp(op, resultIdx) + op = NewBoolVecToSelOp(op, resultIdx) return op, resultIdx, ct, memUsed, err case *tree.ComparisonExpr: cmpOp := t.Operator @@ -931,7 +929,7 @@ func planSelectionOperators( if constArg, ok := t.Right.(tree.Datum); ok { if t.Operator == tree.Like || t.Operator == tree.NotLike { negate := t.Operator == tree.NotLike - op, err := colexec.GetLikeOperator( + op, err := GetLikeOperator( ctx, leftOp, leftIdx, string(tree.MustBeDString(constArg)), negate) return op, resultIdx, ct, memUsageLeft, err } @@ -942,17 +940,17 @@ func planSelectionOperators( err = errors.Errorf("IN is only supported for constant expressions") return nil, resultIdx, ct, memUsed, err } - op, err := colexec.GetInOperator(lTyp, leftOp, leftIdx, datumTuple, negate) + op, err := GetInOperator(lTyp, leftOp, leftIdx, datumTuple, negate) return op, resultIdx, ct, memUsageLeft, err } - op, err := colexec.GetSelectionConstOperator(lTyp, t.TypedRight().ResolvedType(), cmpOp, leftOp, leftIdx, constArg) + op, err := GetSelectionConstOperator(lTyp, t.TypedRight().ResolvedType(), cmpOp, leftOp, leftIdx, constArg) return op, resultIdx, ct, memUsageLeft, err } rightOp, rightIdx, ct, memUsageRight, err := planProjectionOperators(ctx, t.TypedRight(), ct, leftOp) if err != nil { return nil, resultIdx, ct, memUsageLeft + memUsageRight, err } - op, err := colexec.GetSelectionOperator(lTyp, &ct[rightIdx], cmpOp, rightOp, leftIdx, rightIdx) + op, err := GetSelectionOperator(lTyp, &ct[rightIdx], cmpOp, rightOp, leftIdx, rightIdx) return op, resultIdx, ct, memUsageLeft + memUsageRight, err default: return nil, resultIdx, nil, memUsed, errors.Errorf("unhandled selection expression type: %s", reflect.TypeOf(t)) @@ -967,13 +965,13 @@ func planTypedMaybeNullProjectionOperators( expr tree.TypedExpr, exprTyp *types.T, columnTypes []types.T, - input colexec.Operator, -) (op colexec.Operator, resultIdx int, ct []types.T, memUsed int, err error) { + input Operator, +) (op Operator, resultIdx int, ct []types.T, memUsed int, err error) { if expr == tree.DNull { resultIdx = len(columnTypes) - op = colexec.NewConstNullOp(input, resultIdx, typeconv.FromColumnType(exprTyp)) + op = NewConstNullOp(input, resultIdx, typeconv.FromColumnType(exprTyp)) ct = append(columnTypes, *exprTyp) - memUsed = op.(colexec.StaticMemoryOperator).EstimateStaticMemoryUsage() + memUsed = op.(StaticMemoryOperator).EstimateStaticMemoryUsage() return op, resultIdx, ct, memUsed, nil } return planProjectionOperators(ctx, expr, columnTypes, input) @@ -984,8 +982,8 @@ func planTypedMaybeNullProjectionOperators( // of the expression's result (if any, otherwise -1) and the column types of the // resulting batches. func planProjectionOperators( - ctx *tree.EvalContext, expr tree.TypedExpr, columnTypes []types.T, input colexec.Operator, -) (op colexec.Operator, resultIdx int, ct []types.T, memUsed int, err error) { + ctx *tree.EvalContext, expr tree.TypedExpr, columnTypes []types.T, input Operator, +) (op Operator, resultIdx int, ct []types.T, memUsed int, err error) { resultIdx = -1 switch t := expr.(type) { case *tree.IndexedVar: @@ -1009,9 +1007,9 @@ func planProjectionOperators( return nil, 0, nil, 0, err } outputIdx := len(ct) - op, err = colexec.GetCastOperator(op, resultIdx, outputIdx, expr.ResolvedType(), t.Type) + op, err = GetCastOperator(op, resultIdx, outputIdx, expr.ResolvedType(), t.Type) ct = append(ct, *t.Type) - if sMem, ok := op.(colexec.StaticMemoryOperator); ok { + if sMem, ok := op.(StaticMemoryOperator); ok { memUsed += sMem.EstimateStaticMemoryUsage() } return op, outputIdx, ct, memUsed, err @@ -1037,7 +1035,7 @@ func planProjectionOperators( funcOutputType := t.ResolvedType() resultIdx = len(ct) ct = append(ct, *funcOutputType) - op = colexec.NewBuiltinFunctionOperator(ctx, t, ct, inputCols, resultIdx, op) + op = NewBuiltinFunctionOperator(ctx, t, ct, inputCols, resultIdx, op) return op, resultIdx, ct, memUsed, nil case tree.Datum: datumType := t.ResolvedType() @@ -1052,7 +1050,7 @@ func planProjectionOperators( if err != nil { return nil, resultIdx, ct, memUsed, err } - op, err := colexec.NewConstOp(input, typ, constVal, resultIdx) + op, err := NewConstOp(input, typ, constVal, resultIdx) if err != nil { return nil, resultIdx, ct, memUsed, err } @@ -1062,8 +1060,8 @@ func planProjectionOperators( return nil, resultIdx, ct, 0, errors.New("CASE WHEN expressions unsupported") } - buffer := colexec.NewBufferOp(input) - caseOps := make([]colexec.Operator, len(t.Whens)) + buffer := NewBufferOp(input) + caseOps := make([]Operator, len(t.Whens)) caseOutputType := typeconv.FromColumnType(t.ResolvedType()) caseOutputIdx := len(columnTypes) ct = append(columnTypes, *t.ResolvedType()) @@ -1089,7 +1087,7 @@ func planProjectionOperators( return nil, resultIdx, ct, 0, err } // Transform the booleans to a selection vector. - caseOps[i] = colexec.NewBoolVecToSelOp(caseOps[i], resultIdx) + caseOps[i] = NewBoolVecToSelOp(caseOps[i], resultIdx) // Run the "then" clause on those tuples that were selected. caseOps[i], thenIdxs[i], ct, thenMemUsed, err = planTypedMaybeNullProjectionOperators( @@ -1101,7 +1099,7 @@ func planProjectionOperators( memUsed += whenMemUsed + thenMemUsed } var elseMem int - var elseOp colexec.Operator + var elseOp Operator elseExpr := t.Else if elseExpr == nil { // If there's no ELSE arm, we write NULLs. @@ -1114,11 +1112,11 @@ func planProjectionOperators( } memUsed += elseMem - op := colexec.NewCaseOp(buffer, caseOps, elseOp, thenIdxs, caseOutputIdx, caseOutputType) + op := NewCaseOp(buffer, caseOps, elseOp, thenIdxs, caseOutputIdx, caseOutputType) return op, caseOutputIdx, ct, memUsed, nil case *tree.AndExpr: - var leftOp, rightOp colexec.Operator + var leftOp, rightOp Operator var leftIdx, rightIdx, lMemUsed, rMemUsed int leftOp, leftIdx, ct, lMemUsed, err = planTypedMaybeNullProjectionOperators(ctx, t.TypedLeft(), types.Bool, columnTypes, input) if err != nil { @@ -1131,7 +1129,7 @@ func planProjectionOperators( // Add a new boolean column that ands the two output columns. resultIdx = len(ct) ct = append(ct, *t.ResolvedType()) - andOp := colexec.NewAndOp(rightOp, leftIdx, rightIdx, resultIdx) + andOp := NewAndOp(rightOp, leftIdx, rightIdx, resultIdx) return andOp, resultIdx, ct, lMemUsed + rMemUsed, nil case *tree.OrExpr: // Rewrite the OR expression as an equivalent CASE expression. @@ -1161,8 +1159,8 @@ func planProjectionExpr( outputType *types.T, left, right tree.TypedExpr, columnTypes []types.T, - input colexec.Operator, -) (op colexec.Operator, resultIdx int, ct []types.T, memUsed int, err error) { + input Operator, +) (op Operator, resultIdx int, ct []types.T, memUsed int, err error) { resultIdx = -1 // There are 3 cases. Either the left is constant, the right is constant, // or neither are constant. @@ -1172,7 +1170,7 @@ func planProjectionExpr( // Normally, the optimizer normalizes binary exprs so that the constant // argument is on the right side. This doesn't happen for non-commutative // operators such as - and /, though, so we still need this case. - var rightOp colexec.Operator + var rightOp Operator var rightIdx int rightOp, rightIdx, ct, memUsed, err = planProjectionOperators(ctx, right, columnTypes, input) if err != nil { @@ -1181,9 +1179,9 @@ func planProjectionExpr( resultIdx = len(ct) // The projection result will be outputted to a new column which is appended // to the input batch. - op, err = colexec.GetProjectionLConstOperator(left.ResolvedType(), &ct[rightIdx], binOp, rightOp, rightIdx, lConstArg, resultIdx) + op, err = GetProjectionLConstOperator(left.ResolvedType(), &ct[rightIdx], binOp, rightOp, rightIdx, lConstArg, resultIdx) ct = append(ct, *outputType) - if sMem, ok := op.(colexec.StaticMemoryOperator); ok { + if sMem, ok := op.(StaticMemoryOperator); ok { memUsed += sMem.EstimateStaticMemoryUsage() } return op, resultIdx, ct, memUsed, err @@ -1199,7 +1197,7 @@ func planProjectionExpr( resultIdx = len(ct) if binOp == tree.Like || binOp == tree.NotLike { negate := binOp == tree.NotLike - op, err = colexec.GetLikeProjectionOperator( + op, err = GetLikeProjectionOperator( ctx, leftOp, leftIdx, resultIdx, string(tree.MustBeDString(rConstArg)), negate) } else if binOp == tree.In || binOp == tree.NotIn { negate := binOp == tree.NotIn @@ -1208,12 +1206,12 @@ func planProjectionExpr( err = errors.Errorf("IN operator supported only on constant expressions") return nil, resultIdx, ct, leftMem, err } - op, err = colexec.GetInProjectionOperator(&ct[leftIdx], leftOp, leftIdx, resultIdx, datumTuple, negate) + op, err = GetInProjectionOperator(&ct[leftIdx], leftOp, leftIdx, resultIdx, datumTuple, negate) } else { - op, err = colexec.GetProjectionRConstOperator(&ct[leftIdx], right.ResolvedType(), binOp, leftOp, leftIdx, rConstArg, resultIdx) + op, err = GetProjectionRConstOperator(&ct[leftIdx], right.ResolvedType(), binOp, leftOp, leftIdx, rConstArg, resultIdx) } ct = append(ct, *outputType) - if sMem, ok := op.(colexec.StaticMemoryOperator); ok { + if sMem, ok := op.(StaticMemoryOperator); ok { memUsed += sMem.EstimateStaticMemoryUsage() } return op, resultIdx, ct, leftMem + memUsed, err @@ -1224,9 +1222,9 @@ func planProjectionExpr( return nil, resultIdx, nil, leftMem + rightMem, err } resultIdx = len(ct) - op, err = colexec.GetProjectionOperator(&ct[leftIdx], &ct[rightIdx], binOp, rightOp, leftIdx, rightIdx, resultIdx) + op, err = GetProjectionOperator(&ct[leftIdx], &ct[rightIdx], binOp, rightOp, leftIdx, rightIdx, resultIdx) ct = append(ct, *outputType) - if sMem, ok := op.(colexec.StaticMemoryOperator); ok { + if sMem, ok := op.(StaticMemoryOperator); ok { memUsed += sMem.EstimateStaticMemoryUsage() } return op, resultIdx, ct, leftMem + rightMem + memUsed, err diff --git a/pkg/sql/colflow/expr.go b/pkg/sql/colexec/expr.go similarity index 99% rename from pkg/sql/colflow/expr.go rename to pkg/sql/colexec/expr.go index a69e10897eb2..dfebb0fd0334 100644 --- a/pkg/sql/colflow/expr.go +++ b/pkg/sql/colexec/expr.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package colflow +package colexec import ( "fmt" diff --git a/pkg/sql/row/fetcherstate_string.go b/pkg/sql/colexec/fetcherstate_string.go similarity index 98% rename from pkg/sql/row/fetcherstate_string.go rename to pkg/sql/colexec/fetcherstate_string.go index b3a688e37545..c66270fe58fc 100644 --- a/pkg/sql/row/fetcherstate_string.go +++ b/pkg/sql/colexec/fetcherstate_string.go @@ -1,6 +1,6 @@ // Code generated by "stringer -type=fetcherState"; DO NOT EDIT. -package row +package colexec import "strconv" diff --git a/pkg/sql/colexec/hash_aggregator.go b/pkg/sql/colexec/hash_aggregator.go index 7ce516d1f85b..51006ebabf7c 100644 --- a/pkg/sql/colexec/hash_aggregator.go +++ b/pkg/sql/colexec/hash_aggregator.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coltypes" "github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/util" ) @@ -169,13 +170,13 @@ type hashGrouper struct { buildFinished bool } -var _ execinfrapb.OpNode = &hashGrouper{} +var _ execinfra.OpNode = &hashGrouper{} func (op *hashGrouper) ChildCount() int { return 1 } -func (op *hashGrouper) Child(nth int) execinfrapb.OpNode { +func (op *hashGrouper) Child(nth int) execinfra.OpNode { if nth == 0 { return op.builder.spec.source } diff --git a/pkg/sql/colexec/hashjoiner.go b/pkg/sql/colexec/hashjoiner.go index 789ad90dbb1b..4202d0dd6b07 100644 --- a/pkg/sql/colexec/hashjoiner.go +++ b/pkg/sql/colexec/hashjoiner.go @@ -17,7 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coltypes" "github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror" - "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/pkg/errors" ) @@ -197,7 +197,7 @@ func (hj *hashJoinEqOp) ChildCount() int { return 2 } -func (hj *hashJoinEqOp) Child(nth int) execinfrapb.OpNode { +func (hj *hashJoinEqOp) Child(nth int) execinfra.OpNode { switch nth { case 0: return hj.spec.left.source diff --git a/pkg/sql/colflow/materializer.go b/pkg/sql/colexec/materializer.go similarity index 93% rename from pkg/sql/colflow/materializer.go rename to pkg/sql/colexec/materializer.go index c40b66939313..7658811812b2 100644 --- a/pkg/sql/colflow/materializer.go +++ b/pkg/sql/colexec/materializer.go @@ -8,14 +8,13 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package colflow +package colexec import ( "context" "fmt" "github.com/cockroachdb/cockroach/pkg/col/coldata" - "github.com/cockroachdb/cockroach/pkg/sql/colexec" "github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -23,12 +22,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" ) -// Materializer converts an colexec.Operator input into a execinfra.RowSource. +// Materializer converts an Operator input into a execinfra.RowSource. type Materializer struct { execinfra.ProcessorBase - colexec.NonExplainable + NonExplainable - input colexec.Operator + input Operator da sqlbase.DatumAlloc @@ -72,7 +71,7 @@ const materializerProcName = "materializer" func NewMaterializer( flowCtx *execinfra.FlowCtx, processorID int32, - input colexec.Operator, + input Operator, typs []types.T, post *execinfrapb.PostProcessSpec, output execinfra.RowReceiver, @@ -111,7 +110,7 @@ func NewMaterializer( return m, nil } -var _ execinfrapb.OpNode = &Materializer{} +var _ execinfra.OpNode = &Materializer{} // ChildCount is part of the exec.OpNode interface. func (m *Materializer) ChildCount() int { @@ -119,7 +118,7 @@ func (m *Materializer) ChildCount() int { } // Child is part of the exec.OpNode interface. -func (m *Materializer) Child(nth int) execinfrapb.OpNode { +func (m *Materializer) Child(nth int) execinfra.OpNode { if nth == 0 { return m.input } @@ -166,7 +165,7 @@ func (m *Materializer) next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadat typs := m.OutputTypes() for colIdx := 0; colIdx < len(typs); colIdx++ { col := m.batch.ColVec(colIdx) - m.row[colIdx].Datum = colexec.PhysicalTypeColElemToDatum(col, rowIdx, m.da, typs[colIdx]) + m.row[colIdx].Datum = PhysicalTypeColElemToDatum(col, rowIdx, m.da, typs[colIdx]) } return m.ProcessRowHelper(m.row), nil } diff --git a/pkg/sql/colflow/materializer_test.go b/pkg/sql/colexec/materializer_test.go similarity index 99% rename from pkg/sql/colflow/materializer_test.go rename to pkg/sql/colexec/materializer_test.go index 28a90f5d8be9..901ced0268ae 100644 --- a/pkg/sql/colflow/materializer_test.go +++ b/pkg/sql/colexec/materializer_test.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package colflow +package colexec import ( "context" diff --git a/pkg/sql/colexec/operator.go b/pkg/sql/colexec/operator.go index 77af7b29adaf..a95a8e691142 100644 --- a/pkg/sql/colexec/operator.go +++ b/pkg/sql/colexec/operator.go @@ -16,7 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror" - "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" ) // Operator is a column vector operator that produces a Batch as output. @@ -36,7 +36,7 @@ type Operator interface { // execution. Next(context.Context) coldata.Batch - execinfrapb.OpNode + execinfra.OpNode } // NonExplainable is a marker interface which identifies an Operator that @@ -47,23 +47,23 @@ type NonExplainable interface { nonExplainableMarker() } -// NewOneInputNode returns an execinfrapb.OpNode with a single Operator input. +// NewOneInputNode returns an execinfra.OpNode with a single Operator input. func NewOneInputNode(input Operator) OneInputNode { return OneInputNode{input: input} } -// OneInputNode is an execinfrapb.OpNode with a single Operator input. +// OneInputNode is an execinfra.OpNode with a single Operator input. type OneInputNode struct { input Operator } -// ChildCount implements the execinfrapb.OpNode interface. +// ChildCount implements the execinfra.OpNode interface. func (OneInputNode) ChildCount() int { return 1 } -// Child implements the execinfrapb.OpNode interface. -func (n OneInputNode) Child(nth int) execinfrapb.OpNode { +// Child implements the execinfra.OpNode interface. +func (n OneInputNode) Child(nth int) execinfra.OpNode { if nth == 0 { return n.input } @@ -77,22 +77,22 @@ func (n OneInputNode) Input() Operator { return n.input } -// ZeroInputNode is an execinfrapb.OpNode with no inputs. +// ZeroInputNode is an execinfra.OpNode with no inputs. type ZeroInputNode struct{} -// ChildCount implements the execinfrapb.OpNode interface. +// ChildCount implements the execinfra.OpNode interface. func (ZeroInputNode) ChildCount() int { return 0 } -// Child implements the execinfrapb.OpNode interface. -func (ZeroInputNode) Child(nth int) execinfrapb.OpNode { +// Child implements the execinfra.OpNode interface. +func (ZeroInputNode) Child(nth int) execinfra.OpNode { execerror.VectorizedInternalPanic(fmt.Sprintf("invalid index %d", nth)) // This code is unreachable, but the compiler cannot infer that. return nil } -// newTwoInputNode returns an execinfrapb.OpNode with two Operator inputs. +// newTwoInputNode returns an execinfra.OpNode with two Operator inputs. func newTwoInputNode(inputOne, inputTwo Operator) twoInputNode { return twoInputNode{inputOne: inputOne, inputTwo: inputTwo} } @@ -106,7 +106,7 @@ func (twoInputNode) ChildCount() int { return 2 } -func (n *twoInputNode) Child(nth int) execinfrapb.OpNode { +func (n *twoInputNode) Child(nth int) execinfra.OpNode { switch nth { case 0: return n.inputOne diff --git a/pkg/sql/colexec/orderedsynchronizer.go b/pkg/sql/colexec/orderedsynchronizer.go index 93f6aee435f7..bbb47efc3a48 100644 --- a/pkg/sql/colexec/orderedsynchronizer.go +++ b/pkg/sql/colexec/orderedsynchronizer.go @@ -17,7 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coltypes" "github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror" - "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util/encoding" ) @@ -47,7 +47,7 @@ func (o *OrderedSynchronizer) ChildCount() int { } // Child implements the execinfrapb.OpNode interface. -func (o *OrderedSynchronizer) Child(nth int) execinfrapb.OpNode { +func (o *OrderedSynchronizer) Child(nth int) execinfra.OpNode { return o.inputs[nth] } diff --git a/pkg/sql/colexec/vecbuiltins/rank.go b/pkg/sql/colexec/rank.go similarity index 78% rename from pkg/sql/colexec/vecbuiltins/rank.go rename to pkg/sql/colexec/rank.go index d2fdfff45975..a2317acc7a87 100644 --- a/pkg/sql/colexec/vecbuiltins/rank.go +++ b/pkg/sql/colexec/rank.go @@ -8,12 +8,9 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package vecbuiltins +package colexec -import ( - "github.com/cockroachdb/cockroach/pkg/col/coltypes" - "github.com/cockroachdb/cockroach/pkg/sql/colexec" -) +import "github.com/cockroachdb/cockroach/pkg/col/coltypes" // TODO(yuzefovich): add benchmarks. @@ -23,22 +20,22 @@ import ( // outputColIdx specifies in which exec.Vec the operator should put its output // (if there is no such column, a new column is appended). func NewRankOperator( - input colexec.Operator, + input Operator, inputTyps []coltypes.T, dense bool, orderingCols []uint32, outputColIdx int, partitionColIdx int, -) (colexec.Operator, error) { +) (Operator, error) { if len(orderingCols) == 0 { - return colexec.NewConstOp(input, coltypes.Int64, int64(1), outputColIdx) + return NewConstOp(input, coltypes.Int64, int64(1), outputColIdx) } - op, outputCol, err := colexec.OrderedDistinctColsToOperators(input, orderingCols, inputTyps) + op, outputCol, err := OrderedDistinctColsToOperators(input, orderingCols, inputTyps) if err != nil { return nil, err } initFields := rankInitFields{ - OneInputNode: colexec.NewOneInputNode(op), + OneInputNode: NewOneInputNode(op), distinctCol: outputCol, outputColIdx: outputColIdx, partitionColIdx: partitionColIdx, diff --git a/pkg/sql/colexec/vecbuiltins/rank_tmpl.go b/pkg/sql/colexec/rank_tmpl.go similarity index 96% rename from pkg/sql/colexec/vecbuiltins/rank_tmpl.go rename to pkg/sql/colexec/rank_tmpl.go index 5ca1d85f9040..7285bd887f6f 100644 --- a/pkg/sql/colexec/vecbuiltins/rank_tmpl.go +++ b/pkg/sql/colexec/rank_tmpl.go @@ -17,14 +17,13 @@ // // */}} -package vecbuiltins +package colexec import ( "context" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coltypes" - "github.com/cockroachdb/cockroach/pkg/sql/colexec" "github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror" ) @@ -45,7 +44,7 @@ func _UPDATE_RANK_INCREMENT() { // */}} type rankInitFields struct { - colexec.OneInputNode + OneInputNode // distinctCol is the output column of the chain of ordered distinct // operators in which true will indicate that a new rank needs to be assigned // to the corresponding tuple. @@ -67,7 +66,7 @@ type _RANK_STRINGOp struct { rankIncrement int64 } -var _ colexec.Operator = &_RANK_STRINGOp{} +var _ Operator = &_RANK_STRINGOp{} func (r *_RANK_STRINGOp) Init() { r.Input().Init() diff --git a/pkg/sql/colexec/routers.go b/pkg/sql/colexec/routers.go index c126c606ba7e..56ca36ff81c3 100644 --- a/pkg/sql/colexec/routers.go +++ b/pkg/sql/colexec/routers.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coltypes" "github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) @@ -25,7 +26,7 @@ import ( // routerOutput is an interface implemented by router outputs. It exists for // easier test mocking of outputs. type routerOutput interface { - execinfrapb.OpNode + execinfra.OpNode // addBatch adds the elements specified by the selection vector from batch to // the output. It returns whether or not the output changed its state to // blocked (see implementations). @@ -40,7 +41,7 @@ const defaultRouterOutputBlockedThreshold = coldata.BatchSize * 2 type routerOutputOp struct { // input is a reference to our router. - input execinfrapb.OpNode + input execinfra.OpNode types []coltypes.T // zeroBatch is used to return a 0 length batch in some cases. @@ -72,7 +73,7 @@ func (o *routerOutputOp) ChildCount() int { return 1 } -func (o *routerOutputOp) Child(nth int) execinfrapb.OpNode { +func (o *routerOutputOp) Child(nth int) execinfra.OpNode { if nth == 0 { return o.input } diff --git a/pkg/sql/colexec/vecbuiltins/row_number.go b/pkg/sql/colexec/row_number.go similarity index 81% rename from pkg/sql/colexec/vecbuiltins/row_number.go rename to pkg/sql/colexec/row_number.go index 1444ebe30784..cc37a44293b8 100644 --- a/pkg/sql/colexec/vecbuiltins/row_number.go +++ b/pkg/sql/colexec/row_number.go @@ -8,9 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package vecbuiltins - -import "github.com/cockroachdb/cockroach/pkg/sql/colexec" +package colexec // TODO(yuzefovich): add benchmarks. @@ -18,11 +16,9 @@ import "github.com/cockroachdb/cockroach/pkg/sql/colexec" // function ROW_NUMBER. outputColIdx specifies in which exec.Vec the operator // should put its output (if there is no such column, a new column is // appended). -func NewRowNumberOperator( - input colexec.Operator, outputColIdx int, partitionColIdx int, -) colexec.Operator { +func NewRowNumberOperator(input Operator, outputColIdx int, partitionColIdx int) Operator { base := rowNumberBase{ - OneInputNode: colexec.NewOneInputNode(input), + OneInputNode: NewOneInputNode(input), outputColIdx: outputColIdx, partitionColIdx: partitionColIdx, } @@ -36,7 +32,7 @@ func NewRowNumberOperator( // variations of row number operators. Note that it is not an operator itself // and should not be used directly. type rowNumberBase struct { - colexec.OneInputNode + OneInputNode outputColIdx int partitionColIdx int diff --git a/pkg/sql/colexec/vecbuiltins/row_number_tmpl.go b/pkg/sql/colexec/row_number_tmpl.go similarity index 94% rename from pkg/sql/colexec/vecbuiltins/row_number_tmpl.go rename to pkg/sql/colexec/row_number_tmpl.go index 1ffeaaf3002d..83e38211d2e7 100644 --- a/pkg/sql/colexec/vecbuiltins/row_number_tmpl.go +++ b/pkg/sql/colexec/row_number_tmpl.go @@ -17,14 +17,13 @@ // // */}} -package vecbuiltins +package colexec import ( "context" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coltypes" - "github.com/cockroachdb/cockroach/pkg/sql/colexec" "github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror" ) @@ -34,7 +33,7 @@ type _ROW_NUMBER_STRINGOp struct { rowNumberBase } -var _ colexec.Operator = &_ROW_NUMBER_STRINGOp{} +var _ Operator = &_ROW_NUMBER_STRINGOp{} func (r *_ROW_NUMBER_STRINGOp) Next(ctx context.Context) coldata.Batch { batch := r.Input().Next(ctx) diff --git a/pkg/sql/colexec/sort.go b/pkg/sql/colexec/sort.go index 5b5ac09ad400..efaafb8e0dd1 100644 --- a/pkg/sql/colexec/sort.go +++ b/pkg/sql/colexec/sort.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coltypes" "github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/pkg/errors" ) @@ -60,7 +61,7 @@ func newSorter( // spooler is a column vector operator that spools the data from its input. type spooler interface { - execinfrapb.OpNode + execinfra.OpNode // init initializes this spooler and will be called once at the setup time. init() @@ -379,7 +380,7 @@ func (p *sortOp) ChildCount() int { return 1 } -func (p *sortOp) Child(nth int) execinfrapb.OpNode { +func (p *sortOp) Child(nth int) execinfra.OpNode { if nth == 0 { return p.input } diff --git a/pkg/sql/colexec/sort_chunks.go b/pkg/sql/colexec/sort_chunks.go index c9706813d800..cb1e60921cd7 100644 --- a/pkg/sql/colexec/sort_chunks.go +++ b/pkg/sql/colexec/sort_chunks.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coltypes" "github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" ) @@ -57,7 +58,7 @@ func (c *sortChunksOp) ChildCount() int { return 0 } -func (c *sortChunksOp) Child(nth int) execinfrapb.OpNode { +func (c *sortChunksOp) Child(nth int) execinfra.OpNode { if nth == 0 { return c.input } diff --git a/pkg/sql/colexec/unorderedsynchronizer.go b/pkg/sql/colexec/unorderedsynchronizer.go index 55847cb3aa2c..5db7595f29c4 100644 --- a/pkg/sql/colexec/unorderedsynchronizer.go +++ b/pkg/sql/colexec/unorderedsynchronizer.go @@ -18,7 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/coltypes" "github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror" - "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/util/contextutil" ) @@ -72,13 +72,13 @@ type UnorderedSynchronizer struct { errCh chan error } -// ChildCount implements the execinfrapb.OpNode interface. +// ChildCount implements the execinfra.OpNode interface. func (s *UnorderedSynchronizer) ChildCount() int { return len(s.inputs) } -// Child implements the execinfrapb.OpNode interface. -func (s *UnorderedSynchronizer) Child(nth int) execinfrapb.OpNode { +// Child implements the execinfra.OpNode interface. +func (s *UnorderedSynchronizer) Child(nth int) execinfra.OpNode { return s.inputs[nth] } diff --git a/pkg/sql/colflow/colbatch_scan_test.go b/pkg/sql/colflow/colbatch_scan_test.go index 6136005c9faa..737c16ebc6d3 100644 --- a/pkg/sql/colflow/colbatch_scan_test.go +++ b/pkg/sql/colflow/colbatch_scan_test.go @@ -8,7 +8,11 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package colflow +// Note that this file is not in pkg/sql/colexec because it instantiates a +// server, and if it were moved into sql/colexec, that would create a cycle +// with pkg/server. + +package colflow_test import ( "context" @@ -17,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/sql/colexec" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -47,13 +52,16 @@ func BenchmarkColBatchScan(b *testing.B) { ) tableDesc := sqlbase.GetTableDescriptor(kvDB, "test", tableName) b.Run(fmt.Sprintf("rows=%d", numRows), func(b *testing.B) { - spec := execinfrapb.TableReaderSpec{ - Table: *tableDesc, - Spans: []execinfrapb.TableReaderSpan{{Span: tableDesc.PrimaryIndexSpan()}}, - } - post := execinfrapb.PostProcessSpec{ - Projection: true, - OutputColumns: []uint32{0, 1}, + spec := execinfrapb.ProcessorSpec{ + Core: execinfrapb.ProcessorCoreUnion{ + TableReader: &execinfrapb.TableReaderSpec{ + Table: *tableDesc, + Spans: []execinfrapb.TableReaderSpan{{Span: tableDesc.PrimaryIndexSpan()}}, + }}, + Post: execinfrapb.PostProcessSpec{ + Projection: true, + OutputColumns: []uint32{0, 1}, + }, } evalCtx := tree.MakeTestingEvalContext(s.ClusterSettings()) @@ -70,17 +78,15 @@ func BenchmarkColBatchScan(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { b.StopTimer() - tr, err := newColBatchScan(&flowCtx, &spec, &post) - tr.Init() - b.StartTimer() + res, err := colexec.NewColOperator(ctx, &flowCtx, &spec, nil /* inputs */) if err != nil { b.Fatal(err) } + tr := res.Op + tr.Init() + b.StartTimer() for { bat := tr.Next(ctx) - if err != nil { - b.Fatal(err) - } if bat.Length() == 0 { break } diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index a4ca3f521231..6849f15b0225 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -147,7 +147,7 @@ type flowCreatorHelper interface { // to be run asynchronously. accumulateAsyncComponent(runFn) // addMaterializer adds a materializer to the flow. - addMaterializer(*Materializer) + addMaterializer(*colexec.Materializer) // getCancelFlowFn returns a flow cancellation function. getCancelFlowFn() context.CancelFunc } @@ -202,7 +202,7 @@ type vectorizedFlowCreator struct { // leaves accumulates all operators that have no further outputs on the // current node, for the purposes of EXPLAIN output. - leaves []execinfrapb.OpNode + leaves []execinfra.OpNode } func newVectorizedFlowCreator( @@ -236,7 +236,7 @@ func (s *vectorizedFlowCreator) setupRemoteOutputStream( outputTyps []coltypes.T, stream *execinfrapb.StreamEndpointSpec, metadataSourcesQueue []execinfrapb.MetadataSource, -) (execinfrapb.OpNode, error) { +) (execinfra.OpNode, error) { outbox, err := s.remoteComponentCreator.newOutbox(op, outputTyps, metadataSourcesQueue) if err != nil { return nil, err @@ -499,7 +499,7 @@ func (s *vectorizedFlowCreator) setupOutput( ) } } - proc, err := NewMaterializer( + proc, err := colexec.NewMaterializer( flowCtx, pspec.ProcessorID, op, @@ -529,7 +529,7 @@ func (s *vectorizedFlowCreator) setupFlow( flowCtx *execinfra.FlowCtx, processorSpecs []execinfrapb.ProcessorSpec, acc *mon.BoundAccount, -) (leaves []execinfrapb.OpNode, err error) { +) (leaves []execinfra.OpNode, err error) { streamIDToSpecIdx := make(map[execinfrapb.StreamID]int) // queue is a queue of indices into processorSpecs, for topologically // ordered processing. @@ -580,7 +580,7 @@ func (s *vectorizedFlowCreator) setupFlow( inputs = append(inputs, input) } - result, err := NewColOperator(ctx, flowCtx, pspec, inputs) + result, err := colexec.NewColOperator(ctx, flowCtx, pspec, inputs) if err != nil { return nil, errors.Wrapf(err, "unable to vectorize execution plan") } @@ -720,7 +720,7 @@ func (r *vectorizedFlowCreatorHelper) accumulateAsyncComponent(run runFn) { })) } -func (r *vectorizedFlowCreatorHelper) addMaterializer(m *Materializer) { +func (r *vectorizedFlowCreatorHelper) addMaterializer(m *colexec.Materializer) { processors := make([]execinfra.Processor, 1) processors[0] = m r.f.SetProcessors(processors) @@ -759,7 +759,7 @@ func (r *noopFlowCreatorHelper) checkInboundStreamID(sid execinfrapb.StreamID) e func (r *noopFlowCreatorHelper) accumulateAsyncComponent(runFn) {} -func (r *noopFlowCreatorHelper) addMaterializer(*Materializer) {} +func (r *noopFlowCreatorHelper) addMaterializer(*colexec.Materializer) {} func (r *noopFlowCreatorHelper) getCancelFlowFn() context.CancelFunc { return nil @@ -772,7 +772,7 @@ func (r *noopFlowCreatorHelper) getCancelFlowFn() context.CancelFunc { // EXPLAIN output. func SupportsVectorized( ctx context.Context, flowCtx *execinfra.FlowCtx, processorSpecs []execinfrapb.ProcessorSpec, -) (leaves []execinfrapb.OpNode, err error) { +) (leaves []execinfra.OpNode, err error) { creator := newVectorizedFlowCreator( newNoopFlowCreatorHelper(), vectorizedRemoteComponentCreator{}, diff --git a/pkg/sql/colflow/vectorized_flow_shutdown_test.go b/pkg/sql/colflow/vectorized_flow_shutdown_test.go index cbeb53440d6f..8f558407aec8 100644 --- a/pkg/sql/colflow/vectorized_flow_shutdown_test.go +++ b/pkg/sql/colflow/vectorized_flow_shutdown_test.go @@ -275,7 +275,7 @@ func TestVectorizedFlowShutdown(t *testing.T) { } ctxLocal, cancelLocal := context.WithCancel(ctxLocal) - materializer, err := NewMaterializer( + materializer, err := colexec.NewMaterializer( flowCtx, 1, /* processorID */ materializerInput, diff --git a/pkg/sql/colflow/vectorized_flow_space_test.go b/pkg/sql/colflow/vectorized_flow_space_test.go index 6567922035c1..cd86680f82dd 100644 --- a/pkg/sql/colflow/vectorized_flow_space_test.go +++ b/pkg/sql/colflow/vectorized_flow_space_test.go @@ -127,7 +127,7 @@ func TestVectorizeSpaceError(t *testing.T) { memMon.Start(ctx, nil, mon.MakeStandaloneBudget(1)) } acc := memMon.MakeBoundAccount() - result, err := NewColOperator(ctx, flowCtx, tc.spec, inputs) + result, err := colexec.NewColOperator(ctx, flowCtx, tc.spec, inputs) if err != nil { t.Fatal(err) } diff --git a/pkg/sql/colflow/vectorized_meta_propagation_test.go b/pkg/sql/colflow/vectorized_meta_propagation_test.go index c7fa3d0271ad..e2bb03543daa 100644 --- a/pkg/sql/colflow/vectorized_meta_propagation_test.go +++ b/pkg/sql/colflow/vectorized_meta_propagation_test.go @@ -65,13 +65,13 @@ func TestVectorizedMetaPropagation(t *testing.T) { t.Fatal(err) } - col, err := NewColumnarizer(ctx, &flowCtx, 1, mts) + col, err := colexec.NewColumnarizer(ctx, &flowCtx, 1, mts) if err != nil { t.Fatal(err) } noop := colexec.NewNoop(col) - mat, err := NewMaterializer( + mat, err := colexec.NewMaterializer( &flowCtx, 2, /* processorID */ noop, diff --git a/pkg/sql/colflow/vectorized_panic_propagation_test.go b/pkg/sql/colflow/vectorized_panic_propagation_test.go index 2176d25d2e40..2ff97af923e9 100644 --- a/pkg/sql/colflow/vectorized_panic_propagation_test.go +++ b/pkg/sql/colflow/vectorized_panic_propagation_test.go @@ -46,13 +46,13 @@ func TestVectorizedInternalPanic(t *testing.T) { types := sqlbase.OneIntCol input := execinfra.NewRepeatableRowSource(types, sqlbase.MakeIntRows(nRows, nCols)) - col, err := NewColumnarizer(ctx, &flowCtx, 0 /* processorID */, input) + col, err := colexec.NewColumnarizer(ctx, &flowCtx, 0 /* processorID */, input) if err != nil { t.Fatal(err) } vee := newTestVectorizedInternalPanicEmitter(col) - mat, err := NewMaterializer( + mat, err := colexec.NewMaterializer( &flowCtx, 1, /* processorID */ vee, @@ -93,13 +93,13 @@ func TestNonVectorizedPanicPropagation(t *testing.T) { types := sqlbase.OneIntCol input := execinfra.NewRepeatableRowSource(types, sqlbase.MakeIntRows(nRows, nCols)) - col, err := NewColumnarizer(ctx, &flowCtx, 0 /* processorID */, input) + col, err := colexec.NewColumnarizer(ctx, &flowCtx, 0 /* processorID */, input) if err != nil { t.Fatal(err) } nvee := newTestNonVectorizedPanicEmitter(col) - mat, err := NewMaterializer( + mat, err := colexec.NewMaterializer( &flowCtx, 1, /* processorID */ nvee, diff --git a/pkg/sql/distsql/columnar_utils_test.go b/pkg/sql/distsql/columnar_utils_test.go index 3a50d6448511..c59d3b86befd 100644 --- a/pkg/sql/distsql/columnar_utils_test.go +++ b/pkg/sql/distsql/columnar_utils_test.go @@ -16,7 +16,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/colexec" - "github.com/cockroachdb/cockroach/pkg/sql/colflow" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/rowexec" @@ -78,19 +77,19 @@ func verifyColOperator( columnarizers := make([]colexec.Operator, len(inputs)) for i, input := range inputsColOp { - c, err := colflow.NewColumnarizer(ctx, flowCtx, int32(i)+1, input) + c, err := colexec.NewColumnarizer(ctx, flowCtx, int32(i)+1, input) if err != nil { return err } columnarizers[i] = c } - result, err := colflow.NewColOperator(ctx, flowCtx, pspec, columnarizers) + result, err := colexec.NewColOperator(ctx, flowCtx, pspec, columnarizers) if err != nil { return err } - outColOp, err := colflow.NewMaterializer( + outColOp, err := colexec.NewMaterializer( flowCtx, int32(len(inputs))+2, result.Op, diff --git a/pkg/sql/distsql/vectorized_panic_propagation_test.go b/pkg/sql/distsql/vectorized_panic_propagation_test.go index 6b2a8a769010..cc8a7cc733f4 100644 --- a/pkg/sql/distsql/vectorized_panic_propagation_test.go +++ b/pkg/sql/distsql/vectorized_panic_propagation_test.go @@ -51,7 +51,7 @@ func TestNonVectorizedPanicDoesntHangServer(t *testing.T) { ) flow := colflow.NewVectorizedFlow(base) - mat, err := colflow.NewMaterializer( + mat, err := colexec.NewMaterializer( &flowCtx, 0, /* processorID */ &colexec.CallbackOperator{ diff --git a/pkg/sql/execinfra/dep_test.go b/pkg/sql/execinfra/dep_test.go index ff307475d630..09a7dd5dc034 100644 --- a/pkg/sql/execinfra/dep_test.go +++ b/pkg/sql/execinfra/dep_test.go @@ -23,8 +23,7 @@ func TestNoLinkForbidden(t *testing.T) { buildutil.VerifyNoImports(t, "github.com/cockroachdb/cockroach/pkg/sql/execinfra", true, []string{ - // TODO(yuzefovich): fix this (CFetcher in sql/row is the issue). - //"github.com/cockroachdb/cockroach/pkg/sql/colexec", + "github.com/cockroachdb/cockroach/pkg/sql/colexec", "github.com/cockroachdb/cockroach/pkg/sql/colflow", "github.com/cockroachdb/cockroach/pkg/sql/flowinfra", "github.com/cockroachdb/cockroach/pkg/sql/rowexec", diff --git a/pkg/sql/execinfra/indexjoiner.go b/pkg/sql/execinfra/indexjoiner.go index 75441ec67ca7..33bca86573c0 100644 --- a/pkg/sql/execinfra/indexjoiner.go +++ b/pkg/sql/execinfra/indexjoiner.go @@ -59,7 +59,7 @@ type IndexJoiner struct { var _ Processor = &IndexJoiner{} var _ RowSource = &IndexJoiner{} var _ execinfrapb.MetadataSource = &IndexJoiner{} -var _ execinfrapb.OpNode = &IndexJoiner{} +var _ OpNode = &IndexJoiner{} const indexJoinerProcName = "index joiner" @@ -263,18 +263,18 @@ func (ij *IndexJoiner) DrainMeta(ctx context.Context) []execinfrapb.ProducerMeta return ij.generateMeta(ctx) } -// ChildCount is part of the execinfrapb.OpNode interface. +// ChildCount is part of the OpNode interface. func (ij *IndexJoiner) ChildCount() int { return 1 } -// Child is part of the execinfrapb.OpNode interface. -func (ij *IndexJoiner) Child(nth int) execinfrapb.OpNode { +// Child is part of the OpNode interface. +func (ij *IndexJoiner) Child(nth int) OpNode { if nth == 0 { - if n, ok := ij.input.(execinfrapb.OpNode); ok { + if n, ok := ij.input.(OpNode); ok { return n } - panic("input to IndexJoiner is not an execinfrapb.OpNode") + panic("input to IndexJoiner is not an OpNode") } panic(fmt.Sprintf("invalid index %d", nth)) } diff --git a/pkg/sql/execinfra/joinreader.go b/pkg/sql/execinfra/joinreader.go index 1aba72b7ac19..64309ffa031c 100644 --- a/pkg/sql/execinfra/joinreader.go +++ b/pkg/sql/execinfra/joinreader.go @@ -130,7 +130,7 @@ type JoinReader struct { var _ Processor = &JoinReader{} var _ RowSource = &JoinReader{} var _ execinfrapb.MetadataSource = &JoinReader{} -var _ execinfrapb.OpNode = &JoinReader{} +var _ OpNode = &JoinReader{} const joinReaderProcName = "join reader" @@ -715,18 +715,18 @@ func (jr *JoinReader) DrainMeta(ctx context.Context) []execinfrapb.ProducerMetad return jr.generateMeta(ctx) } -// ChildCount is part of the execinfrapb.OpNode interface. +// ChildCount is part of the OpNode interface. func (jr *JoinReader) ChildCount() int { return 1 } -// Child is part of the execinfrapb.OpNode interface. -func (jr *JoinReader) Child(nth int) execinfrapb.OpNode { +// Child is part of the OpNode interface. +func (jr *JoinReader) Child(nth int) OpNode { if nth == 0 { - if n, ok := jr.input.(execinfrapb.OpNode); ok { + if n, ok := jr.input.(OpNode); ok { return n } - panic("input to JoinReader is not an execinfrapb.OpNode") + panic("input to JoinReader is not an OpNode") } panic(fmt.Sprintf("invalid index %d", nth)) } diff --git a/pkg/sql/execinfrapb/operator.go b/pkg/sql/execinfra/operator.go similarity index 80% rename from pkg/sql/execinfrapb/operator.go rename to pkg/sql/execinfra/operator.go index f6c9c751d4bb..c5dcf8e26ef7 100644 --- a/pkg/sql/execinfrapb/operator.go +++ b/pkg/sql/execinfra/operator.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Cockroach Authors. +// Copyright 2018 The Cockroach Authors. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt. @@ -8,10 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package execinfrapb - -// TODO(yuzefovich): move this into execinfra once CFetcher is moved out of -// sql/row. +package execinfra // OpNode is an interface to operator-like structures with children. type OpNode interface { diff --git a/pkg/sql/explain_vec.go b/pkg/sql/explain_vec.go index 3de9a8510433..791d6d50af43 100644 --- a/pkg/sql/explain_vec.go +++ b/pkg/sql/explain_vec.go @@ -137,19 +137,19 @@ func makeExplainVecPlanningCtx( return planCtx } -func shouldOutput(operator execinfrapb.OpNode, verbose bool) bool { +func shouldOutput(operator execinfra.OpNode, verbose bool) bool { _, nonExplainable := operator.(colexec.NonExplainable) return !nonExplainable || verbose } -func formatOpChain(operator execinfrapb.OpNode, node treeprinter.Node, verbose bool) { +func formatOpChain(operator execinfra.OpNode, node treeprinter.Node, verbose bool) { if shouldOutput(operator, verbose) { doFormatOpChain(operator, node.Child(reflect.TypeOf(operator).String()), verbose) } else { doFormatOpChain(operator, node, verbose) } } -func doFormatOpChain(operator execinfrapb.OpNode, node treeprinter.Node, verbose bool) { +func doFormatOpChain(operator execinfra.OpNode, node treeprinter.Node, verbose bool) { for i := 0; i < operator.ChildCount(); i++ { child := operator.Child(i) if shouldOutput(child, verbose) { diff --git a/pkg/sql/logictest/testdata/logic_test/dist_vectorize b/pkg/sql/logictest/testdata/logic_test/dist_vectorize index 1362a3afc018..99107ec76027 100644 --- a/pkg/sql/logictest/testdata/logic_test/dist_vectorize +++ b/pkg/sql/logictest/testdata/logic_test/dist_vectorize @@ -91,7 +91,7 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv ---- │ ├ Node 1 -│ └ *colflow.Materializer +│ └ *colexec.Materializer │ └ *colexec.orderedAggregator │ └ *colexec.oneShotOp │ └ *colexec.distinctChainOps @@ -99,7 +99,7 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv │ ├ *colexec.countOp │ │ └ *colexec.simpleProjectOp │ │ └ *colexec.CancelChecker -│ │ └ *colflow.colBatchScan +│ │ └ *colexec.colBatchScan │ ├ *colrpc.Inbox │ ├ *colrpc.Inbox │ ├ *colrpc.Inbox @@ -110,35 +110,35 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv │ └ *colexec.countOp │ └ *colexec.simpleProjectOp │ └ *colexec.CancelChecker -│ └ *colflow.colBatchScan +│ └ *colexec.colBatchScan ├ Node 3 │ └ *colrpc.Outbox │ └ *colexec.deselectorOp │ └ *colexec.countOp │ └ *colexec.simpleProjectOp │ └ *colexec.CancelChecker -│ └ *colflow.colBatchScan +│ └ *colexec.colBatchScan ├ Node 4 │ └ *colrpc.Outbox │ └ *colexec.deselectorOp │ └ *colexec.countOp │ └ *colexec.simpleProjectOp │ └ *colexec.CancelChecker -│ └ *colflow.colBatchScan +│ └ *colexec.colBatchScan └ Node 5 └ *colrpc.Outbox └ *colexec.deselectorOp └ *colexec.countOp └ *colexec.simpleProjectOp └ *colexec.CancelChecker - └ *colflow.colBatchScan + └ *colexec.colBatchScan query T EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv NATURAL INNER HASH JOIN kv kv2 ---- │ ├ Node 1 -│ └ *colflow.Materializer +│ └ *colexec.Materializer │ └ *colexec.orderedAggregator │ └ *colexec.oneShotOp │ └ *colexec.distinctChainOps @@ -150,7 +150,7 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv NATURAL INNER HASH JOIN kv kv2 │ │ │ ├ *colexec.routerOutputOp │ │ │ │ └ *colexec.HashRouter │ │ │ │ └ *colexec.CancelChecker -│ │ │ │ └ *colflow.colBatchScan +│ │ │ │ └ *colexec.colBatchScan │ │ │ ├ *colrpc.Inbox │ │ │ ├ *colrpc.Inbox │ │ │ ├ *colrpc.Inbox @@ -159,7 +159,7 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv NATURAL INNER HASH JOIN kv kv2 │ │ ├ *colexec.routerOutputOp │ │ │ └ *colexec.HashRouter │ │ │ └ *colexec.CancelChecker -│ │ │ └ *colflow.colBatchScan +│ │ │ └ *colexec.colBatchScan │ │ ├ *colrpc.Inbox │ │ ├ *colrpc.Inbox │ │ ├ *colrpc.Inbox @@ -179,7 +179,7 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv NATURAL INNER HASH JOIN kv kv2 │ │ ├ *colexec.routerOutputOp │ │ │ └ *colexec.HashRouter │ │ │ └ *colexec.CancelChecker -│ │ │ └ *colflow.colBatchScan +│ │ │ └ *colexec.colBatchScan │ │ ├ *colrpc.Inbox │ │ ├ *colrpc.Inbox │ │ └ *colrpc.Inbox @@ -188,7 +188,7 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv NATURAL INNER HASH JOIN kv kv2 │ ├ *colexec.routerOutputOp │ │ └ *colexec.HashRouter │ │ └ *colexec.CancelChecker -│ │ └ *colflow.colBatchScan +│ │ └ *colexec.colBatchScan │ ├ *colrpc.Inbox │ ├ *colrpc.Inbox │ └ *colrpc.Inbox @@ -204,7 +204,7 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv NATURAL INNER HASH JOIN kv kv2 │ │ ├ *colexec.routerOutputOp │ │ │ └ *colexec.HashRouter │ │ │ └ *colexec.CancelChecker -│ │ │ └ *colflow.colBatchScan +│ │ │ └ *colexec.colBatchScan │ │ ├ *colrpc.Inbox │ │ └ *colrpc.Inbox │ └ *colexec.UnorderedSynchronizer @@ -213,7 +213,7 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv NATURAL INNER HASH JOIN kv kv2 │ ├ *colexec.routerOutputOp │ │ └ *colexec.HashRouter │ │ └ *colexec.CancelChecker -│ │ └ *colflow.colBatchScan +│ │ └ *colexec.colBatchScan │ ├ *colrpc.Inbox │ └ *colrpc.Inbox ├ Node 4 @@ -229,7 +229,7 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv NATURAL INNER HASH JOIN kv kv2 │ │ ├ *colexec.routerOutputOp │ │ │ └ *colexec.HashRouter │ │ │ └ *colexec.CancelChecker -│ │ │ └ *colflow.colBatchScan +│ │ │ └ *colexec.colBatchScan │ │ └ *colrpc.Inbox │ └ *colexec.UnorderedSynchronizer │ ├ *colrpc.Inbox @@ -238,7 +238,7 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv NATURAL INNER HASH JOIN kv kv2 │ ├ *colexec.routerOutputOp │ │ └ *colexec.HashRouter │ │ └ *colexec.CancelChecker -│ │ └ *colflow.colBatchScan +│ │ └ *colexec.colBatchScan │ └ *colrpc.Inbox └ Node 5 └ *colrpc.Outbox @@ -254,7 +254,7 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv NATURAL INNER HASH JOIN kv kv2 │ └ *colexec.routerOutputOp │ └ *colexec.HashRouter │ └ *colexec.CancelChecker - │ └ *colflow.colBatchScan + │ └ *colexec.colBatchScan └ *colexec.UnorderedSynchronizer ├ *colrpc.Inbox ├ *colrpc.Inbox @@ -263,7 +263,7 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv NATURAL INNER HASH JOIN kv kv2 └ *colexec.routerOutputOp └ *colexec.HashRouter └ *colexec.CancelChecker - └ *colflow.colBatchScan + └ *colexec.colBatchScan # Test that SelOnDest flag of coldata.SliceArgs is respected when setting # nulls. diff --git a/pkg/sql/logictest/testdata/logic_test/tpch_vec b/pkg/sql/logictest/testdata/logic_test/tpch_vec index 74cfbb2c0a09..47711f39a8ae 100644 --- a/pkg/sql/logictest/testdata/logic_test/tpch_vec +++ b/pkg/sql/logictest/testdata/logic_test/tpch_vec @@ -542,7 +542,7 @@ EXPLAIN (VEC) SELECT l_returnflag, l_linestatus, sum(l_quantity) AS sum_qty, sum └ *colexec.projMultFloat64Float64Op └ *colexec.projMinusFloat64ConstFloat64Op └ *colexec.selLEInt64Int64ConstOp - └ *colflow.colBatchScan + └ *colexec.colBatchScan # Query 2 query T @@ -557,23 +557,23 @@ EXPLAIN (VEC) SELECT s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_ └ *colexec.hashGrouper └ *colexec.hashJoinEqOp ├ *colexec.hashJoinEqOp - │ ├ *colflow.colBatchScan + │ ├ *colexec.colBatchScan │ └ *execinfra.JoinReader │ └ *execinfra.JoinReader │ └ *colexec.selEQBytesBytesConstOp - │ └ *colflow.colBatchScan + │ └ *colexec.colBatchScan └ *colexec.hashJoinEqOp ├ *colexec.hashJoinEqOp - │ ├ *colflow.colBatchScan + │ ├ *colexec.colBatchScan │ └ *colexec.hashJoinEqOp - │ ├ *colflow.colBatchScan + │ ├ *colexec.colBatchScan │ └ *colexec.hashJoinEqOp - │ ├ *colflow.colBatchScan + │ ├ *colexec.colBatchScan │ └ *colexec.selEQBytesBytesConstOp - │ └ *colflow.colBatchScan + │ └ *colexec.colBatchScan └ *colexec.selSuffixBytesBytesConstOp └ *colexec.selEQInt64Int64ConstOp - └ *colflow.colBatchScan + └ *colexec.colBatchScan # Query 3 query T @@ -588,9 +588,9 @@ EXPLAIN (VEC) SELECT l_orderkey, sum(l_extendedprice * (1 - l_discount)) AS reve └ *execinfra.JoinReader └ *colexec.hashJoinEqOp ├ *colexec.selLTInt64Int64ConstOp - │ └ *colflow.colBatchScan + │ └ *colexec.colBatchScan └ *colexec.selEQBytesBytesConstOp - └ *colflow.colBatchScan + └ *colexec.colBatchScan # Query 4 query T @@ -603,9 +603,9 @@ EXPLAIN (VEC) SELECT o_orderpriority, count(*) AS order_count FROM orders WHERE └ *colexec.hashGrouper └ *colexec.hashJoinEqOp ├ *execinfra.IndexJoiner - │ └ *colflow.colBatchScan + │ └ *colexec.colBatchScan └ *colexec.selLTInt64Int64Op - └ *colflow.colBatchScan + └ *colexec.colBatchScan # Query 5 query T @@ -621,15 +621,15 @@ EXPLAIN (VEC) SELECT n_name, sum(l_extendedprice * (1 - l_discount)) AS revenue └ *colexec.hashJoinEqOp ├ *colexec.hashJoinEqOp │ ├ *colexec.hashJoinEqOp - │ │ ├ *colflow.colBatchScan + │ │ ├ *colexec.colBatchScan │ │ └ *execinfra.JoinReader │ │ └ *colexec.hashJoinEqOp - │ │ ├ *colflow.colBatchScan + │ │ ├ *colexec.colBatchScan │ │ └ *colexec.selEQBytesBytesConstOp - │ │ └ *colflow.colBatchScan + │ │ └ *colexec.colBatchScan │ └ *execinfra.IndexJoiner - │ └ *colflow.colBatchScan - └ *colflow.colBatchScan + │ └ *colexec.colBatchScan + └ *colexec.colBatchScan # Query 6 query T @@ -641,7 +641,7 @@ EXPLAIN (VEC) SELECT sum(l_extendedprice * l_discount) AS revenue FROM lineitem └ *colexec.oneShotOp └ *colexec.distinctChainOps └ *execinfra.IndexJoiner - └ *colflow.colBatchScan + └ *colexec.colBatchScan # Query 7 query T @@ -662,8 +662,8 @@ EXPLAIN (VEC) SELECT supp_nation, cust_nation, l_year, sum(volume) AS revenue FR │ └ *execinfra.JoinReader │ └ *colexec.caseOp │ ├ *colexec.hashJoinEqOp - │ │ ├ *colflow.colBatchScan - │ │ └ *colflow.colBatchScan + │ │ ├ *colexec.colBatchScan + │ │ └ *colexec.colBatchScan │ ├ *colexec.constBoolOp │ │ └ *colexec.andOp │ │ └ *colexec.projEQBytesBytesConstOp @@ -673,7 +673,7 @@ EXPLAIN (VEC) SELECT supp_nation, cust_nation, l_year, sum(volume) AS revenue FR │ │ └ *colexec.projEQBytesBytesConstOp │ │ └ *colexec.projEQBytesBytesConstOp │ └ *colexec.constBoolOp - └ *colflow.colBatchScan + └ *colexec.colBatchScan # Query 8 query T @@ -693,20 +693,20 @@ EXPLAIN (VEC) SELECT o_year, sum(CASE WHEN nation = 'BRAZIL' THEN volume ELSE 0 │ └ *colexec.hashJoinEqOp │ ├ *colexec.hashJoinEqOp │ │ ├ *colexec.hashJoinEqOp - │ │ │ ├ *colflow.colBatchScan + │ │ │ ├ *colexec.colBatchScan │ │ │ └ *colexec.hashJoinEqOp │ │ │ ├ *colexec.hashJoinEqOp │ │ │ │ ├ *execinfra.JoinReader │ │ │ │ │ └ *execinfra.JoinReader │ │ │ │ │ └ *colexec.selEQBytesBytesConstOp - │ │ │ │ │ └ *colflow.colBatchScan - │ │ │ │ └ *colflow.colBatchScan + │ │ │ │ │ └ *colexec.colBatchScan + │ │ │ │ └ *colexec.colBatchScan │ │ │ └ *colexec.selLEInt64Int64ConstOp │ │ │ └ *colexec.selGEInt64Int64ConstOp - │ │ │ └ *colflow.colBatchScan - │ │ └ *colflow.colBatchScan + │ │ │ └ *colexec.colBatchScan + │ │ └ *colexec.colBatchScan │ └ *colexec.selEQBytesBytesConstOp - │ └ *colflow.colBatchScan + │ └ *colexec.colBatchScan ├ *colexec.projEQBytesBytesConstOp └ *colexec.constFloat64Op @@ -725,9 +725,9 @@ EXPLAIN (VEC) SELECT nation, o_year, sum(amount) AS sum_profit FROM ( SELECT n_n │ ├ *execinfra.JoinReader │ │ └ *execinfra.JoinReader │ │ └ *execinfra.JoinReader - │ │ └ *colflow.colBatchScan - │ └ *colflow.colBatchScan - └ *colflow.colBatchScan + │ │ └ *colexec.colBatchScan + │ └ *colexec.colBatchScan + └ *colexec.colBatchScan # Query 10 query T @@ -742,10 +742,10 @@ EXPLAIN (VEC) SELECT c_custkey, c_name, sum(l_extendedprice * (1 - l_discount)) └ *execinfra.JoinReader └ *colexec.hashJoinEqOp ├ *colexec.hashJoinEqOp - │ ├ *colflow.colBatchScan + │ ├ *colexec.colBatchScan │ └ *execinfra.IndexJoiner - │ └ *colflow.colBatchScan - └ *colflow.colBatchScan + │ └ *colexec.colBatchScan + └ *colexec.colBatchScan # Query 11 query T @@ -763,7 +763,7 @@ EXPLAIN (VEC) SELECT ps_partkey, sum(ps_supplycost * ps_availqty::float) AS valu └ *execinfra.JoinReader └ *execinfra.JoinReader └ *colexec.selEQBytesBytesConstOp - └ *colflow.colBatchScan + └ *colexec.colBatchScan # Query 12 query error unable to vectorize execution plan: sum on int cols not supported @@ -782,8 +782,8 @@ EXPLAIN (VEC) SELECT c_count, count(*) AS custdist FROM ( SELECT c_custkey, coun └ *colexec.hashGrouper └ *colexec.hashJoinEqOp ├ *colexec.selNotRegexpBytesBytesConstOp - │ └ *colflow.colBatchScan - └ *colflow.colBatchScan + │ └ *colexec.colBatchScan + └ *colexec.colBatchScan # Query 14 query T @@ -800,9 +800,9 @@ EXPLAIN (VEC) SELECT 100.00 * sum(CASE WHEN p_type LIKE 'PROMO%' THEN l_extended └ *colexec.projMinusFloat64ConstFloat64Op └ *colexec.caseOp ├ *colexec.hashJoinEqOp - │ ├ *colflow.colBatchScan + │ ├ *colexec.colBatchScan │ └ *execinfra.IndexJoiner - │ └ *colflow.colBatchScan + │ └ *colexec.colBatchScan ├ *colexec.projMultFloat64Float64Op │ └ *colexec.projMinusFloat64ConstFloat64Op │ └ *colexec.projPrefixBytesBytesConstOp @@ -836,7 +836,7 @@ EXPLAIN (VEC) SELECT sum(l_extendedprice) / 7.0 AS avg_yearly FROM lineitem, par └ *execinfra.JoinReader └ *colexec.selEQBytesBytesConstOp └ *colexec.selEQBytesBytesConstOp - └ *colflow.colBatchScan + └ *colexec.colBatchScan # Query 18 query T @@ -851,12 +851,12 @@ EXPLAIN (VEC) SELECT c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice, s └ *execinfra.JoinReader └ *colexec.hashJoinEqOp ├ *colexec.mergeJoinLeftSemiOp - │ ├ *colflow.colBatchScan + │ ├ *colexec.colBatchScan │ └ *colexec.selGTFloat64Float64ConstOp │ └ *colexec.orderedAggregator │ └ *colexec.distinctChainOps - │ └ *colflow.colBatchScan - └ *colflow.colBatchScan + │ └ *colexec.colBatchScan + └ *colexec.colBatchScan # Query 19 query T @@ -873,9 +873,9 @@ EXPLAIN (VEC) SELECT sum(l_extendedprice* (1 - l_discount)) AS revenue FROM line ├ *colexec.hashJoinEqOp │ ├ *colexec.selEQBytesBytesConstOp │ │ └ *colexec.selectInOpBytes - │ │ └ *colflow.colBatchScan + │ │ └ *colexec.colBatchScan │ └ *colexec.selGEInt64Int64ConstOp - │ └ *colflow.colBatchScan + │ └ *colexec.colBatchScan ├ *colexec.constBoolOp │ └ *colexec.caseOp │ ├ *colexec.constBoolOp @@ -920,7 +920,7 @@ EXPLAIN (VEC) SELECT s_name, s_address FROM supplier, nation WHERE s_suppkey IN └ *colexec.sortOp └ *colexec.hashJoinEqOp ├ *colexec.hashJoinEqOp - │ ├ *colflow.colBatchScan + │ ├ *colexec.colBatchScan │ └ *colexec.hashJoinEqOp │ ├ *colexec.selGTInt64Float64Op │ │ └ *colexec.projMultFloat64Float64ConstOp @@ -928,12 +928,12 @@ EXPLAIN (VEC) SELECT s_name, s_address FROM supplier, nation WHERE s_suppkey IN │ │ └ *colexec.hashGrouper │ │ └ *colexec.hashJoinEqOp │ │ ├ *execinfra.IndexJoiner - │ │ │ └ *colflow.colBatchScan - │ │ └ *colflow.colBatchScan + │ │ │ └ *colexec.colBatchScan + │ │ └ *colexec.colBatchScan │ └ *colexec.selPrefixBytesBytesConstOp - │ └ *colflow.colBatchScan + │ └ *colexec.colBatchScan └ *colexec.selEQBytesBytesConstOp - └ *colflow.colBatchScan + └ *colexec.colBatchScan # Query 21 query error can't plan non-inner hash join with on expressions @@ -956,4 +956,4 @@ EXPLAIN (VEC) SELECT cntrycode, count(*) AS numcust, sum(c_acctbal) AS totacctba └ *colexec.substringFunctionOperator └ *colexec.constInt64Op └ *colexec.constInt64Op - └ *colflow.colBatchScan + └ *colexec.colBatchScan diff --git a/pkg/sql/row/errors.go b/pkg/sql/row/errors.go index a0ea1c2c32bd..5f8e1593e7d7 100644 --- a/pkg/sql/row/errors.go +++ b/pkg/sql/row/errors.go @@ -40,9 +40,9 @@ func (f *singleKVFetcher) nextBatch( return true, f.kvs[:], nil, roachpb.Span{}, nil } -// getRangesInfo implements the kvBatchFetcher interface. -func (f *singleKVFetcher) getRangesInfo() []roachpb.RangeInfo { - panic(errors.AssertionFailedf("getRangesInfo() called on singleKVFetcher")) +// GetRangesInfo implements the kvBatchFetcher interface. +func (f *singleKVFetcher) GetRangesInfo() []roachpb.RangeInfo { + panic(errors.AssertionFailedf("GetRangesInfo() called on singleKVFetcher")) } // ConvertBatchError returns a user friendly constraint violation error. diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go index 8aeaca5e865b..e9be6961098e 100644 --- a/pkg/sql/row/fetcher.go +++ b/pkg/sql/row/fetcher.go @@ -32,9 +32,9 @@ import ( "github.com/cockroachdb/errors" ) -// debugRowFetch can be used to turn on some low-level debugging logs. We use +// DebugRowFetch can be used to turn on some low-level debugging logs. We use // this to avoid using log.V in the hot path. -const debugRowFetch = false +const DebugRowFetch = false type kvBatchFetcher interface { // nextBatch returns the next batch of rows. Returns false in the first @@ -43,7 +43,7 @@ type kvBatchFetcher interface { // version - both must be handled by calling code. nextBatch(ctx context.Context) (ok bool, kvs []roachpb.KeyValue, batchResponse []byte, origSpan roachpb.Span, err error) - getRangesInfo() []roachpb.RangeInfo + GetRangesInfo() []roachpb.RangeInfo } type tableInfo struct { @@ -204,7 +204,7 @@ type Fetcher struct { // -- Fields updated during a scan -- - kvFetcher kvFetcher + kvFetcher *KVFetcher indexKey []byte // the index key of the current row prettyValueBuf *bytes.Buffer @@ -566,7 +566,7 @@ func (rf *Fetcher) NextKey(ctx context.Context) (rowDone bool, err error) { var ok bool for { - ok, rf.kv, _, err = rf.kvFetcher.nextKV(ctx) + ok, rf.kv, _, err = rf.kvFetcher.NextKV(ctx) if err != nil { return false, err } @@ -893,7 +893,7 @@ func (rf *Fetcher) processKV( } } - if debugRowFetch { + if DebugRowFetch { if hasExtraCols(table) { log.Infof(ctx, "Scan %s -> %s", kv.Key, rf.prettyEncDatums(table.extraTypes, table.extraVals)) } else { @@ -963,7 +963,7 @@ func (rf *Fetcher) processValueSingle( prettyValue = value.String() } table.row[idx] = sqlbase.DatumToEncDatum(typ, value) - if debugRowFetch { + if DebugRowFetch { log.Infof(ctx, "Scan %s -> %v", kv.Key, value) } return prettyKey, prettyValue, nil @@ -972,7 +972,7 @@ func (rf *Fetcher) processValueSingle( // No need to unmarshal the column value. Either the column was part of // the index key or it isn't needed. - if debugRowFetch { + if DebugRowFetch { log.Infof(ctx, "Scan %s -> [%d] (skipped)", kv.Key, colID) } return prettyKey, prettyValue, nil @@ -1011,7 +1011,7 @@ func (rf *Fetcher) processValueBytes( return "", "", err } valueBytes = valueBytes[len:] - if debugRowFetch { + if DebugRowFetch { log.Infof(ctx, "Scan %s -> [%d] (skipped)", kv.Key, colID) } continue @@ -1037,7 +1037,7 @@ func (rf *Fetcher) processValueBytes( } table.row[idx] = encValue rf.valueColsFound++ - if debugRowFetch { + if DebugRowFetch { log.Infof(ctx, "Scan %d -> %v", idx, encValue) } } @@ -1416,10 +1416,10 @@ func (rf *Fetcher) GetRangesInfo() []roachpb.RangeInfo { // Not yet initialized. return nil } - return f.getRangesInfo() + return f.GetRangesInfo() } -// GetBytesRead returns total number of bytes read by the underlying kvFetcher. +// GetBytesRead returns total number of bytes read by the underlying KVFetcher. func (rf *Fetcher) GetBytesRead() int64 { return rf.kvFetcher.bytesRead } diff --git a/pkg/sql/row/fk_existence_batch.go b/pkg/sql/row/fk_existence_batch.go index 7c210b9d77ff..2eaa28f40d79 100644 --- a/pkg/sql/row/fk_existence_batch.go +++ b/pkg/sql/row/fk_existence_batch.go @@ -151,7 +151,7 @@ func (f *SpanKVFetcher) nextBatch( return true, res, nil, roachpb.Span{}, nil } -// getRangesInfo implements the kvBatchFetcher interface. -func (f *SpanKVFetcher) getRangesInfo() []roachpb.RangeInfo { - panic(errors.AssertionFailedf("getRangesInfo() called on SpanKVFetcher")) +// GetRangesInfo implements the kvBatchFetcher interface. +func (f *SpanKVFetcher) GetRangesInfo() []roachpb.RangeInfo { + panic(errors.AssertionFailedf("GetRangesInfo() called on SpanKVFetcher")) } diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go index 7b9377212a2b..5897ad0d9e19 100644 --- a/pkg/sql/row/kv_batch_fetcher.go +++ b/pkg/sql/row/kv_batch_fetcher.go @@ -78,7 +78,7 @@ type txnKVFetcher struct { var _ kvBatchFetcher = &txnKVFetcher{} -func (f *txnKVFetcher) getRangesInfo() []roachpb.RangeInfo { +func (f *txnKVFetcher) GetRangesInfo() []roachpb.RangeInfo { if !f.returnRangeInfo { panic(errors.AssertionFailedf("GetRangesInfo() called on kvBatchFetcher that wasn't configured with returnRangeInfo")) } diff --git a/pkg/sql/row/kv_fetcher.go b/pkg/sql/row/kv_fetcher.go index da3fd4967090..2280a124dcc2 100644 --- a/pkg/sql/row/kv_fetcher.go +++ b/pkg/sql/row/kv_fetcher.go @@ -13,33 +13,47 @@ package row import ( "context" + "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" ) -// kvFetcher wraps kvBatchFetcher, providing a nextKV interface that returns the +// KVFetcher wraps kvBatchFetcher, providing a NextKV interface that returns the // next kv from its input. -type kvFetcher struct { +type KVFetcher struct { kvBatchFetcher kvs []roachpb.KeyValue batchResponse []byte bytesRead int64 - span roachpb.Span + Span roachpb.Span newSpan bool } -func newKVFetcher(batchFetcher kvBatchFetcher) kvFetcher { - return kvFetcher{ +// NewKVFetcher creates a new KVFetcher. +func NewKVFetcher( + txn *client.Txn, + spans roachpb.Spans, + reverse bool, + useBatchLimit bool, + firstBatchLimit int64, + returnRangeInfo bool, +) (*KVFetcher, error) { + kvBatchFetcher, err := makeKVBatchFetcher(txn, spans, reverse, useBatchLimit, firstBatchLimit, returnRangeInfo) + return newKVFetcher(&kvBatchFetcher), err +} + +func newKVFetcher(batchFetcher kvBatchFetcher) *KVFetcher { + return &KVFetcher{ kvBatchFetcher: batchFetcher, } } -// nextKV returns the next kv from this fetcher. Returns false if there are no +// NextKV returns the next kv from this fetcher. Returns false if there are no // more kvs to fetch, the kv that was fetched, and any errors that may have // occurred. -func (f *kvFetcher) nextKV( +func (f *KVFetcher) NextKV( ctx context.Context, ) (ok bool, kv roachpb.KeyValue, newSpan bool, err error) { for { @@ -66,7 +80,7 @@ func (f *kvFetcher) nextKV( }, newSpan, nil } - ok, f.kvs, f.batchResponse, f.span, err = f.nextBatch(ctx) + ok, f.kvs, f.batchResponse, f.Span, err = f.nextBatch(ctx) if err != nil { return ok, kv, false, err } diff --git a/pkg/sql/rowexec/dep_test.go b/pkg/sql/rowexec/dep_test.go index 146cbb37551a..fa0256f0331b 100644 --- a/pkg/sql/rowexec/dep_test.go +++ b/pkg/sql/rowexec/dep_test.go @@ -23,9 +23,7 @@ func TestNoLinkForbidden(t *testing.T) { buildutil.VerifyNoImports(t, "github.com/cockroachdb/cockroach/pkg/sql/rowexec", true, []string{ - // TODO(yuzefovich): backfiller processor brings in colexec. Fix this. - // Probably the problem is CFetcher. - //"github.com/cockroachdb/cockroach/pkg/sql/colexec", + "github.com/cockroachdb/cockroach/pkg/sql/colexec", "github.com/cockroachdb/cockroach/pkg/sql/colflow", "github.com/cockroachdb/cockroach/pkg/sql/rowflow", }, nil,