Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streamingest: add custom ctx to support cutting over #60536

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func NewOrderValidator(topic string) Validator {
}
}

// NewStreamOrderValidator wraps and orderValidator as described above, and
// NewStreamOrderValidator wraps an orderValidator as described above, and
// exposes additional methods for introspection.
func NewStreamOrderValidator() StreamValidator {
return &orderValidator{
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/storage",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/protoutil",
Expand Down Expand Up @@ -67,6 +68,7 @@ go_test(
"//pkg/ccl/storageccl",
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streamclient",
"//pkg/ccl/streamingccl/streamingutils",
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/jobs/jobspb",
Expand Down
159 changes: 130 additions & 29 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package streamingest

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -70,23 +72,89 @@ func ingest(
}

// Plan and run the DistSQL flow.
err = distStreamIngest(ctx, execCtx, nodes, jobID, planCtx, dsp, streamIngestionSpecs,
return distStreamIngest(ctx, execCtx, nodes, jobID, planCtx, dsp, streamIngestionSpecs,
streamIngestionFrontierSpec)
if err != nil {
return err
}
}

return nil
// checkForCutoverSignal periodically loads the job progress to check for the
// sentinel value that signals the ingestion job to complete.
//
// Every 10 seconds the job is reloaded from the registry and its stream
// ingestion progress is inspected. When a non-empty cutover time is found and
// passes validation against the recorded high-water timestamp,
// cancelIngestionCtx is invoked and the poller returns nil.
//
// The poller returns nil when a receive on stopPoller succeeds (for example
// because the channel was closed), and returns ctx.Err() when ctx is done. It
// returns an error if the job cannot be loaded, if the progress details are
// not of the stream ingestion type, or if the requested cutover time fails the
// sanity checks below.
func (s *streamIngestionResumer) checkForCutoverSignal(
	ctx context.Context, stopPoller chan struct{}, registry *jobs.Registry, cancelIngestionCtx func(),
) error {
	tick := time.NewTicker(time.Second * 10)
	defer tick.Stop()
	for {
		select {
		case <-stopPoller:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		case <-tick.C:
			j, err := registry.LoadJob(ctx, *s.job.ID())
			if err != nil {
				return err
			}
			progress := j.Progress()
			sp, ok := progress.GetDetails().(*jobspb.Progress_StreamIngest)
			if !ok {
				// Report the type of the details value that failed the assertion,
				// not the unrelated high-water/fraction progress field.
				return errors.Newf("unknown progress type %T in stream ingestion job %d",
					progress.GetDetails(), *s.job.ID())
			}
			// No cutover has been requested yet; wait for the next tick.
			if sp.StreamIngest.CutoverTime.IsEmpty() {
				continue
			}
			// Job has been signaled to complete.
			//
			// Sanity check that the requested cutover time is less than or equal
			// to the resolved ts recorded in the job progress. This should already
			// have been enforced when the cutover was signaled via the builtin.
			// TODO(adityamaru): Remove this when we allow users to specify a
			// cutover time in the future.
			resolvedTimestamp := progress.GetHighWater()
			if resolvedTimestamp == nil {
				return errors.Newf("cutover has been requested before job %d has had a chance to"+
					" record a resolved ts", *s.job.ID())
			}
			// Less is true when the resolved time is strictly earlier than the
			// cutover time, i.e. the cutover lies beyond what has been resolved.
			if resolvedTimestamp.Less(sp.StreamIngest.CutoverTime) {
				return errors.Newf("requested cutover time %s is after the resolved time %s recorded"+
					" in job %d", sp.StreamIngest.CutoverTime.String(), resolvedTimestamp.String(),
					*s.job.ID())
			}
			cancelIngestionCtx()
			return nil
		}
	}
}

// Resume is part of the jobs.Resumer interface.
func (s *streamIngestionResumer) Resume(ctx context.Context, execCtx interface{}) error {
func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx interface{}) error {
details := s.job.Details().(jobspb.StreamIngestionDetails)
p := execCtx.(sql.JobExecContext)

err := ingest(ctx, p, details.StreamAddress, s.job.Progress(),
*s.job.ID())
if err != nil {
// ingestCtx is used to plan and run the DistSQL flow.
ingestCtx, cancelIngest := context.WithCancel(resumeCtx)
g := ctxgroup.WithContext(ingestCtx)

// Start a poller to check if the job has been requested to cutover.
stopPoller := make(chan struct{})
g.GoCtx(func(ctx context.Context) error {
return s.checkForCutoverSignal(ctx, stopPoller, p.ExecCfg().JobRegistry, cancelIngest)
})

g.GoCtx(func(ctx context.Context) error {
defer close(stopPoller)
return ingest(ctx, p, details.StreamAddress, s.job.Progress(), *s.job.ID())
})

if err := g.Wait(); err != nil {
// Check if the ingestCtx has been canceled while the resumeCtx does not
// have an error set on it. This is only possible if the resumer observed a
// cutover and explicitly requested a teardown via the ingestCtx, in which
// case we should revert the data to the cutover time to get the cluster
// into a consistent state.
// In all other cases we should treat the context cancellation as an error.
if errors.Is(err, context.Canceled) && resumeCtx.Err() == nil {
return s.revertToLatestResolvedTimestamp(resumeCtx, execCtx)
}
return err
}

Expand All @@ -97,39 +165,72 @@ func (s *streamIngestionResumer) Resume(ctx context.Context, execCtx interface{}
return nil
}

// OnFailOrCancel is part of the jobs.Resumer interface.
func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
// revertToLatestResolvedTimestamp reads the job progress for the cutover time
// and issues a RevertRangeRequest with the target time set to that cutover
// time, to bring the ingesting cluster to a consistent state.
func (s *streamIngestionResumer) revertToLatestResolvedTimestamp(
ctx context.Context, execCtx interface{},
) error {
p := execCtx.(sql.JobExecContext)
db := p.ExecCfg().DB
details := s.job.Details().(jobspb.StreamIngestionDetails)

resolvedTime := details.StartTime
prog := s.job.Progress()
if highWatermark := prog.GetHighWater(); highWatermark != nil {
if highWatermark.Less(resolvedTime) {
return errors.Newf("progress timestamp %+v cannot be older than start time %+v",
highWatermark, resolvedTime)
}
resolvedTime = *highWatermark
j, err := p.ExecCfg().JobRegistry.LoadJob(ctx, *s.job.ID())
if err != nil {
return err
}
details := j.Details()
var sd jobspb.StreamIngestionDetails
var ok bool
if sd, ok = details.(jobspb.StreamIngestionDetails); !ok {
return errors.Newf("unknown details type %T in stream ingestion job %d",
details, *s.job.ID())
}
progress := j.Progress()
var sp *jobspb.Progress_StreamIngest
if sp, ok = progress.GetDetails().(*jobspb.Progress_StreamIngest); !ok {
return errors.Newf("unknown progress type %T in stream ingestion job %d",
j.Progress().Progress, *s.job.ID())
}

// TODO(adityamaru): If the job progress was not set then we should
// probably ClearRange. Take this into account when writing the ClearRange
// OnFailOrCancel().
if resolvedTime.IsEmpty() {
return nil
if sp.StreamIngest.CutoverTime.IsEmpty() {
return errors.New("cutover time is unexpectedly empty, cannot revert to a consistent state")
}

var b kv.Batch
b.AddRawRequest(&roachpb.RevertRangeRequest{
RequestHeader: roachpb.RequestHeader{
Key: details.Span.Key,
EndKey: details.Span.EndKey,
Key: sd.Span.Key,
EndKey: sd.Span.EndKey,
},
TargetTime: resolvedTime,
TargetTime: sp.StreamIngest.CutoverTime,
EnableTimeBoundIteratorOptimization: true,
})
b.Header.MaxSpanRequestKeys = sql.RevertTableDefaultBatchSize

return db.Run(ctx, &b)
}

// OnFailOrCancel is part of the jobs.Resumer interface.
//
// It reloads the job from the registry to obtain its stream ingestion details
// and then issues a ClearRangeRequest covering the span recorded in those
// details. An error is returned if the job cannot be loaded, if the details
// are not of the stream ingestion type, or if running the batch fails.
func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
	jobExecCtx := execCtx.(sql.JobExecContext)
	kvDB := jobExecCtx.ExecCfg().DB

	loaded, err := jobExecCtx.ExecCfg().JobRegistry.LoadJob(ctx, *s.job.ID())
	if err != nil {
		return err
	}

	rawDetails := loaded.Details()
	ingestDetails, isIngest := rawDetails.(jobspb.StreamIngestionDetails)
	if !isIngest {
		return errors.Newf("unknown details type %T in stream ingestion job %d",
			rawDetails, *s.job.ID())
	}

	// Build a single request spanning the whole ingested key range.
	clearReq := &roachpb.ClearRangeRequest{
		RequestHeader: roachpb.RequestHeader{
			Key:    ingestDetails.Span.Key,
			EndKey: ingestDetails.Span.EndKey,
		},
	}

	var batch kv.Batch
	batch.AddRawRequest(clearReq)
	return kvDB.Run(ctx, &batch)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingutils"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand Down
Loading