diff --git a/rust/processor/src/config.rs b/rust/processor/src/config.rs index 92a696f68..5a9626b2a 100644 --- a/rust/processor/src/config.rs +++ b/rust/processor/src/config.rs @@ -43,6 +43,10 @@ pub struct IndexerGrpcProcessorConfig { #[serde(default = "AHashMap::new")] pub per_table_chunk_sizes: AHashMap, pub enable_verbose_logging: Option, + + #[serde(default = "IndexerGrpcProcessorConfig::default_grpc_response_item_timeout_in_secs")] + pub grpc_response_item_timeout_in_secs: u64, + #[serde(default)] pub transaction_filter: TransactionFilter, } @@ -65,6 +69,11 @@ impl IndexerGrpcProcessorConfig { pub const fn default_pb_channel_txn_chunk_size() -> usize { 100_000 } + + /// Default timeout for grpc response item in seconds. Defaults to 60 seconds. + pub const fn default_grpc_response_item_timeout_in_secs() -> u64 { + 60 + } } #[async_trait::async_trait] @@ -85,6 +94,7 @@ impl RunnableConfig for IndexerGrpcProcessorConfig { self.per_table_chunk_sizes.clone(), self.enable_verbose_logging, self.transaction_filter.clone(), + self.grpc_response_item_timeout_in_secs, ) .await .context("Failed to build worker")?; diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index c43fef9f7..5bae76224 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -43,8 +43,6 @@ use url::Url; // of 50 means that we could potentially have at least 4.8GB of data in memory at any given time and that we should provision // machines accordingly. pub const BUFFER_SIZE: usize = 100; -// Consumer thread will wait X seconds before panicking if it doesn't receive any data -pub const CONSUMER_THREAD_TIMEOUT_IN_SECS: u64 = 60 * 5; pub const PROCESSOR_SERVICE_TYPE: &str = "processor"; pub struct Worker { @@ -63,6 +61,7 @@ pub struct Worker { pub per_table_chunk_sizes: AHashMap, pub enable_verbose_logging: Option, pub transaction_filter: TransactionFilter, + pub grpc_response_item_timeout_in_secs: u64, } impl Worker { @@ -83,6 +82,7 @@ impl Worker { per_table_chunk_sizes: AHashMap, enable_verbose_logging: Option, transaction_filter: TransactionFilter, + grpc_response_item_timeout_in_secs: u64, ) -> Result { let processor_name = processor_config.name(); info!(processor_name = processor_name, "[Parser] Kicking off"); @@ -117,6 +117,7 @@ impl Worker { per_table_chunk_sizes, enable_verbose_logging, transaction_filter, + grpc_response_item_timeout_in_secs, }) } @@ -306,7 +307,8 @@ impl Worker { let chain_id = self .grpc_chain_id .expect("GRPC chain ID has not been fetched yet!"); - + let grpc_response_item_timeout = + std::time::Duration::from_secs(self.grpc_response_item_timeout_in_secs); tokio::spawn(async move { let task_index_str = task_index.to_string(); let step = ProcessorStep::ProcessedBatch.get_step(); @@ -321,6 +323,7 @@ impl Worker { &stream_address, receiver_clone.clone(), task_index, + grpc_response_item_timeout, ) .await; @@ -600,9 +603,36 @@ async fn fetch_transactions( stream_address: &str, receiver: kanal::AsyncReceiver, task_index: usize, + grpc_response_item_timeout: std::time::Duration, ) -> TransactionsPBResponse { let pb_channel_fetch_time = std::time::Instant::now(); - let txn_pb_res = receiver.recv().await; + let txn_pb_res = match tokio::time::timeout(grpc_response_item_timeout, receiver.recv()).await { + Ok(Ok(res)) => Ok(res), + Ok(Err(_)) => { + error!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + stream_address = stream_address, + "[Parser][T#{}] Consumer thread failed to receive transactions", + task_index + ); + Err(anyhow::anyhow!( + "Consumer thread failed to receive transactions" + )) + }, + Err(_) => { + error!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + stream_address = stream_address, + "[Parser][T#{}] Consumer thread timed out waiting for transactions", + task_index + ); + Err(anyhow::anyhow!( + "Consumer thread timed out waiting for transactions" + )) + }, + }; // Track how much time this task spent waiting for a pb bundle PB_CHANNEL_FETCH_WAIT_TIME_SECS .with_label_values(&[processor_name, &task_index.to_string()]) @@ -611,15 +641,8 @@ async fn fetch_transactions( match txn_pb_res { Ok(txn_pb) => txn_pb, Err(_e) => { - error!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - stream_address = stream_address, - "[Parser][T#{}] Consumer thread timed out waiting for transactions", - task_index - ); panic!( - "[Parser][T#{}] Consumer thread timed out waiting for transactions", + "[Parser][T#{}] Consumer thread failed or timed out waiting for transactions", task_index ); },