
chore: Data dispersal storage tests #921

Merged: 10 commits, Nov 28, 2024
1 change: 1 addition & 0 deletions nomos-services/data-availability/tests/Cargo.toml
@@ -39,6 +39,7 @@ tokio-stream = "0.1.15"
tempfile = "3.6"
tracing = "0.1"
time = "0.3"
tracing-subscriber = "0.2.25"

[dev-dependencies]
blake2 = { version = "0.10" }
54 changes: 54 additions & 0 deletions nomos-services/data-availability/tests/src/indexer_integration.rs
@@ -33,6 +33,7 @@ use tempfile::{NamedTempFile, TempDir};
use time::OffsetDateTime;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;

// internal
use crate::common::*;

@@ -229,6 +230,17 @@ fn test_indexer() {
let blob_hash = <DaBlob as Blob>::id(&blobs[0]);
let blob_info = BlobInfo::new(blob_hash, meta);

// Create orphan blob metadata, without storing the actual blob
let orphan_app_id = [10u8; 32];
let orphan_index = 2.into();

let orphan_meta = Metadata::new(orphan_app_id, orphan_index);
let mut orphan_blob_hash = [0u8; 32];
thread_rng().fill(&mut orphan_blob_hash[..]);

let orphan_blob_info = BlobInfo::new(orphan_blob_hash, orphan_meta);
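// (Reviewer note) The hash above is random, so no blob data is ever dispersed
// or stored for it; the metadata is an "orphan". The checks below verify that
// the indexer returns nothing for this app_id while the mempool still holds
// the item.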

// Prepare indexes for blobs
let mut node_1_blob_0_idx = Vec::new();
node_1_blob_0_idx.extend_from_slice(&blob_hash);
node_1_blob_0_idx.extend_from_slice(&0u16.to_be_bytes());
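// (Reviewer note) A sketch of the key layout being built above, assuming an
// index key is the 32-byte blob hash followed by a big-endian u16 column
// index (this helper is hypothetical, not part of the PR):
//
//     fn blob_column_key(blob_hash: [u8; 32], column: u16) -> Vec<u8> {
//         let mut key = Vec::with_capacity(34);
//         key.extend_from_slice(&blob_hash);
//         key.extend_from_slice(&column.to_be_bytes());
//         key
//     }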
@@ -315,6 +327,18 @@ fn test_indexer() {
.unwrap();
let _ = mempool_rx.await.unwrap();

// Put orphan_blob_info into the mempool.
let (mempool2_tx, mempool2_rx) = tokio::sync::oneshot::channel();
mempool_outbound
.send(nomos_mempool::MempoolMsg::Add {
payload: orphan_blob_info,
key: orphan_blob_hash,
reply_channel: mempool2_tx,
})
.await
.unwrap();
let _ = mempool2_rx.await.unwrap();
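// (Reviewer note) The orphan metadata now sits in the mempool, so it can be
// included in a block even though no blob data was ever dispersed for it.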

// Wait for block in the network.
let timeout = tokio::time::sleep(Duration::from_secs(INDEXER_TEST_MAX_SECONDS));
tokio::pin!(timeout);
@@ -347,6 +371,36 @@ fn test_indexer() {
.unwrap();
let mut app_id_blobs = indexer_rx.await.unwrap();

// Request the orphan blob's range from the indexer; nothing is expected in return
let (indexer2_tx, indexer2_rx) = tokio::sync::oneshot::channel();
indexer_outbound
.send(nomos_da_indexer::DaMsg::GetRange {
app_id: orphan_app_id,
range: 0.into()..2.into(),
reply_channel: indexer2_tx,
})
.await
.unwrap();
let orphan_app_id_blobs = indexer2_rx.await.unwrap();

// Indexer should not return any blobs for orphan app_id
for v in orphan_app_id_blobs {
assert!(v.1.is_empty());
}
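// (Reviewer note) GetRange appears to yield one (index, blobs) pair per index
// in the requested range; since nothing was indexed for this app_id, every
// entry comes back empty.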

// Mempool should still contain orphan_blob_info
let (mempool3_tx, mempool3_rx) = tokio::sync::oneshot::channel();
mempool_outbound
.send(nomos_mempool::MempoolMsg::Status {
items: vec![orphan_blob_hash],
reply_channel: mempool3_tx,
})
.await
.unwrap();
let blocks_with_orphan_blob_hash = mempool3_rx.await.unwrap();

assert_eq!(blocks_with_orphan_blob_hash.len(), 1);
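// (Reviewer note) Status appears to return one entry per queried key, hence
// the single result for the one orphan hash.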

// Since we've only attested to blob_info at idx 0, the first
// item should have Some data; the other indexes should be None.
app_id_blobs.sort_by(|(a, _), (b, _)| a.partial_cmp(b).unwrap());
89 changes: 72 additions & 17 deletions nomos-services/data-availability/tests/src/verifier_integration.rs
@@ -19,17 +19,24 @@ use nomos_libp2p::SwarmConfig;
use rand::{thread_rng, Rng};
use tempfile::{NamedTempFile, TempDir};
use time::OffsetDateTime;
use tracing_subscriber::fmt::TestWriter;
use tracing_subscriber::EnvFilter;
// internal
use crate::common::*;

#[test]
fn test_verifier() {
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.compact()
.with_writer(TestWriter::default())
.try_init();
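// (Reviewer note) `try_init` rather than `init` lets whichever test runs
// first install the global subscriber while later calls fail quietly, and
// `TestWriter` keeps log output captured by the libtest harness.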
let performed_tx = Arc::new(AtomicBool::new(false));
let performed_rx = performed_tx.clone();
let is_success_tx = Arc::new(AtomicBool::new(false));
let is_success_rx = is_success_tx.clone();

- let mut ids = vec![[0; 32]; 2];
+ let mut ids = vec![[0; 32]; 3];
for id in &mut ids {
thread_rng().fill(id);
}
@@ -70,28 +77,42 @@ fn test_verifier() {
port: 7774,
..Default::default()
};

let swarm_config3 = SwarmConfig {
port: 7775,
..Default::default()
};

let mix_configs = new_mix_configs(vec![
Multiaddr::from_str("/ip4/127.0.0.1/udp/7783/quic-v1").unwrap(),
Multiaddr::from_str("/ip4/127.0.0.1/udp/7784/quic-v1").unwrap(),
Multiaddr::from_str("/ip4/127.0.0.1/udp/7785/quic-v1").unwrap(),
]);

let blobs_dir = TempDir::new().unwrap().path().to_path_buf();

let (node1_sk, _) = generate_blst_hex_keys();
let (node2_sk, _) = generate_blst_hex_keys();
let (node3_sk, _) = generate_blst_hex_keys();

let client_zone = new_client(NamedTempFile::new().unwrap().path().to_path_buf());

let (peer_sk_1, peer_id_1) = generate_ed25519_sk_peerid();
let (peer_sk_2, peer_id_2) = generate_ed25519_sk_peerid();
let (peer_sk_3, peer_id_3) = generate_ed25519_sk_peerid();

let addr_1 = Multiaddr::from_str("/ip4/127.0.0.1/udp/8880/quic-v1").unwrap();
let addr_2 = Multiaddr::from_str("/ip4/127.0.0.1/udp/8881/quic-v1").unwrap();
let addr_3 = Multiaddr::from_str("/ip4/127.0.0.1/udp/8882/quic-v1").unwrap();

- let peer_addresses = vec![(peer_id_1, addr_1.clone()), (peer_id_2, addr_2.clone())];
+ let peer_addresses = vec![
+     (peer_id_1, addr_1.clone()),
+     (peer_id_2, addr_2.clone()),
+     (peer_id_3, addr_3.clone()),
+ ];

let num_samples = 1;
let num_subnets = 2;
let num_subnets = 3;
let nodes_per_subnet = 1;

let node1 = new_node(
Expand Down Expand Up @@ -141,7 +162,7 @@ fn test_verifier() {
global_params_path: GLOBAL_PARAMS_PATH.into(),
},
TestDaNetworkSettings {
- peer_addresses,
+ peer_addresses: peer_addresses.clone(),
listening_address: addr_2,
num_subnets,
num_samples,
@@ -150,9 +171,37 @@
},
);

- let node1_verifier = node1.handle().relay::<DaVerifier>();
let node3 = new_node(
&LeaderConfig {
notes: vec![notes[2].clone()],
nf_sk: sks[2],
},
&ledger_config,
&genesis_state,
&time_config,
&swarm_config3,
&mix_configs[2],
NamedTempFile::new().unwrap().path().to_path_buf(),
&blobs_dir,
vec![node_address(&swarm_config2)],
KzgrsDaVerifierSettings {
sk: node3_sk,
index: [2].into(),
global_params_path: GLOBAL_PARAMS_PATH.into(),
},
TestDaNetworkSettings {
peer_addresses,
listening_address: addr_3,
num_subnets,
num_samples,
nodes_per_subnet,
node_key: peer_sk_3,
},
);
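// (Reviewer note) Node 3 dials node 2's swarm address (above), so the three
// nodes form a chain at the swarm layer, while the DA network settings list
// all three peer addresses explicitly.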

+ let node1_verifier = node1.handle().relay::<DaVerifier>();
let node2_verifier = node2.handle().relay::<DaVerifier>();
let node3_verifier = node3.handle().relay::<DaVerifier>();

client_zone.spawn(async move {
let node1_verifier = node1_verifier.connect().await.unwrap();
Expand All @@ -161,9 +210,13 @@ fn test_verifier() {
let node2_verifier = node2_verifier.connect().await.unwrap();
let (node2_reply_tx, node2_reply_rx) = tokio::sync::oneshot::channel();

let node3_verifier = node3_verifier.connect().await.unwrap();
let (node3_reply_tx, node3_reply_rx) = tokio::sync::oneshot::channel::<Option<()>>();

let verifiers = vec![
(node1_verifier, node1_reply_tx),
(node2_verifier, node2_reply_tx),
(node3_verifier, node3_reply_tx),
];

// Encode data
@@ -176,32 +229,34 @@
let encoded_data = encoder.encode(&data).unwrap();
let columns: Vec<_> = encoded_data.extended_data.columns().collect();

drop(node3_reply_rx);
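// (Reviewer note) Only the replies from nodes 1 and 2 appear to be awaited
// below, so node 3's receiver is dropped up front; the verifier's reply send
// on the closed channel is presumably ignored.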

for (i, (verifier, reply_tx)) in verifiers.into_iter().enumerate() {
- let column = &columns[i];
+ let index = i % 2;
+ let column = &columns[index];
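// (Reviewer note) With `i % 2`, the third verifier receives a duplicate of
// column 0 rather than a column of its own; presumably the encoded test data
// yields only two columns, or the test intends to exercise duplicate columns.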

let da_blob = DaBlob {
column: column.clone(),
- column_idx: i
+ column_idx: index
.try_into()
.expect("Column index shouldn't overflow the target type"),
- column_commitment: encoded_data.column_commitments[i],
+ column_commitment: encoded_data.column_commitments[index],
aggregated_column_commitment: encoded_data.aggregated_column_commitment,
- aggregated_column_proof: encoded_data.aggregated_column_proofs[i],
+ aggregated_column_proof: encoded_data.aggregated_column_proofs[index],
rows_commitments: encoded_data.row_commitments.clone(),
rows_proofs: encoded_data
.rows_proofs
.iter()
- .map(|proofs| proofs.get(i).cloned().unwrap())
+ .map(|proofs| proofs.get(index).cloned().unwrap())
.collect(),
};

- verifier
-     .send(nomos_da_verifier::DaVerifierMsg::AddBlob {
-         blob: da_blob,
-         reply_channel: reply_tx,
-     })
-     .await
-     .unwrap();
+ let add_blob_message = nomos_da_verifier::DaVerifierMsg::AddBlob {
+     blob: da_blob,
+     reply_channel: reply_tx,
+ };
+
+ verifier.send(add_blob_message).await.unwrap();
}

// Wait for response from the verifier.