diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index bed494080251..ba78dfd9b123 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -856,7 +856,7 @@ func NewColOperator( // unlimited one. We also need another unlimited account for the // KV fetcher. accounts := args.MonitorRegistry.CreateUnlimitedMemAccounts( - ctx, flowCtx, "cfetcher" /* opName */, spec.ProcessorID, 2, /* numAccounts */ + ctx, flowCtx, "cfetcher" /* opName */, spec.ProcessorID, 3, /* numAccounts */ ) estimatedRowCount := spec.EstimatedRowCount var scanOp colfetcher.ScanOperator @@ -932,9 +932,16 @@ func NewColOperator( } } if scanOp == nil { + var streamerDiskMonitor *mon.BytesMonitor + if core.TableReader.MaintainOrdering { + streamerDiskMonitor = args.MonitorRegistry.CreateDiskMonitor( + ctx, flowCtx, "streamer" /* opName */, spec.ProcessorID, + ) + } scanOp, resultTypes, err = colfetcher.NewColBatchScan( ctx, colmem.NewAllocator(ctx, accounts[0], factory), accounts[1], - flowCtx, spec.ProcessorID, core.TableReader, post, estimatedRowCount, args.TypeResolver, + accounts[2], flowCtx, spec.ProcessorID, core.TableReader, post, + estimatedRowCount, streamerDiskMonitor, args.TypeResolver, ) if err != nil { return r, err diff --git a/pkg/sql/colfetcher/BUILD.bazel b/pkg/sql/colfetcher/BUILD.bazel index de4a1c21faf3..5c766d7d2137 100644 --- a/pkg/sql/colfetcher/BUILD.bazel +++ b/pkg/sql/colfetcher/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//pkg/col/typeconv", "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvclient/kvstreamer", "//pkg/kv/kvpb", "//pkg/roachpb", "//pkg/settings", diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go index 199eeb7c4c0f..24c6ae04c974 100644 --- a/pkg/sql/colfetcher/colbatch_scan.go +++ b/pkg/sql/colfetcher/colbatch_scan.go @@ -12,10 +12,12 @@ package colfetcher import ( "context" + "math" "sync" "time" "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" @@ -26,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/row" + "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/mon" @@ -174,6 +177,9 @@ func newColBatchScanBase( } s := colBatchScanBasePool.Get().(*colBatchScanBase) + // TODO(yuzefovich): consider sorting these spans when + // spec.MaintainOrdering==false and when streamer isn't used (similar to + // what we do for lookup/index joins. s.Spans = spec.Spans if !flowCtx.Local { // Make a copy of the spans so that we could get the misplanned ranges @@ -331,11 +337,13 @@ func NewColBatchScan( ctx context.Context, fetcherAllocator *colmem.Allocator, kvFetcherMemAcc *mon.BoundAccount, + streamerBudgetAcc *mon.BoundAccount, flowCtx *execinfra.FlowCtx, processorID int32, spec *execinfrapb.TableReaderSpec, post *execinfrapb.PostProcessSpec, estimatedRowCount uint64, + diskMonitor *mon.BytesMonitor, typeResolver *descs.DistSQLTypeResolver, ) (*ColBatchScan, []*types.T, error) { base, bsHeader, tableArgs, err := newColBatchScanBase( @@ -344,19 +352,70 @@ func NewColBatchScan( if err != nil { return nil, nil, err } - kvFetcher := row.NewKVFetcher( - flowCtx.Txn, - bsHeader, - spec.Reverse, - spec.LockingStrength, - spec.LockingWaitPolicy, - flowCtx.EvalCtx.SessionData().LockTimeout, - kvFetcherMemAcc, - flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, - ) + + totalMemoryLimit := execinfra.GetWorkMemLimit(flowCtx) + cFetcherMemoryLimit := totalMemoryLimit + + var useStreamer bool + txn := flowCtx.Txn + // TODO(yuzefovich): support reverse scans in the streamer. + if !spec.Reverse { + useStreamer, txn, err = flowCtx.UseStreamer() + if err != nil { + return nil, nil, err + } + } + var kvFetcher *row.KVFetcher + if useStreamer { + if streamerBudgetAcc == nil { + return nil, nil, errors.AssertionFailedf("streamer budget account is nil when the Streamer API is desired") + } + var diskBuffer kvstreamer.ResultDiskBuffer + if spec.MaintainOrdering { + if diskMonitor == nil { + return nil, nil, errors.AssertionFailedf("diskMonitor is nil when ordering needs to be maintained") + } + diskBuffer = rowcontainer.NewKVStreamerResultDiskBuffer( + flowCtx.Cfg.TempStorage, diskMonitor, + ) + } + // Keep 1/16th of the memory limit for the output batch of the cFetcher, + // and we'll give the remaining memory to the streamer budget below. + cFetcherMemoryLimit = int64(math.Ceil(float64(totalMemoryLimit) / 16.0)) + streamerBudgetLimit := 15 * cFetcherMemoryLimit + kvFetcher = row.NewStreamingKVFetcher( + flowCtx.Cfg.DistSender, + flowCtx.Stopper(), + txn, + flowCtx.EvalCtx.Settings, + spec.LockingWaitPolicy, + spec.LockingStrength, + streamerBudgetLimit, + streamerBudgetAcc, + spec.MaintainOrdering, + false, /* singleRowLookup */ + int(spec.FetchSpec.MaxKeysPerRow), + diskBuffer, + kvFetcherMemAcc, + ) + // TODO(XXX): think through this. + base.batchBytesLimit = 0 + } else { + kvFetcher = row.NewKVFetcher( + txn, + bsHeader, + spec.Reverse, + spec.LockingStrength, + spec.LockingWaitPolicy, + flowCtx.EvalCtx.SessionData().LockTimeout, + kvFetcherMemAcc, + flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, + ) + } + fetcher := cFetcherPool.Get().(*cFetcher) fetcher.cFetcherArgs = cFetcherArgs{ - execinfra.GetWorkMemLimit(flowCtx), + cFetcherMemoryLimit, estimatedRowCount, flowCtx.TraceKV, true, /* singleUse */ diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go index ea66b7f16f2d..b0104c2cd286 100644 --- a/pkg/sql/colfetcher/index_join.go +++ b/pkg/sql/colfetcher/index_join.go @@ -124,13 +124,22 @@ type ColIndexJoin struct { // table because the scan might synthesize additional implicit system columns. ResultTypes []*types.T - // maintainOrdering is true when the index join is required to maintain its - // input ordering, in which case the ordering of the spans cannot be changed. - maintainOrdering bool - // usesStreamer indicates whether the ColIndexJoin is using the Streamer // API. usesStreamer bool + + // sortSpans indicates whether the ColIndexJoin should sort the spans before + // passing them to the cFetcher. + // + // We do so when maintaining order is not required. This allows lower layers + // to optimize iteration over the data. Note that the looked up rows are + // output unchanged, in the retrieval order, so it is not safe to do this + // when maintainOrdering is true (the ordering to be maintained may be + // different than the ordering in the index). + // + // We don't want to sort the spans if we're using the Streamer since it will + // perform the sort on its own. + sortSpans bool } var _ ScanOperator = &ColIndexJoin{} @@ -195,16 +204,7 @@ func (s *ColIndexJoin) Next() coldata.Batch { s.state = indexJoinDone continue } - - if !s.usesStreamer && !s.maintainOrdering { - // Sort the spans when !maintainOrdering. This allows lower layers to - // optimize iteration over the data. Note that the looked up rows are - // output unchanged, in the retrieval order, so it is not safe to do - // this when maintainOrdering is true (the ordering to be maintained - // may be different than the ordering in the index). - // - // We don't want to sort the spans here if we're using the - // Streamer since it will perform the sort on its own. + if s.sortSpans { sort.Sort(spans) } @@ -609,16 +609,16 @@ func NewColIndexJoin( ) op := &ColIndexJoin{ - OneInputNode: colexecop.NewOneInputNode(input), - flowCtx: flowCtx, - processorID: processorID, - cf: fetcher, - spanAssembler: spanAssembler, - ResultTypes: tableArgs.typs, - maintainOrdering: spec.MaintainOrdering, - txn: txn, - usesStreamer: useStreamer, - limitHintHelper: execinfra.MakeLimitHintHelper(spec.LimitHint, post), + OneInputNode: colexecop.NewOneInputNode(input), + flowCtx: flowCtx, + processorID: processorID, + cf: fetcher, + spanAssembler: spanAssembler, + ResultTypes: tableArgs.typs, + txn: txn, + usesStreamer: useStreamer, + sortSpans: !spec.MaintainOrdering && !useStreamer, + limitHintHelper: execinfra.MakeLimitHintHelper(spec.LimitHint, post), } op.mem.inputBatchSizeLimit = getIndexJoinBatchSize( useStreamer, flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, flowCtx.EvalCtx.SessionData(), diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index a8a185bb825a..98bb5d96fb9b 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -1614,6 +1614,7 @@ func initTableReaderSpecTemplate( TableDescriptorModificationTime: n.desc.GetModificationTime(), LockingStrength: n.lockingStrength, LockingWaitPolicy: n.lockingWaitPolicy, + MaintainOrdering: len(n.reqOrdering) > 0, } if err := rowenc.InitIndexFetchSpec(&s.FetchSpec, codec, n.desc, n.index, colIDs); err != nil { return nil, execinfrapb.PostProcessSpec{}, err diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index d8e3bb98fb11..f89d38f94e28 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -764,16 +764,24 @@ func (dsp *DistSQLPlanner) Run( if p.Spec.Core.LocalPlanNode != nil { return true } + if p.Spec.Core.TableReader != nil { + if planCtx.planner != nil { + if aost := planCtx.planner.EvalContext().AsOfSystemTime; aost != nil && + aost.BoundedStaleness { + // BoundedStaleness requires the RootTxn. + return true + } + } + } } return false }() if !containsNonDefaultLocking && !mustUseRootTxn { if evalCtx.SessionData().StreamerEnabled { for _, proc := range plan.Processors { - if jr := proc.Spec.Core.JoinReader; jr != nil { - // Both index and lookup joins, with and without - // ordering, are executed via the Streamer API that has - // concurrency. + if core := proc.Spec.Core; core.TableReader != nil || core.JoinReader != nil { + // TableReader and JoinReader cores might be powered + // by the Streamer API which has concurrency. localState.HasConcurrency = true break } diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index 4be97afb5e69..edb83df76f86 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -250,6 +250,7 @@ func (e *distSQLSpecExecFactory) ConstructScan( *trSpec = execinfrapb.TableReaderSpec{ Reverse: params.Reverse, TableDescriptorModificationTime: tabDesc.GetModificationTime(), + MaintainOrdering: len(reqOrdering) > 0, } if err := rowenc.InitIndexFetchSpec(&trSpec.FetchSpec, e.planner.ExecCfg().Codec, tabDesc, idx, columnIDs); err != nil { return nil, err diff --git a/pkg/sql/execinfra/readerbase.go b/pkg/sql/execinfra/readerbase.go index fbfd2b6cfa68..3475acfcdbcb 100644 --- a/pkg/sql/execinfra/readerbase.go +++ b/pkg/sql/execinfra/readerbase.go @@ -232,6 +232,7 @@ func (flowCtx *FlowCtx) UseStreamer() (bool, *kv.Txn, error) { var UseStreamerEnabled = settings.RegisterBoolSetting( settings.TenantWritable, "sql.distsql.use_streamer.enabled", + // TODO(XXX): description update. "determines whether the usage of the Streamer API is allowed. "+ "Enabling this will increase the speed of lookup/index joins "+ "while adhering to memory limits.", diff --git a/pkg/sql/execinfrapb/processors_sql.proto b/pkg/sql/execinfrapb/processors_sql.proto index d3e64c803ff9..71bdc4acc522 100644 --- a/pkg/sql/execinfrapb/processors_sql.proto +++ b/pkg/sql/execinfrapb/processors_sql.proto @@ -127,6 +127,8 @@ message TableReaderSpec { // leaseholder of the beginning of the key spans to be scanned). optional bool ignore_misplanned_ranges = 22 [(gogoproto.nullable) = false]; + optional bool maintain_ordering = 23 [(gogoproto.nullable) = false]; + reserved 1, 2, 4, 6, 7, 8, 13, 14, 15, 16, 19; } diff --git a/pkg/sql/opt/exec/execbuilder/testdata/dist_vectorize b/pkg/sql/opt/exec/execbuilder/testdata/dist_vectorize index 77c4c9ebe2e0..efe47ada384a 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/dist_vectorize +++ b/pkg/sql/opt/exec/execbuilder/testdata/dist_vectorize @@ -116,6 +116,7 @@ regions: │ KV bytes read: 40 B │ KV gRPC calls: 5 │ estimated max memory allocated: 0 B +│ estimated max sql temp disk usage: 0 B │ missing stats │ table: kv@kv_pkey │ spans: FULL SCAN @@ -131,11 +132,12 @@ regions: KV bytes read: 40 B KV gRPC calls: 5 estimated max memory allocated: 0 B + estimated max sql temp disk usage: 0 B missing stats table: kw@kw_pkey spans: FULL SCAN · -Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJzsmt1u2zYUx-_3FASv2lWuTX04toACxtIOSLckRRL0ZggKRjqxBcmiK9J2vCCPtRfYkw2S5yS2LM9HcyVDUi6KSqQ-zu_w_PknrUcqvwfUpteffv90ekN-Jr9eXZ4Tf0Y-X55dEH9OLi-IP3vvkw_En7_3qUZD4cIFH4Ok9h-UUY3qVKMG1ahJNWrRW41OIuGAlCKKuzwmF5y5D9TuaNQLJ1MVn77VqCMioPYjVZ4KgNr0ht8FcAXchajdoRp1QXEvSB7jzwb-7NvEhwXV6KkIpuNQ2sTXyIxq9HrC46NWO36R374S5Y3BJp2__5LLY0eECkLliTDVFIm5JC44wgXXJmx58m6hQJIIuGuTHvlleXJ49eWUODwI5HO_CfeiVb_40edfT0-JVDAhjpiGiryBB9X2QvXWJp0koGUHAD-rw5g_kDGMRbQgPAiEw1X8Wp3kHe64ckYgiZiqyVTZJO6fvP7qBKO3TxpdHi3Br8DeLciIy9E60kHc_1ajUvEhUJu9StvZR2p3nrR8metuZG4-8OfbMjd_yVybNbn7P7nTN3LXzczdy31F5EIE7uZ938UvslevLcPgHKIhfBZeCFGbbVRwAPfqzYC9e_sh8oaj5X-pRi_j6AdMG-jawNAGsYTAAzjTdMZ38Y3b5PeAKBhPiOtJn0wlH0Ju_C9ojQ20LLsuXjhMw23ctiK7EC0xaTNrHVYGgv-Mw8qMw9yMw1qLg-1f3wyvzG291Taa-j6YNrO8uTvJoc1N7g6qzSeZuStQm1k1tXm9LvT960LPoWlGq202dXEwTdPz5q6XQ9Oa3B1U03qZuStQ0_Rqatp6XRj714WRQ9PMVttq6uJgmmbkzV0_h6Y1uTuopvUzc1egphnV1LT1ujD3rwszh6ZZraYqDqZoZt7MWXhFa7UZ4aFLGBFqBFGTxYNpm5WZxQK1zaymtpmYffMrkBMRStjYJ9z-qM7Go1osZgzuEJaZk2IaOfAlEk7Sd3l4mdwoKWQXpFq26suDs3DVJBVXuzYefyTbDk1CjICPn3932D-UbmYoIai5iHwScAWhs3iOZXV-zj21HqULEiKPB96fPI1gdVny7hE44M0SBq-aVtry3JYAWLWOQcZcNi_G8mFIQP2SAbEdgHoHAcTWAelIQCy7Gio6hAwsIatuhMwUIYYQVP2oBLWLC6WbGUpFc82QgPolAypeUJGAWHY1VHQIGVhCVt0ImSlC-iahzmtCRnrGWd3KSmmzsfNWbF3ROkclzqlYzJzGvaLjhuH4ZK8Gyk81Q8ZStnEvPNc6ElANjTuWUNnGvXCvYqYIWTmNe0WH0AmOT_Zq4AgEFRlL2ca9eEFFAqqhcccSKtu4Fy-oKUJdrHGvNiArNeOcNBZ-TYVwfOq3oY8ElL0uKH9K1pGxlG7hC5cLA0uobAtf_FZRilCvsfCvAfVwfOq3oY8ElL0uOAJBRcZSuoUvXlCxhMq28MULaopQv7HwaxY-NeOwvN_PVHQIGUhAZZv4wocQwxKq3-4-ltCOtUH587KBjqZ-Pj6NKO9HNBUl1EcCKtvJF6-qWEL12-LHEtqxQDgCVUVHUz8zn0aE_pKm2qJhpeed1AdCR-Xmi_9NBwmobDdf_A4SllD93DyWUOk7_cWvmdGIspcI5U_NZjoazKeKNTCrFhJQ2W6-eFXFEqqfm8cSKn27v3hVRSPKXiIcgaqmo0l9gFRzN28-3Wr0PhDzb55Lbdr596-15Z_VH40v4ENJ7Ud6PRLzhNbNYgKS2vc8kKDRc-7DR1AQjb3Qk8pzqK2iKTw9_fRPAAAA__88tZtc +Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJzsmt1u2zYUx-_3FASv2lWuTX04toACxtIOSLckRVr0ZggKRjqxBcmiK9J2vCCPtRfYkw2S5yS2rMxHWClDUi6KSqQ-zu_w_PknrXsqv0fUpZ8__P7h9Av5mfx6dXlOwgX5eHl2QcIlubwg4eJtSN6RcPk2pAaNhQ8XfAqSun9QRg1qUoNa1KA2NahDrw06S4QHUook7XKfXXDm31G3Z9Agns1VevraoJ5IgLr3VAUqAurSL_wmgivgPiTdHjWoD4oHUfaYcDEKF99mIayoQU9FNJ_G0iWhQRbUoJ9nPD3qdNMX-e0rUcEUXNL7-y-5PvZErCBWgYhzTYlYSuKDJ3zwXcLWJ29WCiRJgPsuGZBf1ifHV59OicejSD72m_Eg2fRLH33-9fSUSAUz4ol5rMgruFPdIFavXdLLAlp3AAiLOkz5HZnCVCQrwqNIeFylr9XL3iFtk98jomA6I34gQzKXfAyb5huuvAlIIuZqNlcuSW-XRbc5wej1g0HXR-u8bLjfrMiEy8k28VHa_9qgUvExUJc9y-rZe-r2Hoxyie3vJHY5Cpf7Ert8SmyXtan9gak1d1LbL0zt031F4kMC_u5936QvclCvPaPkHJIxfBRBDEmX7dR_BLfq1Yi9ef0uCcaT9X-pQS_T6EfMGJnGyDJGqQDBHXjz_IDQif8JrbWDlhWXzROHebyP215kF6IjZl3mbMMqQPCfcTiFcdi7cThbcbDDy5_hdb1rdrpWW_66lJ2VTe1JCWVvU6tT2U8KU6tR2Vk9lX27bMzDy8YsoYhWp2u3ZaNLEc2yqR2UUMQ2tToVcVCYWo2KaNZTEbfLxjq8bKwSimh3uk5bNroU0Sqb2mEJRWxTq1MRh4Wp1aiIVj0Vcbts7MPLxi6hiE6nLRpdemiXTayD18NOlxEe-4QRoSaQtEnWpYxOYZI1KqNdT2W0MT8nXIGciVjCzv7o_kf1dh7VYSlj8MewzpwU88SDT4nwsr7rw8vsRlmd-yDVutVcH5zFmyapuHppw_VHsu3RLMQE-PTx55jDQ-kXhhKDWookJBFXEHurx1g255c8UNtR-iAhCXgU_MnzCDaXZe-egAfBImPwrGkjPY9tGYBN6xRkymX3YiwfhgQ0rBgQewHQ4H8BxLYBmUhArLgaajqELCwhp2mE7BwhhhBU86gEtY8LpV8YSk1zzZCAhhUD0i-oSECsuBpqOoQsLCGnaYTsHCFzl1DvOSErP-NsbuXktNl68VZsW9F6RyXOuVjsksa9puOG4fgUrwaqTzVDxlK1cdeeaxMJqIHGHUuoauOu3avYOUJOSeNe0yF0guNTvBo4AkFFxlK1cdcvqEhADTTuWEJVG3f9gpoj1Mca93oDcnIzzklr4bdUCMeneRv6SEDF64Lqp2QTGUvlFl67XFhYQlVbeP1bRTlCg9bCPwc0wPFp3oY-ElDxuuAIBBUZS-UWXr-gYglVbeH1C2qO0LC18FsWPjfjsLLfz9R0CFlIQFWbeO1DiGEJNW93H0vohbVB9fOyhY6meT4-j6jsRzQ1JTREAqrayetXVSyh5m3xYwm9sEA4AlVFR9M8M59HhP6Spt6i4eTnndwHQkfl5vX_poMEVLWb17-DhCXUPDePJVT5Tr_-NTMaUfESofqp2c5Hg_lUsQFm1UECqtrN61dVLKHmuXksocq3-_WrKhpR8RLhCFQ1H03uA6SGu3n74dqgt5FYfgt86tLev3-dPf9s_mh6AR9L6t7TzxOxzGh9Wc1AUveWRxIMes5DeA8KkmkQB1IFHnVVMoeHh5_-CQAA___IlP5s query T EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv diff --git a/pkg/sql/opt/exec/execbuilder/testdata/explain_analyze_plans b/pkg/sql/opt/exec/execbuilder/testdata/explain_analyze_plans index 50ad414cbe32..05a43cdb3e1f 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/explain_analyze_plans +++ b/pkg/sql/opt/exec/execbuilder/testdata/explain_analyze_plans @@ -98,6 +98,7 @@ regions: │ KV bytes read: 40 B │ KV gRPC calls: 5 │ estimated max memory allocated: 0 B + │ estimated max sql temp disk usage: 0 B │ missing stats │ table: kv@kv_pkey │ spans: FULL SCAN @@ -113,11 +114,12 @@ regions: KV bytes read: 40 B KV gRPC calls: 5 estimated max memory allocated: 0 B + estimated max sql temp disk usage: 0 B missing stats table: kw@kw_pkey spans: FULL SCAN · -Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJzsm-1u2swSx7-fq1jtp1Q1hfULEEuVaNOeo_Q0UOWlUnUURQ6eEsvGpvYSkhPlsp4beK7skU1JsDeYDFAbeZ0PFdjGeH4znj_-7_SBRr88atKzz18_H50T9_adqxDrdnTgzt65b8i_TwcnxL0lXwbHfeLOyKCfHELek3g_-c_p4OIb-fgj2UgV6gc29K0xRNT8H2VUoSpVqEYVqlOFGvRSoZMwGEIUBWF8yEPygWP7jpothTr-ZMrjzZcKHQYhUPOBcod7QE16bl17cAqWDWGzRRVqA7ccL_ka97bn3l5NXLinCj0KvOnYj0wSX83ZxIpfNprxVfz3O-HOGEzS-vuvaP5-GPgcfO4EvrArDGYRsWEY2GCbhM03Xt9ziEgIlm2SLvk43zg6_XZEhpbnRU_HTSwnXBwXf_XJ96MjEnGYkGEw9Tk5gDvedHz-xiStJJr5AQDuqgPG1h0ZwzgI74nlecHQ4vFltZJruLb48AYiEkz5ZMpNEh-fXP5iA6OXjwqdv5tTX1C9vic3VnST5tmLj79UaMStEVCTLeXs-BM1W4_KZmlrZ9I267mzvLQ1WZ24bRKnZhLXXpm45_MGoQ0h2Nnzvo0v5FVHvVADJxCO4Evg-BA2Webe9eAnP-ixt2_eh87oZv6SKnQQR99jSi9OAtzBcCrmOo9svC_65REO4wmxncgl08gawcbgn6FqGagMdTt8GI1CGFk8CJvMEJKn0MEcbBw7VeiH_o-r_uD8qn_x9etBj8Vgzi5ODnpq_OpocNE___16BaE_XF96FoWxXYHl81Jb2_E6uzi5Oo6JafG7U_BtCJMaIz212dN2SPGZkJEhpK4ulueYp_5LjF7E0w8awaSpZipp0ziMlXG0s3GkM81erwEMKd1NtdHUag3YmXizTRPXwYp3nbidindnZeIKFG9WNfFG3Q7L4t2unHi3tyuwNeLNKiDe6WJRX987VazoaY2mXvfOnYmeumniuljRqxO3U9HrrkxcgaKnVk30ULfDsuh1Kid6ne0KbI3oqRUQvXSxaK_vnRpW9PRG06h7585ET9s0cYdY0asTt1PRO1yZuAJFT6ua6KFuh2XR61ZO9LrbFdga0dMqIHrpYtFf3zt1rOgZjbpz7kzy9E3TZiAlr9FkxPJtwkjAbyCsU7gz8TNWprBA8dOrJn6oG2NZ_A4rJ36H2xXYGvHTKyB-OmZB-xSiSeBHkFmrfPmrWpmvarCYJ9gjmPOPgmk4hG9hMEyOnb8dJCdKmr0NEZ_vVedvjv3FrohbPG_x80_eiC2ahBiCNX6aj3l9KN2VofjAZ0HoEs_i4A_vn2JZbJ9ZDk9HaUMEoWN5zv8tEcHiY8m1hzAE5zZhsLRrIUFP-xIAi71jiGIu2Q9j-TAkIKaVTIjlEOruhBBLE1KxhKSrIQ1JSC27hgonpAuEGKKjqnvVUdu4ULorQ6lorhkSUKajFk-o-I6KJSRdDWlIQmrZNVR8RxUIqVlCrWVCaUCt5VMZQnPWcmHr-_RzN3NrGQYuFtaSTYrbyGwzQzpCyBpSpauhDrKGVOlqqCPUkJ7bnl8AtDhVV4Bt5J5KbYutfm9-PAuxtDd0VipaNwzHZ_WjdvmpZshYSndWCk-2iiUkn7OCJFS6s1L4s6QuEOps6KxUtIQ6OD6rH7X3oKMiYyndWSm-o2IJyeesIAmV7qwU31EFQt3c39uHOc6K0JwPN3RWKlqNhuBi5QPKsWvK957ayGxL6Kwg0y2hs4KsIQmdFXFpQFhtXWetVFvAukINMcxqqwQ1pCIBSffczbCEcsyb8p-bVHQ0ZSe88J6hYRGV7rQUv-IqIhKWXOW2WrpIQNI9fDMsoRwHZw_aKjqashNefFvFIirdbim-rYqIhMXk1O95pucYLmKPFtZdJXdcBHdrDSEJJ2SwRZRj5OyBK4VNuISmCzbhErouYhXlT6GUTah410Usonq2Jf1jCAlIuodwhiUk4cgMGtE-j0hp2GgkdF1ERPWAS4rQIRKQdA_hDEtIwrkZNKJ9npPSsNFI6LqIiPKnXFjemIvYo-s5l7TrIppcGw-6VBRRG1tEEo7PYKsox8op35jqYBMuoesiJFytZ13SrotQROp-z7oUP86KBFS261L8oiyWkISuCxqRdFWkYRHlWDnlP0HpYjT7PetSeFsV_n_vGkBluy7Ft1UsIQldFzQi6apIwyLKsXL2oK2K0eTPuqh5sy5ij65nXdI9WjC51hCS0HXBFpGErgu2iiQcoMFWUY6VswfGlJjwetYl7broj5cK_ekFsyvHpiZt_f5rvPDP4o_GH7BGETUf6NlNMEtond9PIKLmT8uLQKEnlgufgEM4dnwn4s6QmjycwuPjv_4JAAD__4EMrOA= +Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJzsm-1u2kgXx78_VzGaT6lqCuMXIJYq0aZ9Vuk2UOWlUrWKIgefEgtjU3sIyUa5rL2BvbKVTUmwJ5gc0o6Rx_lQgW2Mz-8cnz_-z-kdjX_41KYnHz9_PDgl4-s3Y40416O98fzN-BX5__HgiIyvyafBYZ-M52TQTw8hb0myn_xxPDj7Qt5_SzdSjQahC31nAjG1_6KMalSnGjWoRk2qUYuea3QahUOI4zBKDrlLP3Do3lC7pVEvmM54svlco8MwAmrfUe5xH6hNT51LH47BcSFqtqhGXeCO56dfM77uja8vpmO4pRo9CP3ZJIhtklzNydRJXjaayVX8-ZVwbwI2af37T7x4PwwDDgH3wkDYFYXzmLgwDF1wbcIWGy9vOcQkAse1SZe8X2wcHX85IEPH9-OH46aOFy2PS7766OvBAYk5TMkwnAWc7MENb3oBf2WTVhrN4gCA8boDJs4NmcAkjG6J4_vh0OHJZbXSa0j2xT98wmEyJa4Xj8ksdkaw3H3p8OEVxCSc8emM2yQ5XRrdcgOj5_caXbxbJGUJ_fKWXDnxVRZ3Lzn-XKMxd0ZAbbaS0sMP1G7da9tltZ3L6rw3nhdltcnqvP7GvOq5vLbX5vXxvGHkQgRu_ryvkwt51lFPlMgRRCP4FHoBRE2Wu_N9-M73euz1q7eRN7pavKQaHSTR95jWS3IENzCciaUgE_wjVCMHlaHulnejUQQjh4dRk1lC8jQ6WIBNYqcafdf_dtEfnF70zz5_3uuxBMzJ2dFeT09eHQzO-qc_X68h9Jvry8yjsF5WYMW89NbLeJ2cHV0cJsSM5N0xBC5EaY2Rnt7sGb-Q4iMhK0dIX18sjzHPgqcYPYmnHzbCaVPPVdK2cVhr42jn48hmmj1fIhhS-Jt6o2nUEiFL-tm2ee1gpb_Oq0zp76zNq0TpZ1WTftTdsir97cpJf_tlBbZB-lkFpD9bLPrzW6uOlUyj0TTr1ipLMvVt89rFSmadV5mS2V2bV4mSqVdNMlF3y6pkdionmZ2XFdgGydQrIJnZYjGe31oNrGSajaZVt1ZZkmlsm9d9rGTWeZUpmftr8ypRMo2qSSbqblmVzG7lJLP7sgLbIJlGBSQzWyzm81uriZVMq1E3VlmCaW6bVQspmI0mI07gEkZCfgVRnWFZ0mmtzbBE6TSrJp2o-2ZVOvcrJ537LyuwDdJpVkA6TcxC_jHE0zCIIbdG-_RXtXJf1WAJT3BHsOAfh7NoCF-icJgeu3g7SE-UaoELMV_s1RdvDoPlrpg7vGjR93feiC2ahhiBM3kYG3p-KN21oQTA52E0Jr7DIRjePsSy3D53PJ6N0oUYIs_xvb8dEcHyY-m1RzAE7zplsLJrqVAP-1IAy70TiBMu-Q9j-TAkIGaUTIgVEOr-EkIsS0jHElKuhgwkIb3sGpJOyBQIMURH1Xeqo7ZxoXTXhlLRXDMkoFxHlU9IfkfFElKuhgwkIb3sGpLfUQVCep5Qa5VQFlBr9VSW0JyNQtjmLv3czd1aloWLhbVUk-I2MtvMUo4QsoZ05Wqog6whXbka6gg1ZBa25ycALU_VFWBbhafS22Kr35kfz0Is7S2dlYrWDcPxWf-oXX6qGTKW0p0V6cnWsYTUc1aQhEp3VqQ_S5oCoc6WzkpFS6iD47P-UXsHOioyltKdFfkdFUtIPWcFSah0Z0V-RxUIdQt_b-8XOCtCc97f0lmpaDVagotVDKjArinfe2ojs62gs4JMt4LOCrKGFHRWxKUBYbV1k7VSbQHrCjXEMKutCtSQjgSk3HM3wxIqMG_Kf27S0dGUnXDpPcPAIirdaZG_4ioiEpZc1bZaukhAyj18MyyhAgdnB9oqOpqyEy6_rWIRlW63yG-rIiJhMTnze56ZBYaL2KOFdVfFHRfB3dpASMEJGWwRFRg5O-BKYROuoOmCTbiCrotYRcVTKGUTku-6iEVUz7ZkfwwhASn3EM6whBQcmUEj2uURKQMbjYKui4ioHnDJENpHAlLuIZxhCSk4N4NGtMtzUgY2GgVdFxFR8ZQLKxpzEXt0PeeSdV1Ek2vrQZeKImpji0jB8RlsFRVYOeUbUx1swhV0XYSE6_WsS9Z1EYpI3-1ZF_njrEhAZbsu8hdlsYQUdF3QiJSrIgOLqMDKKf8JyhSj2e1ZF-ltVfj_vRsAle26yG-rWEIKui5oRMpVkYFFVGDl7EBbFaMpnnXRi2ZdxB5dz7pke7Rgcm0gpKDrgi0iBV0XbBUpOECDraICK2cHjCkx4fWsS9Z1Me_PNfrdD-cXnktt2vr513jin-UfTT7gjGJq39GTq3Ce0jq9nUJM7e-OH4NGj5wxfAAO0cQLvJh7Q2rzaAb39__7LwAA___inA__ # This query verifies stats collection for the hashJoiner, distinct and sorter. query T diff --git a/pkg/sql/opt/exec/execbuilder/testdata/lookup_join_limit b/pkg/sql/opt/exec/execbuilder/testdata/lookup_join_limit index 772feb554c88..196bd12f2124 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/lookup_join_limit +++ b/pkg/sql/opt/exec/execbuilder/testdata/lookup_join_limit @@ -441,6 +441,7 @@ regions: KV bytes read: 16 B KV gRPC calls: 2 estimated max memory allocated: 0 B + estimated max sql temp disk usage: 0 B estimated row count: 1 (100% of the table; stats collected ago) table: a@a_y_idx spans: FULL SCAN (SOFT LIMIT) diff --git a/pkg/sql/opt/exec/execbuilder/testdata/select b/pkg/sql/opt/exec/execbuilder/testdata/select index 2bd8347c6520..61f8c7baf214 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/select +++ b/pkg/sql/opt/exec/execbuilder/testdata/select @@ -1924,6 +1924,10 @@ ALTER TABLE a SPLIT AT VALUES(5) statement ok SELECT * FROM a +# We want to ensure that local spans are parallelized without the streamer. +statement ok +SET streamer_enabled = false + # Make sure that the scan actually gets parallelized. statement ok SET tracing = on; SELECT * FROM a WHERE a = 0 OR a = 10; SET tracing = off @@ -1950,6 +1954,9 @@ querying next range at /Table/127/1/0/0 === SPAN START: kv.DistSender: sending partial batch === querying next range at /Table/127/1/10/0 +statement ok +RESET streamer_enabled + # Test for 42202 -- ensure filters can get pushed down through project-set. statement ok CREATE TABLE e (x INT PRIMARY KEY, y INT, z STRING); diff --git a/pkg/sql/opt/exec/execbuilder/testdata/show_trace b/pkg/sql/opt/exec/execbuilder/testdata/show_trace index 488fe2750314..2f9f48d32586 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/show_trace +++ b/pkg/sql/opt/exec/execbuilder/testdata/show_trace @@ -275,8 +275,19 @@ WHERE (message LIKE 'querying next range%' OR message LIKE '%batch%') AND message NOT LIKE '%SystemConfigSpan%' AND message NOT LIKE '%PushTxn%' ---- +colbatchscan querying next range at /Table/12/1 dist sender send querying next range at /Table/12/1 dist sender send r14: sending batch 1 Scan to (n1,s1):1 +dist sender send querying next range at /Table/12/1/2023-06-03T17:35:01.249961Z/"\xcd\xecI\xf2\xdd\x04Fs\xb6C\xe5\xe6r\xbb\x9c\xf8"/0 +dist sender send r14: sending batch 1 Scan to (n1,s1):1 +dist sender send querying next range at /Table/12/1/2023-06-03T17:35:01.437908Z/"2\x8f\xb7_\xc5\xe2F\x1a\x9a\x8d1*\xfd\xee\xd5\x0f"/0 +dist sender send r14: sending batch 1 Scan to (n1,s1):1 +dist sender send querying next range at /Table/12/1/2023-06-03T17:35:01.525967Z/"N\t/\xf6\x9d*D4\x94\x17/\xba@\xb0\x87,"/0 +dist sender send r14: sending batch 1 Scan to (n1,s1):1 +dist sender send querying next range at /Table/12/1/2023-06-03T17:35:01.682513Z/"\x8a_\xb6eU\x9fD\xa6\xb8\xc0\xa1\x00\xf1\xa1g\xa5"/0 +dist sender send r14: sending batch 1 Scan to (n1,s1):1 +dist sender send querying next range at /Table/12/1/2023-06-03T17:35:02.038574Z/"}\x0e\xc4\xf9\x8f3D٭\xfeX\x84\xf0\x9f\xf4v"/0 +dist sender send r14: sending batch 1 Scan to (n1,s1):1 # Regression tests for incorrect interaction between consecutive session traces # (#59203, #60672). diff --git a/pkg/sql/opt/exec/execbuilder/testdata/vectorize_local b/pkg/sql/opt/exec/execbuilder/testdata/vectorize_local index 2b3627e1d8e3..7b1a2a09f3d4 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/vectorize_local +++ b/pkg/sql/opt/exec/execbuilder/testdata/vectorize_local @@ -176,6 +176,7 @@ regions: │ KV bytes read: 16 B │ KV gRPC calls: 2 │ estimated max memory allocated: 0 B +│ estimated max sql temp disk usage: 0 B │ estimated row count: 1 (100% of the table; stats collected ago) │ table: c@c_pkey │ spans: FULL SCAN @@ -191,11 +192,12 @@ regions: KV bytes read: 16 B KV gRPC calls: 2 estimated max memory allocated: 0 B + estimated max sql temp disk usage: 0 B missing stats table: d@d_pkey spans: FULL SCAN · -Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJzsU81OIzkQvu9TlOoEWgPdzWoPlpCiDdlRmEmCAuIyipBjF40Vt93YbpEI5bHmBebJRu4GhoQ_MZrj-NByVX3-qvx97jsMNwY5ng2-DPrnIPcF_D-djEDCcDweTGE0mH4awMlkOAYFk3ELOAK1P0eG1ikai4oC8q-Y44xh7Z2kEJxPqbsWMFRL5BlDbesmpvSMoXSekN9h1NEQcjwXc0NTEor8QYYMFUWhTUsre_KyXtAKGfadaSobOAhkeFaLtN1Dhp8vIOqKOGTfv4Uuls5GslE7-6zk3W0ARdIpUhyKLjlfRQrgSSgO-b_wX5ctp6d9kMKY8AishfYPwH-Q4eii34cQqQbpGhthh5bxQNu4yyFrr9IBiBavASqxhIoq51cgjHFSxDRX1s4wF1FeUwDXxLqJHBK-nf8hUeBszbCL7rUNUZSEPH9ixvAYebZmv-ZHvumH6qlnfsz_-PGuH8WWH_mrfvy0obHOK_KkNiyYpZPvQV4wdUS-pBOnLfmDYtNUQ1dxp5f_vXvkdXndbZHhJF2plx4ALUk2z-17S6tUCzcGIlU1KB0W0ARR0m-Q8nBLyuIjT3tKoXY20LakL3bKtjrt5UlYUiV1RgXXeEmn3skW24WTlqhNKAqxqxZdMLRtKU8dPInq8c98ypR_gKl4ypRvMxVvMh1uMGWbM80YXhl3e6kVcszu194Ln4eF6YAoQxL77NrdtrTnqzpJdSVMIIYjsaBjiuQrbXWIWiKPvqH1-q8fAQAA__9OjRUr +Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJzsk8FOGzEQhu99itGcQDWwu1Q9WEKKGtIqtElQQFyqCDn2EKx47cX2ikQoj9UX6JNV3iWUBGhFe-ilPqw8M79_2_N57zDcGOR41vvS656D3BfwcTwagIT-cNgbw6A3_tSDk1F_CApGw0ZwBGp_igytUzQUJQXkXzHHCcPKO0khOJ9Sd42grxbIM4baVnVM6QlD6Twhv8OooyHkeC6mhsYkFPmDDBkqikKbxlZ25GU1pyUy7DpTlzZwEMjwrBJpuocMP19A1CVxyL5_C20snY1ko3b2Scm72wCKpFOkOBRtcrqMFMCTUBzy9_Chzc7Gp12QwpjwIKyE9mvhO2Q4uOh2IUSqQLraRtihRTzQNu5yyJqrtAKi-UuCUiygpNL5JQhjnBQxnStrzpBq4cZApLICpcMc6iBmtC5PRZTXFMDVsaojh2TXXG-dKHCyYthG960PUcwIef6IVf8YebZif4Yr38SlOuoJrul_XH-Lq9jClb-I6yel2jqvyJPaIDRJK38neYb5gPyMTpy25A-KTeaGruJOJ3-7e-T17LqdIsNRulInvQ9akKyf0v03rTzcamXxmpc_plA5G2i7pc_ulG3ttJenxpKaUQsquNpLOvVONto2HDVGTUJRiG21aIO-bUp52sGTKB9-3MdO-SucisdO-bZT8Uunww2nbPNME4ZXxt1eaoUcs_ux98xnPTAtELOQmn127W4b2_NllVp1JUwghgMxp2OK5EttdYhaIo--ptXqzY8AAAD__6sZKPs= statement ok RESET vectorize; RESET distsql @@ -218,6 +220,10 @@ ALTER TABLE tpar SPLIT AT VALUES(5) statement ok SELECT * FROM tpar +# We want to ensure that local spans are parallelized without the streamer. +statement ok +SET streamer_enabled = false + # Make sure that the scan actually gets parallelized. statement ok SET tracing = on; SELECT * FROM tpar WHERE a = 0 OR a = 10 @@ -244,6 +250,9 @@ querying next range at /Table/109/1/0/0 === SPAN START: kv.DistSender: sending partial batch === querying next range at /Table/109/1/10/0 +statement ok +RESET streamer_enabled + # Regression test for #46123 (rowexec.TableReader not implementing # execopnode.OpNode interface). statement ok diff --git a/pkg/sql/row/kv_batch_streamer.go b/pkg/sql/row/kv_batch_streamer.go index 7cf8965299db..261e13c99ab9 100644 --- a/pkg/sql/row/kv_batch_streamer.go +++ b/pkg/sql/row/kv_batch_streamer.go @@ -72,6 +72,7 @@ func (f *txnKVStreamer) SetupNextFetch( spans roachpb.Spans, spanIDs []int, bytesLimit rowinfra.BytesLimit, + // TODO(XXX): we probably will need to pay attention to the key limit. _ rowinfra.KeyLimit, _ bool, ) error { diff --git a/pkg/sql/rowexec/tablereader.go b/pkg/sql/rowexec/tablereader.go index b9d82c65fb99..4a033d6a5209 100644 --- a/pkg/sql/rowexec/tablereader.go +++ b/pkg/sql/rowexec/tablereader.go @@ -145,6 +145,7 @@ func newTableReader( return nil, err } + // TODO(XXX): use the streamer. var fetcher row.Fetcher if err := fetcher.Init( ctx,