Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

BEEFY subscription fires multiple times for the same commitment #10684

Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
04e9166
prevent duplicate justification notifications
Wizdave97 Jan 17, 2022
acbc825
Merge branch 'master' of https://github.com/paritytech/substrate into…
Wizdave97 Jan 17, 2022
8cc6ac7
minor fix
Wizdave97 Jan 17, 2022
e98492b
clean up
Wizdave97 Jan 17, 2022
2ca50e3
fix failing tests
Wizdave97 Jan 17, 2022
1029b26
minor fix
Wizdave97 Jan 17, 2022
0d40e9a
add comments to test
Wizdave97 Jan 17, 2022
5cbf9d2
fix typo
Wizdave97 Jan 17, 2022
b20926c
minor fix
Wizdave97 Jan 18, 2022
307a147
Merge branch 'master' of https://github.com/paritytech/substrate into…
Wizdave97 Jan 19, 2022
9147724
Merge branch 'master' of https://github.com/paritytech/substrate into…
Wizdave97 Jan 21, 2022
5253c95
Merge branch 'master' of https://github.com/paritytech/substrate into…
Wizdave97 Jan 24, 2022
c6a8a2f
Merge branch 'paritytech:master' into david/beefy-subscribe-justifica…
Wizdave97 Jan 24, 2022
fe0f7ec
updated test case
Wizdave97 Jan 24, 2022
2be30bb
remove unreliable tests
Wizdave97 Jan 24, 2022
61088bd
Merge branch 'paritytech:master' into david/beefy-subscribe-justifica…
Wizdave97 Jan 24, 2022
c302c39
minor fix
Wizdave97 Jan 25, 2022
b213cce
Merge branch 'david/beefy-subscribe-justification-rpc' of github.com:…
Wizdave97 Jan 25, 2022
d0846ed
fix
Wizdave97 Jan 25, 2022
925ce6b
Merge branch 'paritytech:master' into david/beefy-subscribe-justifica…
Wizdave97 Jan 26, 2022
74e2b6c
Merge branch 'master' of https://github.com/paritytech/substrate into…
Wizdave97 Jan 27, 2022
99adcdf
add justification dedup test case
Wizdave97 Jan 27, 2022
11aba7c
Merge branch 'david/beefy-subscribe-justification-rpc' of github.com:…
Wizdave97 Jan 27, 2022
c3b7f22
fix tests
Wizdave97 Jan 28, 2022
d02bb2f
Merge branch 'master' of https://github.com/paritytech/substrate into…
Wizdave97 Jan 28, 2022
854cd0a
fix test assertion string
Wizdave97 Jan 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 79 additions & 14 deletions client/beefy/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
use parking_lot::RwLock;
use std::sync::Arc;

use sp_runtime::traits::Block as BlockT;
use sp_runtime::traits::{Block as BlockT, NumberFor};

use futures::{task::SpawnError, FutureExt, SinkExt, StreamExt, TryFutureExt};
use futures::{future, task::SpawnError, FutureExt, SinkExt, StreamExt, TryFutureExt};
use jsonrpc_derive::rpc;
use jsonrpc_pubsub::{manager::SubscriptionManager, typed::Subscriber, SubscriptionId};
use log::warn;
Expand Down Expand Up @@ -120,6 +120,7 @@ pub struct BeefyRpcHandler<Block: BlockT> {
signed_commitment_stream: BeefySignedCommitmentStream<Block>,
beefy_best_block: Arc<RwLock<Option<Block::Hash>>>,
manager: SubscriptionManager,
beefy_best_block_num: Arc<RwLock<Option<NumberFor<Block>>>>,
acatangiu marked this conversation as resolved.
Show resolved Hide resolved
}

impl<Block: BlockT> BeefyRpcHandler<Block> {
Expand All @@ -133,13 +134,17 @@ impl<Block: BlockT> BeefyRpcHandler<Block> {
E: futures::task::Spawn + Send + Sync + 'static,
{
let beefy_best_block = Arc::new(RwLock::new(None));
let beefy_best_block_num = Arc::new(RwLock::new(None));

let stream = best_block_stream.subscribe();
let closure_clone = beefy_best_block.clone();
let beefy_block_num_clone = beefy_best_block_num.clone();
let future = stream.for_each(move |best_beefy| {
let async_clone = closure_clone.clone();
let async_block_num_clone = beefy_block_num_clone.clone();
async move {
*async_clone.write() = Some(best_beefy);
*async_clone.write() = Some(best_beefy.0);
*async_block_num_clone.write() = Some(best_beefy.1)
}
});

Expand All @@ -151,7 +156,7 @@ impl<Block: BlockT> BeefyRpcHandler<Block> {
})?;

let manager = SubscriptionManager::new(Arc::new(executor));
Ok(Self { signed_commitment_stream, beefy_best_block, manager })
Ok(Self { signed_commitment_stream, beefy_best_block, manager, beefy_best_block_num })
}
}

Expand All @@ -166,9 +171,19 @@ where
_metadata: Self::Metadata,
subscriber: Subscriber<notification::EncodedSignedCommitment>,
) {
let beefy_block_num = self.beefy_best_block_num.clone();
let stream = self
.signed_commitment_stream
.subscribe()
.filter(move |x| {
let best_block_clone = beefy_block_num.clone();
let best_block = best_block_clone.read();
if let Some(best_block) = *best_block {
future::ready(x.commitment.block_number > best_block)
} else {
future::ready(true)
}
})
.map(|x| Ok::<_, ()>(Ok(notification::EncodedSignedCommitment::new::<Block>(x))));

self.manager.add(subscriber, |sink| {
Expand Down Expand Up @@ -257,7 +272,7 @@ mod tests {
let (io, _) = setup_io_handler_with_best_block_stream(stream);

let hash = BlakeTwo256::hash(b"42");
let r: Result<(), ()> = sender.notify(|| Ok(hash));
let r: Result<(), ()> = sender.notify(|| Ok((hash, 0)));
r.unwrap();

// Verify RPC `beefy_getFinalizedHead` returns expected hash.
Expand Down Expand Up @@ -319,7 +334,10 @@ mod tests {
// Unsubscribe again and fail
assert_eq!(
io.handle_request_sync(&unsub_req, meta),
Some(r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid subscription id."},"id":1}"#.into()),
Some(
r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid subscription id."},"id":1}"#
.into()
),
);
}

Expand All @@ -341,18 +359,17 @@ mod tests {
r#"{"jsonrpc":"2.0","method":"beefy_unsubscribeJustifications","params":["FOO"],"id":1}"#,
meta.clone()
),
Some(r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid subscription id."},"id":1}"#.into())
Some(
r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid subscription id."},"id":1}"#
.into()
)
);
}

fn create_commitment() -> BeefySignedCommitment<Block> {
fn create_commitment(block_number: u64) -> BeefySignedCommitment<Block> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

block numbers are actually u32

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The susbtrate-test-runtime uses a u64

let payload = Payload::new(known_payload_ids::MMR_ROOT_ID, "Hello World!".encode());
BeefySignedCommitment::<Block> {
commitment: beefy_primitives::Commitment {
payload,
block_number: 5,
validator_set_id: 0,
},
commitment: beefy_primitives::Commitment { payload, block_number, validator_set_id: 0 },
signatures: vec![],
}
}
Expand All @@ -371,7 +388,7 @@ mod tests {
let sub_id: String = serde_json::from_value(resp["result"].take()).unwrap();

// Notify with commitment
let commitment = create_commitment();
let commitment = create_commitment(5);
let r: Result<(), ()> = commitment_sender.notify(|| Ok(commitment.clone()));
r.unwrap();

Expand All @@ -393,4 +410,52 @@ mod tests {
assert_eq!(recv_sub_id, sub_id);
assert_eq!(recv_commitment, commitment);
}

#[test]
fn should_not_send_signed_commitment_multiple_times() {
let (sender, stream) = BeefyBestBlockStream::<Block>::channel();
let (io, commitment_sender) = setup_io_handler_with_best_block_stream(stream);
let (meta, receiver) = setup_session();

// Subscribe
let sub_request =
r#"{"jsonrpc":"2.0","method":"beefy_subscribeJustifications","params":[],"id":1}"#;
let resp = io.handle_request_sync(sub_request, meta.clone());
let resp: Output = serde_json::from_str(&resp.unwrap()).unwrap();

let sub_id = match resp {
Output::Success(success) => success.result,
_ => panic!(),
};

let sub_id: String = serde_json::from_value(sub_id).unwrap();

// Notify with commitment
let commitment = create_commitment(5);
let r: Result<(), ()> = commitment_sender.notify(|| Ok(commitment.clone()));
r.unwrap();
let hash = BlakeTwo256::hash(b"43");
let r: Result<(), ()> = sender.notify(|| Ok((hash, 5)));
r.unwrap();

// This commitment should be filtered out
let r: Result<(), ()> = commitment_sender.notify(|| Ok(commitment.clone()));
r.unwrap();

let commitment_1 = create_commitment(6);

let r: Result<(), ()> = commitment_sender.notify(|| Ok(commitment_1.clone()));
r.unwrap();

// Inspect what we received
// We should have received only two commitments
let recvs = futures::executor::block_on(receiver.take(2).collect::<Vec<_>>());
assert_eq!(
recvs,
vec![
format!("{{\"jsonrpc\":\"2.0\",\"method\":\"beefy_justifications\",\"params\":{{\"result\":\"0x046d68343048656c6c6f20576f726c64210500000000000000000000000000000004000000000000\",\"subscription\":\"{}\"}}}}", sub_id),
format!("{{\"jsonrpc\":\"2.0\",\"method\":\"beefy_justifications\",\"params\":{{\"result\":\"0x046d68343048656c6c6f20576f726c64210600000000000000000000000000000004000000000000\",\"subscription\":\"{}\"}}}}", sub_id)
]
);
}
}
5 changes: 3 additions & 2 deletions client/beefy/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ pub type BeefySignedCommitment<Block> =

/// The sending half of the notifications channel(s) used to send
/// notifications about best BEEFY block from the gadget side.
pub type BeefyBestBlockSender<Block> = NotificationSender<<Block as BlockT>::Hash>;
pub type BeefyBestBlockSender<Block> =
NotificationSender<(<Block as BlockT>::Hash, NumberFor<Block>)>;

/// The receiving half of a notifications channel used to receive
/// notifications about best BEEFY blocks determined on the gadget side.
pub type BeefyBestBlockStream<Block> =
NotificationStream<<Block as BlockT>::Hash, BeefyBestBlockTracingKey>;
NotificationStream<(<Block as BlockT>::Hash, NumberFor<Block>), BeefyBestBlockTracingKey>;

/// The sending half of the notifications channel(s) used to send notifications
/// about signed commitments generated at the end of a BEEFY round.
Expand Down
6 changes: 4 additions & 2 deletions client/beefy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,9 @@ where

self.best_beefy_block = Some(*notification.header.number());
self.beefy_best_block_sender
.notify(|| Ok::<_, ()>(notification.hash.clone()))
.notify(|| {
Ok::<_, ()>((notification.hash.clone(), *notification.header.number()))
})
.expect("forwards closure result; the closure always returns Ok; qed.");

// this metric is kind of 'fake'. Best BEEFY block should only be updated once we
Expand Down Expand Up @@ -374,7 +376,7 @@ where
if let Err(err) = self.client.hash(block_num).map(|h| {
if let Some(hash) = h {
self.beefy_best_block_sender
.notify(|| Ok::<_, ()>(hash))
.notify(|| Ok::<_, ()>((hash, block_num)))
.expect("forwards closure result; the closure always returns Ok; qed.");
}
}) {
Expand Down