From 1e52843c1cd6c2c3ec904dd978abd6b9f402e6bd Mon Sep 17 00:00:00 2001 From: dtynn Date: Mon, 14 Mar 2022 17:44:08 +0800 Subject: [PATCH 1/2] feat: impl attached store manager --- venus-worker/assets/venus-worker.mock.toml | 7 +- venus-worker/src/config.rs | 13 +- venus-worker/src/infra/objstore/attached.rs | 53 ++++++++ venus-worker/src/infra/objstore/filestore.rs | 41 ++++-- venus-worker/src/infra/objstore/mod.rs | 7 + venus-worker/src/run.rs | 128 ++++++++++++++++--- venus-worker/src/sealing/worker/sealer.rs | 25 +++- venus-worker/src/watchdog.rs | 4 +- 8 files changed, 232 insertions(+), 46 deletions(-) create mode 100644 venus-worker/src/infra/objstore/attached.rs diff --git a/venus-worker/assets/venus-worker.mock.toml b/venus-worker/assets/venus-worker.mock.toml index 1c46b24b..51de7a67 100644 --- a/venus-worker/assets/venus-worker.mock.toml +++ b/venus-worker/assets/venus-worker.mock.toml @@ -35,7 +35,12 @@ location = "./mock-tmp/store2" [[sealing_thread]] location = "./mock-tmp/store3" -[remote_store] +# deprecated +# [remote_store] +# name = "persist-store1" +# location = "./mock-tmp/remote" + +[[attached]] # name = "persist-store1" location = "./mock-tmp/remote" diff --git a/venus-worker/src/config.rs b/venus-worker/src/config.rs index 18c6d774..51d9b6ac 100644 --- a/venus-worker/src/config.rs +++ b/venus-worker/src/config.rs @@ -96,10 +96,12 @@ pub struct SealingOptional { /// configuration for remote store #[derive(Debug, Default, Serialize, Deserialize)] -pub struct Remote { +pub struct Attached { pub name: Option, /// store path, if we are using fs based store - pub location: Option, + pub location: String, + + pub readonly: Option, } /// configurations for local sealing store @@ -173,8 +175,11 @@ pub struct Config { /// section for list of local sealing stores pub sealing_thread: Vec, - /// section for remote store - pub remote_store: Remote, + /// section for remote store, deprecated + pub remote_store: Option, + + /// section for attached store + pub attached: Option>, /// section for processors pub processors: Processors, diff --git a/venus-worker/src/infra/objstore/attached.rs b/venus-worker/src/infra/objstore/attached.rs new file mode 100644 index 00000000..42d51549 --- /dev/null +++ b/venus-worker/src/infra/objstore/attached.rs @@ -0,0 +1,53 @@ +//! manages multiple attached stores + +use std::collections::HashMap; + +use anyhow::{anyhow, Result}; + +use super::ObjectStore; + +/// manages all attached stores +pub struct AttachedManager { + stores: HashMap>, +} + +impl AttachedManager { + /// init AttachedManager with given stores + pub fn init(attached: Vec>) -> Result { + let mut stores = HashMap::new(); + for astore in attached { + if let Some(prev) = stores.insert(astore.instance(), astore) { + return Err(anyhow!("duplicate instance name {}", prev.instance())); + }; + } + + Ok(AttachedManager { stores }) + } + + /// get a named store instance + pub fn get(&self, instance: &str) -> Option<&dyn ObjectStore> { + self.stores.get(instance).map(|b| b.as_ref()) + } + + /// acquire an available store for sector persistence + pub fn acquire_persist( + &self, + _size: u64, + prev_instance: Option, + ) -> Option<&dyn ObjectStore> { + if let Some(ins) = prev_instance + .as_ref() + .and_then(|name| self.stores.get(name)) + { + if !ins.readonly() { + return Some(ins.as_ref()); + } + }; + + // TODO: depends on the free space + self.stores + .values() + .find(|s| !s.readonly()) + .map(|ins| ins.as_ref()) + } +} diff --git a/venus-worker/src/infra/objstore/filestore.rs b/venus-worker/src/infra/objstore/filestore.rs index 4dc0e470..ea6bda6a 100644 --- a/venus-worker/src/infra/objstore/filestore.rs +++ b/venus-worker/src/infra/objstore/filestore.rs @@ -1,16 +1,14 @@ //! ObjectStore implemented based on fs -use std::fs::{create_dir_all, File, OpenOptions}; +use std::fs::{create_dir_all, remove_file, File, OpenOptions}; use std::io::{copy, BufReader, Read, Seek, SeekFrom}; +use std::os::unix::fs::symlink; use std::path::{Path, PathBuf, MAIN_SEPARATOR}; use anyhow::{anyhow, Context, Result}; use super::{ObjResult, ObjectStore, Range}; -use crate::{ - infra::util::PlaceHolder, - logging::{debug_field, trace}, -}; +use crate::logging::{debug_field, trace}; const LOG_TARGET: &str = "filestore"; @@ -19,20 +17,19 @@ pub struct FileStore { sep: String, local_path: PathBuf, instance: String, - _holder: PlaceHolder, + readonly: bool, } impl FileStore { /// init filestore, create a placeholder file in its root dir pub fn init>(p: P) -> Result<()> { create_dir_all(p.as_ref())?; - let _holder = PlaceHolder::init(p.as_ref())?; Ok(()) } /// open the file store at given path - pub fn open>(p: P, ins: Option) -> Result { + pub fn open>(p: P, ins: Option, readonly: bool) -> Result { let dir_path = p.as_ref().canonicalize().context("canonicalize dir path")?; if !dir_path .metadata() @@ -42,8 +39,6 @@ impl FileStore { return Err(anyhow!("base path of the file store should a dir")); }; - let _holder = PlaceHolder::open(&dir_path).context("open placeholder")?; - let instance = match ins.or(dir_path.to_str().map(|s| s.to_owned())) { Some(i) => i, None => { @@ -58,7 +53,7 @@ impl FileStore { sep: MAIN_SEPARATOR.to_string(), local_path: dir_path, instance, - _holder, + readonly, }) } @@ -147,6 +142,30 @@ impl ObjectStore for FileStore { Ok(iter) } + + fn copy_to(&self, path: &Path, dst: &Path, allow_sym: bool) -> ObjResult<()> { + if allow_sym { + let src_path = self.path(path)?; + remove_file(dst)?; + symlink(src_path, dst)?; + return Ok(()); + } + + let mut r = self.get(path)?; + let mut f = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true) + .open(dst)?; + + copy(&mut r, &mut f)?; + Ok(()) + } + + fn readonly(&self) -> bool { + self.readonly + } } struct ChunkReader { diff --git a/venus-worker/src/infra/objstore/mod.rs b/venus-worker/src/infra/objstore/mod.rs index 3ed4eedd..82c76e77 100644 --- a/venus-worker/src/infra/objstore/mod.rs +++ b/venus-worker/src/infra/objstore/mod.rs @@ -5,6 +5,7 @@ use std::fmt; use std::io::{self, Read}; use std::path::Path; +pub mod attached; pub mod filestore; /// errors in object storage usage @@ -77,4 +78,10 @@ pub trait ObjectStore: Send + Sync { path: &Path, ranges: &[Range], ) -> ObjResult>>>>; + + /// copy an object to a local path + fn copy_to(&self, path: &Path, dest: &Path, allow_sym: bool) -> ObjResult<()>; + + /// if this instance is read-only + fn readonly(&self) -> bool; } diff --git a/venus-worker/src/run.rs b/venus-worker/src/run.rs index f6d110bf..fc365a42 100644 --- a/venus-worker/src/run.rs +++ b/venus-worker/src/run.rs @@ -14,7 +14,7 @@ use tokio::runtime::Builder; use crate::{ config, infra::{ - objstore::filestore::FileStore, + objstore::{attached::AttachedManager, filestore::FileStore, ObjectStore}, piecestore::{proxy::ProxyPieceStore, PieceStore}, }, logging::{debug_field, info}, @@ -48,17 +48,60 @@ pub fn start_mock(miner: ActorID, sector_size: u64, cfg_path: String) -> Result< info!("config loaded:\n {:?}", cfg); - let remote = cfg - .remote_store - .location - .as_ref() - .cloned() - .ok_or(anyhow!("remote path is required for mock"))?; - let remote_store = Box::new( - FileStore::open(&remote, cfg.remote_store.name.clone()) - .with_context(|| format!("open remote filestore {}", remote))?, + let mut attached: Vec> = Vec::new(); + let mut attached_writable = 0; + if let Some(remote_cfg) = cfg.remote_store.as_ref() { + let remote_store = Box::new( + FileStore::open( + remote_cfg.location.clone(), + remote_cfg.name.clone(), + remote_cfg.readonly.unwrap_or(false), + ) + .with_context(|| format!("open remote filestore {}", remote_cfg.location))?, + ); + + if !remote_store.readonly() { + attached_writable += 1; + } + + attached.push(remote_store); + } + + if let Some(attach_cfgs) = cfg.attached.as_ref() { + for (sidx, scfg) in attach_cfgs.iter().enumerate() { + let attached_store = Box::new( + FileStore::open( + scfg.location.clone(), + scfg.name.clone(), + scfg.readonly.unwrap_or(false), + ) + .with_context(|| format!("open attached filestore #{}", sidx))?, + ); + + if !attached_store.readonly() { + attached_writable += 1; + } + + attached.push(attached_store); + } + } + + if attached.is_empty() { + return Err(anyhow!("no attached store available")); + } + + if attached_writable == 0 { + return Err(anyhow!("no attached store available for writing")); + } + + info!( + "{} stores attached, {} writable", + attached.len(), + attached_writable ); + let attached_mgr = AttachedManager::init(attached).context("init attached manager")?; + let mock_impl = mock::SimpleMockSealerRpc::new(miner, proof_type); let mut io = IoHandler::new(); io.extend_with(mock_impl.to_delegate()); @@ -75,7 +118,7 @@ pub fn start_mock(miner: ActorID, sector_size: u64, cfg_path: String) -> Result< let globl = GlobalModules { rpc: Arc::new(mock_client), - remote_store: Arc::new(remote_store), + attached: Arc::new(attached_mgr), processors, limit: Arc::new(resource::Pool::new( cfg.processors @@ -131,17 +174,60 @@ pub fn start_deamon(cfg_path: String) -> Result<()> { .with_context(|| format!("load from config file {}", cfg_path))?; info!("config loaded\n {:?}", cfg); - let remote_store = cfg - .remote_store - .location - .as_ref() - .cloned() - .ok_or(anyhow!("remote path is required for deamon"))?; - let remote = Box::new( - FileStore::open(&remote_store, cfg.remote_store.name.clone()) - .with_context(|| format!("open remote filestore {}", remote_store))?, + let mut attached: Vec> = Vec::new(); + let mut attached_writable = 0; + if let Some(remote_cfg) = cfg.remote_store.as_ref() { + let remote_store = Box::new( + FileStore::open( + remote_cfg.location.clone(), + remote_cfg.name.clone(), + remote_cfg.readonly.unwrap_or(false), + ) + .with_context(|| format!("open remote filestore {}", remote_cfg.location))?, + ); + + if !remote_store.readonly() { + attached_writable += 1; + } + + attached.push(remote_store); + } + + if let Some(attach_cfgs) = cfg.attached.as_ref() { + for (sidx, scfg) in attach_cfgs.iter().enumerate() { + let attached_store = Box::new( + FileStore::open( + scfg.location.clone(), + scfg.name.clone(), + scfg.readonly.unwrap_or(false), + ) + .with_context(|| format!("open attached filestore #{}", sidx))?, + ); + + if !attached_store.readonly() { + attached_writable += 1; + } + + attached.push(attached_store); + } + } + + if attached.is_empty() { + return Err(anyhow!("no attached store available")); + } + + if attached_writable == 0 { + return Err(anyhow!("no attached store available for writing")); + } + + info!( + "{} stores attached, {} writable", + attached.len(), + attached_writable ); + let attached_mgr = AttachedManager::init(attached).context("init attached manager")?; + let store_mgr = StoreManager::load(&cfg.sealing_thread, &cfg.sealing).context("load store manager")?; @@ -190,7 +276,7 @@ pub fn start_deamon(cfg_path: String) -> Result<()> { let global = GlobalModules { rpc: Arc::new(rpc_client), - remote_store: Arc::new(remote), + attached: Arc::new(attached_mgr), processors, limit: Arc::new(resource::Pool::new( cfg.processors diff --git a/venus-worker/src/sealing/worker/sealer.rs b/venus-worker/src/sealing/worker/sealer.rs index 68892570..e3e1cccd 100644 --- a/venus-worker/src/sealing/worker/sealer.rs +++ b/venus-worker/src/sealing/worker/sealer.rs @@ -943,6 +943,22 @@ impl<'c> Sealer<'c> { } fn handle_pc_landed(&mut self) -> HandleResult { + let proof_type = fetch_cloned_field! { + self.sector.base, + allocated.proof_type, + }?; + + let sector_size = proof_type.sector_size(); + let persist_store = self + .ctx + .global + .attached + .acquire_persist(sector_size, None) + .context("no available persist store") + .perm()?; + + debug!(name = %persist_store.instance(), "persist store acquired"); + let allocated = fetch_field! { self.sector.base, allocated, @@ -995,19 +1011,14 @@ impl<'c> Sealer<'c> { let copy_enter = copy_span.enter(); let source = opt.open(&one).crit()?; - let size = self - .ctx - .global - .remote_store - .put(target_path, Box::new(source)) - .crit()?; + let size = persist_store.put(target_path, Box::new(source)).crit()?; debug!(size, "persist done"); drop(copy_enter); } - Ok(Event::Persist(self.ctx.global.remote_store.instance())) + Ok(Event::Persist(persist_store.instance())) } fn handle_persisted(&mut self) -> HandleResult { diff --git a/venus-worker/src/watchdog.rs b/venus-worker/src/watchdog.rs index 5baf1599..166f8b7a 100644 --- a/venus-worker/src/watchdog.rs +++ b/venus-worker/src/watchdog.rs @@ -9,7 +9,7 @@ use tokio::runtime::Runtime; use crate::{ config::Config, - infra::{objstore::ObjectStore, piecestore::PieceStore}, + infra::{objstore::attached::AttachedManager, piecestore::PieceStore}, logging::{error, error_span, info, warn}, rpc::sealer::SealerClient, sealing::{ @@ -36,7 +36,7 @@ pub struct Ctx { #[derive(Clone)] pub struct GlobalModules { pub rpc: Arc, - pub remote_store: Arc>, + pub attached: Arc, pub processors: GloablProcessors, pub static_tree_d: HashMap, pub limit: Arc, From a8ca766bfd7c2bf2bd4cc6b9e365f0042c5fdc54 Mon Sep 17 00:00:00 2001 From: dtynn Date: Mon, 14 Mar 2022 18:01:32 +0800 Subject: [PATCH 2/2] feat: update config about section 'remote_store' & 'attached' --- ...15\347\275\256\350\247\243\346\236\220.md" | 38 ++++++++++++++++++- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git "a/docs/zh/03.venus-worker\347\232\204\351\205\215\347\275\256\350\247\243\346\236\220.md" "b/docs/zh/03.venus-worker\347\232\204\351\205\215\347\275\256\350\247\243\346\236\220.md" index 26344ef2..b680babb 100644 --- "a/docs/zh/03.venus-worker\347\232\204\351\205\215\347\275\256\350\247\243\346\236\220.md" +++ "b/docs/zh/03.venus-worker\347\232\204\351\205\215\347\275\256\350\247\243\346\236\220.md" @@ -44,7 +44,12 @@ location = "./mock-tmp/store2" [[sealing_thread]] location = "./mock-tmp/store3" -[remote_store] +# deprecated +# [remote_store] +# name = "persist-store1" +# location = "./mock-tmp/remote" + +[[attached]] # name = "persist-store1" location = "./mock-tmp/remote" @@ -314,7 +319,7 @@ sealing.allowed_sizes = ["64GiB"] -## [remote_store] +## [remote_store] 已废弃 `remote_store` 用于配置已完成的扇区持久化数据保存的位置。 @@ -339,6 +344,35 @@ location = "/mnt/remote/10.0.0.14/store" 如果持久化存储目录在所有机器上的挂载路径都统一的话,配置时也可以选择在 `venus-worker` 和`venus-sector-manager` 两侧都不配置 `name`。这种情况下,两者都会使用绝对路径作为 `name`,也能匹配。 +## [[attached]] + +`attached` 用于配置已完成的扇区持久化数据保存的位置,允许同时配置多个。 + + + +### 基础配置范例 + +``` +[attached] +# 名称, 选填项,字符串类型 +# 默认为路径对应的绝对路径 +# name = "remote-store1" + +# 路径,必填项,字符串类型 +# 建议直接填写绝对路径 +location = "/mnt/remote/10.0.0.14/store" + +# 只读,选填项,布尔类型 +# 默认值为 false +# readonly = true + +``` + +由于需要在 `venus-worker` 和 `venus-sector-manager` 之间协调存储位置信息,而在很多情况下,同一个持久化存储目录在`venus-worker` 机器和 `venus-sector-manager` 机器上的挂载路径不完全一致,因此我们决定使用 `name` 作为协调的基础信息. + +如果持久化存储目录在所有机器上的挂载路径都统一的话,配置时也可以选择在 `venus-worker` 和`venus-sector-manager` 两侧都不配置 `name`。这种情况下,两者都会使用绝对路径作为 `name`,也能匹配。 + + ## [processors]