From 20019b63e81c1343299734b9eab8c03bf238909f Mon Sep 17 00:00:00 2001 From: Alastair Holmes <42404303+AlastairHolmes@users.noreply.github.com> Date: Mon, 26 Feb 2024 15:02:30 +0100 Subject: [PATCH] fix: continuous adapter (PRO-684) (#4503) * fix: continuous adapter * comment --- .../chunked_by_vault/continuous.rs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/engine/src/witness/common/chunked_chain_source/chunked_by_vault/continuous.rs b/engine/src/witness/common/chunked_chain_source/chunked_by_vault/continuous.rs index 5a85a61592..8975e0ddf1 100644 --- a/engine/src/witness/common/chunked_chain_source/chunked_by_vault/continuous.rs +++ b/engine/src/witness/common/chunked_chain_source/chunked_by_vault/continuous.rs @@ -87,6 +87,15 @@ where stream::unfold( (chain_stream.fuse(), chain_client.clone(), epoch, unprocessed_indices, inprogress_indices, processed_indices), move |(mut chain_stream, chain_client, mut epoch, mut unprocessed_indices, mut inprogress_indices, mut processed_indices)| async move { + let is_epoch_complete = |processed_indices: &RleBitmap, end: Self::Index| { + processed_indices.is_superset(&{ + let mut bitmap = RleBitmap::new(true); + bitmap.set_range(..epoch.info.1, false); + bitmap.set_range(end.., false); + bitmap + }) + }; + loop_select!( let header = chain_stream.next_or_pending() => { let highest_processed = processed_indices.iter(true).last().map_or(epoch.info.1, |highest_processed| std::cmp::max(highest_processed, epoch.info.1)); @@ -113,12 +122,10 @@ where break Some((header, (chain_stream, chain_client, epoch, unprocessed_indices, inprogress_indices, processed_indices))) }, - if epoch.historic_signal.get().is_some() && processed_indices.is_superset(&{ - let mut bitmap = RleBitmap::new(true); - bitmap.set_range(..epoch.info.1, false); - bitmap.set_range(epoch.historic_signal.get().unwrap().1.., false); - bitmap - }) => break None, + // Allows the stream to exit while waiting for blocks, if the epoch becomes historic + if let true = epoch.historic_signal.clone().wait().map(|(_, historic_at, _)| is_epoch_complete(&processed_indices, historic_at)) => { + break None + } else disable then if is_epoch_complete(&processed_indices, epoch.historic_signal.get().unwrap().1) => break None, let (_, header) = inprogress_indices.next_or_pending() => { processed_indices.set(header.index, true); let _result = self.store.store(epoch.index, &processed_indices);