Skip to content
This repository has been archived by the owner on Oct 23, 2022. It is now read-only.

feat: http pin/add #342

Merged
merged 13 commits into from
Aug 27, 2020
24 changes: 0 additions & 24 deletions conformance/patches/local_refs.patch

This file was deleted.

2 changes: 1 addition & 1 deletion conformance/test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ tests.dht(factory, {

// tests.repo(factory)
// tests.object(factory)
// tests.pin(factory)
tests.pin.add(factory)
// tests.bootstrap(factory)
// tests.name(factory)
// tests.namePubsub(factory)
Expand Down
11 changes: 7 additions & 4 deletions http/src/v0.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod block;
pub mod dag;
pub mod dht;
pub mod id;
pub mod pin;
pub mod pubsub;
pub mod refs;
pub mod root_files;
Expand Down Expand Up @@ -128,6 +129,11 @@ pub fn routes<T: IpfsTypes>(
and_boxed!(warp::path!("disconnect"), swarm::disconnect(ipfs)),
and_boxed!(warp::path!("peers"), swarm::peers(ipfs)),
)),
warp::path("pin").and(combine!(
and_boxed!(warp::path!("add"), pin::add(ipfs)),
and_boxed!(warp::path!("ls"), pin::list(ipfs)),
and_boxed!(warp::path!("rm"), pin::rm(ipfs)),
)),
combine_unify!(
warp::path!("bootstrap" / ..),
warp::path!("config" / ..),
Expand All @@ -136,17 +142,14 @@ pub fn routes<T: IpfsTypes>(
warp::path!("key" / ..),
warp::path!("name" / ..),
warp::path!("object" / ..),
warp::path!("pin" / ..),
warp::path!("ping" / ..),
warp::path!("repo" / ..),
warp::path!("stats" / ..),
)
.and_then(not_implemented),
));

// have a common handler turn the rejections into 400 or 500 with json body
// boxing this might save up to 15s.
boxed_on_debug!(api.recover(recover_as_message_response))
api.recover(recover_as_message_response)
}

pub(crate) async fn handle_shutdown(
Expand Down
141 changes: 141 additions & 0 deletions http/src/v0/pin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
use crate::v0::support::option_parsing::ParseError;
use crate::v0::support::{with_ipfs, StringError, StringSerialized};
use futures::future::try_join_all;
use ipfs::{Cid, Ipfs, IpfsTypes};
use serde::Serialize;
use std::convert::TryFrom;
use warp::{reply, Filter, Rejection, Reply};

#[derive(Debug)]
struct AddRequest {
args: Vec<Cid>,
recursive: bool,
progress: bool,
// TODO: timeout, probably with rollback semantics?
}

#[derive(Serialize)]
struct AddResponse {
#[serde(rename = "Pins")]
pins: Vec<StringSerialized<Cid>>,
// FIXME: go-ipfs doesn't respond with this
//progress: u8,
}

/// `pin/add` per https://docs.ipfs.io/reference/http/api/#api-v0-pin-add or the
/// interface-ipfs-http test suite.
pub fn add<T: IpfsTypes>(
ipfs: &Ipfs<T>,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
with_ipfs(ipfs).and(add_request()).and_then(add_inner)
}

async fn add_inner<T: IpfsTypes>(
ipfs: Ipfs<T>,
request: AddRequest,
) -> Result<impl Reply, Rejection> {
if request.recursive {
// FIXME: this is not documented however all tests always pass true or false
return Err(crate::v0::support::NotImplemented.into());
}

if request.progress {
// FIXME: there doesn't appear to be a test for this
return Err(crate::v0::support::NotImplemented.into());
}

let cids: Vec<Cid> = request.args;

let dispatched_pins = cids
.into_iter()
.map(|x| async { ipfs.pin_block(&x).await.map(move |_| StringSerialized(x)) });

let completed = try_join_all(dispatched_pins)
.await
.map_err(StringError::from)?;

Ok(reply::json(&AddResponse {
pins: completed,
//progress: 100,
}))
}

pub fn list<T: IpfsTypes>(
ipfs: &Ipfs<T>,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
with_ipfs(ipfs).and_then(list_inner)
}

async fn list_inner<T: IpfsTypes>(_ipfs: Ipfs<T>) -> Result<impl Reply, Rejection> {
// interestingly conformance tests call this with `paths=cid&stream=true&arg=cid`
// this needs to be a stream of the listing
Err::<&'static str, _>(crate::v0::NotImplemented.into())
}

pub fn rm<T: IpfsTypes>(
ipfs: &Ipfs<T>,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
with_ipfs(ipfs).and_then(rm_inner)
}

async fn rm_inner<T: IpfsTypes>(_ipfs: Ipfs<T>) -> Result<impl Reply, Rejection> {
Err::<&'static str, _>(crate::v0::NotImplemented.into())
}

impl<'a> TryFrom<&'a str> for AddRequest {
type Error = ParseError<'a>;

fn try_from(q: &'a str) -> Result<Self, Self::Error> {
use ParseError::*;

let mut args = Vec::new();
let mut recursive = None;
let mut progress = None;

for (key, value) in url::form_urlencoded::parse(q.as_bytes()) {
let target = match &*key {
"arg" => {
args.push(Cid::try_from(&*value).map_err(|e| InvalidCid("arg".into(), e))?);
continue;
}
"recursive" => &mut recursive,
"progress" => &mut progress,
_ => {
// ignore unknown
continue;
}
};

if target.is_none() {
match value.parse::<bool>() {
Ok(value) => *target = Some(value),
Err(_) => return Err(InvalidBoolean(key, value)),
}
} else {
return Err(DuplicateField(key));
}
}

if args.is_empty() {
return Err(MissingArg);
}

Ok(AddRequest {
args,
recursive: recursive.unwrap_or(false),
progress: progress.unwrap_or(false),
})
}
}

/// Filter to perform custom `warp::query::<AddRequest>`. This needs to be copypasted around as
/// HRTB is not quite usable yet.
fn add_request() -> impl Filter<Extract = (AddRequest,), Error = Rejection> + Clone {
warp::filters::query::raw().and_then(|q: String| {
let res = AddRequest::try_from(q.as_str())
.map_err(StringError::from)
.map_err(warp::reject::custom);

futures::future::ready(res)
})
}
1 change: 1 addition & 0 deletions http/src/v0/support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::error::Error as StdError;
use std::fmt;

pub mod option_parsing;

mod stream;
pub use stream::StreamResponse;

Expand Down
17 changes: 15 additions & 2 deletions http/src/v0/support/option_parsing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use std::fmt;
pub enum ParseError<'a> {
DuplicateField(Cow<'a, str>),
MissingArg,
MissingField(Cow<'a, str>),
InvalidNumber(Cow<'a, str>, Cow<'a, str>),
InvalidBoolean(Cow<'a, str>, Cow<'a, str>),
InvalidDuration(Cow<'a, str>, humantime::DurationError),
InvalidCid(Cow<'a, str>, cid::Error),
}

impl<'a> fmt::Display for ParseError<'a> {
Expand All @@ -16,13 +18,24 @@ impl<'a> fmt::Display for ParseError<'a> {
match *self {
DuplicateField(ref s) => write!(fmt, "field {:?} was duplicated", *s),
MissingArg => write!(fmt, "required field \"arg\" missing"),
MissingField(ref field) => write!(fmt, "required field {:?} missing", field),
InvalidNumber(ref k, ref v) => write!(fmt, "field {:?} invalid number: {:?}", *k, *v),
InvalidBoolean(ref k, ref v) => write!(fmt, "field {:?} invalid boolean: {:?}", *k, *v),
InvalidDuration(ref field, ref e) => {
write!(fmt, "field {:?} invalid duration: {:?}", field, e)
write!(fmt, "field {:?} invalid duration: {}", field, e)
}
InvalidCid(ref field, ref e) => write!(fmt, "field {:?} invalid cid: {}", field, e),
}
}
}

impl<'a> std::error::Error for ParseError<'a> {}
impl<'a> std::error::Error for ParseError<'a> {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
use ParseError::*;
match self {
InvalidDuration(_, e) => Some(e),
InvalidCid(_, e) => Some(e),
_ => None,
}
}
}