jobs,backupccl: Update metrics poller to handle schedule PTS records #97651

Merged Mar 14, 2023 · 1 commit
13 changes: 11 additions & 2 deletions pkg/ccl/backupccl/create_scheduled_backup_test.go
@@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs/schedulebase"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
@@ -96,6 +97,7 @@ func newTestHelper(t *testing.T) (*testHelper, func()) {
}

args := base.TestServerArgs{
Settings: cluster.MakeClusterSettings(),
ExternalIODir: dir,
// Some scheduled backup tests fail when run within a tenant. More
// investigation is required. Tracked with #76378.
@@ -104,6 +106,7 @@
JobsTestingKnobs: knobs,
},
}
jobs.PollJobsMetricsInterval.Override(context.Background(), &args.Settings.SV, 250*time.Millisecond)
s, db, _ := serverutils.StartServer(t, args)
require.NotNil(t, th.cfg)
th.sqlDB = sqlutils.MakeSQLRunner(db)
@@ -1154,11 +1157,11 @@ INSERT INTO t values (1), (10), (100);
}
})

metrics := func() *jobs.ExecutorMetrics {
metrics := func() *backupMetrics {
ex, err := jobs.GetScheduledJobExecutor(tree.ScheduledBackupExecutor.InternalName())
require.NoError(t, err)
require.NotNil(t, ex.Metrics())
return ex.Metrics().(*backupMetrics).ExecutorMetrics
return ex.Metrics().(*backupMetrics)
}()

t.Run("retry", func(t *testing.T) {
@@ -1226,6 +1229,12 @@ INSERT INTO t values (1), (10), (100);
}
return errors.Newf("expected 2 backup to succeed, got %d", delta)
})
testutils.SucceedsSoon(t, func() error {
if metrics.NumWithPTS.Value() > 0 {
return nil
}
return errors.New("still waiting for pts count > 0")
})
})
}

10 changes: 9 additions & 1 deletion pkg/ccl/backupccl/schedule_exec.go
@@ -36,6 +36,7 @@ type scheduledBackupExecutor struct {

type backupMetrics struct {
*jobs.ExecutorMetrics
*jobs.ExecutorPTSMetrics
RpoMetric *metric.Gauge
}

@@ -44,6 +45,11 @@ var _ metric.Struct = &backupMetrics{}
// MetricStruct implements metric.Struct interface
func (m *backupMetrics) MetricStruct() {}

// PTSMetrics implements jobs.PTSMetrics interface.
func (m *backupMetrics) PTSMetrics() *jobs.ExecutorPTSMetrics {
return m.ExecutorPTSMetrics
}

var _ jobs.ScheduledJobExecutor = &scheduledBackupExecutor{}

// ExecuteJob implements jobs.ScheduledJobExecutor interface.
@@ -546,9 +552,11 @@ func init() {
tree.ScheduledBackupExecutor.InternalName(),
func() (jobs.ScheduledJobExecutor, error) {
m := jobs.MakeExecutorMetrics(tree.ScheduledBackupExecutor.UserName())
pm := jobs.MakeExecutorPTSMetrics(tree.ScheduledBackupExecutor.UserName())
return &scheduledBackupExecutor{
metrics: backupMetrics{
ExecutorMetrics: &m,
ExecutorMetrics: &m,
ExecutorPTSMetrics: &pm,
RpoMetric: metric.NewGauge(metric.Metadata{
Name: "schedules.BACKUP.last-completed-time",
Help: "The unix timestamp of the most recently completed backup by a schedule specified as maintaining this metric",
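For context, the wiring above follows a general pattern that any scheduled-job executor could reuse: embed *jobs.ExecutorPTSMetrics in the executor's metric struct and implement the jobs.PTSMetrics interface so the metrics poller can discover the PTS gauges through a type assertion on Metrics(). A minimal sketch for a hypothetical executor; only the jobs package identifiers are real, the myExec names are illustrative:

package myexec // illustrative package, not part of this PR

import "github.com/cockroachdb/cockroach/pkg/jobs"

// myExecMetrics is a hypothetical metric struct for a scheduled-job executor
// that owns protected timestamp (PTS) records.
type myExecMetrics struct {
	*jobs.ExecutorMetrics
	*jobs.ExecutorPTSMetrics
}

// MetricStruct marks this as a registerable metric container and resolves the
// ambiguity between the two embedded MetricStruct methods.
func (m *myExecMetrics) MetricStruct() {}

// PTSMetrics exposes the PTS gauges; the metrics poller finds them via
// ex.Metrics().(jobs.PTSMetrics).
func (m *myExecMetrics) PTSMetrics() *jobs.ExecutorPTSMetrics {
	return m.ExecutorPTSMetrics
}

// makeMyExecMetrics builds both metric families for a schedule type name,
// e.g. "BACKUP".
func makeMyExecMetrics(name string) myExecMetrics {
	em := jobs.MakeExecutorMetrics(name)
	pm := jobs.MakeExecutorPTSMetrics(name)
	return myExecMetrics{ExecutorMetrics: &em, ExecutorPTSMetrics: &pm}
}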
2 changes: 2 additions & 0 deletions pkg/jobs/metricspoller/BUILD.bazel
@@ -13,7 +13,9 @@ go_library(
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprotectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/security/username",
"//pkg/settings/cluster",
"//pkg/sql",
178 changes: 130 additions & 48 deletions pkg/jobs/metricspoller/job_statistics.go
@@ -16,7 +16,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
@@ -79,76 +81,114 @@ func updatePausedMetrics(ctx context.Context, execCtx sql.JobExecContext) error
return nil
}

// manageJobsProtectedTimestamps manages protected timestamp records owned by various jobs.
// This function mostly concerns itself with collecting statistics related to job PTS records.
// It also detects PTS records that are too old (as configured by the owner job) and requests
// job cancellation for those jobs.
func manageJobsProtectedTimestamps(ctx context.Context, execCtx sql.JobExecContext) error {
type ptsStat struct {
numRecords int64
expired int64
oldest hlc.Timestamp
}
type ptsStat struct {
numRecords int64
expired int64
oldest hlc.Timestamp
}

type schedulePTSStat struct {
ptsStat
m *jobs.ExecutorPTSMetrics
}

// manageProtectedTimestamps manages protected timestamp records owned by
// various jobs or schedules. This function mostly concerns itself with
// collecting statistics related to job PTS records. It also detects PTS records
// that are too old (as configured by the owner job) and requests job
// cancellation for those jobs.
func manageProtectedTimestamps(ctx context.Context, execCtx sql.JobExecContext) error {
var ptsStats map[jobspb.Type]*ptsStat
var schedulePtsStats map[string]*schedulePTSStat

execCfg := execCtx.ExecCfg()
if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
ptsStats = make(map[jobspb.Type]*ptsStat)
schedulePtsStats = make(map[string]*schedulePTSStat)

ptsState, err := execCfg.ProtectedTimestampProvider.WithTxn(txn).GetState(ctx)
if err != nil {
return err
}
for _, rec := range ptsState.Records {
if rec.MetaType != jobsprotectedts.GetMetaType(jobsprotectedts.Jobs) {
continue
}
id, err := jobsprotectedts.DecodeID(rec.Meta)
if err != nil {
return err
}
j, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(id), txn)
if err != nil {
continue
}
p := j.Payload()
jobType, err := p.CheckType()
if err != nil {
return err
}
stats := ptsStats[jobType]
if stats == nil {
stats = &ptsStat{}
ptsStats[jobType] = stats
}
stats.numRecords++
if stats.oldest.IsEmpty() || rec.Timestamp.Less(stats.oldest) {
stats.oldest = rec.Timestamp
}

// If MaximumPTSAge is set on the job payload, verify if PTS record
// timestamp is fresh enough. Note: we only look at paused jobs.
// If the running job wants to enforce an invariant wrt to PTS age,
// it can do so itself. This check here is a safety mechanism to detect
// paused jobs that own protected timestamp records.
if j.Status() == jobs.StatusPaused &&
p.MaximumPTSAge > 0 &&
rec.Timestamp.GoTime().Add(p.MaximumPTSAge).Before(timeutil.Now()) {
stats.expired++
ptsExpired := errors.Newf(
"protected timestamp records %s as of %s (age %s) exceeds job configured limit of %s",
rec.ID, rec.Timestamp, timeutil.Since(rec.Timestamp.GoTime()), p.MaximumPTSAge)
if err := j.WithTxn(txn).CancelRequestedWithReason(ctx, ptsExpired); err != nil {
switch rec.MetaType {
case jobsprotectedts.GetMetaType(jobsprotectedts.Jobs):
if err := processJobPTSRecord(ctx, execCfg, id, rec, ptsStats, txn); err != nil {
return err
}
log.Warningf(ctx, "job %d canceled due to %s", id, ptsExpired)
case jobsprotectedts.GetMetaType(jobsprotectedts.Schedules):
if err := processSchedulePTSRecord(ctx, id, rec, schedulePtsStats, txn); err != nil {
return err
}
default:
continue
}
}
return nil
}); err != nil {
return err
}

jobMetrics := execCtx.ExecCfg().JobRegistry.MetricsStruct()
updateJobPTSMetrics(execCfg.JobRegistry.MetricsStruct(), execCfg.Clock, ptsStats)
updateSchedulesPTSMetrics(execCfg.Clock, schedulePtsStats)

return nil
}

func processJobPTSRecord(
ctx context.Context,
execCfg *sql.ExecutorConfig,
jobID int64,
rec ptpb.Record,
ptsStats map[jobspb.Type]*ptsStat,
txn isql.Txn,
) error {
j, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(jobID), txn)
if err != nil {
return nil // nolint:returnerrcheck -- job may be deleted when we run; just keep going.
}
p := j.Payload()
jobType, err := p.CheckType()
if err != nil {
return err
}
stats := ptsStats[jobType]
if stats == nil {
stats = &ptsStat{}
ptsStats[jobType] = stats
}
stats.numRecords++
if stats.oldest.IsEmpty() || rec.Timestamp.Less(stats.oldest) {
stats.oldest = rec.Timestamp
}

// If MaximumPTSAge is set on the job payload, verify if PTS record
// timestamp is fresh enough. Note: we only look at paused jobs.
// If the running job wants to enforce an invariant wrt to PTS age,
// it can do so itself. This check here is a safety mechanism to detect
// paused jobs that own protected timestamp records.
if j.Status() == jobs.StatusPaused &&
p.MaximumPTSAge > 0 &&
rec.Timestamp.GoTime().Add(p.MaximumPTSAge).Before(timeutil.Now()) {
stats.expired++
ptsExpired := errors.Newf(
"protected timestamp records %s as of %s (age %s) exceeds job configured limit of %s",
rec.ID, rec.Timestamp, timeutil.Since(rec.Timestamp.GoTime()), p.MaximumPTSAge)
if err := j.WithTxn(txn).CancelRequestedWithReason(ctx, ptsExpired); err != nil {
return err
}
log.Warningf(ctx, "job %d canceled due to %s", jobID, ptsExpired)
}
return nil
}

func updateJobPTSMetrics(
jobMetrics *jobs.Metrics, clock *hlc.Clock, ptsStats map[jobspb.Type]*ptsStat,
) {
for typ := 0; typ < jobspb.NumJobTypes; typ++ {
if jobspb.Type(typ) == jobspb.TypeUnspecified { // do not track TypeUnspecified
continue
@@ -159,7 +199,7 @@ func manageJobsProtectedTimestamps(ctx context.Context, execCtx sql.JobExecContext) error {
m.NumJobsWithPTS.Update(stats.numRecords)
m.ExpiredPTS.Inc(stats.expired)
if stats.oldest.WallTime > 0 {
m.ProtectedAge.Update((execCfg.Clock.Now().WallTime - stats.oldest.WallTime) / 1e9)
m.ProtectedAge.Update((clock.Now().WallTime - stats.oldest.WallTime) / 1e9)
} else {
m.ProtectedAge.Update(0)
}
@@ -170,6 +210,48 @@ func manageJobsProtectedTimestamps(ctx context.Context, execCtx sql.JobExecContext) error {
m.ProtectedAge.Update(0)
}
}
}

func processSchedulePTSRecord(
ctx context.Context,
scheduleID int64,
rec ptpb.Record,
ptsStats map[string]*schedulePTSStat,
txn isql.Txn,
) error {
sj, err := jobs.ScheduledJobTxn(txn).
Load(ctx, scheduledjobs.ProdJobSchedulerEnv, scheduleID)
if jobs.HasScheduledJobNotFoundError(err) {
return nil // nolint:returnerrcheck -- schedule may be deleted when we run; just keep going.
}
ex, err := jobs.GetScheduledJobExecutor(sj.ExecutorType())
if err != nil {
return err
}
pm, ok := ex.Metrics().(jobs.PTSMetrics)
if !ok {
return nil
}

stats := ptsStats[sj.ExecutorType()]
if stats == nil {
stats = &schedulePTSStat{m: pm.PTSMetrics()}
ptsStats[sj.ExecutorType()] = stats
}
stats.numRecords++
if stats.oldest.IsEmpty() || rec.Timestamp.Less(stats.oldest) {
stats.oldest = rec.Timestamp
}
return nil
}

func updateSchedulesPTSMetrics(clock *hlc.Clock, ptsStats map[string]*schedulePTSStat) {
for _, st := range ptsStats {
st.m.NumWithPTS.Update(st.numRecords)
if st.oldest.WallTime > 0 {
st.m.PTSAge.Update((clock.Now().WallTime - st.oldest.WallTime) / 1e9)
} else {
st.m.PTSAge.Update(0)
}
}
}
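One detail worth calling out in updateSchedulesPTSMetrics (and its job counterpart): hlc timestamps carry WallTime in nanoseconds, so dividing the difference by 1e9 reports the age gauges in whole seconds. A self-contained illustration with plain integers; the timestamp values are made up:

package main

import "fmt"

func main() {
	const nanosPerSecond = int64(1e9)
	// Hypothetical clock.Now().WallTime and stats.oldest.WallTime values,
	// 150 seconds apart.
	now := int64(1678800150) * nanosPerSecond
	oldest := int64(1678800000) * nanosPerSecond
	fmt.Println((now - oldest) / nanosPerSecond) // prints 150, the protected_age_sec gauge value
}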
2 changes: 1 addition & 1 deletion pkg/jobs/metricspoller/poller.go
@@ -89,7 +89,7 @@ type pollerMetrics struct {
// of metrics poller.
var metricPollerTasks = map[string]func(ctx context.Context, execCtx sql.JobExecContext) error{
"paused-jobs": updatePausedMetrics,
"manage-pts": manageJobsProtectedTimestamps,
"manage-pts": manageProtectedTimestamps,
}

func (m pollerMetrics) MetricStruct() {}
38 changes: 38 additions & 0 deletions pkg/jobs/schedule_metrics.go
@@ -14,6 +14,7 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/util/metric"
io_prometheus_client "github.com/prometheus/client_model/go"
)

// ExecutorMetrics describes metrics related to scheduled
@@ -24,11 +25,28 @@ type ExecutorMetrics struct {
NumFailed *metric.Counter
}

// ExecutorPTSMetrics describes metrics related to protected
// timestamp system for executors that maintain PTS records.
type ExecutorPTSMetrics struct {
NumWithPTS *metric.Gauge
PTSAge *metric.Gauge
}

// PTSMetrics is a marker interface indicating that executor metrics
// also keep track of PTS related metrics.
type PTSMetrics interface {
PTSMetrics() *ExecutorPTSMetrics
}

var _ metric.Struct = &ExecutorMetrics{}
var _ metric.Struct = &ExecutorPTSMetrics{}

// MetricStruct implements metric.Struct interface
func (m *ExecutorMetrics) MetricStruct() {}

// MetricStruct implements metric.Struct interface.
func (m *ExecutorPTSMetrics) MetricStruct() {}

// SchedulerMetrics are metrics specific to job scheduler daemon.
type SchedulerMetrics struct {
// Number of scheduled jobs started.
@@ -114,3 +132,23 @@ func MakeExecutorMetrics(name string) ExecutorMetrics {
}),
}
}

// MakeExecutorPTSMetrics creates PTS metrics.
func MakeExecutorPTSMetrics(name string) ExecutorPTSMetrics {
return ExecutorPTSMetrics{
NumWithPTS: metric.NewGauge(metric.Metadata{
Name: fmt.Sprintf("schedules.%s.protected_record_count", name),
Help: fmt.Sprintf("Number of PTS records held by %s schedules", name),
Measurement: "Records",
Unit: metric.Unit_COUNT,
MetricType: io_prometheus_client.MetricType_GAUGE,
}),
PTSAge: metric.NewGauge(metric.Metadata{
Name: fmt.Sprintf("schedules.%s.protected_age_sec", name),
Help: fmt.Sprintf("The age of the oldest PTS record protected by %s schedules", name),
Measurement: "Seconds",
Unit: metric.Unit_SECONDS,
MetricType: io_prometheus_client.MetricType_GAUGE,
}),
}
}
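Given the existing schedules.BACKUP.last-completed-time gauge earlier in this diff, tree.ScheduledBackupExecutor.UserName() is presumably "BACKUP", so the backup executor's new gauges would be named schedules.BACKUP.protected_record_count and schedules.BACKUP.protected_age_sec. A sketch of how a test might spot-check the generated names, assuming the metric.Metadata getter GetName() is available on the gauges (not shown in this diff):

package jobs_test // illustrative placement

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/stretchr/testify/require"
)

func TestExecutorPTSMetricNamesSketch(t *testing.T) {
	// MakeExecutorPTSMetrics embeds the schedule type name in the metric names.
	pm := jobs.MakeExecutorPTSMetrics("BACKUP")
	require.Equal(t, "schedules.BACKUP.protected_record_count", pm.NumWithPTS.GetName())
	require.Equal(t, "schedules.BACKUP.protected_age_sec", pm.PTSAge.GetName())
}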