-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sync): headers commit threshold #296
Conversation
Codecov Report
@@ Coverage Diff @@
## main #296 +/- ##
========================================
Coverage 73.73% 73.74%
========================================
Files 229 230 +1
Lines 21081 21393 +312
========================================
+ Hits 15544 15776 +232
- Misses 5537 5617 +80
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the stream looks alright,
since we need to day a lot of checks and conditionally return it makes sense to solve this with macros here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice work. some nits, requests for more docs, and questions aroudn if we had bugs before
let mut headers = resp.0.into_iter().skip(1).map(|h| h.seal()).collect::<Vec<_>>(); | ||
headers.sort_unstable_by_key(|h| h.number); | ||
Poll::Ready(Ok(headers)) | ||
Poll::Ready(Ok(headers.into_iter().rev().collect())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why rev here? why was it not before?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
initially, we returned it in reverse. then it was changed during a recent refactor. that made sense for the Future
implementation alone, but it doesn't for the Stream
which returns reverse batches. i changed it back to keep it consistent
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @mattsse
while let Some(header) = headers_iter.next() { | ||
ensure_parent(header, headers_iter.peek().unwrap_or(&&head)) | ||
.map_err(|err| StageError::Download(err.to_string()))?; | ||
if let Some(parent) = headers_iter.peek() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if there's no parent? should this error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this means that the parent will come in the next batch. best we can do is to validate upon receiving that batch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok lets add that comment then
} | ||
} | ||
|
||
if !this.done && this.buffered.len() > 1 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if we're not done and we have only 1 buffered?
or does having only 1 buffered item mean we're done? let's document that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the done
flag is set to true
only when the local head (latest db header at the beginning of execution) is reached. we need to keep at least one header to form the next request until we are done
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
…oundry-rs/reth into rkrasiuk/sync-headers-commit-threshold
let empty = SealedHeader::default(); | ||
if let Err(error) = this.consensus.validate_header(&empty, &empty) { | ||
this.done = true; | ||
return Poll::Ready(Some(Err(DownloadError::HeaderValidation { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we validating the default header against the default header?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we're polling consensus stub merely to see whether it will produce an error. it is set on the TestConsensus
struct and it will either succeed or fail regardless of input parameters
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm - merging to get the ball rolling but pls address comments in followup
let mut headers = resp.0.into_iter().skip(1).map(|h| h.seal()).collect::<Vec<_>>(); | ||
headers.sort_unstable_by_key(|h| h.number); | ||
Poll::Ready(Ok(headers)) | ||
Poll::Ready(Ok(headers.into_iter().rev().collect())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @mattsse
} | ||
|
||
Ok(latest) | ||
} | ||
|
||
/// Iterate over inserted headers and write td entries | ||
fn write_td<DB: Database>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure what about a 3rd party client that wants to overwrite the difficulty table for any reason?
while let Some(header) = headers_iter.next() { | ||
ensure_parent(header, headers_iter.peek().unwrap_or(&&head)) | ||
.map_err(|err| StageError::Download(err.to_string()))?; | ||
if let Some(parent) = headers_iter.peek() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok lets add that comment then
} | ||
|
||
Ok(latest) | ||
} | ||
|
||
/// Iterate over inserted headers and write td entries | ||
fn write_td<DB: Database>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i also think it looks weird given it doesn't need &self
either
* headers stream init * fix tests * return header if available regardless of control flow * proper stream termination & docs * upd headers stage to consume stream * adjust response validation for stream * use cursor.insert for headers * wrap poll_next in a loop to bypass poking waker * fix typo * fix last td lookup * Apply suggestions from code review Co-authored-by: Georgios Konstantopoulos <me@gakonst.com> * misc * remove waker ref * dedup response handling logic * clippy * add docs to poll Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
Modify the headers stage to download the block headers in batches. Add a
commit_threshold
parameter for aggregated commitsblocked by #310