Skip to content

Commit

Permalink
fix: continuous adapter (PRO-684) (#4503)
Browse files Browse the repository at this point in the history
* fix: continuous adapter

* comment
  • Loading branch information
AlastairHolmes committed Feb 26, 2024
1 parent c1271c0 commit 20019b6
Showing 1 changed file with 13 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ where
stream::unfold(
(chain_stream.fuse(), chain_client.clone(), epoch, unprocessed_indices, inprogress_indices, processed_indices),
move |(mut chain_stream, chain_client, mut epoch, mut unprocessed_indices, mut inprogress_indices, mut processed_indices)| async move {
let is_epoch_complete = |processed_indices: &RleBitmap<Self::Index>, end: Self::Index| {
processed_indices.is_superset(&{
let mut bitmap = RleBitmap::new(true);
bitmap.set_range(..epoch.info.1, false);
bitmap.set_range(end.., false);
bitmap
})
};

loop_select!(
let header = chain_stream.next_or_pending() => {
let highest_processed = processed_indices.iter(true).last().map_or(epoch.info.1, |highest_processed| std::cmp::max(highest_processed, epoch.info.1));
Expand All @@ -113,12 +122,10 @@ where

break Some((header, (chain_stream, chain_client, epoch, unprocessed_indices, inprogress_indices, processed_indices)))
},
if epoch.historic_signal.get().is_some() && processed_indices.is_superset(&{
let mut bitmap = RleBitmap::new(true);
bitmap.set_range(..epoch.info.1, false);
bitmap.set_range(epoch.historic_signal.get().unwrap().1.., false);
bitmap
}) => break None,
// Allows the stream to exit while waiting for blocks, if the epoch becomes historic
if let true = epoch.historic_signal.clone().wait().map(|(_, historic_at, _)| is_epoch_complete(&processed_indices, historic_at)) => {
break None
} else disable then if is_epoch_complete(&processed_indices, epoch.historic_signal.get().unwrap().1) => break None,
let (_, header) = inprogress_indices.next_or_pending() => {
processed_indices.set(header.index, true);
let _result = self.store.store(epoch.index, &processed_indices);
Expand Down

0 comments on commit 20019b6

Please sign in to comment.