Skip to content

Commit

Permalink
sql: move CFetcher from sql/row into sql/colexec
Browse files Browse the repository at this point in the history
Previously CFetcher lived in sql/row which caused several things
that ought to live in sql/colexec to actually be in sql/colflow.
This also forced the "planning" code for columnar operators to be
in sql/colflow which didn't allow for reuse of that code in the
unit tests of the operators. Now CFetcher is moved into sql/colexec
(a few things needed to be exposed from sql/row) and several things
are moved into sql/colexec from sql/colflow.

I needed to extract things out of colexec/vecbuiltins package in
order not to create a cycle with colexec due to the movement of the
planning code into colexec.

Release justification: Category 1: Non-production code changes.

Release note: None
  • Loading branch information
yuzefovich committed Sep 16, 2019
1 parent 5cee194 commit 4523361
Show file tree
Hide file tree
Showing 48 changed files with 380 additions and 383 deletions.
12 changes: 6 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,9 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/mergejoiner_leftsemi.eg.go \
pkg/sql/colexec/mergejoiner_rightouter.eg.go \
pkg/sql/colexec/min_max_agg.eg.go \
pkg/sql/colexec/overloads_test_utils.eg.go \
pkg/sql/colexec/rank.eg.go \
pkg/sql/colexec/row_number.eg.go \
pkg/sql/colexec/projection_ops.eg.go \
pkg/sql/colexec/quicksort.eg.go \
pkg/sql/colexec/rowstovec.eg.go \
Expand All @@ -812,10 +815,7 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/sum_agg.eg.go \
pkg/sql/colexec/tuples_differ.eg.go \
pkg/sql/colexec/vec_comparators.eg.go \
pkg/sql/colexec/vecbuiltins/rank.eg.go \
pkg/sql/colexec/vecbuiltins/row_number.eg.go \
pkg/sql/colexec/zerocolumns.eg.go \
pkg/sql/colexec/overloads_test_utils.eg.go
pkg/sql/colexec/zerocolumns.eg.go

execgen-exclusions = $(addprefix -not -path ,$(EXECGEN_TARGETS))

Expand Down Expand Up @@ -1485,14 +1485,14 @@ pkg/sql/colexec/mergejoiner_leftsemi.eg.go: pkg/sql/colexec/mergejoiner_tmpl.go
pkg/sql/colexec/mergejoiner_rightouter.eg.go: pkg/sql/colexec/mergejoiner_tmpl.go
pkg/sql/colexec/min_max_agg.eg.go: pkg/sql/colexec/min_max_agg_tmpl.go
pkg/sql/colexec/quicksort.eg.go: pkg/sql/colexec/quicksort_tmpl.go
pkg/sql/colexec/rank.eg.go: pkg/sql/colexec/rank_tmpl.go
pkg/sql/colexec/row_number.eg.go: pkg/sql/colexec/row_number_tmpl.go
pkg/sql/colexec/rowstovec.eg.go: pkg/sql/colexec/rowstovec_tmpl.go
pkg/sql/colexec/select_in.eg.go: pkg/sql/colexec/select_in_tmpl.go
pkg/sql/colexec/sort.eg.go: pkg/sql/colexec/sort_tmpl.go
pkg/sql/colexec/sum_agg.eg.go: pkg/sql/colexec/sum_agg_tmpl.go
pkg/sql/colexec/tuples_differ.eg.go: pkg/sql/colexec/tuples_differ_tmpl.go
pkg/sql/colexec/vec_comparators.eg.go: pkg/sql/colexec/vec_comparators_tmpl.go
pkg/sql/colexec/vecbuiltins/rank.eg.go: pkg/sql/colexec/vecbuiltins/rank_tmpl.go
pkg/sql/colexec/vecbuiltins/row_number.eg.go: pkg/sql/colexec/vecbuiltins/row_number_tmpl.go
pkg/sql/colexec/zerocolumns.eg.go: pkg/sql/colexec/zerocolumns_tmpl.go

$(EXECGEN_TARGETS): bin/execgen
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/colexec/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ mergejoiner_leftouter.eg.go
mergejoiner_leftsemi.eg.go
mergejoiner_rightouter.eg.go
min_max_agg.eg.go
overloads_test_utils.eg.go
projection_ops.eg.go
quicksort.eg.go
rank.eg.go
row_number.eg.go
rowstovec.eg.go
selection_ops.eg.go
select_in.eg.go
sort.eg.go
sum_agg.eg.go
tuples_differ.eg.go
vec_comparators.eg.go
vecbuiltins/rank.eg.go
vecbuiltins/row_number.eg.go
zerocolumns.eg.go
overloads_test_utils.eg.go
4 changes: 2 additions & 2 deletions pkg/sql/colexec/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/coltypes"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
)

type caseOp struct {
Expand All @@ -40,7 +40,7 @@ func (c *caseOp) ChildCount() int {
return 1 + len(c.caseOps) + 1
}

func (c *caseOp) Child(nth int) execinfrapb.OpNode {
func (c *caseOp) Child(nth int) execinfra.OpNode {
if nth == 0 {
return c.buffer.input
} else if nth < len(c.caseOps)+1 {
Expand Down
76 changes: 38 additions & 38 deletions pkg/sql/row/cfetcher.go → pkg/sql/colexec/cfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package row
package colexec

import (
"bytes"
Expand All @@ -23,9 +23,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/colencoding"
"github.com/cockroachdb/cockroach/pkg/sql/colexec"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/typeconv"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/scrub"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
Expand All @@ -36,8 +36,6 @@ import (
"github.com/cockroachdb/errors"
)

// TODO(yuzefovich): move CFetcher into sql/colexec.

// Only unique secondary indexes have extra columns to decode (namely the
// primary index columns).
func cHasExtraCols(table *cTableInfo) bool {
Expand Down Expand Up @@ -145,10 +143,10 @@ func (m colIdxMap) get(c sqlbase.ColumnID) (int, bool) {
return 0, false
}

// CFetcher handles fetching kvs and forming table rows for an
// cFetcher handles fetching kvs and forming table rows for an
// arbitrary number of tables.
// Usage:
// var rf CFetcher
// var rf cFetcher
// err := rf.Init(..)
// // Handle err
// err := rf.StartScan(..)
Expand All @@ -162,7 +160,7 @@ func (m colIdxMap) get(c sqlbase.ColumnID) (int, bool) {
// }
// // Process res.colBatch
// }
type CFetcher struct {
type cFetcher struct {
// table is the table that's configured for fetching.
table cTableInfo

Expand Down Expand Up @@ -194,7 +192,7 @@ type CFetcher struct {
traceKV bool

// fetcher is the underlying fetcher that provides KVs.
fetcher kvFetcher
fetcher *row.KVFetcher

// machine contains fields that get updated during the run of the fetcher.
machine struct {
Expand Down Expand Up @@ -231,16 +229,16 @@ type CFetcher struct {
}

// estimatedStaticMemoryUsage is the best guess about how much memory the
// CFetcher will use.
// cFetcher will use.
estimatedStaticMemoryUsage int
}

// Init sets up a Fetcher for a given table and index. If we are using a
// non-primary index, tables.ValNeededForCol can only refer to columns in the
// index.
func (rf *CFetcher) Init(
func (rf *cFetcher) Init(
reverse,
returnRangeInfo bool, isCheck bool, tables ...FetcherTableArgs,
returnRangeInfo bool, isCheck bool, tables ...row.FetcherTableArgs,
) error {
if len(tables) == 0 {
return errors.AssertionFailedf("no tables to fetch from")
Expand Down Expand Up @@ -290,7 +288,7 @@ func (rf *CFetcher) Init(
}
rf.machine.batch = coldata.NewMemBatch(typs)
rf.machine.colvecs = rf.machine.batch.ColVecs()
rf.estimatedStaticMemoryUsage = colexec.EstimateBatchSizeBytes(typs, coldata.BatchSize)
rf.estimatedStaticMemoryUsage = EstimateBatchSizeBytes(typs, coldata.BatchSize)

var err error

Expand Down Expand Up @@ -422,7 +420,7 @@ func (rf *CFetcher) Init(

// StartScan initializes and starts the key-value scan. Can be used multiple
// times.
func (rf *CFetcher) StartScan(
func (rf *cFetcher) StartScan(
ctx context.Context,
txn *client.Txn,
spans roachpb.Spans,
Expand Down Expand Up @@ -450,12 +448,14 @@ func (rf *CFetcher) StartScan(
firstBatchLimit++
}

f, err := makeKVBatchFetcher(txn, spans, rf.reverse, limitBatches, firstBatchLimit, rf.returnRangeInfo)
f, err := row.NewKVFetcher(
txn, spans, rf.reverse, limitBatches, firstBatchLimit, rf.returnRangeInfo,
)
if err != nil {
return err
}
rf.fetcher = f
rf.machine.lastRowPrefix = nil
rf.fetcher = newKVFetcher(&f)
rf.machine.state[0] = stateInitFetch
return nil
}
Expand Down Expand Up @@ -550,7 +550,7 @@ const debugState = false
// index used; columns that are not needed (as per neededCols) are empty. The
// Batch should not be modified and is only valid until the next call.
// When there are no more rows, the Batch.Length is 0.
func (rf *CFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
func (rf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
for {
if debugState {
log.Infof(ctx, "State %s", rf.machine.state[0])
Expand All @@ -559,7 +559,7 @@ func (rf *CFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
case stateInvalid:
return nil, errors.New("invalid fetcher state")
case stateInitFetch:
moreKeys, kv, newSpan, err := rf.fetcher.nextKV(ctx)
moreKeys, kv, newSpan, err := rf.fetcher.NextKV(ctx)
if err != nil {
return nil, execerror.NewStorageError(err)
}
Expand All @@ -568,7 +568,7 @@ func (rf *CFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
continue
}
if newSpan {
rf.machine.curSpan = rf.fetcher.span
rf.machine.curSpan = rf.fetcher.Span
// TODO(jordan): parse the logical longest common prefix of the span
// into a buffer. The logical longest common prefix is the longest
// common prefix that contains only full key components. For example,
Expand Down Expand Up @@ -661,7 +661,7 @@ func (rf *CFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
rf.machine.state[0] = stateFetchNextKVWithUnfinishedRow
case stateSeekPrefix:
for {
moreRows, kv, _, err := rf.fetcher.nextKV(ctx)
moreRows, kv, _, err := rf.fetcher.NextKV(ctx)
if err != nil {
return nil, execerror.NewStorageError(err)
}
Expand All @@ -684,7 +684,7 @@ func (rf *CFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
rf.shiftState()

case stateFetchNextKVWithUnfinishedRow:
moreKVs, kv, _, err := rf.fetcher.nextKV(ctx)
moreKVs, kv, _, err := rf.fetcher.NextKV(ctx)
if err != nil {
return nil, execerror.NewStorageError(err)
}
Expand Down Expand Up @@ -775,28 +775,28 @@ func (rf *CFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {

// shiftState shifts the state queue to the left, removing the first element and
// clearing the last element.
func (rf *CFetcher) shiftState() {
func (rf *cFetcher) shiftState() {
copy(rf.machine.state[:2], rf.machine.state[1:])
rf.machine.state[2] = stateInvalid
}

func (rf *CFetcher) pushState(state fetcherState) {
func (rf *cFetcher) pushState(state fetcherState) {
copy(rf.machine.state[1:], rf.machine.state[:2])
rf.machine.state[0] = state
}

// getDatumAt returns the converted datum object at the given (colIdx, rowIdx).
// This function is meant for tracing and should not be used in hot paths.
func (rf *CFetcher) getDatumAt(colIdx int, rowIdx uint16, typ types.T) tree.Datum {
return colexec.PhysicalTypeColElemToDatum(rf.machine.colvecs[colIdx], rowIdx, rf.table.da, typ)
func (rf *cFetcher) getDatumAt(colIdx int, rowIdx uint16, typ types.T) tree.Datum {
return PhysicalTypeColElemToDatum(rf.machine.colvecs[colIdx], rowIdx, rf.table.da, typ)
}

// processValue processes the state machine's current value component, setting
// columns in the rowIdx'th tuple in the current batch depending on what data
// is found in the current value component.
// If debugStrings is true, returns pretty printed key and value
// information in prettyKey/prettyValue (otherwise they are empty strings).
func (rf *CFetcher) processValue(
func (rf *cFetcher) processValue(
ctx context.Context, familyID sqlbase.FamilyID,
) (prettyKey string, prettyValue string, err error) {
table := &rf.table
Expand Down Expand Up @@ -931,7 +931,7 @@ func (rf *CFetcher) processValue(
// processValueSingle processes the given value (of column
// family.DefaultColumnID), setting values in table.row accordingly. The key is
// only used for logging.
func (rf *CFetcher) processValueSingle(
func (rf *cFetcher) processValueSingle(
ctx context.Context,
table *cTableInfo,
family *sqlbase.ColumnFamilyDescriptor,
Expand Down Expand Up @@ -980,7 +980,7 @@ func (rf *CFetcher) processValueSingle(
if rf.traceKV {
prettyValue = rf.getDatumAt(idx, rf.machine.rowIdx, *typ).String()
}
if debugRowFetch {
if row.DebugRowFetch {
log.Infof(ctx, "Scan %s -> %v", rf.machine.nextKV.Key, "?")
}
return prettyKey, prettyValue, nil
Expand All @@ -989,13 +989,13 @@ func (rf *CFetcher) processValueSingle(

// No need to unmarshal the column value. Either the column was part of
// the index key or it isn't needed.
if debugRowFetch {
if row.DebugRowFetch {
log.Infof(ctx, "Scan %s -> [%d] (skipped)", rf.machine.nextKV.Key, colID)
}
return "", "", nil
}

func (rf *CFetcher) processValueBytes(
func (rf *cFetcher) processValueBytes(
ctx context.Context, table *cTableInfo, valueBytes []byte, prettyKeyPrefix string,
) (prettyKey string, prettyValue string, err error) {
prettyKey = prettyKeyPrefix
Expand Down Expand Up @@ -1038,7 +1038,7 @@ func (rf *CFetcher) processValueBytes(
return "", "", err
}
valueBytes = valueBytes[len:]
if debugRowFetch {
if row.DebugRowFetch {
log.Infof(ctx, "Scan %s -> [%d] (skipped)", rf.machine.nextKV.Key, colID)
}
continue
Expand Down Expand Up @@ -1082,13 +1082,13 @@ func (rf *CFetcher) processValueBytes(

// processValueTuple processes the given values (of columns family.ColumnIDs),
// setting values in the rf.row accordingly. The key is only used for logging.
func (rf *CFetcher) processValueTuple(
func (rf *cFetcher) processValueTuple(
ctx context.Context, table *cTableInfo, tupleBytes []byte, prettyKeyPrefix string,
) (prettyKey string, prettyValue string, err error) {
return rf.processValueBytes(ctx, table, tupleBytes, prettyKeyPrefix)
}

func (rf *CFetcher) fillNulls() error {
func (rf *cFetcher) fillNulls() error {
table := &rf.table
if rf.machine.remainingValueColsByIdx.Empty() {
return nil
Expand Down Expand Up @@ -1120,18 +1120,18 @@ func (rf *CFetcher) fillNulls() error {

// GetRangesInfo returns information about the ranges where the rows came from.
// The RangeInfo's are deduped and not ordered.
func (rf *CFetcher) GetRangesInfo() []roachpb.RangeInfo {
f := rf.fetcher.kvBatchFetcher
func (rf *cFetcher) GetRangesInfo() []roachpb.RangeInfo {
f := rf.fetcher
if f == nil {
// Not yet initialized.
return nil
}
return f.getRangesInfo()
return rf.fetcher.GetRangesInfo()
}

// getCurrentColumnFamilyID returns the column family id of the key in
// rf.machine.nextKV.Key.
func (rf *CFetcher) getCurrentColumnFamilyID() (sqlbase.FamilyID, error) {
func (rf *cFetcher) getCurrentColumnFamilyID() (sqlbase.FamilyID, error) {
// If the table only has 1 column family, and its ID is 0, we know that the
// key has to be the 0th column family.
if rf.table.maxColumnFamilyID == 0 {
Expand All @@ -1150,7 +1150,7 @@ func (rf *CFetcher) getCurrentColumnFamilyID() (sqlbase.FamilyID, error) {
}

// EstimateStaticMemoryUsage estimates how much memory is pre-allocated by the
// CFetcher.
func (rf *CFetcher) EstimateStaticMemoryUsage() int {
// cFetcher.
func (rf *cFetcher) EstimateStaticMemoryUsage() int {
return rf.estimatedStaticMemoryUsage
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package row
package colexec

import (
"testing"
Expand All @@ -19,7 +19,7 @@ import (
func TestCFetcherUninitialized(t *testing.T) {
// Regression test for #36570: make sure it's okay to call GetRangesInfo even
// before the fetcher was fully initialized.
var fetcher CFetcher
var fetcher cFetcher

assert.Nil(t, fetcher.GetRangesInfo())
}
Loading

0 comments on commit 4523361

Please sign in to comment.