Skip to content

Commit

Permalink
Remove queueing from message-lane (paritytech#352)
Browse files Browse the repository at this point in the history
* remove queueing from message-lane

* also remove queueing from RPCs

* another trace

* new clippy
  • Loading branch information
svyatonik committed Sep 15, 2020
1 parent 1714703 commit 6067d02
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 370 deletions.
2 changes: 1 addition & 1 deletion bridges/modules/currency-exchange/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ fn prepare_deposit_details<T: Trait<I>, I: Instance>(
) -> Result<DepositDetails<T, I>, Error<T, I>> {
// ensure that transaction is included in finalized block that we know of
let transaction = <T as Trait<I>>::PeerBlockchain::verify_transaction_inclusion_proof(proof)
.ok_or_else(|| Error::<T, I>::UnfinalizedTransaction)?;
.ok_or(Error::<T, I>::UnfinalizedTransaction)?;

// parse transaction
let transaction =
Expand Down
2 changes: 1 addition & 1 deletion bridges/modules/ethereum/src/verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ pub fn accept_aura_header_into_pool<S: Storage>(
validator_checks(config, &best_context.validators_set().validators, header, header_step);
if let Err(error) = validators_check_result {
find_next_validators_signal(storage, &best_context)
.ok_or_else(|| error)
.ok_or(error)
.and_then(|next_validators| validator_checks(config, &next_validators, header, header_step))?;
}

Expand Down
32 changes: 0 additions & 32 deletions bridges/modules/message-lane/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ pub type MessagesProof = Bytes;
/// SCALE-encoded trie nodes array `Vec<Vec<u8>>`.
pub type MessagesReceivingProof = Bytes;

/// Trie-based storage proof that the message(s) with given key(s) have been processed by the bridged chain.
/// SCALE-encoded trie nodes array `Vec<Vec<u8>>`.
pub type MessagesProcessingProof = Bytes;

/// Runtime adapter.
pub trait Runtime: Send + Sync + 'static {
/// Return runtime storage key for given message. May return None if instance is unknown.
Expand Down Expand Up @@ -75,15 +71,6 @@ pub trait MessageLaneApi<BlockHash> {
lane: LaneId,
block: Option<BlockHash>,
) -> FutureResult<MessagesReceivingProof>;

/// Returns proof-of-message(s) processing.
#[rpc(name = "messageLane_proveMessagesProcessing")]
fn prove_messages_processing(
&self,
instance: InstanceId,
lane: LaneId,
block: Option<BlockHash>,
) -> FutureResult<MessagesProcessingProof>;
}

/// Implements the MessageLaneApi trait for interacting with message lanes.
Expand Down Expand Up @@ -150,25 +137,6 @@ where
.map_err(Into::into),
)
}

fn prove_messages_processing(
&self,
instance: InstanceId,
lane: LaneId,
block: Option<Block::Hash>,
) -> FutureResult<MessagesProcessingProof> {
Box::new(
prove_keys_read(
self.backend.clone(),
block,
vec![self.runtime.inbound_lane_data_key(&instance, &lane)],
)
.boxed()
.compat()
.map(serialize_storage_proof)
.map_err(Into::into),
)
}
}

async fn prove_keys_read<Block, Backend>(
Expand Down
165 changes: 13 additions & 152 deletions bridges/modules/message-lane/src/inbound_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

//! Everything about incoming messages receival.

use bp_message_lane::{InboundLaneData, LaneId, Message, MessageKey, MessageNonce, MessageResult, OnMessageReceived};
use bp_message_lane::{InboundLaneData, LaneId, Message, MessageKey, MessageNonce, OnMessageReceived};

/// Inbound lane storage.
pub trait InboundLaneStorage {
Expand All @@ -29,12 +29,6 @@ pub trait InboundLaneStorage {
fn data(&self) -> InboundLaneData;
/// Update lane data in the storage.
fn set_data(&mut self, data: InboundLaneData);
/// Returns saved inbound message payload.
fn message(&self, nonce: &MessageNonce) -> Option<Self::Payload>;
/// Save inbound message in the storage.
fn save_message(&mut self, nonce: MessageNonce, payload: Self::Payload);
/// Remove inbound message from the storage.
fn remove_message(&mut self, nonce: &MessageNonce);
}

/// Inbound messages lane.
Expand All @@ -49,188 +43,55 @@ impl<S: InboundLaneStorage> InboundLane<S> {
}

/// Receive new message.
pub fn receive_message(
pub fn receive_message<P: OnMessageReceived<S::Payload>>(
&mut self,
nonce: MessageNonce,
payload: S::Payload,
processor: &mut impl OnMessageReceived<S::Payload>,
) -> bool {
let mut data = self.storage.data();
let is_correct_message = nonce == data.latest_received_nonce + 1;
if !is_correct_message {
return false;
}

let is_process_required = is_correct_message && data.oldest_unprocessed_nonce == nonce;
data.latest_received_nonce = nonce;
self.storage.set_data(data);

let payload_to_save = match is_process_required {
true => {
let message = Message {
key: MessageKey {
lane_id: self.storage.id(),
nonce,
},
payload,
};
match processor.on_message_received(message) {
MessageResult::Processed => None,
MessageResult::NotProcessed(message) => Some(message.payload),
}
}
false => Some(payload),
};

if let Some(payload_to_save) = payload_to_save {
self.storage.save_message(nonce, payload_to_save);
}
P::on_message_received(Message {
key: MessageKey {
lane_id: self.storage.id(),
nonce,
},
payload,
});

true
}

/// Process stored lane messages.
///
/// Stops processing either when all messages are processed, or when processor returns
/// MessageResult::NotProcessed.
pub fn process_messages(&mut self, processor: &mut impl OnMessageReceived<S::Payload>) {
let mut anything_processed = false;
let mut data = self.storage.data();
while data.oldest_unprocessed_nonce <= data.latest_received_nonce {
let nonce = data.oldest_unprocessed_nonce;
let payload = self
.storage
.message(&nonce)
.expect("message is referenced by lane; referenced message is not pruned; qed");
let message = Message {
key: MessageKey {
lane_id: self.storage.id(),
nonce,
},
payload,
};

let process_result = processor.on_message_received(message);
if let MessageResult::NotProcessed(_) = process_result {
break;
}

self.storage.remove_message(&nonce);

anything_processed = true;
data.oldest_unprocessed_nonce += 1;
}

if anything_processed {
self.storage.set_data(data);
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{
inbound_lane,
mock::{
run_test, TestMessageProcessor, TestPayload, TestRuntime, PAYLOAD_TO_QUEUE, REGULAR_PAYLOAD, TEST_LANE_ID,
},
mock::{run_test, TestRuntime, REGULAR_PAYLOAD, TEST_LANE_ID},
};

#[test]
fn fails_to_receive_message_with_incorrect_nonce() {
run_test(|| {
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
assert!(!lane.receive_message(10, REGULAR_PAYLOAD, &mut TestMessageProcessor));
assert!(lane.storage.message(&10).is_none());
assert!(!lane.receive_message::<()>(10, REGULAR_PAYLOAD));
assert_eq!(lane.storage.data().latest_received_nonce, 0);
});
}

#[test]
fn correct_message_is_queued_if_some_other_messages_are_queued() {
run_test(|| {
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
assert!(lane.receive_message(1, PAYLOAD_TO_QUEUE, &mut TestMessageProcessor));
assert!(lane.storage.message(&1).is_some());
assert!(lane.receive_message(2, REGULAR_PAYLOAD, &mut TestMessageProcessor));
assert!(lane.storage.message(&2).is_some());
assert_eq!(lane.storage.data().latest_received_nonce, 2);
});
}

#[test]
fn correct_message_is_queued_if_processor_wants_to_queue() {
run_test(|| {
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
assert!(lane.receive_message(1, PAYLOAD_TO_QUEUE, &mut TestMessageProcessor));
assert!(lane.storage.message(&1).is_some());
assert_eq!(lane.storage.data().latest_received_nonce, 1);
});
}

#[test]
fn correct_message_is_not_queued_if_processed_instantly() {
fn correct_message_is_processed_instantly() {
run_test(|| {
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
assert!(lane.receive_message(1, REGULAR_PAYLOAD, &mut TestMessageProcessor));
assert!(lane.storage.message(&1).is_none());
assert!(lane.receive_message::<()>(1, REGULAR_PAYLOAD));
assert_eq!(lane.storage.data().latest_received_nonce, 1);
});
}

#[test]
fn process_message_does_nothing_when_lane_is_empty() {
run_test(|| {
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
assert_eq!(lane.storage.data().oldest_unprocessed_nonce, 1);
lane.process_messages(&mut TestMessageProcessor);
assert_eq!(lane.storage.data().oldest_unprocessed_nonce, 1);
});
}

#[test]
fn process_message_works() {
run_test(|| {
pub struct QueueByNonce(MessageNonce);

impl OnMessageReceived<TestPayload> for QueueByNonce {
fn on_message_received(&mut self, message: Message<TestPayload>) -> MessageResult<TestPayload> {
if message.key.nonce == self.0 {
MessageResult::NotProcessed(message)
} else {
MessageResult::Processed
}
}
}

let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
assert!(lane.receive_message(1, PAYLOAD_TO_QUEUE, &mut TestMessageProcessor));
assert!(lane.receive_message(2, PAYLOAD_TO_QUEUE, &mut TestMessageProcessor));
assert!(lane.receive_message(3, PAYLOAD_TO_QUEUE, &mut TestMessageProcessor));
assert!(lane.receive_message(4, REGULAR_PAYLOAD, &mut TestMessageProcessor));

assert!(lane.storage.message(&1).is_some());
assert!(lane.storage.message(&2).is_some());
assert!(lane.storage.message(&3).is_some());
assert!(lane.storage.message(&4).is_some());
assert_eq!(lane.storage.data().oldest_unprocessed_nonce, 1);

lane.process_messages(&mut QueueByNonce(3));

assert!(lane.storage.message(&1).is_none());
assert!(lane.storage.message(&2).is_none());
assert!(lane.storage.message(&3).is_some());
assert!(lane.storage.message(&4).is_some());
assert_eq!(lane.storage.data().oldest_unprocessed_nonce, 3);

lane.process_messages(&mut QueueByNonce(10));

assert!(lane.storage.message(&1).is_none());
assert!(lane.storage.message(&2).is_none());
assert!(lane.storage.message(&3).is_none());
assert!(lane.storage.message(&4).is_none());
assert_eq!(lane.storage.data().oldest_unprocessed_nonce, 5);
});
}
}
Loading

0 comments on commit 6067d02

Please sign in to comment.