diff --git a/nomos-services/data-availability/tests/Cargo.toml b/nomos-services/data-availability/tests/Cargo.toml index 53a87f63b..0baffd239 100644 --- a/nomos-services/data-availability/tests/Cargo.toml +++ b/nomos-services/data-availability/tests/Cargo.toml @@ -39,6 +39,7 @@ tokio-stream = "0.1.15" tempfile = "3.6" tracing = "0.1" time = "0.3" +tracing-subscriber = "0.2.25" [dev-dependencies] blake2 = { version = "0.10" } diff --git a/nomos-services/data-availability/tests/src/indexer_integration.rs b/nomos-services/data-availability/tests/src/indexer_integration.rs index 249f73b7e..7696d68f3 100644 --- a/nomos-services/data-availability/tests/src/indexer_integration.rs +++ b/nomos-services/data-availability/tests/src/indexer_integration.rs @@ -33,6 +33,7 @@ use tempfile::{NamedTempFile, TempDir}; use time::OffsetDateTime; use tokio_stream::wrappers::BroadcastStream; use tokio_stream::StreamExt; + // internal use crate::common::*; @@ -229,6 +230,17 @@ fn test_indexer() { let blob_hash = ::id(&blobs[0]); let blob_info = BlobInfo::new(blob_hash, meta); + // Create orphan blob metadata - without having actual blob stored + let orphan_app_id = [10u8; 32]; + let orphan_index = 2.into(); + + let orphan_meta = Metadata::new(orphan_app_id, orphan_index); + let mut orphan_blob_hash = [0u8; 32]; + thread_rng().fill(&mut orphan_blob_hash[..]); + + let orphan_blob_info = BlobInfo::new(orphan_blob_hash, orphan_meta); + + // Prepare indexes for blobs let mut node_1_blob_0_idx = Vec::new(); node_1_blob_0_idx.extend_from_slice(&blob_hash); node_1_blob_0_idx.extend_from_slice(&0u16.to_be_bytes()); @@ -315,6 +327,18 @@ fn test_indexer() { .unwrap(); let _ = mempool_rx.await.unwrap(); + // Put orphan_blob_info into the mempool. + let (mempool2_tx, mempool2_rx) = tokio::sync::oneshot::channel(); + mempool_outbound + .send(nomos_mempool::MempoolMsg::Add { + payload: orphan_blob_info, + key: orphan_blob_hash, + reply_channel: mempool2_tx, + }) + .await + .unwrap(); + let _ = mempool2_rx.await.unwrap(); + // Wait for block in the network. let timeout = tokio::time::sleep(Duration::from_secs(INDEXER_TEST_MAX_SECONDS)); tokio::pin!(timeout); @@ -347,6 +371,36 @@ fn test_indexer() { .unwrap(); let mut app_id_blobs = indexer_rx.await.unwrap(); + // Request range of orphan blob from indexer - nothing is expected in return + let (indexer2_tx, indexer2_rx) = tokio::sync::oneshot::channel(); + indexer_outbound + .send(nomos_da_indexer::DaMsg::GetRange { + app_id: orphan_app_id, + range: 0.into()..2.into(), + reply_channel: indexer2_tx, + }) + .await + .unwrap(); + let orphan_app_id_blobs = indexer2_rx.await.unwrap(); + + // Indexer should not return any blobs for orphan app_id + for v in orphan_app_id_blobs { + assert!(v.1.is_empty()); + } + + // Mempool should still contain orphan_blob_info + let (mempool3_tx, mempool3_rx) = tokio::sync::oneshot::channel(); + mempool_outbound + .send(nomos_mempool::MempoolMsg::Status { + items: vec![orphan_blob_hash], + reply_channel: mempool3_tx, + }) + .await + .unwrap(); + let blocks_with_orphan_blob_hash = mempool3_rx.await.unwrap(); + + assert_eq!(blocks_with_orphan_blob_hash.len(), 1); + // Since we've only attested to blob_info at idx 0, the first // item should have "some" data, other indexes should be None. app_id_blobs.sort_by(|(a, _), (b, _)| a.partial_cmp(b).unwrap()); diff --git a/nomos-services/data-availability/tests/src/verifier_integration.rs b/nomos-services/data-availability/tests/src/verifier_integration.rs index fe53abff2..0e47dff51 100644 --- a/nomos-services/data-availability/tests/src/verifier_integration.rs +++ b/nomos-services/data-availability/tests/src/verifier_integration.rs @@ -19,17 +19,24 @@ use nomos_libp2p::SwarmConfig; use rand::{thread_rng, Rng}; use tempfile::{NamedTempFile, TempDir}; use time::OffsetDateTime; +use tracing_subscriber::fmt::TestWriter; +use tracing_subscriber::EnvFilter; // internal use crate::common::*; #[test] fn test_verifier() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .compact() + .with_writer(TestWriter::default()) + .try_init(); let performed_tx = Arc::new(AtomicBool::new(false)); let performed_rx = performed_tx.clone(); let is_success_tx = Arc::new(AtomicBool::new(false)); let is_success_rx = is_success_tx.clone(); - let mut ids = vec![[0; 32]; 2]; + let mut ids = vec![[0; 32]; 3]; for id in &mut ids { thread_rng().fill(id); } @@ -70,28 +77,42 @@ fn test_verifier() { port: 7774, ..Default::default() }; + + let swarm_config3 = SwarmConfig { + port: 7775, + ..Default::default() + }; + let mix_configs = new_mix_configs(vec![ Multiaddr::from_str("/ip4/127.0.0.1/udp/7783/quic-v1").unwrap(), Multiaddr::from_str("/ip4/127.0.0.1/udp/7784/quic-v1").unwrap(), + Multiaddr::from_str("/ip4/127.0.0.1/udp/7785/quic-v1").unwrap(), ]); let blobs_dir = TempDir::new().unwrap().path().to_path_buf(); let (node1_sk, _) = generate_blst_hex_keys(); let (node2_sk, _) = generate_blst_hex_keys(); + let (node3_sk, _) = generate_blst_hex_keys(); let client_zone = new_client(NamedTempFile::new().unwrap().path().to_path_buf()); let (peer_sk_1, peer_id_1) = generate_ed25519_sk_peerid(); let (peer_sk_2, peer_id_2) = generate_ed25519_sk_peerid(); + let (peer_sk_3, peer_id_3) = generate_ed25519_sk_peerid(); let addr_1 = Multiaddr::from_str("/ip4/127.0.0.1/udp/8880/quic-v1").unwrap(); let addr_2 = Multiaddr::from_str("/ip4/127.0.0.1/udp/8881/quic-v1").unwrap(); + let addr_3 = Multiaddr::from_str("/ip4/127.0.0.1/udp/8882/quic-v1").unwrap(); - let peer_addresses = vec![(peer_id_1, addr_1.clone()), (peer_id_2, addr_2.clone())]; + let peer_addresses = vec![ + (peer_id_1, addr_1.clone()), + (peer_id_2, addr_2.clone()), + (peer_id_3, addr_3.clone()), + ]; let num_samples = 1; - let num_subnets = 2; + let num_subnets = 3; let nodes_per_subnet = 1; let node1 = new_node( @@ -141,7 +162,7 @@ fn test_verifier() { global_params_path: GLOBAL_PARAMS_PATH.into(), }, TestDaNetworkSettings { - peer_addresses, + peer_addresses: peer_addresses.clone(), listening_address: addr_2, num_subnets, num_samples, @@ -150,9 +171,37 @@ fn test_verifier() { }, ); - let node1_verifier = node1.handle().relay::(); + let node3 = new_node( + &LeaderConfig { + notes: vec![notes[2].clone()], + nf_sk: sks[2], + }, + &ledger_config, + &genesis_state, + &time_config, + &swarm_config3, + &mix_configs[2], + NamedTempFile::new().unwrap().path().to_path_buf(), + &blobs_dir, + vec![node_address(&swarm_config2)], + KzgrsDaVerifierSettings { + sk: node3_sk, + index: [2].into(), + global_params_path: GLOBAL_PARAMS_PATH.into(), + }, + TestDaNetworkSettings { + peer_addresses, + listening_address: addr_3, + num_subnets, + num_samples, + nodes_per_subnet, + node_key: peer_sk_3, + }, + ); + let node1_verifier = node1.handle().relay::(); let node2_verifier = node2.handle().relay::(); + let node3_verifier = node3.handle().relay::(); client_zone.spawn(async move { let node1_verifier = node1_verifier.connect().await.unwrap(); @@ -161,9 +210,13 @@ fn test_verifier() { let node2_verifier = node2_verifier.connect().await.unwrap(); let (node2_reply_tx, node2_reply_rx) = tokio::sync::oneshot::channel(); + let node3_verifier = node3_verifier.connect().await.unwrap(); + let (node3_reply_tx, node3_reply_rx) = tokio::sync::oneshot::channel::>(); + let verifiers = vec![ (node1_verifier, node1_reply_tx), (node2_verifier, node2_reply_tx), + (node3_verifier, node3_reply_tx), ]; // Encode data @@ -176,32 +229,34 @@ fn test_verifier() { let encoded_data = encoder.encode(&data).unwrap(); let columns: Vec<_> = encoded_data.extended_data.columns().collect(); + drop(node3_reply_rx); + for (i, (verifier, reply_tx)) in verifiers.into_iter().enumerate() { - let column = &columns[i]; + let index = i % 2; + let column = &columns[index]; let da_blob = DaBlob { column: column.clone(), - column_idx: i + column_idx: index .try_into() .expect("Column index shouldn't overflow the target type"), - column_commitment: encoded_data.column_commitments[i], + column_commitment: encoded_data.column_commitments[index], aggregated_column_commitment: encoded_data.aggregated_column_commitment, - aggregated_column_proof: encoded_data.aggregated_column_proofs[i], + aggregated_column_proof: encoded_data.aggregated_column_proofs[index], rows_commitments: encoded_data.row_commitments.clone(), rows_proofs: encoded_data .rows_proofs .iter() - .map(|proofs| proofs.get(i).cloned().unwrap()) + .map(|proofs| proofs.get(index).cloned().unwrap()) .collect(), }; - verifier - .send(nomos_da_verifier::DaVerifierMsg::AddBlob { - blob: da_blob, - reply_channel: reply_tx, - }) - .await - .unwrap(); + let add_blob_message = nomos_da_verifier::DaVerifierMsg::AddBlob { + blob: da_blob, + reply_channel: reply_tx, + }; + + verifier.send(add_blob_message).await.unwrap(); } // Wait for response from the verifier.