Skip to content
This repository has been archived by the owner on Oct 23, 2022. It is now read-only.

test: common fs and mem pinstore tests #360

Merged
merged 25 commits into from
Sep 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ hex-literal = { default-features = false, version = "0.3" }
sha2 = { default-features = false, version = "0.9" }
tokio = { default-features = false, features = ["io-std"], version = "0.2" }
tracing-subscriber = { default-features = false, features = ["fmt", "tracing-log", "ansi", "env-filter"], version = "0.2" }
tempfile = "3.1.0"

[workspace]
members = [ "bitswap", "http", "unixfs" ]
Expand Down
9 changes: 6 additions & 3 deletions http/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ use structopt::StructOpt;
use ipfs::{Ipfs, IpfsOptions, IpfsTypes, UninitializedIpfs};
use ipfs_http::{config, v0};

#[macro_use]
extern crate tracing;

#[derive(Debug, StructOpt)]
enum Options {
/// Should initialize the repository (create directories and such). `js-ipfsd-ctl` calls this
Expand Down Expand Up @@ -168,11 +171,11 @@ fn main() {
// locking on the repo, unsure how go-ipfs locks the fsstore
let _ = tokio::fs::File::create(&api_link_file)
.await
.map_err(|e| eprintln!("Failed to truncate {:?}: {}", api_link_file, e));
.map_err(|e| info!("Failed to truncate {:?}: {}", api_link_file, e));
}
});

println!("Shutdown complete");
info!("Shutdown complete");
}

fn serve<Types: IpfsTypes>(
Expand All @@ -189,7 +192,7 @@ fn serve<Types: IpfsTypes>(

warp::serve(routes).bind_with_graceful_shutdown(([127, 0, 0, 1], 0), async move {
shutdown_rx.next().await;
println!("Shutdown trigger received; starting shutdown");
info!("Shutdown trigger received; starting shutdown");
ipfs.exit_daemon().await;
})
}
54 changes: 30 additions & 24 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl<T: RepoTypes> IpfsTypes for T {}
pub struct Types;
impl RepoTypes for Types {
type TBlockStore = repo::fs::FsBlockStore;
type TDataStore = repo::mem::MemDataStore;
type TDataStore = repo::fs::FsDataStore;
}

/// Testing IPFS types
Expand Down Expand Up @@ -298,7 +298,7 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
let repo_options = RepoOptions::from(&options);
let (repo, repo_events) = create_repo(repo_options);
let keys = options.keypair.clone();
let span = span.unwrap_or_else(|| tracing::trace_span!("ipfs"));
let span = span.unwrap_or_else(|| trace_span!("ipfs"));

UninitializedIpfs {
repo,
Expand Down Expand Up @@ -416,29 +416,31 @@ impl<Types: IpfsTypes> Ipfs<Types> {
/// state. The remedy is to re-pin recursive pins.
pub async fn insert_pin(&self, cid: &Cid, recursive: bool) -> Result<(), Error> {
use futures::stream::{StreamExt, TryStreamExt};
if !recursive {
self.repo.insert_direct_pin(cid).await
} else {
let span = tracing::debug_span!("insert recursive pin", cid = %cid);
let span = debug_span!(parent: &self.span, "insert_pin", cid = %cid, recursive);
let refs_span = debug_span!(parent: &span, "insert_pin refs");

async move {
// this needs to download everything but /pin/ls does not
let Block { data, .. } = self.repo.get_block(cid).await?;
async move {
// this needs to download everything but /pin/ls does not
let Block { data, .. } = self.repo.get_block(cid).await?;

if !recursive {
self.repo.insert_direct_pin(cid).await
} else {
let ipld = crate::ipld::decode_ipld(&cid, &data)?;

let st = crate::refs::IpldRefs::default()
.with_only_unique()
.refs_of_resolved(self, vec![(cid.clone(), ipld.clone())].into_iter())
.map_ok(|crate::refs::Edge { destination, .. }| destination)
.into_stream()
.instrument(refs_span)
.boxed();

self.repo.insert_recursive_pin(cid, st).await
}
.instrument(span)
.await
}
.instrument(span)
.await
}

/// Unpins a given Cid recursively or only directly.
Expand All @@ -449,11 +451,11 @@ impl<Types: IpfsTypes> Ipfs<Types> {
/// pinned tree roots.
pub async fn remove_pin(&self, cid: &Cid, recursive: bool) -> Result<(), Error> {
use futures::stream::{StreamExt, TryStreamExt};
if !recursive {
self.repo.remove_direct_pin(cid).await
} else {
let span = tracing::debug_span!("remove recursive pin", cid = %cid);
async move {
let span = debug_span!(parent: &self.span, "remove_pin", cid = %cid, recursive);
async move {
if !recursive {
self.repo.remove_direct_pin(cid).await
} else {
// start walking refs of the root after loading it

let Block { data, .. } = match self.repo.get_block_now(&cid).await? {
Expand All @@ -477,9 +479,9 @@ impl<Types: IpfsTypes> Ipfs<Types> {

self.repo.remove_recursive_pin(cid, st).await
}
.instrument(span)
.await
}
.instrument(span)
.await
}

/// Checks whether a given block is pinned. At the moment does not support incomplete recursive
Expand All @@ -498,9 +500,8 @@ impl<Types: IpfsTypes> Ipfs<Types> {
///
/// TODO: This operation could be provided as a `Ipfs::fix_pins()`.
pub async fn is_pinned(&self, cid: &Cid) -> Result<bool, Error> {
// best to just delegate, we cannot efficiently obtain a list of PinKind::RecursiveIntention but the
// repo impl can
self.repo.is_pinned(cid).instrument(self.span.clone()).await
let span = debug_span!(parent: &self.span, "is_pinned", cid = %cid);
self.repo.is_pinned(cid).instrument(span).await
}

/// Lists all pins, or the specific kind thereof.
Expand All @@ -510,7 +511,8 @@ impl<Types: IpfsTypes> Ipfs<Types> {
&self,
filter: Option<PinMode>,
) -> futures::stream::BoxStream<'static, Result<(Cid, PinMode), Error>> {
self.repo.list_pins(filter).await
let span = debug_span!(parent: &self.span, "list_pins", ?filter);
self.repo.list_pins(filter).instrument(span).await
}

/// Read specific pins. When `requirement` is `Some`, all pins are required to be of the given
Expand All @@ -522,7 +524,11 @@ impl<Types: IpfsTypes> Ipfs<Types> {
cids: Vec<Cid>,
requirement: Option<PinMode>,
) -> Result<Vec<(Cid, PinKind<Cid>)>, Error> {
self.repo.query_pins(cids, requirement).await
let span = debug_span!(parent: &self.span, "query_pins", ids = cids.len(), ?requirement);
self.repo
.query_pins(cids, requirement)
.instrument(span)
.await
}

/// Puts an ipld dag node into the ipfs repo.
Expand Down Expand Up @@ -1365,7 +1371,7 @@ mod node {
pub async fn new<T: AsRef<str>>(name: T) -> Self {
let opts = IpfsOptions::inmemory_with_generated_keys();
Node::with_options(opts)
.instrument(tracing::trace_span!("ipfs", node = name.as_ref()))
.instrument(trace_span!("ipfs", node = name.as_ref()))
.await
}

Expand Down
6 changes: 4 additions & 2 deletions src/refs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,12 @@ where
}
};

trace!(cid = %cid, "loaded next");

let ipld = match decode_ipld(&cid, &data) {
Ok(ipld) => ipld,
Err(e) => {
warn!("failed to parse {}, linked from {}: {}", cid, source, e);
warn!(cid = %cid, source = %cid, "failed to parse: {}", e);
// go-ipfs on raw Qm hash:
// > failed to decode Protocol Buffers: incorrectly formatted merkledag node: unmarshal failed. proto: illegal wireType 6
yield Err(e.into());
Expand All @@ -233,7 +235,7 @@ where
if traverse_links {
for (link_name, next_cid) in ipld_links(&cid, ipld) {
if unique && !queued_or_visited.insert(next_cid.clone()) {
trace!("skipping already queued {}", next_cid);
trace!(queued = %next_cid, "skipping already queued");
continue;
}

Expand Down
Loading