diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel
index eeb990b86818..b712350b37f9 100644
--- a/pkg/sql/BUILD.bazel
+++ b/pkg/sql/BUILD.bazel
@@ -712,6 +712,7 @@ go_test(
         "//pkg/build/bazel",
         "//pkg/ccl/kvccl/kvtenantccl",
         "//pkg/clusterversion",
+        "//pkg/col/coldata",
         "//pkg/config",
         "//pkg/config/zonepb",
         "//pkg/gossip",
diff --git a/pkg/sql/builtin_mem_usage_test.go b/pkg/sql/builtin_mem_usage_test.go
index fc3885eb3182..a5245ec34e22 100644
--- a/pkg/sql/builtin_mem_usage_test.go
+++ b/pkg/sql/builtin_mem_usage_test.go
@@ -12,112 +12,132 @@ package sql
 
 import (
 	"context"
-	gosql "database/sql"
+	"strings"
 	"testing"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
-	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
+	"github.com/cockroachdb/cockroach/pkg/col/coldata"
+	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
+	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
-	"github.com/cockroachdb/errors"
-	"github.com/lib/pq"
 )
 
-// rowSize is the length of the string present in each row of the table created
-// by createTableWithLongStrings.
-const rowSize = 50000
-
-// numRows is the number of rows to insert in createTableWithLongStrings.
-// numRows and rowSize were picked arbitrarily but so that rowSize * numRows >
-// lowMemoryBudget, so that aggregating them all in a CONCAT_AGG or
-// ARRAY_AGG will exhaust lowMemoryBudget.
-const numRows = 100
+// TestAggregatesMonitorMemory verifies that the aggregates report their memory
+// usage to the memory accounting system. This test works by blocking the query
+// with the aggregate when it is in the "draining metadata" state in one
+// goroutine and observing the memory monitoring system via the
+// crdb_internal.node_memory_monitors virtual table in another.
+func TestAggregatesMonitorMemory(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
 
-// lowMemoryBudget is the memory budget used to test builtins are recording
-// their memory use. The budget needs to be large enough to establish the
-// initial database connection, but small enough to overflow easily. It's set
-// to be comfortably large enough that the server can start up with a bit of
-// extra space to overflow.
-const lowMemoryBudget = rowSize*numRows - 1
+	statements := []string{
+		// By avoiding printing the aggregate results, we prevent anything
+		// besides the aggregate itself from using a lot of memory.
+		`SELECT length(concat_agg(a)) FROM d.t`,
+		`SELECT array_length(array_agg(a), 1) FROM d.t`,
+		`SELECT json_typeof(json_agg(a)) FROM d.t`,
+	}
 
-// createTableWithLongStrings creates a table with a modest number of long strings,
-// with the intention of using them to exhaust a memory budget.
-func createTableWithLongStrings(sqlDB *gosql.DB) error {
+	// blockMainCh is used to block the main goroutine until the worker
+	// goroutine is trapped by the callback.
+	blockMainCh := make(chan struct{})
+	// blockWorkerCh is used to block the worker goroutine until the main
+	// goroutine checks the memory monitoring state.
+	blockWorkerCh := make(chan struct{})
+	s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
+		Knobs: base.TestingKnobs{
+			SQLExecutor: &ExecutorTestingKnobs{
+				DistSQLReceiverPushCallbackFactory: func(query string) func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) {
+					var block bool
+					for _, testQuery := range statements {
+						block = block || query == testQuery
+					}
+					if !block {
+						return nil
+					}
+					var seenMeta bool
+					return func(_ rowenc.EncDatumRow, _ coldata.Batch, meta *execinfrapb.ProducerMetadata) {
+						if meta != nil && !seenMeta {
+							// If this is the first metadata object, then we
+							// know that the test query is almost done
+							// executing, so unblock the main goroutine and then
+							// wait for that goroutine to signal us to proceed.
+							blockMainCh <- struct{}{}
+							<-blockWorkerCh
+							seenMeta = true
+						}
+					}
+				},
+			},
+		},
+	})
+	defer s.Stopper().Stop(context.Background())
+
+	// Create a table with a modest number of long strings.
 	if _, err := sqlDB.Exec(`
 CREATE DATABASE d;
 CREATE TABLE d.t (a STRING)
 `); err != nil {
-		return err
+		t.Fatal(err)
 	}
-
+	const numRows, rowSize = 100, 50000
 	for i := 0; i < numRows; i++ {
 		if _, err := sqlDB.Exec(`INSERT INTO d.t VALUES (repeat('a', $1))`, rowSize); err != nil {
-			return err
+			t.Fatal(err)
 		}
 	}
-	return nil
-}
-
-// TestConcatAggMonitorsMemory verifies that the aggregates incrementally
-// record their memory usage as they build up their result.
-func TestAggregatesMonitorMemory(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-
-	// By avoiding printing the aggregate results we prevent anything
-	// besides the aggregate itself from being able to catch the
-	// large memory usage.
-	statements := []string{
-		`SELECT length(concat_agg(a)) FROM d.t`,
-		`SELECT array_length(array_agg(a), 1) FROM d.t`,
-		`SELECT json_typeof(json_agg(A)) FROM d.t`,
-	}
+	const expectedMemUsage = numRows * rowSize
 
 	for _, statement := range statements {
-		s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
-			SQLMemoryPoolSize: lowMemoryBudget,
-		})
-
-		defer s.Stopper().Stop(context.Background())
-
-		if err := createTableWithLongStrings(sqlDB); err != nil {
+		errCh := make(chan error)
+		go func(statement string) {
+			dbConn := serverutils.OpenDBConn(
+				t, s.ServingSQLAddr(), "" /* useDatabase */, false /* insecure */, s.Stopper(),
+			)
+			defer dbConn.Close()
+			_, err := dbConn.Exec(statement)
+			errCh <- err
+		}(statement)
+		// Block this goroutine until the worker is at the end of its query
+		// execution.
+		<-blockMainCh
+		// Now verify that we have at least one memory monitor that uses more
+		// than the expected memory usage.
+		rows, err := sqlDB.Query("SELECT name, used FROM crdb_internal.node_memory_monitors")
+		if err != nil {
 			t.Fatal(err)
 		}
-
-		_, err := sqlDB.Exec(statement)
-
-		if pqErr := (*pq.Error)(nil); !errors.As(err, &pqErr) || pgcode.MakeCode(string(pqErr.Code)) != pgcode.OutOfMemory {
-			t.Fatalf("Expected \"%s\" to consume too much memory", statement)
-		}
-	}
-}
-
-func TestEvaluatedMemoryIsChecked(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-
-	// We select the LENGTH here and elsewhere because if we passed the result of
-	// REPEAT up as a result, the memory error would be caught there even if
-	// REPEAT was not doing its accounting.
-	testData := []string{
-		`SELECT length(repeat('abc', 70000000))`,
-		`SELECT crdb_internal.no_constant_folding(length(repeat('abc', 70000000)))`,
-	}
-
-	for _, statement := range testData {
-		t.Run("", func(t *testing.T) {
-			s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
-				SQLMemoryPoolSize: lowMemoryBudget,
-			})
-			defer s.Stopper().Stop(context.Background())
-
-			_, err := sqlDB.Exec(
-				statement,
-			)
-			if pqErr := (*pq.Error)(nil); !errors.As(err, &pqErr) || pgcode.MakeCode(string(pqErr.Code)) != pgcode.ProgramLimitExceeded {
-				t.Errorf(`expected %q to encounter "requested length too large" error, but it didn't`, statement)
+		var found bool
+		for rows.Next() {
+			var name string
+			var used int64
+			if err = rows.Scan(&name, &used); err != nil {
+				t.Fatal(err)
 			}
-		})
+			log.Infof(context.Background(), "%s: %d", name, used)
+			// We likely won't have a separate monitor for the aggregator
+			// itself, so instead we look at the flow monitor for the query.
+			// "Our" flow monitor could be uniquely identified by the FlowID,
+			// but we can't easily get that information here, so we simply
+			// assume that if we find a monitor for some flow with large
+			// enough memory usage, then it is "ours" (a reasonable assumption
+			// since we don't expect internal queries to use this much
+			// memory).
+			if strings.HasPrefix(name, "flow") && used >= expectedMemUsage {
+				found = true
+			}
+		}
+		blockWorkerCh <- struct{}{}
+		if err = <-errCh; err != nil {
+			t.Fatal(err)
+		}
+		if err = rows.Err(); err != nil {
+			t.Fatal(err)
+		}
+		if !found {
+			t.Fatalf("didn't find a memory monitor with at least %d bytes used", expectedMemUsage)
+		}
 	}
 }
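Reviewer note: the synchronization in the new test boils down to a two-channel handshake. A minimal, self-contained sketch of that pattern follows; the names and the surrounding program are illustrative only (not CockroachDB APIs), standing in for the test's worker goroutine and its main goroutine.

package main

import "fmt"

// The worker stands in for the goroutine running the aggregate query; main
// stands in for the goroutine reading crdb_internal.node_memory_monitors.
func main() {
	blockMainCh := make(chan struct{})   // worker -> main: "paused at the known point"
	blockWorkerCh := make(chan struct{}) // main -> worker: "inspection done, proceed"
	done := make(chan struct{})

	go func() {
		defer close(done)
		// ... run until the instrumented point (the first metadata object
		// in the test above) ...
		blockMainCh <- struct{}{} // unblock main
		<-blockWorkerCh           // stay parked until main has looked around
		// ... finish the remaining work ...
	}()

	<-blockMainCh
	// The worker is now parked at the known point, so any state it built up
	// (the test's memory monitors) is still live and observable here.
	fmt.Println("inspecting while the worker is parked")
	blockWorkerCh <- struct{}{}
	<-done
}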
diff --git a/pkg/sql/crdb_internal_test.go b/pkg/sql/crdb_internal_test.go
index 8541882e9fd6..db0327da2782 100644
--- a/pkg/sql/crdb_internal_test.go
+++ b/pkg/sql/crdb_internal_test.go
@@ -23,6 +23,7 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/clusterversion"
+	"github.com/cockroachdb/cockroach/pkg/col/coldata"
 	"github.com/cockroachdb/cockroach/pkg/gossip"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -1567,12 +1568,12 @@ func TestVirtualTableDoesntHangOnQueryCanceledError(t *testing.T) {
 		ServerArgs: base.TestServerArgs{
 			Knobs: base.TestingKnobs{
 				SQLExecutor: &sql.ExecutorTestingKnobs{
-					DistSQLReceiverPushCallbackFactory: func(query string) func(rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
+					DistSQLReceiverPushCallbackFactory: func(query string) func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) {
 						if !addCallback.Load() || strings.HasPrefix(query, sql.SystemJobsAndJobInfoBaseQuery) {
 							return nil
 						}
 						numCallbacksAdded.Add(1)
-						return func(row rowenc.EncDatumRow, meta *execinfrapb.ProducerMetadata) {
+						return func(_ rowenc.EncDatumRow, _ coldata.Batch, meta *execinfrapb.ProducerMetadata) {
 							if meta != nil {
 								*meta = execinfrapb.ProducerMetadata{}
 								meta.Err = err
diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go
index d295b652954e..15f2f903c657 100644
--- a/pkg/sql/distsql_running.go
+++ b/pkg/sql/distsql_running.go
@@ -984,8 +984,8 @@ type DistSQLReceiver struct {
 
 	testingKnobs struct {
 		// pushCallback, if set, will be called every time DistSQLReceiver.Push
-		// is called, with the same arguments.
-		pushCallback func(rowenc.EncDatumRow, *execinfrapb.ProducerMetadata)
+		// or DistSQLReceiver.PushBatch is called, with the same arguments.
+		pushCallback func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata)
 	}
 }
 
@@ -1413,7 +1413,7 @@ func (r *DistSQLReceiver) Push(
 ) execinfra.ConsumerStatus {
 	r.checkConcurrentError()
 	if r.testingKnobs.pushCallback != nil {
-		r.testingKnobs.pushCallback(row, meta)
+		r.testingKnobs.pushCallback(row, nil /* batch */, meta)
 	}
 	if meta != nil {
 		return r.pushMeta(meta)
@@ -1492,6 +1492,9 @@ func (r *DistSQLReceiver) PushBatch(
 	batch coldata.Batch, meta *execinfrapb.ProducerMetadata,
 ) execinfra.ConsumerStatus {
 	r.checkConcurrentError()
+	if r.testingKnobs.pushCallback != nil {
+		r.testingKnobs.pushCallback(nil /* row */, batch, meta)
+	}
 	if meta != nil {
 		return r.pushMeta(meta)
 	}
diff --git a/pkg/sql/distsql_running_test.go b/pkg/sql/distsql_running_test.go
index edc76cee5cf9..0dcf5fe0d5dd 100644
--- a/pkg/sql/distsql_running_test.go
+++ b/pkg/sql/distsql_running_test.go
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/col/coldata"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -598,11 +599,11 @@ func TestDistSQLReceiverDrainsMeta(t *testing.T) {
 		UseDatabase: "test",
 		Knobs: base.TestingKnobs{
 			SQLExecutor: &ExecutorTestingKnobs{
-				DistSQLReceiverPushCallbackFactory: func(query string) func(rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
+				DistSQLReceiverPushCallbackFactory: func(query string) func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) {
 					if query != testQuery {
 						return nil
 					}
-					return func(row rowenc.EncDatumRow, meta *execinfrapb.ProducerMetadata) {
+					return func(_ rowenc.EncDatumRow, _ coldata.Batch, meta *execinfrapb.ProducerMetadata) {
 						if meta != nil {
 							accumulatedMeta = append(accumulatedMeta, *meta)
 						}
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index c52816787804..111f88e158b3 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -1590,8 +1590,8 @@ type ExecutorTestingKnobs struct {
 	// DistSQLReceiverPushCallbackFactory, if set, will be called every time a
 	// DistSQLReceiver is created for a new query execution, and it should
 	// return, possibly nil, a callback that will be called every time
-	// DistSQLReceiver.Push is called.
-	DistSQLReceiverPushCallbackFactory func(query string) func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata)
+	// DistSQLReceiver.Push or DistSQLReceiver.PushBatch is called.
+	DistSQLReceiverPushCallbackFactory func(query string) func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata)
 
 	// OnTxnRetry, if set, will be called if there is a transaction retry.
 	OnTxnRetry func(autoRetryReason error, evalCtx *eval.Context)
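Reviewer note: for tests wiring up the new knob signature, the convention established above is that DistSQLReceiver.Push passes a nil batch and DistSQLReceiver.PushBatch passes a nil row, so a callback can triage its cases by nil-checking. A hedged sketch follows; makeKnobs and the query filter are hypothetical, while the types and package paths are the ones this diff already imports.

package sql_test

import (
	"strings"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/col/coldata"
	"github.com/cockroachdb/cockroach/pkg/sql"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
)

// makeKnobs is a hypothetical helper returning testing knobs wired with the
// updated callback signature.
func makeKnobs() base.TestingKnobs {
	return base.TestingKnobs{
		SQLExecutor: &sql.ExecutorTestingKnobs{
			DistSQLReceiverPushCallbackFactory: func(query string) func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) {
				if !strings.Contains(query, "d.t") {
					return nil // opt out for unrelated queries
				}
				return func(row rowenc.EncDatumRow, batch coldata.Batch, meta *execinfrapb.ProducerMetadata) {
					switch {
					case meta != nil:
						// Metadata, from either Push or PushBatch.
					case batch != nil:
						// Columnar data, from DistSQLReceiver.PushBatch.
					case row != nil:
						// A single row, from DistSQLReceiver.Push.
					}
				}
			},
		},
	}
}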
diff --git a/pkg/sql/importer/BUILD.bazel b/pkg/sql/importer/BUILD.bazel
index 6d80bb46ed5c..b1cd98c87028 100644
--- a/pkg/sql/importer/BUILD.bazel
+++ b/pkg/sql/importer/BUILD.bazel
@@ -171,6 +171,7 @@ go_test(
         "//pkg/cloud/impl:cloudimpl",
         "//pkg/cloud/nodelocal",
         "//pkg/cloud/userfile",
+        "//pkg/col/coldata",
         "//pkg/config",
         "//pkg/config/zonepb",
         "//pkg/jobs",
diff --git a/pkg/sql/importer/exportcsv_test.go b/pkg/sql/importer/exportcsv_test.go
index 4c6e75d526f7..07ab7b8e4efc 100644
--- a/pkg/sql/importer/exportcsv_test.go
+++ b/pkg/sql/importer/exportcsv_test.go
@@ -28,6 +28,7 @@ import (
 	"testing"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/col/coldata"
 	"github.com/cockroachdb/cockroach/pkg/config"
 	"github.com/cockroachdb/cockroach/pkg/config/zonepb"
 	"github.com/cockroachdb/cockroach/pkg/keys"
@@ -623,9 +624,9 @@ func TestProcessorEncountersUncertaintyError(t *testing.T) {
 			0: {
 				Knobs: base.TestingKnobs{
 					SQLExecutor: &sql.ExecutorTestingKnobs{
-						DistSQLReceiverPushCallbackFactory: func(query string) func(rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
+						DistSQLReceiverPushCallbackFactory: func(query string) func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) {
 							if strings.Contains(query, "EXPORT") {
-								return func(_ rowenc.EncDatumRow, meta *execinfrapb.ProducerMetadata) {
+								return func(_ rowenc.EncDatumRow, _ coldata.Batch, meta *execinfrapb.ProducerMetadata) {
 									if meta != nil && meta.Err != nil {
 										if testutils.IsError(meta.Err, "ReadWithinUncertaintyIntervalError") {
 											close(gotRWUIOnGateway)
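Reviewer note: the exportcsv test above closes a channel the first time the uncertainty error shows up in pushed metadata. Because the callback can fire on every Push and PushBatch call, guarding against a double close is the subtle part. A small hedged sketch of that shape follows; errSpottedCallback is a hypothetical helper, and it matches on the error string where the test above uses testutils.IsError.

package sql_test

import (
	"strings"
	"sync"

	"github.com/cockroachdb/cockroach/pkg/col/coldata"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
)

// errSpottedCallback builds a push callback that closes done exactly once,
// the first time pushed metadata carries an error matching target. The
// sync.Once guard matters because the callback can be invoked many times
// before the query finishes.
func errSpottedCallback(
	target string, done chan struct{},
) func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) {
	var once sync.Once
	return func(_ rowenc.EncDatumRow, _ coldata.Batch, meta *execinfrapb.ProducerMetadata) {
		if meta != nil && meta.Err != nil && strings.Contains(meta.Err.Error(), target) {
			once.Do(func() { close(done) })
		}
	}
}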