Skip to content

Commit

Permalink
Merge #40288
Browse files Browse the repository at this point in the history
40288: jobs: make new index backfill status tracking bwd compatible r=spaskob a=spaskob

Currently, if a new-version node participates in a cluster with old-version nodes, the index backfill
will busy-wait, because the two versions report backfill status in different ways.
We now detect this situation and fall back to the old status-reporting style when necessary.

Touches #36601.

Release note: None

Co-authored-by: Spas Bojanov <spas@cockroachlabs.com>
  • Loading branch information
craig[bot] and Spas Bojanov committed Aug 29, 2019
2 parents d5b6b97 + da74375 commit 4fbdcad
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 0 deletions.
19 changes: 19 additions & 0 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,7 @@ func (sc *SchemaChanger) distBackfill(
filter backfill.MutationFilter,
targetSpans []roachpb.Span,
) error {
inMemoryStatusEnabled := sc.execCfg.Settings.Version.IsActive(cluster.VersionAtomicChangeReplicasTrigger)
duration := checkpointInterval
if sc.testingKnobs.WriteCheckpointInterval > 0 {
duration = sc.testingKnobs.WriteCheckpointInterval
Expand Down Expand Up @@ -879,6 +880,24 @@ func (sc *SchemaChanger) distBackfill(
}); err != nil {
return err
}
if !inMemoryStatusEnabled {
var resumeSpans []roachpb.Span
// A worker node running an older version may be present; such a node
// reports its completed work by writing to the jobs table.
// In this case we intersect todoSpans with the resume spans that the old
// node(s) have recorded in the jobs table, so that we do not overwrite
// their completed work.
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
var err error
resumeSpans, _, _, err = distsqlrun.GetResumeSpans(
ctx, sc.jobRegistry, txn, sc.tableID, sc.mutationID, filter)
return err
}); err != nil {
return err
}
// A \intersect B = A - (A - B)
todoSpans = roachpb.SubtractSpans(todoSpans, roachpb.SubtractSpans(todoSpans, resumeSpans))

}
// Record what is left to do for the job.
// TODO(spaskob): Execute this at a regular cadence.
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
Expand Down
14 changes: 14 additions & 0 deletions pkg/sql/distsqlrun/backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/backfill"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
Expand Down Expand Up @@ -134,6 +135,19 @@ func (b *backfiller) doRun(ctx context.Context) *distsqlpb.ProducerMetadata {
if err != nil {
return &distsqlpb.ProducerMetadata{Err: err}
}
if !b.flowCtx.Cfg.Settings.Version.IsActive(cluster.VersionAtomicChangeReplicasTrigger) {
// A node running an older version may be present, and it could be the
// coordinator, so we report the finished work by writing to the jobs row.
err = WriteResumeSpan(ctx,
b.flowCtx.Cfg.DB,
b.spec.Table.ID,
b.spec.Table.Mutations[0].MutationID,
b.filter,
finishedSpans,
b.flowCtx.Cfg.JobRegistry,
)
return &distsqlpb.ProducerMetadata{Err: err}
}
var prog distsqlpb.RemoteProducerMetadata_BulkProcessorProgress
prog.CompletedSpans = append(prog.CompletedSpans, finishedSpans...)
return &distsqlpb.ProducerMetadata{BulkProcessorProgress: &prog}
Expand Down

0 comments on commit 4fbdcad

Please sign in to comment.