diff --git a/src/repo/mem.rs b/src/repo/mem.rs index 9207d6b95..90e10f14d 100644 --- a/src/repo/mem.rs +++ b/src/repo/mem.rs @@ -1,11 +1,10 @@ //! Volatile memory backed repo use crate::error::Error; -use crate::repo::{ - BlockPut, BlockStore, Column, DataStore, PinDocument, PinKind, PinMode, PinStore, Recursive, -}; +use crate::repo::{BlockPut, BlockStore, Column, DataStore, PinKind, PinMode, PinStore}; use async_trait::async_trait; use bitswap::Block; use cid::Cid; +use std::convert::TryFrom; use std::path::PathBuf; use tokio::sync::{Mutex, OwnedMutexGuard}; @@ -13,6 +12,7 @@ use super::{BlockRm, BlockRmError, RepoCid}; use std::collections::hash_map::Entry; // FIXME: Transition to Persistent Map to make iterating more consistent +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; @@ -432,6 +432,211 @@ impl DataStore for MemDataStore { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +enum Recursive { + /// Persistent record of **completed** recursive pinning. All references now have indirect pins + /// recorded. + Count(u64), + /// Persistent record of intent to add recursive pins to all indirect blocks or even not to + /// keep the go-ipfs way which might not be a bad idea after all. Adding all the indirect pins + /// on disk will cause massive write amplification in the end, but lets keep that way until we + /// get everything working at least. + Intent, + /// Not pinned recursively. 
+ Not, +} + +impl Recursive { + fn is_set(&self) -> bool { + match self { + Recursive::Count(_) | Recursive::Intent => true, + Recursive::Not => false, + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +struct PinDocument { + version: u8, + direct: bool, + // how many descendants; something to check when walking + recursive: Recursive, + // no further metadata necessary; cids are pinned by full cid + cid_version: u8, + // using the cidv1 versions of all cids here, not sure if that makes sense or is important + indirect_by: Vec<String>, +} + +impl PinDocument { + fn update(&mut self, add: bool, kind: &PinKind<&'_ Cid>) -> Result<bool, PinUpdateError> { + // these update rules are a bit complex and there are cases we don't need to handle. + // Updating on upon `PinKind` forces the caller to inspect what the current state is for + // example to handle the case of failing "unpin currently recursively pinned as direct". + // the ruleset seems quite strange to be honest. + match kind { + PinKind::IndirectFrom(root) => { + let root = if root.version() == cid::Version::V1 { + root.to_string() + } else { + // this is one more allocation + Cid::new_v1(root.codec(), (*root).hash().to_owned()).to_string() + }; + + let modified = if self.indirect_by.is_empty() { + if add { + self.indirect_by.push(root); + true + } else { + false + } + } else { + let mut set = self + .indirect_by + .drain(..) 
+ .collect::<std::collections::HashSet<_>>(); + + let modified = if add { + set.insert(root) + } else { + set.remove(&root) + }; + + self.indirect_by.extend(set.into_iter()); + modified + }; + + Ok(modified) + } + PinKind::Direct => { + if self.recursive.is_set() && !self.direct && add { + // go-ipfs: cannot make recursive pin also direct + // not really sure why does this rule exist; the other way around is allowed + return Err(PinUpdateError::AlreadyPinnedRecursive); + } + + if !self.direct && !add { + panic!("this situation must be handled by the caller by checking that recursive pin is about to be removed as direct"); + } + + let modified = self.direct != add; + self.direct = add; + Ok(modified) + } + PinKind::RecursiveIntention => { + let modified = if add { + match self.recursive { + Recursive::Count(_) => return Err(PinUpdateError::AlreadyPinnedRecursive), + // can overwrite Intent with another Intent, as Ipfs::insert_pin is now moving to fix + // the Intent into the "final form" of Recursive::Count. + Recursive::Intent => false, + Recursive::Not => { + self.recursive = Recursive::Intent; + self.direct = false; + true + } + } + } else { + match self.recursive { + Recursive::Count(_) | Recursive::Intent => { + self.recursive = Recursive::Not; + true + } + Recursive::Not => false, + } + }; + + Ok(modified) + } + PinKind::Recursive(descendants) => { + let descendants = *descendants; + let modified = if add { + match self.recursive { + Recursive::Count(other) if other != descendants => { + return Err(PinUpdateError::UnexpectedNumberOfDescendants( + other, + descendants, + )) + } + Recursive::Count(_) => false, + Recursive::Intent | Recursive::Not => { + self.recursive = Recursive::Count(descendants); + // the previously direct has now been upgraded to recursive, it can + // still be indirect though + self.direct = false; + true + } + } + } else { + match self.recursive { + Recursive::Count(other) if other != descendants => { + return Err(PinUpdateError::UnexpectedNumberOfDescendants( + 
other, + descendants, + )) + } + Recursive::Count(_) | Recursive::Intent => { + self.recursive = Recursive::Not; + true + } + Recursive::Not => return Err(PinUpdateError::NotPinnedRecursive), + } + // FIXME: removing ... not sure if this is an issue; was thinking that maybe + // the update might need to be split to allow different api for removal than + // addition. + }; + Ok(modified) + } + } + } + + fn can_remove(&self) -> bool { + !self.direct && !self.recursive.is_set() && self.indirect_by.is_empty() + } + + fn mode(&self) -> Option<PinMode> { + if self.recursive.is_set() { + Some(PinMode::Recursive) + } else if !self.indirect_by.is_empty() { + Some(PinMode::Indirect) + } else if self.direct { + Some(PinMode::Direct) + } else { + None + } + } + + fn pick_kind(&self) -> Option<Result<PinKind<Cid>, cid::Error>> { + self.mode().map(|p| { + Ok(match p { + PinMode::Recursive => match self.recursive { + Recursive::Intent => PinKind::RecursiveIntention, + Recursive::Count(total) => PinKind::Recursive(total), + _ => unreachable!("mode shuold not have returned PinKind::Recursive"), + }, + PinMode::Indirect => { + // go-ipfs does seem to be doing a fifo looking, perhaps this is a list there, or + // the indirect pins aren't being written down anywhere and they just refs from + // recursive roots. 
+ let cid = Cid::try_from(self.indirect_by[0].as_str())?; + PinKind::IndirectFrom(cid) + } + PinMode::Direct => PinKind::Direct, + }) + }) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum PinUpdateError { + #[error("unexpected number of descendants ({}), found {}", .1, .0)] + UnexpectedNumberOfDescendants(u64, u64), + #[error("not pinned recursively")] + NotPinnedRecursive, + /// Not allowed: Adding direct pin while pinned recursive + #[error("already pinned recursively")] + AlreadyPinnedRecursive, +} + #[cfg(test)] mod tests { use super::*; @@ -526,4 +731,20 @@ mod tests { let get = store.get(col, &key); assert_eq!(get.await.unwrap(), None); } + + #[test] + fn pindocument_on_direct_pin() { + let mut doc = PinDocument { + version: 0, + direct: false, + recursive: Recursive::Not, + cid_version: 0, + indirect_by: Vec::new(), + }; + + assert!(doc.update(true, &PinKind::Direct).unwrap()); + + assert_eq!(doc.mode(), Some(PinMode::Direct)); + assert_eq!(doc.pick_kind().unwrap().unwrap(), PinKind::Direct); + } } diff --git a/src/repo/mod.rs b/src/repo/mod.rs index c36377122..8f8a88ea4 100644 --- a/src/repo/mod.rs +++ b/src/repo/mod.rs @@ -15,7 +15,6 @@ use futures::channel::{ }; use futures::sink::SinkExt; use libp2p::core::PeerId; -use serde::{Deserialize, Serialize}; use std::borrow::Borrow; use std::hash::{Hash, Hasher}; use std::path::PathBuf; @@ -440,211 +439,6 @@ impl Repo { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] -enum Recursive { - /// Persistent record of **completed** recursive pinning. All references now have indirect pins - /// recorded. - Count(u64), - /// Persistent record of intent to add recursive pins to all indirect blocks or even not to - /// keep the go-ipfs way which might not be a bad idea after all. Adding all the indirect pins - /// on disk will cause massive write amplification in the end, but lets keep that way until we - /// get everything working at least. - Intent, - /// Not pinned recursively. 
- Not, -} - -impl Recursive { - fn is_set(&self) -> bool { - match self { - Recursive::Count(_) | Recursive::Intent => true, - Recursive::Not => false, - } - } -} - -#[derive(Debug, Serialize, Deserialize)] -struct PinDocument { - version: u8, - direct: bool, - // how many descendants; something to check when walking - recursive: Recursive, - // no further metadata necessary; cids are pinned by full cid - cid_version: u8, - // using the cidv1 versions of all cids here, not sure if that makes sense or is important - indirect_by: Vec<String>, -} - -impl PinDocument { - fn update(&mut self, add: bool, kind: &PinKind<&'_ Cid>) -> Result<bool, PinUpdateError> { - // these update rules are a bit complex and there are cases we don't need to handle. - // Updating on upon `PinKind` forces the caller to inspect what the current state is for - // example to handle the case of failing "unpin currently recursively pinned as direct". - // the ruleset seems quite strange to be honest. - match kind { - PinKind::IndirectFrom(root) => { - let root = if root.version() == cid::Version::V1 { - root.to_string() - } else { - // this is one more allocation - Cid::new_v1(root.codec(), (*root).hash().to_owned()).to_string() - }; - - let modified = if self.indirect_by.is_empty() { - if add { - self.indirect_by.push(root); - true - } else { - false - } - } else { - let mut set = self - .indirect_by - .drain(..) 
- .collect::<std::collections::HashSet<_>>(); - - let modified = if add { - set.insert(root) - } else { - set.remove(&root) - }; - - self.indirect_by.extend(set.into_iter()); - modified - }; - - Ok(modified) - } - PinKind::Direct => { - if self.recursive.is_set() && !self.direct && add { - // go-ipfs: cannot make recursive pin also direct - // not really sure why does this rule exist; the other way around is allowed - return Err(PinUpdateError::AlreadyPinnedRecursive); - } - - if !self.direct && !add { - panic!("this situation must be handled by the caller by checking that recursive pin is about to be removed as direct"); - } - - let modified = self.direct != add; - self.direct = add; - Ok(modified) - } - PinKind::RecursiveIntention => { - let modified = if add { - match self.recursive { - Recursive::Count(_) => return Err(PinUpdateError::AlreadyPinnedRecursive), - // can overwrite Intent with another Intent, as Ipfs::insert_pin is now moving to fix - // the Intent into the "final form" of Recursive::Count. - Recursive::Intent => false, - Recursive::Not => { - self.recursive = Recursive::Intent; - self.direct = false; - true - } - } - } else { - match self.recursive { - Recursive::Count(_) | Recursive::Intent => { - self.recursive = Recursive::Not; - true - } - Recursive::Not => false, - } - }; - - Ok(modified) - } - PinKind::Recursive(descendants) => { - let descendants = *descendants; - let modified = if add { - match self.recursive { - Recursive::Count(other) if other != descendants => { - return Err(PinUpdateError::UnexpectedNumberOfDescendants( - other, - descendants, - )) - } - Recursive::Count(_) => false, - Recursive::Intent | Recursive::Not => { - self.recursive = Recursive::Count(descendants); - // the previously direct has now been upgraded to recursive, it can - // still be indirect though - self.direct = false; - true - } - } - } else { - match self.recursive { - Recursive::Count(other) if other != descendants => { - return Err(PinUpdateError::UnexpectedNumberOfDescendants( - 
other, - descendants, - )) - } - Recursive::Count(_) | Recursive::Intent => { - self.recursive = Recursive::Not; - true - } - Recursive::Not => return Err(PinUpdateError::NotPinnedRecursive), - } - // FIXME: removing ... not sure if this is an issue; was thinking that maybe - // the update might need to be split to allow different api for removal than - // addition. - }; - Ok(modified) - } - } - } - - fn can_remove(&self) -> bool { - !self.direct && !self.recursive.is_set() && self.indirect_by.is_empty() - } - - fn mode(&self) -> Option<PinMode> { - if self.recursive.is_set() { - Some(PinMode::Recursive) - } else if !self.indirect_by.is_empty() { - Some(PinMode::Indirect) - } else if self.direct { - Some(PinMode::Direct) - } else { - None - } - } - - fn pick_kind(&self) -> Option<Result<PinKind<Cid>, cid::Error>> { - self.mode().map(|p| { - Ok(match p { - PinMode::Recursive => match self.recursive { - Recursive::Intent => PinKind::RecursiveIntention, - Recursive::Count(total) => PinKind::Recursive(total), - _ => unreachable!("mode shuold not have returned PinKind::Recursive"), - }, - PinMode::Indirect => { - // go-ipfs does seem to be doing a fifo looking, perhaps this is a list there, or - // the indirect pins aren't being written down anywhere and they just refs from - // recursive roots. 
- let cid = Cid::try_from(self.indirect_by[0].as_str())?; - PinKind::IndirectFrom(cid) - } - PinMode::Direct => PinKind::Direct, - }) - }) - } -} - -#[derive(Debug, thiserror::Error)] -pub enum PinUpdateError { - #[error("unexpected number of descendants ({}), found {}", .1, .0)] - UnexpectedNumberOfDescendants(u64, u64), - #[error("not pinned recursively")] - NotPinnedRecursive, - /// Not allowed: Adding direct pin while pinned recursive - #[error("already pinned recursively")] - AlreadyPinnedRecursive, -} - #[cfg(test)] pub(crate) mod tests { use super::*; @@ -883,22 +677,6 @@ pub(crate) mod tests { assert_eq!(e.to_string(), "already pinned recursively"); } - #[test] - fn pindocument_on_direct_pin() { - let mut doc = PinDocument { - version: 0, - direct: false, - recursive: Recursive::Not, - cid_version: 0, - indirect_by: Vec::new(), - }; - - assert!(doc.update(true, &PinKind::Direct).unwrap()); - - assert_eq!(doc.mode(), Some(PinMode::Direct)); - assert_eq!(doc.pick_kind().unwrap().unwrap(), PinKind::Direct); - } - #[tokio::test(max_threads = 1)] async fn can_pin_direct_as_recursive() { // the other way around doesn't work