Skip to content

Commit

Permalink
DB vacuum (#1552)
Browse files Browse the repository at this point in the history
  • Loading branch information
mfranciszkiewicz authored Aug 4, 2021
1 parent d4295be commit a856ada
Show file tree
Hide file tree
Showing 8 changed files with 240 additions and 3 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ ya-market = "0.3"
ya-metrics = "0.1"
ya-net = { version = "0.2", features = ["service"] }
ya-payment = "0.2"
ya-persistence = "0.2"
ya-persistence = { version = "0.2", features = ["service"] }
ya-sb-proto = "0.4"
ya-sb-router = "0.4"
ya-service-api = "0.1"
Expand Down
11 changes: 11 additions & 0 deletions core/persistence/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@ version = "0.2.0"
authors = ["Golem Factory <contact@golem.network>"]
edition = "2018"

[features]
default = []
service = ["ya-service-api", "ya-service-api-interfaces", "ya-utils-process", "structopt"]

[dependencies]
ya-client-model = { version = "0.3", features = [ "with-diesel" ] }
ya-core-model = { version = "^0.4"}
ya-service-api = { version = "0.1", optional = true }
ya-service-api-interfaces = {version = "0.1", optional = true }
ya-utils-process = { version = "0.1", features = ["lock"], optional = true }

anyhow = "1.0.26"
bigdecimal = "0.2"
Expand All @@ -18,5 +24,10 @@ libsqlite3-sys = { version = "0.9.1", features = ["bundled"] }
log = "0.4.8"
r2d2 = "0.8"
serde_json = "1.0"
structopt = { version = "0.3", optional = true }
thiserror = "1.0.9"
tokio = { version = "0.2", features = ["blocking"] }

[dev-dependencies]
tempdir = "0.3.7"
tokio = { version = "0.2", features = ["macros", "rt-core", "rt-util"] }
4 changes: 4 additions & 0 deletions core/persistence/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ impl DbExecutor {
{
do_with_transaction(&self.pool, f).await
}

/// Runs a raw SQL statement on a connection obtained from `self.conn()`.
///
/// Returns whatever count the connection's `execute` call reports
/// (presumably the number of affected rows — confirm against the driver).
/// Failing to obtain the connection or to run the statement is converted
/// into [`Error`]. The body contains no `.await`, so the statement runs
/// inline when the future is polled.
pub(crate) async fn execute(&self, query: &str) -> Result<usize, Error> {
    let connection = self.conn()?;
    let affected = connection.execute(query)?;
    Ok(affected)
}
}

pub trait AsDao<'a> {
Expand Down
2 changes: 2 additions & 0 deletions core/persistence/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
extern crate diesel;

pub mod executor;
#[cfg(feature = "service")]
pub mod service;
pub mod types;

pub use executor::Error;
186 changes: 186 additions & 0 deletions core/persistence/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
use std::path::{Path, PathBuf};
use structopt::StructOpt;

use ya_service_api::{CliCtx, CommandOutput};
use ya_service_api_interfaces::{Provider, Service};
use ya_utils_process::lock::ProcLock;

use crate::executor::DbExecutor;

/// Persistence service
///
/// Unit marker type: it carries no state and exists so that the database
/// maintenance commands ([`Command`]) and the startup hook (`gsb`) can be
/// attached to a service.
pub struct Persistence;

// Declares `Command` as the command-line interface type of this service.
impl Service for Persistence {
type Cli = Command;
}

impl Persistence {
    /// Startup hook: vacuums the databases found in the data directory.
    ///
    /// Only databases accepted by `filter::wal_larger_than_db` are rebuilt.
    /// The call is made with `force = true`, so the process-lock check inside
    /// `vacuum` is skipped. Any error from `vacuum` is returned; its
    /// `CommandOutput` is discarded.
    pub async fn gsb<Context: Provider<Self, CliCtx>>(context: &Context) -> anyhow::Result<()> {
        let cli_ctx = context.component();
        vacuum(&cli_ctx.data_dir, filter::wal_larger_than_db, true)
            .await
            .map(|_| ())
    }
}

// NOTE: structopt renders the `///` comments in this enum as CLI help text,
// so they are kept verbatim; maintainer notes are written as `//` comments.
/// Database management
#[derive(StructOpt, Debug)]
pub enum Command {
// Handled by `Command::run_command`, which vacuums every `*.db` file in the
// data directory (no size-based filtering).
/// Rebuild databases to reduce size
#[structopt(setting = structopt::clap::AppSettings::DeriveDisplayOrder)]
Vacuum {
// Forwarded to `vacuum`: when `false`, the command fails if the data
// directory contains held process locks; when `true`, that check is skipped.
/// Vacuum when the daemon is running
#[structopt(long)]
force: bool,
},
}

impl Command {
    /// Executes the parsed CLI command against the data directory in `ctx`.
    ///
    /// `Vacuum` rebuilds every database file in the directory (the
    /// `filter::any` predicate accepts all of them) and forwards the `force`
    /// flag unchanged.
    pub async fn run_command(self, ctx: &CliCtx) -> anyhow::Result<CommandOutput> {
        // `Vacuum` is the only variant, so the pattern is irrefutable.
        let Command::Vacuum { force } = self;
        vacuum(&ctx.data_dir, filter::any, force).await
    }
}

/// Runs `VACUUM;` on the selected database files located directly in `data_dir`.
///
/// A file is a candidate when it is not a directory, its extension equals
/// `db` (compared case-insensitively) and `filter` accepts its path.
/// Directory entries that cannot be read are skipped silently.
///
/// * Returns `CommandOutput::Object` with an explanatory string when no
///   candidate is left after filtering.
/// * Fails when `force` is `false` and `ProcLock::contains_locks` reports a
///   held lock in `data_dir`.
/// * Otherwise vacuums the candidates one by one, stopping at the first
///   error, and returns `CommandOutput::NoOutput`.
async fn vacuum<F, P>(data_dir: P, filter: F, force: bool) -> anyhow::Result<CommandOutput>
where
    F: Fn(&PathBuf) -> bool,
    P: AsRef<Path>,
{
    let candidates: Vec<PathBuf> = std::fs::read_dir(&data_dir)?
        .filter_map(|entry| entry.ok())
        .map(|entry| entry.path())
        .filter(|path| !path.is_dir())
        .filter(|path| match path.extension() {
            Some(ext) => ext.to_string_lossy().to_lowercase().as_str() == "db",
            None => false,
        })
        .filter(filter)
        .collect();

    if candidates.is_empty() {
        let message = serde_json::Value::String("no databases found to vacuum".to_string());
        return Ok(CommandOutput::Object(message));
    }

    // The lock scan is only performed when the caller did not force the operation.
    let dir_in_use = !force && ProcLock::contains_locks(&data_dir)?;
    if dir_in_use {
        anyhow::bail!(
            "Data directory '{}' is used by another application. Use '--force' to override.",
            data_dir.as_ref().display()
        );
    }

    for db_path in candidates {
        eprintln!("vacuuming {}", db_path.display());
        let executor = DbExecutor::new(db_path.display().to_string())?;
        executor.execute("VACUUM;").await?;
    }

    Ok(CommandOutput::NoOutput)
}

/// Predicates used by `vacuum` to decide which database files to rebuild.
mod filter {
    use std::path::PathBuf;

    // The predicates take `&PathBuf` (not `&Path`) because `vacuum` requires
    // `F: Fn(&PathBuf) -> bool`.

    /// Accepts every database file.
    pub(super) fn any(_: &PathBuf) -> bool {
        true
    }

    /// Accepts a database only when its write-ahead log file is strictly
    /// larger than the database file itself.
    ///
    /// The WAL path is built by appending `-wal` to the complete database
    /// path, which is how SQLite names the file. Appending (instead of
    /// replacing the extension with a lower-case `db-wal`) keeps the original
    /// case of the extension, so `FOO.DB` is matched with `FOO.DB-wal` on
    /// case-sensitive filesystems.
    ///
    /// Returns `false` when the metadata of either file cannot be read, e.g.
    /// when no WAL file exists.
    pub(super) fn wal_larger_than_db(db: &PathBuf) -> bool {
        let mut wal = db.as_os_str().to_os_string();
        wal.push("-wal");
        let wal = PathBuf::from(wal);

        match (db.metadata(), wal.metadata()) {
            (Ok(db_meta), Ok(wal_meta)) => wal_meta.len() > db_meta.len(),
            _ => false,
        }
    }
}

#[cfg(test)]
mod tests {
    use std::fs::OpenOptions;
    use std::path::Path;

    use ya_service_api::CommandOutput;
    use ya_utils_process::lock::ProcLock;

    use crate::service::filter;
    use crate::service::vacuum;

    /// Creates an empty `<name>.db` file inside `path`.
    fn touch_db<P: AsRef<Path>>(path: P, name: &str) -> anyhow::Result<()> {
        let db_path = path.as_ref().join(format!("{}.db", name));
        OpenOptions::new().write(true).create(true).open(db_path)?;
        Ok(())
    }

    /// An unlocked directory is vacuumed without `force`.
    #[tokio::test]
    async fn vacuum_dir() -> anyhow::Result<()> {
        let scratch = tempdir::TempDir::new("vacuum")?;
        let root = scratch.path();
        touch_db(&root, "test")?;

        let outcome = vacuum(&root, filter::any, false).await;
        assert!(outcome.is_ok());
        Ok(())
    }

    /// A held process lock makes the non-forced vacuum fail.
    #[tokio::test]
    async fn vacuum_locked_dir() -> anyhow::Result<()> {
        let scratch = tempdir::TempDir::new("vacuum")?;
        let root = scratch.path();
        touch_db(&root, "test")?;

        let _lock = ProcLock::new("temp", &root)?.lock(std::process::id())?;
        let outcome = vacuum(&root, filter::any, false).await;
        assert!(outcome.is_err());
        Ok(())
    }

    /// With `force` the held process lock is ignored.
    #[tokio::test]
    async fn vacuum_locked_dir_forced() -> anyhow::Result<()> {
        let scratch = tempdir::TempDir::new("vacuum")?;
        let root = scratch.path();
        touch_db(&root, "test")?;

        let _lock = ProcLock::new("temp", &root)?.lock(std::process::id())?;
        let outcome = vacuum(&root, filter::any, true).await;
        assert!(outcome.is_ok());
        Ok(())
    }

    /// The filter decides whether anything is vacuumed, and the output
    /// variant reflects that decision.
    #[tokio::test]
    async fn vacuum_when() -> anyhow::Result<()> {
        let scratch = tempdir::TempDir::new("vacuum")?;
        let root = scratch.path();
        touch_db(&root, "test")?;

        // Every database rejected: an informational object is returned.
        let rejected = vacuum(&root, |_| false, true).await?;
        if let CommandOutput::Object(_) = rejected {
        } else {
            panic!("invalid result");
        }

        // Database accepted: the vacuum runs and produces no output.
        let accepted = vacuum(&root, filter::any, true).await?;
        if let CommandOutput::NoOutput = accepted {
        } else {
            panic!("invalid result");
        }

        Ok(())
    }
}
3 changes: 3 additions & 0 deletions core/serv/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use ya_metrics::{MetricsPusherOpts, MetricsService};
use ya_net::Net as NetService;
use ya_payment::{accounts as payment_accounts, PaymentService};
use ya_persistence::executor::DbExecutor;
use ya_persistence::service::Persistence as PersistenceService;
use ya_sb_proto::{DEFAULT_GSB_URL, GSB_URL_ENV_VAR};
use ya_service_api::{CliCtx, CommandOutput};
use ya_service_api_interfaces::Provider;
Expand Down Expand Up @@ -186,6 +187,8 @@ impl TryFrom<CliCtx> for ServiceContext {

#[ya_service_api_derive::services(ServiceContext)]
enum Services {
#[enable(gsb, cli)]
Db(PersistenceService),
// Metrics service must be activated before all other services
// that will use it. Identity service is used by the Metrics,
// so must be initialized before.
Expand Down
30 changes: 28 additions & 2 deletions utils/process/src/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};

/// Extension (without the leading dot) of process lock files.
const LOCK_FILE_EXT: &str = "lock";
/// Extension (without the leading dot) of process PID files.
const PID_FILE_EXT: &str = "pid";

pub struct ProcLock {
dir: PathBuf,
name: String,
Expand All @@ -28,6 +31,26 @@ impl ProcLock {
})
}

/// Reports whether `dir` contains at least one lock file that is currently held.
///
/// Only non-directory entries whose extension equals `LOCK_FILE_EXT`
/// (compared case-insensitively) are inspected. A lock file counts as held
/// when taking an exclusive lock on it fails, or when the file cannot be
/// opened at all. Directory entries that cannot be read are skipped.
///
/// Returns an error only when `dir` itself cannot be listed.
pub fn contains_locks<P: AsRef<Path>>(dir: P) -> Result<bool> {
    for entry in std::fs::read_dir(dir)? {
        let path = match entry {
            Ok(entry) => entry.path(),
            Err(_) => continue,
        };
        if path.is_dir() {
            continue;
        }

        let is_lock_file = match path.extension() {
            Some(ext) => ext.to_string_lossy().to_lowercase().as_str() == LOCK_FILE_EXT,
            None => false,
        };
        if !is_lock_file {
            continue;
        }

        let held = match File::open(&path) {
            Ok(file) => file.try_lock_exclusive().is_err(),
            Err(_) => true,
        };
        if held {
            // The first held lock settles the answer.
            return Ok(true);
        }
    }

    Ok(false)
}

pub fn lock(mut self, pid: u32) -> Result<Self> {
let (lock_file, lock_path) = self.lock_file(&self.name)?;
if let Err(_) = lock_file.try_lock_exclusive() {
Expand Down Expand Up @@ -74,7 +97,9 @@ impl ProcLock {
}

fn lock_file(&self, name: impl ToString) -> Result<(File, PathBuf)> {
let lock_path = self.dir.join(format!("{}.lock", name.to_string()));
let lock_path = self
.dir
.join(format!("{}.{}", name.to_string(), LOCK_FILE_EXT));
let lock_file = if lock_path.is_file() {
match File::open(&lock_path) {
Ok(f) => f,
Expand All @@ -91,7 +116,8 @@ impl ProcLock {

/// Builds the path of the PID file for `name` inside `self.dir`:
/// `<dir>/<name>.<PID_FILE_EXT>`.
#[inline]
fn pid_path(&self, name: impl ToString) -> PathBuf {
// NOTE(review): the next line appears to be the pre-change version kept by
// the diff view; the two lines after it are its replacement — confirm.
self.dir.join(format!("{}.pid", name.to_string()))
self.dir
.join(format!("{}.{}", name.to_string(), PID_FILE_EXT))
}
}

Expand Down

0 comments on commit a856ada

Please sign in to comment.