Skip to content

Commit

Permalink
Merge #60428 #60443
Browse files Browse the repository at this point in the history
60428: sqlutil: remove Query and QueryEx from InternalExecutor interface r=yuzefovich a=yuzefovich

All usages of `sqlutil.InternalExecutor.Query` and
`sqlutil.InternalExecutor.QueryEx` have been refactored: most of
the changes are taking advantage of the iterator API, in a couple
of places we now use `Exec` and `ExecEx` methods since in those
places we only need the number of affected rows.

In a few places where we still need to buffer all rows, the caller
currently performs the buffering. In the follow-up commit
`QueryBuffered` and `QueryBufferedEx` will be added into the interface
(essentially renaming `Query` and `QueryEx` to make it explicit that the
buffering occurs), and the custom logic will be removed.

Addresses: #48595.

Release note: None

60443: logictest: deflake crdb_internal.gossip_liveness test r=otan a=rafiss

The regex needs to also match `0,0`

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
Co-authored-by: Rafi Shamim <rafi@cockroachlabs.com>
  • Loading branch information
3 people committed Feb 10, 2021
3 parents e4b1c5c + 1be6518 + 095b7a4 commit 0ee3587
Show file tree
Hide file tree
Showing 17 changed files with 203 additions and 131 deletions.
35 changes: 27 additions & 8 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const (
// available.
func (r *Registry) claimJobs(ctx context.Context, s sqlliveness.Session) error {
return r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
rows, err := r.ex.Query(
numRows, err := r.ex.Exec(
ctx, "claim-jobs", txn, `
UPDATE system.jobs
SET claim_session_id = $1, claim_instance_id = $2
Expand All @@ -60,34 +60,39 @@ RETURNING id;`,
if err != nil {
return errors.Wrap(err, "could not query jobs table")
}
if log.ExpensiveLogEnabled(ctx, 1) || len(rows) > 0 {
log.Infof(ctx, "claimed %d jobs", len(rows))
if log.ExpensiveLogEnabled(ctx, 1) || numRows > 0 {
log.Infof(ctx, "claimed %d jobs", numRows)
}
return nil
})
}

// processClaimedJobs processes all jobs currently claimed by the registry.
func (r *Registry) processClaimedJobs(ctx context.Context, s sqlliveness.Session) error {
rows, err := r.ex.QueryEx(
it, err := r.ex.QueryIteratorEx(
ctx, "select-running/get-claimed-jobs", nil,
sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, `
SELECT id FROM system.jobs
WHERE (status = $1 OR status = $2) AND (claim_session_id = $3 AND claim_instance_id = $4)`,
StatusRunning, StatusReverting, s.ID().UnsafeBytes(), r.ID(),
)
if err != nil {
return errors.Wrapf(err, "could query for claimed jobs")
return errors.Wrapf(err, "could not query for claimed jobs")
}

// This map will eventually contain the job ids that must be resumed.
claimedToResume := make(map[int64]struct{}, len(rows))
claimedToResume := make(map[int64]struct{})
// Initially all claimed jobs are supposed to be resumed but some may be
// running on this registry already so we will filter them out later.
for _, row := range rows {
var ok bool
for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
row := it.Cur()
id := int64(*row[0].(*tree.DInt))
claimedToResume[id] = struct{}{}
}
if err != nil {
return errors.Wrapf(err, "could not query for claimed jobs")
}

r.filterAlreadyRunningAndCancelFromPreviousSessions(ctx, s, claimedToResume)
r.resumeClaimedJobs(ctx, s, claimedToResume)
Expand Down Expand Up @@ -255,7 +260,7 @@ func (r *Registry) runJob(

func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqlliveness.Session) error {
return r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
rows, err := r.ex.QueryEx(
it, err := r.ex.QueryIteratorEx(
ctx, "cancel/pause-requested", txn, sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, `
UPDATE system.jobs
SET status =
Expand All @@ -273,6 +278,20 @@ RETURNING id, status`,
if err != nil {
return errors.Wrap(err, "could not query jobs table")
}
// Note that we have to buffer all rows first - before processing each
// job - because we have to make sure that the query executes without an
// error (otherwise, the system.jobs table might diverge from the jobs
// registry).
// TODO(yuzefovich): use QueryBufferedEx method once it is added to
// sqlutil.InternalExecutor interface.
var rows []tree.Datums
var ok bool
for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
rows = append(rows, it.Cur())
}
if err != nil {
return errors.Wrap(err, "could not query jobs table")
}
for _, row := range rows {
id := int64(*row[0].(*tree.DInt))
job := &Job{id: &id, registry: r}
Expand Down
13 changes: 12 additions & 1 deletion pkg/jobs/deprecated.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,25 @@ func (r *Registry) deprecatedMaybeAdoptJob(
SELECT id, payload, progress IS NULL, status
FROM system.jobs
WHERE status IN ($1, $2, $3, $4, $5) ORDER BY created DESC`
rows, err := r.ex.Query(
it, err := r.ex.QueryIterator(
ctx, "adopt-job", nil /* txn */, stmt,
StatusPending, StatusRunning, StatusCancelRequested, StatusPauseRequested, StatusReverting,
)
if err != nil {
return errors.Wrap(err, "failed querying for jobs")
}

// TODO(yuzefovich): use QueryBuffered method once it is added to
// sqlutil.InternalExecutor interface.
var rows []tree.Datums
var ok bool
for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
rows = append(rows, it.Cur())
}
if err != nil {
return errors.Wrap(err, "failed querying for jobs")
}

if randomizeJobOrder {
rand.Seed(timeutil.Now().UnixNano())
rand.Shuffle(len(rows), func(i, j int) { rows[i], rows[j] = rows[j], rows[i] })
Expand Down
45 changes: 30 additions & 15 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,30 +898,31 @@ func (r *Registry) cleanupOldJobs(ctx context.Context, olderThan time.Time) erro
// previous page).
func (r *Registry) cleanupOldJobsPage(
ctx context.Context, olderThan time.Time, minID int64, pageSize int,
) (done bool, maxID int64, _ error) {
) (done bool, maxID int64, retErr error) {
const stmt = "SELECT id, payload, status, created FROM system.jobs " +
"WHERE created < $1 AND id > $2 " +
"ORDER BY id " + // the ordering is important as we keep track of the maximum ID we've seen
"LIMIT $3"
rows, err := r.ex.Query(ctx, "gc-jobs", nil /* txn */, stmt, olderThan, minID, pageSize)
it, err := r.ex.QueryIterator(ctx, "gc-jobs", nil /* txn */, stmt, olderThan, minID, pageSize)
if err != nil {
return false, 0, err
}
log.VEventf(ctx, 2, "read potentially expired jobs: %d", len(rows))

if len(rows) == 0 {
return true, 0, nil
}
// Track the highest ID we encounter, so it can serve as the bottom of the
// next page.
maxID = int64(*(rows[len(rows)-1][0].(*tree.DInt)))
// If we got as many rows as we asked for, there might be more.
morePages := len(rows) == pageSize

// We have to make sure to close the iterator since we might return from the
// for loop early (before Next() returns false).
defer func() {
closeErr := it.Close()
if retErr == nil {
retErr = closeErr
}
}()
toDelete := tree.NewDArray(types.Int)
toDelete.Array = make(tree.Datums, 0, len(rows))
oldMicros := timeutil.ToUnixMicros(olderThan)
for _, row := range rows {

var ok bool
var numRows int
for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
numRows++
row := it.Cur()
payload, err := UnmarshalPayload(row[1])
if err != nil {
return false, 0, err
Expand All @@ -941,6 +942,14 @@ func (r *Registry) cleanupOldJobsPage(
toDelete.Array = append(toDelete.Array, row[0])
}
}
if err != nil {
return false, 0, err
}
if numRows == 0 {
return true, 0, nil
}

log.VEventf(ctx, 2, "read potentially expired jobs: %d", numRows)
if len(toDelete.Array) > 0 {
log.Infof(ctx, "cleaning up expired job records: %d", len(toDelete.Array))
const stmt = `DELETE FROM system.jobs WHERE id = ANY($1)`
Expand All @@ -955,6 +964,12 @@ func (r *Registry) cleanupOldJobsPage(
len(toDelete.Array), nDeleted)
}
}
// If we got as many rows as we asked for, there might be more.
morePages := numRows == pageSize
// Track the highest ID we encounter, so it can serve as the bottom of the
// next page.
lastRow := it.Cur()
maxID = int64(*(lastRow[0].(*tree.DInt)))
return !morePages, maxID, nil
}

Expand Down
39 changes: 26 additions & 13 deletions pkg/kv/kvserver/protectedts/ptstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) erro
meta = []byte{}
}
s := makeSettings(p.settings)
rows, err := p.ex.QueryEx(ctx, "protectedts-protect", txn,
it, err := p.ex.QueryIteratorEx(ctx, "protectedts-protect", txn,
sessiondata.InternalExecutorOverride{User: security.NodeUserName()},
protectQuery,
s.maxSpans, s.maxBytes, len(r.Spans),
Expand All @@ -86,7 +86,17 @@ func (p *storage) Protect(ctx context.Context, txn *kv.Txn, r *ptpb.Record) erro
if err != nil {
return errors.Wrapf(err, "failed to write record %v", r.ID)
}
row := rows[0]
ok, err := it.Next(ctx)
if err != nil {
return errors.Wrapf(err, "failed to write record %v", r.ID)
}
if !ok {
return errors.Newf("failed to write record %v", r.ID)
}
row := it.Cur()
if err := it.Close(); err != nil {
log.Infof(ctx, "encountered %v when writing record %v", err, r.ID)
}
if failed := *row[0].(*tree.DBool); failed {
curNumSpans := int64(*row[1].(*tree.DInt))
if curNumSpans+int64(len(r.Spans)) > s.maxSpans {
Expand Down Expand Up @@ -132,13 +142,13 @@ func (p *storage) MarkVerified(ctx context.Context, txn *kv.Txn, id uuid.UUID) e
if txn == nil {
return errNoTxn
}
rows, err := p.ex.QueryEx(ctx, "protectedts-MarkVerified", txn,
numRows, err := p.ex.ExecEx(ctx, "protectedts-MarkVerified", txn,
sessiondata.InternalExecutorOverride{User: security.NodeUserName()},
markVerifiedQuery, id.GetBytesMut())
if err != nil {
return errors.Wrapf(err, "failed to mark record %v as verified", id)
}
if len(rows) == 0 {
if numRows == 0 {
return protectedts.ErrNotExists
}
return nil
Expand All @@ -148,13 +158,13 @@ func (p *storage) Release(ctx context.Context, txn *kv.Txn, id uuid.UUID) error
if txn == nil {
return errNoTxn
}
rows, err := p.ex.QueryEx(ctx, "protectedts-Release", txn,
numRows, err := p.ex.ExecEx(ctx, "protectedts-Release", txn,
sessiondata.InternalExecutorOverride{User: security.NodeUserName()},
releaseQuery, id.GetBytesMut())
if err != nil {
return errors.Wrapf(err, "failed to release record %v", id)
}
if len(rows) == 0 {
if numRows == 0 {
return protectedts.ErrNotExists
}
return nil
Expand Down Expand Up @@ -200,20 +210,23 @@ func (p *storage) GetState(ctx context.Context, txn *kv.Txn) (ptpb.State, error)
}

func (p *storage) getRecords(ctx context.Context, txn *kv.Txn) ([]ptpb.Record, error) {
rows, err := p.ex.QueryEx(ctx, "protectedts-GetRecords", txn,
it, err := p.ex.QueryIteratorEx(ctx, "protectedts-GetRecords", txn,
sessiondata.InternalExecutorOverride{User: security.NodeUserName()},
getRecordsQuery)
if err != nil {
return nil, errors.Wrap(err, "failed to read records")
}
if len(rows) == 0 {
return nil, nil
}
records := make([]ptpb.Record, len(rows))
for i, row := range rows {
if err := rowToRecord(ctx, row, &records[i]); err != nil {
var ok bool
var records []ptpb.Record
for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
var record ptpb.Record
if err := rowToRecord(ctx, it.Cur(), &record); err != nil {
log.Errorf(ctx, "failed to parse row as record: %v", err)
}
records = append(records, record)
}
if err != nil {
return nil, errors.Wrap(err, "failed to read records")
}
return records, nil
}
Expand Down
28 changes: 8 additions & 20 deletions pkg/kv/kvserver/protectedts/ptstorage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,23 +653,12 @@ func (ie *wrappedInternalExecutor) ExecEx(
stmt string,
qargs ...interface{},
) (int, error) {
panic("unimplemented")
}

func (ie *wrappedInternalExecutor) QueryEx(
ctx context.Context,
opName string,
txn *kv.Txn,
session sessiondata.InternalExecutorOverride,
stmt string,
qargs ...interface{},
) ([]tree.Datums, error) {
if f := ie.getErrFunc(); f != nil {
if err := f(stmt); err != nil {
return nil, err
return 0, err
}
}
return ie.wrapped.QueryEx(ctx, opName, txn, session, stmt, qargs...)
return ie.wrapped.ExecEx(ctx, opName, txn, o, stmt, qargs...)
}

func (ie *wrappedInternalExecutor) QueryWithCols(
Expand Down Expand Up @@ -699,12 +688,6 @@ func (ie *wrappedInternalExecutor) QueryRowEx(
return ie.wrapped.QueryRowEx(ctx, opName, txn, session, stmt, qargs...)
}

func (ie *wrappedInternalExecutor) Query(
ctx context.Context, opName string, txn *kv.Txn, statement string, params ...interface{},
) ([]tree.Datums, error) {
panic("not implemented")
}

func (ie *wrappedInternalExecutor) QueryRow(
ctx context.Context, opName string, txn *kv.Txn, statement string, qargs ...interface{},
) (tree.Datums, error) {
Expand All @@ -725,7 +708,12 @@ func (ie *wrappedInternalExecutor) QueryIteratorEx(
stmt string,
qargs ...interface{},
) (sqlutil.InternalRows, error) {
panic("not implemented")
if f := ie.getErrFunc(); f != nil {
if err := f(stmt); err != nil {
return nil, err
}
}
return ie.wrapped.QueryIteratorEx(ctx, opName, txn, session, stmt, qargs...)
}

func (ie *wrappedInternalExecutor) getErrFunc() func(statement string) error {
Expand Down
11 changes: 6 additions & 5 deletions pkg/kv/kvserver/reports/constraint_stats_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,24 +173,25 @@ func (r *replicationConstraintStatsReportSaver) loadPreviousVersion(
}
const prevViolations = "select zone_id, subzone_id, type, config, " +
"violating_ranges from system.replication_constraint_stats"
rows, err := ex.Query(
it, err := ex.QueryIterator(
ctx, "get-previous-replication-constraint-stats", txn, prevViolations,
)
if err != nil {
return err
}

r.previousVersion = make(ConstraintReport, len(rows))
for _, row := range rows {
r.previousVersion = make(ConstraintReport)
var ok bool
for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
row := it.Cur()
key := ConstraintStatusKey{}
key.ZoneID = (config.SystemTenantObjectID)(*row[0].(*tree.DInt))
key.SubzoneID = base.SubzoneID((*row[1].(*tree.DInt)))
key.ViolationType = (ConstraintType)(*row[2].(*tree.DString))
key.Constraint = (ConstraintRepr)(*row[3].(*tree.DString))
r.previousVersion[key] = ConstraintStatus{(int)(*row[4].(*tree.DInt))}
}

return nil
return err
}

func (r *replicationConstraintStatsReportSaver) updateTimestamp(
Expand Down
11 changes: 6 additions & 5 deletions pkg/kv/kvserver/reports/critical_localities_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,23 +103,24 @@ func (r *replicationCriticalLocalitiesReportSaver) loadPreviousVersion(
}
const prevViolations = "select zone_id, subzone_id, locality, at_risk_ranges " +
"from system.replication_critical_localities"
rows, err := ex.Query(
it, err := ex.QueryIterator(
ctx, "get-previous-replication-critical-localities", txn, prevViolations,
)
if err != nil {
return err
}

r.previousVersion = make(LocalityReport, len(rows))
for _, row := range rows {
r.previousVersion = make(LocalityReport)
var ok bool
for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
row := it.Cur()
key := localityKey{}
key.ZoneID = (config.SystemTenantObjectID)(*row[0].(*tree.DInt))
key.SubzoneID = base.SubzoneID(*row[1].(*tree.DInt))
key.locality = (LocalityRepr)(*row[2].(*tree.DString))
r.previousVersion[key] = localityStatus{(int32)(*row[3].(*tree.DInt))}
}

return nil
return err
}

func (r *replicationCriticalLocalitiesReportSaver) updateTimestamp(
Expand Down
Loading

0 comments on commit 0ee3587

Please sign in to comment.