streamingccl: add IngestionFrontierProcessor #58672
Conversation
Will rebase once the ingestion processor lands!
Force-pushed from 02610c0 to 29f656b.
Generally LGTM -- I have one question about updating the job progress here vs in the job itself.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru and @pbardea)
pkg/ccl/streamingccl/stream_ingestion_frontier_processor.go, line 44 at r1 (raw file):
```go
// jobProgressedFn, if non-nil, is called to checkpoint the stream ingestion
// job's progress in the corresponding system job entry.
jobProgressedFn func(context.Context, jobs.HighWaterProgressedFn) error
```
I think an alternative to this processor updating the job directly would be for this processor's Next() to return when there is an update to the frontier, and then let the job itself be responsible for maintaining its own progress. I think that letting the processor not need to worry about the existence of jobs might be a nice property, so that this processor just takes in rows and outputs rows without side effects. (Concretely, we would return the new frontier in Next() upon frontierChanged.)
I'm curious if you considered that approach/any tradeoffs that you see with it. I would imagine that there would be more overhead in that case, but I'm not sure how much.
I don't feel strongly either way but want to discuss the tradeoffs.
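For illustration, a minimal sketch of the flow being proposed, assuming hypothetical helpers noteResolvedTimestamps and encodeFrontierRow (neither is from this PR); the point is that progress surfaces as an output row rather than a job write:

```go
func (sf *streamIngestionFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
	for sf.State == execinfra.StateRunning {
		row, meta := sf.input.Next()
		if meta != nil {
			return nil, meta
		}
		if row == nil {
			sf.MoveToDraining(nil /* err */)
			break
		}
		// Fold this partition's resolved ts into the span frontier.
		frontierChanged, err := sf.noteResolvedTimestamps(row) // hypothetical helper
		if err != nil {
			sf.MoveToDraining(err)
			break
		}
		if frontierChanged {
			// Emit the new frontier as a row; persisting it as job
			// progress is left to the job, keeping this processor
			// free of side effects.
			return sf.encodeFrontierRow(), nil // hypothetical helper
		}
	}
	return nil, sf.DrainHelper()
}
```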
pkg/ccl/streamingccl/stream_ingestion_frontier_processor.go, line 154 at r1 (raw file):
```go
if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(sf.highWaterAtStart) {
	logcrash.ReportOrPanic(sf.Ctx, &sf.flowCtx.Cfg.Settings.SV,
		`got a resolved timestamp %s that is less than the initial high-water %s`,
```
nit: s/initial high-water/start time/ might be clearer here as an error message?
Force-pushed from 70979d1 to 86a1918.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @pbardea)
pkg/ccl/streamingccl/stream_ingestion_frontier_processor.go, line 44 at r1 (raw file):
Previously, pbardea (Paul Bardea) wrote…
I think an alternative to this processor updating the job directly would be for this processor's Next() to return when there is an update to the frontier, and then let the job itself be responsible for maintaining its own progress. I think that letting the processor not need to worry about the existence of jobs might be a nice property, so that this processor just takes in rows and outputs rows without side effects. (Concretely, we would return the new frontier in Next() upon frontierChanged.)
I'm curious if you considered that approach/any tradeoffs that you see with it. I would imagine that there would be more overhead in that case, but I'm not sure how much.
I don't feel strongly either way but want to discuss the tradeoffs.
Changed to the above-described logic! I have tests for the frontier proc and the rows it returns. I will hook up the job progress and write tests for its progress updates in a follow-up PR.
I also removed the LoadJob from the frontier processor, which was being used to set the HighWaterAtStart field. Instead, I plan to pass this in via the FrontierSpec once the job progress is hooked up.
pkg/ccl/streamingccl/stream_ingestion_frontier_processor.go, line 154 at r1 (raw file):
Previously, pbardea (Paul Bardea) wrote…
nit: s/initial high-water/start time/ might be clearer here as an error message?
updated.
Force-pushed from 86a1918 to d8d05ca.
This is looking good!
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru and @pbardea)
pkg/ccl/streamingccl/stream_ingestion_frontier_processor_test.go, line 74 at r2 (raw file):
```go
name                      string
events                    partitionToEvent
expectedFrontierTimestamp hlc.Timestamp
```
Maybe we can also add a test case exercising some error cases (maybe the case where the input sends a resolved timestamp before the start time?)
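A sketch of what such a case might look like in the table above; the expectedFrontierError field and the checkpointEventAt helper are assumptions for illustration, not the PR's actual code:

```go
{
	// Hypothetical error case: a partition reports a resolved ts below
	// the frontier's start time, which should surface as an error
	// rather than a frontier advance.
	name: "checkpoint-before-start-time",
	events: partitionToEvent{
		"partition-1": {checkpointEventAt(hlc.Timestamp{WallTime: 1})}, // assumed helper
	},
	expectedFrontierError: "less than the start time", // assumed field
},
```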
pkg/ccl/streamingccl/stream_ingestion_frontier_processor_test.go, line 145 at r2 (raw file):
```go
sip, ok := proc.(*streamIngestionProcessor)
if !ok {
	t.Fatal("expected the processor that's created to be a split and scatter processor")
```
s/split and scatter/stream ingestion
pkg/ccl/streamingccl/stream_ingestion_processor_planning.go, line 157 at r2 (raw file):
```go
}

func (s *streamIngestionResultWriter) AddRow(ctx context.Context, row tree.Datums) error {
```
Can we add `AddRow implements the rowexec.rowResultWriter interface` comments here and on the methods below?
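For concreteness, the Go doc-comment convention being asked for, sketched on stub bodies; SetError is shown as an assumed example of "the methods below", and the err field is illustrative:

```go
// AddRow implements the rowexec.rowResultWriter interface.
func (s *streamIngestionResultWriter) AddRow(ctx context.Context, row tree.Datums) error {
	// (existing implementation unchanged)
	return nil
}

// SetError implements the rowexec.rowResultWriter interface.
func (s *streamIngestionResultWriter) SetError(err error) {
	s.err = err // illustrative field
}
```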
pkg/ccl/streamingccl/stream_ingestion_processor_planning.go, line 166 at r2 (raw file):
```go
// Decode the row and write the ts.
var ingestedHighWatermark hlc.Timestamp
if err := protoutil.Unmarshal([]byte(*row[0].(*tree.DBytes)),
```
Can we check that row is not empty and row[0] is non-nil and return an error to avoid a panic?
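One possible shape for that check (the error wording and exact guards are illustrative, not the PR's code):

```go
// Guard against a malformed result row before decoding; returning an
// error here avoids panicking on a nil dereference or bad type assertion.
if len(row) == 0 || row[0] == nil {
	return errors.New("received an empty row from the stream ingestion flow")
}
raw, ok := row[0].(*tree.DBytes)
if !ok {
	return errors.Newf("expected row to contain DBytes, got %T", row[0])
}
var ingestedHighWatermark hlc.Timestamp
if err := protoutil.Unmarshal([]byte(*raw), &ingestedHighWatermark); err != nil {
	return errors.Wrap(err, "unmarshalling ingested high-water mark")
}
```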
pkg/ccl/streamingccl/stream_ingestion_processor_test.go, line 58 at r2 (raw file):
```go
var ok bool
if events, ok = m.partitionEvents[address]; !ok {
	return nil, errors.Newf("No events found for paritition %s", address)
```
super nit (especially considering it's a test), but s/No/no :)
pkg/sql/execinfrapb/processors_bulk_io.proto, line 144 at r2 (raw file):
```proto
// processor. It is used to convey the timestamp until which the ingestion
// job has ingested data.
```
Does this actually convey the timestamp up until which the ingestion job has ingested data? This is only the start time of the ingestion job, right? Would it make more sense to say that this is the timestamp that the frontier is initialized to, and that the processor expects no events to be passed to it at a lower timestamp?
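Under that reading, initialization would look roughly like this; a sketch only, assuming Forward's signature matches the changefeed span frontier's:

```go
// Seed every tracked span at the job's start time so the frontier can
// never regress below it; any event at a lower ts is then a bug.
for _, sp := range spec.TrackedSpans {
	if _, err := frontier.Forward(sp, spec.HighWaterAtStart); err != nil {
		return nil, err
	}
}
```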
Force-pushed from d8d05ca to 43c9f42.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @pbardea)
pkg/ccl/streamingccl/stream_ingestion_frontier_processor_test.go, line 74 at r2 (raw file):
Previously, pbardea (Paul Bardea) wrote…
Maybe we can also add a test case exercising some error cases (maybe the case where the input sends a resolved timestamp before the start time?)
Added a test for the case where a checkpoint event's resolved ts is less than the start time. The remaining error cases in the frontier processor arise from not being able to marshal/unmarshal the row returned by the ingestion processor.
pkg/ccl/streamingccl/stream_ingestion_frontier_processor_test.go, line 145 at r2 (raw file):
Previously, pbardea (Paul Bardea) wrote…
s/split and scatter/stream ingestion
Done.
pkg/ccl/streamingccl/stream_ingestion_processor_planning.go, line 157 at r2 (raw file):
Previously, pbardea (Paul Bardea) wrote…
Can we add `AddRow implements the rowexec.rowResultWriter interface` comments here and on the methods below?
Done.
pkg/ccl/streamingccl/stream_ingestion_processor_planning.go, line 166 at r2 (raw file):
Previously, pbardea (Paul Bardea) wrote…
Can we check that row is not empty and row[0] is non-nil and return an error to avoid a panic?
Done.
pkg/ccl/streamingccl/stream_ingestion_processor_test.go, line 58 at r2 (raw file):
Previously, pbardea (Paul Bardea) wrote…
super nit (especially considering it's a test), but s/No/no :)
Done.
pkg/sql/execinfrapb/processors_bulk_io.proto, line 144 at r2 (raw file):
Previously, pbardea (Paul Bardea) wrote…
```proto
// processor. It is used to convey the timestamp until which the ingestion
// job has ingested data.
```
Does this actually convey the timestamp up until which the ingestion job has ingested data? This is only the start time of the ingestion job, right? Would it make more sense to say that this is the timestamp that the frontier is initialized to, and that the processor expects no events to be passed to it at a lower timestamp?
Yup, you're right. Updated!
Force-pushed from 43c9f42 to e88d939.
LGTM -- this should be good to merge after the rebase too once CI is happy with it.
Force-pushed from e88d939 to 851a948.
There's a data race that I believe will be fixed by the refactor of the ingestion processor in #59139, so I'm going to wait on that getting checked in.
Force-pushed from 833a6ab to dfdfa8b.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @adityamaru and @pbardea)
pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go, line 31 at r3 (raw file):
```go
func init() {
	rowexec.NewStreamIngestionFrontierProcessor = newStreamIngestionFrontierProcessor
```
let's move this `init` to the frontier processor itself?
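i.e., something like this living in stream_ingestion_frontier_processor.go instead (a sketch of the suggested placement, not a diff from this PR):

```go
// stream_ingestion_frontier_processor.go

// init registers the frontier processor constructor with rowexec, next to
// the processor it constructs.
func init() {
	rowexec.NewStreamIngestionFrontierProcessor = newStreamIngestionFrontierProcessor
}
```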
pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go, line 38 at r3 (raw file):
```go
a rowenc.DatumAlloc

// input returns rows from one or more streamIngestion processors
```
nit: `.` at the end of the line
pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go, line 88 at r3 (raw file):
```go
sf.input.Start(ctx)
ctx = sf.StartInternal(ctx, streamIngestionFrontierProcName)
return ctx
```
nit: `sf.StartInternal(...)` since we don't use ctx?
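The shape the nit suggests, dropping the intermediate assignment since the returned ctx isn't otherwise used (sketch, assuming the rest of Start is unchanged):

```go
func (sf *streamIngestionFrontier) Start(ctx context.Context) context.Context {
	sf.input.Start(ctx)
	return sf.StartInternal(ctx, streamIngestionFrontierProcName)
}
```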
pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go, line 118 at r3 (raw file):
```go
if err != nil {
	sf.MoveToDraining(err)
	return nil, sf.DrainHelper()
```
nit: `break` for consistency instead?
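i.e., letting this error path share the common drain/return after the loop, like the other exit paths (sketch):

```go
if err != nil {
	sf.MoveToDraining(err)
	// Fall through to the single `return nil, sf.DrainHelper()` after
	// the loop, matching the other exit paths.
	break
}
```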
Force-pushed from dfdfa8b to 013a948.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @pbardea)
pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go, line 31 at r3 (raw file):
Previously, pbardea (Paul Bardea) wrote…
let's move this `init` to the frontier processor itself?
ahh, weird how this got here. Done.
TFTR! bors r=pbardea
Build succeeded.
The IngestionFrontierProcessor (IFP) is fed rows from the IngestionProcessor. Each row represents a partition and the latest resolved ts event received for that partition. The IFP uses the existing util `spanFrontier` (used by changefeeds) to keep track of all the rows returned by the several ingestion procs, and resolves the minimum resolved ts for a particular partition across all procs. This is then written to the ingestion job progress, indicating that all processors have ingested a stream up to a particular resolved ts.
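For readers unfamiliar with the frontier idea, here is a toy, self-contained Go illustration of the minimum-across-partitions behavior described above (not the actual `spanFrontier` util; all names are illustrative):

```go
package main

import "fmt"

// frontier returns the minimum resolved timestamp across all partitions:
// the ts up to which *every* partition has been ingested, and hence the
// value that is safe to record as job progress.
func frontier(resolved map[string]int64) int64 {
	first := true
	var min int64
	for _, ts := range resolved {
		if first || ts < min {
			min, first = ts, false
		}
	}
	return min
}

func main() {
	resolved := map[string]int64{"partition-1": 10, "partition-2": 7}
	fmt.Println(frontier(resolved)) // 7: partition-2 lags, so progress is 7
	resolved["partition-2"] = 12
	fmt.Println(frontier(resolved)) // 10: now partition-1 is the laggard
}
```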
TODO: Job resumption from the resolved ts reported in its progress. Will do this in a follow-up PR, along with a test.
Informs: #57400
Release note: None