[WIP] streamingccl: add polling processor to check for cutover signal #60763

@@ -152,7 +152,8 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) {
 	} {
 		t.Run(tc.name, func(t *testing.T) {
 			spec.PartitionAddresses = []streamingccl.PartitionAddress{pa1, pa2}
-			proc, err := newStreamIngestionDataProcessor(&flowCtx, 0 /* processorID */, spec, &post, out)
+			proc, err := newStreamIngestionDataProcessor(&flowCtx, 0 /* processorID */, spec,
+				nil, &post, out)
 			require.NoError(t, err)
 			sip, ok := proc.(*streamIngestionProcessor)
 			if !ok {
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go (56 additions & 27 deletions)
@@ -63,20 +63,15 @@ func ingest(
 	}

 	// Construct stream ingestion processor specs.
-	streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs(
-		streamAddress, topology, nodes, initialHighWater)
+	streamIngestionPollingSpec, streamIngestionSpecs, streamIngestionFrontierSpec,
+		err := distStreamIngestionPlanSpecs(streamAddress, topology, nodes, initialHighWater, jobID)
 	if err != nil {
 		return err
 	}

 	// Plan and run the DistSQL flow.
-	err = distStreamIngest(ctx, execCtx, nodes, jobID, planCtx, dsp, streamIngestionSpecs,
-		streamIngestionFrontierSpec)
-	if err != nil {
-		return err
-	}
-
-	return nil
+	return distStreamIngest(ctx, execCtx, nodes, jobID, planCtx, dsp, streamIngestionSpecs,
+		streamIngestionFrontierSpec, streamIngestionPollingSpec)
 }
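distStreamIngestionPlanSpecs itself is defined in a file that is not part of this diff. Based purely on this call site, its updated shape is presumably something like the sketch below; the parameter and return types are inferred from the surrounding code and should be treated as assumptions, not the PR's actual signature:

func distStreamIngestionPlanSpecs(
	streamAddress streamingccl.StreamAddress,
	topology streamingccl.Topology, // assumed type
	nodes []roachpb.NodeID, // assumed type
	initialHighWater hlc.Timestamp,
	jobID int64, // new argument, presumably threaded into the polling spec
) (
	*execinfrapb.StreamIngestionPollingSpec, // new return value
	[]*execinfrapb.StreamIngestionDataSpec, // assumed type
	*execinfrapb.StreamIngestionFrontierSpec,
	error,
) {
	// ...
}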

// Resume is part of the jobs.Resumer interface.
@@ -87,6 +82,16 @@ func (s *streamIngestionResumer) Resume(ctx context.Context, execCtx interface{}
 	err := ingest(ctx, p, details.StreamAddress, s.job.Progress(),
 		*s.job.ID())
 	if err != nil {
+		if errors.Is(err, errCutoverRequested) {
+			revertErr := s.revertToLatestResolvedTimestamp(ctx, execCtx)
+			if revertErr != nil {
+				return errors.Wrap(revertErr, "error while reverting stream ingestion job")
+			}
+			// The job should transition to a succeeded state once the cutover is
+			// complete.
+
+			return nil
+		}
 		return err
 	}

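errCutoverRequested is referenced here but defined elsewhere in the PR, outside this diff. A minimal, self-contained sketch of the sentinel-error pattern this branch relies on; the sentinel's definition below is an assumption, not the PR's actual code:

package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

// Assumed definition; the real sentinel lives elsewhere in this PR.
var errCutoverRequested = errors.New("cutover requested")

func runIngestion() error {
	// The flow surfaces the polling processor's cutover signal by returning
	// an error that wraps the sentinel.
	return errors.Wrap(errCutoverRequested, "ingestion flow stopped")
}

func main() {
	if err := runIngestion(); err != nil {
		if errors.Is(err, errCutoverRequested) {
			// Wrapping preserves the sentinel, so errors.Is still matches.
			fmt.Println("cutover requested: revert, then mark the job succeeded")
			return
		}
		fmt.Println("unexpected error:", err)
	}
}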
@@ -97,42 +102,66 @@ func (s *streamIngestionResumer) Resume(ctx context.Context, execCtx interface{}
 	return nil
 }

-// OnFailOrCancel is part of the jobs.Resumer interface.
-func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
+// revertToLatestResolvedTimestamp reads the job progress for the high watermark
+// and issues a RevertRangeRequest with the target time set to that high
+// watermark.
+func (s *streamIngestionResumer) revertToLatestResolvedTimestamp(
+	ctx context.Context, execCtx interface{},
+) error {
 	p := execCtx.(sql.JobExecContext)
 	db := p.ExecCfg().DB
-	details := s.job.Details().(jobspb.StreamIngestionDetails)
+	j, err := p.ExecCfg().JobRegistry.LoadJob(ctx, *s.job.ID())
+	if err != nil {
+		return err
+	}
+	details := j.Details()
+	var sd jobspb.StreamIngestionDetails
+	var ok bool
+	if sd, ok = details.(jobspb.StreamIngestionDetails); !ok {
+		return errors.Newf("unknown details type %T in stream ingestion job %d",
+			details, *s.job.ID())
+	}
+	progress := j.Progress()
+	streamIngestProgress := progress.GetStreamIngest()
+
+	revertTargetTime := sd.StartTime
+	if streamIngestProgress != nil {
+		revertTargetTime = streamIngestProgress.CutoverTime
+	}

-	resolvedTime := details.StartTime
-	prog := s.job.Progress()
-	if highWatermark := prog.GetHighWater(); highWatermark != nil {
-		if highWatermark.Less(resolvedTime) {
-			return errors.Newf("progress timestamp %+v cannot be older than start time %+v",
-				highWatermark, resolvedTime)
+	if highWatermark := progress.GetHighWater(); highWatermark != nil {
+		if highWatermark.Less(revertTargetTime) {
+			return errors.Newf("progress timestamp %+v cannot be older than the requested "+
+				"cutover time %+v", highWatermark, revertTargetTime)
 		}
-		resolvedTime = *highWatermark
 	}

-	// TODO(adityamaru): If the job progress was not set then we should
-	// probably ClearRange. Take this into account when writing the ClearRange
-	// OnFailOrCancel().
-	if resolvedTime.IsEmpty() {
-		return nil
+	// Sanity check that the revertTargetTime is not less than the time at
+	// which the ingestion job was started.
+	if revertTargetTime.Less(sd.StartTime) {
+		return errors.Newf("revert target time %+v cannot be older than the start time %+v",
+			revertTargetTime, sd.StartTime)
 	}

 	var b kv.Batch
 	b.AddRawRequest(&roachpb.RevertRangeRequest{
 		RequestHeader: roachpb.RequestHeader{
-			Key:    details.Span.Key,
-			EndKey: details.Span.EndKey,
+			Key:    sd.Span.Key,
+			EndKey: sd.Span.EndKey,
 		},
-		TargetTime:                          resolvedTime,
+		TargetTime:                          revertTargetTime,
 		EnableTimeBoundIteratorOptimization: true,
 	})
 	b.Header.MaxSpanRequestKeys = sql.RevertTableDefaultBatchSize
 	return db.Run(ctx, &b)
 }

+// OnFailOrCancel is part of the jobs.Resumer interface.
+func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
+	// TODO(adityamaru): Add ClearRange logic.
+	return nil
+}

 var _ jobs.Resumer = &streamIngestionResumer{}

 func init() {
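The timestamp handling in revertToLatestResolvedTimestamp boils down to a small ordering rule: prefer the recorded cutover time over the start time, require the high watermark (if any) to have reached it, and never revert to before the job started. A distilled sketch of just that rule, using plain nanosecond values in place of hlc.Timestamp to keep it self-contained (an assumption for illustration only):

package main

import "fmt"

// pickRevertTarget mirrors the checks in revertToLatestResolvedTimestamp; a
// zero value stands in for an unset timestamp.
func pickRevertTarget(startTime, cutoverTime, highWater int64) (int64, error) {
	target := startTime
	if cutoverTime != 0 {
		target = cutoverTime
	}
	if highWater != 0 && highWater < target {
		return 0, fmt.Errorf("high watermark %d is older than the cutover time %d", highWater, target)
	}
	if target < startTime {
		return 0, fmt.Errorf("revert target %d is older than the start time %d", target, startTime)
	}
	return target, nil
}

func main() {
	target, err := pickRevertTarget(100, 150, 200)
	fmt.Println(target, err) // 150 <nil>: the ingested span is reverted back to 150
}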
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)
@@ -81,8 +82,9 @@ func ingestionPlanHook(
 	streamIngestionDetails := jobspb.StreamIngestionDetails{
 		StreamAddress: streamingccl.StreamAddress(from[0]),
 		Span:          roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()},
-		// TODO: Figure out what the initial ts should be.
-		StartTime: hlc.Timestamp{},
+		// TODO(adityamaru): This will change to the time the RESTORE was run
+		// at before starting the ingestion stream.
+		StartTime: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()},
 	}

 	jobDescription, err := streamIngestionJobDescription(p, ingestionStmt)
@@ -0,0 +1,165 @@
// Copyright 2020 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
//     https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamingest

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
	"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/sql/types"
	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
	"github.com/cockroachdb/errors"
)

const streamIngestionPollingProcName = `ingestpolling`

type streamIngestionPollingProcessor struct {
	execinfra.ProcessorBase
	execinfra.StreamingProcessor

	flowCtx *execinfra.FlowCtx
	spec    execinfrapb.StreamIngestionPollingSpec
	output  execinfra.RowReceiver

	jobID      int64
	registry   *jobs.Registry
	stopPoller chan struct{}
}

var _ execinfra.Processor = &streamIngestionPollingProcessor{}
var _ execinfra.RowSource = &streamIngestionPollingProcessor{}

func init() {
	rowexec.NewStreamIngestionPollingProcessor = newStreamIngestionPollingProcessor
}
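// Note: this init hook follows the pattern CCL packages use to register
// processor constructors with rowexec without creating an import cycle.
// rowexec presumably declares a matching function variable (not shown in this
// diff); its assumed shape, mirroring the constructor below:
//
//	var NewStreamIngestionPollingProcessor func(
//		*execinfra.FlowCtx, int32, execinfrapb.StreamIngestionPollingSpec,
//		*execinfrapb.PostProcessSpec, execinfra.RowReceiver,
//	) (execinfra.Processor, error)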

func newStreamIngestionPollingProcessor(
	flowCtx *execinfra.FlowCtx,
	processorID int32,
	spec execinfrapb.StreamIngestionPollingSpec,
	post *execinfrapb.PostProcessSpec,
	output execinfra.RowReceiver,
) (execinfra.Processor, error) {
	sp := &streamIngestionPollingProcessor{
		flowCtx:    flowCtx,
		spec:       spec,
		output:     output,
		jobID:      spec.JobID,
		registry:   flowCtx.Cfg.JobRegistry,
		stopPoller: make(chan struct{}),
	}
	if err := sp.Init(
		sp,
		post,
		streamIngestionResultTypes,
		flowCtx,
		processorID,
		output,
		nil, /* memMonitor */
		execinfra.ProcStateOpts{
			InputsToDrain: []execinfra.RowSource{},
			TrailingMetaCallback: func(ctx context.Context) []execinfrapb.ProducerMetadata {
				sp.close()
				return nil
			},
		},
	); err != nil {
		return nil, err
	}
	return sp, nil
}

func (sp *streamIngestionPollingProcessor) close() {
	if sp.InternalClose() {
		close(sp.stopPoller)
	}
}

// Start is part of the RowSource interface.
func (sp *streamIngestionPollingProcessor) Start(ctx context.Context) context.Context {
	return sp.StartInternal(ctx, streamIngestionPollingProcName)
}

// checkForCutoverSignal periodically loads the job progress to check for the
// sentinel value that signals the ingestion job to complete.
func (sp *streamIngestionPollingProcessor) checkForCutoverSignal(
	ctx context.Context,
) (rowenc.EncDatumRow, error) {
	tick := time.NewTicker(time.Second * 10)
	defer tick.Stop()
	for {
		select {
		case _, ok := <-sp.stopPoller:
			if !ok {
				return nil, nil
			}
			// stopPoller is only ever closed, never sent on, so we should not
			// reach this point.
			return nil, nil
		case <-tick.C:
			j, err := sp.registry.LoadJob(ctx, sp.jobID)
			if err != nil {
				return nil, err
			}
			progress := j.Progress()
			var prog *jobspb.Progress_StreamIngest
			var ok bool
			if prog, ok = progress.GetDetails().(*jobspb.Progress_StreamIngest); !ok {
				return nil, errors.Newf("unknown progress type %T in stream ingestion job %d",
					j.Progress().Progress, sp.jobID)
			}
			// The job has been signaled to complete.
			if cutoverTime := prog.StreamIngest.CutoverTime; !cutoverTime.IsEmpty() {
				streamIngestBytes, err := protoutil.Marshal(prog.StreamIngest)
				if err != nil {
					return nil, err
				}
				row := rowenc.EncDatumRow{
					rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(streamIngestBytes))),
				}
				return row, nil
[Inline review comment by the PR author on the line above]

After we signal, what is the correct behavior? Is it okay that the processor might push this same row again a tick (10 seconds) later? I also played around with the idea of immediately tearing down the processor once we push a row, but a downstream stream ingestion processor might handle the meta before the signalled row, and that doesn't seem like what we want.

Currently, the stream ingestion processor will call ConsumerClosed() and then tear down this processor.

			}
		}
	}
}
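// The loop above is an instance of the poll-until-signal pattern: a ticker
// drives periodic checks and a closed stop channel ends the loop early. A
// distilled, standalone version of the same pattern (a hypothetical helper,
// not part of this PR), with the job-registry lookup replaced by a
// caller-supplied check function:
func pollForSignal(
	ctx context.Context, stop <-chan struct{}, check func() (bool, error),
) (bool, error) {
	tick := time.NewTicker(10 * time.Second)
	defer tick.Stop()
	for {
		select {
		case <-stop:
			// The poller was shut down before a signal arrived.
			return false, nil
		case <-ctx.Done():
			return false, ctx.Err()
		case <-tick.C:
			if signaled, err := check(); err != nil || signaled {
				return signaled, err
			}
		}
	}
}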

// Next is part of the RowSource interface.
func (sp *streamIngestionPollingProcessor) Next() (
	rowenc.EncDatumRow,
	*execinfrapb.ProducerMetadata,
) {
	if sp.State != execinfra.StateRunning {
		return nil, sp.DrainHelper()
	}

	row, err := sp.checkForCutoverSignal(sp.flowCtx.EvalCtx.Context)
	if err != nil {
		sp.MoveToDraining(errors.Wrap(err, "error when polling for cutover signal"))
		return nil, sp.DrainHelper()
	}

	if row != nil {
		return row, nil
	}

	sp.MoveToDraining(nil /* error */)
	return nil, sp.DrainHelper()
}

// ConsumerClosed is part of the RowSource interface.
func (sp *streamIngestionPollingProcessor) ConsumerClosed() {
	// The consumer is done; Next() will not be called again.
	sp.close()
}