From ca250e5328c0e0cfb00e3a6c01529d9ff8ab7186 Mon Sep 17 00:00:00 2001 From: Adi Seredinschi Date: Wed, 7 Jul 2021 18:39:08 +0200 Subject: [PATCH] Hermes retrying mechanism for packet clearing (#1146) * temp change for ft-transfer to send one msg/Tx * event monitor: Bulk events from all transactions included in a block * Update changelog * temp change for ft-transfer to send one msg/Tx * Optimize spawning of workers - draft * Add back check to start workers only if channel is open * Cleanup * Check connection state * temp change for ft-transfer to send one msg/Tx * Improve config loading message (#933) * Improve config load message * Raised log level to error. Log link to config example in the guide. * Changelog Co-authored-by: Adi Seredinschi * Migration to tx sync * Add Tx simulate * Add adjustment to the tx simulate result, some cleanup * Nitpick in CosmosSdkChain::key_and_bytes * Small cleanup * Remove duplicate send_msgs method * Cleanup config file * Fix typo after refactoring * Fix `query packet tx` description * Rework `wait_for_block_commits` to use the `retry` crate * Compute tx fee based on gas from tx simulate and gas price * Re-add missing error type * Combine `fee_denom` and `gas_price` into `gas_price` config option * Add tests for `mul_ceil` * Fix config serialization * Remove `fee_amount` config property * Update changelog * Avoid op data regeneration if retries exhausted. * Increase the number of retries while checking Tx is included in a block. * Move `query packet tx` to `query tx events` * better error msgs * Add Display instance for GasPrice * Fix default gas price denomination * Improve some debug messages * Rename `gas_price.amount` to `gase_price.price` * Add configurable fee adjustment to make it more likely gas estimate ends up being enough * Add Tx hash to ChainErr * Fix config files * Masked tonic::Code::NotFound result for query_client_connections. * Modified cfg option from gas to max_gas * Consistent trust_threshold in default config.toml * Revert guide updates * Nit: Imports for query.rs * Print info message when Hermes starts * Implement basic filtering based on channel identifiers * Add per chain filters, only channel based filtering support * Fix gas adjustement to be percentage on top of computed gas * Attempt to fix gas_limit * Fix chain spawn unrwap for QueryUnreceivedPacketsCmd * Retry on no confirmation * Print simulation errors. Trim clear packets output. * Retry timeout parametrized parametrized by rpc_timeout * Bring back NoConfirmation error and don't retry. Trigger packet clearing every 100 blocks. * Don't log during clearing packets unless there is work to do. * Revert printing of simulation error. Revert commit 054ff2a1a6133b982a860e7dcaeeb2af4c679b3e partly. Reason: printing of simulation error is handled separately in parallel PR #1137. * Cleanup * Consolidated output for packet clearing methods * Cleaner code around clear_packets method * Undid -k option for packet-recv * More context loaded into TxNoConfirmation * More context when uni path/link worker encounters error * Improve tx no confirmation error message * Make clearing packets interval configurable * Update changelog * Update CI config * Better defaults * Remove backup file * Formatting Co-authored-by: Anca Zamfir Co-authored-by: Romain Ruetschi --- CHANGELOG.md | 4 + ci/simple_config.toml | 1 + config.toml | 3 + relayer/src/chain/cosmos.rs | 90 ++++++++++-------- relayer/src/channel.rs | 2 +- relayer/src/config.rs | 17 +++- relayer/src/error.rs | 8 +- relayer/src/link.rs | 136 ++++++++++++++++++---------- relayer/src/supervisor.rs | 37 ++++++-- relayer/src/worker.rs | 13 ++- relayer/src/worker/map.rs | 6 +- relayer/src/worker/uni_chan_path.rs | 29 +++++- 12 files changed, 234 insertions(+), 112 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 27fbe58df3..bf0fd67f48 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,9 @@ reliability of Hermes ([#697]). - Update to `tendermint-rs` v0.20.0 ([#1125]) - Add inline documentation to config.toml ([#1127]) +- [ibc-relayer] + - Hermes will now clear pending packets at a configurable interval ([#1124]) + ### BUG FIXES - [ibc-relayer] @@ -29,6 +32,7 @@ reliability of Hermes ([#697]). [#1062]: https://github.com/informalsystems/ibc-rs/issues/1062 [#1057]: https://github.com/informalsystems/ibc-rs/issues/1057 [#1125]: https://github.com/informalsystems/ibc-rs/issues/1125 +[#1124]: https://github.com/informalsystems/ibc-rs/issues/1124 [#1127]: https://github.com/informalsystems/ibc-rs/issues/1127 [#1140]: https://github.com/informalsystems/ibc-rs/issues/1140 [#1143]: https://github.com/informalsystems/ibc-rs/issues/1143 diff --git a/ci/simple_config.toml b/ci/simple_config.toml index ec3abb85d9..713e0c3035 100644 --- a/ci/simple_config.toml +++ b/ci/simple_config.toml @@ -1,6 +1,7 @@ [global] strategy = 'all' log_level = 'info' +clear_packets_interval = 100 [[chains]] id = 'ibc-0' diff --git a/config.toml b/config.toml index e84e6793ba..9490aa7ddd 100644 --- a/config.toml +++ b/config.toml @@ -15,6 +15,9 @@ filter = true # Valid options are 'error', 'warn', 'info', 'debug', 'trace'. log_level = 'info' +# Interval (in number of blocks) at which pending packets +# should be eagerly cleared. Default: 100 +clear_packets_interval = 100 # The telemetry section defines parameters for Hermes' built-in telemetry capabilities. # https://hermes.informal.systems/telemetry.html diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index 1b5820b81a..44f2760dd2 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -11,6 +11,7 @@ use std::{ use anomaly::fail; use bech32::{ToBase32, Variant}; use bitcoin::hashes::hex::ToHex; +use itertools::Itertools; use prost::Message; use prost_types::Any; use tendermint::abci::Path as TendermintABCIPath; @@ -96,9 +97,10 @@ mod retry_strategy { use crate::util::retry::Fixed; use std::time::Duration; - pub fn wait_for_block_commits() -> impl Iterator { - // The total time should be higher than the full node timeout which defaults to 10sec. - Fixed::from_millis(300).take(40) // 12 seconds + pub fn wait_for_block_commits(max_total_wait: Duration) -> impl Iterator { + let backoff_millis = 300; // The periodic backoff + let count: usize = (max_total_wait.as_millis() / backoff_millis as u128) as usize; + Fixed::from_millis(backoff_millis).take(count) } } @@ -598,57 +600,69 @@ impl CosmosSdkChain { trace!("waiting for commit of block(s)"); + let hashes = tx_sync_results + .iter() + .map(|res| res.response.hash.to_string()) + .join(", "); + // Wait a little bit initially thread::sleep(Duration::from_millis(200)); let start = Instant::now(); + let result = retry_with_index( + retry_strategy::wait_for_block_commits(self.config.rpc_timeout), + |index| { + if all_tx_results_found(&tx_sync_results) { + trace!( + "wait_for_block_commits: retrieved {} tx results after {} tries ({}ms)", + tx_sync_results.len(), + index, + start.elapsed().as_millis() + ); - let result = retry_with_index(retry_strategy::wait_for_block_commits(), |index| { - if all_tx_results_found(&tx_sync_results) { - trace!( - "wait_for_block_commits: retrieved {} tx results after {} tries ({}ms)", - tx_sync_results.len(), - index, - start.elapsed().as_millis() - ); - - // All transactions confirmed - return RetryResult::Ok(()); - } + // All transactions confirmed + return RetryResult::Ok(()); + } - for TxSyncResult { response, events } in tx_sync_results.iter_mut() { - // If this transaction was not committed, determine whether it was because it failed - // or because it hasn't been committed yet. - if empty_event_present(&events) { - // If the transaction failed, replace the events with an error, - // so that we don't attempt to resolve the transaction later on. - if response.code.value() != 0 { - *events = vec![IbcEvent::ChainError(format!( - "deliver_tx for Tx hash {} reports error: code={:?}, log={:?}", - response.hash, response.code, response.log + for TxSyncResult { response, events } in tx_sync_results.iter_mut() { + // If this transaction was not committed, determine whether it was because it failed + // or because it hasn't been committed yet. + if empty_event_present(&events) { + // If the transaction failed, replace the events with an error, + // so that we don't attempt to resolve the transaction later on. + if response.code.value() != 0 { + *events = vec![IbcEvent::ChainError(format!( + "deliver_tx on chain {} for Tx hash {} reports error: code={:?}, log={:?}", + self.id(), response.hash, response.code, response.log ))]; - // Otherwise, try to resolve transaction hash to the corresponding events. - } else if let Ok(events_per_tx) = - self.query_txs(QueryTxRequest::Transaction(QueryTxHash(response.hash))) - { - // If we get events back, progress was made, so we replace the events - // with the new ones. in both cases we will check in the next iteration - // whether or not the transaction was fully committed. - if !events_per_tx.is_empty() { - *events = events_per_tx; + // Otherwise, try to resolve transaction hash to the corresponding events. + } else if let Ok(events_per_tx) = + self.query_txs(QueryTxRequest::Transaction(QueryTxHash(response.hash))) + { + // If we get events back, progress was made, so we replace the events + // with the new ones. in both cases we will check in the next iteration + // whether or not the transaction was fully committed. + if !events_per_tx.is_empty() { + *events = events_per_tx; + } } } } - } - RetryResult::Retry(index) - }); + RetryResult::Retry(index) + }, + ); match result { // All transactions confirmed Ok(()) => Ok(tx_sync_results), // Did not find confirmation - Err(_) => Err(Kind::TxNoConfirmation.into()), + Err(_) => Err(Kind::TxNoConfirmation(format!( + "from chain {} for hash(es) {}", + self.id(), + hashes + )) + .into()), } } } diff --git a/relayer/src/channel.rs b/relayer/src/channel.rs index 54784028d7..ba2406d32c 100644 --- a/relayer/src/channel.rs +++ b/relayer/src/channel.rs @@ -625,7 +625,7 @@ impl Channel { match self.handshake_step(state) { Err(e) => { - error!("Failed {:?} with error {}", state, e); + error!("Failed Chan{:?} with error: {}", state, e); RetryResult::Retry(index) } Ok(ev) => { diff --git a/relayer/src/config.rs b/relayer/src/config.rs index 8ff11889ff..802dfbc911 100644 --- a/relayer/src/config.rs +++ b/relayer/src/config.rs @@ -46,6 +46,14 @@ impl Default for ChainFilters { pub mod default { use super::*; + pub fn filter() -> bool { + false + } + + pub fn clear_packets_interval() -> u64 { + 100 + } + pub fn rpc_timeout() -> Duration { Duration::from_secs(10) } @@ -134,17 +142,20 @@ impl fmt::Display for LogLevel { #[serde(default, deny_unknown_fields)] pub struct GlobalConfig { pub strategy: Strategy, - #[serde(default)] - pub filter: bool, pub log_level: LogLevel, + #[serde(default = "default::filter")] + pub filter: bool, + #[serde(default = "default::clear_packets_interval")] + pub clear_packets_interval: u64, } impl Default for GlobalConfig { fn default() -> Self { Self { strategy: Strategy::default(), - filter: false, log_level: LogLevel::default(), + filter: default::filter(), + clear_packets_interval: default::clear_packets_interval(), } } } diff --git a/relayer/src/error.rs b/relayer/src/error.rs index 076e67c472..8b5f35599f 100644 --- a/relayer/src/error.rs +++ b/relayer/src/error.rs @@ -78,6 +78,10 @@ pub enum Kind { #[error("Failed to create client state")] BuildClientStateFailure, + /// Did not find tx confirmation + #[error("did not find tx confirmation {0}")] + TxNoConfirmation(String), + /// Gas estimate from simulated Tx exceeds the maximum configured #[error("{chain_id} gas estimate {estimated_gas} from simulated Tx exceeds the maximum configured {max_gas}")] TxSimulateGasEstimateExceeded { @@ -86,10 +90,6 @@ pub enum Kind { max_gas: u64, }, - /// Tx failure for lack of confirmation - #[error("Failed Tx: no confirmation")] - TxNoConfirmation, - /// Create client failure #[error("Failed to create client {0}")] CreateClient(String), diff --git a/relayer/src/link.rs b/relayer/src/link.rs index 79e2a9a7b3..dceffea079 100644 --- a/relayer/src/link.rs +++ b/relayer/src/link.rs @@ -5,6 +5,7 @@ use std::fmt; use std::thread; use std::time::Instant; +use itertools::Itertools; use prost_types::Any; use thiserror::Error; use tracing::{debug, error, info, trace, warn}; @@ -194,8 +195,10 @@ impl fmt::Display for OperationalData { pub struct RelayPath { channel: Channel, + // Marks whether this path has already cleared pending packets. + // Packets should be cleared once (at startup), then this + // flag turns to `false`. clear_packets: bool, - // Operational data, targeting both the source and destination chain. // These vectors of operational data are ordered decreasingly by their age, with element at // position `0` being the oldest. @@ -397,7 +400,6 @@ impl RelayPath { } fn relay_pending_packets(&mut self, height: Height) -> Result<(), LinkError> { - info!("[{}] clearing old packets", self); for _ in 0..MAX_RETRIES { if self .build_recv_packet_and_timeout_msgs(Some(height)) @@ -410,30 +412,45 @@ impl RelayPath { Err(LinkError::OldPacketClearingFailed) } - /// Should not run more than once per execution. + /// Queries the source chain at the given [`Height`] + /// to find any packets or acknowledgements that are pending, + /// and fetches the relevant packet event data. Finally, this + /// method also schedules the corresponding operational data, + /// so that the relayer will later relay the pending packets. pub fn clear_packets(&mut self, above_height: Height) -> Result<(), LinkError> { - if self.clear_packets { - info!( - "[{}] clearing pending packets from events before height {}", - self, above_height - ); - - let clear_height = above_height.decrement().map_err(|e| LinkError::Failed( - format!("Cannot clear packets @height {}, because this height cannot be decremented: {}", above_height, e.to_string())))?; + info!( + "[{}] clearing pending packets from events before height {}", + self, above_height + ); - self.relay_pending_packets(clear_height)?; + let clear_height = above_height.decrement().map_err(|e| { + LinkError::Failed(format!( + "Cannot clear packets @height {}, because this height cannot be decremented: {}", + above_height, + e.to_string() + )) + })?; - info!("[{}] finished clearing pending packets", self); + self.relay_pending_packets(clear_height)?; - self.clear_packets = false; - } + info!( + "[{}] finished scheduling the clearing of pending packets", + self + ); Ok(()) } /// Generate & schedule operational data from the input `batch` of IBC events. pub fn update_schedule(&mut self, batch: EventBatch) -> Result<(), LinkError> { - self.clear_packets(batch.height)?; + // With the first batch of events, also trigger the clearing of old packets. + if self.clear_packets { + self.clear_packets(batch.height)?; + + // Disable further clearing of old packet. + // Clearing will happen separately, upon new blocks. + self.clear_packets = false; + } // Collect relevant events from the incoming batch & adjust their height. let events = self.filter_events(&batch.events); @@ -636,6 +653,10 @@ impl RelayPath { } Err(e) => { // Unrecoverable error, propagate up the stack + error!( + "[{}] unrecoverable error in send_from_operational_data {}", + self, e + ); return Err(e); } } @@ -722,7 +743,8 @@ impl RelayPath { } /// Sends a transaction to the chain targeted by the operational data `odata`. - /// If the transaction generates an error, returns the error as well as `LinkError::SendError` if input events if a sending failure occurs. + /// If the transaction generates an error, returns the error as well as `LinkError::SendError` + /// if input events if a sending failure occurs. /// Returns the events generated by the target chain upon success. fn send_from_operational_data( &mut self, @@ -922,19 +944,13 @@ impl RelayPath { if packet_commitments.is_empty() { return Ok((events_result, query_height)); } - let commit_sequences = packet_commitments.iter().map(|p| p.sequence).collect(); - debug!( - "[{}] packets that still have commitments on {}: {:?}", - self, - self.src_chain().id(), - commit_sequences - ); + let commit_sequences: Vec = packet_commitments.iter().map(|p| p.sequence).collect(); // Get the packets that have not been received on destination chain let request = QueryUnreceivedPacketsRequest { port_id: self.dst_port_id().to_string(), channel_id: self.dst_channel_id()?.to_string(), - packet_commitment_sequences: commit_sequences, + packet_commitment_sequences: commit_sequences.clone(), }; let sequences: Vec = self @@ -944,17 +960,25 @@ impl RelayPath { .map(From::from) .collect(); + if sequences.is_empty() { + return Ok((events_result, query_height)); + } + debug!( - "[{}] recv packets to send out to {} of the ones with commitments on source {}: {:?}", + "[{}] packets that still have commitments on {}: {} (first 10 shown here; total={})", self, - self.dst_chain().id(), self.src_chain().id(), - sequences + commit_sequences.iter().take(10).join(", "), + commit_sequences.len() ); - if sequences.is_empty() { - return Ok((events_result, query_height)); - } + debug!( + "[{}] recv packets to send out to {} of the ones with commitments on source {}: {} (first 10 shown here; total={})", + self, + self.dst_chain().id(), + self.src_chain().id(), + sequences.iter().take(10).join(", "), sequences.len() + ); let query = QueryTxRequest::Packet(QueryPacketEventDataRequest { event_id: IbcEventType::SendPacket, @@ -973,8 +997,17 @@ impl RelayPath { let send_event = downcast!(event => IbcEvent::SendPacket) .ok_or_else(|| LinkError::Failed("unexpected query tx response".into()))?; packet_sequences.push(send_event.packet.sequence); + if packet_sequences.len() > 10 { + // Enough to print the first 10 + break; + } } - debug!("[{}] received from query_txs {:?}", self, packet_sequences); + info!( + "[{}] found unprocessed SendPacket events for {:?} (first 10 shown here; total={})", + self, + packet_sequences, + events_result.len() + ); Ok((events_result, query_height)) } @@ -1007,18 +1040,13 @@ impl RelayPath { return Ok((events_result, query_height)); } - let acked_sequences = acks_on_source.iter().map(|p| p.sequence).collect(); - debug!( - "[{}] packets that have acknowledgments on {} {:?}", - self, - self.src_chain().id(), - acked_sequences - ); + let mut acked_sequences: Vec = acks_on_source.iter().map(|p| p.sequence).collect(); + acked_sequences.sort_unstable(); let request = QueryUnreceivedAcksRequest { port_id: self.dst_port_id().to_string(), channel_id: dst_channel_id.to_string(), - packet_ack_sequences: acked_sequences, + packet_ack_sequences: acked_sequences.clone(), }; let sequences: Vec = self @@ -1028,17 +1056,27 @@ impl RelayPath { .into_iter() .map(From::from) .collect(); + + if sequences.is_empty() { + return Ok((events_result, query_height)); + } + debug!( - "[{}] ack packets to send out to {} of the ones with acknowledgments on {}: {:?}", + "[{}] packets that have acknowledgments on {}: [{:?}..{:?}] (total={})", self, - self.dst_chain().id(), self.src_chain().id(), - sequences + acked_sequences.first(), + acked_sequences.last(), + acked_sequences.len() ); - if sequences.is_empty() { - return Ok((events_result, query_height)); - } + debug!( + "[{}] ack packets to send out to {} of the ones with acknowledgments on {}: {} (first 10 shown here; total={})", + self, + self.dst_chain().id(), + self.src_chain().id(), + sequences.iter().take(10).join(", "), sequences.len() + ); events_result = self .src_chain() @@ -1058,8 +1096,12 @@ impl RelayPath { let write_ack_event = downcast!(event => IbcEvent::WriteAcknowledgement) .ok_or_else(|| LinkError::Failed("unexpected query tx response".into()))?; packet_sequences.push(write_ack_event.packet.sequence); + if packet_sequences.len() > 10 { + // Enough to print the first 10 + break; + } } - info!("[{}] received from query_txs {:?}", self, packet_sequences); + info!("[{}] found unprocessed WriteAcknowledgement events for {:?} (first 10 shown here; total={})", self, packet_sequences, events_result.len()); Ok((events_result, query_height)) } diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index 7085819c8d..a9ff2d47fe 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -436,8 +436,12 @@ impl Supervisor { src_connection_id: connection.connection_id, }); - self.workers - .get_or_spawn(connection_object, chain.clone(), counterparty_chain.clone()); + self.workers.get_or_spawn( + connection_object, + chain.clone(), + counterparty_chain.clone(), + &self.config, + ); } Ok(()) @@ -467,6 +471,7 @@ impl Supervisor { counterparty_chain.id(), chan_state_dst ); + if chan_state_src.is_open() && chan_state_dst.is_open() { // create the client object and spawn worker let client_object = Object::Client(Client { @@ -474,8 +479,13 @@ impl Supervisor { dst_chain_id: chain.id(), src_chain_id: client.client_state.chain_id(), }); - self.workers - .get_or_spawn(client_object, counterparty_chain.clone(), chain.clone()); + + self.workers.get_or_spawn( + client_object, + counterparty_chain.clone(), + chain.clone(), + &self.config, + ); // TODO: Only start the Uni worker if there are outstanding packets or ACKs. // https://github.com/informalsystems/ibc-rs/issues/901 @@ -486,8 +496,13 @@ impl Supervisor { src_channel_id: channel.channel_id.clone(), src_port_id: channel.port_id, }); - self.workers - .get_or_spawn(path_object, chain.clone(), counterparty_chain.clone()); + + self.workers.get_or_spawn( + path_object, + chain.clone(), + counterparty_chain.clone(), + &self.config, + ); } else if !chan_state_dst.is_open() && chan_state_dst as u32 <= chan_state_src as u32 && self.handshake_enabled() @@ -500,8 +515,12 @@ impl Supervisor { src_port_id: channel.port_id, }); - self.workers - .get_or_spawn(channel_object, chain.clone(), counterparty_chain.clone()); + self.workers.get_or_spawn( + channel_object, + chain.clone(), + counterparty_chain.clone(), + &self.config, + ); } Ok(()) } @@ -613,7 +632,7 @@ impl Supervisor { let src = self.registry.get_or_spawn(object.src_chain_id())?; let dst = self.registry.get_or_spawn(object.dst_chain_id())?; - let worker = self.workers.get_or_spawn(object, src, dst); + let worker = self.workers.get_or_spawn(object, src, dst, &self.config); worker.send_events(height, events, chain_id.clone())? } diff --git a/relayer/src/worker.rs b/relayer/src/worker.rs index 6120529b7a..b30bcfd642 100644 --- a/relayer/src/worker.rs +++ b/relayer/src/worker.rs @@ -3,7 +3,7 @@ use std::fmt; use crossbeam_channel::Sender; use tracing::{debug, error, info}; -use crate::{chain::handle::ChainHandlePair, object::Object, telemetry::Telemetry}; +use crate::{chain::handle::ChainHandlePair, config::Config, object::Object, telemetry::Telemetry}; pub mod retry_strategy; @@ -54,6 +54,7 @@ impl Worker { object: Object, msg_tx: Sender, telemetry: Telemetry, + config: &Config, ) -> WorkerHandle { let (cmd_tx, cmd_rx) = crossbeam_channel::unbounded(); @@ -75,9 +76,13 @@ impl Worker { Object::Channel(channel) => { Self::Channel(ChannelWorker::new(channel, chains, cmd_rx, telemetry)) } - Object::UnidirectionalChannelPath(path) => { - Self::UniChanPath(UniChanPathWorker::new(path, chains, cmd_rx, telemetry)) - } + Object::UnidirectionalChannelPath(path) => Self::UniChanPath(UniChanPathWorker::new( + path, + chains, + cmd_rx, + telemetry, + config.global.clear_packets_interval, + )), }; let thread_handle = std::thread::spawn(move || worker.run(msg_tx)); diff --git a/relayer/src/worker/map.rs b/relayer/src/worker/map.rs index 6548974065..0e6a7aca23 100644 --- a/relayer/src/worker/map.rs +++ b/relayer/src/worker/map.rs @@ -6,6 +6,7 @@ use ibc::ics24_host::identifier::ChainId; use crate::{ chain::handle::{ChainHandle, ChainHandlePair}, + config::Config, object::Object, telemetry, telemetry::Telemetry, @@ -74,11 +75,12 @@ impl WorkerMap { object: Object, src: Box, dst: Box, + config: &Config, ) -> &WorkerHandle { if self.workers.contains_key(&object) { &self.workers[&object] } else { - let worker = self.spawn_worker(src, dst, &object); + let worker = self.spawn_worker(src, dst, &object, config); self.workers.entry(object).or_insert(worker) } } @@ -88,6 +90,7 @@ impl WorkerMap { src: Box, dst: Box, object: &Object, + config: &Config, ) -> WorkerHandle { telemetry!(self.telemetry.worker(metric_type(object), 1)); @@ -96,6 +99,7 @@ impl WorkerMap { object.clone(), self.msg_tx.clone(), self.telemetry.clone(), + config, ) } } diff --git a/relayer/src/worker/uni_chan_path.rs b/relayer/src/worker/uni_chan_path.rs index 34ca7fef16..49f67b0406 100644 --- a/relayer/src/worker/uni_chan_path.rs +++ b/relayer/src/worker/uni_chan_path.rs @@ -22,6 +22,7 @@ pub struct UniChanPathWorker { chains: ChainHandlePair, cmd_rx: Receiver, telemetry: Telemetry, + clear_packets_interval: u64, } impl UniChanPathWorker { @@ -30,12 +31,14 @@ impl UniChanPathWorker { chains: ChainHandlePair, cmd_rx: Receiver, telemetry: Telemetry, + clear_packets_interval: u64, ) -> Self { Self { path, chains, cmd_rx, telemetry, + clear_packets_interval, } } @@ -67,7 +70,7 @@ impl UniChanPathWorker { }; let result = retry_with_index(retry_strategy::worker_default_strategy(), |index| { - Self::step(maybe_cmd.clone(), &mut link, index) + self.step(maybe_cmd.clone(), &mut link, index) }); match result { @@ -86,21 +89,37 @@ impl UniChanPathWorker { } } - fn step(cmd: Option, link: &mut Link, index: u64) -> RetryResult { + fn step( + &self, + cmd: Option, + link: &mut Link, + index: u64, + ) -> RetryResult { if let Some(cmd) = cmd { let result = match cmd { WorkerCmd::IbcEvents { batch } => { // Update scheduled batches. link.a_to_b.update_schedule(batch) } + + // Handle the arrival of an event signaling that the + // source chain has advanced to a new block. WorkerCmd::NewBlock { height, new_block: _, - } => link.a_to_b.clear_packets(height), + } => { + // Schedule the clearing of pending packets + // at predefined block intervals. + if height.revision_height % self.clear_packets_interval == 0 { + link.a_to_b.clear_packets(height) + } else { + Ok(()) + } + } }; if let Err(e) = result { - error!("{}", e); + error!("[{}] step() encountered error: {}", link.a_to_b, e); return RetryResult::Retry(index); } } @@ -113,7 +132,7 @@ impl UniChanPathWorker { match result { Ok(summary) => RetryResult::Ok(summary), Err(e) => { - error!("{}", e); + error!("[{}] step() encountered error: {}", link.a_to_b, e); RetryResult::Retry(index) } }