diff --git a/fs-storage/Cargo.toml b/fs-storage/Cargo.toml
index c6ffdd1..cf64ff5 100644
--- a/fs-storage/Cargo.toml
+++ b/fs-storage/Cargo.toml
@@ -17,7 +17,7 @@
 serde_json = "1.0.82"
 serde = { version = "1.0.138", features = ["derive"] }
 jni = { version = "0.21.1", optional = true }
 jnix = { version = "0.5.1", features = ["derive"], optional = true }
-
+bincode = "1.3"
 
 data-error = { path = "../data-error" }
diff --git a/fs-storage/src/folder_storage.rs b/fs-storage/src/folder_storage.rs
index 3518695..aa960da 100644
--- a/fs-storage/src/folder_storage.rs
+++ b/fs-storage/src/folder_storage.rs
@@ -1,6 +1,6 @@
 use serde::{Deserialize, Serialize};
 use std::fs::{self, File};
-use std::io::Write;
+use std::io::{Read, Write};
 use std::time::SystemTime;
 use std::{
     collections::BTreeMap,
@@ -9,7 +9,7 @@ use std::{
 
 use crate::base_storage::{BaseStorage, SyncStatus};
 use crate::monoid::Monoid;
-use crate::utils::read_version_2_fs;
+// use crate::utils::read_version_2_fs;
 use data_error::{ArklibError, Result};
 
 /*
@@ -23,7 +23,6 @@ Starting from version 3, data is stored in JSON format.
 For backward compatibility, we provide a helper function `read_version_2_fs` to read version 2 format.
 */
 const STORAGE_VERSION: i32 = 3;
-const MAX_ENTRIES_PER_FILE: usize = 1000;
 
 /// Represents a file storage system that persists data to disk.
 pub struct FolderStorage<K, V>
@@ -36,10 +35,12 @@ where
     path: PathBuf,
     /// Last modified time of internal mapping. This becomes equal to
     /// `written_to_disk` only when data is written or read from disk.
-    modified: SystemTime,
+    // modified: SystemTime,
+    ram_timestamps: BTreeMap<K, SystemTime>,
    /// Last time the data was written to disk. This becomes equal to
    /// `modified` only when data is written or read from disk.
-    written_to_disk: SystemTime,
+    // written_to_disk: SystemTime,
+    disk_timestamps: BTreeMap<K, SystemTime>,
     data: FolderStorageData<K, V>,
 }
 
@@ -71,7 +72,8 @@ where
         + Clone
         + serde::Serialize
         + serde::de::DeserializeOwned
-        + std::str::FromStr,
+        + std::str::FromStr
+        + std::fmt::Display,
     V: Clone
         + serde::Serialize
         + serde::de::DeserializeOwned
@@ -84,12 +86,11 @@ where
     /// Note: if the file storage already exists, the data will be read from the file
     /// without overwriting it.
     pub fn new(label: String, path: &Path) -> Result<Self> {
-        let time = SystemTime::now();
         let mut storage = Self {
             label,
             path: PathBuf::from(path),
-            modified: time,
-            written_to_disk: time,
+            ram_timestamps: BTreeMap::new(),
+            disk_timestamps: BTreeMap::new(),
             data: FolderStorageData {
                 version: STORAGE_VERSION,
                 entries: BTreeMap::new(),
@@ -112,71 +113,52 @@ where
             ));
         }
 
+        if !self.path.is_dir() {
+            return Err(ArklibError::Storage(
+                self.label.clone(),
+                "Path is not a directory".to_owned(),
+            ));
+        }
+
         let mut data = FolderStorageData {
             version: STORAGE_VERSION,
             entries: BTreeMap::new(),
         };
 
-        let index_path = self.path.join("index.json");
-        if index_path.exists() {
-            let index_file = File::open(&index_path)?;
-            let index: BTreeMap<K, usize> =
-                serde_json::from_reader(index_file)?;
-
-            for (_key, file_index) in index {
-                let file_path = self
-                    .path
-                    .join(format!("data_{}.json", file_index));
-                if file_path.exists() {
-                    // First check if the file starts with "version: 2"
-                    let file_content =
-                        std::fs::read_to_string(file_path.clone())?;
-                    if file_content.starts_with("version: 2") {
-                        // Attempt to parse the file using the legacy version 2 storage format of FolderStorage.
-                        match read_version_2_fs(&file_path) {
-                            Ok(legacy_data) => {
-                                log::info!(
-                                    "Version 2 storage format detected for {}",
-                                    self.label
-                                );
-                                data.version = 2;
-                                data.entries.extend(legacy_data);
-                                continue;
-                            }
-                            Err(_) => {
-                                return Err(ArklibError::Storage(
-                                    self.label.clone(),
-                                    "Storage seems to be version 2, but failed to parse"
-                                        .to_owned(),
-                                ));
-                            }
-                        };
-                    }
-
-                    let file = fs::File::open(&file_path)?;
-                    let file_data: FolderStorageData<K, V> =
-                        serde_json::from_reader(file).map_err(|err| {
-                            ArklibError::Storage(
-                                self.label.clone(),
-                                err.to_string(),
-                            )
-                        })?;
-
-                    if file_data.version != STORAGE_VERSION {
-                        return Err(ArklibError::Storage(
+        // read_version_2_fs : unimplemented!()
+
+        for entry in fs::read_dir(&self.path)? {
+            let entry = entry?;
+            let path = entry.path();
+            if path.is_file()
+                && path.extension().map_or(false, |ext| ext == "bin")
+            {
+                let key = path
+                    .file_stem()
+                    .unwrap()
+                    .to_str()
+                    .unwrap()
+                    .parse::<K>()
+                    .map_err(|_| {
+                        ArklibError::Storage(
                             self.label.clone(),
-                            format!(
-                                "Storage version mismatch: expected {}, got {}",
-                                STORAGE_VERSION, file_data.version
-                            ),
-                        ));
-                    }
-
-                    data.entries.extend(file_data.entries);
-                }
+                            "Failed to parse key from filename".to_owned(),
+                        )
+                    })?;
+
+                let mut file = File::open(&path)?;
+                let mut buffer = Vec::new();
+                file.read_to_end(&mut buffer)?;
+
+                let value: V = bincode::deserialize(&buffer).map_err(|e| {
+                    ArklibError::Storage(
+                        self.label.clone(),
+                        format!("Failed to deserialize value: {}", e),
+                    )
+                })?;
+                data.entries.insert(key, value);
             }
         }
-
         Ok(data)
     }
 }
@@ -187,7 +169,8 @@ where
         + Clone
         + serde::Serialize
         + serde::de::DeserializeOwned
-        + std::str::FromStr,
+        + std::str::FromStr
+        + std::fmt::Display,
     V: Clone
         + serde::Serialize
         + serde::de::DeserializeOwned
@@ -196,8 +179,8 @@ where
 {
     /// Set a key-value pair in the internal mapping
     fn set(&mut self, key: K, value: V) {
-        self.data.entries.insert(key, value);
-        self.modified = std::time::SystemTime::now();
+        self.data.entries.insert(key.clone(), value);
+        self.ram_timestamps.insert(key, SystemTime::now());
     }
 
     /// Remove an entry from the internal mapping given a key
@@ -205,7 +188,10 @@ where
         self.data.entries.remove(id).ok_or_else(|| {
             ArklibError::Storage(self.label.clone(), "Key not found".to_owned())
         })?;
-        self.modified = std::time::SystemTime::now();
+        // self.ram_timestamps.remove(id);
+        // OR
+        self.ram_timestamps
+            .insert(id.clone(), SystemTime::now());
         Ok(())
     }
 
@@ -213,30 +199,7 @@ where
     /// with the timestamp of the in-memory storage and the last written
     /// to time to determine if either of the two requires syncing.
     fn sync_status(&self) -> Result<SyncStatus> {
-        let file_updated = fs::metadata(&self.path)?.modified()?;
-
-        // Determine the synchronization status based on the modification times
-        // Conditions:
-        // 1. If both the in-memory storage and the storage on disk have been modified
-        //    since the last write, then the storage is diverged.
-        // 2. If only the in-memory storage has been modified since the last write,
-        //    then the storage on disk is stale.
-        // 3. If only the storage on disk has been modified since the last write,
-        //    then the in-memory storage is stale.
-        // 4. If neither the in-memory storage nor the storage on disk has been modified
-        //    since the last write, then the storage is in sync.
-        let status = match (
-            self.modified > self.written_to_disk,
-            file_updated > self.written_to_disk,
-        ) {
-            (true, true) => SyncStatus::Diverge,
-            (true, false) => SyncStatus::StorageStale,
-            (false, true) => SyncStatus::MappingStale,
-            (false, false) => SyncStatus::InSync,
-        };
-
-        log::info!("{} sync status is {}", self.label, status);
-        Ok(status)
+        unimplemented!()
     }
 
     /// Sync the in-memory storage with the storage on disk
@@ -257,10 +220,16 @@ where
     /// Read the data from file
     fn read_fs(&mut self) -> Result<&BTreeMap<K, V>> {
         let data = self.load_fs_data()?;
-        self.modified = fs::metadata(&self.path)?.modified()?;
-        self.written_to_disk = self.modified;
         self.data = data;
-
+        self.disk_timestamps.clear();
+        for key in self.data.entries.keys() {
+            let file_path = self.path.join(format!("{}.bin", key));
+            if let Ok(metadata) = fs::metadata(&file_path) {
+                if let Ok(modified) = metadata.modified() {
+                    self.disk_timestamps.insert(key.clone(), modified);
+                }
+            }
+        }
         Ok(&self.data.entries)
     }
 
@@ -274,66 +243,53 @@ where
     /// Update the modified timestamp in file metadata to avoid OS timing issues
    /// https://github.com/ARK-Builders/ark-rust/pull/63#issuecomment-2163882227
     fn write_fs(&mut self) -> Result<()> {
-        let parent_dir = self.path.parent().ok_or_else(|| {
-            ArklibError::Storage(
-                self.label.clone(),
-                "Failed to get parent directory".to_owned(),
-            )
-        })?;
-        fs::create_dir_all(parent_dir)?;
-
-        let mut current_file_index = 0;
-        let mut current_file_entries = 0;
+        fs::create_dir_all(&self.path)?;
 
         for (key, value) in &self.data.entries {
-            if current_file_entries >= MAX_ENTRIES_PER_FILE {
-                current_file_index += 1;
-                current_file_entries = 0;
-            }
-
-            let file_path = self
-                .path
-                .join(format!("data_{}.json", current_file_index));
-            let mut file_data: BTreeMap<K, V> = if file_path.exists() {
-                let file = File::open(&file_path)?;
-                serde_json::from_reader(file)?
-            } else {
-                BTreeMap::new()
-            };
-
-            file_data.insert(key.clone(), value.clone());
-            current_file_entries += 1;
+            let file_path = self.path.join(format!("{}.bin", key));
+            let encoded: Vec<u8> = bincode::serialize(value).map_err(|e| {
+                ArklibError::Storage(
+                    self.label.clone(),
+                    format!("Failed to serialize value: {}", e),
+                )
+            })?;
 
             let mut file = File::create(&file_path)?;
-            file.write_all(
-                serde_json::to_string_pretty(&file_data)?.as_bytes(),
-            )?;
+            file.write_all(&encoded)?;
             file.flush()?;
 
             let new_timestamp = SystemTime::now();
             file.set_modified(new_timestamp)?;
             file.sync_all()?;
+
+            self.disk_timestamps
+                .insert(key.clone(), new_timestamp);
         }
 
-        // Write the index file
-        // index stores K -> key, V -> file index in which key value pair is stored
-        let index: BTreeMap<K, usize> = self
-            .data
-            .entries
-            .keys()
-            .enumerate()
-            .map(|(i, k)| (k.clone(), i / MAX_ENTRIES_PER_FILE))
-            .collect();
-        let index_path = self.path.join("index.json");
-        let mut index_file = File::create(index_path)?;
-        index_file
-            .write_all(serde_json::to_string_pretty(&index)?.as_bytes())?;
-        index_file.flush()?;
-        index_file.sync_all()?;
-
-        let new_timestamp = SystemTime::now();
-        self.modified = new_timestamp;
-        self.written_to_disk = new_timestamp;
+        // Remove files for keys that no longer exist
+        for entry in fs::read_dir(&self.path)? {
+            let entry = entry?;
+            let path = entry.path();
+            if path.is_file()
+                && path.extension().map_or(false, |ext| ext == "bin")
+            {
+                let key = path
+                    .file_stem()
+                    .unwrap()
+                    .to_str()
+                    .unwrap()
+                    .parse::<K>()
+                    .map_err(|_| {
+                        ArklibError::Storage(
+                            self.label.clone(),
+                            "Failed to parse key from filename".to_owned(),
+                        )
+                    })?;
+                if !self.data.entries.contains_key(&key) {
+                    fs::remove_file(path)?;
+                }
+            }
+        }
 
         log::info!(
             "{} {} entries have been written",
@@ -363,8 +319,9 @@ where
             } else {
                 self.set(key.clone(), value.clone())
             }
+            self.ram_timestamps
+                .insert(key.clone(), SystemTime::now());
         }
-        self.modified = std::time::SystemTime::now();
         Ok(())
     }
 }