From da74375968ee4a604e2c11ff2c2a16156df71b4d Mon Sep 17 00:00:00 2001 From: Spas Bojanov Date: Wed, 28 Aug 2019 11:21:37 -0400 Subject: [PATCH] jobs: make new index backfill status tracking bwd compatible Currently if a new node participates in a cluster with old version nodes the index backfill will be busy waiting since the different versions use different ways of reporting status. We detect this situation and revert to the old status reporting style if necessary. Touches #36601. Release note: None --- pkg/sql/backfill.go | 19 +++++++++++++++++++ pkg/sql/distsqlrun/backfiller.go | 14 ++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index 193947e037f1..e6e89146ef58 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -704,6 +704,7 @@ func (sc *SchemaChanger) distBackfill( filter backfill.MutationFilter, targetSpans []roachpb.Span, ) error { + inMemoryStatusEnabled := sc.execCfg.Settings.Version.IsActive(cluster.VersionAtomicChangeReplicasTrigger) duration := checkpointInterval if sc.testingKnobs.WriteCheckpointInterval > 0 { duration = sc.testingKnobs.WriteCheckpointInterval @@ -879,6 +880,24 @@ func (sc *SchemaChanger) distBackfill( }); err != nil { return err } + if !inMemoryStatusEnabled { + var resumeSpans []roachpb.Span + // There is a worker node of older version that will communicate + // its done work by writing to the jobs table. + // In this case we intersect todoSpans with what the old node(s) + // have set in the jobs table not to overwrite their done work. + if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { + var err error + resumeSpans, _, _, err = distsqlrun.GetResumeSpans( + ctx, sc.jobRegistry, txn, sc.tableID, sc.mutationID, filter) + return err + }); err != nil { + return err + } + // A \intersect B = A - (A - B) + todoSpans = roachpb.SubtractSpans(todoSpans, roachpb.SubtractSpans(todoSpans, resumeSpans)) + + } // Record what is left to do for the job. // TODO(spaskob): Execute this at a regular cadence. if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { diff --git a/pkg/sql/distsqlrun/backfiller.go b/pkg/sql/distsqlrun/backfiller.go index af2aa2de4e63..a1f8e7a04fec 100644 --- a/pkg/sql/distsqlrun/backfiller.go +++ b/pkg/sql/distsqlrun/backfiller.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/backfill" "github.com/cockroachdb/cockroach/pkg/sql/distsqlpb" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" @@ -134,6 +135,19 @@ func (b *backfiller) doRun(ctx context.Context) *distsqlpb.ProducerMetadata { if err != nil { return &distsqlpb.ProducerMetadata{Err: err} } + if !b.flowCtx.Cfg.Settings.Version.IsActive(cluster.VersionAtomicChangeReplicasTrigger) { + // There is a node of older version which could be the coordinator. + // So we communicate the finished work by writing to the jobs row. + err = WriteResumeSpan(ctx, + b.flowCtx.Cfg.DB, + b.spec.Table.ID, + b.spec.Table.Mutations[0].MutationID, + b.filter, + finishedSpans, + b.flowCtx.Cfg.JobRegistry, + ) + return &distsqlpb.ProducerMetadata{Err: err} + } var prog distsqlpb.RemoteProducerMetadata_BulkProcessorProgress prog.CompletedSpans = append(prog.CompletedSpans, finishedSpans...) return &distsqlpb.ProducerMetadata{BulkProcessorProgress: &prog}