jobs: avoid crdb_internal.system_jobs in gc-jobs
crdb_internal.system_jobs is a virtual table that joins
information from the jobs table and the job_info table.

For the previous query,

    SELECT id, payload, status FROM "".crdb_internal.system_jobs
    WHERE (created < $1) AND (id > $2)
    ORDER BY id
    LIMIT $3

this is a little suboptimal because:

- We don't make use of the progress column, so any read of it is wasted
  work.

- While the crdb_internal.system_jobs virtual table has a virtual index on
  job id, and EXPLAIN will even claim that it will be used:

      • limit
      │ count: 100
      │
      └── • filter
          │ filter: created < '2023-07-20 07:29:01.17001'
          │
          └── • virtual table
                table: system_jobs@system_jobs_id_idx
                spans: [/101 - ]

  This is actually a lie. A virtual index can only handle single-key
  spans (see the sketch after this list). As a result, the unconstrained
  query is used:

    WITH
        latestpayload AS (SELECT job_id, value FROM system.job_info AS payload WHERE info_key = 'legacy_payload' ORDER BY written DESC),
        latestprogress AS (SELECT job_id, value FROM system.job_info AS progress WHERE info_key = 'legacy_progress' ORDER BY written DESC)
    SELECT
       DISTINCT(id), status, created, payload.value AS payload, progress.value AS progress,
                created_by_type, created_by_id, claim_session_id, claim_instance_id, num_runs, last_run, job_type
    FROM system.jobs AS j
    INNER JOIN latestpayload AS payload ON j.id = payload.job_id
    LEFT JOIN latestprogress AS progress ON j.id = progress.job_id

  which requires a full scan of the jobs table and two full scans of the
  job_info table:

      • distinct
      │ distinct on: id, value, value
      │
      └── • merge join
          │ equality: (job_id) = (id)
          │
          ├── • render
          │   │
          │   └── • filter
          │       │ estimated row count: 7,318
          │       │ filter: info_key = 'legacy_payload'
          │       │
          │       └── • scan
          │             estimated row count: 14,648 (100% of the table; stats collected 39 minutes ago; using stats forecast for 2 hours in the future)
          │             table: job_info@primary
          │             spans: FULL SCAN
          │
          └── • merge join (right outer)
              │ equality: (job_id) = (id)
              │ right cols are key
              │
              ├── • render
              │   │
              │   └── • filter
              │       │ estimated row count: 7,317
              │       │ filter: info_key = 'legacy_progress'
              │       │
              │       └── • scan
              │             estimated row count: 14,648 (100% of the table; stats collected 39 minutes ago; using stats forecast for 2 hours in the future)
              │             table: job_info@primary
              │             spans: FULL SCAN
              │
              └── • scan
                    missing stats
                    table: jobs@primary
                    spans: FULL SCAN

  Because of the limit, I don't think this ends up being as bad as it
  looks. But it still isn't great.
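To make the single-key limitation concrete, here is a sketch (not part of
the commit; assumes a local cluster) contrasting a predicate the virtual
index can serve with the one gc-jobs uses:

    -- A point lookup is a single-key span, which the virtual index can serve:
    SELECT id, status FROM "".crdb_internal.system_jobs WHERE id = 101;

    -- The range predicate used by gc-jobs is not, so the virtual table falls
    -- back to the unconstrained query shown above, even though EXPLAIN prints
    -- spans: [/101 - ].
    SELECT id, status FROM "".crdb_internal.system_jobs WHERE id > 100 LIMIT 100;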

In this PR, we replace the crdb_internal.system_jobs query with one that
removes the join against the unused progress rows and also constrains the
scan of the job_info table. The resulting plan:

      • distinct
      │ distinct on: id, value
      │
      └── • merge join
          │ equality: (job_id) = (id)
          │ right cols are key
          │
          ├── • render
          │   │
          │   └── • filter
          │       │ estimated row count: 7,318
          │       │ filter: info_key = 'legacy_payload'
          │       │
          │       └── • scan
          │             estimated row count: 14,646 (100% of the table; stats collected 45 minutes ago; using stats forecast for 2 hours in the future)
          │             table: job_info@primary
          │             spans: [/101/'legacy_payload' - ]
          │
          └── • render
              │
              └── • limit
                  │ count: 100
                  │
                  └── • filter
                      │ filter: created < '2023-07-20 07:29:01.17001'
                      │
                      └── • scan
                            missing stats
                            table: jobs@primary
                            spans: [/101 - ]

In a local example, this does seem faster:

    > SELECT id, payload, status, created
    > FROM "".crdb_internal.system_jobs
    > WHERE (created < '2023-07-20 07:29:01.17001') AND (id > 100) ORDER BY id LIMIT 100;

    id | payload | status | created
    -----+---------+--------+----------
    (0 rows)

    Time: 183ms total (execution 183ms / network 0ms)

    > WITH
    > latestpayload AS (
    >     SELECT job_id, value
    >     FROM system.job_info AS payload
    >     WHERE job_id > 100 AND info_key = 'legacy_payload'
    >     ORDER BY written desc
    > ),
    > jobpage AS (
    >     SELECT id, status, created
    >     FROM system.jobs
    >     WHERE (created < '2023-07-20 07:29:01.17001') and (id > 100)
    >     ORDER BY id
    >     LIMIT 100
    > )
    > SELECT distinct (id), latestpayload.value AS payload, status
    > FROM jobpage AS j
    > INNER JOIN latestpayload ON j.id = latestpayload.job_id;
      id | payload | status
    -----+---------+---------
    (0 rows)

    Time: 43ms total (execution 42ms / network 0ms)

Release note: None

Epic: none
stevendanna committed May 15, 2024
1 parent e39b9ff commit cbb28c7
Showing 3 changed files with 66 additions and 31 deletions.
1 change: 0 additions & 1 deletion pkg/jobs/helpers_test.go
@@ -120,7 +120,6 @@ func (j *Job) TestingCurrentStatus(ctx context.Context) (Status, error) {
 const (
 	AdoptQuery = claimQuery
 	CancelQuery = pauseAndCancelUpdate
-	GcQuery = expiredJobsQuery
 	RemoveClaimsQuery = removeClaimsForDeadSessionsQuery
 	ProcessJobsQuery = processQueryWithBackoff
 	IntervalBaseSettingKey = intervalBaseSettingKey
33 changes: 29 additions & 4 deletions pkg/jobs/registry.go
@@ -1240,20 +1240,45 @@ func (r *Registry) cleanupOldJobs(ctx context.Context, olderThan time.Time) error {
 
 // The ordering is important as we keep track of the maximum ID we've seen.
 const expiredJobsQuery = `
-SELECT id, payload, status, created FROM "".crdb_internal.system_jobs
+SELECT id, payload, status FROM "".crdb_internal.system_jobs
 WHERE (created < $1) AND (id > $2)
 ORDER BY id
-LIMIT $3
-`
+LIMIT $3`
+
+const expiredJobsQueryWithJobInfoTable = `
+WITH
+latestpayload AS (
+    SELECT job_id, value
+    FROM system.job_info AS payload
+    WHERE job_id > $2 AND info_key = 'legacy_payload'
+    ORDER BY written desc
+),
+jobpage AS (
+    SELECT id, status
+    FROM system.jobs
+    WHERE (created < $1) and (id > $2)
+    ORDER BY id
+    LIMIT $3
+)
+SELECT distinct (id), latestpayload.value AS payload, status
+FROM jobpage AS j
+INNER JOIN latestpayload ON j.id = latestpayload.job_id`
 
 // cleanupOldJobsPage deletes up to cleanupPageSize job rows with ID > minID.
 // minID is supposed to be the maximum ID returned by the previous page (0 if no
 // previous page).
 func (r *Registry) cleanupOldJobsPage(
 	ctx context.Context, olderThan time.Time, minID jobspb.JobID, pageSize int,
 ) (done bool, maxID jobspb.JobID, retErr error) {
+	var query string
+	if r.settings.Version.IsActive(ctx, clusterversion.V23_1JobInfoTableIsBackfilled) {
+		query = expiredJobsQueryWithJobInfoTable
+	} else {
+		query = expiredJobsQuery
+	}
+
 	it, err := r.db.Executor().QueryIterator(ctx, "gc-jobs", nil, /* txn */
-		expiredJobsQuery, olderThan, minID, pageSize)
+		query, olderThan, minID, pageSize)
 	if err != nil {
 		return false, 0, err
 	}
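As an aside on the paging above: cleanupOldJobsPage is driven in a loop that
feeds the largest id returned by one page back in as minID ($2) for the next,
which is why both queries ORDER BY id. A sketch of two consecutive pages, with
made-up parameter values and the payload join omitted for brevity:

    -- Page 1: $1 = cutoff timestamp, $2 = 0 (nothing seen yet), $3 = page size.
    SELECT id, status
    FROM system.jobs
    WHERE (created < '2023-07-20 07:29:01.17001') AND (id > 0)
    ORDER BY id
    LIMIT 100;

    -- Page 2: $2 becomes the largest id returned by page 1 (say 912), so each
    -- page only considers ids strictly greater than those already processed.
    SELECT id, status
    FROM system.jobs
    WHERE (created < '2023-07-20 07:29:01.17001') AND (id > 912)
    ORDER BY id
    LIMIT 100;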
63 changes: 37 additions & 26 deletions pkg/jobs/registry_external_test.go
@@ -200,12 +200,13 @@ func TestRegistrySettingUpdate(t *testing.T) {
 	}
 
 	for _, test := range [...]struct {
-		name       string      // Test case ID.
-		setting    string      // Cluster setting key.
-		value      interface{} // Duration when expecting a large number of job runs.
-		matchStmt  string      // SQL statement to match to identify the target job.
-		initCount  int         // Initial number of jobs to ignore at the beginning of the test.
-		toOverride *settings.DurationSetting
+		name         string      // Test case ID.
+		setting      string      // Cluster setting key.
+		value        interface{} // Duration when expecting a large number of job runs.
+		matchStmt    string      // SQL statement to match to identify the target job.
+		matchAppName string
+		initCount    int         // Initial number of jobs to ignore at the beginning of the test.
+		toOverride   *settings.DurationSetting
 	}{
 		{
 			name: "adopt setting",
@@ -240,33 +241,44 @@
 			toOverride: jobs.CancelIntervalSetting,
 		},
 		{
-			name:       "gc setting",
-			setting:    jobs.GcIntervalSettingKey,
-			value:      shortDuration,
-			matchStmt:  jobs.GcQuery,
-			initCount:  0,
-			toOverride: jobs.GcIntervalSetting,
+			name:         "gc setting",
+			setting:      jobs.GcIntervalSettingKey,
+			value:        shortDuration,
+			matchAppName: "$ internal-gc-jobs",
+			initCount:    0,
+			toOverride:   jobs.GcIntervalSetting,
 		},
 		{
-			name:       "gc setting with base",
-			setting:    jobs.IntervalBaseSettingKey,
-			value:      shortDurationBase,
-			matchStmt:  jobs.GcQuery,
-			initCount:  0,
-			toOverride: jobs.GcIntervalSetting,
+			name:         "gc setting with base",
+			setting:      jobs.IntervalBaseSettingKey,
+			value:        shortDurationBase,
+			matchAppName: "$ internal-gc-jobs",
+			initCount:    0,
+			toOverride:   jobs.GcIntervalSetting,
 		},
 	} {
 		t.Run(test.name, func(t *testing.T) {
 			ctx := context.Background()
-			// Replace multiple white spaces with a single space, remove the last ';', and
-			// trim leading and trailing spaces.
-			matchStmt := strings.TrimSpace(regexp.MustCompile(`(\s+|;+)`).ReplaceAllString(test.matchStmt, " "))
+			var stmtMatcher func(*sessiondata.SessionData, string) bool
+			if test.matchAppName != "" {
+				stmtMatcher = func(sd *sessiondata.SessionData, _ string) bool {
+					return sd.ApplicationName == test.matchAppName
+				}
+			} else {
+				// Replace multiple white spaces with a single space, remove the last ';', and
+				// trim leading and trailing spaces.
+				matchStmt := strings.TrimSpace(regexp.MustCompile(`(\s+|;+)`).ReplaceAllString(test.matchStmt, " "))
+				stmtMatcher = func(_ *sessiondata.SessionData, stmt string) bool {
+					return stmt == matchStmt
+				}
+			}
+
 			var seen = int32(0)
-			stmtFilter := func(ctxt context.Context, _ *sessiondata.SessionData, stmt string, err error) {
+			stmtFilter := func(_ context.Context, sd *sessiondata.SessionData, stmt string, err error) {
 				if err != nil {
 					return
 				}
-				if stmt == matchStmt {
+				if stmtMatcher(sd, stmt) {
 					atomic.AddInt32(&seen, 1)
 				}
 			}
@@ -329,13 +341,12 @@ func TestGCDurationControl(t *testing.T) {
 	//
 	// Replace multiple white spaces with a single space, remove the last ';', and
 	// trim leading and trailing spaces.
-	gcStmt := strings.TrimSpace(regexp.MustCompile(`(\s+|;+)`).ReplaceAllString(jobs.GcQuery, " "))
 	var seen = int32(0)
-	stmtFilter := func(ctxt context.Context, _ *sessiondata.SessionData, stmt string, err error) {
+	stmtFilter := func(_ context.Context, sd *sessiondata.SessionData, _ string, err error) {
 		if err != nil {
 			return
 		}
-		if stmt == gcStmt {
+		if sd.ApplicationName == "$ internal-gc-jobs" {
 			atomic.AddInt32(&seen, 1)
 		}
 	}
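The tests above now match on the application name of the internal executor
rather than on the statement text, since the statement issued now varies with
the cluster version. A hypothetical way to observe that name from SQL
(assuming the crdb_internal.node_statement_statistics virtual table and its
application_name column):

    -- Illustrative only: statements issued via the internal executor under the
    -- "gc-jobs" op name are attributed to this application name.
    SELECT application_name, key
    FROM crdb_internal.node_statement_statistics
    WHERE application_name = '$ internal-gc-jobs';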
