Skip to content

Commit

Permalink
Merge #60572
Browse files Browse the repository at this point in the history
60572: sqlutil: remove QueryWithCols in favor of QueryRowExWithCols r=yuzefovich a=yuzefovich

This commit refactors all usages of
`sqlutil.InternalExecutor.QueryWithCols` method in favor of newly added
`QueryRowExWithCols` since in almost all places where the former was
used, we expected exactly one row. The only noticeable change is the
refactor of `jobScheduler.executeSchedules` where we now use the
iterator pattern. The difference there is that now it is possible to
execute some schedules before encountering an error on the iterator
(previously, we would buffer up all rows first), but this change is
acceptable because each schedule is executed under savepoint to guard
against errors in schedule planning/execution.

Addresses: #48595.

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
craig[bot] and yuzefovich committed Feb 16, 2021
2 parents ef900a2 + b306c59 commit fe919cc
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 69 deletions.
6 changes: 3 additions & 3 deletions pkg/ccl/backupccl/create_scheduled_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,17 @@ func (h *testHelper) createBackupSchedule(
var id int64
require.NoError(t, rows.Scan(&id, &unusedStr, &unusedStr, &unusedTS, &unusedStr, &unusedStr))
// Query system.scheduled_job table and load those schedules.
datums, cols, err := h.cfg.InternalExecutor.QueryWithCols(
datums, cols, err := h.cfg.InternalExecutor.QueryRowExWithCols(
context.Background(), "sched-load", nil,
sessiondata.InternalExecutorOverride{User: security.RootUserName()},
"SELECT * FROM system.scheduled_jobs WHERE schedule_id = $1",
id,
)
require.NoError(t, err)
require.Equal(t, 1, len(datums))
require.NotNil(t, datums)

s := jobs.NewScheduledJob(h.env)
require.NoError(t, s.InitFromDatums(datums[0], cols))
require.NoError(t, s.InitFromDatums(datums, cols))
schedules = append(schedules, s)
}

Expand Down
23 changes: 18 additions & 5 deletions pkg/jobs/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func withSavePoint(ctx context.Context, txn *kv.Txn, fn func() error) error {

func (s *jobScheduler) executeSchedules(
ctx context.Context, maxSchedules int64, txn *kv.Txn,
) error {
) (retErr error) {
stats, err := newLoopStats(ctx, s.env, s.InternalExecutor, txn)
if err != nil {
return err
Expand All @@ -283,7 +283,7 @@ func (s *jobScheduler) executeSchedules(
defer stats.updateMetrics(&s.metrics)

findSchedulesStmt := getFindSchedulesStatement(s.env, maxSchedules)
rows, cols, err := s.InternalExecutor.QueryWithCols(
it, err := s.InternalExecutor.QueryIteratorEx(
ctx, "find-scheduled-jobs",
txn,
sessiondata.InternalExecutorOverride{User: security.RootUserName()},
Expand All @@ -293,8 +293,21 @@ func (s *jobScheduler) executeSchedules(
return err
}

for _, row := range rows {
schedule, numRunning, err := s.unmarshalScheduledJob(row, cols)
// We have to make sure to close the iterator since we might return from the
// for loop early (before Next() returns false).
defer func() {
closeErr := it.Close()
if retErr == nil {
retErr = closeErr
}
}()

// The loop below might encounter an error after some schedules have been
// executed (i.e. previous iterations succeeded), and this is ok.
var ok bool
for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
row := it.Cur()
schedule, numRunning, err := s.unmarshalScheduledJob(row, it.Types())
if err != nil {
stats.malformed++
log.Errorf(ctx, "error parsing schedule: %+v", row)
Expand Down Expand Up @@ -343,7 +356,7 @@ func (s *jobScheduler) executeSchedules(
}
}

return nil
return err
}

func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) {
Expand Down
22 changes: 9 additions & 13 deletions pkg/jobs/scheduled_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,23 +84,20 @@ func LoadScheduledJob(
ex sqlutil.InternalExecutor,
txn *kv.Txn,
) (*ScheduledJob, error) {
rows, cols, err := ex.QueryWithCols(ctx, "lookup-schedule", txn,
row, cols, err := ex.QueryRowExWithCols(ctx, "lookup-schedule", txn,
sessiondata.InternalExecutorOverride{User: security.RootUserName()},
fmt.Sprintf("SELECT * FROM %s WHERE schedule_id = %d",
env.ScheduledJobsTableName(), id))

if err != nil {
return nil, err
return nil, errors.Wrapf(err, "expected to find 1 schedule with schedule_id=%d", id)
}

if len(rows) != 1 {
return nil, errors.Newf(
"expected to find 1 schedule, found %d with schedule_id=%d",
len(rows), id)
if row == nil {
return nil, errors.Newf("expected to find 1 schedule, found 0, with schedule_id=%d", id)
}

j := NewScheduledJob(env)
if err := j.InitFromDatums(rows[0], cols); err != nil {
if err := j.InitFromDatums(row, cols); err != nil {
return nil, err
}
return j, nil
Expand Down Expand Up @@ -362,22 +359,21 @@ func (j *ScheduledJob) Create(ctx context.Context, ex sqlutil.InternalExecutor,
return err
}

rows, retCols, err := ex.QueryWithCols(ctx, "sched-create", txn,
row, retCols, err := ex.QueryRowExWithCols(ctx, "sched-create", txn,
sessiondata.InternalExecutorOverride{User: security.RootUserName()},
fmt.Sprintf("INSERT INTO %s (%s) VALUES(%s) RETURNING schedule_id",
j.env.ScheduledJobsTableName(), strings.Join(cols, ","), generatePlaceholders(len(qargs))),
qargs...,
)

if err != nil {
return err
return errors.Wrapf(err, "failed to create new schedule")
}

if len(rows) != 1 {
if row == nil {
return errors.New("failed to create new schedule")
}

return j.InitFromDatums(rows[0], retCols)
return j.InitFromDatums(row, retCols)
}

// Update saves changes made to this schedule.
Expand Down
7 changes: 3 additions & 4 deletions pkg/jobs/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,16 @@ func (h *testHelper) newScheduledJobForExecutor(
// loadSchedule loads all columns for the specified scheduled job.
func (h *testHelper) loadSchedule(t *testing.T, id int64) *ScheduledJob {
j := NewScheduledJob(h.env)
rows, cols, err := h.cfg.InternalExecutor.QueryWithCols(
row, cols, err := h.cfg.InternalExecutor.QueryRowExWithCols(
context.Background(), "sched-load", nil,
sessiondata.InternalExecutorOverride{User: security.RootUserName()},
fmt.Sprintf(
"SELECT * FROM %s WHERE schedule_id = %d",
h.env.ScheduledJobsTableName(), id),
)
require.NoError(t, err)

require.Equal(t, 1, len(rows))
require.NoError(t, j.InitFromDatums(rows[0], cols))
require.NotNil(t, row)
require.NoError(t, j.InitFromDatums(row, cols))
return j
}

Expand Down
22 changes: 11 additions & 11 deletions pkg/kv/kvserver/protectedts/ptstorage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,17 +661,6 @@ func (ie *wrappedInternalExecutor) ExecEx(
return ie.wrapped.ExecEx(ctx, opName, txn, o, stmt, qargs...)
}

func (ie *wrappedInternalExecutor) QueryWithCols(
ctx context.Context,
opName string,
txn *kv.Txn,
o sessiondata.InternalExecutorOverride,
statement string,
qargs ...interface{},
) ([]tree.Datums, colinfo.ResultColumns, error) {
panic("unimplemented")
}

func (ie *wrappedInternalExecutor) QueryRowEx(
ctx context.Context,
opName string,
Expand All @@ -694,6 +683,17 @@ func (ie *wrappedInternalExecutor) QueryRow(
panic("not implemented")
}

func (ie *wrappedInternalExecutor) QueryRowExWithCols(
ctx context.Context,
opName string,
txn *kv.Txn,
session sessiondata.InternalExecutorOverride,
stmt string,
qargs ...interface{},
) (tree.Datums, colinfo.ResultColumns, error) {
panic("not implemented")
}

func (ie *wrappedInternalExecutor) QueryIterator(
ctx context.Context, opName string, txn *kv.Txn, stmt string, qargs ...interface{},
) (sqlutil.InternalRows, error) {
Expand Down
30 changes: 22 additions & 8 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ type InternalExecutor struct {
memMetrics MemoryMetrics

// sessionData, if not nil, represents the session variables used by
// statements executed on this internalExecutor. Note that queries executed by
// the executor will run on copies of this data.
// statements executed on this internalExecutor. Note that queries executed
// by the executor will run on copies of this data.
sessionData *sessiondata.SessionData

// syntheticDescriptors stores the synthetic descriptors to be injected into
Expand Down Expand Up @@ -118,7 +118,7 @@ func MakeInternalExecutor(
// SetSessionData binds the session variables that will be used by queries
// performed through this executor from now on.
//
// SetSessionData cannot be called concurently with query execution.
// SetSessionData cannot be called concurrently with query execution.
func (ie *InternalExecutor) SetSessionData(sessionData *sessiondata.SessionData) {
ie.s.populateMinimalSessionData(sessionData)
ie.sessionData = sessionData
Expand Down Expand Up @@ -459,17 +459,31 @@ func (ie *InternalExecutor) QueryRowEx(
stmt string,
qargs ...interface{},
) (tree.Datums, error) {
rows, _, err := ie.queryInternalBuffered(ctx, opName, txn, session, stmt, 2 /* limit */, qargs...)
rows, _, err := ie.QueryRowExWithCols(ctx, opName, txn, session, stmt, qargs...)
return rows, err
}

// QueryRowExWithCols is like QueryRowEx, additionally returning the computed
// ResultColumns of the input query.
func (ie *InternalExecutor) QueryRowExWithCols(
ctx context.Context,
opName string,
txn *kv.Txn,
session sessiondata.InternalExecutorOverride,
stmt string,
qargs ...interface{},
) (tree.Datums, colinfo.ResultColumns, error) {
rows, cols, err := ie.queryInternalBuffered(ctx, opName, txn, session, stmt, 2 /* limit */, qargs...)
if err != nil {
return nil, err
return nil, nil, err
}
switch len(rows) {
case 0:
return nil, nil
return nil, nil, nil
case 1:
return rows[0], nil
return rows[0], cols, nil
default:
return nil, &tree.MultipleResultsError{SQL: stmt}
return nil, nil, &tree.MultipleResultsError{SQL: stmt}
}
}

Expand Down
12 changes: 10 additions & 2 deletions pkg/sql/pgwire/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,15 +433,23 @@ func processPgxStartup(ctx context.Context, s serverutils.TestServerInterface, c
func execQuery(
ctx context.Context, query string, s serverutils.TestServerInterface, c *conn,
) error {
rows, cols, err := s.InternalExecutor().(sqlutil.InternalExecutor).QueryWithCols(
it, err := s.InternalExecutor().(sqlutil.InternalExecutor).QueryIteratorEx(
ctx, "test", nil, /* txn */
sessiondata.InternalExecutorOverride{User: security.RootUserName(), Database: "system"},
query,
)
if err != nil {
return err
}
return sendResult(ctx, c, cols, rows)
var rows []tree.Datums
var ok bool
for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
rows = append(rows, it.Cur())
}
if err != nil {
return err
}
return sendResult(ctx, c, it.Types(), rows)
}

func client(ctx context.Context, serverAddr net.Addr, wg *sync.WaitGroup) error {
Expand Down
49 changes: 26 additions & 23 deletions pkg/sql/sqlutil/internal_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,27 @@ import (

// InternalExecutor is meant to be used by layers below SQL in the system that
// nevertheless want to execute SQL queries (presumably against system tables).
// It is extracted in this "sql/util" package to avoid circular references and
// It is extracted in this "sqlutil" package to avoid circular references and
// is implemented by *sql.InternalExecutor.
type InternalExecutor interface {
// Exec executes the supplied SQL statement and returns the number of rows
// affected (not like the results; see Query()). If no user has been previously
// set through SetSessionData, the statement is executed as the root user.
// affected (not like the full results; see QueryIterator()). If no user has
// been previously set through SetSessionData, the statement is executed as
// the root user.
//
// If txn is not nil, the statement will be executed in the respective txn.
//
// Exec is deprecated because it may transparently execute a query as root. Use
// ExecEx instead.
// Exec is deprecated because it may transparently execute a query as root.
// Use ExecEx instead.
Exec(
ctx context.Context, opName string, txn *kv.Txn, statement string, params ...interface{},
) (int, error)

// ExecEx is like Exec, but allows the caller to override some session data
// fields.
//
// The fields set in session that are set override the respective fields if they
// have previously been set through SetSessionData().
// The fields set in session that are set override the respective fields if
// they have previously been set through SetSessionData().
ExecEx(
ctx context.Context,
opName string,
Expand All @@ -50,28 +51,19 @@ type InternalExecutor interface {
qargs ...interface{},
) (int, error)

// QueryWithCols executes the supplied SQL statement and returns the
// resulting rows as well as the computed ResultColumns of the input query.
// QueryRow executes the supplied SQL statement and returns a single row, or
// nil if no row is found, or an error if more that one row is returned.
//
// If txn is not nil, the statement will be executed in the respective txn.
QueryWithCols(
ctx context.Context, opName string, txn *kv.Txn,
o sessiondata.InternalExecutorOverride, statement string, qargs ...interface{},
) ([]tree.Datums, colinfo.ResultColumns, error)

// QueryRow is like Query, except it returns a single row, or nil if not row is
// found, or an error if more that one row is returned.
//
// QueryRow is deprecated (like Query). Use QueryRowEx() instead.
// QueryRow is deprecated. Use QueryRowEx() instead.
QueryRow(
ctx context.Context, opName string, txn *kv.Txn, statement string, qargs ...interface{},
) (tree.Datums, error)

// QueryRowEx is like QueryRow, but allows the caller to override some session data
// fields.
// QueryRowEx is like QueryRow, but allows the caller to override some
// session data fields.
//
// The fields set in session that are set override the respective fields if they
// have previously been set through SetSessionData().
// The fields set in session that are set override the respective fields if
// they have previously been set through SetSessionData().
QueryRowEx(
ctx context.Context,
opName string,
Expand All @@ -81,6 +73,17 @@ type InternalExecutor interface {
qargs ...interface{},
) (tree.Datums, error)

// QueryRowExWithCols is like QueryRowEx, additionally returning the
// computed ResultColumns of the input query.
QueryRowExWithCols(
ctx context.Context,
opName string,
txn *kv.Txn,
session sessiondata.InternalExecutorOverride,
stmt string,
qargs ...interface{},
) (tree.Datums, colinfo.ResultColumns, error)

// QueryIterator executes the query, returning an iterator that can be used
// to get the results. If the call is successful, the returned iterator
// *must* be closed.
Expand Down

0 comments on commit fe919cc

Please sign in to comment.