diff --git a/CHANGELOG.md b/CHANGELOG.md index 4fc8700..d471167 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Added `chunk_transmit_throttle` config option, which adds a throttle delay between transmission of chunks. - Added `radio_address` config option, which hardcodes an address for `myceli` to send responses to. +- Added `RequestAvailableDags` API, and association between DAGs and filename. ### Changed diff --git a/local-storage/src/block.rs b/local-storage/src/block.rs index 548d144..9205c07 100644 --- a/local-storage/src/block.rs +++ b/local-storage/src/block.rs @@ -9,6 +9,7 @@ use std::str::FromStr; #[derive(PartialEq)] pub struct StoredBlock { pub cid: String, + pub filename: Option, pub data: Vec, pub links: Vec, } @@ -30,6 +31,7 @@ impl fmt::Debug for StoredBlock { f.debug_struct("StoredBlock") .field("cid", &cid_str) + .field("filename", &self.filename) .field("data", &self.data.len()) .field("links", &self.links.len()) .finish() @@ -99,6 +101,7 @@ mod tests { cid: b.cid().to_string(), data: b.data().to_vec(), links, + filename: None, }; stored_blocks.push(stored); @@ -214,6 +217,7 @@ mod tests { cid: cid.to_string(), data, links: vec![], + filename: None, }, ); @@ -236,6 +240,7 @@ mod tests { cid: cid.to_string(), data, links: vec![], + filename: None, }); assert_eq!( diff --git a/local-storage/src/provider.rs b/local-storage/src/provider.rs index a78b6e5..6ea747b 100644 --- a/local-storage/src/provider.rs +++ b/local-storage/src/provider.rs @@ -1,7 +1,7 @@ use crate::{block::StoredBlock, error::StorageError}; use anyhow::{bail, Result}; -use rusqlite::Connection; +use rusqlite::{params_from_iter, Connection}; pub trait StorageProvider { // Import a stored block @@ -12,7 +12,9 @@ pub trait StorageProvider { fn get_block_by_cid(&self, cid: &str) -> Result; // Requests the links associated with the given CID fn get_links_by_cid(&self, cid: &str) -> Result>; - fn list_available_dags(&self) -> Result>; + fn list_available_dags(&self) -> Result>; + // Attaches filename to dag + fn name_dag(&self, cid: &str, file_name: &str) -> Result<()>; fn get_missing_cid_blocks(&self, cid: &str) -> Result>; fn get_dag_blocks_by_window( &self, @@ -22,7 +24,6 @@ pub trait StorageProvider { ) -> Result>; fn get_all_dag_cids(&self, cid: &str) -> Result>; fn get_all_dag_blocks(&self, cid: &str) -> Result>; - fn get_all_blocks_under_cid(&self, cid: &str) -> Result>; } pub struct SqliteStorageProvider { @@ -42,6 +43,7 @@ impl SqliteStorageProvider { "CREATE TABLE IF NOT EXISTS blocks ( id INTEGER PRIMARY KEY, cid TEXT NOT NULL, + filename TEXT, data BLOB )", (), @@ -75,13 +77,64 @@ impl SqliteStorageProvider { Ok(()) } + + fn get_blocks_recursive_query( + &self, + cid: &str, + offset: Option, + window_size: Option, + ) -> Result> { + let mut base_query = " + WITH RECURSIVE cids(x,y,z) AS ( + SELECT cid,data,filename FROM blocks WHERE cid = (?1) + UNION + SELECT cid,data,filename FROM blocks b + INNER JOIN links l ON b.cid==l.block_cid + INNER JOIN cids ON (root_cid=x) + ) + SELECT x,y,z FROM cids + " + .to_string(); + let mut params = vec![cid.to_string()]; + + if let Some(offset) = offset { + if let Some(window_size) = window_size { + base_query.push_str(" LIMIT (?2) OFFSET (?3);"); + params.push(format!("{window_size}")); + params.push(format!("{offset}")); + } + } + let params = params_from_iter(params.into_iter()); + let blocks = self + .conn + .prepare(&base_query)? + .query_map(params, |row| { + let cid_str: String = row.get(0)?; + let data: Vec = row.get(1)?; + let filename: Option = row.get(2).ok(); + let links = match self.get_links_by_cid(&cid_str) { + Ok(links) => links, + Err(_) => vec![], + }; + Ok(StoredBlock { + cid: cid_str, + data, + links, + filename, + }) + })? + .filter_map(|b| b.ok()) + .collect(); + + Ok(blocks) + } } impl StorageProvider for SqliteStorageProvider { fn import_block(&self, block: &StoredBlock) -> Result<()> { self.conn.execute( - "INSERT OR IGNORE INTO blocks (cid, data) VALUES (?1, ?2)", - (&block.cid, &block.data), + "INSERT OR IGNORE INTO blocks (cid, data, filename) VALUES (?1, ?2, ?3)", + (&block.cid, &block.data, &block.filename), )?; // TODO: Should we have another indicator for root blocks that isn't just the number of links? // TODO: This logic should probably get pulled up and split into two parts: @@ -145,16 +198,18 @@ impl StorageProvider for SqliteStorageProvider { fn get_block_by_cid(&self, cid: &str) -> Result { match self.conn.query_row( - "SELECT cid, data FROM blocks b + "SELECT cid, data, filename FROM blocks b WHERE cid == (?1)", [&cid], |row| { let cid_str: String = row.get(0)?; let data: Vec = row.get(1)?; + let filename: Option = row.get(2).ok(); Ok(StoredBlock { cid: cid_str, data, links: vec![], + filename, }) }, ) { @@ -166,13 +221,14 @@ impl StorageProvider for SqliteStorageProvider { } } - fn list_available_dags(&self) -> Result> { + fn list_available_dags(&self) -> Result> { let roots = self .conn - .prepare("SELECT DISTINCT root_cid FROM links")? + .prepare("SELECT DISTINCT cid, filename FROM blocks")? .query_map([], |row| { let cid_str: String = row.get(0)?; - Ok(cid_str) + let filename_str: String = row.get(1)?; + Ok((cid_str, filename_str)) })? // TODO: Correctly catch/log/handle errors here .filter_map(|cid| cid.ok()) @@ -180,11 +236,28 @@ impl StorageProvider for SqliteStorageProvider { Ok(roots) } + fn name_dag(&self, cid: &str, file_name: &str) -> Result<()> { + self.conn.execute( + "UPDATE blocks SET filename = ?1 WHERE cid = ?2", + (file_name, cid), + )?; + Ok(()) + } + fn get_missing_cid_blocks(&self, cid: &str) -> Result> { // First get all block cid+id associated with root cid let blocks: Vec<(String, Option)> = self .conn - .prepare("SELECT block_cid, block_id FROM links WHERE root_cid == (?1)")? + .prepare( + " + WITH RECURSIVE cids(x,y) AS ( + SELECT cid, id FROM blocks WHERE cid = (?1) + UNION + SELECT block_cid, block_id FROM links JOIN cids ON root_cid=x + ) + SELECT x,y FROM cids; + ", + )? .query_map([cid], |row| { let block_cid: String = row.get(0)?; let block_id: Option = row.get(1)?; @@ -217,39 +290,7 @@ impl StorageProvider for SqliteStorageProvider { offset: u32, window_size: u32, ) -> Result> { - let blocks: Vec = self - .conn - .prepare( - " - WITH RECURSIVE cids(x,y) AS ( - SELECT cid,data FROM blocks WHERE cid = (?1) - UNION - SELECT cid,data FROM blocks b - INNER JOIN links l ON b.cid==l.block_cid - INNER JOIN cids ON (root_cid=x) - ) - SELECT x,y FROM cids - LIMIT (?2) OFFSET (?3); - ", - )? - .query_map( - [cid, &format!("{window_size}"), &format!("{offset}")], - |row| { - let cid_str: String = row.get(0)?; - let data: Vec = row.get(1)?; - let links = match self.get_links_by_cid(&cid_str) { - Ok(links) => links, - Err(_) => vec![], - }; - Ok(StoredBlock { - cid: cid_str, - data, - links, - }) - }, - )? - .filter_map(|b| b.ok()) - .collect(); + let blocks = self.get_blocks_recursive_query(cid, Some(offset), Some(window_size))?; Ok(blocks) } @@ -278,71 +319,7 @@ impl StorageProvider for SqliteStorageProvider { } fn get_all_dag_blocks(&self, cid: &str) -> Result> { - let blocks: Vec = self - .conn - .prepare( - " - WITH RECURSIVE cids(x,y) AS ( - SELECT cid,data FROM blocks WHERE cid = (?1) - UNION - SELECT cid,data FROM blocks b - INNER JOIN links l ON b.cid==l.block_cid - INNER JOIN cids ON (root_cid=x) - ) - SELECT x,y FROM cids - ", - )? - .query_map([cid], |row| { - let cid_str: String = row.get(0)?; - let data: Vec = row.get(1)?; - let links = match self.get_links_by_cid(&cid_str) { - Ok(links) => links, - Err(_) => vec![], - }; - Ok(StoredBlock { - cid: cid_str, - data, - links, - }) - })? - .filter_map(|b| b.ok()) - .collect(); - - Ok(blocks) - } - - fn get_all_blocks_under_cid(&self, cid: &str) -> Result> { - let blocks: Vec = self - .conn - .prepare( - " - WITH RECURSIVE cids(x,y) AS ( - SELECT cid,data FROM blocks WHERE cid = (?1) - UNION - SELECT cid,data FROM blocks b - INNER JOIN links l ON b.cid==l.block_cid - INNER JOIN cids ON (root_cid=x) - ) - SELECT x,y FROM cids - ", - )? - .query_map([cid], |row| { - let cid_str: String = row.get(0)?; - let data: Vec = row.get(1)?; - let links = match self.get_links_by_cid(&cid_str) { - Ok(links) => links, - Err(_) => vec![], - }; - Ok(StoredBlock { - cid: cid_str, - data, - links, - }) - })? - .filter_map(|b| b.ok()) - .collect(); - - Ok(blocks) + self.get_blocks_recursive_query(cid, None, None) } } @@ -390,6 +367,7 @@ pub mod tests { cid: cid_str.to_string(), data: b"1010101".to_vec(), links: vec![], + filename: None, }; harness.provider.import_block(&block).unwrap(); @@ -418,6 +396,7 @@ pub mod tests { cid: c.to_string(), data: b"123412341234".to_vec(), links: vec![], + filename: None, }; harness.provider.import_block(&block).unwrap() }); @@ -439,6 +418,7 @@ pub mod tests { cid: cid.to_string(), data: b"1010101".to_vec(), links: vec![], + filename: None, }; harness.provider.import_block(&block).unwrap(); @@ -460,6 +440,7 @@ pub mod tests { cid: cid.to_string(), data: b"1010101".to_vec(), links: vec![block_cid.to_string()], + filename: None, }; harness.provider.import_block(&block).unwrap(); @@ -481,6 +462,7 @@ pub mod tests { cid: cid.to_string(), data: vec![], links: vec![block_cid.to_string()], + filename: None, }; harness.provider.import_block(&block).unwrap(); @@ -512,12 +494,14 @@ pub mod tests { cid: cid_str.to_string(), data: vec![], links: vec![block_cid.to_string()], + filename: None, }; let child_block = StoredBlock { cid: block_cid.to_string(), data: b"101293910101".to_vec(), links: vec![], + filename: None, }; harness.provider.import_block(&block).unwrap(); diff --git a/local-storage/src/storage.rs b/local-storage/src/storage.rs index 8e6f19e..a6a04aa 100644 --- a/local-storage/src/storage.rs +++ b/local-storage/src/storage.rs @@ -50,6 +50,7 @@ impl Storage { cid: b.cid().to_string(), data: b.data().to_vec(), links, + filename: None, }; // First validate each block if let Err(e) = stored.validate() { @@ -68,7 +69,11 @@ impl Storage { } } if let Some(root_cid) = root_cid { + if let Some(filename) = path.file_name().and_then(|p| p.to_str()) { + self.provider.name_dag(&root_cid, filename)?; + } info!("Imported path {} to {}", path.display(), root_cid); + info!("Importing {} blocks for {root_cid}", blocks.len()); Ok(root_cid) } else { bail!("Failed to find root block for {path:?}") @@ -116,7 +121,7 @@ impl Storage { } pub fn import_block(&self, block: &StoredBlock) -> Result<()> { - info!("Importing block {}", block.cid); + info!("Importing block {:?}", block); self.provider.import_block(block) } @@ -124,7 +129,7 @@ impl Storage { self.provider.get_missing_cid_blocks(cid) } - pub fn list_available_dags(&self) -> Result> { + pub fn list_available_dags(&self) -> Result> { self.provider.list_available_dags() } @@ -170,7 +175,7 @@ pub mod tests { } #[test] - pub fn test_import_path_to_storage() { + pub fn test_import_path_to_storage_single_block() { let harness = TestHarness::new(); let temp_dir = assert_fs::TempDir::new().unwrap(); @@ -187,6 +192,32 @@ pub mod tests { let available_cids = harness.storage.list_available_cids().unwrap(); assert!(available_cids.contains(&root_cid)); + + let available_dags = harness.storage.list_available_dags().unwrap(); + assert_eq!(available_dags, vec![(root_cid, "data.txt".to_string())]); + } + + #[test] + pub fn test_import_path_to_storage_multi_block() { + let harness = TestHarness::new(); + + let temp_dir = assert_fs::TempDir::new().unwrap(); + let test_file = temp_dir.child("data.txt"); + test_file + .write_binary( + "654684646847616846846876168468416874616846416846846186468464684684648684684" + .repeat(500) + .as_bytes(), + ) + .unwrap(); + let root_cid = harness.storage.import_path(test_file.path()).unwrap(); + + let available_cids = harness.storage.list_available_cids().unwrap(); + + assert!(available_cids.contains(&root_cid)); + + let available_dags = harness.storage.list_available_dags().unwrap(); + assert_eq!(available_dags, vec![(root_cid, "data.txt".to_string())]); } #[test] diff --git a/messages/src/api.rs b/messages/src/api.rs index 7da4c99..2757979 100644 --- a/messages/src/api.rs +++ b/messages/src/api.rs @@ -2,6 +2,12 @@ use clap::Subcommand; use parity_scale_codec_derive::{Decode as ParityDecode, Encode as ParityEncode}; use serde::Serialize; +#[derive(Clone, Debug, ParityEncode, ParityDecode, Serialize, Eq, PartialEq)] +pub struct DagInfo { + pub cid: String, + pub filename: String, +} + #[derive(Clone, Debug, ParityEncode, ParityDecode, Serialize, Subcommand, Eq, PartialEq)] pub enum ApplicationAPI { /// Asks IPFS instance to import a file path into the local IPFS store @@ -9,6 +15,7 @@ pub enum ApplicationAPI { path: String, }, /// Response message to ImportFile containing file's root CID + #[command(skip)] FileImported { path: String, cid: String, @@ -19,12 +26,14 @@ pub enum ApplicationAPI { path: String, }, /// Used to indicate the failure of a dag export + #[command(skip)] DagExportFailed { cid: String, path: String, error: String, }, /// Used to indicate a successful dag export + #[command(skip)] DagExported { cid: String, path: String, @@ -37,6 +46,7 @@ pub enum ApplicationAPI { /// Requests the current connected state GetConnected, /// Response to GetConnected, with current connected state + #[command(skip)] ConnectedState { connected: bool, }, @@ -45,6 +55,7 @@ pub enum ApplicationAPI { cid: String, }, /// Response to ValidateDag request, contains requested CID and a text response + #[command(skip)] ValidateDagResponse { cid: String, result: String, @@ -78,6 +89,7 @@ pub enum ApplicationAPI { /// Request Available Blocks RequestAvailableBlocks, /// Advertise all available blocks by CID + #[command(skip)] AvailableBlocks { cids: Vec, }, @@ -94,6 +106,7 @@ pub enum ApplicationAPI { cid: String, }, /// List of missing blocks and associated DAG's CID + #[command(skip)] MissingDagBlocks { cid: String, blocks: Vec, @@ -103,6 +116,11 @@ pub enum ApplicationAPI { Version { version: String, }, + RequestAvailableDags, + #[command(skip)] + AvailableDags { + dags: Vec, + }, // TODO: Implement later // Information about the next pass used for calculating // data transfer parameters diff --git a/messages/src/lib.rs b/messages/src/lib.rs index 0d603ca..ab6d144 100644 --- a/messages/src/lib.rs +++ b/messages/src/lib.rs @@ -2,6 +2,6 @@ pub(crate) mod api; pub(crate) mod message; pub(crate) mod protocol; -pub use api::ApplicationAPI; +pub use api::{ApplicationAPI, DagInfo}; pub use message::Message; pub use protocol::{DataProtocol, TransmissionBlock}; diff --git a/messages/src/protocol.rs b/messages/src/protocol.rs index 230dd70..c19d135 100644 --- a/messages/src/protocol.rs +++ b/messages/src/protocol.rs @@ -8,6 +8,7 @@ pub struct TransmissionBlock { pub cid: Vec, pub data: Vec, pub links: Vec>, + pub filename: Option, } impl fmt::Debug for TransmissionBlock { diff --git a/myceli/src/handlers.rs b/myceli/src/handlers.rs index 45f7a94..89e8520 100644 --- a/myceli/src/handlers.rs +++ b/myceli/src/handlers.rs @@ -1,6 +1,6 @@ use anyhow::Result; use local_storage::storage::Storage; -use messages::{ApplicationAPI, DataProtocol, Message}; +use messages::{ApplicationAPI, DagInfo, DataProtocol, Message}; use std::path::PathBuf; use std::rc::Rc; @@ -63,6 +63,20 @@ pub fn get_missing_dag_blocks_window_protocol( })) } +pub fn get_available_dags(storage: Rc) -> Result { + let local_dags: Vec = storage + .list_available_dags()? + .iter() + .map(|(cid, filename)| DagInfo { + cid: cid.to_string(), + filename: filename.to_string(), + }) + .collect(); + Ok(Message::ApplicationAPI(ApplicationAPI::AvailableDags { + dags: local_dags, + })) +} + #[cfg(test)] pub mod tests { use super::*; @@ -126,6 +140,7 @@ pub mod tests { cid: b.cid().to_string(), data: b.data().to_vec(), links, + filename: None, }; stored_blocks.push(stored); diff --git a/myceli/src/listener.rs b/myceli/src/listener.rs index 415141a..8857755 100644 --- a/myceli/src/listener.rs +++ b/myceli/src/listener.rs @@ -213,6 +213,9 @@ impl Listener { } None } + Message::ApplicationAPI(ApplicationAPI::RequestAvailableDags) => { + Some(handlers::get_available_dags(self.storage.clone())?) + } // Default case for valid messages which don't have handling code implemented yet message => { info!("Received message: {:?}", message); diff --git a/myceli/src/shipper.rs b/myceli/src/shipper.rs index 56942ca..50196bc 100644 --- a/myceli/src/shipper.rs +++ b/myceli/src/shipper.rs @@ -418,6 +418,7 @@ impl Shipper { cid: Cid::try_from(block.cid)?.to_string(), data: block.data, links, + filename: block.filename, }; stored_block.validate()?; self.storage.import_block(&stored_block) @@ -435,6 +436,7 @@ fn stored_block_to_transmission_block(stored: &StoredBlock) -> Result cid, + other => panic!("Failed to receive FileImported msg {other:?}"), + }; + + controller.send_msg( + Message::transmit_dag(&root_cid, &receiver.listen_addr, 0), + &transmitter.listen_addr, + ); + + utils::wait_receiving_done(&receiver, &mut controller); + + let transmitter_dags = controller.send_and_recv( + &transmitter.listen_addr, + Message::ApplicationAPI(ApplicationAPI::RequestAvailableDags), + ); + + let receiver_dags = controller.send_and_recv( + &receiver.listen_addr, + Message::ApplicationAPI(ApplicationAPI::RequestAvailableDags), + ); + + assert_eq!(transmitter_dags, receiver_dags); +} + #[test] pub fn test_transmit_dag_no_response_exceed_retries() { let transmitter = TestListener::new();