Skip to content

Commit

Permalink
Merge #137218
Browse files Browse the repository at this point in the history
137218: jobs: write to new jobs tables r=dt a=dt

See commits.

Co-authored-by: David Taylor <tinystatemachine@gmail.com>
  • Loading branch information
craig[bot] and dt committed Dec 16, 2024
2 parents 2494fcc + 4a138aa commit b8209ca
Show file tree
Hide file tree
Showing 10 changed files with 639 additions and 35 deletions.
6 changes: 3 additions & 3 deletions pkg/bench/rttanalysis/testdata/benchmark_expectations
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,16 @@ exp,benchmark
10,Grant/grant_all_on_3_tables
17,GrantRole/grant_1_role
21,GrantRole/grant_2_roles
6,Jobs/cancel_job
12,Jobs/cancel_job
3,Jobs/crdb_internal.system_jobs
3-5,Jobs/jobs_page_default
3,Jobs/jobs_page_latest_50
3,Jobs/jobs_page_type_filtered
1-3,Jobs/jobs_page_type_filtered_no_matches
4,Jobs/non_admin_crdb_internal.system_jobs
4,Jobs/non_admin_show_jobs
8,Jobs/pause_job
6,Jobs/resume_job
14,Jobs/pause_job
12,Jobs/resume_job
3,Jobs/show_job
3-5,Jobs/show_jobs
3,ORMQueries/activerecord_type_introspection_query
Expand Down
11 changes: 11 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,19 @@ const DevelopmentBranch = true
// TestFinalVersion).
const finalVersion Key = -1

// TestingExtraVersions may be set to true in tests which will intentionally use
// Keys greater than Latest, which typically would crash and/or cause errors.
// Test packages that utilize this may encounter odd behavior. Resetting it is
// not required.
var TestingExtraVersions = false

// Version returns the roachpb.Version corresponding to a key.
func (k Key) Version() roachpb.Version {
if TestingExtraVersions && k > Latest {
v := versionTable[Latest]
v.Internal += int32(k-Latest) * 2
return maybeApplyDevOffset(k, v)
}
version := versionTable[k]
return maybeApplyDevOffset(k, version)
}
Expand Down
1 change: 0 additions & 1 deletion pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ go_test(
deps = [
"//pkg/backup",
"//pkg/base",
"//pkg/cloud/impl:cloudimpl",
"//pkg/clusterversion",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprotectedts",
Expand Down
308 changes: 308 additions & 0 deletions pkg/jobs/job_info_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,326 @@ package jobs
import (
"bytes"
"context"
"math"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)

var (
retainedProgressHistory = settings.RegisterIntSetting(
settings.ApplicationLevel, "jobs.retained_progress_entries", "number of historical progress entries to retain per job", 1000,
)
retainedMessageHistory = settings.RegisterIntSetting(
settings.ApplicationLevel, "jobs.retained_messages", "number of historical messages of each kind to retain per job", 100,
)
)

// ProgressStorage reads and writes progress rows.
type ProgressStorage jobspb.JobID

// ProgressStorage returns a new ProgressStorage with the passed in job and txn.
func (j *Job) ProgressStorage() ProgressStorage {
return ProgressStorage(j.id)
}

// Get returns the latest progress report for the job along with when it was
// written. If the fraction is null it is returned as NaN, and if the resolved
// ts is null, it is empty.
func (i ProgressStorage) Get(
ctx context.Context, txn isql.Txn,
) (float64, hlc.Timestamp, time.Time, error) {
ctx, sp := tracing.ChildSpan(ctx, "get-job-progress")
defer sp.Finish()

row, err := txn.QueryRowEx(
ctx, "job-progress-get", txn.KV(),
sessiondata.NodeUserSessionDataOverride,
"SELECT written, fraction, resolved FROM system.job_progress WHERE job_id = $1", i,
)

if err != nil || row == nil {
return 0, hlc.Timestamp{}, time.Time{}, err
}

written, ok := row[0].(*tree.DTimestampTZ)
if !ok {
return 0, hlc.Timestamp{}, time.Time{}, errors.AssertionFailedf("job progress: expected value to be DTimestampTZ (was %T)", row[1])
}

var fraction float64
if row[1] == tree.DNull {
fraction = math.NaN()
} else {
fracDatum, ok := row[1].(*tree.DFloat)
if !ok {
return 0, hlc.Timestamp{}, time.Time{}, errors.AssertionFailedf("job progress: expected value to be DFloat (was %T)", row[1])
}
fraction = float64(*fracDatum)
}

var ts hlc.Timestamp
if row[2] != tree.DNull {
resolved, ok := row[2].(*tree.DDecimal)
if !ok {
return 0, hlc.Timestamp{}, time.Time{}, errors.AssertionFailedf("job progress: expected value to be DDecimal (was %T)", row[1])
}
ts, err = hlc.DecimalToHLC(&resolved.Decimal)
if err != nil {
return 0, hlc.Timestamp{}, time.Time{}, err
}
}

return fraction, ts, written.Time, nil
}

// Set records a progress update. If fraction is NaN or resolved is empty, that
// field is left null. The time at which the progress was reported is recorded.
func (i ProgressStorage) Set(
ctx context.Context, txn isql.Txn, fraction float64, resolved hlc.Timestamp,
) error {
ctx, sp := tracing.ChildSpan(ctx, "write-job-progress")
defer sp.Finish()

if _, err := txn.ExecEx(
ctx, "write-job-progress-delete", txn.KV(), sessiondata.NodeUserSessionDataOverride,
`DELETE FROM system.job_progress WHERE job_id = $1`, i,
); err != nil {
return err
}

var frac, ts interface{}
if !math.IsNaN(fraction) {
frac = fraction
}
if resolved.IsSet() {
ts = resolved.AsOfSystemTime()
}

if _, err := txn.ExecEx(
ctx, "write-job-progress-insert", txn.KV(), sessiondata.NodeUserSessionDataOverride,
`INSERT INTO system.job_progress (job_id, written, fraction, resolved) VALUES ($1, now(), $2, $3)`,
i, frac, ts,
); err != nil {
return err
}

if _, err := txn.ExecEx(
ctx, "write-job-progress-history-insert", txn.KV(), sessiondata.NodeUserSessionDataOverride,
`UPSERT INTO system.job_progress_history (job_id, written, fraction, resolved) VALUES ($1, now(), $2, $3)`,
i, frac, ts,
); err != nil {
return err
}

if _, err := txn.ExecEx(
ctx, "write-job-progress-history-prune", txn.KV(), sessiondata.NodeUserSessionDataOverride,
`DELETE FROM system.job_progress_history WHERE job_id = $1 AND written IN
(SELECT written FROM system.job_progress_history WHERE job_id = $1 ORDER BY written DESC OFFSET $2)`,
i, retainedProgressHistory.Get(txn.KV().DB().SettingsValues()),
); err != nil {
return err
}

return nil
}

// StatusStorage reads and writes the "status" row for a job.
//
// A status is an optional short string that describes to a human what a job is
// doing at any given point while it is running or reverting, particularly if a
// given job does many distinct things over the course of running. If a job just
// does one thing when it is running, it likely does not need to set a status -
// the fact that the job of that type is running is all one needs to know.
//
// Status messages are strictly for human consumption; they should not be read,
// parsed or compared by code; if your job wants to store something and read it
// back later use InfoStorage instead.
//
// A job can only have one status at a time -- setting it will clears any
// previously set status. To just surface a message to the user that can be
// presented along side other messages rather than change its "current status"
// use MessageStorage.
type StatusStorage jobspb.JobID

// Status returns the StatusStorage for the job.
func (j *Job) StatusStorage() StatusStorage {
return StatusStorage(j.id)
}

// Clear clears the status message row for the job, if it exists.
func (i StatusStorage) Clear(ctx context.Context, txn isql.Txn) error {
_, err := txn.ExecEx(
ctx, "clear-job-status-delete", txn.KV(), sessiondata.NodeUserSessionDataOverride,
`DELETE FROM system.job_status WHERE job_id = $1`, i,
)
return err
}

// Sets writes the current status, replacing the current one if it exists.
// Setting an empty status is the same as calling Clear().
func (i StatusStorage) Set(ctx context.Context, txn isql.Txn, status string) error {
ctx, sp := tracing.ChildSpan(ctx, "write-job-status")
defer sp.Finish()

// Delete any existing status row in the same transaction before replacing it
// with the new one.
if err := i.Clear(ctx, txn); err != nil {
return err
}

if _, err := txn.ExecEx(
ctx, "write-job-status-insert", txn.KV(), sessiondata.NodeUserSessionDataOverride,
`INSERT INTO system.job_status (job_id, written, status) VALUES ($1, now(), $2)`,
i, status,
); err != nil {
return err
}

if err := MessageStorage(i).Record(ctx, txn, "status", status); err != nil {
return err
}

return nil
}

// Get gets the current status mesasge for a job, if any.
func (i StatusStorage) Get(ctx context.Context, txn isql.Txn) (string, time.Time, error) {
ctx, sp := tracing.ChildSpan(ctx, "get-job-status")
defer sp.Finish()

row, err := txn.QueryRowEx(
ctx, "job-status-get", txn.KV(), sessiondata.NodeUserSessionDataOverride,
"SELECT written, status FROM system.job_status WHERE job_id = $1",
i,
)

if err != nil {
return "", time.Time{}, err
}

if row == nil {
return "", time.Time{}, nil
}

written, ok := row[0].(*tree.DTimestampTZ)
if !ok {
return "", time.Time{}, errors.AssertionFailedf("job Status: expected value to be DTimestampTZ (was %T)", row[1])
}
status, ok := row[1].(*tree.DString)
if !ok {
return "", time.Time{}, errors.AssertionFailedf("job Status: expected value to be DString (was %T)", row[1])
}

return string(*status), written.Time, nil
}

// MessageStorage stores human-readable messages emitted by the execution of a
// job, including when it changes states in the job system, when it wishes to
// communicate its own custom status, or additional messages it wishes to
// surface to the user. Messages include a string identifier of the kind of
// message, and retention limits are enforced on the number of messages of each
// kind per job, so a more frequently emitted kind of message will not cause
// messages of other kinds to be pruned. For example, if a job emitted a "retry"
// every minute, after a couple hours older messages about retries would be
// pruned but a "state" message from days prior indicating it was unpaused would
// still be retrained.
type MessageStorage jobspb.JobID

// Messages returns the MessageStorage for the job.
func (j *Job) Messages() MessageStorage {
return MessageStorage(j.id)
}

// Record writes a human readable message of the specified kind to the message
// log for this job, and prunes retained messages of the same kind based on the
// configured limit to keep the total number of retained messages bounded.
func (i MessageStorage) Record(ctx context.Context, txn isql.Txn, kind, message string) error {
ctx, sp := tracing.ChildSpan(ctx, "write-job-message")
defer sp.Finish()

// Insert the new message.
if _, err := txn.ExecEx(
ctx, "write-job-message-insert", txn.KV(), sessiondata.NodeUserSessionDataOverride,
`UPSERT INTO system.job_message (job_id, written, kind, message) VALUES ($1, now(),$2, $3)`,
i, kind, message,
); err != nil {
return err
}

// Prune old messages of the same kind to bound historical data.
if _, err := txn.ExecEx(
ctx, "write-job-message-prune", txn.KV(), sessiondata.NodeUserSessionDataOverride,
`DELETE FROM system.job_message WHERE job_id = $1 AND kind = $2 AND written IN (
SELECT written FROM system.job_message WHERE job_id = $1 AND kind = $2 ORDER BY written DESC OFFSET $3
)`,
i, kind, retainedMessageHistory.Get(txn.KV().DB().SettingsValues()),
); err != nil {
return err
}

return nil
}

type JobMessage struct {
Kind, Message string
Written time.Time
}

func (i MessageStorage) Fetch(ctx context.Context, txn isql.Txn) (_ []JobMessage, retErr error) {
ctx, sp := tracing.ChildSpan(ctx, "get-all-job-message")
defer sp.Finish()

rows, err := txn.QueryIteratorEx(
ctx, "get-job-messages", txn.KV(), sessiondata.NodeUserSessionDataOverride,
`SELECT written, kind, message FROM system.job_message WHERE job_id = $1 ORDER BY written DESC`,
i,
)
if err != nil {
return nil, err
}
defer func(it isql.Rows) { retErr = errors.CombineErrors(retErr, it.Close()) }(rows)
var res []JobMessage
for {
ok, err := rows.Next(ctx)
if err != nil {
return nil, err
}
if !ok {
break
}
row := rows.Cur()
written, ok := row[0].(*tree.DTimestampTZ)
if !ok {
return nil, errors.AssertionFailedf("job message: expected written to be DTimestampTZ (was %T)", row[0])
}
kind, ok := row[1].(*tree.DString)
if !ok {
return nil, errors.AssertionFailedf("job message: expected kind to be DString (was %T)", row[1])
}
message, ok := row[2].(*tree.DString)
if !ok {
return nil, errors.AssertionFailedf("job message: expected message to be DString (was %T)", row[2])
}
res = append(res, JobMessage{
Kind: string(*kind),
Message: string(*message),
Written: written.Time,
})
}
return res, nil
}

// InfoStorage can be used to read and write rows to system.job_info table. All
// operations are scoped under the txn and are executed on behalf of Job j.
type InfoStorage struct {
Expand Down
Loading

0 comments on commit b8209ca

Please sign in to comment.