Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add watch_index method and ark-cli watch command #36

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ark-cli/USAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,16 @@ $ /tmp/ark-cli list -t --filter=search
22-207093268 search,engine
```

### Watch a Directory for Changes

You can watch a directory for changes and automatically update the index by running the following command:

```sh
ark-cli watch [PATH]
```

If you don't provide a path, the current directory (`.`) will be used by default. This command continuously monitors the specified directory for file changes (create, modify, or remove) and updates the index accordingly. It's useful for keeping your index in sync with the latest changes in the folder.

## :zap: Low-level utilities :zap:

There are commands which could be useful with time, when you grasp the basic concepts. Some of these commands also can be useful for debugging [ArkLib](https://github.com/ARK-Builders/ark-rust).
Expand Down
2 changes: 2 additions & 0 deletions ark-cli/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod list;
mod monitor;
mod render;
pub mod storage;
mod watch;

pub use file::{file_append, file_insert, format_file, format_line};

Expand All @@ -18,6 +19,7 @@ pub enum Commands {
Monitor(monitor::Monitor),
Render(render::Render),
List(list::List),
Watch(watch::Watch),
#[command(about = "Manage links")]
Link {
#[clap(subcommand)]
Expand Down
27 changes: 27 additions & 0 deletions ark-cli/src/commands/watch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use std::path::PathBuf;

use fs_index::watch_index;

use crate::{AppError, ResourceId};

#[derive(Clone, Debug, clap::Args)]
#[clap(
name = "watch",
about = "Watch the ark managed folder for changes and update the index accordingly"
)]
pub struct Watch {
#[clap(
help = "Path to the directory to watch for changes",
default_value = ".",
value_parser
)]
path: PathBuf,
}

impl Watch {
pub async fn run(&self) -> Result<(), AppError> {
watch_index::<_, ResourceId>(&self.path)
.await
.map_err(|err| AppError::IndexError(err.to_string()))
}
}
1 change: 1 addition & 0 deletions ark-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ async fn run() -> Result<()> {
Monitor(monitor) => monitor.run()?,
Render(render) => render.run()?,
List(list) => list.run()?,
Watch(watch) => watch.run().await?,
Link { subcommand } => match subcommand {
Create(create) => create.run().await?,
Load(load) => load.run()?,
Expand Down
6 changes: 6 additions & 0 deletions fs-index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ bench = false

[dependencies]
log = { version = "0.4.17", features = ["release_max_level_off"] }
env_logger = "0.11"
walkdir = "2.3.2"
anyhow = "1.0.58"
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
# For the watch API
notify = "6.1"
futures = "0.3"
kirillt marked this conversation as resolved.
Show resolved Hide resolved


fs-storage = { path = "../fs-storage" }
Expand All @@ -28,6 +32,8 @@ criterion = { version = "0.5", features = ["html_reports"] }
tempfile = "3.10"
# Depending on `dev-hash` for testing
dev-hash = { path = "../dev-hash" }
# Examples
tokio = { version = "1.40", features = ["full"] }

[[bench]]
name = "resource_index_benchmark"
Expand Down
2 changes: 2 additions & 0 deletions fs-index/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ The most important struct in this crate is `ResourceIndex` which comes with:
- `get_resource_by_path`: Query a resource from the index by its path.
- **Selective API**
- `update_one`: Method to manually update a specific resource by selectively rescanning a single file.
- **Watch API**
- `watch`: Method to watch a directory for changes and update the index accordingly.

## Custom Serialization

Expand Down
35 changes: 35 additions & 0 deletions fs-index/examples/index_watch.rs
kirillt marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use std::{path::Path, thread};

use anyhow::Result;
use log::LevelFilter;

use dev_hash::Blake3;
use fs_index::watch_index;

/// Example demonstrating how to use fs_index to watch a directory for changes
/// in a separate thread. This automatically updates the index when changes are
/// detected.
fn main() -> Result<()> {
env_logger::builder()
.filter_level(LevelFilter::Debug)
.init();

// Change this to the path of the directory you want to watch
let root = Path::new("test-assets");

let thread_handle = thread::spawn(move || {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async move {
if let Err(err) = watch_index::<_, Blake3>(root).await {
eprintln!("Error in watching index: {:?}", err);
}
});
});

thread_handle
.join()
.expect("Failed to join thread");

Ok(())
}
5 changes: 3 additions & 2 deletions fs-index/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
mod index;
mod serde;
mod utils;

pub use utils::load_or_build_index;
mod watch;

pub use index::ResourceIndex;
pub use utils::load_or_build_index;
pub use watch::watch_index;

#[cfg(test)]
mod tests;
132 changes: 132 additions & 0 deletions fs-index/src/watch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use std::{fs, path::Path};

use anyhow::Result;
use futures::{
channel::mpsc::{channel, Receiver},
SinkExt, StreamExt,
};
use log::info;
use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};

use data_resource::ResourceId;
use fs_storage::ARK_FOLDER;

use crate::ResourceIndex;

/// Watches a given directory for file system changes and automatically updates
/// the resource index.
///
/// This function continuously monitors the specified directory and responds to
/// file system events such as file creation, modification, and deletion. When
/// an event is detected, the function updates the associated resource index and
/// stores the changes.
///
/// The function runs asynchronously, whcih makes it suitable for non-blocking
/// contexts. It uses a recursive watcher to track all changes within the
/// directory tree. Events related to the internal `.ark` folder are ignored to
/// prevent unnecessary updates.
///
/// # Arguments
///
/// * `root_path` - The root directory to be watched. This path is canonicalized
/// to handle symbolic links and relative paths correctly.
pub async fn watch_index<P: AsRef<Path>, Id: ResourceId>(
root_path: P,
) -> Result<()> {
kirillt marked this conversation as resolved.
Show resolved Hide resolved
log::debug!(
"Attempting to watch index at root path: {:?}",
root_path.as_ref()
);

let root_path = fs::canonicalize(root_path.as_ref())?;
let mut index: ResourceIndex<Id> = ResourceIndex::build(&root_path)?;
index.store()?;

let (mut watcher, mut rx) = async_watcher()?;
info!("Watching directory: {:?}", root_path);
let config = Config::default();
watcher.configure(config)?;
watcher.watch(root_path.as_ref(), RecursiveMode::Recursive)?;
info!("Started watcher with config: \n\t{:?}", config);

let ark_folder = root_path.join(ARK_FOLDER);
while let Some(res) = rx.next().await {
kirillt marked this conversation as resolved.
Show resolved Hide resolved
match res {
Ok(event) => {
// If the event is a change in .ark folder, ignore it
if event
.paths
.iter()
.any(|p| p.starts_with(&ark_folder))
{
continue;
}
tareknaser marked this conversation as resolved.
Show resolved Hide resolved
// We only care for:
// - file modifications
// - file renames
// - file creations
// - file deletions
match event.kind {
notify::EventKind::Modify(
notify::event::ModifyKind::Data(_),
)
| notify::EventKind::Modify(
notify::event::ModifyKind::Name(_),
)
| notify::EventKind::Create(
notify::event::CreateKind::File,
)
| notify::EventKind::Remove(
notify::event::RemoveKind::File,
) => {}
_ => continue,
}

info!("Detected event: {:?}", event);
let file = event
.paths
.first()
.expect("Failed to get file path from event");
log::debug!("Updating index for file: {:?}", file);

log::info!(
"\n Current resource index: {}",
index
.resources()
.iter()
.map(|x| x.path().to_str().unwrap().to_string())
.collect::<Vec<String>>()
.join("\n\t")
);

let relative_path = file.strip_prefix(&root_path)?;
log::info!("Relative path: {:?}", relative_path);
index.update_one(relative_path)?;

index.store()?;
info!("Index updated and stored");
}
Err(e) => log::error!("Error in watcher: {:?}", e),
}
}

unreachable!("Watcher stream ended unexpectedly");
}

fn async_watcher(
) -> notify::Result<(RecommendedWatcher, Receiver<notify::Result<Event>>)> {
let (mut tx, rx) = channel(1);

let watcher = RecommendedWatcher::new(
move |res| {
futures::executor::block_on(async {
if let Err(err) = tx.send(res).await {
log::error!("Error sending event: {:?}", err);
}
})
},
Config::default(),
)?;

Ok((watcher, rx))
}
Loading