Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Request/AvailableDAGs APIs #68

Merged
merged 7 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- Added `chunk_transmit_throttle` config option, which adds a throttle delay between transmission of chunks.
- Added `radio_address` config option, which hardcodes an address for `myceli` to send responses to.
- Added `RequestAvailableDags` API, and association between DAGs and filename.

### Changed

Expand Down
5 changes: 5 additions & 0 deletions local-storage/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::str::FromStr;
/// A single block as kept by local storage, together with the metadata recorded alongside it.
#[derive(PartialEq)]
pub struct StoredBlock {
/// CID of this block, held in its string form.
pub cid: String,
/// Optional file name associated with this block; `None` when no name has been recorded.
pub filename: Option<String>,
/// Raw bytes of the block.
pub data: Vec<u8>,
/// Links of this block as strings (the tests populate these with CID strings).
pub links: Vec<String>,
}
Expand All @@ -30,6 +31,7 @@ impl fmt::Debug for StoredBlock {

f.debug_struct("StoredBlock")
.field("cid", &cid_str)
.field("filename", &self.filename)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like you're storing the blocks in SQLite, not in individual files (and if you were, I would hope you could derive the filename from the CID).

So I'm guessing this filename member is actually from a UnixFS directory's link to this block? e.g. the directory entry's name for it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The filename originates in this API message ApplicationAPI::ImportFile { path: String }. We just pull out the "file name" piece of the path in Storage::import_path.

.field("data", &self.data.len())
.field("links", &self.links.len())
.finish()
Expand Down Expand Up @@ -99,6 +101,7 @@ mod tests {
cid: b.cid().to_string(),
data: b.data().to_vec(),
links,
filename: None,
};

stored_blocks.push(stored);
Expand Down Expand Up @@ -214,6 +217,7 @@ mod tests {
cid: cid.to_string(),
data,
links: vec![],
filename: None,
},
);

Expand All @@ -236,6 +240,7 @@ mod tests {
cid: cid.to_string(),
data,
links: vec![],
filename: None,
});

assert_eq!(
Expand Down
200 changes: 92 additions & 108 deletions local-storage/src/provider.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{block::StoredBlock, error::StorageError};

use anyhow::{bail, Result};
use rusqlite::Connection;
use rusqlite::{params_from_iter, Connection};

pub trait StorageProvider {
// Import a stored block
Expand All @@ -12,7 +12,9 @@ pub trait StorageProvider {
fn get_block_by_cid(&self, cid: &str) -> Result<StoredBlock>;
// Requests the links associated with the given CID
fn get_links_by_cid(&self, cid: &str) -> Result<Vec<String>>;
fn list_available_dags(&self) -> Result<Vec<String>>;
fn list_available_dags(&self) -> Result<Vec<(String, String)>>;
// Attaches filename to dag
fn name_dag(&self, cid: &str, file_name: &str) -> Result<()>;
fn get_missing_cid_blocks(&self, cid: &str) -> Result<Vec<String>>;
fn get_dag_blocks_by_window(
&self,
Expand All @@ -22,7 +24,6 @@ pub trait StorageProvider {
) -> Result<Vec<StoredBlock>>;
fn get_all_dag_cids(&self, cid: &str) -> Result<Vec<String>>;
fn get_all_dag_blocks(&self, cid: &str) -> Result<Vec<StoredBlock>>;
fn get_all_blocks_under_cid(&self, cid: &str) -> Result<Vec<StoredBlock>>;
}

pub struct SqliteStorageProvider {
Expand All @@ -42,6 +43,7 @@ impl SqliteStorageProvider {
"CREATE TABLE IF NOT EXISTS blocks (
id INTEGER PRIMARY KEY,
cid TEXT NOT NULL,
filename TEXT,
data BLOB
)",
(),
Expand Down Expand Up @@ -75,13 +77,64 @@ impl SqliteStorageProvider {

Ok(())
}

fn get_blocks_recursive_query(
&self,
cid: &str,
offset: Option<u32>,
window_size: Option<u32>,
) -> Result<Vec<StoredBlock>> {
let mut base_query = "
WITH RECURSIVE cids(x,y,z) AS (
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was not familiar with WITH RECURSIVE. That's interesting.
How is performance? Is this function (regardless of how it's implemented) a memory limitation concern?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't run into any show-stopping performance issues with files in the 5-50 MB range, but I also haven't done much memory profiling around larger files here. I think we are OK for the smaller files scheduled for usage in the demonstration.

SELECT cid,data,filename FROM blocks WHERE cid = (?1)
UNION
SELECT cid,data,filename FROM blocks b
INNER JOIN links l ON b.cid==l.block_cid
INNER JOIN cids ON (root_cid=x)
)
SELECT x,y,z FROM cids
"
.to_string();
let mut params = vec![cid.to_string()];

if let Some(offset) = offset {
if let Some(window_size) = window_size {
base_query.push_str(" LIMIT (?2) OFFSET (?3);");
params.push(format!("{window_size}"));
params.push(format!("{offset}"));
}
}
let params = params_from_iter(params.into_iter());
let blocks = self
.conn
.prepare(&base_query)?
.query_map(params, |row| {
let cid_str: String = row.get(0)?;
let data: Vec<u8> = row.get(1)?;
let filename: Option<String> = row.get(2).ok();
let links = match self.get_links_by_cid(&cid_str) {
Ok(links) => links,
Err(_) => vec![],
};
Ok(StoredBlock {
cid: cid_str,
data,
links,
filename,
})
})?
.filter_map(|b| b.ok())
.collect();

Ok(blocks)
}
}

impl StorageProvider for SqliteStorageProvider {
fn import_block(&self, block: &StoredBlock) -> Result<()> {
self.conn.execute(
"INSERT OR IGNORE INTO blocks (cid, data) VALUES (?1, ?2)",
(&block.cid, &block.data),
"INSERT OR IGNORE INTO blocks (cid, data, filename) VALUES (?1, ?2, ?3)",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "or ignore" makes me immediately question what the PK and/or unique constraints are. And looking at that code leads me to a side question: what is the purpose of an autonumber-style "id"?

But back to the main question: it looks like you did not modify the unique index on CID. So if you add a second file with the same content it gets silently dropped completely - no record of that filename existing. Is this really desired?

Perhaps it's too academic a concern, but in a pure mathematical sense a filename probably shouldn't be a column in blocks, right? It would be a separate table that associates... I guess many-to-many?

(&block.cid, &block.data, &block.filename),
)?;
// TODO: Should we have another indicator for root blocks that isn't just the number of links?
// TODO: This logic should probably get pulled up and split into two parts:
Expand Down Expand Up @@ -145,16 +198,18 @@ impl StorageProvider for SqliteStorageProvider {

fn get_block_by_cid(&self, cid: &str) -> Result<StoredBlock> {
match self.conn.query_row(
"SELECT cid, data FROM blocks b
"SELECT cid, data, filename FROM blocks b
WHERE cid == (?1)",
[&cid],
|row| {
let cid_str: String = row.get(0)?;
let data: Vec<u8> = row.get(1)?;
let filename: Option<String> = row.get(2).ok();
Ok(StoredBlock {
cid: cid_str,
data,
links: vec![],
filename,
})
},
) {
Expand All @@ -166,25 +221,43 @@ impl StorageProvider for SqliteStorageProvider {
}
}

fn list_available_dags(&self) -> Result<Vec<String>> {
fn list_available_dags(&self) -> Result<Vec<(String, String)>> {
let roots = self
.conn
.prepare("SELECT DISTINCT root_cid FROM links")?
.prepare("SELECT DISTINCT cid, filename FROM blocks")?
.query_map([], |row| {
let cid_str: String = row.get(0)?;
Ok(cid_str)
let filename_str: String = row.get(1)?;
Ok((cid_str, filename_str))
})?
// TODO: Correctly catch/log/handle errors here
.filter_map(|cid| cid.ok())
.collect();
Ok(roots)
}

fn name_dag(&self, cid: &str, file_name: &str) -> Result<()> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conversely this method assumes you want to drop the old name.

I would've guessed there'd be a parameter, perhaps bool or bool-equivalent-enum, specifying whether you'd like to do this or do WHERE cid = ?2 AND filename IS NULL

self.conn.execute(
"UPDATE blocks SET filename = ?1 WHERE cid = ?2",
(file_name, cid),
)?;
Ok(())
}

fn get_missing_cid_blocks(&self, cid: &str) -> Result<Vec<String>> {
// First get all block cid+id associated with root cid
let blocks: Vec<(String, Option<i32>)> = self
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious that you don't want to use a recursive select here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went ahead and fixed this for consistency, and in case it becomes an issue later.

.conn
.prepare("SELECT block_cid, block_id FROM links WHERE root_cid == (?1)")?
.prepare(
"
WITH RECURSIVE cids(x,y) AS (
SELECT cid, id FROM blocks WHERE cid = (?1)
UNION
SELECT block_cid, block_id FROM links JOIN cids ON root_cid=x
)
SELECT x,y FROM cids;
",
)?
.query_map([cid], |row| {
let block_cid: String = row.get(0)?;
let block_id: Option<i32> = row.get(1)?;
Expand Down Expand Up @@ -217,39 +290,7 @@ impl StorageProvider for SqliteStorageProvider {
offset: u32,
window_size: u32,
) -> Result<Vec<StoredBlock>> {
let blocks: Vec<StoredBlock> = self
.conn
.prepare(
"
WITH RECURSIVE cids(x,y) AS (
SELECT cid,data FROM blocks WHERE cid = (?1)
UNION
SELECT cid,data FROM blocks b
INNER JOIN links l ON b.cid==l.block_cid
INNER JOIN cids ON (root_cid=x)
)
SELECT x,y FROM cids
LIMIT (?2) OFFSET (?3);
",
)?
.query_map(
[cid, &format!("{window_size}"), &format!("{offset}")],
|row| {
let cid_str: String = row.get(0)?;
let data: Vec<u8> = row.get(1)?;
let links = match self.get_links_by_cid(&cid_str) {
Ok(links) => links,
Err(_) => vec![],
};
Ok(StoredBlock {
cid: cid_str,
data,
links,
})
},
)?
.filter_map(|b| b.ok())
.collect();
let blocks = self.get_blocks_recursive_query(cid, Some(offset), Some(window_size))?;

Ok(blocks)
}
Expand Down Expand Up @@ -278,71 +319,7 @@ impl StorageProvider for SqliteStorageProvider {
}

fn get_all_dag_blocks(&self, cid: &str) -> Result<Vec<StoredBlock>> {
let blocks: Vec<StoredBlock> = self
.conn
.prepare(
"
WITH RECURSIVE cids(x,y) AS (
SELECT cid,data FROM blocks WHERE cid = (?1)
UNION
SELECT cid,data FROM blocks b
INNER JOIN links l ON b.cid==l.block_cid
INNER JOIN cids ON (root_cid=x)
)
SELECT x,y FROM cids
",
)?
.query_map([cid], |row| {
let cid_str: String = row.get(0)?;
let data: Vec<u8> = row.get(1)?;
let links = match self.get_links_by_cid(&cid_str) {
Ok(links) => links,
Err(_) => vec![],
};
Ok(StoredBlock {
cid: cid_str,
data,
links,
})
})?
.filter_map(|b| b.ok())
.collect();

Ok(blocks)
}

fn get_all_blocks_under_cid(&self, cid: &str) -> Result<Vec<StoredBlock>> {
let blocks: Vec<StoredBlock> = self
.conn
.prepare(
"
WITH RECURSIVE cids(x,y) AS (
SELECT cid,data FROM blocks WHERE cid = (?1)
UNION
SELECT cid,data FROM blocks b
INNER JOIN links l ON b.cid==l.block_cid
INNER JOIN cids ON (root_cid=x)
)
SELECT x,y FROM cids
",
)?
.query_map([cid], |row| {
let cid_str: String = row.get(0)?;
let data: Vec<u8> = row.get(1)?;
let links = match self.get_links_by_cid(&cid_str) {
Ok(links) => links,
Err(_) => vec![],
};
Ok(StoredBlock {
cid: cid_str,
data,
links,
})
})?
.filter_map(|b| b.ok())
.collect();

Ok(blocks)
self.get_blocks_recursive_query(cid, None, None)
}
}

Expand Down Expand Up @@ -390,6 +367,7 @@ pub mod tests {
cid: cid_str.to_string(),
data: b"1010101".to_vec(),
links: vec![],
filename: None,
};

harness.provider.import_block(&block).unwrap();
Expand Down Expand Up @@ -418,6 +396,7 @@ pub mod tests {
cid: c.to_string(),
data: b"123412341234".to_vec(),
links: vec![],
filename: None,
};
harness.provider.import_block(&block).unwrap()
});
Expand All @@ -439,6 +418,7 @@ pub mod tests {
cid: cid.to_string(),
data: b"1010101".to_vec(),
links: vec![],
filename: None,
};

harness.provider.import_block(&block).unwrap();
Expand All @@ -460,6 +440,7 @@ pub mod tests {
cid: cid.to_string(),
data: b"1010101".to_vec(),
links: vec![block_cid.to_string()],
filename: None,
};

harness.provider.import_block(&block).unwrap();
Expand All @@ -481,6 +462,7 @@ pub mod tests {
cid: cid.to_string(),
data: vec![],
links: vec![block_cid.to_string()],
filename: None,
};

harness.provider.import_block(&block).unwrap();
Expand Down Expand Up @@ -512,12 +494,14 @@ pub mod tests {
cid: cid_str.to_string(),
data: vec![],
links: vec![block_cid.to_string()],
filename: None,
};

let child_block = StoredBlock {
cid: block_cid.to_string(),
data: b"101293910101".to_vec(),
links: vec![],
filename: None,
};

harness.provider.import_block(&block).unwrap();
Expand Down
Loading