Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into shamb0/cxh-improve-…
Browse files Browse the repository at this point in the history
…DATask-test-coverage
  • Loading branch information
shamb0 committed Aug 14, 2024
2 parents 3cb4bc0 + 3f1e7c0 commit 8411537
Show file tree
Hide file tree
Showing 22 changed files with 231 additions and 188 deletions.
10 changes: 6 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/example-types/src/node_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub struct CombinedImpl;
pub type StaticMembership = StaticCommittee<TestTypes>;

impl<TYPES: NodeType> NodeImplementation<TYPES> for PushCdnImpl {
type Network = PushCdnNetwork<TYPES>;
type Network = PushCdnNetwork<TYPES::SignatureKey>;
type Storage = TestStorage<TYPES>;
type AuctionResultsProvider = TestAuctionResultsProvider<TYPES>;
}
Expand Down
44 changes: 23 additions & 21 deletions crates/examples/combined/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,27 +72,28 @@ async fn main() {
let private_address = format!("127.0.0.1:{private_port}");
let public_address = format!("127.0.0.1:{public_port}");

let config: cdn_broker::Config<TestingDef<TestTypes>> = cdn_broker::Config {
discovery_endpoint: discovery_endpoint.clone(),
public_advertise_endpoint: public_address.clone(),
public_bind_endpoint: public_address,
private_advertise_endpoint: private_address.clone(),
private_bind_endpoint: private_address,

keypair: KeyPair {
public_key: WrappedSignatureKey(broker_public_key),
private_key: broker_private_key.clone(),
},

metrics_bind_endpoint: None,
ca_cert_path: None,
ca_key_path: None,
global_memory_pool_size: Some(1024 * 1024 * 1024),
};
let config: cdn_broker::Config<TestingDef<<TestTypes as NodeType>::SignatureKey>> =
cdn_broker::Config {
discovery_endpoint: discovery_endpoint.clone(),
public_advertise_endpoint: public_address.clone(),
public_bind_endpoint: public_address,
private_advertise_endpoint: private_address.clone(),
private_bind_endpoint: private_address,

keypair: KeyPair {
public_key: WrappedSignatureKey(broker_public_key),
private_key: broker_private_key.clone(),
},

metrics_bind_endpoint: None,
ca_cert_path: None,
ca_key_path: None,
global_memory_pool_size: Some(1024 * 1024 * 1024),
};

// Create and spawn the broker
async_spawn(async move {
let broker: Broker<TestingDef<TestTypes>> =
let broker: Broker<TestingDef<<TestTypes as NodeType>::SignatureKey>> =
Broker::new(config).await.expect("broker failed to start");

// Error if we stopped unexpectedly
Expand Down Expand Up @@ -120,9 +121,10 @@ async fn main() {

// Spawn the marshal
async_spawn(async move {
let marshal: Marshal<TestingDef<TestTypes>> = Marshal::new(marshal_config)
.await
.expect("failed to spawn marshal");
let marshal: Marshal<TestingDef<<TestTypes as NodeType>::SignatureKey>> =
Marshal::new(marshal_config)
.await
.expect("failed to spawn marshal");

// Error if we stopped unexpectedly
if let Err(err) = marshal.start().await {
Expand Down
25 changes: 12 additions & 13 deletions crates/examples/infra/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ pub struct PushCdnDaRun<TYPES: NodeType> {
/// The underlying configuration
config: NetworkConfig<TYPES::SignatureKey>,
/// The underlying network
network: PushCdnNetwork<TYPES>,
network: PushCdnNetwork<TYPES::SignatureKey>,
}

#[async_trait]
Expand All @@ -616,12 +616,12 @@ impl<
>,
NODE: NodeImplementation<
TYPES,
Network = PushCdnNetwork<TYPES>,
Network = PushCdnNetwork<TYPES::SignatureKey>,
Storage = TestStorage<TYPES>,
AuctionResultsProvider = TestAuctionResultsProvider<TYPES>,
>,
V: Versions,
> RunDa<TYPES, PushCdnNetwork<TYPES>, NODE, V> for PushCdnDaRun<TYPES>
> RunDa<TYPES, PushCdnNetwork<TYPES::SignatureKey>, NODE, V> for PushCdnDaRun<TYPES>
where
<TYPES as NodeType>::ValidatedState: TestableState<TYPES>,
<TYPES as NodeType>::BlockPayload: TestableBlock<TYPES>,
Expand Down Expand Up @@ -665,7 +665,7 @@ where
PushCdnDaRun { config, network }
}

fn network(&self) -> PushCdnNetwork<TYPES> {
fn network(&self) -> PushCdnNetwork<TYPES::SignatureKey> {
self.network.clone()
}

Expand Down Expand Up @@ -808,15 +808,14 @@ where
.await;

// Initialize our CDN network
let cdn_network: PushCdnDaRun<TYPES> = <PushCdnDaRun<TYPES> as RunDa<
TYPES,
PushCdnNetwork<TYPES>,
PushCdnImpl,
V,
>>::initialize_networking(
config.clone(), libp2p_advertise_address
)
.await;
let cdn_network: PushCdnDaRun<TYPES> =
<PushCdnDaRun<TYPES> as RunDa<
TYPES,
PushCdnNetwork<TYPES::SignatureKey>,
PushCdnImpl,
V,
>>::initialize_networking(config.clone(), libp2p_advertise_address)
.await;

// Create our combined network config
let delay_duration = config
Expand Down
44 changes: 23 additions & 21 deletions crates/examples/push-cdn/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,27 +78,28 @@ async fn main() {
let private_address = format!("127.0.0.1:{private_port}");
let public_address = format!("127.0.0.1:{public_port}");

let config: cdn_broker::Config<TestingDef<TestTypes>> = cdn_broker::Config {
discovery_endpoint: discovery_endpoint.clone(),
public_advertise_endpoint: public_address.clone(),
public_bind_endpoint: public_address,
private_advertise_endpoint: private_address.clone(),
private_bind_endpoint: private_address,

keypair: KeyPair {
public_key: WrappedSignatureKey(broker_public_key),
private_key: broker_private_key.clone(),
},

metrics_bind_endpoint: None,
ca_cert_path: None,
ca_key_path: None,
global_memory_pool_size: Some(1024 * 1024 * 1024),
};
let config: cdn_broker::Config<TestingDef<<TestTypes as NodeType>::SignatureKey>> =
cdn_broker::Config {
discovery_endpoint: discovery_endpoint.clone(),
public_advertise_endpoint: public_address.clone(),
public_bind_endpoint: public_address,
private_advertise_endpoint: private_address.clone(),
private_bind_endpoint: private_address,

keypair: KeyPair {
public_key: WrappedSignatureKey(broker_public_key),
private_key: broker_private_key.clone(),
},

metrics_bind_endpoint: None,
ca_cert_path: None,
ca_key_path: None,
global_memory_pool_size: Some(1024 * 1024 * 1024),
};

// Create and spawn the broker
async_spawn(async move {
let broker: Broker<TestingDef<TestTypes>> =
let broker: Broker<TestingDef<<TestTypes as NodeType>::SignatureKey>> =
Broker::new(config).await.expect("broker failed to start");

// Error if we stopped unexpectedly
Expand All @@ -124,9 +125,10 @@ async fn main() {

// Spawn the marshal
async_spawn(async move {
let marshal: Marshal<TestingDef<TestTypes>> = Marshal::new(marshal_config)
.await
.expect("failed to spawn marshal");
let marshal: Marshal<TestingDef<<TestTypes as NodeType>::SignatureKey>> =
Marshal::new(marshal_config)
.await
.expect("failed to spawn marshal");

// Error if we stopped unexpectedly
if let Err(err) = marshal.start().await {
Expand Down
4 changes: 2 additions & 2 deletions crates/examples/push-cdn/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ struct Args {
#[arg(long, default_value = "local_ip:1738")]
public_advertise_endpoint: String,

/// The broker-facing endpoint in `IP:port` form to bind to for connections from
/// The broker-facing endpoint in `IP:port` form to bind to for connections from
/// other brokers
#[arg(long, default_value = "0.0.0.0:1739")]
private_bind_endpoint: String,
Expand Down Expand Up @@ -92,7 +92,7 @@ async fn main() -> Result<()> {
<TestTypes as NodeType>::SignatureKey::generated_from_seed_indexed(key_hash.into(), 1337);

// Create config
let broker_config: Config<ProductionDef<TestTypes>> = Config {
let broker_config: Config<ProductionDef<<TestTypes as NodeType>::SignatureKey>> = Config {
ca_cert_path: args.ca_cert_path,
ca_key_path: args.ca_key_path,

Expand Down
4 changes: 3 additions & 1 deletion crates/examples/push-cdn/marshal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use cdn_marshal::{Config, Marshal};
use clap::Parser;
use hotshot::traits::implementations::ProductionDef;
use hotshot_example_types::node_types::TestTypes;
use hotshot_types::traits::node_implementation::NodeType;
use tracing_subscriber::EnvFilter;

// TODO: forall, add logging where we need it
Expand Down Expand Up @@ -80,7 +81,8 @@ async fn main() -> Result<()> {
};

// Create new `Marshal` from the config
let marshal = Marshal::<ProductionDef<TestTypes>>::new(config).await?;
let marshal =
Marshal::<ProductionDef<<TestTypes as NodeType>::SignatureKey>>::new(config).await?;

// Start the main loop, consuming it
marshal.start().await?;
Expand Down
3 changes: 2 additions & 1 deletion crates/examples/push-cdn/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use hotshot_example_types::{
auction_results_provider_types::TestAuctionResultsProvider, state_types::TestTypes,
storage_types::TestStorage,
};
use hotshot_types::traits::node_implementation::NodeType;
use serde::{Deserialize, Serialize};

use crate::infra::PushCdnDaRun;
Expand All @@ -18,7 +19,7 @@ use crate::infra::PushCdnDaRun;
pub struct NodeImpl {}

/// Convenience type alias
pub type Network = PushCdnNetwork<TestTypes>;
pub type Network = PushCdnNetwork<<TestTypes as NodeType>::SignatureKey>;

impl NodeImplementation<TestTypes> for NodeImpl {
type Network = Network;
Expand Down
3 changes: 2 additions & 1 deletion crates/hotshot/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use hotshot_task_impls::{
network,
network::{NetworkEventTaskState, NetworkMessageTaskState},
request::NetworkRequestState,
response::{run_response_task, NetworkResponseState, RequestReceiver},
response::{run_response_task, NetworkResponseState},
transactions::TransactionTaskState,
upgrade::UpgradeTaskState,
vid::VidTaskState,
Expand All @@ -37,6 +37,7 @@ use hotshot_types::{
constants::EVENT_CHANNEL_SIZE,
data::QuorumProposal,
message::{Messages, Proposal},
request_response::RequestReceiver,
traits::{
network::ConnectedNetwork,
node_implementation::{ConsensusTime, NodeImplementation, NodeType},
Expand Down
15 changes: 8 additions & 7 deletions crates/hotshot/src/traits/networking/combined_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ use hotshot_types::{
COMBINED_NETWORK_MIN_PRIMARY_FAILURES, COMBINED_NETWORK_PRIMARY_CHECK_INTERVAL,
},
data::ViewNumber,
request_response::NetworkMsgResponseChannel,
traits::{
network::{BroadcastDelay, ConnectedNetwork, ResponseChannel, Topic},
network::{BroadcastDelay, ConnectedNetwork, Topic},
node_implementation::NodeType,
},
BoxSyncFuture,
Expand Down Expand Up @@ -93,7 +94,7 @@ impl<TYPES: NodeType> CombinedNetworks<TYPES> {
/// Panics if `COMBINED_NETWORK_CACHE_SIZE` is 0
#[must_use]
pub fn new(
primary_network: PushCdnNetwork<TYPES>,
primary_network: PushCdnNetwork<TYPES::SignatureKey>,
secondary_network: Libp2pNetwork<TYPES::SignatureKey>,
delay_duration: Option<Duration>,
) -> Self {
Expand All @@ -120,7 +121,7 @@ impl<TYPES: NodeType> CombinedNetworks<TYPES> {

/// Get a ref to the primary network
#[must_use]
pub fn primary(&self) -> &PushCdnNetwork<TYPES> {
pub fn primary(&self) -> &PushCdnNetwork<TYPES::SignatureKey> {
&self.networks.0
}

Expand Down Expand Up @@ -249,7 +250,7 @@ impl<TYPES: NodeType> CombinedNetworks<TYPES> {
/// on the tuple
#[derive(Clone)]
pub struct UnderlyingCombinedNetworks<TYPES: NodeType>(
pub PushCdnNetwork<TYPES>,
pub PushCdnNetwork<TYPES::SignatureKey>,
pub Libp2pNetwork<TYPES::SignatureKey>,
);

Expand All @@ -265,7 +266,7 @@ impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES> for CombinedNetwor
secondary_network_delay: Duration,
) -> AsyncGenerator<Arc<Self>> {
let generators = (
<PushCdnNetwork<TYPES> as TestableNetworkingImplementation<TYPES>>::generator(
<PushCdnNetwork<TYPES::SignatureKey> as TestableNetworkingImplementation<TYPES>>::generator(
expected_node_count,
num_bootstrap,
network_id,
Expand All @@ -291,7 +292,7 @@ impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES> for CombinedNetwor
Box::pin(async move {
// Generate the CDN network
let cdn = gen0.await;
let cdn = Arc::<PushCdnNetwork<TYPES>>::into_inner(cdn).unwrap();
let cdn = Arc::<PushCdnNetwork<TYPES::SignatureKey>>::into_inner(cdn).unwrap();

// Generate the p2p network
let p2p = gen1.await;
Expand Down Expand Up @@ -345,7 +346,7 @@ impl<TYPES: NodeType> ConnectedNetwork<TYPES::SignatureKey> for CombinedNetworks

async fn spawn_request_receiver_task(
&self,
) -> Option<mpsc::Receiver<(Vec<u8>, ResponseChannel<Vec<u8>>)>> {
) -> Option<mpsc::Receiver<(Vec<u8>, NetworkMsgResponseChannel<Vec<u8>>)>> {
self.secondary().spawn_request_receiver_task().await
}

Expand Down
Loading

0 comments on commit 8411537

Please sign in to comment.