From 45327192c223a13d34ccaa9e0509d044f71c42e2 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Fri, 1 Oct 2021 13:13:41 +0200 Subject: [PATCH 1/3] av-store: clean up StoreAvailableData message --- node/core/av-store/src/lib.rs | 1 - node/core/av-store/src/tests.rs | 7 ------- node/core/backing/src/lib.rs | 9 --------- node/core/backing/src/tests.rs | 8 ++++---- node/core/dispute-participation/src/lib.rs | 1 - node/core/dispute-participation/src/tests.rs | 1 - node/subsystem-types/src/messages.rs | 4 +--- 7 files changed, 5 insertions(+), 26 deletions(-) diff --git a/node/core/av-store/src/lib.rs b/node/core/av-store/src/lib.rs index 34b26df3c958..8337d78a96b8 100644 --- a/node/core/av-store/src/lib.rs +++ b/node/core/av-store/src/lib.rs @@ -1076,7 +1076,6 @@ fn process_message( }, AvailabilityStoreMessage::StoreAvailableData( candidate, - _our_index, n_validators, available_data, tx, diff --git a/node/core/av-store/src/tests.rs b/node/core/av-store/src/tests.rs index 1101edde928f..8a4816ebe3eb 100644 --- a/node/core/av-store/src/tests.rs +++ b/node/core/av-store/src/tests.rs @@ -422,7 +422,6 @@ fn store_block_works() { let (tx, rx) = oneshot::channel(); let block_msg = AvailabilityStoreMessage::StoreAvailableData( candidate_hash, - Some(validator_index), n_validators, available_data.clone(), tx, @@ -476,7 +475,6 @@ fn store_pov_and_query_chunk_works() { let (tx, rx) = oneshot::channel(); let block_msg = AvailabilityStoreMessage::StoreAvailableData( candidate_hash, - None, n_validators, available_data, tx, @@ -523,7 +521,6 @@ fn query_all_chunks_works() { let (tx, rx) = oneshot::channel(); let block_msg = AvailabilityStoreMessage::StoreAvailableData( candidate_hash_1, - None, n_validators, available_data, tx, @@ -612,7 +609,6 @@ fn stored_but_not_included_data_is_pruned() { let (tx, rx) = oneshot::channel(); let block_msg = AvailabilityStoreMessage::StoreAvailableData( candidate_hash, - None, n_validators, available_data.clone(), tx, @@ -665,7 +661,6 @@ fn stored_data_kept_until_finalized() { let (tx, rx) = oneshot::channel(); let block_msg = AvailabilityStoreMessage::StoreAvailableData( candidate_hash, - None, n_validators, available_data.clone(), tx, @@ -901,7 +896,6 @@ fn forkfullness_works() { let (tx, rx) = oneshot::channel(); let msg = AvailabilityStoreMessage::StoreAvailableData( candidate_1_hash, - None, n_validators, available_data_1.clone(), tx, @@ -914,7 +908,6 @@ fn forkfullness_works() { let (tx, rx) = oneshot::channel(); let msg = AvailabilityStoreMessage::StoreAvailableData( candidate_2_hash, - None, n_validators, available_data_2.clone(), tx, diff --git a/node/core/backing/src/lib.rs b/node/core/backing/src/lib.rs index 338f333efbfa..5e06c6cd6edc 100644 --- a/node/core/backing/src/lib.rs +++ b/node/core/backing/src/lib.rs @@ -294,7 +294,6 @@ fn table_attested_to_backed( async fn store_available_data( sender: &mut JobSender, - id: Option, n_validators: u32, candidate_hash: CandidateHash, available_data: AvailableData, @@ -303,7 +302,6 @@ async fn store_available_data( sender .send_message(AvailabilityStoreMessage::StoreAvailableData( candidate_hash, - id, n_validators, available_data, tx, @@ -321,7 +319,6 @@ async fn store_available_data( // This returns `Err()` iff there is an internal error. Otherwise, it returns either `Ok(Ok(()))` or `Ok(Err(_))`. async fn make_pov_available( sender: &mut JobSender, - validator_index: Option, n_validators: usize, pov: Arc, candidate_hash: CandidateHash, @@ -349,7 +346,6 @@ async fn make_pov_available( store_available_data( sender, - validator_index, n_validators as u32, candidate_hash, available_data, @@ -409,7 +405,6 @@ struct BackgroundValidationParams, F> candidate: CandidateReceipt, relay_parent: Hash, pov: PoVData, - validator_index: Option, n_validators: usize, span: Option, make_command: F, @@ -427,7 +422,6 @@ async fn validate_and_make_available( candidate, relay_parent, pov, - validator_index, n_validators, span, make_command, @@ -484,7 +478,6 @@ async fn validate_and_make_available( } else { let erasure_valid = make_pov_available( &mut sender, - validator_index, n_validators, pov.clone(), candidate.hash(), @@ -719,7 +712,6 @@ impl CandidateBackingJob { candidate: candidate.clone(), relay_parent: self.parent, pov: PoVData::Ready(pov), - validator_index: self.table_context.validator.as_ref().map(|v| v.index()), n_validators: self.table_context.validators.len(), span, make_command: ValidatedCandidateCommand::Second, @@ -1033,7 +1025,6 @@ impl CandidateBackingJob { candidate: attesting.candidate, relay_parent: self.parent, pov, - validator_index: self.table_context.validator.as_ref().map(|v| v.index()), n_validators: self.table_context.validators.len(), span, make_command: ValidatedCandidateCommand::Attest, diff --git a/node/core/backing/src/tests.rs b/node/core/backing/src/tests.rs index 86282c38f91c..675f63281063 100644 --- a/node/core/backing/src/tests.rs +++ b/node/core/backing/src/tests.rs @@ -336,7 +336,7 @@ fn backing_second_works() { assert_matches!( virtual_overseer.recv().await, AllMessages::AvailabilityStore( - AvailabilityStoreMessage::StoreAvailableData(candidate_hash, _, _, _, tx) + AvailabilityStoreMessage::StoreAvailableData(candidate_hash, .., tx) ) if candidate_hash == candidate.hash() => { tx.send(Ok(())).unwrap(); } @@ -495,7 +495,7 @@ fn backing_works() { assert_matches!( virtual_overseer.recv().await, AllMessages::AvailabilityStore( - AvailabilityStoreMessage::StoreAvailableData(candidate_hash, _, _, _, tx) + AvailabilityStoreMessage::StoreAvailableData(candidate_hash, .., tx) ) if candidate_hash == candidate_a.hash() => { tx.send(Ok(())).unwrap(); } @@ -853,7 +853,7 @@ fn backing_misbehavior_works() { assert_matches!( virtual_overseer.recv().await, AllMessages::AvailabilityStore( - AvailabilityStoreMessage::StoreAvailableData(candidate_hash, _, _, _, tx) + AvailabilityStoreMessage::StoreAvailableData(candidate_hash, .., tx) ) if candidate_hash == candidate_a.hash() => { tx.send(Ok(())).unwrap(); } @@ -1027,7 +1027,7 @@ fn backing_dont_second_invalid() { assert_matches!( virtual_overseer.recv().await, AllMessages::AvailabilityStore( - AvailabilityStoreMessage::StoreAvailableData(candidate_hash, _, _, _, tx) + AvailabilityStoreMessage::StoreAvailableData(candidate_hash, .., tx) ) if candidate_hash == candidate_b.hash() => { tx.send(Ok(())).unwrap(); } diff --git a/node/core/dispute-participation/src/lib.rs b/node/core/dispute-participation/src/lib.rs index c79d10462e06..7e3ff5e71e39 100644 --- a/node/core/dispute-participation/src/lib.rs +++ b/node/core/dispute-participation/src/lib.rs @@ -250,7 +250,6 @@ async fn participate( // in the dispute ctx.send_message(AvailabilityStoreMessage::StoreAvailableData( candidate_hash, - None, n_validators, available_data.clone(), store_available_data_tx, diff --git a/node/core/dispute-participation/src/tests.rs b/node/core/dispute-participation/src/tests.rs index 40bfdd3f7b61..67a06db25e77 100644 --- a/node/core/dispute-participation/src/tests.rs +++ b/node/core/dispute-participation/src/tests.rs @@ -154,7 +154,6 @@ async fn store_available_data(virtual_overseer: &mut VirtualOverseer, success: b _, _, _, - _, tx, )) => { if success { diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index 9afdbc0fcad3..5a14aaa33c26 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -486,13 +486,11 @@ pub enum AvailabilityStoreMessage { tx: oneshot::Sender>, }, - /// Store a `AvailableData` in the AV store. - /// If `ValidatorIndex` is present store corresponding chunk also. + /// Store a `AvailableData` and all of its chunks in the AV store. /// /// Return `Ok(())` if the store operation succeeded, `Err(())` if it failed. StoreAvailableData( CandidateHash, - Option, u32, AvailableData, oneshot::Sender>, From 5c748c01a44817a3867ba9cebb829f30ea4e9e33 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Fri, 1 Oct 2021 16:19:51 +0200 Subject: [PATCH 2/3] fmt --- node/core/backing/src/lib.rs | 8 +------- node/subsystem-types/src/messages.rs | 7 +------ 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/node/core/backing/src/lib.rs b/node/core/backing/src/lib.rs index 5e06c6cd6edc..a28fbacd3d76 100644 --- a/node/core/backing/src/lib.rs +++ b/node/core/backing/src/lib.rs @@ -344,13 +344,7 @@ async fn make_pov_available( { let _span = span.as_ref().map(|s| s.child("store-data").with_candidate(candidate_hash)); - store_available_data( - sender, - n_validators as u32, - candidate_hash, - available_data, - ) - .await?; + store_available_data(sender, n_validators as u32, candidate_hash, available_data).await?; } Ok(Ok(())) diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index 5a14aaa33c26..be46d43618c8 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -489,12 +489,7 @@ pub enum AvailabilityStoreMessage { /// Store a `AvailableData` and all of its chunks in the AV store. /// /// Return `Ok(())` if the store operation succeeded, `Err(())` if it failed. - StoreAvailableData( - CandidateHash, - u32, - AvailableData, - oneshot::Sender>, - ), + StoreAvailableData(CandidateHash, u32, AvailableData, oneshot::Sender>), } impl AvailabilityStoreMessage { From 18f544455b1443de06a4b41030adeb44e4523007 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Fri, 1 Oct 2021 18:53:55 +0200 Subject: [PATCH 3/3] use named fields --- node/core/av-store/src/lib.rs | 8 ++-- node/core/av-store/src/tests.rs | 44 ++++++++++---------- node/core/backing/src/lib.rs | 4 +- node/core/backing/src/tests.rs | 8 ++-- node/core/dispute-participation/src/lib.rs | 8 ++-- node/core/dispute-participation/src/tests.rs | 7 +--- node/subsystem-types/src/messages.rs | 11 ++++- 7 files changed, 47 insertions(+), 43 deletions(-) diff --git a/node/core/av-store/src/lib.rs b/node/core/av-store/src/lib.rs index 8337d78a96b8..96c0266a4fa6 100644 --- a/node/core/av-store/src/lib.rs +++ b/node/core/av-store/src/lib.rs @@ -1074,18 +1074,18 @@ fn process_message( }, } }, - AvailabilityStoreMessage::StoreAvailableData( - candidate, + AvailabilityStoreMessage::StoreAvailableData { + candidate_hash, n_validators, available_data, tx, - ) => { + } => { subsystem.metrics.on_chunks_received(n_validators as _); let _timer = subsystem.metrics.time_store_available_data(); let res = - store_available_data(&subsystem, candidate, n_validators as _, available_data); + store_available_data(&subsystem, candidate_hash, n_validators as _, available_data); match res { Ok(()) => { diff --git a/node/core/av-store/src/tests.rs b/node/core/av-store/src/tests.rs index 8a4816ebe3eb..428ed51aecde 100644 --- a/node/core/av-store/src/tests.rs +++ b/node/core/av-store/src/tests.rs @@ -420,12 +420,12 @@ fn store_block_works() { }; let (tx, rx) = oneshot::channel(); - let block_msg = AvailabilityStoreMessage::StoreAvailableData( + let block_msg = AvailabilityStoreMessage::StoreAvailableData { candidate_hash, n_validators, - available_data.clone(), + available_data: available_data.clone(), tx, - ); + }; virtual_overseer.send(FromOverseer::Communication { msg: block_msg }).await; assert_eq!(rx.await.unwrap(), Ok(())); @@ -473,12 +473,12 @@ fn store_pov_and_query_chunk_works() { erasure::obtain_chunks_v1(n_validators as _, &available_data).unwrap(); let (tx, rx) = oneshot::channel(); - let block_msg = AvailabilityStoreMessage::StoreAvailableData( + let block_msg = AvailabilityStoreMessage::StoreAvailableData { candidate_hash, n_validators, available_data, tx, - ); + }; virtual_overseer.send(FromOverseer::Communication { msg: block_msg }).await; @@ -519,12 +519,12 @@ fn query_all_chunks_works() { { let (tx, rx) = oneshot::channel(); - let block_msg = AvailabilityStoreMessage::StoreAvailableData( - candidate_hash_1, + let block_msg = AvailabilityStoreMessage::StoreAvailableData { + candidate_hash: candidate_hash_1, n_validators, available_data, tx, - ); + }; virtual_overseer.send(FromOverseer::Communication { msg: block_msg }).await; assert_eq!(rx.await.unwrap(), Ok(())); @@ -607,12 +607,12 @@ fn stored_but_not_included_data_is_pruned() { }; let (tx, rx) = oneshot::channel(); - let block_msg = AvailabilityStoreMessage::StoreAvailableData( + let block_msg = AvailabilityStoreMessage::StoreAvailableData { candidate_hash, n_validators, - available_data.clone(), + available_data: available_data.clone(), tx, - ); + }; virtual_overseer.send(FromOverseer::Communication { msg: block_msg }).await; @@ -659,12 +659,12 @@ fn stored_data_kept_until_finalized() { let block_number = 10; let (tx, rx) = oneshot::channel(); - let block_msg = AvailabilityStoreMessage::StoreAvailableData( + let block_msg = AvailabilityStoreMessage::StoreAvailableData { candidate_hash, n_validators, - available_data.clone(), + available_data: available_data.clone(), tx, - ); + }; virtual_overseer.send(FromOverseer::Communication { msg: block_msg }).await; @@ -894,24 +894,24 @@ fn forkfullness_works() { }; let (tx, rx) = oneshot::channel(); - let msg = AvailabilityStoreMessage::StoreAvailableData( - candidate_1_hash, + let msg = AvailabilityStoreMessage::StoreAvailableData { + candidate_hash: candidate_1_hash, n_validators, - available_data_1.clone(), + available_data: available_data_1.clone(), tx, - ); + }; virtual_overseer.send(FromOverseer::Communication { msg }).await; rx.await.unwrap().unwrap(); let (tx, rx) = oneshot::channel(); - let msg = AvailabilityStoreMessage::StoreAvailableData( - candidate_2_hash, + let msg = AvailabilityStoreMessage::StoreAvailableData { + candidate_hash: candidate_2_hash, n_validators, - available_data_2.clone(), + available_data: available_data_2.clone(), tx, - ); + }; virtual_overseer.send(FromOverseer::Communication { msg }).await; diff --git a/node/core/backing/src/lib.rs b/node/core/backing/src/lib.rs index a28fbacd3d76..bd24244a01f8 100644 --- a/node/core/backing/src/lib.rs +++ b/node/core/backing/src/lib.rs @@ -300,12 +300,12 @@ async fn store_available_data( ) -> Result<(), Error> { let (tx, rx) = oneshot::channel(); sender - .send_message(AvailabilityStoreMessage::StoreAvailableData( + .send_message(AvailabilityStoreMessage::StoreAvailableData { candidate_hash, n_validators, available_data, tx, - )) + }) .await; let _ = rx.await.map_err(Error::StoreAvailableData)?; diff --git a/node/core/backing/src/tests.rs b/node/core/backing/src/tests.rs index 675f63281063..68c1e30dfd31 100644 --- a/node/core/backing/src/tests.rs +++ b/node/core/backing/src/tests.rs @@ -336,7 +336,7 @@ fn backing_second_works() { assert_matches!( virtual_overseer.recv().await, AllMessages::AvailabilityStore( - AvailabilityStoreMessage::StoreAvailableData(candidate_hash, .., tx) + AvailabilityStoreMessage::StoreAvailableData { candidate_hash, tx, .. } ) if candidate_hash == candidate.hash() => { tx.send(Ok(())).unwrap(); } @@ -495,7 +495,7 @@ fn backing_works() { assert_matches!( virtual_overseer.recv().await, AllMessages::AvailabilityStore( - AvailabilityStoreMessage::StoreAvailableData(candidate_hash, .., tx) + AvailabilityStoreMessage::StoreAvailableData { candidate_hash, tx, .. } ) if candidate_hash == candidate_a.hash() => { tx.send(Ok(())).unwrap(); } @@ -853,7 +853,7 @@ fn backing_misbehavior_works() { assert_matches!( virtual_overseer.recv().await, AllMessages::AvailabilityStore( - AvailabilityStoreMessage::StoreAvailableData(candidate_hash, .., tx) + AvailabilityStoreMessage::StoreAvailableData { candidate_hash, tx, .. } ) if candidate_hash == candidate_a.hash() => { tx.send(Ok(())).unwrap(); } @@ -1027,7 +1027,7 @@ fn backing_dont_second_invalid() { assert_matches!( virtual_overseer.recv().await, AllMessages::AvailabilityStore( - AvailabilityStoreMessage::StoreAvailableData(candidate_hash, .., tx) + AvailabilityStoreMessage::StoreAvailableData { candidate_hash, tx, .. } ) if candidate_hash == candidate_b.hash() => { tx.send(Ok(())).unwrap(); } diff --git a/node/core/dispute-participation/src/lib.rs b/node/core/dispute-participation/src/lib.rs index 7e3ff5e71e39..85b31f0a86f4 100644 --- a/node/core/dispute-participation/src/lib.rs +++ b/node/core/dispute-participation/src/lib.rs @@ -248,12 +248,12 @@ async fn participate( // we dispatch a request to store the available data for the candidate. we // want to maximize data availability for other potential checkers involved // in the dispute - ctx.send_message(AvailabilityStoreMessage::StoreAvailableData( + ctx.send_message(AvailabilityStoreMessage::StoreAvailableData { candidate_hash, n_validators, - available_data.clone(), - store_available_data_tx, - )) + available_data: available_data.clone(), + tx: store_available_data_tx, + }) .await; match store_available_data_rx.await? { diff --git a/node/core/dispute-participation/src/tests.rs b/node/core/dispute-participation/src/tests.rs index 67a06db25e77..e2c98af53b5e 100644 --- a/node/core/dispute-participation/src/tests.rs +++ b/node/core/dispute-participation/src/tests.rs @@ -150,12 +150,7 @@ async fn fetch_validation_code(virtual_overseer: &mut VirtualOverseer) { async fn store_available_data(virtual_overseer: &mut VirtualOverseer, success: bool) { assert_matches!( virtual_overseer.recv().await, - AllMessages::AvailabilityStore(AvailabilityStoreMessage::StoreAvailableData( - _, - _, - _, - tx, - )) => { + AllMessages::AvailabilityStore(AvailabilityStoreMessage::StoreAvailableData { tx, .. }) => { if success { tx.send(Ok(())).unwrap(); } else { diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index be46d43618c8..c6fb92736c51 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -489,7 +489,16 @@ pub enum AvailabilityStoreMessage { /// Store a `AvailableData` and all of its chunks in the AV store. /// /// Return `Ok(())` if the store operation succeeded, `Err(())` if it failed. - StoreAvailableData(CandidateHash, u32, AvailableData, oneshot::Sender>), + StoreAvailableData { + /// A hash of the candidate this `available_data` belongs to. + candidate_hash: CandidateHash, + /// The number of validators in the session. + n_validators: u32, + /// The `AvailableData` itself. + available_data: AvailableData, + /// Sending side of the channel to send result to. + tx: oneshot::Sender>, + }, } impl AvailabilityStoreMessage {