
Commit

fix
Signed-off-by: pushkarm029 <pushkarmishra029@gmail.com>
Pushkarm029 committed Jul 23, 2024
1 parent 181428a commit afa0b37
Showing 2 changed files with 105 additions and 148 deletions.
2 changes: 1 addition & 1 deletion fs-storage/Cargo.toml
@@ -17,7 +17,7 @@ serde_json = "1.0.82"
serde = { version = "1.0.138", features = ["derive"] }
jni = { version = "0.21.1", optional = true }
jnix = { version = "0.5.1", features = ["derive"], optional = true }

bincode = "1.3"
data-error = { path = "../data-error" }


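The only manifest change is the new `bincode = "1.3"` dependency; the rewritten `folder_storage.rs` below uses it to encode each value as a compact binary blob. A minimal sketch of the round-trip it provides, with an illustrative `Entry` type that is not part of the commit:

use serde::{Deserialize, Serialize};

// Illustrative value type; any `Serialize + DeserializeOwned` type works the same way.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Entry {
    name: String,
    count: u32,
}

fn main() -> Result<(), bincode::Error> {
    let original = Entry { name: "example".into(), count: 3 };

    // Encode to bytes, as `write_fs` does before writing `<key>.bin`.
    let bytes: Vec<u8> = bincode::serialize(&original)?;

    // Decode back, as `load_fs_data` does after reading a file.
    let decoded: Entry = bincode::deserialize(&bytes)?;
    assert_eq!(original, decoded);
    Ok(())
}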
251 changes: 104 additions & 147 deletions fs-storage/src/folder_storage.rs
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
use std::fs::{self, File};
use std::io::Write;
use std::io::{Read, Write};
use std::time::SystemTime;
use std::{
collections::BTreeMap,
@@ -9,7 +9,7 @@ use std::{

use crate::base_storage::{BaseStorage, SyncStatus};
use crate::monoid::Monoid;
use crate::utils::read_version_2_fs;
// use crate::utils::read_version_2_fs;
use data_error::{ArklibError, Result};

/*
@@ -23,7 +23,6 @@ Starting from version 3, data is stored in JSON format.
For backward compatibility, we provide a helper function `read_version_2_fs` to read version 2 format.
*/
const STORAGE_VERSION: i32 = 3;
const MAX_ENTRIES_PER_FILE: usize = 1000;

/// Represents a file storage system that persists data to disk.
pub struct FolderStorage<K, V>
@@ -36,10 +35,12 @@ where
path: PathBuf,
/// Last modified time of internal mapping. This becomes equal to
/// `written_to_disk` only when data is written or read from disk.
modified: SystemTime,
// modified: SystemTime,
ram_timestamps: BTreeMap<K, SystemTime>,
/// Last time the data was written to disk. This becomes equal to
/// `modified` only when data is written or read from disk.
written_to_disk: SystemTime,
// written_to_disk: SystemTime,
disk_timestamps: BTreeMap<K, SystemTime>,
data: FolderStorageData<K, V>,
}

@@ -71,7 +72,8 @@ where
+ Clone
+ serde::Serialize
+ serde::de::DeserializeOwned
+ std::str::FromStr,
+ std::str::FromStr
+ std::fmt::Display,
V: Clone
+ serde::Serialize
+ serde::de::DeserializeOwned
@@ -84,12 +86,11 @@ where
/// Note: if the file storage already exists, the data will be read from the file
/// without overwriting it.
pub fn new(label: String, path: &Path) -> Result<Self> {
let time = SystemTime::now();
let mut storage = Self {
label,
path: PathBuf::from(path),
modified: time,
written_to_disk: time,
ram_timestamps: BTreeMap::new(),
disk_timestamps: BTreeMap::new(),
data: FolderStorageData {
version: STORAGE_VERSION,
entries: BTreeMap::new(),
@@ -112,71 +113,52 @@ where
));
}

if !self.path.is_dir() {
return Err(ArklibError::Storage(
self.label.clone(),
"Path is not a directory".to_owned(),
));
}

let mut data = FolderStorageData {
version: STORAGE_VERSION,
entries: BTreeMap::new(),
};

let index_path = self.path.join("index.json");
if index_path.exists() {
let index_file = File::open(&index_path)?;
let index: BTreeMap<K, usize> =
serde_json::from_reader(index_file)?;

for (_key, file_index) in index {
let file_path = self
.path
.join(format!("data_{}.json", file_index));
if file_path.exists() {
// First check if the file starts with "version: 2"
let file_content =
std::fs::read_to_string(file_path.clone())?;
if file_content.starts_with("version: 2") {
// Attempt to parse the file using the legacy version 2 storage format of FolderStorage.
match read_version_2_fs(&file_path) {
Ok(legacy_data) => {
log::info!(
"Version 2 storage format detected for {}",
self.label
);
data.version = 2;
data.entries.extend(legacy_data);
continue;
}
Err(_) => {
return Err(ArklibError::Storage(
self.label.clone(),
"Storage seems to be version 2, but failed to parse"
.to_owned(),
));
}
};
}

let file = fs::File::open(&file_path)?;
let file_data: FolderStorageData<K, V> =
serde_json::from_reader(file).map_err(|err| {
ArklibError::Storage(
self.label.clone(),
err.to_string(),
)
})?;

if file_data.version != STORAGE_VERSION {
return Err(ArklibError::Storage(
// read_version_2_fs : unimplemented!()

for entry in fs::read_dir(&self.path)? {
let entry = entry?;
let path = entry.path();
if path.is_file()
&& path.extension().map_or(false, |ext| ext == "bin")
{
let key = path
.file_stem()
.unwrap()
.to_str()
.unwrap()
.parse::<K>()
.map_err(|_| {
ArklibError::Storage(
self.label.clone(),
format!(
"Storage version mismatch: expected {}, got {}",
STORAGE_VERSION, file_data.version
),
));
}

data.entries.extend(file_data.entries);
}
"Failed to parse key from filename".to_owned(),
)
})?;

let mut file = File::open(&path)?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer)?;

let value: V = bincode::deserialize(&buffer).map_err(|e| {
ArklibError::Storage(
self.label.clone(),
format!("Failed to deserialize value: {}", e),
)
})?;
data.entries.insert(key, value);
}
}

Ok(data)
}
}
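The new `std::fmt::Display` bound on `K`, paired with the existing `FromStr`, exists because every key now doubles as a file name: values are written to `<key>.bin` and the key is parsed back out of the file stem when loading. A minimal sketch of that round-trip (the helper names and the `note-42` key are illustrative only; keys containing path separators or dots are not handled here):

use std::path::Path;
use std::str::FromStr;

fn key_to_file_name<K: std::fmt::Display>(key: &K) -> String {
    // Mirrors the `format!("{}.bin", key)` pattern used by the storage.
    format!("{}.bin", key)
}

fn file_name_to_key<K: FromStr>(path: &Path) -> Option<K> {
    // Mirrors the `file_stem()` + `parse::<K>()` chain used in `load_fs_data`.
    path.file_stem()?.to_str()?.parse::<K>().ok()
}

fn main() {
    let key = String::from("note-42");
    let file_name = key_to_file_name(&key);
    assert_eq!(file_name, "note-42.bin");

    let parsed: Option<String> = file_name_to_key(Path::new(&file_name));
    assert_eq!(parsed, Some(key));
}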
@@ -187,7 +169,8 @@ where
+ Clone
+ serde::Serialize
+ serde::de::DeserializeOwned
+ std::str::FromStr,
+ std::str::FromStr
+ std::fmt::Display,
V: Clone
+ serde::Serialize
+ serde::de::DeserializeOwned
@@ -196,47 +179,27 @@ where
{
/// Set a key-value pair in the internal mapping
fn set(&mut self, key: K, value: V) {
self.data.entries.insert(key, value);
self.modified = std::time::SystemTime::now();
self.data.entries.insert(key.clone(), value);
self.ram_timestamps.insert(key, SystemTime::now());
}

/// Remove an entry from the internal mapping given a key
fn remove(&mut self, id: &K) -> Result<()> {
self.data.entries.remove(id).ok_or_else(|| {
ArklibError::Storage(self.label.clone(), "Key not found".to_owned())
})?;
self.modified = std::time::SystemTime::now();
// self.ram_timestamps.remove(id);
// OR
self.ram_timestamps
.insert(id.clone(), SystemTime::now());
Ok(())
}

/// Compare the timestamp of the storage file
/// with the timestamp of the in-memory storage and the last written
/// to time to determine if either of the two requires syncing.
fn sync_status(&self) -> Result<SyncStatus> {
let file_updated = fs::metadata(&self.path)?.modified()?;

// Determine the synchronization status based on the modification times
// Conditions:
// 1. If both the in-memory storage and the storage on disk have been modified
// since the last write, then the storage is diverged.
// 2. If only the in-memory storage has been modified since the last write,
// then the storage on disk is stale.
// 3. If only the storage on disk has been modified since the last write,
// then the in-memory storage is stale.
// 4. If neither the in-memory storage nor the storage on disk has been modified
// since the last write, then the storage is in sync.
let status = match (
self.modified > self.written_to_disk,
file_updated > self.written_to_disk,
) {
(true, true) => SyncStatus::Diverge,
(true, false) => SyncStatus::StorageStale,
(false, true) => SyncStatus::MappingStale,
(false, false) => SyncStatus::InSync,
};

log::info!("{} sync status is {}", self.label, status);
Ok(status)
unimplemented!()
}

/// Sync the in-memory storage with the storage on disk
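`sync_status` is stubbed out with `unimplemented!()` while the storage moves from the two global `modified`/`written_to_disk` timestamps to the per-key `ram_timestamps` and `disk_timestamps` maps. One plausible way to recover the four states described in the comment above from per-key timestamps is sketched below; this is an assumption about a follow-up implementation, not part of this commit:

use std::collections::BTreeMap;
use std::time::SystemTime;

// Re-declared locally for a self-contained sketch; the real enum lives in `base_storage`.
#[derive(Debug, PartialEq)]
enum SyncStatus {
    InSync,
    MappingStale,
    StorageStale,
    Diverge,
}

// Hypothetical helper: `ram` holds in-memory modification times per key,
// `disk` the times recorded when entries were last written to or read from disk.
fn sync_status(
    ram: &BTreeMap<String, SystemTime>,
    disk: &BTreeMap<String, SystemTime>,
) -> SyncStatus {
    // The in-memory mapping is ahead if any key was touched after (or without) a disk write.
    let ram_newer = ram
        .iter()
        .any(|(key, t)| disk.get(key).map_or(true, |d| t > d));
    // The disk is ahead if it holds keys or timestamps the mapping does not know about.
    let disk_newer = disk
        .iter()
        .any(|(key, t)| ram.get(key).map_or(true, |r| t > r));

    match (ram_newer, disk_newer) {
        (true, true) => SyncStatus::Diverge,
        (true, false) => SyncStatus::StorageStale,
        (false, true) => SyncStatus::MappingStale,
        (false, false) => SyncStatus::InSync,
    }
}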
@@ -257,10 +220,16 @@ where
/// Read the data from file
fn read_fs(&mut self) -> Result<&BTreeMap<K, V>> {
let data = self.load_fs_data()?;
self.modified = fs::metadata(&self.path)?.modified()?;
self.written_to_disk = self.modified;
self.data = data;

self.disk_timestamps.clear();
for key in self.data.entries.keys() {
let file_path = self.path.join(format!("{}.bin", key));
if let Ok(metadata) = fs::metadata(&file_path) {
if let Ok(modified) = metadata.modified() {
self.disk_timestamps.insert(key.clone(), modified);
}
}
}
Ok(&self.data.entries)
}

@@ -274,66 +243,53 @@ where
/// Update the modified timestamp in file metadata to avoid OS timing issues
/// https://github.com/ARK-Builders/ark-rust/pull/63#issuecomment-2163882227
fn write_fs(&mut self) -> Result<()> {
let parent_dir = self.path.parent().ok_or_else(|| {
ArklibError::Storage(
self.label.clone(),
"Failed to get parent directory".to_owned(),
)
})?;
fs::create_dir_all(parent_dir)?;

let mut current_file_index = 0;
let mut current_file_entries = 0;
fs::create_dir_all(&self.path)?;

for (key, value) in &self.data.entries {
if current_file_entries >= MAX_ENTRIES_PER_FILE {
current_file_index += 1;
current_file_entries = 0;
}

let file_path = self
.path
.join(format!("data_{}.json", current_file_index));
let mut file_data: BTreeMap<K, V> = if file_path.exists() {
let file = File::open(&file_path)?;
serde_json::from_reader(file)?
} else {
BTreeMap::new()
};

file_data.insert(key.clone(), value.clone());
current_file_entries += 1;
let file_path = self.path.join(format!("{}.bin", key));
let encoded: Vec<u8> = bincode::serialize(value).map_err(|e| {
ArklibError::Storage(
self.label.clone(),
format!("Failed to serialize value: {}", e),
)
})?;

let mut file = File::create(&file_path)?;
file.write_all(
serde_json::to_string_pretty(&file_data)?.as_bytes(),
)?;
file.write_all(&encoded)?;
file.flush()?;

let new_timestamp = SystemTime::now();
file.set_modified(new_timestamp)?;
file.sync_all()?;

self.disk_timestamps
.insert(key.clone(), new_timestamp);
}

// Write the index file
// index stores K -> key, V -> file index in which key value pair is stored
let index: BTreeMap<K, usize> = self
.data
.entries
.keys()
.enumerate()
.map(|(i, k)| (k.clone(), i / MAX_ENTRIES_PER_FILE))
.collect();
let index_path = self.path.join("index.json");
let mut index_file = File::create(index_path)?;
index_file
.write_all(serde_json::to_string_pretty(&index)?.as_bytes())?;
index_file.flush()?;
index_file.sync_all()?;

let new_timestamp = SystemTime::now();
self.modified = new_timestamp;
self.written_to_disk = new_timestamp;
// Remove files for keys that no longer exist
for entry in fs::read_dir(&self.path)? {
let entry = entry?;
let path = entry.path();
if path.is_file()
&& path.extension().map_or(false, |ext| ext == "bin")
{
let key = path
.file_stem()
.unwrap()
.to_str()
.unwrap()
.parse::<K>()
.map_err(|_| {
ArklibError::Storage(
self.label.clone(),
"Failed to parse key from filename".to_owned(),
)
})?;
if !self.data.entries.contains_key(&key) {
fs::remove_file(path)?;
}
}
}

log::info!(
"{} {} entries have been written",
@@ -363,8 +319,9 @@ where
} else {
self.set(key.clone(), value.clone())
}
self.ram_timestamps
.insert(key.clone(), SystemTime::now());
}
self.modified = std::time::SystemTime::now();
Ok(())
}
}
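With these changes the on-disk layout becomes a directory of bincode-encoded `<key>.bin` files, with no `index.json` and no sharded `data_N.json` files. A rough usage sketch under that assumption: the import paths and the temp-dir location are guesses, whether `new` requires the directory to already exist is also a guess (the sketch creates it up front), and `sync_status` would still panic until it is reimplemented:

use fs_storage::base_storage::BaseStorage;
use fs_storage::folder_storage::FolderStorage;

fn main() {
    // Illustrative location; any writable directory works.
    let dir = std::env::temp_dir().join("folder_storage_demo");
    std::fs::create_dir_all(&dir).expect("failed to create storage directory");

    let mut storage: FolderStorage<String, String> =
        FolderStorage::new("demo".to_string(), &dir)
            .expect("failed to open storage");

    storage.set("alpha".to_string(), "first value".to_string());
    storage.set("beta".to_string(), "second value".to_string());

    // Persists one bincode-encoded file per key: alpha.bin and beta.bin.
    storage.write_fs().expect("failed to write to disk");

    // Re-reads the directory and refreshes the per-key disk timestamps.
    let entries = storage.read_fs().expect("failed to read from disk");
    assert_eq!(entries.get("alpha").map(|v| v.as_str()), Some("first value"));
}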
