Skip to content

Commit

Permalink
change requesting logic
Browse files Browse the repository at this point in the history
  • Loading branch information
mike1729 committed Nov 24, 2022
1 parent d38352b commit 561a8e2
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 80 deletions.
8 changes: 5 additions & 3 deletions finality-aleph/src/justification/handler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{sync::Arc,time::{Duration, Instant}};
use std::{
sync::Arc,
time::{Duration, Instant},
};

use futures::{channel::mpsc, Stream, StreamExt};
use futures_timer::Delay;
Expand Down Expand Up @@ -50,7 +53,7 @@ where
finalizer: F,
justification_request_scheduler: S,
metrics: Option<Metrics<<B::Header as Header>::Hash>>,
justification_handler_config: JustificationHandlerConfig<B>,
justification_handler_config: JustificationHandlerConfig,
) -> Self {
Self {
session_info_provider,
Expand All @@ -60,7 +63,6 @@ where
finalizer,
justification_request_scheduler,
metrics,
justification_handler_config.min_allowed_delay,
),
verifier_timeout: justification_handler_config.verifier_timeout,
notification_timeout: justification_handler_config.notification_timeout,
Expand Down
18 changes: 5 additions & 13 deletions finality-aleph/src/justification/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,36 +55,28 @@ pub struct JustificationNotification<Block: BlockT> {
}

#[derive(Clone)]
pub struct JustificationHandlerConfig<B: BlockT> {
pub struct JustificationHandlerConfig {
/// How long should we wait when the session verifier is not yet available.
verifier_timeout: Duration,
/// How long should we wait for any notification.
notification_timeout: Duration,
///Distance (in amount of blocks) between the best and the block we want to request justification
min_allowed_delay: NumberFor<B>,
}

impl<B: BlockT> Default for JustificationHandlerConfig<B> {
impl Default for JustificationHandlerConfig {
fn default() -> Self {
Self {
verifier_timeout: Duration::from_millis(500),
notification_timeout: Duration::from_millis(1000),
min_allowed_delay: 3u32.into(),
notification_timeout: Duration::from_millis(800),
}
}
}

#[cfg(test)]
impl<B: BlockT> JustificationHandlerConfig<B> {
pub fn new(
verifier_timeout: Duration,
notification_timeout: Duration,
min_allowed_delay: NumberFor<B>,
) -> Self {
impl JustificationHandlerConfig {
pub fn new(verifier_timeout: Duration, notification_timeout: Duration) -> Self {
Self {
verifier_timeout,
notification_timeout,
min_allowed_delay,
}
}
}
109 changes: 45 additions & 64 deletions finality-aleph/src/justification/requester.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{sync::Arc, fmt, marker::PhantomData, time::Instant};
use std::{fmt, marker::PhantomData, sync::Arc, time::Instant};

use aleph_primitives::ALEPH_ENGINE_ID;
use log::{debug, error, info, warn};
Expand Down Expand Up @@ -79,13 +79,13 @@ where
S: JustificationRequestScheduler,
F: BlockFinalizer<B>,
V: Verifier<B>,
BE: Backend<B>,
{
block_requester: RB,
backend: Arc<BE>,
finalizer: F,
justification_request_scheduler: S,
metrics: Option<Metrics<<B::Header as Header>::Hash>>,
min_allowed_delay: NumberFor<B>,
request_status: JustificationRequestStatus<B>,
_phantom: PhantomData<V>,
}
Expand All @@ -105,15 +105,13 @@ where
finalizer: F,
justification_request_scheduler: S,
metrics: Option<Metrics<<B::Header as Header>::Hash>>,
min_allowed_delay: NumberFor<B>,
) -> Self {
BlockRequester {
block_requester,
backend,
finalizer,
justification_request_scheduler,
metrics,
min_allowed_delay,
request_status: JustificationRequestStatus::new(),
_phantom: PhantomData,
}
Expand Down Expand Up @@ -171,7 +169,7 @@ where
pub fn request_justification(&mut self, num: NumberFor<B>) {
match self.justification_request_scheduler.schedule_action() {
SchedulerActions::Request => {
self.leaves(num)
self.request_targets(num)
.into_iter()
.for_each(|hash| self.request(hash));
}
Expand All @@ -187,68 +185,51 @@ where
self.backend.blockchain().info().finalized_number
}

fn request(&mut self, header: <B as BlockT>::Header) {
let number = *header.number();
let hash = header.hash();

debug!(target: "aleph-justification", "Trying to request block {:?} {:?}", number, hash);
self.request_status.save_block_number(number);

self.request_status.insert_hash(hash);
debug!(target: "aleph-justification", "We have block {:?} with hash {:?}. Requesting justification.", number, hash);
self.justification_request_scheduler.on_request_sent();
self.block_requester.request_justification(&hash, number);
fn request(&mut self, hash: <B as BlockT>::Hash) {
if let Ok(Some(header)) = self.backend.blockchain().header(BlockId::Hash(hash)) {
let number = *header.number();
debug!(target: "aleph-justification", "Trying to request block {:?}", number);
self.request_status.save_block_number(number);
self.request_status.insert_hash(hash);
debug!(target: "aleph-justification", "We have block {:?} with hash {:?}. Requesting justification.", number, header.hash());
self.justification_request_scheduler.on_request_sent();
self.block_requester.request_justification(&hash, number);
} else {
debug!(target: "aleph-justification", "Cancelling request, because we don't have block {:?}.", hash);
}
}

fn leaves(&self, limit: NumberFor<B>) -> Vec<<B as BlockT>::Header> {
// We request justifications for all the children of last finalized block and a justification
// for a block of number num on longest branch.
// Assuming that we request at the same pace that finalization is progressing, the former ensures
// that we are up to date with finalization. On the other hand, the former enables fast catch up
// if we are behind.
// We don't remove the child that it's on the same branch as best since a fork may happen
// somewhere in between them.
fn request_targets(&self, mut num: NumberFor<B>) -> Vec<<B as BlockT>::Hash> {
let blockchain_backend = self.backend.blockchain();
let finalized_hash = blockchain_backend.info().finalized_hash;
let finalized_number = blockchain_backend.info().finalized_number;

let ancestor = |hash: <B as BlockT>::Hash| {
blockchain_backend
.header(BlockId::Hash(hash))
.ok()
.flatten()
.map(|header| {
let number = *header.number();
let mut ancestor = header;
let mut delay = if self.min_allowed_delay < (number - finalized_number) {
self.min_allowed_delay
} else {
number - finalized_number
};
while delay > 0u32.into() {
let ancestor_hash = *ancestor.parent_hash();
if let Ok(Some(header)) =
blockchain_backend.header(BlockId::Hash(ancestor_hash))
{
ancestor = header;
delay -= 1u32.into();
} else {
break;
}
}
ancestor
})
};

let leaves = {
let lock = self.backend.get_import_lock();
blockchain_backend
.children(finalized_hash)
.unwrap_or_default()
.iter()
.filter_map(|hash| {
blockchain_backend
.best_containing(*hash, Some(limit), lock)
.ok()
.flatten()
.and_then(ancestor)
})
.collect()
};
let blockchain_info = blockchain_backend.info();
let finalized_hash = blockchain_info.finalized_hash;

let mut targets = blockchain_backend
.children(finalized_hash)
.unwrap_or_default();
let best_number = blockchain_info.best_number;
if best_number < num {
num = best_number;
}
match blockchain_backend.hash(num) {
Ok(Some(hash)) => {
targets.push(hash);
}
Ok(None) => {
debug!(target: "aleph-justification", "Cancelling request, because we don't have block {:?}.", num);
}
Err(err) => {
debug!(target: "aleph-justification", "Cancelling request, because fetching block {:?} failed {:?}.", num, err);
}
}

leaves
targets
}
}

0 comments on commit 561a8e2

Please sign in to comment.