-
Notifications
You must be signed in to change notification settings - Fork 329
/
pending.rs
247 lines (212 loc) · 8.73 KB
/
pending.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
use core::iter::Iterator;
use core::time::Duration;
use std::time::Instant;
use tracing::{debug, error, trace, trace_span};
use ibc::core::ics24_host::identifier::{ChainId, ChannelId, PortId};
use ibc::events::IbcEvent;
use ibc::query::{QueryTxHash, QueryTxRequest};
use crate::error::Error as RelayerError;
use crate::link::error::LinkError;
use crate::util::queue::Queue;
use crate::{
chain::handle::ChainHandle,
link::{operational_data::OperationalData, relay_sender::AsyncReply, RelaySummary, TxHashes},
};
pub const TIMEOUT: Duration = Duration::from_secs(300);
/// A wrapper over an [`OperationalData`] that is pending.
/// Additionally holds all the necessary information
/// to query for confirmations:
/// - hashes for all transactions in that op. data,
/// - the target chain to query for confirmations,
/// - timestamp to track time-outs and declare an
/// operational data as pending.
#[derive(Clone)]
pub struct PendingData {
pub original_od: OperationalData,
pub tx_hashes: TxHashes,
pub submit_time: Instant,
pub error_events: Vec<IbcEvent>,
}
/// The mediator stores all pending data
/// and tries to confirm them asynchronously.
pub struct PendingTxs<Chain> {
pub chain: Chain,
pub channel_id: ChannelId,
pub port_id: PortId,
pub counterparty_chain_id: ChainId,
pub pending_queue: Queue<PendingData>,
}
impl<Chain> PendingTxs<Chain> {
pub fn new(
chain: Chain,
channel_id: ChannelId,
port_id: PortId,
counterparty_chain_id: ChainId,
) -> Self {
Self {
chain,
channel_id,
port_id,
counterparty_chain_id,
pending_queue: Queue::new(),
}
}
}
impl<Chain: ChainHandle> PendingTxs<Chain> {
pub fn chain_id(&self) -> ChainId {
self.chain.id()
}
// Insert new pending transaction to the back of the queue.
pub fn insert_new_pending_tx(&self, r: AsyncReply, od: OperationalData) {
let mut tx_hashes = Vec::new();
let mut error_events = Vec::new();
for response in r.responses.into_iter() {
if response.code.is_err() {
// If the response is an error, we do not want to check for the
// transaction confirmation status because it is never going to
// be committed. Instead we convert it into an error event and
// store it to be returned in the RelaySummary after all other
// transactions have been confirmed.
// This is not ideal, but is just to follow the previous synchronous
// behavior of handling the OperationalData.
let span = trace_span!(
"inserting new pending txs",
chain = %self.chain_id(),
counterparty_chain = %self.counterparty_chain_id,
port = %self.port_id,
channel = %self.channel_id,
);
let _guard = span.enter();
trace!(
"putting error response in error event queue: {:?} ",
response
);
let error_event = IbcEvent::ChainError(format!(
"deliver_tx on chain {} for Tx hash {} reports error: code={:?}, log={:?}",
self.chain_id(),
response.hash,
response.code,
response.log
));
error_events.push(error_event);
} else {
tx_hashes.push(response.hash);
}
}
let u = PendingData {
original_od: od,
tx_hashes: TxHashes(tx_hashes),
submit_time: Instant::now(),
error_events,
};
self.pending_queue.push_back(u);
}
fn check_tx_events(&self, tx_hashes: &TxHashes) -> Result<Option<Vec<IbcEvent>>, RelayerError> {
let mut all_events = Vec::new();
for hash in &tx_hashes.0 {
let mut events = self
.chain
.query_txs(QueryTxRequest::Transaction(QueryTxHash(*hash)))?;
if events.is_empty() {
return Ok(None);
} else {
all_events.append(&mut events)
}
}
Ok(Some(all_events))
}
/// Try and process one pending transaction if available.
pub fn process_pending(
&self,
timeout: Duration,
resubmit: impl FnOnce(OperationalData) -> Result<AsyncReply, LinkError>,
) -> Result<Option<RelaySummary>, LinkError> {
// We process pending transactions in a FIFO manner, so take from
// the front of the queue.
if let Some(pending) = self.pending_queue.pop_front() {
let tx_hashes = &pending.tx_hashes;
let submit_time = &pending.submit_time;
if tx_hashes.0.is_empty() {
return Ok(Some(RelaySummary::from_events(pending.error_events)));
}
let span = trace_span!(
"processing pending tx",
chain = %self.chain_id(),
counterparty_chain = %self.counterparty_chain_id,
port = %self.port_id,
channel = %self.channel_id,
);
let _guard = span.enter();
// Process the given pending transaction.
trace!("trying to confirm {} ", tx_hashes);
// Check for TX events for the given pending transaction hashes.
let events_result = self.check_tx_events(tx_hashes);
let res = match events_result {
Ok(None) => {
// There is no events for the associated transactions.
// This means the transaction has not yet been committed.
trace!("transaction is not yet committed: {} ", tx_hashes);
if submit_time.elapsed() > timeout {
// The submission time for the transaction has exceeded the
// timeout threshold. Returning Outcome::Timeout for the
// relayer to resubmit the transaction to the chain again.
error!("timed out while confirming {}", tx_hashes);
let resubmit_res = resubmit(pending.original_od.clone());
match resubmit_res {
Ok(reply) => {
self.insert_new_pending_tx(reply, pending.original_od);
Ok(None)
}
Err(e) => {
self.pending_queue.push_back(pending);
Err(e)
}
}
} else {
// Reinsert the pending transaction, this time
// to the back of the queue so that we process other
// pending transactions first in the meanwhile.
self.pending_queue.push_back(pending);
Ok(None)
}
}
Ok(Some(events)) => {
// We get a list of events for the transaction hashes,
// Meaning the transaction has been committed successfully
// to the chain.
debug!(
"confirmed after {:#?}: {} ",
pending.submit_time.elapsed(),
tx_hashes
);
// Convert the events to RelaySummary and return them.
let mut summary = RelaySummary::from_events(events);
summary.extend(RelaySummary::from_events(pending.error_events));
Ok(Some(summary))
}
Err(e) => {
// There are errors querying for the transaction hashes.
// This may be temporary errors when the relayer is communicating
// with the chain endpoint.
error!(
"error querying for tx hashes {}: {}. will retry again later",
tx_hashes, e
);
// Push it to the back of the pending queue to process it
// again at a later time.
self.pending_queue.push_back(pending);
Err(LinkError::relayer(e))
}
};
if !self.pending_queue.is_empty() {
trace!(
"total pending transactions left: {}",
self.pending_queue.len()
);
}
res
} else {
Ok(None)
}
}
}