Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle duplicate packet events and perform full clearing even in the presence of errors #3361

Merged
merged 8 commits into from
May 23, 2023
3 changes: 3 additions & 0 deletions .changelog/unreleased/bug-fixes/3358-dup-write-ack.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- Fix acknowledgement packet not being relayed
due to duplicate `write_acknowledgement` events
([\#3358](https://github.com/informalsystems/hermes/issues/3358))
2 changes: 2 additions & 0 deletions .changelog/unreleased/bug-fixes/3359-dup-send-packet.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Fix bug where chain proof verification would fail due to duplicate `send_packet`
events ([\#3359](https://github.com/informalsystems/hermes/issues/3359))
19 changes: 12 additions & 7 deletions crates/relayer-cli/src/commands/clear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use ibc_relayer_types::events::IbcEvent;
use crate::application::app_config;
use crate::cli_utils::spawn_chain_counterparty;
use crate::conclude::Output;
use crate::error::Error;

/// `clear` subcommands
#[derive(Command, Debug, Parser, Runnable)]
Expand Down Expand Up @@ -115,40 +114,46 @@ impl Runnable for ClearPacketsCmd {
src_port_id: self.port_id.clone(),
src_channel_id: self.channel_id.clone(),
};

let fwd_link = match Link::new_from_opts(chains.src.clone(), chains.dst, opts, false, false)
{
Ok(link) => link,
Err(e) => Output::error(e).exit(),
};

let rev_link = match fwd_link.reverse(false, false) {
Ok(link) => link,
Err(e) => Output::error(e).exit(),
};

// Schedule RecvPacket messages for pending packets in both directions.
// This may produce pending acks which will be processed in the next phase.
run_and_collect_events(&mut ev_list, || {
run_and_collect_events("forward recv and timeout", &mut ev_list, || {
fwd_link.relay_recv_packet_and_timeout_messages()
});
run_and_collect_events(&mut ev_list, || {
run_and_collect_events("reverse recv and timeout", &mut ev_list, || {
rev_link.relay_recv_packet_and_timeout_messages()
});

// Schedule AckPacket messages in both directions.
run_and_collect_events(&mut ev_list, || fwd_link.relay_ack_packet_messages());
run_and_collect_events(&mut ev_list, || rev_link.relay_ack_packet_messages());
run_and_collect_events("forward ack", &mut ev_list, || {
fwd_link.relay_ack_packet_messages()
});
run_and_collect_events("reverse ack", &mut ev_list, || {
rev_link.relay_ack_packet_messages()
});

Output::success(ev_list).exit()
}
}

fn run_and_collect_events<F>(ev_list: &mut Vec<IbcEvent>, f: F)
fn run_and_collect_events<F>(desc: &str, ev_list: &mut Vec<IbcEvent>, f: F)
where
F: FnOnce() -> Result<Vec<IbcEvent>, LinkError>,
{
match f() {
Ok(mut ev) => ev_list.append(&mut ev),
Err(e) => Output::error(Error::link(e)).exit(),
Err(e) => tracing::error!("Failed to relay {desc} packets: {e:?}"),
};
}

Expand Down
4 changes: 2 additions & 2 deletions crates/relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ impl CosmosSdkChain {
&mut response
.begin_block_events
.unwrap_or_default()
.into_iter()
.iter()
.filter_map(|ev| filter_matching_event(ev, request, seqs))
.map(|ev| IbcEventWithHeight::new(ev, response_height))
.collect(),
Expand All @@ -759,7 +759,7 @@ impl CosmosSdkChain {
&mut response
.end_block_events
.unwrap_or_default()
.into_iter()
.iter()
.filter_map(|ev| filter_matching_event(ev, request, seqs))
.map(|ev| IbcEventWithHeight::new(ev, response_height))
.collect(),
Expand Down
69 changes: 42 additions & 27 deletions crates/relayer/src/chain/cosmos/query/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use tendermint::abci::Event;
use tendermint::Hash as TxHash;
use tendermint_rpc::endpoint::tx::Response as TxResponse;
use tendermint_rpc::{Client, HttpClient, Order, Url};
use tracing::warn;

use crate::chain::cosmos::query::{header_query, packet_query, tx_hash_query};
use crate::chain::cosmos::types::events;
Expand Down Expand Up @@ -96,6 +97,7 @@ pub async fn query_txs(
}

/// This function queries transactions for packet events matching certain criteria.
///
/// It returns at most one packet event for each sequence specified in the request.
/// Note - there is no way to format the packet query such that it asks for Tx-es with either
/// sequence (the query conditions can only be AND-ed).
Expand All @@ -121,34 +123,46 @@ pub async fn query_packets_from_txs(
let mut result: Vec<IbcEventWithHeight> = vec![];

for seq in &request.sequences {
// query first (and only) Tx that includes the event specified in the query request
let mut response = rpc_client
.tx_search(
packet_query(request, *seq),
false,
1,
1, // get only the first Tx matching the query
Order::Ascending,
)
// Query the latest 10 txs which include the event specified in the query request
let response = rpc_client
.tx_search(packet_query(request, *seq), false, 1, 10, Order::Descending)
.await
.map_err(|e| Error::rpc(rpc_address.clone(), e))?;

debug_assert!(
response.txs.len() <= 1,
"packet_from_tx_search_response: unexpected number of txs"
);

if response.txs.is_empty() {
continue;
}

let tx = response.txs.remove(0);
let event = packet_from_tx_search_response(chain_id, request, *seq, tx)?;
let mut tx_events = vec![];

if let Some(event) = event {
result.push(event);
// Process each tx in descending order
for tx in response.txs {
// Check if the tx contains an event which matches the query
if let Some(event) = packet_from_tx_search_response(chain_id, request, *seq, &tx)? {
// We found the event
tx_events.push((event, tx.hash, tx.height));
}
}

// If no event was found for this sequence, continue to the next sequence
if tx_events.is_empty() {
continue;
}

// If more than one event was found for this sequence, log a warning
if tx_events.len() > 1 {
warn!("more than one packet event found for sequence {seq}, this should not happen",);

for (event, hash, height) in &tx_events {
warn!("seq: {seq}, tx hash: {hash}, tx height: {height}, event: {event}",);
}
}

// In either case, use the first (latest) event found for this sequence
let (first_event, _, _) = tx_events.remove(0);
result.push(first_event);
}

Ok(result)
}

Expand Down Expand Up @@ -193,9 +207,9 @@ pub async fn query_packets_from_block(
tx_events.append(
&mut tx
.events
.into_iter()
.filter_map(|e| filter_matching_event(e, request, &request.sequences))
.map(|e| IbcEventWithHeight::new(e, height))
.iter()
.filter_map(|ev| filter_matching_event(ev, request, &request.sequences))
.map(|ev| IbcEventWithHeight::new(ev, height))
.collect(),
)
}
Expand All @@ -205,7 +219,7 @@ pub async fn query_packets_from_block(
&mut block_results
.begin_block_events
.unwrap_or_default()
.into_iter()
.iter()
.filter_map(|ev| filter_matching_event(ev, request, &request.sequences))
.map(|ev| IbcEventWithHeight::new(ev, height))
.collect(),
Expand All @@ -215,7 +229,7 @@ pub async fn query_packets_from_block(
&mut block_results
.end_block_events
.unwrap_or_default()
.into_iter()
.iter()
.filter_map(|ev| filter_matching_event(ev, request, &request.sequences))
.map(|ev| IbcEventWithHeight::new(ev, height))
.collect(),
Expand Down Expand Up @@ -278,7 +292,7 @@ fn packet_from_tx_search_response(
chain_id: &ChainId,
request: &QueryPacketEventDataRequest,
seq: Sequence,
response: TxResponse,
response: &TxResponse,
) -> Result<Option<IbcEventWithHeight>, Error> {
let height = ICSHeight::new(chain_id.version(), u64::from(response.height))
.map_err(|_| Error::invalid_height_no_source())?;
Expand All @@ -292,7 +306,7 @@ fn packet_from_tx_search_response(
Ok(response
.tx_result
.events
.into_iter()
.iter()
.find_map(|ev| filter_matching_event(ev, request, &[seq]))
.map(|ibc_event| IbcEventWithHeight::new(ibc_event, height)))
}
Expand All @@ -301,7 +315,7 @@ fn packet_from_tx_search_response(
/// is consistent with the request parameters.
/// Returns `None` otherwise.
pub fn filter_matching_event(
event: Event,
event: &Event,
request: &QueryPacketEventDataRequest,
seqs: &[Sequence],
) -> Option<IbcEvent> {
Expand All @@ -321,7 +335,8 @@ pub fn filter_matching_event(
return None;
}

let ibc_event = ibc_event_try_from_abci_event(&event).ok()?;
let ibc_event = ibc_event_try_from_abci_event(event).ok()?;

match ibc_event {
IbcEvent::SendPacket(ref send_ev)
if matches_packet(request, seqs.to_vec(), &send_ev.packet) =>
Expand Down
7 changes: 3 additions & 4 deletions crates/relayer/src/link/relay_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,11 +419,10 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
telemetry!(received_event_batch, tracking_id);

for i in 1..=MAX_RETRIES {
let cleared = self
.schedule_recv_packet_and_timeout_msgs(height, tracking_id)
.and_then(|_| self.schedule_packet_ack_msgs(height, tracking_id));
let cleared_recv = self.schedule_recv_packet_and_timeout_msgs(height, tracking_id);
let cleared_ack = self.schedule_packet_ack_msgs(height, tracking_id);

match cleared {
match cleared_recv.and(cleared_ack) {
Ok(()) => return Ok(()),
Err(e) => error!(
"failed to clear packets, retry {}/{}: {}",
Expand Down