Skip to content

Commit

Permalink
Merge #69370
Browse files Browse the repository at this point in the history
69370: jobs,jobspb: rename ExecutionLog, populate r=ajwerner a=ajwerner

I got cold feet on the `ExecutionLog` in the `Payload` being populated at the
beginning of job execution. I become uncomfortable with the idea of writing to
the `Payload` an extra time for every job execution in the happy case. These
things can get big. Instead we only populate events when a retriable error
occurs. Given that, the events have been renamed to `RetriableExecutionError`.

This data is exposed in the crdb_internal.jobs table via the `execution_errors`
string array. The decision to use an array of strings instead of json was
predicated upon the idea that these errors are for human consumption. The
structured data continues to exist in the protobuf.

These events are retained in the `Payload` in `RetriableExecutionErrorLog`.
A finite number of events are retained, and that number is controlled by the new
non-public cluster setting `jobs.execution_errors.max_entries`. The size of the
events is also controlled by a cluster setting: `jobs.execution_errors.max_entry_size`.

If an event would be larger than this size, the error it would contain is formatted
to a string and truncated to the max size. This loses structure but at least retains
some observability.

Fixes #68800.

Release justification: bug fixes and low-risk updates to new functionality.

Release note (general change): When jobs encounter retriable errors during
execution, they will now record these errors into their state. The errors as
well as metadata about the execution can be inspected via the newly added
`execution_errors` field of `crdb_internal.jobs` which is a `STRING[]` column.

Co-authored-by: Andrew Werner <awerner32@gmail.com>
  • Loading branch information
craig[bot] and ajwerner committed Aug 27, 2021
2 parents 0fa29ad + b7301c6 commit 56afa9f
Show file tree
Hide file tree
Showing 29 changed files with 1,461 additions and 1,011 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ func TestClusterRestoreFailCleanup(t *testing.T) {
r.testingKnobs.duringSystemTableRestoration = func(systemTableName string) error {
if !alreadyErrored && systemTableName == customRestoreSystemTable {
alreadyErrored = true
return jobs.NewRetryJobError("injected error")
return jobs.MarkAsRetryJobError(errors.New("injected error"))
}
return nil
}
Expand All @@ -645,7 +645,7 @@ func TestClusterRestoreFailCleanup(t *testing.T) {
}
}
// The initial restore will return an error, and restart.
sqlDBRestore.ExpectErr(t, `injected error: restarting in background`, `RESTORE FROM $1`, LocalFoo)
sqlDBRestore.ExpectErr(t, `running execution from '.*' to '.*' on \d+ failed: injected error`, `RESTORE FROM $1`, LocalFoo)
// Reduce retry delays.
sqlDBRestore.Exec(t, "SET CLUSTER SETTING jobs.registry.retry.initial_delay = '1ms'")
// Expect the restore to succeed.
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ func (b *changefeedResumer) resumeWithRetries(
// retries will not help.
// Instead, we want to make sure that the changefeed job is not marked failed
// due to a transient, retryable error.
err = jobs.NewRetryJobError(fmt.Sprintf("retryable flow error: %+v", err))
err = jobs.MarkAsRetryJobError(err)
b.setJobRunningStatus(ctx, "retryable flow error: %s", err)
}

Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"adopt.go",
"config.go",
"errors.go",
"executor_impl.go",
"helpers.go",
"job_scheduler.go",
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ func (r *Registry) runJob(
}

r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err)
r.maybeRecordExecutionFailure(ctx, err, job)
if r.knobs.AfterJobStateMachine != nil {
r.knobs.AfterJobStateMachine()
}
Expand Down
44 changes: 35 additions & 9 deletions pkg/jobs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,19 @@ import (
)

const (
intervalBaseSettingKey = "jobs.registry.interval.base"
adoptIntervalSettingKey = "jobs.registry.interval.adopt"
cancelIntervalSettingKey = "jobs.registry.interval.cancel"
gcIntervalSettingKey = "jobs.registry.interval.gc"
retentionTimeSettingKey = "jobs.retention_time"
cancelUpdateLimitKey = "jobs.cancel_update_limit"
retryInitialDelaySettingKey = "jobs.registry.retry.initial_delay"
retryMaxDelaySettingKey = "jobs.registry.retry.max_delay"
intervalBaseSettingKey = "jobs.registry.interval.base"
adoptIntervalSettingKey = "jobs.registry.interval.adopt"
cancelIntervalSettingKey = "jobs.registry.interval.cancel"
gcIntervalSettingKey = "jobs.registry.interval.gc"
retentionTimeSettingKey = "jobs.retention_time"
cancelUpdateLimitKey = "jobs.cancel_update_limit"
retryInitialDelaySettingKey = "jobs.registry.retry.initial_delay"
retryMaxDelaySettingKey = "jobs.registry.retry.max_delay"
executionErrorsMaxEntriesKey = "jobs.execution_errors.max_entries"
executionErrorsMaxEntrySizeKey = "jobs.execution_errors.max_entry_size"
)

var (
const (
// defaultAdoptInterval is the default adopt interval.
defaultAdoptInterval = 30 * time.Second

Expand All @@ -58,6 +60,15 @@ var (

// defaultRetryMaxDelay is the maximum delay to retry a failed job.
defaultRetryMaxDelay = 24 * time.Hour

// defaultExecutionErrorsMaxEntries is the default number of error entries
// which will be retained.
defaultExecutionErrorsMaxEntries = 3

// defaultExecutionErrorsMaxEntrySize is the maximum allowed size of an
// error. If this size is exceeded, the error will be formatted as a string
// and then truncated to fit the size.
defaultExecutionErrorsMaxEntrySize = 64 << 10 // 64 KiB
)

var (
Expand Down Expand Up @@ -122,6 +133,21 @@ var (
defaultRetryMaxDelay,
settings.PositiveDuration,
)

executionErrorsMaxEntriesSetting = settings.RegisterIntSetting(
executionErrorsMaxEntriesKey,
"the maximum number of retriable error entries which will be stored for introspection",
defaultExecutionErrorsMaxEntries,
settings.NonNegativeInt,
)

executionErrorsMaxEntrySize = settings.RegisterByteSizeSetting(
executionErrorsMaxEntrySizeKey,
"the maximum byte size of individual error entries which will be stored"+
" for introspection",
defaultExecutionErrorsMaxEntrySize,
settings.NonNegativeInt,
)
)

// jitter adds a small jitter in the given duration.
Expand Down
161 changes: 161 additions & 0 deletions pkg/jobs/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package jobs

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

// errRetryJobSentinel exists so the errors returned from MarkAsRetryJobError can
// be marked with it, allowing more robust detection of retry errors even if
// they are wrapped, etc. This was originally introduced to deal with injected
// retry errors from testing knobs.
var errRetryJobSentinel = errors.New("retriable job error")

// MarkAsRetryJobError marks an error as a retriable job error which
// indicates that the registry should retry the job.
func MarkAsRetryJobError(err error) error {
return errors.Mark(err, errRetryJobSentinel)
}

// Registry does not retry a job that fails due to a permanent error.
var errJobPermanentSentinel = errors.New("permanent job error")

// MarkAsPermanentJobError marks an error as a permanent job error, which indicates
// Registry to not retry the job when it fails due to this error.
func MarkAsPermanentJobError(err error) error {
return errors.Mark(err, errJobPermanentSentinel)
}

// IsPermanentJobError checks whether the given error is a permanent error.
func IsPermanentJobError(err error) bool {
return errors.Is(err, errJobPermanentSentinel)
}

// InvalidStatusError is the error returned when the desired operation is
// invalid given the job's current status.
type InvalidStatusError struct {
id jobspb.JobID
status Status
op string
err string
}

func (e *InvalidStatusError) Error() string {
if e.err != "" {
return fmt.Sprintf("cannot %s %s job (id %d, err: %q)", e.op, e.status, e.id, e.err)
}
return fmt.Sprintf("cannot %s %s job (id %d)", e.op, e.status, e.id)
}

// SimplifyInvalidStatusError unwraps an *InvalidStatusError into an error
// message suitable for users. Other errors are returned as passed.
func SimplifyInvalidStatusError(err error) error {
if ierr := (*InvalidStatusError)(nil); errors.As(err, &ierr) {
return errors.Errorf("job %s", ierr.status)
}
return err
}

// retriableExecutionError captures metadata about retriable errors encountered
// during the execution of a job. These errors propagate information to be
// stored in the payload in Payload.RetriableExecutionErrorLog.
type retriableExecutionError struct {
instanceID base.SQLInstanceID
start, end time.Time
status Status
cause error
}

func newRetriableExecutionError(
instanceID base.SQLInstanceID, status Status, start, end time.Time, cause error,
) *retriableExecutionError {
return &retriableExecutionError{
instanceID: instanceID,
status: status,
start: start,
end: end,
cause: cause,
}
}

// Error makes retriableExecutionError and error.
func (e *retriableExecutionError) Error() string {
return formatRetriableExecutionFailure(
e.instanceID, e.status, e.start, e.end, e.cause,
)

}

func formatRetriableExecutionFailure(
instanceID base.SQLInstanceID, status Status, start, end time.Time, cause error,
) string {
mustTimestamp := func(ts time.Time) *tree.DTimestamp {
ret, _ := tree.MakeDTimestamp(ts, time.Microsecond)
return ret
}
return fmt.Sprintf(
"%s execution from %v to %v on %d failed: %v",
status,
mustTimestamp(start),
mustTimestamp(end),
instanceID,
cause,
)
}

// Cause exposes the underlying error.
func (e *retriableExecutionError) Cause() error { return e.cause }

// Unwrap exposes the underlying error.
func (e *retriableExecutionError) Unwrap() error { return e.cause }

// Format formats the error.
func (e *retriableExecutionError) Format(s fmt.State, verb rune) { errors.FormatError(e, s, verb) }

// SafeFormatError formats the error safely.
func (e *retriableExecutionError) SafeFormatError(p errors.Printer) error {
if p.Detail() {
p.Printf("retriable execution error")
}
return e.cause
}

func (e *retriableExecutionError) toRetriableExecutionFailure(
ctx context.Context, maxErrorSize int,
) *jobspb.RetriableExecutionFailure {
// If the cause is too large, we format it, losing all structure, and retain
// a prefix.
ef := &jobspb.RetriableExecutionFailure{
Status: string(e.status),
ExecutionStartMicros: timeutil.ToUnixMicros(e.start),
ExecutionEndMicros: timeutil.ToUnixMicros(e.end),
InstanceID: e.instanceID,
}
if encodedCause := errors.EncodeError(ctx, e.cause); encodedCause.Size() < maxErrorSize {
ef.Error = &encodedCause
} else {
formatted := e.cause.Error()
if len(formatted) > maxErrorSize {
formatted = formatted[:maxErrorSize]
}
ef.TruncatedError = formatted
}
return ef
}
50 changes: 20 additions & 30 deletions pkg/jobs/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,36 +89,26 @@ func (j *Job) Succeeded(ctx context.Context) error {
return j.succeeded(ctx, nil /* txn */, nil /* fn */)
}

var (
AdoptQuery = claimQuery

CancelQuery = pauseAndCancelUpdate

GcQuery = expiredJobsQuery

RemoveClaimsQuery = removeClaimsQuery

ProcessJobsQuery = processQueryWithBackoff

IntervalBaseSettingKey = intervalBaseSettingKey

AdoptIntervalSettingKey = adoptIntervalSettingKey

CancelIntervalSettingKey = cancelIntervalSettingKey

GcIntervalSettingKey = gcIntervalSettingKey

RetentionTimeSettingKey = retentionTimeSettingKey

AdoptIntervalSetting = adoptIntervalSetting

CancelIntervalSetting = cancelIntervalSetting
const (
AdoptQuery = claimQuery
CancelQuery = pauseAndCancelUpdate
GcQuery = expiredJobsQuery
RemoveClaimsQuery = removeClaimsQuery
ProcessJobsQuery = processQueryWithBackoff
IntervalBaseSettingKey = intervalBaseSettingKey
AdoptIntervalSettingKey = adoptIntervalSettingKey
CancelIntervalSettingKey = cancelIntervalSettingKey
GcIntervalSettingKey = gcIntervalSettingKey
RetentionTimeSettingKey = retentionTimeSettingKey
DefaultAdoptInterval = defaultAdoptInterval
ExecutionErrorsMaxEntriesKey = executionErrorsMaxEntriesKey
ExecutionErrorsMaxEntrySizeKey = executionErrorsMaxEntrySizeKey
)

var (
AdoptIntervalSetting = adoptIntervalSetting
CancelIntervalSetting = cancelIntervalSetting
CancellationsUpdateLimitSetting = cancellationsUpdateLimitSetting

GcIntervalSetting = gcIntervalSetting

RetentionTimeSetting = retentionTimeSetting

DefaultAdoptInterval = defaultAdoptInterval
GcIntervalSetting = gcIntervalSetting
RetentionTimeSetting = retentionTimeSetting
)
Loading

0 comments on commit 56afa9f

Please sign in to comment.