96756: kvcoord: Eliminate 1 goroutine from MuxRangeFeed r=miretskiy a=miretskiy

Prior to this PR, the server-side `MuxRangeFeed`
implementation spawned a separate goroutine executing
a single RangeFeed for each incoming request.

This is wasteful and unnecessary.
Instead of blocking and waiting for a single RangeFeed to complete,
rangefeed-related functions now return a promise
(`future.Future[*roachpb.Error]`) that is fulfilled with a
`*roachpb.Error` once the rangefeed completes.

Prior to this change, MuxRangeFeed would spin up 4 goroutines
per range; with this PR, the number is down to 3.
This improvement is particularly important when executing a
rangefeed against large tables (tens to hundreds of thousands of ranges).
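
A minimal sketch of the promise pattern, with a simplified stand-in
for the `pkg/util/future` type (names such as `Set` and `WhenReady`
are assumptions for illustration; the real API may differ):

```go
package main

import (
	"fmt"
	"sync"
)

// Future is a simplified stand-in for future.Future[T].
type Future[T any] struct {
	mu        sync.Mutex
	done      bool
	val       T
	callbacks []func(T)
}

// Set fulfills the promise once and fires any registered callbacks.
func (f *Future[T]) Set(v T) {
	f.mu.Lock()
	if f.done {
		f.mu.Unlock()
		return
	}
	f.done = true
	f.val = v
	cbs := f.callbacks
	f.callbacks = nil
	f.mu.Unlock()
	for _, cb := range cbs {
		cb(v)
	}
}

// WhenReady registers a completion callback; nothing blocks, so no
// goroutine has to be parked waiting for the result.
func (f *Future[T]) WhenReady(cb func(T)) {
	f.mu.Lock()
	if f.done {
		v := f.val
		f.mu.Unlock()
		cb(v)
		return
	}
	f.callbacks = append(f.callbacks, cb)
	f.mu.Unlock()
}

func main() {
	var done Future[error]
	// The server registers interest in completion instead of blocking.
	done.WhenReady(func(err error) { fmt.Println("rangefeed finished:", err) })
	// The rangefeed fulfills the promise when it completes.
	done.Set(nil)
}
```

Because completion is delivered via callback, the mux server no longer
needs a dedicated goroutine per request whose only job is to wait for
the rangefeed's terminal `*roachpb.Error`.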

Informs #96395
Epic: None

Release note (enterprise change): Changefeeds running with the
`changefeed.mux_rangefeed.enabled` setting set to true are
more efficient, particularly when executing against large tables.

97651: jobs,backupccl: Update metrics poller to handle schedule PTS records r=miretskiy a=miretskiy

Update metrics poller to keep track of PTS records held by scheduled jobs.
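
As a hedged sketch of the bookkeeping this adds (types here are
simplified stand-ins; the actual logic is in
pkg/jobs/metricspoller/job_statistics.go in the diff below), the poller
buckets PTS records by the owning schedule's executor type, tracking a
record count and the oldest protected timestamp:

```go
package main

import (
	"fmt"
	"time"
)

// ptsRecord is a simplified protected-timestamp record: the schedule
// executor type that owns it and the timestamp it protects.
type ptsRecord struct {
	executorType string
	protectedAt  time.Time
}

// scheduleStats mirrors the per-executor aggregation: a record count
// and the oldest protected timestamp seen.
type scheduleStats struct {
	numRecords int64
	oldest     time.Time
}

// aggregate buckets PTS records by owning executor type, the same
// shape of aggregation the poller performs before updating its
// NumWithPTS and PTSAge gauges.
func aggregate(recs []ptsRecord) map[string]*scheduleStats {
	stats := make(map[string]*scheduleStats)
	for _, rec := range recs {
		st := stats[rec.executorType]
		if st == nil {
			st = &scheduleStats{}
			stats[rec.executorType] = st
		}
		st.numRecords++
		if st.oldest.IsZero() || rec.protectedAt.Before(st.oldest) {
			st.oldest = rec.protectedAt
		}
	}
	return stats
}

func main() {
	now := time.Now()
	recs := []ptsRecord{
		{executorType: "BACKUP", protectedAt: now.Add(-2 * time.Hour)},
		{executorType: "BACKUP", protectedAt: now.Add(-30 * time.Minute)},
	}
	for typ, st := range aggregate(recs) {
		fmt.Printf("%s: %d records, oldest age %s\n",
			typ, st.numRecords, now.Sub(st.oldest).Round(time.Minute))
	}
}
```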

Epic: CRDB-21953

Release note: None

98587: server: status test filters out non-heap files r=knz,abarganier a=dhartunian

Resolves: #98213
Resolves: #47121

Epic: None
Release note: None

98588: multiregionccl: skip TestColdStartLatency r=ajwerner a=ajwerner

Epic: none

Informs #96334

Release note: None

Co-authored-by: Yevgeniy Miretskiy <yevgeniy@cockroachlabs.com>
Co-authored-by: David Hartunian <davidh@cockroachlabs.com>
Co-authored-by: ajwerner <awerner32@gmail.com>
4 people committed Mar 14, 2023
5 parents a79338a + c3bac9d + 14d3fc9 + 5bef75a + 6d0ede1 commit 094df2b
Showing 28 changed files with 928 additions and 261 deletions.
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
@@ -598,6 +598,7 @@ ALL_TESTS = [
"//pkg/util/errorutil:errorutil_test",
"//pkg/util/flagutil:flagutil_test",
"//pkg/util/fsm:fsm_test",
"//pkg/util/future:future_test",
"//pkg/util/fuzzystrmatch:fuzzystrmatch_test",
"//pkg/util/goschedstats:goschedstats_test",
"//pkg/util/grpcutil:grpcutil_test",
@@ -2121,6 +2122,8 @@ GO_TARGETS = [
"//pkg/util/flagutil:flagutil_test",
"//pkg/util/fsm:fsm",
"//pkg/util/fsm:fsm_test",
"//pkg/util/future:future",
"//pkg/util/future:future_test",
"//pkg/util/fuzzystrmatch:fuzzystrmatch",
"//pkg/util/fuzzystrmatch:fuzzystrmatch_test",
"//pkg/util/goschedstats:goschedstats",
@@ -3228,6 +3231,7 @@ GET_X_DATA_TARGETS = [
"//pkg/util/fileutil:get_x_data",
"//pkg/util/flagutil:get_x_data",
"//pkg/util/fsm:get_x_data",
"//pkg/util/future:get_x_data",
"//pkg/util/fuzzystrmatch:get_x_data",
"//pkg/util/goschedstats:get_x_data",
"//pkg/util/growstack:get_x_data",
13 changes: 11 additions & 2 deletions pkg/ccl/backupccl/create_scheduled_backup_test.go
@@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs/schedulebase"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
@@ -96,6 +97,7 @@ func newTestHelper(t *testing.T) (*testHelper, func()) {
}

args := base.TestServerArgs{
Settings: cluster.MakeClusterSettings(),
ExternalIODir: dir,
// Some scheduled backup tests fail when run within a tenant. More
// investigation is required. Tracked with #76378.
@@ -104,6 +106,7 @@ func newTestHelper(t *testing.T) (*testHelper, func()) {
JobsTestingKnobs: knobs,
},
}
jobs.PollJobsMetricsInterval.Override(context.Background(), &args.Settings.SV, 250*time.Millisecond)
s, db, _ := serverutils.StartServer(t, args)
require.NotNil(t, th.cfg)
th.sqlDB = sqlutils.MakeSQLRunner(db)
@@ -1154,11 +1157,11 @@ INSERT INTO t values (1), (10), (100);
}
})

metrics := func() *jobs.ExecutorMetrics {
metrics := func() *backupMetrics {
ex, err := jobs.GetScheduledJobExecutor(tree.ScheduledBackupExecutor.InternalName())
require.NoError(t, err)
require.NotNil(t, ex.Metrics())
return ex.Metrics().(*backupMetrics).ExecutorMetrics
return ex.Metrics().(*backupMetrics)
}()

t.Run("retry", func(t *testing.T) {
@@ -1226,6 +1229,12 @@ INSERT INTO t values (1), (10), (100);
}
return errors.Newf("expected 2 backups to succeed, got %d", delta)
})
testutils.SucceedsSoon(t, func() error {
if metrics.NumWithPTS.Value() > 0 {
return nil
}
return errors.New("still waiting for pts count > 0")
})
})
}

10 changes: 9 additions & 1 deletion pkg/ccl/backupccl/schedule_exec.go
@@ -36,6 +36,7 @@ type scheduledBackupExecutor struct {

type backupMetrics struct {
*jobs.ExecutorMetrics
*jobs.ExecutorPTSMetrics
RpoMetric *metric.Gauge
}

@@ -44,6 +45,11 @@ var _ metric.Struct = &backupMetrics{}
// MetricStruct implements metric.Struct interface
func (m *backupMetrics) MetricStruct() {}

// PTSMetrics implements jobs.PTSMetrics interface.
func (m *backupMetrics) PTSMetrics() *jobs.ExecutorPTSMetrics {
return m.ExecutorPTSMetrics
}

var _ jobs.ScheduledJobExecutor = &scheduledBackupExecutor{}

// ExecuteJob implements jobs.ScheduledJobExecutor interface.
@@ -546,9 +552,11 @@ func init() {
tree.ScheduledBackupExecutor.InternalName(),
func() (jobs.ScheduledJobExecutor, error) {
m := jobs.MakeExecutorMetrics(tree.ScheduledBackupExecutor.UserName())
pm := jobs.MakeExecutorPTSMetrics(tree.ScheduledBackupExecutor.UserName())
return &scheduledBackupExecutor{
metrics: backupMetrics{
ExecutorMetrics: &m,
ExecutorMetrics: &m,
ExecutorPTSMetrics: &pm,
RpoMetric: metric.NewGauge(metric.Metadata{
Name: "schedules.BACKUP.last-completed-time",
Help: "The unix timestamp of the most recently completed backup by a schedule specified as maintaining this metric",
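
The schedule_exec.go change above wires PTS metrics into the backup
schedule executor via an optional interface that the metrics poller
probes for. A compressed sketch of that pattern, using simplified
stand-ins for the jobs package types (the real definitions may differ):

```go
package main

import "fmt"

// ExecutorPTSMetrics is a simplified stand-in; the real struct in
// pkg/jobs uses gauge types from pkg/util/metric.
type ExecutorPTSMetrics struct {
	NumWithPTS int64
	PTSAge     int64
}

// PTSMetrics is the optional interface the metrics poller looks for.
type PTSMetrics interface {
	PTSMetrics() *ExecutorPTSMetrics
}

// backupMetrics mirrors the diff: embed the shared PTS metrics struct
// and expose it through the PTSMetrics interface.
type backupMetrics struct {
	*ExecutorPTSMetrics
}

// PTSMetrics implements the PTSMetrics interface.
func (m *backupMetrics) PTSMetrics() *ExecutorPTSMetrics {
	return m.ExecutorPTSMetrics
}

func main() {
	var m any = &backupMetrics{ExecutorPTSMetrics: &ExecutorPTSMetrics{}}
	// Executors that do not track PTS records simply don't implement
	// the interface, and the poller skips them.
	if pm, ok := m.(PTSMetrics); ok {
		pm.PTSMetrics().NumWithPTS = 2
		fmt.Println("schedules with PTS records:", pm.PTSMetrics().NumWithPTS)
	}
}
```
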
1 change: 1 addition & 0 deletions pkg/ccl/multiregionccl/cold_start_latency_test.go
@@ -47,6 +47,7 @@ import (
func TestColdStartLatency(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.WithIssue(t, 96334)
skip.UnderRace(t, "too slow")
skip.UnderStress(t, "too slow")
defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "1")()
2 changes: 2 additions & 0 deletions pkg/jobs/metricspoller/BUILD.bazel
@@ -13,7 +13,9 @@ go_library(
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprotectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/security/username",
"//pkg/settings/cluster",
"//pkg/sql",
178 changes: 130 additions & 48 deletions pkg/jobs/metricspoller/job_statistics.go
@@ -16,7 +16,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
@@ -79,76 +81,114 @@ func updatePausedMetrics(ctx context.Context, execCtx sql.JobExecContext) error
return nil
}

// manageJobsProtectedTimestamps manages protected timestamp records owned by various jobs.
// This function mostly concerns itself with collecting statistics related to job PTS records.
// It also detects PTS records that are too old (as configured by the owner job) and requests
// job cancellation for those jobs.
func manageJobsProtectedTimestamps(ctx context.Context, execCtx sql.JobExecContext) error {
type ptsStat struct {
numRecords int64
expired int64
oldest hlc.Timestamp
}
type ptsStat struct {
numRecords int64
expired int64
oldest hlc.Timestamp
}

type schedulePTSStat struct {
ptsStat
m *jobs.ExecutorPTSMetrics
}

// manageProtectedTimestamps manages protected timestamp records owned by
// various jobs or schedules. This function mostly concerns itself with
// collecting statistics related to job PTS records. It also detects PTS records
// that are too old (as configured by the owner job) and requests job
// cancellation for those jobs.
func manageProtectedTimestamps(ctx context.Context, execCtx sql.JobExecContext) error {
var ptsStats map[jobspb.Type]*ptsStat
var schedulePtsStats map[string]*schedulePTSStat

execCfg := execCtx.ExecCfg()
if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
ptsStats = make(map[jobspb.Type]*ptsStat)
schedulePtsStats = make(map[string]*schedulePTSStat)

ptsState, err := execCfg.ProtectedTimestampProvider.WithTxn(txn).GetState(ctx)
if err != nil {
return err
}
for _, rec := range ptsState.Records {
if rec.MetaType != jobsprotectedts.GetMetaType(jobsprotectedts.Jobs) {
continue
}
id, err := jobsprotectedts.DecodeID(rec.Meta)
if err != nil {
return err
}
j, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(id), txn)
if err != nil {
continue
}
p := j.Payload()
jobType, err := p.CheckType()
if err != nil {
return err
}
stats := ptsStats[jobType]
if stats == nil {
stats = &ptsStat{}
ptsStats[jobType] = stats
}
stats.numRecords++
if stats.oldest.IsEmpty() || rec.Timestamp.Less(stats.oldest) {
stats.oldest = rec.Timestamp
}

// If MaximumPTSAge is set on the job payload, verify if PTS record
// timestamp is fresh enough. Note: we only look at paused jobs.
// If the running job wants to enforce an invariant with respect to PTS age,
// it can do so itself. This check here is a safety mechanism to detect
// paused jobs that own protected timestamp records.
if j.Status() == jobs.StatusPaused &&
p.MaximumPTSAge > 0 &&
rec.Timestamp.GoTime().Add(p.MaximumPTSAge).Before(timeutil.Now()) {
stats.expired++
ptsExpired := errors.Newf(
"protected timestamp records %s as of %s (age %s) exceeds job configured limit of %s",
rec.ID, rec.Timestamp, timeutil.Since(rec.Timestamp.GoTime()), p.MaximumPTSAge)
if err := j.WithTxn(txn).CancelRequestedWithReason(ctx, ptsExpired); err != nil {
switch rec.MetaType {
case jobsprotectedts.GetMetaType(jobsprotectedts.Jobs):
if err := processJobPTSRecord(ctx, execCfg, id, rec, ptsStats, txn); err != nil {
return err
}
log.Warningf(ctx, "job %d canceled due to %s", id, ptsExpired)
case jobsprotectedts.GetMetaType(jobsprotectedts.Schedules):
if err := processSchedulePTSRecord(ctx, id, rec, schedulePtsStats, txn); err != nil {
return err
}
default:
continue
}
}
return nil
}); err != nil {
return err
}

jobMetrics := execCtx.ExecCfg().JobRegistry.MetricsStruct()
updateJobPTSMetrics(execCfg.JobRegistry.MetricsStruct(), execCfg.Clock, ptsStats)
updateSchedulesPTSMetrics(execCfg.Clock, schedulePtsStats)

return nil
}

func processJobPTSRecord(
ctx context.Context,
execCfg *sql.ExecutorConfig,
jobID int64,
rec ptpb.Record,
ptsStats map[jobspb.Type]*ptsStat,
txn isql.Txn,
) error {
j, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(jobID), txn)
if err != nil {
return nil // nolint:returnerrcheck -- job may be deleted when we run; just keep going.
}
p := j.Payload()
jobType, err := p.CheckType()
if err != nil {
return err
}
stats := ptsStats[jobType]
if stats == nil {
stats = &ptsStat{}
ptsStats[jobType] = stats
}
stats.numRecords++
if stats.oldest.IsEmpty() || rec.Timestamp.Less(stats.oldest) {
stats.oldest = rec.Timestamp
}

// If MaximumPTSAge is set on the job payload, verify if PTS record
// timestamp is fresh enough. Note: we only look at paused jobs.
// If the running job wants to enforce an invariant with respect to PTS age,
// it can do so itself. This check here is a safety mechanism to detect
// paused jobs that own protected timestamp records.
if j.Status() == jobs.StatusPaused &&
p.MaximumPTSAge > 0 &&
rec.Timestamp.GoTime().Add(p.MaximumPTSAge).Before(timeutil.Now()) {
stats.expired++
ptsExpired := errors.Newf(
"protected timestamp records %s as of %s (age %s) exceeds job configured limit of %s",
rec.ID, rec.Timestamp, timeutil.Since(rec.Timestamp.GoTime()), p.MaximumPTSAge)
if err := j.WithTxn(txn).CancelRequestedWithReason(ctx, ptsExpired); err != nil {
return err
}
log.Warningf(ctx, "job %d canceled due to %s", jobID, ptsExpired)
}
return nil
}

func updateJobPTSMetrics(
jobMetrics *jobs.Metrics, clock *hlc.Clock, ptsStats map[jobspb.Type]*ptsStat,
) {
for typ := 0; typ < jobspb.NumJobTypes; typ++ {
if jobspb.Type(typ) == jobspb.TypeUnspecified { // do not track TypeUnspecified
continue
@@ -159,7 +199,7 @@ func manageJobsProtectedTimestamps(ctx context.Context, execCtx sql.JobExecContext
m.NumJobsWithPTS.Update(stats.numRecords)
m.ExpiredPTS.Inc(stats.expired)
if stats.oldest.WallTime > 0 {
m.ProtectedAge.Update((execCfg.Clock.Now().WallTime - stats.oldest.WallTime) / 1e9)
m.ProtectedAge.Update((clock.Now().WallTime - stats.oldest.WallTime) / 1e9)
} else {
m.ProtectedAge.Update(0)
}
@@ -170,6 +210,48 @@ func manageJobsProtectedTimestamps(ctx context.Context, execCtx sql.JobExecContext
m.ProtectedAge.Update(0)
}
}
}

func processSchedulePTSRecord(
ctx context.Context,
scheduleID int64,
rec ptpb.Record,
ptsStats map[string]*schedulePTSStat,
txn isql.Txn,
) error {
sj, err := jobs.ScheduledJobTxn(txn).
Load(ctx, scheduledjobs.ProdJobSchedulerEnv, scheduleID)
if jobs.HasScheduledJobNotFoundError(err) {
return nil // nolint:returnerrcheck -- schedule may be deleted when we run; just keep going.
}
if err != nil {
return err
}
ex, err := jobs.GetScheduledJobExecutor(sj.ExecutorType())
if err != nil {
return err
}
pm, ok := ex.Metrics().(jobs.PTSMetrics)
if !ok {
return nil
}

stats := ptsStats[sj.ExecutorType()]
if stats == nil {
stats = &schedulePTSStat{m: pm.PTSMetrics()}
ptsStats[sj.ExecutorType()] = stats
}
stats.numRecords++
if stats.oldest.IsEmpty() || rec.Timestamp.Less(stats.oldest) {
stats.oldest = rec.Timestamp
}
return nil
}

func updateSchedulesPTSMetrics(clock *hlc.Clock, ptsStats map[string]*schedulePTSStat) {
for _, st := range ptsStats {
st.m.NumWithPTS.Update(st.numRecords)
if st.oldest.WallTime > 0 {
st.m.PTSAge.Update((clock.Now().WallTime - st.oldest.WallTime) / 1e9)
} else {
st.m.PTSAge.Update(0)
}
}
}
2 changes: 1 addition & 1 deletion pkg/jobs/metricspoller/poller.go
@@ -89,7 +89,7 @@ type pollerMetrics struct {
// of metrics poller.
var metricPollerTasks = map[string]func(ctx context.Context, execCtx sql.JobExecContext) error{
"paused-jobs": updatePausedMetrics,
"manage-pts": manageJobsProtectedTimestamps,
"manage-pts": manageProtectedTimestamps,
}

func (m pollerMetrics) MetricStruct() {}