From 3be586210ecfefdff53ae4c114d4b73b0f6b587d Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Tue, 6 Aug 2024 15:10:11 -0700 Subject: [PATCH] fix: Decreasing Processed Block Error (#988) Block streams are failing due to their start block being lower than the current. This PR fixes this issue by seeding the health task with the input start block. This should fix the issue in most cases. I've also ensured that the handoff between receiver_blocks and lake also do not repeat a block. --- block-streamer/src/block_stream.rs | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 561e2fc7..9489eaad 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -120,18 +120,22 @@ impl BlockStream { } } - fn start_health_monitoring_task(&self, redis: Arc) -> JoinHandle<()> { + fn start_health_monitoring_task( + &self, + redis: Arc, + start_block_height: near_indexer_primitives::types::BlockHeight, + ) -> JoinHandle<()> { tokio::spawn({ let config = self.indexer_config.clone(); let health = self.health.clone(); let redis_stream = self.redis_stream.clone(); + let stalled_timeout_seconds = 120; async move { - let mut last_processed_block = - redis.get_last_processed_block(&config).await.unwrap(); - + let mut last_processed_block = Some(start_block_height - 1); loop { - tokio::time::sleep(std::time::Duration::from_secs(120)).await; + tokio::time::sleep(std::time::Duration::from_secs(stalled_timeout_seconds)) + .await; let new_last_processed_block = if let Ok(block) = redis.get_last_processed_block(&config).await { @@ -183,6 +187,11 @@ impl BlockStream { health_lock.processing_state = ProcessingState::Waiting; } Ordering::Equal => { + tracing::warn!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + "No block has been processed for {stalled_timeout_seconds} seconds" + ); health_lock.processing_state = ProcessingState::Stalled; } Ordering::Greater => { @@ -266,7 +275,8 @@ impl BlockStream { let cancellation_token = tokio_util::sync::CancellationToken::new(); - let monitor_handle = self.start_health_monitoring_task(redis.clone()); + let monitor_handle = + self.start_health_monitoring_task(redis.clone(), start_block_height.clone()); let stream_handle = self.start_block_stream_task( start_block_height,