
streamingest: add per processor polling to observe cutover #61001

Merged: 1 commit merged into cockroachdb:master from the ctx-cutover branch on Mar 1, 2021.

Conversation

@adityamaru (Contributor) commented Feb 23, 2021:

This change adds a poller to every stream ingestion processor to
check if the job has been marked for completion. If it has, it signals
the processor to drain gracefully and eventually reverts the cluster to
the cutover time, thereby leaving it in a consistent state.

Note that we have also made the stream ingestion job non-cancelable; we do
not perform any cleanup on failure.

Release justification: low risk, high reward (changes to new
functionality)

Release note: None

@adityamaru adityamaru requested review from pbardea, miretskiy and a team February 23, 2021 14:21
@cockroach-teamcity (Member): This change is Reviewable

@miretskiy (Contributor) left a comment:

Reviewed 2 of 8 files at r1, 1 of 2 files at r2.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru and @pbardea)


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 82 at r2 (raw file):

registry

super minor nit: swap poller and registry args? Kinda feel stopPoller + cancel are related.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 134 at r2 (raw file):

	// ingestCtx is used to plan and run the DistSQL flow.
	ingestCtx, cancelIngest := context.WithCancel(resumeCtx)

You definitely need to call

defer cancelIngest()

Basically, you must always call cancelIngest -- even if your group exits with an error (and you never call cancelIngest in your poller).
Also, calling cancelIngest multiple times is safe.
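
For illustration, a minimal sketch of the pattern being suggested, assuming an errgroup-based layout like the PR's (the function names and signatures here are illustrative, not the PR's exact code):

package streamingest

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// ingest runs the cutover poller and the DistSQL flow side by side. The key
// point from the review: cancelIngest is deferred, so it runs on every exit
// path -- including when the group returns an error -- and calling it a
// second time is a no-op.
func ingest(
	resumeCtx context.Context,
	runFlow func(context.Context) error,
	poll func(ctx context.Context, cancelIngest context.CancelFunc) error,
) error {
	ingestCtx, cancelIngest := context.WithCancel(resumeCtx)
	defer cancelIngest() // always called, even if g.Wait() returns an error

	g, gCtx := errgroup.WithContext(ingestCtx)
	// Poller: cancels the ingestion flow once the cutover signal is observed.
	g.Go(func() error { return poll(gCtx, cancelIngest) })
	// DistSQL ingestion flow.
	g.Go(func() error { return runFlow(gCtx) })
	return g.Wait()
}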


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 142 at r2 (raw file):

		return s.checkForCutoverSignal(ctx, stopPoller, p.ExecCfg().JobRegistry, cancelIngest)
	})

nit: Add a comment for symmetry w/ the poller goroutine?


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 163 at r2 (raw file):

	// TODO(adityamaru): We probably want to use the resultsCh to indicate that
	// the processors have completed setup. We can then return the job ID in the
	// plan hook similar to how changefeeds do it.

Comment is definitely outdated -- I killed resultsCh in the Resumer :).


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 195 at r2 (raw file):

	if sp.StreamIngest.CutoverTime.IsEmpty() {
		return errors.New("cutover time is unexpectedly empty, cannot revert to a consistent state")

nit: This is almost certainly AssertionFailedf
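
For reference, the suggested form, using errors.AssertionFailedf from github.com/cockroachdb/errors to mark this as an internal invariant violation rather than a user-facing error (the surrounding condition is as quoted above):

	if sp.StreamIngest.CutoverTime.IsEmpty() {
		return errors.AssertionFailedf(
			"cutover time is unexpectedly empty, cannot revert to a consistent state")
	}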


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 233 at r2 (raw file):

			EndKey: sd.Span.EndKey,
		},
	})

I'm curious... is this something we want to do in case of an error? Like: nuke everything if something strange happened... even after you've been streaming, maybe for days?

Not saying it's not correct -- but I'm just not sure if we want to use such a big hammer.


pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go, line 73 at r2 (raw file):

	ctx := context.Background()
	defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)

missing () at the end of the deferred call -- the function returns a cleanup func, so without the trailing () you're not actually lowering the time settings in the registry.
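
To illustrate the difference, assuming (as the test code suggests) that jobs.TestingSetAdoptAndCancelIntervals lowers the registry intervals and returns a reset closure:

	// Wrong: defer postpones the call itself until the test returns, so the
	// intervals are never lowered while the test body runs (only the
	// arguments are evaluated up front).
	defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)

	// Right: the call runs now, lowering the intervals, and the trailing ()
	// hands the returned reset closure to defer.
	defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()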


pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go, line 76 at r2 (raw file):

	canBeCompletedCh := make(chan struct{})
	threshold := 10

nit: const?


pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go, line 135 at r2 (raw file):

	query := fmt.Sprintf(`RESTORE TENANT 10 FROM REPLICATION STREAM FROM '%s'`, streamAddr)
	go func() {
		_, err := conn.Exec(query)

I just realized (looking at stream_ingestion_planning) that we block this query until the restore "finishes".
But does that make sense? To finish it, you have to use the cutover function anyway (presumably from another connection)....

Wouldn't it make more sense not to AwaitCompletion in the stream_ingestion_planning and just return job id (basically make restore tenant always detached)?

(this would also make this test simpler, I think)


pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go, line 143 at r2 (raw file):

		t.Fatalf("%s: query returned before expected: %s", err, query)
	}
	close(allowResponse)

This is a bit confusing to me: aren't we writing to a closed channel in the above select? Shouldn't that panic?


pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go, line 216 at r2 (raw file):

	ctx := context.Background()
	defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)

missing ()


pkg/ccl/streamingccl/streamingutils/utils.go, line 65 at r2 (raw file):

	if highWaterTimestamp.Less(cutoverTimestamp) {
		return errors.Newf("cannot cutover to a timestamp %s that is after the latest resolved time"+
			" %s for job %d", cutoverTimestamp.String(), highWaterTimestamp.String(), jobID)

perhaps combine this if statement with the one above?

if hw == nil || hw.Less(cutover) {
  ...
}
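
Spelled out against the error text quoted above, the combined check might look like this (a sketch; types are assumed from the snippet, with a nil high-water mark meaning no resolved timestamp has been recorded yet):

	if highWaterTimestamp == nil || highWaterTimestamp.Less(cutoverTimestamp) {
		return errors.Newf(
			"cannot cutover to timestamp %s: the latest resolved time %v for job %d has not reached it",
			cutoverTimestamp, highWaterTimestamp, jobID)
	}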

pkg/ccl/streamingccl/streamingutils/utils.go, line 72 at r2 (raw file):

	// TODO(adityamaru): This should change in the future, a user should be
	// allowed to correct their cutover time if the process of reverting the job
	// has not started..

nit: extra . in ..

@miretskiy (Contributor) left a comment:

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru and @pbardea)


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 84 at r2 (raw file):

	ctx context.Context, stopPoller chan struct{}, registry *jobs.Registry, cancelIngestionCtx func(),
) error {
	tick := time.NewTicker(time.Second * 10)

needs to be configurable for tests.
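
One way to do that, as a sketch (the package-level variable is the assumption here; per the later discussion on r6, the PR ultimately makes it a cluster setting instead):

var cutoverSignalPollInterval = 10 * time.Second // tests can shorten this

func checkForCutoverSignal(ctx context.Context, stopPoller chan struct{}) error {
	tick := time.NewTicker(cutoverSignalPollInterval)
	defer tick.Stop()
	for {
		select {
		case <-stopPoller:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		case <-tick.C:
			// Check the jobs table for the cutover signal here.
		}
	}
}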

@adityamaru (Contributor, Author) left a comment:

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy and @pbardea)


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 82 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…
registry

super minor nit: swap poller and registry args? Kinda feel stopPoller + cancel are related.

done.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 84 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

needs to be configurable for tests.

done.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 134 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

You definitely need to call

defer cancelIngest()

Basically, you must always call cancelIngest -- even if your group exits with an error (and you never call cancelIngest in your poller).
Also, calling cancelIngest multiple times is safe.

makes sense, done.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 142 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

nit: Add a comment for a symmetry w/ poller go routine?

done.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 163 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

Comment is definitely outdated -- I killed resultsCh in the Resumer :).

deleted!


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 195 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

nit: This is almost certainly AssertionFailedf

good point, done.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 233 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I'm curious... is this something we want to do in case of an error? Like: nuke everything if something strange happened... even after you've been streaming, maybe for days?

Not saying it's not correct -- but I'm just not sure if we want to use such a big hammer.

hmm, when the job is cancelled/failed I would assume the expectation is that everything is left as before. I can start a discussion on bulk-io but an alternative (which I'm not convinced of yet) would be to revert to the last resolved ts?


pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go, line 73 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

missing () at the end of the deferred call -- the function returns a cleanup func, so without the trailing () you're not actually lowering the time settings in the registry.

done.


pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go, line 76 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

nit: const?

done, here and in the other test.


pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go, line 135 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I just realized (looking at stream_ingestion_planning) that we block this query until the restore "finishes".
But does that make sense? To finish it, you have to use the cutover function anyway (presumably from another connection)....

Wouldn't it make more sense not to AwaitCompletion in the stream_ingestion_planning and just return job id (basically make restore tenant always detached)?

(this would also make this test simpler, I think)

yeah, that's a good point, changed it to return the jobID.


pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go, line 143 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

This is a bit confusing to me: aren't we writing to a closed channel in the above select? Shouldn't that panic?

the BulkResponseFilter set above will attempt to read from this channel, so it will only send a response once we send the struct{}{}, and that guarantees that we've processed at least one AddSSTable request. We don't want to block subsequent responses, so we close it.
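
A self-contained sketch of the gating pattern being described (the knob wiring is illustrative, not the exact testing-knob API). The key channel semantics: receiving from a closed channel returns immediately and never panics; only sending on a closed channel panics.

package main

func main() {
	allowResponse := make(chan struct{})

	// Stand-in for the response filter installed via a testing knob: each
	// AddSSTable response performs a receive before it is returned.
	responseGate := func() { <-allowResponse }

	done := make(chan struct{})
	go func() {
		responseGate() // first response: blocks until the test sends
		responseGate() // later responses: return immediately after close
		close(done)
	}()

	// Let exactly one response through, proving at least one AddSSTable
	// request was processed...
	allowResponse <- struct{}{}
	// ...then close the gate so no later response is ever blocked.
	close(allowResponse)
	<-done
}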


pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go, line 216 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

missing ()

done.


pkg/ccl/streamingccl/streamingutils/utils.go, line 65 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

perhaps combine this if statement with the one above?

if hw == nil || hw.Less(cutover) {
  ...
}

done.


pkg/ccl/streamingccl/streamingutils/utils.go, line 72 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

nit: extra . in ..

done.

@miretskiy (Contributor) left a comment:

Reviewed 5 of 6 files at r3.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru and @pbardea)


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 233 at r2 (raw file):

Previously, adityamaru (Aditya Maru) wrote…

hmm, when the job is cancelled/failed I would assume the expectation is that everything is left as before. I can start a discussion on bulk-io but an alternative (which I'm not convinced of yet) would be to revert to the last resolved ts?

Yeah -- let's...


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 35 at r3 (raw file):

// the system.jobs table to check whether the stream ingestion job has been
// signaled to cutover.
var checkForCutoverSignalFrequency = 10 * time.Second

might even be higher ... 30 seconds?


pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go, line 52 at r3 (raw file):

	defer log.Scope(t).Close(t)

	skip.WithIssue(t, 60789)

You might want to update PR description to close this issue.

@miretskiy (Contributor) left a comment:

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru and @pbardea)


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 128 at r3 (raw file):

					return errors.Newf("cutover has been requested before job %d has had a chance to"+
						" record a resolved ts", *s.job.ID())
				}

I wonder if this is actually possible... I mean, it's extraordinarily unlikely... but you could start your poller below; and then, your other goroutine (ingest) kinda takes some time to start... So, you wind up ticking before the job actually had a chance to update its highwater... Could conceivably happen in the tests.

@pbardea (Contributor) left a comment:

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru and @miretskiy)


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 35 at r3 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

might even be higher ... 30 seconds?

Is it worth making this a non-public cluster setting? Might be nice to be able to crank it up sometimes?
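
The shape that suggestion takes, as a sketch (the setting name, description, and the exact settings.RegisterDurationSetting signature of this release are assumptions):

var cutoverSignalPollInterval = settings.RegisterDurationSetting(
	"bulkio.stream_ingestion.cutover_signal_poll_interval",
	"the interval at which the stream ingestion job checks whether it has been signaled to cutover",
	10*time.Second,
)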


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 128 at r3 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I wonder if this is actually possible... I mean, it's extraordinarily unlikely... but you could start your poller below; and then, your other goroutine (ingest) kinda takes some time to start... So, you wind up ticking before the job actually had a chance to update its highwater... Could conceivably happen in the tests.

Don't we enforce that the cutover time is only set if the highwatermark is set though?


pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go, line 52 at r3 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

You might want to update PR description to close this issue.

(Can we also stress this for a bit to make sure that it does indeed fix this test)

@adityamaru (Contributor, Author) left a comment:

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy)


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 233 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

Yeah -- let's...

Done.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 35 at r3 (raw file):

Previously, pbardea (Paul Bardea) wrote…

Is it worth making this a non-public cluster setting? Might be nice to be able to crank it up sometimes?

done.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 128 at r3 (raw file):

Previously, pbardea (Paul Bardea) wrote…

Don't we enforce that the cutover time is only set if the highwatermark is set though?

yeah, rethinking this, these cases really shouldn't happen because of the checks on the builtin side, so they seem like good candidates for AssertionFailedf.

@adityamaru (Contributor, Author):

Stressing both these tests is leading to some puzzling behavior that I'm debugging.

On cancelation - once in ~500 runs, the ClearRange seems to leave a key behind, so the assertion that checks whether the keyspace is empty fails.

On cutover - under stress, almost immediately, the assertEqualKVs() method returns a mismatch where the store sees a version of the key after the RevertRange target time.

@adityamaru (Contributor, Author):

Not sure I understand what's going on. We keep ingesting keys into the store even while the ClearRange in OnFailOrCancel is executing. I stressed the cancel test with 2 or 3 partitions and logged the time.Now() at which we were running the ClearRange and at which we were ingesting keys, and some key ingestions are interleaved in the time it takes to run the ClearRange.

We only stop consuming events when sip.EventCh is closed, which only happens when all the goroutines reading from the partitions in merge see the ctx.Done(). In one case, this was after the ClearRange had completed. How does dsp.Run even return without stopping ingestion...

@adityamaru adityamaru force-pushed the ctx-cutover branch 4 times, most recently from 6b2ed7e to d9119e9 Compare February 25, 2021 05:53
@adityamaru (Contributor, Author):

@pbardea @miretskiy the last commit is the approach where every processor checks the jobs table for the cutover signal. Still not sure what we want to do for OnFailOrCancel(), but I could remove that commit from this PR and make this solely about the cutover. I got 1700+ stress runs on TestStreamIngestionJobWithRandomClient with this approach, long live synchronization!

I'll squash the commits once it is reviewed.

@@ -252,6 +326,17 @@ func (sip *streamIngestionProcessor) merge(
return nil
}

// Check for a context cancellation first, so that we can stop
// ingesting as soon as possible.
// TODO(during review): Does this buy us anything? From my logging
@pbardea (Contributor) commented:

I think that it's rare that we'll hit this since we'd usually be blocked sending on the merged ch anyway when the context is cancelled. I think if we're not seeing any difference when stressing we can remove this.
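
A sketch of why the up-front check is usually redundant: the send on the merged channel already sits in a select that watches ctx.Done(), so cancellation is observed at the point where the goroutine actually blocks (channel and variable names assumed):

	select {
	case merged <- event:
		// The consumer accepted the event.
	case <-ctx.Done():
		// Cancellation is normally observed here, while blocked on the send
		// above -- which is where this goroutine spends most of its time --
		// rather than at a top-of-loop ctx.Err() check.
		return ctx.Err()
	}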

details := s.job.Details().(jobspb.StreamIngestionDetails)
p := execCtx.(sql.JobExecContext)

// ingestCtx is used to plan and run the DistSQL flow.
@pbardea (Contributor) commented:

Leftover comment?

// A non-nil error is only possible if the job was signaled to cutover and the
// processors shut down gracefully, i.e stopped ingesting any additional
// events from the replication stream.
//At this point it is safe to revert to the cutoff time to leave the cluster
@pbardea (Contributor) commented:

nit: spacing here

@miretskiy (Contributor) left a comment:

Reviewed 1 of 8 files at r1, 1 of 6 files at r3, 4 of 7 files at r5, 7 of 7 files at r6.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @miretskiy, and @pbardea)


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 35 at r6 (raw file):

// the system.jobs table to check whether the stream ingestion job has been
// signaled to cutover.
var cutoverSignalPollInterval = settings.RegisterDurationSetting(

does this setting need to move to processor file?


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 102 at r6 (raw file):

	}

	// A non-nil error is only possible if the job was signaled to cutover and the

you mean nil error?


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 178 at r6 (raw file):

// leftover in the keyspace even after the ClearRange is issued. In general the
// tenant keyspace of a failed/canceled ingestion job should be treated as
// corrupted, and the tenant should be dropped before resuming the ingestion.

If we're doing this... should we just not revert anything?


pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go, line 331 at r6 (raw file):

Previously, pbardea (Paul Bardea) wrote…

I think that it's rare that we'll hit this since we'd usually be blocked sending on the merged ch anyway when the context is cancelled. I think if we're not seeing any difference when stressing we can remove this.

+1 on removing

@adityamaru (Contributor, Author) left a comment:

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy and @pbardea)


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 35 at r6 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

does this setting need to move to processor file?

good idea, done.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 94 at r6 (raw file):

Previously, pbardea (Paul Bardea) wrote…

Leftover comment?

Done.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 102 at r6 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

you mean nil error?

yep fixed.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 105 at r6 (raw file):

Previously, pbardea (Paul Bardea) wrote…

nit: spacing here

done.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go, line 178 at r6 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

If we're doing this... should we just not revert anything?

yeah stubbed out OnFailOrCancel() and removed the test.


pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go, line 331 at r6 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

+1 on removing

removed.

@adityamaru adityamaru changed the title streamingest: add ctx cancelation to support cutting over streamingest: add per processor polling to observe cutover Feb 26, 2021
@miretskiy (Contributor) left a comment:

Reviewed 6 of 6 files at r7.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy and @pbardea)

@adityamaru (Contributor, Author):

TFTRs!

bors r=pbardea,miretskiy

@adityamaru (Contributor, Author) commented Feb 26, 2021:

bors r-

@dt can you give this a quick look please, thanks!

@craig (bot) commented Feb 26, 2021:

Canceled.

@adityamaru (Contributor, Author):

TFTR!

bors r+

@craig (bot) commented Mar 1, 2021:

Build succeeded:

@craig craig bot merged commit 220e611 into cockroachdb:master Mar 1, 2021