upgrades,clusterversion: write a migration to wait for the GC job to upgrade

Release note: None
ajwerner committed Aug 14, 2022
1 parent 85590e2 commit de12cee
Showing 12 changed files with 330 additions and 9 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -286,4 +286,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
-version version 22.1-56 set the active cluster version in the format '<major>.<minor>'
+version version 22.1-58 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -218,6 +218,6 @@
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.span_registry.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://<ui>/#/debug/tracez</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
-<tr><td><code>version</code></td><td>version</td><td><code>22.1-56</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
+<tr><td><code>version</code></td><td>version</td><td><code>22.1-58</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
7 changes: 7 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
@@ -317,6 +317,9 @@ const (
// and then waits for the data to be removed automatically before removing
// the descriptor and zone configurations.
UseDelRangeInGCJob
// WaitedForDelRangeInGCJob corresponds to the migration that waits for
// GC jobs to adopt the use of DelRange with tombstones.
WaitedForDelRangeInGCJob

// *************************************************
// Step (1): Add new versions here.
@@ -535,6 +538,10 @@ var versionsSingleton = keyedVersions{
Key: UseDelRangeInGCJob,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 56},
},
{
Key: WaitedForDelRangeInGCJob,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 58},
},
// *************************************************
// Step (2): Add new versions here.
// Do not add new versions to a patch release.
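
The new WaitedForDelRangeInGCJob key sits immediately after UseDelRangeInGCJob in the version ladder: the first key gates the new GC behavior, and the second records that the migration added below has finished waiting for existing jobs. As a minimal sketch of how code typically consults such a key (the runGC wrapper and both helpers are hypothetical, not part of this commit):

package gcexample

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// runLegacyGC and runDelRangeGC are hypothetical stand-ins for the two GC
// protocols; they are not part of this commit.
func runLegacyGC(ctx context.Context) error   { return nil }
func runDelRangeGC(ctx context.Context) error { return nil }

// runGC gates behavior on the version key: once the cluster-wide version
// has reached UseDelRangeInGCJob, the new DeleteRange protocol is used.
func runGC(ctx context.Context, st *cluster.Settings) error {
	if st.Version.IsActive(ctx, clusterversion.UseDelRangeInGCJob) {
		return runDelRangeGC(ctx)
	}
	return runLegacyGC(ctx)
}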
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default.

20 changes: 19 additions & 1 deletion pkg/sql/gcjob/gc_job.go
@@ -341,7 +341,7 @@ func waitForGC(
switch {
case details.Indexes != nil:
return errors.Wrap(
-waitForIndexGC(ctx, execCfg, details.ParentID, progress),
+deleteIndexZoneConfigsAfterGC(ctx, execCfg, details.ParentID, progress),
"attempting to delete index data",
)
case details.Tables != nil:
@@ -415,6 +415,16 @@ func (r schemaChangeGCResumer) legacyWaitAndClearTableData(

gossipUpdateC, cleanup := execCfg.GCJobNotifier.AddNotifyee(ctx)
defer cleanup()

// Now that we've registered to be notified, check to see if we raced
// with the new version becoming active.
//
// TODO(ajwerner): Adopt the DeleteRange protocol for tenant GC.
if details.Tenant == nil &&
execCfg.Settings.Version.IsActive(ctx, clusterversion.UseDelRangeInGCJob) {
return r.deleteDataAndWaitForGC(ctx, execCfg, details, progress)
}

var timerDuration time.Duration
ts := timeutil.DefaultTimeSource{}

@@ -425,6 +435,14 @@
); err != nil {
return err
}
// We'll be notified if the new version becomes active, so check and
// see if it's now time to change to the new protocol.
//
// TODO(ajwerner): Adopt the DeleteRange protocol for tenant GC.
if details.Tenant == nil &&
execCfg.Settings.Version.IsActive(ctx, clusterversion.UseDelRangeInGCJob) {
return r.deleteDataAndWaitForGC(ctx, execCfg, details, progress)
}

// Refresh the status of all elements in case any GC TTLs have changed.
var expired bool
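
The two checks above implement a subscribe-then-check pattern: the resumer registers with the GCJobNotifier first and only then tests whether UseDelRangeInGCJob became active, so an activation that races with registration is never missed. A self-contained sketch of that pattern (names are illustrative, not from this commit):

package gcexample

// waitUntilActive mirrors the ordering used in legacyWaitAndClearTableData:
// the caller registers updateC before invoking this, we check once up front
// to catch a racing activation, then re-check on every notification.
func waitUntilActive(stopC, updateC <-chan struct{}, isActive func() bool) bool {
	if isActive() {
		return true
	}
	for {
		select {
		case <-updateC:
			if isActive() {
				return true
			}
		case <-stopC:
			return false
		}
	}
}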
1 change: 1 addition & 0 deletions pkg/sql/gcjob/gcjobnotifier/BUILD.bazel
@@ -7,6 +7,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql/gcjob/gcjobnotifier",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/config",
"//pkg/gossip",
"//pkg/keys",
17 changes: 17 additions & 0 deletions pkg/sql/gcjob/gcjobnotifier/notifier.go
@@ -17,6 +17,7 @@ package gcjobnotifier
import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -69,6 +70,8 @@ func (n *Notifier) SystemConfigProvider() config.SystemConfigProvider {
func noopFunc() {}

// AddNotifyee should be called prior to the first reading of the system config.
// The returned channel will also receive a notification if the cluster version
// UseDelRangeInGCJob is activated.
//
// TODO(lucy,ajwerner): Currently we're calling refreshTables on every zone
// config update to any table. We should really be only updating a cached
@@ -141,10 +144,21 @@ func (n *Notifier) Start(ctx context.Context) {
func (n *Notifier) run(_ context.Context) {
defer n.markStopped()
systemConfigUpdateCh, _ := n.provider.RegisterSystemConfigChannel()
var haveNotified bool
versionSettingChanged := make(chan struct{}, 1)
versionBeingWaited := clusterversion.ByKey(clusterversion.UseDelRangeInGCJob)
n.settings.Version.SetOnChange(func(ctx context.Context, newVersion clusterversion.ClusterVersion) {
if !haveNotified && versionBeingWaited.LessEq(newVersion.Version) {
haveNotified = true
versionSettingChanged <- struct{}{}
}
})
for {
select {
case <-n.stopper.ShouldQuiesce():
return
case <-versionSettingChanged:
n.notify()
case <-systemConfigUpdateCh:
n.maybeNotify()
}
@@ -170,7 +184,10 @@ func (n *Notifier) maybeNotify() {
if !zoneConfigUpdated {
return
}
n.notify()
}

func (n *Notifier) notify() {
for c := range n.mu.notifyees {
select {
case c <- struct{}{}:
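
Two details of Notifier.run are worth noting: the version callback delivers into a channel with a buffer of one, so the send performed inside the settings callback can never stall the settings machinery, and the haveNotified flag ensures only the first crossing of UseDelRangeInGCJob produces a wakeup. A reduced sketch of that callback-to-channel bridge (types simplified, illustrative only):

package gcexample

// versionWaiter bridges a settings-style callback into a select loop.
type versionWaiter struct {
	notified bool
	ch       chan struct{}
}

func newVersionWaiter() *versionWaiter {
	// The single-slot buffer guarantees the one send below never blocks.
	return &versionWaiter{ch: make(chan struct{}, 1)}
}

// onChange is the body that would be registered with SetOnChange; reached
// reports whether the awaited version is now at or below the active one.
func (w *versionWaiter) onChange(reached bool) {
	if w.notified || !reached {
		return
	}
	w.notified = true
	w.ch <- struct{}{} // buffered and sent at most once, so never blocks
}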
12 changes: 8 additions & 4 deletions pkg/sql/gcjob/index_garbage_collection.go
@@ -170,15 +170,15 @@ func clearIndex(
return clearOrDeleteSpanData(ctx, execCfg.DB, execCfg.DistSender, rSpan)
}

-func waitForIndexGC(
+func deleteIndexZoneConfigsAfterGC(
ctx context.Context,
execCfg *sql.ExecutorConfig,
parentID descpb.ID,
progress *jobspb.SchemaChangeGCProgress,
) error {

for _, index := range progress.Indexes {
-if index.Status != jobspb.SchemaChangeGCProgress_CLEARING {
+if index.Status == jobspb.SchemaChangeGCProgress_CLEARED {
continue
}

@@ -212,10 +212,14 @@
ctx, txn, execCfg, descriptors, freshParentTableDesc, []uint32{uint32(index.IndexID)},
)
}
-if err := sql.DescsTxn(ctx, execCfg, removeIndexZoneConfigs); err != nil {
+err := sql.DescsTxn(ctx, execCfg, removeIndexZoneConfigs)
switch {
case errors.Is(err, catalog.ErrDescriptorNotFound):
log.Infof(ctx, "removing index %d zone config from table %d failed: %v",
index.IndexID, parentID, err)
case err != nil:
return errors.Wrapf(err, "removing index %d zone configs", index.IndexID)
}

markIndexGCed(
ctx, index.IndexID, progress, jobspb.SchemaChangeGCProgress_CLEARED,
)
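
The rewritten error handling makes zone-config removal idempotent: if the table descriptor has already been deleted, the job logs and moves on rather than failing, since there is nothing left to clean up. The shape of that pattern in isolation (errDescriptorNotFound is a stand-in for catalog.ErrDescriptorNotFound):

package gcexample

import (
	"errors"
	"log"
)

var errDescriptorNotFound = errors.New("descriptor not found")

// removeZoneConfigs tolerates a vanished descriptor but propagates any
// other failure, matching the switch added in deleteIndexZoneConfigsAfterGC.
func removeZoneConfigs(run func() error) error {
	err := run()
	switch {
	case errors.Is(err, errDescriptorNotFound):
		log.Printf("descriptor already gone; skipping zone config cleanup: %v", err)
	case err != nil:
		return err
	}
	return nil
}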
6 changes: 6 additions & 0 deletions pkg/upgrade/upgrades/BUILD.bazel
@@ -19,12 +19,14 @@ go_library(
"system_users_role_id_migration.go",
"upgrade_sequence_to_be_referenced_by_ID.go",
"upgrades.go",
"wait_for_del_range_in_gc_job.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/upgrade/upgrades",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/security/username",
@@ -40,13 +42,16 @@
"//pkg/sql/catalog/systemschema",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/upgrade",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/retry",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_kr_pretty//:pretty",
@@ -71,6 +76,7 @@ go_test(
"sampled_stmt_diagnostics_requests_test.go",
"system_privileges_test.go",
"upgrade_sequence_to_be_referenced_by_ID_external_test.go",
"wait_for_del_range_in_gc_job_test.go",
],
data = glob(["testdata/**"]),
embed = [":upgrades"],
5 changes: 5 additions & 0 deletions pkg/upgrade/upgrades/upgrades.go
@@ -143,6 +143,11 @@ var upgrades = []upgrade.Upgrade{
NoPrecondition,
setSystemRoleOptionsUserIDColumnNotNull,
),
upgrade.NewTenantUpgrade("ensure all GC jobs send DeleteRange requests",
toCV(clusterversion.WaitedForDelRangeInGCJob),
checkForPausedGCJobs,
waitForDelRangeInGCJob,
),
}

func init() {
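
NewTenantUpgrade pairs a precondition with the migration itself: checkForPausedGCJobs runs before the cluster attempts to bump the version gate, so an operator gets an actionable error while the upgrade can still be retried. A skeleton of that control flow (a simplified stand-in for the upgrade types, reduced to plain functions):

package gcexample

import "context"

// tenantUpgrade is a simplified, illustrative stand-in: a precondition
// that may veto the version bump, then the migration proper.
type tenantUpgrade struct {
	precondition func(context.Context) error
	run          func(context.Context) error
}

func (u tenantUpgrade) execute(ctx context.Context) error {
	if err := u.precondition(ctx); err != nil {
		return err // surfaced to the user; the version is not bumped
	}
	return u.run(ctx)
}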
134 changes: 134 additions & 0 deletions pkg/upgrade/upgrades/wait_for_del_range_in_gc_job.go
@@ -0,0 +1,134 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package upgrades

import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/upgrade"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/errors"
)

// waitForDelRangeInGCJob ensures that any running GC jobs have adopted the new
// DeleteRange protocol.
func waitForDelRangeInGCJob(
ctx context.Context, _ clusterversion.ClusterVersion, deps upgrade.TenantDeps, _ *jobs.Job,
) error {
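// The query below selects non-terminal schema-change GC jobs which are due
// to be retried soon (next run within five minutes, or a retry-delay
// ceiling under five minutes) and which still contain a table or index
// element whose progress status is absent or is neither
// WAITING_FOR_MVCC_GC nor CLEARED, i.e. jobs that have not yet finished
// switching to the DeleteRange protocol.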
for r := retry.StartWithCtx(ctx, retry.Options{}); r.Next(); {
jobIDs, err := collectJobIDsFromQuery(
ctx, deps.InternalExecutor, "wait-for-gc-job-upgrades", `
WITH jobs AS (
SELECT *
FROM (
SELECT id,
crdb_internal.pb_to_json('payload', payload) AS pl,
crdb_internal.pb_to_json('progress', progress) AS progress,
`+jobs.NextRunClause+` AS next_run,
args.max_delay as max_delay
FROM system.jobs,
(
SELECT (
SELECT value
FROM crdb_internal.cluster_settings
WHERE variable = 'jobs.registry.retry.initial_delay'
)::INTERVAL::FLOAT8 AS initial_delay,
(
SELECT value
FROM crdb_internal.cluster_settings
WHERE variable = 'jobs.registry.retry.max_delay'
)::INTERVAL::FLOAT8 AS max_delay
) AS args
WHERE status IN `+jobs.NonTerminalStatusTupleString+`
)
WHERE (pl->'schemaChangeGC') IS NOT NULL
AND (
next_run < (now() + '5m'::INTERVAL)
OR max_delay < '5m'::INTERVAL::FLOAT8
)
),
tables AS (
SELECT id,
json_array_elements(pl->'schemaChangeGC'->'tables') AS pl,
json_array_elements(progress->'schemaChangeGC'->'tables') AS progress
FROM jobs
),
indexes AS (
SELECT id,
json_array_elements(pl->'schemaChangeGC'->'indexes'),
json_array_elements(progress->'schemaChangeGC'->'indexes')
FROM jobs
),
elements AS (SELECT * FROM tables UNION ALL SELECT * FROM indexes)
SELECT id
FROM elements
WHERE COALESCE(progress->>'status' NOT IN ('WAITING_FOR_MVCC_GC', 'CLEARED'), true)
GROUP BY id;
`)
if err != nil || len(jobIDs) == 0 {
return err
}
log.Infof(ctx, "waiting for %d GC jobs to adopt the new protocol: %v", len(jobIDs), jobIDs)
}
return ctx.Err()
}

// checkForPausedGCJobs returns a user-actionable error if any GC jobs are
// paused: such jobs would block this migration indefinitely, so we detect
// them up front and give the user the opportunity to unpause them.
func checkForPausedGCJobs(
ctx context.Context, version clusterversion.ClusterVersion, deps upgrade.TenantDeps,
) (retErr error) {
jobIDs, err := collectJobIDsFromQuery(
ctx, deps.InternalExecutor, "check-for-paused-gc-jobs", `
SELECT job_id
FROM crdb_internal.jobs
WHERE job_type = 'SCHEMA CHANGE GC'
AND status IN ('paused', 'pause-requested')`)
if err != nil {
return err
}
if len(jobIDs) > 0 {
return errors.WithHint(pgerror.Newf(pgcode.ObjectNotInPrerequisiteState,
"paused GC jobs prevent upgrading GC job behavior: %v", jobIDs),
"unpause the jobs to allow the upgrade to proceed")
}
return nil
}

// collectJobIDsFromQuery is a helper to execute a query which returns rows
// where the first column is a jobID and returns the job IDs from those rows.
func collectJobIDsFromQuery(
ctx context.Context, ie sqlutil.InternalExecutor, opName string, query string,
) (jobIDs []jobspb.JobID, retErr error) {
it, err := ie.QueryIteratorEx(ctx, opName, nil, /* txn */
sessiondata.InternalExecutorOverride{
User: username.NodeUserName(),
}, query)
if err != nil {
return nil, err
}
defer func() { retErr = errors.CombineErrors(retErr, it.Close()) }()
for ok, err := it.Next(ctx); ok && err == nil; ok, err = it.Next(ctx) {
jobIDs = append(jobIDs, jobspb.JobID(tree.MustBeDInt(it.Cur()[0])))
}
return jobIDs, err
}
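
The overall shape of waitForDelRangeInGCJob is a poll loop: run the query, exit when no jobs remain, otherwise log progress and retry with backoff. Reduced to its skeleton (findStragglers stands in for the SQL query above, and the explicit backoff approximates what retry.Options provides by default):

package gcexample

import (
	"context"
	"log"
	"time"
)

// waitForJobs polls findStragglers until it reports no remaining job IDs,
// mirroring the retry loop in waitForDelRangeInGCJob.
func waitForJobs(ctx context.Context, findStragglers func(context.Context) ([]int64, error)) error {
	delay := time.Second
	for {
		ids, err := findStragglers(ctx)
		if err != nil || len(ids) == 0 {
			return err
		}
		log.Printf("waiting for %d GC jobs to adopt the new protocol: %v", len(ids), ids)
		select {
		case <-time.After(delay):
		case <-ctx.Done():
			return ctx.Err()
		}
		delay *= 2
		if delay > time.Minute {
			delay = time.Minute // cap the backoff between polls
		}
	}
}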
