-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] streamingccl: add polling processor to check for cutover signal #60763
Conversation
This is still a WIP because of the lack of tests, but I wanted to get a review to see if anything is amiss. I've also added a few inline questions I have. |
if meta.Err != nil { | ||
sip.MoveToDraining(nil /* err */) | ||
} | ||
sip.ingestionMeta = meta |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i'm not a 100% sure if I'm handling this meta correctly. Should we just have another channel (like the signalCh) where we send the meta?
row := rowenc.EncDatumRow{ | ||
rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(streamIngestBytes))), | ||
} | ||
return row, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After we signal, what is the correct behavior? Is it okay that the processor might push this same row tick seconds later? I also played around with the idea of immediately tearing down the processor once we push a row but a downstream stream ingestion processor might handle the meta before the signalled row and that doesn't seem like what we want.
Currently, the stream ingestion processor will call ConsumerClosed()
and then teardown this processor.
This change adds a new polling processor to the stream ingestion flow. This processor is currently responsible for polling the systm.jobs table and checking if a cutover has been signalled. If it finds a signal it uses a mirror router to propagate that signal to all the downstream stream ingestion processors. The teardown then ensues and eventually pushes a "cutover" meta record back to the stream ingestion resumer. This change moves the revert range request from OnFailOrCancel, to be invoked when the resumer gets this "cutover" meta record. Release note: None
3d01e19
to
cb87e36
Compare
} | ||
|
||
if err := sip.Init(sip, post, streamIngestionResultTypes, flowCtx, processorID, output, nil, /* memMonitor */ | ||
if err := sip.Init(sip, post, input.OutputTypes(), flowCtx, processorID, output, nil, /* memMonitor */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although in this case input.OutputTypes() == streamIngestionResultsTypes
I think that this should remain as they were?
Closing in favour of #61001. |
This change adds a new polling processor to the stream ingestion flow.
This processor is currently responsible for polling the systm.jobs table
and checking if a cutover has been signalled. If it finds a signal it
uses a mirror router to propagate that signal to all the downstream
stream ingestion processors. The teardown then ensues and eventually
pushes a "cutover" meta record back to the stream ingestion resumer.
This change moves the revert range request from OnFailOrCancel, to be
invoked when the resumer gets this "cutover" meta record.
Release note: None