storage: use fueled output handles in sources #27814
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
Risk Summary: The risk score for this pull request is high at 80, indicating a significant chance of introducing a bug, especially since one of the modified files is a known bug hotspot. The repository's predicted bug trend is on the rise, although the observed trend has remained steady. Historically, pull requests with similar characteristics to this one are 106% more likely to cause a bug compared to the repository's baseline. The predictors driving this risk score are the sum of bug reports of the files affected and the change in executable lines of code.
Note: The risk score is not based on semantic analysis but on historical predictors of bug occurrence in the repository. The attributes above were deemed the strongest predictors based on that history. Predictors and the score may change as the PR evolves in code, time, and review activity.
LGTM - too bad you had to add back in all the `.await` calls that you previously removed when updating the `give` method!
impl Region for SourceMessageRegion {
    type Item = SourceMessage;

    unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
"This crate is wildly unsafe" 😨 (from the columnation docstring)
heh, yeah.. fortunately we will soon migrate to `flatcontainer`, which is safer
    });
    row_temp.push(c);
}
Ok(std::mem::take(&mut row_temp))
this works because Vec::default() doesn't allocate, right?
That's right!
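To make the exchange above concrete, here is a minimal standalone sketch of the pattern, not the code from the PR. The `finish_row` helper and the `i32` element type are hypothetical stand-ins. `std::mem::take` swaps the buffer with `Vec::default()`, which does not allocate, so the filled vector is moved out without copying and the temporary is left empty with zero capacity.

```rust
/// Hypothetical helper mirroring the `Ok(std::mem::take(&mut row_temp))` line:
/// moves the accumulated values out and leaves `Vec::default()` behind.
fn finish_row(row_temp: &mut Vec<i32>) -> Vec<i32> {
    std::mem::take(row_temp)
}

fn main() {
    let mut row_temp = Vec::with_capacity(8);
    row_temp.push(1);
    row_temp.push(2);

    let row = finish_row(&mut row_temp);

    // The returned vector owns the original allocation and its contents.
    println!("row = {:?}", row);
    // The temporary is now an empty, unallocated vector.
    println!(
        "row_temp: len = {}, capacity = {}",
        row_temp.len(),
        row_temp.capacity()
    );
}
```

The trade-off is that the temporary loses its allocation, so the next row starts from an empty vector and has to grow again.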
Will wait for a Nightly run before merging https://buildkite.com/materialize/nightly/builds/8246
@@ -13,6 +13,7 @@ workspace = true
differential-dataflow = "0.12.0"
either = "1"
lgalloc = "0.3"
columnation = { git = "https://github.com/frankmcsherry/columnation" }
Nit: Don't depend on it, but use `timely::container::columnation`.
@@ -102,6 +106,62 @@ pub struct SourceMessage {
    pub metadata: Row,
}

mod columnation {
Implementation LGTM.
Motivation
This PR makes it so that sources produce collections backed by `StackWrapper<T>` timely containers as opposed to `Vec<T>` containers. The `StackWrapper` container stores its data in flat regions (currently backed by `Columnation`), which lets us easily measure the heap size of the data produced so far. Currently these containers survive until the reclocking boundary, where they turn back into normal `Vec<T>` containers in the mz scope. We will probably want to use these region-allocated containers throughout the storage dataflows, but I didn't want to implement a bigger change than required for this feature.

Creating an output in an async operator with a `StackWrapper` container and the newly introduced `AccountedStackBuilder` container builder unlocks a `.give_fueled()` API, which automatically yields back to timely once a certain number of MBs have been emitted into the dataflow. The limit is currently set to 128MB.

I have gone through the feature benchmark and confirmed that this change does not produce a performance regression for any of the ingestion workloads. You can find the results here: https://buildkite.com/materialize/nightly/builds/8237
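The fuel accounting described above can be illustrated with a small standalone sketch. This is not the PR's implementation and does not use timely: `FlatRegion`, `FueledOutput`, and `count_yields` are hypothetical names, and this synchronous `give_fueled` only returns a flag telling the caller to yield, whereas the real API yields from inside an async operator. The point is that a flat byte region makes the emitted size cheap to track, and that a byte limit (128MB in the PR, 128 bytes here) decides when to hand control back.

```rust
/// Hypothetical stand-in for a region-backed container: every item is copied
/// into one flat byte buffer, so the payload size is a single `len()` call.
struct FlatRegion {
    bytes: Vec<u8>,
    offsets: Vec<usize>,
}

impl FlatRegion {
    fn new() -> Self {
        Self { bytes: Vec::new(), offsets: vec![0] }
    }

    fn push(&mut self, item: &[u8]) {
        self.bytes.extend_from_slice(item);
        self.offsets.push(self.bytes.len());
    }

    fn len(&self) -> usize {
        self.offsets.len() - 1
    }

    fn payload_bytes(&self) -> usize {
        self.bytes.len()
    }
}

/// Hypothetical fueled output: tracks bytes emitted since the last yield.
struct FueledOutput {
    region: FlatRegion,
    limit_bytes: usize,
    emitted_since_yield: usize,
}

impl FueledOutput {
    fn new(limit_bytes: usize) -> Self {
        Self { region: FlatRegion::new(), limit_bytes, emitted_since_yield: 0 }
    }

    /// Emits `item` and returns `true` when the caller should yield.
    fn give_fueled(&mut self, item: &[u8]) -> bool {
        self.region.push(item);
        self.emitted_since_yield += item.len();
        if self.emitted_since_yield >= self.limit_bytes {
            self.emitted_since_yield = 0;
            true
        } else {
            false
        }
    }
}

/// Counts how often a producer emitting `items` would have to yield.
fn count_yields(items: &[Vec<u8>], limit_bytes: usize) -> usize {
    let mut output = FueledOutput::new(limit_bytes);
    items.iter().filter(|item| output.give_fueled(item.as_slice())).count()
}

fn main() {
    let items = vec![vec![0u8; 40]; 10];
    let mut output = FueledOutput::new(128);
    for item in &items {
        if output.give_fueled(item) {
            // An async operator would yield back to the scheduler here.
            println!("yield after {} items", output.region.len());
        }
    }
    println!("total payload bytes: {}", output.region.payload_bytes());
    println!("yields: {}", count_yields(&items, 128));
}
```

Resetting the counter on each yield bounds the amount of data emitted between two yields by the limit plus the size of one item.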
Closes #27211
Tips for reviewer
The PR is made of 5 simple commits that add `Columnation` implementations for relevant types and change the return type required by `SourceRender::render` to be a `Collection` that uses `StackWrapper` containers.

Then there are 4 separate commits, one for each source type, that have source-specific changes required to produce these stack-container-based collections.

The files that each commit touches are disjoint, so this can also be reviewed using the full diff.
Checklist
`$T ⇔ Proto$T` mapping (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.