Skip to content

Commit

Permalink
feat: Add lock_repo command
Browse files Browse the repository at this point in the history
  • Loading branch information
aawsome committed Oct 5, 2024
1 parent e84373e commit 6dd51dd
Show file tree
Hide file tree
Showing 11 changed files with 286 additions and 6 deletions.
31 changes: 30 additions & 1 deletion crates/core/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub(crate) mod dry_run;
pub(crate) mod hotcold;
pub(crate) mod ignore;
pub(crate) mod local_destination;
pub(crate) mod lock;
pub(crate) mod node;
pub(crate) mod stdin;
pub(crate) mod warm_up;
Expand All @@ -14,8 +15,9 @@ use std::{io::Read, ops::Deref, path::PathBuf, sync::Arc};

use anyhow::Result;
use bytes::Bytes;
use chrono::{DateTime, Local};
use enum_map::Enum;
use log::trace;
use log::{debug, trace};

#[cfg(test)]
use mockall::mock;
Expand Down Expand Up @@ -337,6 +339,27 @@ pub trait WriteBackend: ReadBackend {
///
/// The result of the removal.
fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()>;

/// Specify if the backend is able to lock files
fn can_lock(&self) -> bool {
false

Check warning on line 345 in crates/core/src/backend.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/backend.rs#L344-L345

Added lines #L344 - L345 were not covered by tests
}

/// Lock the given file.
///
/// # Arguments
///
/// * `tpe` - The type of the file.
/// * `id` - The id of the file.
/// * `until` - The date until when to lock. May be `None` which usually specifies a unlimited lock
///
/// # Errors
///
/// If the file could not be read.
fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> {
debug!("no locking implemented. {tpe:?}, {id}, {until:?}");
Ok(())

Check warning on line 361 in crates/core/src/backend.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/backend.rs#L359-L361

Added lines #L359 - L361 were not covered by tests
}
}

#[cfg(test)]
Expand Down Expand Up @@ -374,6 +397,12 @@ impl WriteBackend for Arc<dyn WriteBackend> {
fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> {
self.deref().remove(tpe, id, cacheable)
}
fn can_lock(&self) -> bool {
self.deref().can_lock()

Check warning on line 401 in crates/core/src/backend.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/backend.rs#L401

Added line #L401 was not covered by tests
}
fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> {
self.deref().lock(tpe, id, until)

Check warning on line 404 in crates/core/src/backend.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/backend.rs#L404

Added line #L404 was not covered by tests
}
}

impl ReadBackend for Arc<dyn WriteBackend> {
Expand Down
9 changes: 9 additions & 0 deletions crates/core/src/backend/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{

use anyhow::Result;
use bytes::Bytes;
use chrono::{DateTime, Local};
use dirs::cache_dir;
use log::{trace, warn};
use walkdir::WalkDir;
Expand Down Expand Up @@ -210,6 +211,14 @@ impl WriteBackend for CachedBackend {
}
self.be.remove(tpe, id, cacheable)
}

fn can_lock(&self) -> bool {
self.be.can_lock()
}

fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> {
self.be.lock(tpe, id, until)
}
}

/// Backend that caches data in a directory.
Expand Down
9 changes: 9 additions & 0 deletions crates/core/src/backend/decrypt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{num::NonZeroU32, sync::Arc};

use anyhow::Result;
use bytes::Bytes;
use chrono::{DateTime, Local};
use crossbeam_channel::{unbounded, Receiver};
use rayon::prelude::*;
use zstd::stream::{copy_encode, decode_all, encode_all};
Expand Down Expand Up @@ -578,6 +579,14 @@ impl<C: CryptoKey> WriteBackend for DecryptBackend<C> {
fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> {
self.be.remove(tpe, id, cacheable)
}

fn can_lock(&self) -> bool {
self.be.can_lock()

Check warning on line 584 in crates/core/src/backend/decrypt.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/backend/decrypt.rs#L583-L584

Added lines #L583 - L584 were not covered by tests
}

fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> {
self.be.lock(tpe, id, until)

Check warning on line 588 in crates/core/src/backend/decrypt.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/backend/decrypt.rs#L587-L588

Added lines #L587 - L588 were not covered by tests
}
}

#[cfg(test)]
Expand Down
9 changes: 9 additions & 0 deletions crates/core/src/backend/dry_run.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::Result;
use bytes::Bytes;
use chrono::{DateTime, Local};
use zstd::decode_all;

use crate::{
Expand Down Expand Up @@ -156,4 +157,12 @@ impl<BE: DecryptFullBackend> WriteBackend for DryRunBackend<BE> {
self.be.remove(tpe, id, cacheable)
}
}

fn can_lock(&self) -> bool {
self.be.can_lock()

Check warning on line 162 in crates/core/src/backend/dry_run.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/backend/dry_run.rs#L161-L162

Added lines #L161 - L162 were not covered by tests
}

fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> {
self.be.lock(tpe, id, until)

Check warning on line 166 in crates/core/src/backend/dry_run.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/backend/dry_run.rs#L165-L166

Added lines #L165 - L166 were not covered by tests
}
}
9 changes: 9 additions & 0 deletions crates/core/src/backend/hotcold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use anyhow::Result;
use bytes::Bytes;
use chrono::{DateTime, Local};

use crate::{
backend::{FileType, ReadBackend, WriteBackend},
Expand Down Expand Up @@ -98,4 +99,12 @@ impl WriteBackend for HotColdBackend {
}
Ok(())
}

fn can_lock(&self) -> bool {
self.be.can_lock()
}

fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> {
self.be.lock(tpe, id, until)
}
}
113 changes: 113 additions & 0 deletions crates/core/src/backend/lock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use std::{process::Command, sync::Arc};

use anyhow::Result;
use bytes::Bytes;
use chrono::{DateTime, Local};
use log::{debug, warn};

use crate::{
backend::{FileType, ReadBackend, WriteBackend},
id::Id,
CommandInput,
};

/// A backend which warms up files by simply accessing them.
#[derive(Clone, Debug)]
pub struct LockBackend {
/// The backend to use.
be: Arc<dyn WriteBackend>,
/// The command to be called to lock files in the backend
command: CommandInput,
}

impl LockBackend {
/// Creates a new `WarmUpAccessBackend`.
///
/// # Arguments
///
/// * `be` - The backend to use.
pub fn new_lock(be: Arc<dyn WriteBackend>, command: CommandInput) -> Arc<dyn WriteBackend> {

Check warning on line 29 in crates/core/src/backend/lock.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/backend/lock.rs#L29

Added line #L29 was not covered by tests
Arc::new(Self { be, command })
}
}

impl ReadBackend for LockBackend {
fn location(&self) -> String {

Check warning on line 35 in crates/core/src/backend/lock.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/backend/lock.rs#L35

Added line #L35 was not covered by tests
self.be.location()
}

fn list_with_size(&self, tpe: FileType) -> Result<Vec<(Id, u32)>> {

Check warning on line 39 in crates/core/src/backend/lock.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/backend/lock.rs#L39

Added line #L39 was not covered by tests
self.be.list_with_size(tpe)
}

fn read_full(&self, tpe: FileType, id: &Id) -> Result<Bytes> {

Check warning on line 43 in crates/core/src/backend/lock.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/backend/lock.rs#L43

Added line #L43 was not covered by tests
self.be.read_full(tpe, id)
}

fn read_partial(

Check warning on line 47 in crates/core/src/backend/lock.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/backend/lock.rs#L47

Added line #L47 was not covered by tests
&self,
tpe: FileType,
id: &Id,
cacheable: bool,
offset: u32,
length: u32,
) -> Result<Bytes> {
self.be.read_partial(tpe, id, cacheable, offset, length)
}

fn list(&self, tpe: FileType) -> Result<Vec<Id>> {

Check warning on line 58 in crates/core/src/backend/lock.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/backend/lock.rs#L58

Added line #L58 was not covered by tests
self.be.list(tpe)
}

fn needs_warm_up(&self) -> bool {
self.be.needs_warm_up()
}

fn warm_up(&self, tpe: FileType, id: &Id) -> Result<()> {
self.be.warm_up(tpe, id)
}
}

fn path(tpe: FileType, id: &Id) -> String {

Check warning on line 71 in crates/core/src/backend/lock.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/backend/lock.rs#L71

Added line #L71 was not covered by tests
let hex_id = id.to_hex();
match tpe {

Check warning on line 73 in crates/core/src/backend/lock.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/backend/lock.rs#L73

Added line #L73 was not covered by tests
FileType::Config => "config".into(),
FileType::Pack => format!("data/{}/{}", &hex_id[0..2], &*hex_id),
_ => format!("{}/{}", tpe.dirname(), &*hex_id),

Check warning on line 76 in crates/core/src/backend/lock.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/backend/lock.rs#L75-L76

Added lines #L75 - L76 were not covered by tests
}
}

impl WriteBackend for LockBackend {
fn create(&self) -> Result<()> {
self.be.create()
}

fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> {

Check warning on line 85 in crates/core/src/backend/lock.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/backend/lock.rs#L85

Added line #L85 was not covered by tests
self.be.write_bytes(tpe, id, cacheable, buf)
}

fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> {
self.be.remove(tpe, id, cacheable)
}

fn can_lock(&self) -> bool {
true
}

fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> {
let until = until.map_or_else(String::new, |u| u.to_rfc3339());
let path = path(tpe, id);
let args = self.command.args().iter().map(|c| {
c.replace("%id", &id.to_hex())
.replace("%type", tpe.dirname())
.replace("%path", &path)
.replace("%until", &until)

Check warning on line 104 in crates/core/src/backend/lock.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/backend/lock.rs#L97-L104

Added lines #L97 - L104 were not covered by tests
});
debug!("calling {:?}...", self.command);
let status = Command::new(self.command.command()).args(args).status()?;
if !status.success() {
warn!("lock command was not successful for {tpe:?}, id: {id}. {status}");

Check warning on line 109 in crates/core/src/backend/lock.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/backend/lock.rs#L106-L109

Added lines #L106 - L109 were not covered by tests
}
Ok(())
}
}
1 change: 1 addition & 0 deletions crates/core/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod dump;
pub mod forget;
pub mod init;
pub mod key;
pub mod lock;
pub mod merge;
pub mod prune;
/// The `repair` command.
Expand Down
75 changes: 75 additions & 0 deletions crates/core/src/commands/lock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
//! `lock` subcommand

use chrono::{DateTime, Local};
use log::error;
use rayon::ThreadPoolBuilder;

use crate::{
error::{CommandErrorKind, RepositoryErrorKind, RusticResult},
progress::{Progress, ProgressBars},
repofile::{configfile::ConfigId, IndexId, KeyId, PackId, RepoId, SnapshotId},
repository::Repository,
WriteBackend,
};

pub(super) mod constants {
/// The maximum number of reader threads to use for locking.
pub(super) const MAX_LOCKER_THREADS_NUM: usize = 20;
}

pub fn lock_repo<P: ProgressBars, S>(
repo: &Repository<P, S>,
until: Option<DateTime<Local>>,
) -> RusticResult<()> {
lock_all_files::<P, S, ConfigId>(repo, until)?;
lock_all_files::<P, S, KeyId>(repo, until)?;
lock_all_files::<P, S, SnapshotId>(repo, until)?;
lock_all_files::<P, S, IndexId>(repo, until)?;
lock_all_files::<P, S, PackId>(repo, until)?;
Ok(())

Check warning on line 29 in crates/core/src/commands/lock.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/lock.rs#L24-L29

Added lines #L24 - L29 were not covered by tests
}

pub fn lock_all_files<P: ProgressBars, S, ID: RepoId + std::fmt::Debug>(
repo: &Repository<P, S>,
until: Option<DateTime<Local>>,
) -> RusticResult<()> {
if !repo.be.can_lock() {
return Err(CommandErrorKind::NoLockingConfigured.into());

Check warning on line 37 in crates/core/src/commands/lock.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/lock.rs#L36-L37

Added lines #L36 - L37 were not covered by tests
}

let p = &repo
.pb
.progress_spinner(format!("listing {:?} files..", ID::TYPE));
let ids: Vec<ID> = repo.list()?.collect();
p.finish();
lock_files(repo, &ids, until)

Check warning on line 45 in crates/core/src/commands/lock.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/lock.rs#L40-L45

Added lines #L40 - L45 were not covered by tests
}

fn lock_files<P: ProgressBars, S, ID: RepoId + std::fmt::Debug>(
repo: &Repository<P, S>,
ids: &[ID],
until: Option<DateTime<Local>>,
) -> RusticResult<()> {
let pool = ThreadPoolBuilder::new()
.num_threads(constants::MAX_LOCKER_THREADS_NUM)

Check warning on line 54 in crates/core/src/commands/lock.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/lock.rs#L53-L54

Added lines #L53 - L54 were not covered by tests
.build()
.map_err(RepositoryErrorKind::FromThreadPoolbilderError)?;
let p = &repo
.pb
.progress_counter(format!("locking {:?} files..", ID::TYPE));
p.set_length(ids.len().try_into().unwrap());
let backend = &repo.be;
pool.in_place_scope(|scope| {
for id in ids {
scope.spawn(move |_| {
if let Err(e) = backend.lock(ID::TYPE, id, until) {

Check warning on line 65 in crates/core/src/commands/lock.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/lock.rs#L56-L65

Added lines #L56 - L65 were not covered by tests
// FIXME: Use error handling
error!("lock failed for {:?} {id:?}. {e}", ID::TYPE);

Check warning on line 67 in crates/core/src/commands/lock.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/lock.rs#L67

Added line #L67 was not covered by tests
};
p.inc(1);

Check warning on line 69 in crates/core/src/commands/lock.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/lock.rs#L69

Added line #L69 was not covered by tests
});
}
});
p.finish();
Ok(())

Check warning on line 74 in crates/core/src/commands/lock.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/lock.rs#L73-L74

Added lines #L73 - L74 were not covered by tests
}
2 changes: 2 additions & 0 deletions crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ pub enum CommandErrorKind {
NoKeepOption,
/// {0:?}
FromParseError(#[from] shell_words::ParseError),
/// No locking capability configured for the backend
NoLockingConfigured,
}

/// [`CryptoErrorKind`] describes the errors that can happen while dealing with Cryptographic functions
Expand Down
Loading

0 comments on commit 6dd51dd

Please sign in to comment.