diff --git a/conformance/patches/local_refs.patch b/conformance/patches/local_refs.patch deleted file mode 100644 index 11e52e0e2..000000000 --- a/conformance/patches/local_refs.patch +++ /dev/null @@ -1,24 +0,0 @@ ---- node_modules/interface-ipfs-core/src/refs-local.js -+++ node_modules/interface-ipfs-core/src/refs-local.js -@@ -48,10 +48,14 @@ module.exports = (common, options) => { - - const imported = await all(importer(dirs, ipfs.block)) - -- // otherwise go-ipfs doesn't show them in the local refs -- await Promise.all( -- imported.map(i => ipfs.pin.add(i.cid)) -- ) -+ // rust-ipfs doesn't yet have pinning api, it'll just list all local cids -+ // in /refs/local -+ if (common.opts.type !== 'rust') { -+ // otherwise go-ipfs doesn't show them in the local refs -+ await Promise.all( -+ imported.map(i => ipfs.pin.add(i.cid)) -+ ) -+ } - - const refs = await all(ipfs.refs.local()) - const cids = refs.map(r => r.ref) --- -2.27.0 - diff --git a/conformance/test/index.js b/conformance/test/index.js index dd94be9db..dcc472e0c 100644 --- a/conformance/test/index.js +++ b/conformance/test/index.js @@ -121,7 +121,7 @@ tests.dht(factory, { // tests.repo(factory) // tests.object(factory) -// tests.pin(factory) +tests.pin.add(factory) // tests.bootstrap(factory) // tests.name(factory) // tests.namePubsub(factory) diff --git a/http/src/v0.rs b/http/src/v0.rs index a21bc26b3..4d4c8a7aa 100644 --- a/http/src/v0.rs +++ b/http/src/v0.rs @@ -6,6 +6,7 @@ pub mod block; pub mod dag; pub mod dht; pub mod id; +pub mod pin; pub mod pubsub; pub mod refs; pub mod root_files; @@ -128,6 +129,11 @@ pub fn routes( and_boxed!(warp::path!("disconnect"), swarm::disconnect(ipfs)), and_boxed!(warp::path!("peers"), swarm::peers(ipfs)), )), + warp::path("pin").and(combine!( + and_boxed!(warp::path!("add"), pin::add(ipfs)), + and_boxed!(warp::path!("ls"), pin::list(ipfs)), + and_boxed!(warp::path!("rm"), pin::rm(ipfs)), + )), combine_unify!( warp::path!("bootstrap" / ..), warp::path!("config" / ..), @@ -136,7 +142,6 @@ pub fn routes( warp::path!("key" / ..), warp::path!("name" / ..), warp::path!("object" / ..), - warp::path!("pin" / ..), warp::path!("ping" / ..), warp::path!("repo" / ..), warp::path!("stats" / ..), @@ -144,9 +149,7 @@ pub fn routes( .and_then(not_implemented), )); - // have a common handler turn the rejections into 400 or 500 with json body - // boxing this might save up to 15s. - boxed_on_debug!(api.recover(recover_as_message_response)) + api.recover(recover_as_message_response) } pub(crate) async fn handle_shutdown( diff --git a/http/src/v0/pin.rs b/http/src/v0/pin.rs new file mode 100644 index 000000000..ebfe582c6 --- /dev/null +++ b/http/src/v0/pin.rs @@ -0,0 +1,141 @@ +use crate::v0::support::option_parsing::ParseError; +use crate::v0::support::{with_ipfs, StringError, StringSerialized}; +use futures::future::try_join_all; +use ipfs::{Cid, Ipfs, IpfsTypes}; +use serde::Serialize; +use std::convert::TryFrom; +use warp::{reply, Filter, Rejection, Reply}; + +#[derive(Debug)] +struct AddRequest { + args: Vec, + recursive: bool, + progress: bool, + // TODO: timeout, probably with rollback semantics? +} + +#[derive(Serialize)] +struct AddResponse { + #[serde(rename = "Pins")] + pins: Vec>, + // FIXME: go-ipfs doesn't respond with this + //progress: u8, +} + +/// `pin/add` per https://docs.ipfs.io/reference/http/api/#api-v0-pin-add or the +/// interface-ipfs-http test suite. +pub fn add( + ipfs: &Ipfs, +) -> impl Filter + Clone { + with_ipfs(ipfs).and(add_request()).and_then(add_inner) +} + +async fn add_inner( + ipfs: Ipfs, + request: AddRequest, +) -> Result { + if request.recursive { + // FIXME: this is not documented however all tests always pass true or false + return Err(crate::v0::support::NotImplemented.into()); + } + + if request.progress { + // FIXME: there doesn't appear to be a test for this + return Err(crate::v0::support::NotImplemented.into()); + } + + let cids: Vec = request.args; + + let dispatched_pins = cids + .into_iter() + .map(|x| async { ipfs.pin_block(&x).await.map(move |_| StringSerialized(x)) }); + + let completed = try_join_all(dispatched_pins) + .await + .map_err(StringError::from)?; + + Ok(reply::json(&AddResponse { + pins: completed, + //progress: 100, + })) +} + +pub fn list( + ipfs: &Ipfs, +) -> impl Filter + Clone { + with_ipfs(ipfs).and_then(list_inner) +} + +async fn list_inner(_ipfs: Ipfs) -> Result { + // interestingly conformance tests call this with `paths=cid&stream=true&arg=cid` + // this needs to be a stream of the listing + Err::<&'static str, _>(crate::v0::NotImplemented.into()) +} + +pub fn rm( + ipfs: &Ipfs, +) -> impl Filter + Clone { + with_ipfs(ipfs).and_then(rm_inner) +} + +async fn rm_inner(_ipfs: Ipfs) -> Result { + Err::<&'static str, _>(crate::v0::NotImplemented.into()) +} + +impl<'a> TryFrom<&'a str> for AddRequest { + type Error = ParseError<'a>; + + fn try_from(q: &'a str) -> Result { + use ParseError::*; + + let mut args = Vec::new(); + let mut recursive = None; + let mut progress = None; + + for (key, value) in url::form_urlencoded::parse(q.as_bytes()) { + let target = match &*key { + "arg" => { + args.push(Cid::try_from(&*value).map_err(|e| InvalidCid("arg".into(), e))?); + continue; + } + "recursive" => &mut recursive, + "progress" => &mut progress, + _ => { + // ignore unknown + continue; + } + }; + + if target.is_none() { + match value.parse::() { + Ok(value) => *target = Some(value), + Err(_) => return Err(InvalidBoolean(key, value)), + } + } else { + return Err(DuplicateField(key)); + } + } + + if args.is_empty() { + return Err(MissingArg); + } + + Ok(AddRequest { + args, + recursive: recursive.unwrap_or(false), + progress: progress.unwrap_or(false), + }) + } +} + +/// Filter to perform custom `warp::query::`. This needs to be copypasted around as +/// HRTB is not quite usable yet. +fn add_request() -> impl Filter + Clone { + warp::filters::query::raw().and_then(|q: String| { + let res = AddRequest::try_from(q.as_str()) + .map_err(StringError::from) + .map_err(warp::reject::custom); + + futures::future::ready(res) + }) +} diff --git a/http/src/v0/support.rs b/http/src/v0/support.rs index 2004095b6..81cbc6eec 100644 --- a/http/src/v0/support.rs +++ b/http/src/v0/support.rs @@ -5,6 +5,7 @@ use std::error::Error as StdError; use std::fmt; pub mod option_parsing; + mod stream; pub use stream::StreamResponse; diff --git a/http/src/v0/support/option_parsing.rs b/http/src/v0/support/option_parsing.rs index 2311e16a4..80cc8431d 100644 --- a/http/src/v0/support/option_parsing.rs +++ b/http/src/v0/support/option_parsing.rs @@ -5,9 +5,11 @@ use std::fmt; pub enum ParseError<'a> { DuplicateField(Cow<'a, str>), MissingArg, + MissingField(Cow<'a, str>), InvalidNumber(Cow<'a, str>, Cow<'a, str>), InvalidBoolean(Cow<'a, str>, Cow<'a, str>), InvalidDuration(Cow<'a, str>, humantime::DurationError), + InvalidCid(Cow<'a, str>, cid::Error), } impl<'a> fmt::Display for ParseError<'a> { @@ -16,13 +18,24 @@ impl<'a> fmt::Display for ParseError<'a> { match *self { DuplicateField(ref s) => write!(fmt, "field {:?} was duplicated", *s), MissingArg => write!(fmt, "required field \"arg\" missing"), + MissingField(ref field) => write!(fmt, "required field {:?} missing", field), InvalidNumber(ref k, ref v) => write!(fmt, "field {:?} invalid number: {:?}", *k, *v), InvalidBoolean(ref k, ref v) => write!(fmt, "field {:?} invalid boolean: {:?}", *k, *v), InvalidDuration(ref field, ref e) => { - write!(fmt, "field {:?} invalid duration: {:?}", field, e) + write!(fmt, "field {:?} invalid duration: {}", field, e) } + InvalidCid(ref field, ref e) => write!(fmt, "field {:?} invalid cid: {}", field, e), } } } -impl<'a> std::error::Error for ParseError<'a> {} +impl<'a> std::error::Error for ParseError<'a> { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + use ParseError::*; + match self { + InvalidDuration(_, e) => Some(e), + InvalidCid(_, e) => Some(e), + _ => None, + } + } +}