Skip to content

Commit

Permalink
streamingest: add ctx cancelation to support cutting over
Browse files Browse the repository at this point in the history
This change adds logic to leverage context cancelation as a means of
trigerring the RevertRange logic that is run when the job is requested
to cutover.

This change also adds a poller to check if the job has been marked for
completion.

Release note: None
  • Loading branch information
pbardea authored and adityamaru committed Feb 23, 2021
1 parent eab1ae3 commit 90ec8f3
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 76 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func NewOrderValidator(topic string) Validator {
}
}

// NewStreamOrderValidator wraps and orderValidator as described above, and
// NewStreamOrderValidator wraps an orderValidator as described above, and
// exposes additional methods for introspection.
func NewStreamOrderValidator() StreamValidator {
return &orderValidator{
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/storage",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/protoutil",
Expand Down Expand Up @@ -67,6 +68,7 @@ go_test(
"//pkg/ccl/storageccl",
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streamclient",
"//pkg/ccl/streamingccl/streamingutils",
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/jobs/jobspb",
Expand Down
140 changes: 111 additions & 29 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package streamingest

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -70,23 +72,89 @@ func ingest(
}

// Plan and run the DistSQL flow.
err = distStreamIngest(ctx, execCtx, nodes, jobID, planCtx, dsp, streamIngestionSpecs,
return distStreamIngest(ctx, execCtx, nodes, jobID, planCtx, dsp, streamIngestionSpecs,
streamIngestionFrontierSpec)
if err != nil {
return err
}
}

return nil
// checkForCutoverSignal periodically loads the job progress to check for the
// sentinel value that signals the ingestion job to complete.
func (s *streamIngestionResumer) checkForCutoverSignal(
ctx context.Context, stopPoller chan struct{}, registry *jobs.Registry, cancelIngestionCtx func(),
) error {
tick := time.NewTicker(time.Second * 10)
defer tick.Stop()
for {
select {
case <-stopPoller:
return nil
case <-ctx.Done():
return ctx.Err()
case <-tick.C:
j, err := registry.LoadJob(ctx, *s.job.ID())
if err != nil {
return err
}
progress := j.Progress()
var sp *jobspb.Progress_StreamIngest
var ok bool
if sp, ok = progress.GetDetails().(*jobspb.Progress_StreamIngest); !ok {
return errors.Newf("unknown progress type %T in stream ingestion job %d",
j.Progress().Progress, *s.job.ID())
}
// Job has been signaled to complete.
if !sp.StreamIngest.CutoverTime.IsEmpty() {
// Sanity check that the requested cutover time is less than equal to
// the resolved ts recorded in the job progress. This should already
// have been enforced when the cutover was signalled via the builtin.
// TODO(adityamaru): Remove this when we allow users to specify a
// cutover time in the future.
resolvedTimestamp := progress.GetHighWater()
if resolvedTimestamp == nil {
return errors.Newf("cutover has been requested before job %d has had a chance to"+
" record a resolved ts", *s.job.ID())
}
if resolvedTimestamp.Less(sp.StreamIngest.CutoverTime) {
return errors.Newf("requested cutover time %s is before the resolved time %s recorded"+
" in job %d", sp.StreamIngest.CutoverTime.String(), resolvedTimestamp.String(),
*s.job.ID())
}
cancelIngestionCtx()
return nil
}
}
}
}

// Resume is part of the jobs.Resumer interface.
func (s *streamIngestionResumer) Resume(ctx context.Context, execCtx interface{}) error {
func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx interface{}) error {
details := s.job.Details().(jobspb.StreamIngestionDetails)
p := execCtx.(sql.JobExecContext)

err := ingest(ctx, p, details.StreamAddress, s.job.Progress(),
*s.job.ID())
if err != nil {
// ingestCtx is used to plan and run the DistSQL flow.
ingestCtx, cancelIngest := context.WithCancel(resumeCtx)
g := ctxgroup.WithContext(ingestCtx)

// Start a poller to check if the job has been requested to cutover.
stopPoller := make(chan struct{})
g.GoCtx(func(ctx context.Context) error {
return s.checkForCutoverSignal(ctx, stopPoller, p.ExecCfg().JobRegistry, cancelIngest)
})

g.GoCtx(func(ctx context.Context) error {
defer close(stopPoller)
return ingest(ctx, p, details.StreamAddress, s.job.Progress(), *s.job.ID())
})

if err := g.Wait(); err != nil {
// Check if the ingestCtx has been canceled while the resumeCtx does not
// have an error set on it. This is only possible if the resumer observed a
// cutover and explicitly requested a teardown via the ingestCtx, in which
// case we should revert the data to the cutover time to get the cluster
// into a consistent state.
// In all other cases we should treat the context cancellation as an error.
if errors.Is(err, context.Canceled) && resumeCtx.Err() == nil {
return s.revertToLatestResolvedTimestamp(resumeCtx, execCtx)
}
return err
}

Expand All @@ -97,42 +165,56 @@ func (s *streamIngestionResumer) Resume(ctx context.Context, execCtx interface{}
return nil
}

// OnFailOrCancel is part of the jobs.Resumer interface.
func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
// revertToLatestResolvedTimestamp reads the job progress for the cutover time
// and issues a RevertRangeRequest with the target time set to that cutover
// time, to bring the ingesting cluster to a consistent state.
func (s *streamIngestionResumer) revertToLatestResolvedTimestamp(
ctx context.Context, execCtx interface{},
) error {
p := execCtx.(sql.JobExecContext)
db := p.ExecCfg().DB
details := s.job.Details().(jobspb.StreamIngestionDetails)

resolvedTime := details.StartTime
prog := s.job.Progress()
if highWatermark := prog.GetHighWater(); highWatermark != nil {
if highWatermark.Less(resolvedTime) {
return errors.Newf("progress timestamp %+v cannot be older than start time %+v",
highWatermark, resolvedTime)
}
resolvedTime = *highWatermark
j, err := p.ExecCfg().JobRegistry.LoadJob(ctx, *s.job.ID())
if err != nil {
return err
}
details := j.Details()
var sd jobspb.StreamIngestionDetails
var ok bool
if sd, ok = details.(jobspb.StreamIngestionDetails); !ok {
return errors.Newf("unknown details type %T in stream ingestion job %d",
details, *s.job.ID())
}
progress := j.Progress()
var sp *jobspb.Progress_StreamIngest
if sp, ok = progress.GetDetails().(*jobspb.Progress_StreamIngest); !ok {
return errors.Newf("unknown progress type %T in stream ingestion job %d",
j.Progress().Progress, *s.job.ID())
}

// TODO(adityamaru): If the job progress was not set then we should
// probably ClearRange. Take this into account when writing the ClearRange
// OnFailOrCancel().
if resolvedTime.IsEmpty() {
return nil
if sp.StreamIngest.CutoverTime.IsEmpty() {
return errors.New("cutover time is unexpectedly empty, cannot revert to a consistent state")
}

var b kv.Batch
b.AddRawRequest(&roachpb.RevertRangeRequest{
RequestHeader: roachpb.RequestHeader{
Key: details.Span.Key,
EndKey: details.Span.EndKey,
Key: sd.Span.Key,
EndKey: sd.Span.EndKey,
},
TargetTime: resolvedTime,
TargetTime: sp.StreamIngest.CutoverTime,
EnableTimeBoundIteratorOptimization: true,
})
b.Header.MaxSpanRequestKeys = sql.RevertTableDefaultBatchSize

return db.Run(ctx, &b)
}

// OnFailOrCancel is part of the jobs.Resumer interface.
func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
// TODO(adityamaru): Add ClearRange logic.
return nil
}

var _ jobs.Resumer = &streamIngestionResumer{}

func init() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingutils"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand Down
Loading

0 comments on commit 90ec8f3

Please sign in to comment.