From 0dcd2c6f1af9bc33f1632a3b7e59bd934d65e407 Mon Sep 17 00:00:00 2001 From: dkos Date: Sat, 5 Sep 2020 14:59:50 +0200 Subject: [PATCH] cleanup on db close --- README.md | 6 +++--- src/config.rs | 4 ++++ src/db.rs | 28 ++++++++++++++++++++++------ src/integration_tests.rs | 13 ++----------- 4 files changed, 31 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index 7688af1..e7b70bc 100644 --- a/README.md +++ b/README.md @@ -73,12 +73,12 @@ For database performance tuning check the official [RocksDb tuning guide](https: For service performance tuning check example service_config.toml and yes - ```workers``` is the only config parameter that matters, default is number of logical CPUs Example configuration is provided under ```project_root/config``` + ### TODO - - [ ] impl From for errors - - [ ] fix running test in parallel - [ ] support for more rocksDb options in config (bloom, block cache..) - - [ ] db iterator for on startup init - [ ] channel for expire + - [ ] fix running test in parallel + - [ ] db iterator for on startup init - [ ] test compaction - [ ] code coverage - [ ] docker diff --git a/src/config.rs b/src/config.rs index 80b5346..57a8e86 100644 --- a/src/config.rs +++ b/src/config.rs @@ -32,6 +32,10 @@ impl DbConfig { pub fn path(&self) -> &str { self.0.path.as_ref() } + + pub fn db_path(&self, db_name: &str) -> String { + format!("{}/{}", self.path(), db_name) + } } impl ServiceConfig { diff --git a/src/db.rs b/src/db.rs index 2d20b4d..b7e16ad 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::path::Path; use std::sync::{Arc, Mutex, MutexGuard}; -use std::thread; +use std::{fs, thread}; use actix_web::web::Bytes; use crossbeam::sync::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard}; @@ -13,6 +13,7 @@ use serde::{Deserialize, Serialize}; use crate::config::DbConfig; use crate::conversion::{bytes_to_str, current_ms, Conversion, FromBytes, IntoBytes}; use crate::errors::DbError; +use std::fmt::Debug; const ROOT_DB_NAME: &str = "root"; @@ -153,10 +154,14 @@ impl DbManager { } else { if let Some(db) = self.w_lock().remove(&db_name) { info!("Closing db = {} ...", &db_name); + let path = self.db_cfg.db_path(&db_name); self.root_db.w_lock().delete(&db_name)?; //possible expensive call moved to separate thread TODO channels self.tp_mutex().execute(move || match db.close(&db_name) { - Ok(_) => info!("Db = {} closed", &db_name), + Ok(_) => { + info!("Db = {} closed. Deleting db files...", &db_name); + remove_files(path); + } Err(e) => error!("Error closing db = {}, e = {}", &db_name, e), }); } @@ -233,10 +238,7 @@ impl DbManager { } fn open_root_db(db_cfg: &DbConfig) -> DbResult { - Db::new( - format!("{}/{}", db_cfg.path(), ROOT_DB_NAME), - &db_cfg.root_db_options(), - ) + Db::new(db_cfg.db_path(ROOT_DB_NAME), &db_cfg.root_db_options()) } fn not_exists(db_name: &str) -> DbError { @@ -251,6 +253,20 @@ fn is_expired(ttl: u128) -> Conversion { } } +fn remove_files

(path: P) +where + P: AsRef + Debug, +{ + if let Err(e) = fs::remove_dir_all(&path) { + error!( + "Failed to remove db files on path = {:?}, err = {}", + path, e + ); + } else { + info!("Removed files for db on path {:?}", path); + } +} + fn compaction_filter(_level: u32, _key: &[u8], value: &[u8]) -> CompactionDecision { info!( "Running compaction filter in thread {:?}", diff --git a/src/integration_tests.rs b/src/integration_tests.rs index b7be7ac..05c6f48 100644 --- a/src/integration_tests.rs +++ b/src/integration_tests.rs @@ -1,8 +1,7 @@ - use std::time::Duration; -use std::{fs, io, thread}; +use std::{thread}; -use actix_web::dev::{ServiceResponse}; +use actix_web::dev::ServiceResponse; use actix_web::http::StatusCode; use actix_web::{test, web, App, Error}; @@ -57,7 +56,6 @@ async fn should_open_and_close_db() -> Result<(), Error> { "Received payload:: {:?}", response_as_str(res) ); - cleanup()?; Ok(()) } @@ -119,7 +117,6 @@ async fn should_add_and_delete_record() -> Result<(), Error> { "Received payload:: {:?}", response_as_str(res) ); - cleanup()?; Ok(()) } @@ -184,15 +181,9 @@ async fn should_expire_record() -> Result<(), Error> { "Received payload:: {:?}", response_as_str(res) ); - cleanup()?; Ok(()) } -fn cleanup() -> io::Result<()> { - let config = DbConfig::new_with_defaults(); - fs::remove_dir_all(config.path()) -} - fn response_as_str(res: ServiceResponse) -> Conversion { match res.response().body().as_ref() { Some(Body::Bytes(bytes)) => bytes_to_str(bytes),