Skip to content

Commit

Permalink
[WIP] streamingccl: add polling processor to check for cutover signal
Browse files Browse the repository at this point in the history
This change adds a new polling processor to the stream ingestion flow.
This processor is currently responsible for polling the systm.jobs table
and checking if a cutover has been signalled. If it finds a signal it
uses a mirror router to propagate that signal to all the downstream
stream ingestion processors. The teardown then ensues and eventually
pushes a "cutover" meta record back to the stream ingestion resumer.

This change moves the revert range request from OnFailOrCancel, to be
invoked when the resumer gets this "cutover" meta record.

Release note: None
  • Loading branch information
adityamaru committed Feb 19, 2021
1 parent 6f34689 commit cb87e36
Show file tree
Hide file tree
Showing 16 changed files with 905 additions and 301 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
spec.PartitionAddresses = []streamingccl.PartitionAddress{pa1, pa2}
proc, err := newStreamIngestionDataProcessor(&flowCtx, 0 /* processorID */, spec, &post, out)
proc, err := newStreamIngestionDataProcessor(&flowCtx, 0 /* processorID */, spec,
nil, &post, out)
require.NoError(t, err)
sip, ok := proc.(*streamIngestionProcessor)
if !ok {
Expand Down
83 changes: 56 additions & 27 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,15 @@ func ingest(
}

// Construct stream ingestion processor specs.
streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs(
streamAddress, topology, nodes, initialHighWater)
streamIngestionPollingSpec, streamIngestionSpecs, streamIngestionFrontierSpec,
err := distStreamIngestionPlanSpecs(streamAddress, topology, nodes, initialHighWater, jobID)
if err != nil {
return err
}

// Plan and run the DistSQL flow.
err = distStreamIngest(ctx, execCtx, nodes, jobID, planCtx, dsp, streamIngestionSpecs,
streamIngestionFrontierSpec)
if err != nil {
return err
}

return nil
return distStreamIngest(ctx, execCtx, nodes, jobID, planCtx, dsp, streamIngestionSpecs,
streamIngestionFrontierSpec, streamIngestionPollingSpec)
}

// Resume is part of the jobs.Resumer interface.
Expand All @@ -87,6 +82,16 @@ func (s *streamIngestionResumer) Resume(ctx context.Context, execCtx interface{}
err := ingest(ctx, p, details.StreamAddress, s.job.Progress(),
*s.job.ID())
if err != nil {
if errors.Is(err, errCutoverRequested) {
revertErr := s.revertToLatestResolvedTimestamp(ctx, execCtx)
if revertErr != nil {
return errors.Wrap(revertErr, "error while reverting stream ingestion job")
}
// The job should transition to a succeeded state once the cutover is
// complete.

return nil
}
return err
}

Expand All @@ -97,42 +102,66 @@ func (s *streamIngestionResumer) Resume(ctx context.Context, execCtx interface{}
return nil
}

// OnFailOrCancel is part of the jobs.Resumer interface.
func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
// revertToLatestResolvedTimestamp reads the job progress for the high watermark
// and issues a RevertRangeRequest with the target time set to that high
// watermark.
func (s *streamIngestionResumer) revertToLatestResolvedTimestamp(
ctx context.Context, execCtx interface{},
) error {
p := execCtx.(sql.JobExecContext)
db := p.ExecCfg().DB
details := s.job.Details().(jobspb.StreamIngestionDetails)
j, err := p.ExecCfg().JobRegistry.LoadJob(ctx, *s.job.ID())
if err != nil {
return err
}
details := j.Details()
var sd jobspb.StreamIngestionDetails
var ok bool
if sd, ok = details.(jobspb.StreamIngestionDetails); !ok {
return errors.Newf("unknown details type %T in stream ingestion job %d",
details, *s.job.ID())
}
progress := j.Progress()
streamIngestProgress := progress.GetStreamIngest()

revertTargetTime := sd.StartTime
if streamIngestProgress != nil {
revertTargetTime = streamIngestProgress.CutoverTime
}

resolvedTime := details.StartTime
prog := s.job.Progress()
if highWatermark := prog.GetHighWater(); highWatermark != nil {
if highWatermark.Less(resolvedTime) {
return errors.Newf("progress timestamp %+v cannot be older than start time %+v",
highWatermark, resolvedTime)
if highWatermark := progress.GetHighWater(); highWatermark != nil {
if highWatermark.Less(revertTargetTime) {
return errors.Newf("progress timestamp %+v cannot be older than the requested "+
"cutover time %+v", highWatermark, revertTargetTime)
}
resolvedTime = *highWatermark
}

// TODO(adityamaru): If the job progress was not set then we should
// probably ClearRange. Take this into account when writing the ClearRange
// OnFailOrCancel().
if resolvedTime.IsEmpty() {
return nil
// Sanity check that the resolvedTime is not less than the time at which the
// ingestion job was started.
if revertTargetTime.Less(sd.StartTime) {
return errors.Newf("revert target time %+v cannot be older than the start time "+
"cutover time %+v", revertTargetTime, sd.StartTime)
}

var b kv.Batch
b.AddRawRequest(&roachpb.RevertRangeRequest{
RequestHeader: roachpb.RequestHeader{
Key: details.Span.Key,
EndKey: details.Span.EndKey,
Key: sd.Span.Key,
EndKey: sd.Span.EndKey,
},
TargetTime: resolvedTime,
TargetTime: revertTargetTime,
EnableTimeBoundIteratorOptimization: true,
})
b.Header.MaxSpanRequestKeys = sql.RevertTableDefaultBatchSize
return db.Run(ctx, &b)
}

// OnFailOrCancel is part of the jobs.Resumer interface.
func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
// TODO(adityamaru): Add ClearRange logic.
return nil
}

var _ jobs.Resumer = &streamIngestionResumer{}

func init() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -81,8 +82,9 @@ func ingestionPlanHook(
streamIngestionDetails := jobspb.StreamIngestionDetails{
StreamAddress: streamingccl.StreamAddress(from[0]),
Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()},
// TODO: Figure out what the initial ts should be.
StartTime: hlc.Timestamp{},
// TODO(adityamaru): This will change to the time the RESTORE was run at
// before starting the ingestion strea.
StartTime: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()},
}

jobDescription, err := streamIngestionJobDescription(p, ingestionStmt)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Copyright 2020 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamingest

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
)

const streamIngestionPollingProcName = `ingestpolling`

type streamIngestionPollingProcessor struct {
execinfra.ProcessorBase
execinfra.StreamingProcessor

flowCtx *execinfra.FlowCtx
spec execinfrapb.StreamIngestionPollingSpec
output execinfra.RowReceiver

jobID int64
registry *jobs.Registry
stopPoller chan struct{}
}

var _ execinfra.Processor = &streamIngestionPollingProcessor{}
var _ execinfra.RowSource = &streamIngestionPollingProcessor{}

func init() {
rowexec.NewStreamIngestionPollingProcessor = newStreamIngestionPollingProcessor
}

func newStreamIngestionPollingProcessor(
flowCtx *execinfra.FlowCtx,
processorID int32,
spec execinfrapb.StreamIngestionPollingSpec,
post *execinfrapb.PostProcessSpec,
output execinfra.RowReceiver,
) (execinfra.Processor, error) {
sp := &streamIngestionPollingProcessor{
flowCtx: flowCtx,
spec: spec,
output: output,
jobID: spec.JobID,
registry: flowCtx.Cfg.JobRegistry,
stopPoller: make(chan struct{}),
}
if err := sp.Init(
sp,
post,
streamIngestionResultTypes,
flowCtx,
processorID,
output,
nil, /* memMonitor */
execinfra.ProcStateOpts{
InputsToDrain: []execinfra.RowSource{},
TrailingMetaCallback: func(ctx context.Context) []execinfrapb.ProducerMetadata {
sp.close()
return nil
},
},
); err != nil {
return nil, err
}
return sp, nil
}

func (sp *streamIngestionPollingProcessor) close() {
if sp.InternalClose() {
close(sp.stopPoller)
}
}

// Start is part of the RowSource interface.
func (sp *streamIngestionPollingProcessor) Start(ctx context.Context) context.Context {
return sp.StartInternal(ctx, streamIngestionPollingProcName)
}

// checkForCutoverSignal periodically loads the job progress to check for the
// sentinel value that signals the ingestion job to complete.
func (sp *streamIngestionPollingProcessor) checkForCutoverSignal(
ctx context.Context,
) (rowenc.EncDatumRow, error) {
tick := time.NewTicker(time.Second * 10)
defer tick.Stop()
for {
select {
case _, ok := <-sp.stopPoller:
if !ok {
return nil, nil
}
// Shouldn't come here.
return nil, nil
case <-tick.C:
j, err := sp.registry.LoadJob(ctx, sp.jobID)
if err != nil {
return nil, err
}
progress := j.Progress()
var prog *jobspb.Progress_StreamIngest
var ok bool
if prog, ok = progress.GetDetails().(*jobspb.Progress_StreamIngest); !ok {
return nil, errors.Newf("unknown progress type %T in stream ingestion job %d",
j.Progress().Progress, sp.jobID)
}
// Job has been signaled to complete.
if cutoverTime := prog.StreamIngest.CutoverTime; !cutoverTime.IsEmpty() {
streamIngestBytes, err := protoutil.Marshal(prog.StreamIngest)
if err != nil {
return nil, err
}
row := rowenc.EncDatumRow{
rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(streamIngestBytes))),
}
return row, nil
}
}
}
}

// Next is part of the RowSource interface.
func (sp *streamIngestionPollingProcessor) Next() (
rowenc.EncDatumRow,
*execinfrapb.ProducerMetadata,
) {
if sp.State != execinfra.StateRunning {
return nil, sp.DrainHelper()
}

row, err := sp.checkForCutoverSignal(sp.flowCtx.EvalCtx.Context)
if err != nil {
sp.MoveToDraining(errors.Wrap(err, "error when polling for cutover signal"))
return nil, sp.DrainHelper()
}

if row != nil {
return row, nil
}

sp.MoveToDraining(nil /* error */)
return nil, sp.DrainHelper()
}

// ConsumerClosed is part of the RowSource interface.
func (sp *streamIngestionPollingProcessor) ConsumerClosed() {
// The consumer is done, Next() will not be called again.
sp.close()
}
Loading

0 comments on commit cb87e36

Please sign in to comment.