From 095b7a49dc0389af61f8161b86b41c451fdb96a8 Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Wed, 10 Feb 2021 14:55:02 -0500 Subject: [PATCH 1/2] logictest: deflake crdb_internal.gossip_liveness test The regex needs to also match `0,0` Release note: None --- pkg/sql/logictest/testdata/logic_test/crdb_internal | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal b/pkg/sql/logictest/testdata/logic_test/crdb_internal index 3778e86f77de..a0857029b673 100644 --- a/pkg/sql/logictest/testdata/logic_test/crdb_internal +++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal @@ -354,9 +354,9 @@ node_id network address attrs locality server_version 1 tcp 127.0.0.1: [] region=test,dc=dc1 query ITTBBT colnames -SELECT node_id, regexp_replace(epoch::string, '^\d+$', '') as epoch, regexp_replace(expiration, '^\d+\.\d+,\d+$', '') as expiration, draining, decommissioning, membership FROM crdb_internal.gossip_liveness WHERE node_id = 1 +SELECT node_id, regexp_replace(epoch::string, '^\d+$', '') as epoch, regexp_replace(expiration, '^(\d+\.)?\d+,\d+$', '') as expiration, draining, decommissioning, membership FROM crdb_internal.gossip_liveness WHERE node_id = 1 ---- -node_id epoch expiration draining decommissioning membership +node_id epoch expiration draining decommissioning membership 1 false false active query ITTTTTT colnames From 1be6518f6fb8ede59c011be5f90caf68c18c841c Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 22 Jan 2021 21:01:57 -0800 Subject: [PATCH 2/2] sqlutil: remove Query and QueryEx from InternalExecutor interface All usages of `sqlutil.InternalExecutor.Query` and `sqlutil.InternalExecutor.QueryEx` have been refactored: most of the changes are taking advantage of the iterator API, in a couple of places we now use `Exec` and `ExecEx` methods since in those places we only need the number of affected rows. In a few places where we still need to buffer all rows, the caller currently performs the buffering. In the follow-up commit `QueryBuffered` and `QueryBufferedEx` will be added into the interface (essentially renaming `Query` and `QueryEx` to make it explicit that the buffering occurs), and the custom logic will be removed. Release note: None --- pkg/jobs/adopt.go | 35 +++++++++++---- pkg/jobs/deprecated.go | 13 +++++- pkg/jobs/registry.go | 45 ++++++++++++------- .../kvserver/protectedts/ptstorage/storage.go | 39 ++++++++++------ .../protectedts/ptstorage/storage_test.go | 28 ++++-------- .../reports/constraint_stats_report.go | 11 ++--- .../reports/critical_localities_report.go | 11 ++--- .../critical_localities_report_test.go | 15 ++++--- .../reports/replication_stats_report.go | 11 ++--- pkg/migration/migrationmanager/manager.go | 12 ++++- pkg/sql/catalog/lease/lease.go | 16 +++++-- pkg/sql/sem/tree/eval.go | 6 --- pkg/sql/sqlutil/internal_executor.go | 28 +----------- pkg/sql/stats/automatic_stats.go | 29 +++++++----- pkg/sql/stats/stats_cache.go | 20 +++++++-- .../stmtdiagnostics/statement_diagnostics.go | 11 ++++- 16 files changed, 201 insertions(+), 129 deletions(-) diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index fe84b8dfefb9..c0cf6db2bb47 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -46,7 +46,7 @@ const ( // available. func (r *Registry) claimJobs(ctx context.Context, s sqlliveness.Session) error { return r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - rows, err := r.ex.Query( + numRows, err := r.ex.Exec( ctx, "claim-jobs", txn, ` UPDATE system.jobs SET claim_session_id = $1, claim_instance_id = $2 @@ -60,8 +60,8 @@ RETURNING id;`, if err != nil { return errors.Wrap(err, "could not query jobs table") } - if log.ExpensiveLogEnabled(ctx, 1) || len(rows) > 0 { - log.Infof(ctx, "claimed %d jobs", len(rows)) + if log.ExpensiveLogEnabled(ctx, 1) || numRows > 0 { + log.Infof(ctx, "claimed %d jobs", numRows) } return nil }) @@ -69,7 +69,7 @@ RETURNING id;`, // processClaimedJobs processes all jobs currently claimed by the registry. func (r *Registry) processClaimedJobs(ctx context.Context, s sqlliveness.Session) error { - rows, err := r.ex.QueryEx( + it, err := r.ex.QueryIteratorEx( ctx, "select-running/get-claimed-jobs", nil, sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, ` SELECT id FROM system.jobs @@ -77,17 +77,22 @@ WHERE (status = $1 OR status = $2) AND (claim_session_id = $3 AND claim_instance StatusRunning, StatusReverting, s.ID().UnsafeBytes(), r.ID(), ) if err != nil { - return errors.Wrapf(err, "could query for claimed jobs") + return errors.Wrapf(err, "could not query for claimed jobs") } // This map will eventually contain the job ids that must be resumed. - claimedToResume := make(map[int64]struct{}, len(rows)) + claimedToResume := make(map[int64]struct{}) // Initially all claimed jobs are supposed to be resumed but some may be // running on this registry already so we will filter them out later. - for _, row := range rows { + var ok bool + for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { + row := it.Cur() id := int64(*row[0].(*tree.DInt)) claimedToResume[id] = struct{}{} } + if err != nil { + return errors.Wrapf(err, "could not query for claimed jobs") + } r.filterAlreadyRunningAndCancelFromPreviousSessions(ctx, s, claimedToResume) r.resumeClaimedJobs(ctx, s, claimedToResume) @@ -255,7 +260,7 @@ func (r *Registry) runJob( func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqlliveness.Session) error { return r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - rows, err := r.ex.QueryEx( + it, err := r.ex.QueryIteratorEx( ctx, "cancel/pause-requested", txn, sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, ` UPDATE system.jobs SET status = @@ -273,6 +278,20 @@ RETURNING id, status`, if err != nil { return errors.Wrap(err, "could not query jobs table") } + // Note that we have to buffer all rows first - before processing each + // job - because we have to make sure that the query executes without an + // error (otherwise, the system.jobs table might diverge from the jobs + // registry). + // TODO(yuzefovich): use QueryBufferedEx method once it is added to + // sqlutil.InternalExecutor interface. + var rows []tree.Datums + var ok bool + for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { + rows = append(rows, it.Cur()) + } + if err != nil { + return errors.Wrap(err, "could not query jobs table") + } for _, row := range rows { id := int64(*row[0].(*tree.DInt)) job := &Job{id: &id, registry: r} diff --git a/pkg/jobs/deprecated.go b/pkg/jobs/deprecated.go index 61802dea1f97..f0e52d13306c 100644 --- a/pkg/jobs/deprecated.go +++ b/pkg/jobs/deprecated.go @@ -65,7 +65,7 @@ func (r *Registry) deprecatedMaybeAdoptJob( SELECT id, payload, progress IS NULL, status FROM system.jobs WHERE status IN ($1, $2, $3, $4, $5) ORDER BY created DESC` - rows, err := r.ex.Query( + it, err := r.ex.QueryIterator( ctx, "adopt-job", nil /* txn */, stmt, StatusPending, StatusRunning, StatusCancelRequested, StatusPauseRequested, StatusReverting, ) @@ -73,6 +73,17 @@ WHERE status IN ($1, $2, $3, $4, $5) ORDER BY created DESC` return errors.Wrap(err, "failed querying for jobs") } + // TODO(yuzefovich): use QueryBuffered method once it is added to + // sqlutil.InternalExecutor interface. + var rows []tree.Datums + var ok bool + for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { + rows = append(rows, it.Cur()) + } + if err != nil { + return errors.Wrap(err, "failed querying for jobs") + } + if randomizeJobOrder { rand.Seed(timeutil.Now().UnixNano()) rand.Shuffle(len(rows), func(i, j int) { rows[i], rows[j] = rows[j], rows[i] }) diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 78663dc484ac..1d38bbd84c46 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -898,30 +898,31 @@ func (r *Registry) cleanupOldJobs(ctx context.Context, olderThan time.Time) erro // previous page). func (r *Registry) cleanupOldJobsPage( ctx context.Context, olderThan time.Time, minID int64, pageSize int, -) (done bool, maxID int64, _ error) { +) (done bool, maxID int64, retErr error) { const stmt = "SELECT id, payload, status, created FROM system.jobs " + "WHERE created < $1 AND id > $2 " + "ORDER BY id " + // the ordering is important as we keep track of the maximum ID we've seen "LIMIT $3" - rows, err := r.ex.Query(ctx, "gc-jobs", nil /* txn */, stmt, olderThan, minID, pageSize) + it, err := r.ex.QueryIterator(ctx, "gc-jobs", nil /* txn */, stmt, olderThan, minID, pageSize) if err != nil { return false, 0, err } - log.VEventf(ctx, 2, "read potentially expired jobs: %d", len(rows)) - - if len(rows) == 0 { - return true, 0, nil - } - // Track the highest ID we encounter, so it can serve as the bottom of the - // next page. - maxID = int64(*(rows[len(rows)-1][0].(*tree.DInt))) - // If we got as many rows as we asked for, there might be more. - morePages := len(rows) == pageSize - + // We have to make sure to close the iterator since we might return from the + // for loop early (before Next() returns false). + defer func() { + closeErr := it.Close() + if retErr == nil { + retErr = closeErr + } + }() toDelete := tree.NewDArray(types.Int) - toDelete.Array = make(tree.Datums, 0, len(rows)) oldMicros := timeutil.ToUnixMicros(olderThan) - for _, row := range rows { + + var ok bool + var numRows int + for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { + numRows++ + row := it.Cur() payload, err := UnmarshalPayload(row[1]) if err != nil { return false, 0, err @@ -941,6 +942,14 @@ func (r *Registry) cleanupOldJobsPage( toDelete.Array = append(toDelete.Array, row[0]) } } + if err != nil { + return false, 0, err + } + if numRows == 0 { + return true, 0, nil + } + + log.VEventf(ctx, 2, "read potentially expired jobs: %d", numRows) if len(toDelete.Array) > 0 { log.Infof(ctx, "cleaning up expired job records: %d", len(toDelete.Array)) const stmt = `DELETE FROM system.jobs WHERE id = ANY($1)` @@ -955,6 +964,12 @@ func (r *Registry) cleanupOldJobsPage( len(toDelete.Array), nDeleted) } } + // If we got as many rows as we asked for, there might be more. + morePages := numRows == pageSize + // Track the highest ID we encounter, so it can serve as the bottom of the + // next page. + lastRow := it.Cur() + maxID = int64(*(lastRow[0].(*tree.DInt))) return !morePages, maxID, nil } diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage.go b/pkg/kv/kvserver/protectedts/ptstorage/storage.go index 1acdf351615f..2cf23804ba8f 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage.go @@ -76,7 +76,7 @@ func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) erro meta = []byte{} } s := makeSettings(p.settings) - rows, err := p.ex.QueryEx(ctx, "protectedts-protect", txn, + it, err := p.ex.QueryIteratorEx(ctx, "protectedts-protect", txn, sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, protectQuery, s.maxSpans, s.maxBytes, len(r.Spans), @@ -86,7 +86,17 @@ func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) erro if err != nil { return errors.Wrapf(err, "failed to write record %v", r.ID) } - row := rows[0] + ok, err := it.Next(ctx) + if err != nil { + return errors.Wrapf(err, "failed to write record %v", r.ID) + } + if !ok { + return errors.Newf("failed to write record %v", r.ID) + } + row := it.Cur() + if err := it.Close(); err != nil { + log.Infof(ctx, "encountered %v when writing record %v", err, r.ID) + } if failed := *row[0].(*tree.DBool); failed { curNumSpans := int64(*row[1].(*tree.DInt)) if curNumSpans+int64(len(r.Spans)) > s.maxSpans { @@ -132,13 +142,13 @@ func (p *storage) MarkVerified(ctx context.Context, txn *kv.Txn, id uuid.UUID) e if txn == nil { return errNoTxn } - rows, err := p.ex.QueryEx(ctx, "protectedts-MarkVerified", txn, + numRows, err := p.ex.ExecEx(ctx, "protectedts-MarkVerified", txn, sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, markVerifiedQuery, id.GetBytesMut()) if err != nil { return errors.Wrapf(err, "failed to mark record %v as verified", id) } - if len(rows) == 0 { + if numRows == 0 { return protectedts.ErrNotExists } return nil @@ -148,13 +158,13 @@ func (p *storage) Release(ctx context.Context, txn *kv.Txn, id uuid.UUID) error if txn == nil { return errNoTxn } - rows, err := p.ex.QueryEx(ctx, "protectedts-Release", txn, + numRows, err := p.ex.ExecEx(ctx, "protectedts-Release", txn, sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, releaseQuery, id.GetBytesMut()) if err != nil { return errors.Wrapf(err, "failed to release record %v", id) } - if len(rows) == 0 { + if numRows == 0 { return protectedts.ErrNotExists } return nil @@ -200,20 +210,23 @@ func (p *storage) GetState(ctx context.Context, txn *kv.Txn) (ptpb.State, error) } func (p *storage) getRecords(ctx context.Context, txn *kv.Txn) ([]ptpb.Record, error) { - rows, err := p.ex.QueryEx(ctx, "protectedts-GetRecords", txn, + it, err := p.ex.QueryIteratorEx(ctx, "protectedts-GetRecords", txn, sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, getRecordsQuery) if err != nil { return nil, errors.Wrap(err, "failed to read records") } - if len(rows) == 0 { - return nil, nil - } - records := make([]ptpb.Record, len(rows)) - for i, row := range rows { - if err := rowToRecord(ctx, row, &records[i]); err != nil { + var ok bool + var records []ptpb.Record + for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { + var record ptpb.Record + if err := rowToRecord(ctx, it.Cur(), &record); err != nil { log.Errorf(ctx, "failed to parse row as record: %v", err) } + records = append(records, record) + } + if err != nil { + return nil, errors.Wrap(err, "failed to read records") } return records, nil } diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go index db135d691214..6c6502c97b38 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go @@ -653,23 +653,12 @@ func (ie *wrappedInternalExecutor) ExecEx( stmt string, qargs ...interface{}, ) (int, error) { - panic("unimplemented") -} - -func (ie *wrappedInternalExecutor) QueryEx( - ctx context.Context, - opName string, - txn *kv.Txn, - session sessiondata.InternalExecutorOverride, - stmt string, - qargs ...interface{}, -) ([]tree.Datums, error) { if f := ie.getErrFunc(); f != nil { if err := f(stmt); err != nil { - return nil, err + return 0, err } } - return ie.wrapped.QueryEx(ctx, opName, txn, session, stmt, qargs...) + return ie.wrapped.ExecEx(ctx, opName, txn, o, stmt, qargs...) } func (ie *wrappedInternalExecutor) QueryWithCols( @@ -699,12 +688,6 @@ func (ie *wrappedInternalExecutor) QueryRowEx( return ie.wrapped.QueryRowEx(ctx, opName, txn, session, stmt, qargs...) } -func (ie *wrappedInternalExecutor) Query( - ctx context.Context, opName string, txn *kv.Txn, statement string, params ...interface{}, -) ([]tree.Datums, error) { - panic("not implemented") -} - func (ie *wrappedInternalExecutor) QueryRow( ctx context.Context, opName string, txn *kv.Txn, statement string, qargs ...interface{}, ) (tree.Datums, error) { @@ -725,7 +708,12 @@ func (ie *wrappedInternalExecutor) QueryIteratorEx( stmt string, qargs ...interface{}, ) (sqlutil.InternalRows, error) { - panic("not implemented") + if f := ie.getErrFunc(); f != nil { + if err := f(stmt); err != nil { + return nil, err + } + } + return ie.wrapped.QueryIteratorEx(ctx, opName, txn, session, stmt, qargs...) } func (ie *wrappedInternalExecutor) getErrFunc() func(statement string) error { diff --git a/pkg/kv/kvserver/reports/constraint_stats_report.go b/pkg/kv/kvserver/reports/constraint_stats_report.go index d885b3ee79f0..ca6f4a834ecd 100644 --- a/pkg/kv/kvserver/reports/constraint_stats_report.go +++ b/pkg/kv/kvserver/reports/constraint_stats_report.go @@ -173,15 +173,17 @@ func (r *replicationConstraintStatsReportSaver) loadPreviousVersion( } const prevViolations = "select zone_id, subzone_id, type, config, " + "violating_ranges from system.replication_constraint_stats" - rows, err := ex.Query( + it, err := ex.QueryIterator( ctx, "get-previous-replication-constraint-stats", txn, prevViolations, ) if err != nil { return err } - r.previousVersion = make(ConstraintReport, len(rows)) - for _, row := range rows { + r.previousVersion = make(ConstraintReport) + var ok bool + for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { + row := it.Cur() key := ConstraintStatusKey{} key.ZoneID = (config.SystemTenantObjectID)(*row[0].(*tree.DInt)) key.SubzoneID = base.SubzoneID((*row[1].(*tree.DInt))) @@ -189,8 +191,7 @@ func (r *replicationConstraintStatsReportSaver) loadPreviousVersion( key.Constraint = (ConstraintRepr)(*row[3].(*tree.DString)) r.previousVersion[key] = ConstraintStatus{(int)(*row[4].(*tree.DInt))} } - - return nil + return err } func (r *replicationConstraintStatsReportSaver) updateTimestamp( diff --git a/pkg/kv/kvserver/reports/critical_localities_report.go b/pkg/kv/kvserver/reports/critical_localities_report.go index f64bb15d7983..1fda6345f8ab 100644 --- a/pkg/kv/kvserver/reports/critical_localities_report.go +++ b/pkg/kv/kvserver/reports/critical_localities_report.go @@ -103,23 +103,24 @@ func (r *replicationCriticalLocalitiesReportSaver) loadPreviousVersion( } const prevViolations = "select zone_id, subzone_id, locality, at_risk_ranges " + "from system.replication_critical_localities" - rows, err := ex.Query( + it, err := ex.QueryIterator( ctx, "get-previous-replication-critical-localities", txn, prevViolations, ) if err != nil { return err } - r.previousVersion = make(LocalityReport, len(rows)) - for _, row := range rows { + r.previousVersion = make(LocalityReport) + var ok bool + for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { + row := it.Cur() key := localityKey{} key.ZoneID = (config.SystemTenantObjectID)(*row[0].(*tree.DInt)) key.SubzoneID = base.SubzoneID(*row[1].(*tree.DInt)) key.locality = (LocalityRepr)(*row[2].(*tree.DString)) r.previousVersion[key] = localityStatus{(int32)(*row[3].(*tree.DInt))} } - - return nil + return err } func (r *replicationCriticalLocalitiesReportSaver) updateTimestamp( diff --git a/pkg/kv/kvserver/reports/critical_localities_report_test.go b/pkg/kv/kvserver/reports/critical_localities_report_test.go index 0fd5c3bee455..fcf3efb9bfed 100644 --- a/pkg/kv/kvserver/reports/critical_localities_report_test.go +++ b/pkg/kv/kvserver/reports/critical_localities_report_test.go @@ -146,17 +146,22 @@ func TestLocalityReport(t *testing.T) { func TableData( ctx context.Context, tableName string, executor sqlutil.InternalExecutor, ) [][]string { - if rows, err := executor.Query( - ctx, "test-select-"+tableName, nil /* txn */, "select * from "+tableName); err == nil { - result := make([][]string, 0, len(rows)) - for _, row := range rows { + if it, err := executor.QueryIterator( + ctx, "test-select-"+tableName, nil /* txn */, "select * from "+tableName, + ); err == nil { + var result [][]string + var ok bool + for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { + row := it.Cur() stringRow := make([]string, 0, row.Len()) for _, item := range row { stringRow = append(stringRow, item.String()) } result = append(result, stringRow) } - return result + if err == nil { + return result + } } return nil } diff --git a/pkg/kv/kvserver/reports/replication_stats_report.go b/pkg/kv/kvserver/reports/replication_stats_report.go index 8124294d5389..3015de4ead7b 100644 --- a/pkg/kv/kvserver/reports/replication_stats_report.go +++ b/pkg/kv/kvserver/reports/replication_stats_report.go @@ -111,15 +111,17 @@ func (r *replicationStatsReportSaver) loadPreviousVersion( const prevViolations = "select zone_id, subzone_id, total_ranges, " + "unavailable_ranges, under_replicated_ranges, over_replicated_ranges " + "from system.replication_stats" - rows, err := ex.Query( + it, err := ex.QueryIterator( ctx, "get-previous-replication-stats", txn, prevViolations, ) if err != nil { return err } - r.previousVersion = make(RangeReport, len(rows)) - for _, row := range rows { + r.previousVersion = make(RangeReport) + var ok bool + for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { + row := it.Cur() key := ZoneKey{} key.ZoneID = (config.SystemTenantObjectID)(*row[0].(*tree.DInt)) key.SubzoneID = base.SubzoneID(*row[1].(*tree.DInt)) @@ -130,8 +132,7 @@ func (r *replicationStatsReportSaver) loadPreviousVersion( (int32)(*row[5].(*tree.DInt)), } } - - return nil + return err } func (r *replicationStatsReportSaver) updateTimestamp( diff --git a/pkg/migration/migrationmanager/manager.go b/pkg/migration/migrationmanager/manager.go index 893ce37a8d41..af23abca15df 100644 --- a/pkg/migration/migrationmanager/manager.go +++ b/pkg/migration/migrationmanager/manager.go @@ -312,7 +312,17 @@ SELECT id, status if err != nil { return false, 0, errors.Wrap(err, "failed to marshal version to JSON") } - rows, err := m.ie.Query(ctx, "migration-manager-find-jobs", txn, query, jsonMsg.String()) + // TODO(yuzefovich): use QueryBuffered method once it is added to + // sqlutil.InternalExecutor interface. + it, err := m.ie.QueryIterator(ctx, "migration-manager-find-jobs", txn, query, jsonMsg.String()) + if err != nil { + return false, 0, err + } + var rows []tree.Datums + var ok bool + for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { + rows = append(rows, it.Cur()) + } if err != nil { return false, 0, err } diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index af963322c73b..78568969a043 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -2139,9 +2139,19 @@ SELECT "descID", version, expiration FROM system.public.lease AS OF SYSTEM TIME retryOptions.Closer = m.stopper.ShouldQuiesce() // The retry is required because of errors caused by node restarts. Retry 30 times. if err := retry.WithMaxAttempts(ctx, retryOptions, 30, func() error { - var err error - rows, err = m.storage.internalExecutor.Query( - ctx, "read orphaned leases", nil /*txn*/, sqlQuery) + it, err := m.storage.internalExecutor.QueryIterator( + ctx, "read orphaned leases", nil /*txn*/, sqlQuery, + ) + if err != nil { + return err + } + rows = rows[:0] + // TODO(yuzefovich): use QueryBuffered method once it is added to + // sqlutil.InternalExecutor interface. + var ok bool + for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { + rows = append(rows, it.Cur()) + } return err }); err != nil { log.Warningf(ctx, "unable to read orphaned leases: %+v", err) diff --git a/pkg/sql/sem/tree/eval.go b/pkg/sql/sem/tree/eval.go index a3174c727811..34431f70499a 100644 --- a/pkg/sql/sem/tree/eval.go +++ b/pkg/sql/sem/tree/eval.go @@ -3136,12 +3136,6 @@ type ClientNoticeSender interface { // this to sqlutil.InternalExecutor or sql.InternalExecutor, and use the // alternatives. type InternalExecutor interface { - // Query is part of the sqlutil.InternalExecutor interface. - Query( - ctx context.Context, opName string, txn *kv.Txn, - stmt string, qargs ...interface{}, - ) ([]Datums, error) - // QueryRow is part of the sqlutil.InternalExecutor interface. QueryRow( ctx context.Context, opName string, txn *kv.Txn, stmt string, qargs ...interface{}, diff --git a/pkg/sql/sqlutil/internal_executor.go b/pkg/sql/sqlutil/internal_executor.go index ecdfaf864d31..32c07157a323 100644 --- a/pkg/sql/sqlutil/internal_executor.go +++ b/pkg/sql/sqlutil/internal_executor.go @@ -50,34 +50,10 @@ type InternalExecutor interface { qargs ...interface{}, ) (int, error) - // Query executes the supplied SQL statement and returns the resulting rows. - // If no user has been previously set through SetSessionData, the statement is - // executed as the root user. + // QueryWithCols executes the supplied SQL statement and returns the + // resulting rows as well as the computed ResultColumns of the input query. // // If txn is not nil, the statement will be executed in the respective txn. - // - // Query is deprecated because it may transparently execute a query as root. Use - // QueryEx instead. - Query( - ctx context.Context, opName string, txn *kv.Txn, statement string, qargs ...interface{}, - ) ([]tree.Datums, error) - - // QueryEx is like Query, but allows the caller to override some session data - // fields. - // - // The fields set in session that are set override the respective fields if - // they have previously been set through SetSessionData(). - QueryEx( - ctx context.Context, - opName string, - txn *kv.Txn, - session sessiondata.InternalExecutorOverride, - stmt string, - qargs ...interface{}, - ) ([]tree.Datums, error) - - // QueryWithCols is like QueryEx, but it also returns the computed ResultColumns - // of the input query. QueryWithCols( ctx context.Context, opName string, txn *kv.Txn, o sessiondata.InternalExecutorOverride, statement string, qargs ...interface{}, diff --git a/pkg/sql/stats/automatic_stats.go b/pkg/sql/stats/automatic_stats.go index 8b011bd6fb64..2bbd5b7362d3 100644 --- a/pkg/sql/stats/automatic_stats.go +++ b/pkg/sql/stats/automatic_stats.go @@ -333,25 +333,34 @@ AND drop_time IS NULL `, initialTableCollectionDelay) - rows, err := r.ex.Query( + it, err := r.ex.QueryIterator( ctx, "get-tables", nil, /* txn */ getAllTablesQuery, ) + if err == nil { + var ok bool + for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { + row := it.Cur() + tableID := descpb.ID(*row[0].(*tree.DInt)) + // Don't create statistics for system tables or virtual tables. + // TODO(rytaft): Don't add views here either. Unfortunately views are not + // identified differently from tables in crdb_internal.tables. + if !descpb.IsReservedID(tableID) && !descpb.IsVirtualTable(tableID) { + r.mutationCounts[tableID] += 0 + } + } + } if err != nil { + // Note that it is ok if the iterator returned partial results before + // encountering an error - in that case we added entries to + // r.mutationCounts for some of the tables and operation of adding an + // entry is idempotent (i.e. we didn't mess up anything for the next + // call to this method). log.Errorf(ctx, "failed to get tables for automatic stats: %v", err) return } - for _, row := range rows { - tableID := descpb.ID(*row[0].(*tree.DInt)) - // Don't create statistics for system tables or virtual tables. - // TODO(rytaft): Don't add views here either. Unfortunately views are not - // identified differently from tables in crdb_internal.tables. - if !descpb.IsReservedID(tableID) && !descpb.IsVirtualTable(tableID) { - r.mutationCounts[tableID] += 0 - } - } } // NotifyMutation is called by SQL mutation operations to signal to the diff --git a/pkg/sql/stats/stats_cache.go b/pkg/sql/stats/stats_cache.go index f23d5bb304dd..621be40a1f7b 100644 --- a/pkg/sql/stats/stats_cache.go +++ b/pkg/sql/stats/stats_cache.go @@ -483,7 +483,7 @@ func (sc *TableStatisticsCache) parseStats( // for the given table ID. func (sc *TableStatisticsCache) getTableStatsFromDB( ctx context.Context, tableID descpb.ID, -) ([]*TableStatistic, error) { +) (_ []*TableStatistic, retErr error) { const getTableStatisticsStmt = ` SELECT "tableID", @@ -499,21 +499,33 @@ FROM system.table_statistics WHERE "tableID" = $1 ORDER BY "createdAt" DESC ` - rows, err := sc.SQLExecutor.Query( + it, err := sc.SQLExecutor.QueryIterator( ctx, "get-table-statistics", nil /* txn */, getTableStatisticsStmt, tableID, ) if err != nil { return nil, err } + // We have to make sure to close the iterator since we might return from the + // for loop early (before Next() returns false). + defer func() { + closeErr := it.Close() + if retErr == nil { + retErr = closeErr + } + }() var statsList []*TableStatistic - for _, row := range rows { - stats, err := sc.parseStats(ctx, row) + var ok bool + for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { + stats, err := sc.parseStats(ctx, it.Cur()) if err != nil { return nil, err } statsList = append(statsList, stats) } + if err != nil { + return nil, err + } return statsList, nil } diff --git a/pkg/sql/stmtdiagnostics/statement_diagnostics.go b/pkg/sql/stmtdiagnostics/statement_diagnostics.go index 96b1255f761e..5e7fd871839a 100644 --- a/pkg/sql/stmtdiagnostics/statement_diagnostics.go +++ b/pkg/sql/stmtdiagnostics/statement_diagnostics.go @@ -455,8 +455,7 @@ func (r *Registry) pollRequests(ctx context.Context) error { epoch := r.mu.epoch r.mu.Unlock() - var err error - rows, err = r.ie.QueryEx(ctx, "stmt-diag-poll", nil, /* txn */ + it, err := r.ie.QueryIteratorEx(ctx, "stmt-diag-poll", nil, /* txn */ sessiondata.InternalExecutorOverride{ User: security.RootUserName(), }, @@ -465,6 +464,14 @@ func (r *Registry) pollRequests(ctx context.Context) error { if err != nil { return err } + rows = rows[:0] + var ok bool + for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { + rows = append(rows, it.Cur()) + } + if err != nil { + return err + } r.mu.Lock() // If the epoch changed it means that a request was added to the registry