Skip to content

Commit

Permalink
Various fixes from testing v0.6.4 (#66)
Browse files Browse the repository at this point in the history
### Added

- Added `DagExported` and `DagExportFailed` responses to `ExportDag` command.
- Added `DagTransmissionComplete` response when a DAG has been completely transmitted.

### Changed

- MTU is now an optional flag on `controller`
- Added `block_size` as a `myceli` config option for controlling IPFS block size in file chunking
- Changed default `block_size` to 3072 bytes
- Fixed cases where responses inside of dag transfer session weren't sent to original target address
  • Loading branch information
plauche authored May 22, 2023
1 parent 67011ec commit 3427eac
Show file tree
Hide file tree
Showing 18 changed files with 197 additions and 87 deletions.
15 changes: 14 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
# Changelog

## [0.6.5] - Unreleased

## [0.6.4] - Unreleased
### Added

- Added `DagExported` and `DagExportFailed` responses to `ExportDag` command.
- Added `DagTransmissionComplete` response when a DAG has been completely transmitted.

### Changed

- MTU is now an optional flag on `controller`
- Added `block_size` as a `myceli` config option for controlling IPFS block size in file chunking
- Changed default `block_size` to 3072 bytes
- Fixed cases where responses inside of dag transfer session weren't sent to original target address

## [0.6.4] - 2023-05-15

### Added

Expand Down
15 changes: 8 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ members = [
]

[workspace.package]
version = "0.6.3"
version = "0.6.5"
edition = "2021"
license = "Apache-2.0/MIT"
rust-version = "1.68.1"
Expand Down
4 changes: 3 additions & 1 deletion controller/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use transports::{Transport, UdpTransport};
#[clap(about = "Control a Myceli instance")]
pub struct Cli {
instance_addr: String,
#[arg(short, long, default_value = "512")]
mtu: u16,
#[arg(short, long)]
listen_mode: bool,
#[arg(short, long, default_value = "0.0.0.0:8090")]
Expand All @@ -18,7 +20,7 @@ pub struct Cli {

impl Cli {
pub async fn run(&self) -> Result<()> {
let transport = UdpTransport::new(&self.bind_address, 512)?;
let transport = UdpTransport::new(&self.bind_address, self.mtu)?;

let command = Message::ApplicationAPI(self.command.clone());
let cmd_str = serde_json::to_string(&command)?;
Expand Down
1 change: 1 addition & 0 deletions ipfs-unixfs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ multihash.workspace = true
num_enum.workspace = true
prost.workspace = true
tokio = { workspace = true, features = ["fs"] }
tokio-util = { workspace = true, features = ["io-util"] }

[dev-dependencies]
# criterion = { workspace = true, features = ["async_tokio"] }
Expand Down
1 change: 1 addition & 0 deletions local-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ ipfs-unixfs.workspace = true
rusqlite.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tokio-util = { workspace = true, features = ["io-util"] }
tracing.workspace = true

[dev-dependencies]
Expand Down
20 changes: 16 additions & 4 deletions local-storage/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use anyhow::{anyhow, bail, Result};
use bytes::Bytes;
use cid::Cid;
use ipfs_unixfs::Block;
use std::fmt;
use std::str::FromStr;

#[derive(Debug, PartialEq)]
#[derive(PartialEq)]
pub struct StoredBlock {
pub cid: String,
pub data: Vec<u8>,
Expand All @@ -21,6 +22,20 @@ impl StoredBlock {
}
}

impl fmt::Debug for StoredBlock {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let cid_str = Cid::try_from(self.cid.clone())
.map(|c| c.to_string())
.unwrap();

f.debug_struct("StoredBlock")
.field("cid", &cid_str)
.field("data", &self.data.len())
.field("links", &self.links.len())
.finish()
}
}

impl TryInto<Block> for &StoredBlock {
type Error = anyhow::Error;

Expand All @@ -40,9 +55,6 @@ pub fn validate_dag(stored_blocks: &[StoredBlock]) -> Result<()> {
if stored_blocks.is_empty() {
bail!("No blocks found in dag")
}
for block in stored_blocks.iter() {
block.validate()?;
}
verify_dag(stored_blocks)?;
Ok(())
}
Expand Down
35 changes: 13 additions & 22 deletions local-storage/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,23 @@ use tracing::{error, info};

pub struct Storage {
pub provider: Box<dyn StorageProvider>,
block_size: u32,
}

// TODO: Make this configurable
// Changing to 1MB to optimize for larger files
const BLOCK_SIZE: usize = 1024 * 100;

impl Storage {
pub fn new(provider: Box<dyn StorageProvider>) -> Self {
Storage { provider }
pub fn new(provider: Box<dyn StorageProvider>, block_size: u32) -> Self {
Storage {
provider,
block_size,
}
}

pub fn import_path(&self, path: &Path) -> Result<String> {
let rt = tokio::runtime::Runtime::new()?;
let blocks: Result<Vec<Block>> = rt.block_on(async {
let file: File = FileBuilder::new()
.path(path)
.fixed_chunker(BLOCK_SIZE)
.fixed_chunker(self.block_size.try_into()?)
.build()
.await?;
let blocks: Vec<_> = file.encode().await?.try_collect().await?;
Expand All @@ -40,8 +40,6 @@ impl Storage {
let blocks = blocks?;
let mut root_cid: Option<String> = None;

let mut stored_blocks = vec![];

blocks.iter().for_each(|b| {
let links = b
.links()
Expand All @@ -61,14 +59,9 @@ impl Storage {
error!("Failed to import block {e}");
}
if !stored.links.is_empty() {
root_cid = Some(stored.cid.clone());
root_cid = Some(stored.cid);
}
stored_blocks.push(stored);
});
info!("Validating imported blocks {}", blocks.len());
if let Err(e) = crate::block::validate_dag(&stored_blocks) {
error!("Failed to validate dag on import: {e}");
}
if blocks.len() == 1 {
if let Some(first) = blocks.first() {
root_cid = Some(first.cid().to_string());
Expand Down Expand Up @@ -141,7 +134,6 @@ impl Storage {
window_size: u32,
window_num: u32,
) -> Result<Vec<StoredBlock>> {
println!("offset = {} * {}", window_size, window_num);
let offset = window_size * window_num;

self.provider
Expand All @@ -156,6 +148,8 @@ pub mod tests {
use assert_fs::{fixture::FileWriteBin, fixture::PathChild, TempDir};
use rand::{thread_rng, RngCore};

const BLOCK_SIZE: usize = 1024 * 10;

struct TestHarness {
storage: Storage,
_db_dir: TempDir,
Expand All @@ -167,7 +161,7 @@ pub mod tests {
let db_path = db_dir.child("storage.db");
let provider = SqliteStorageProvider::new(db_path.path().to_str().unwrap()).unwrap();
provider.setup().unwrap();
let storage = Storage::new(Box::new(provider));
let storage = Storage::new(Box::new(provider), BLOCK_SIZE.try_into().unwrap());
TestHarness {
storage,
_db_dir: db_dir,
Expand Down Expand Up @@ -264,17 +258,14 @@ pub mod tests {
let cid = harness.storage.import_path(test_file.path()).unwrap();

let window_size: u32 = 10;
let mut window_num = 0;

let all_dag_blocks = harness.storage.get_all_dag_blocks(&cid).unwrap();

for chunk in all_dag_blocks.chunks(window_size as usize).into_iter() {
for (window_num, chunk) in all_dag_blocks.chunks(window_size as usize).enumerate() {
let window_blocks = harness
.storage
.get_dag_blocks_by_window(&cid, window_size, window_num)
.get_dag_blocks_by_window(&cid, window_size, window_num.try_into().unwrap())
.unwrap();
assert_eq!(chunk, &window_blocks);
window_num += 1;
}
}

Expand Down
2 changes: 1 addition & 1 deletion local-storage/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub(crate) fn verify_dag(blocks: &[StoredBlock]) -> Result<()> {
while !queue.is_empty() {
// this is a safe unwrap as the queue is not empty
let node: &StoredBlock = queue.pop_front().unwrap();
// ignore a visite node
// ignore a visited node
if visited.contains(&node.cid.as_str()) {
continue;
}
Expand Down
15 changes: 15 additions & 0 deletions messages/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ pub enum ApplicationAPI {
cid: String,
path: String,
},
/// Used to indicate the failure of a dag export
DagExportFailed {
cid: String,
path: String,
error: String,
},
/// Used to indicate a successful dag export
DagExported {
cid: String,
path: String,
},
/// Sets current connected state
SetConnected {
#[arg(action(clap::ArgAction::Set), required(true))]
Expand All @@ -44,6 +55,10 @@ pub enum ApplicationAPI {
target_addr: String,
retries: u8,
},
/// Indicates that a Dag has been transmitted completely successfully
DagTransmissionComplete {
cid: String,
},
/// Initiates transmission of block corresponding to the given CID
TransmitBlock {
cid: String,
Expand Down
1 change: 0 additions & 1 deletion messages/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ pub enum DataProtocol {
// in order to continue transmitting the dag
RetryDagSession {
cid: String,
target_addr: String,
},
// Requests windowed transmission of a dag
RequestTransmitDag {
Expand Down
6 changes: 5 additions & 1 deletion myceli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub struct Config {
pub storage_path: String,
pub mtu: u16,
pub window_size: u32,
pub block_size: u32,
}

impl Default for Config {
Expand All @@ -24,8 +25,11 @@ impl Default for Config {
// Default storage dir
storage_path: "storage".to_string(),
// Default MTU appropriate for dev radio
mtu: 60,
mtu: 512,
// Default to sending five blocks at a time
window_size: 5,
// Default to 3 kilobyte blocks
block_size: 1024 * 3,
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion myceli/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ pub mod tests {
use local_storage::provider::SqliteStorageProvider;
use rand::{thread_rng, RngCore};

const BLOCK_SIZE: u32 = 1024 * 3;

struct TestHarness {
storage: Rc<Storage>,
db_dir: TempDir,
Expand All @@ -85,7 +87,7 @@ pub mod tests {
let db_path = db_dir.child("storage.db");
let provider = SqliteStorageProvider::new(db_path.path().to_str().unwrap()).unwrap();
provider.setup().unwrap();
let storage = Rc::new(Storage::new(Box::new(provider)));
let storage = Rc::new(Storage::new(Box::new(provider), BLOCK_SIZE));
TestHarness { storage, db_dir }
}

Expand Down
Loading

0 comments on commit 3427eac

Please sign in to comment.