Skip to content

Commit

Permalink
Merge 6aa9e49 into 56343b4
Browse files Browse the repository at this point in the history
  • Loading branch information
tareknaser authored Sep 9, 2024
2 parents 56343b4 + 6aa9e49 commit dc67cc3
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 2 deletions.
2 changes: 2 additions & 0 deletions ark-cli/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod list;
mod monitor;
mod render;
pub mod storage;
mod watch;

pub use file::{file_append, file_insert, format_file, format_line};

Expand All @@ -18,6 +19,7 @@ pub enum Commands {
Monitor(monitor::Monitor),
Render(render::Render),
List(list::List),
Watch(watch::Watch),
#[command(about = "Manage links")]
Link {
#[clap(subcommand)]
Expand Down
27 changes: 27 additions & 0 deletions ark-cli/src/commands/watch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use std::path::PathBuf;

use fs_index::watch_index;

use crate::{AppError, ResourceId};

#[derive(Clone, Debug, clap::Args)]
#[clap(
name = "watch",
about = "Watch the ark managed folder for changes and update the index accordingly"
)]
pub struct Watch {
#[clap(
help = "Path to the directory to watch for changes",
default_value = ".",
value_parser
)]
path: PathBuf,
}

impl Watch {
pub async fn run(&self) -> Result<(), AppError> {
watch_index::<_, ResourceId>(&self.path)
.await
.map_err(|err| AppError::IndexError(err.to_string()))
}
}
1 change: 1 addition & 0 deletions ark-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ async fn run() -> Result<()> {
Monitor(monitor) => monitor.run()?,
Render(render) => render.run()?,
List(list) => list.run()?,
Watch(watch) => watch.run().await?,
Link { subcommand } => match subcommand {
Create(create) => create.run().await?,
Load(load) => load.run()?,
Expand Down
6 changes: 6 additions & 0 deletions fs-index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ bench = false

[dependencies]
log = { version = "0.4.17", features = ["release_max_level_off"] }
env_logger = "0.11"
walkdir = "2.3.2"
anyhow = "1.0.58"
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
# For the watch API
notify = "6.1"
futures = "0.3"


fs-storage = { path = "../fs-storage" }
Expand All @@ -28,6 +32,8 @@ criterion = { version = "0.5", features = ["html_reports"] }
tempfile = "3.10"
# Depending on `dev-hash` for testing
dev-hash = { path = "../dev-hash" }
# Examples
tokio = { version = "1.40", features = ["full"] }

[[bench]]
name = "resource_index_benchmark"
Expand Down
35 changes: 35 additions & 0 deletions fs-index/examples/index_watch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use std::{path::Path, thread};

use anyhow::Result;
use log::LevelFilter;

use dev_hash::Blake3;
use fs_index::watch_index;

/// Example demonstrating how to use fs_index to watch a directory for changes
/// in a separate thread. This automatically updates the index when changes are
/// detected.
fn main() -> Result<()> {
env_logger::builder()
.filter_level(LevelFilter::Debug)
.init();

// Change this to the path of the directory you want to watch
let root = Path::new("test-assets");

let thread_handle = thread::spawn(move || {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async move {
if let Err(err) = watch_index::<_, Blake3>(root).await {
eprintln!("Error in watching index: {:?}", err);
}
});
});

thread_handle
.join()
.expect("Failed to join thread");

Ok(())
}
5 changes: 3 additions & 2 deletions fs-index/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
mod index;
mod serde;
mod utils;

pub use utils::load_or_build_index;
mod watch;

pub use index::ResourceIndex;
pub use utils::load_or_build_index;
pub use watch::watch_index;

#[cfg(test)]
mod tests;
107 changes: 107 additions & 0 deletions fs-index/src/watch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
use std::{fs, path::Path};

use anyhow::Result;
use futures::{
channel::mpsc::{channel, Receiver},
SinkExt, StreamExt,
};
use log::info;
use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};

use data_resource::ResourceId;
use fs_storage::ARK_FOLDER;

use crate::ResourceIndex;

/// Watches a given directory for file system changes and automatically updates
/// the resource index.
///
/// This function continuously monitors the specified directory and responds to
/// file system events such as file creation, modification, and deletion. When
/// an event is detected, the function updates the associated resource index and
/// stores the changes.
///
/// The function runs asynchronously, whcih makes it suitable for non-blocking
/// contexts. It uses a recursive watcher to track all changes within the
/// directory tree. Events related to the internal `.ark` folder are ignored to
/// prevent unnecessary updates.
///
/// # Arguments
///
/// * `root_path` - The root directory to be watched. This path is canonicalized
/// to handle symbolic links and relative paths correctly.
pub async fn watch_index<P: AsRef<Path>, Id: ResourceId>(
root_path: P,
) -> Result<()> {
log::debug!(
"Attempting to watch index at root path: {:?}",
root_path.as_ref()
);

let root_path = fs::canonicalize(root_path.as_ref())?;
let mut index: ResourceIndex<Id> = ResourceIndex::build(&root_path)?;
index.store()?;

let (mut watcher, mut rx) = async_watcher()?;
info!("Watching directory: {:?}", root_path);
let config = Config::default();
watcher.configure(config)?;
watcher.watch(root_path.as_ref(), RecursiveMode::Recursive)?;
info!("Started watcher with config: \n\t{:?}", config);

let ark_folder = root_path.join(ARK_FOLDER);
while let Some(res) = rx.next().await {
match res {
Ok(event) => {
// If the event is a change in .ark folder, ignore it
if event
.paths
.iter()
.any(|p| p.starts_with(&ark_folder))
{
continue;
}
// If the event is not a create, modify or remove, ignore it
if !(event.kind.is_create()
|| event.kind.is_modify()
|| event.kind.is_remove())
{
continue;
}

info!("Detected event: {:?}", event);
let file = event
.paths
.first()
.expect("Failed to get file path from event");
log::debug!("Updating index for file: {:?}", file);
let relative_path = file.strip_prefix(&root_path)?;
index.update_one(relative_path)?;

index.store()?;
info!("Index updated and stored");
}
Err(e) => log::error!("Error in watcher: {:?}", e),
}
}

unreachable!("Watcher stream ended unexpectedly");
}

fn async_watcher(
) -> notify::Result<(RecommendedWatcher, Receiver<notify::Result<Event>>)> {
let (mut tx, rx) = channel(1);

let watcher = RecommendedWatcher::new(
move |res| {
futures::executor::block_on(async {
if let Err(err) = tx.send(res).await {
log::error!("Error sending event: {:?}", err);
}
})
},
Config::default(),
)?;

Ok((watcher, rx))
}

0 comments on commit dc67cc3

Please sign in to comment.