Skip to content

Commit

Permalink
cleanup on db close
Browse files Browse the repository at this point in the history
  • Loading branch information
dejankos committed Sep 5, 2020
1 parent 88cc550 commit 0dcd2c6
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 20 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ For database performance tuning check the official [RocksDb tuning guide](https:
For service performance tuning check example service_config.toml and yes - ```workers``` is the only config parameter that matters, default is number of logical CPUs
Example configuration is provided under ```project_root/config```


### TODO
- [ ] impl From for errors
- [ ] fix running test in parallel
- [ ] support for more rocksDb options in config (bloom, block cache..)
- [ ] db iterator for on startup init
- [ ] channel for expire
- [ ] fix running test in parallel
- [ ] db iterator for on startup init
- [ ] test compaction
- [ ] code coverage
- [ ] docker
Expand Down
4 changes: 4 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ impl DbConfig {
pub fn path(&self) -> &str {
self.0.path.as_ref()
}

pub fn db_path(&self, db_name: &str) -> String {
format!("{}/{}", self.path(), db_name)
}
}

impl ServiceConfig {
Expand Down
28 changes: 22 additions & 6 deletions src/db.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread;
use std::{fs, thread};

use actix_web::web::Bytes;
use crossbeam::sync::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard};
Expand All @@ -13,6 +13,7 @@ use serde::{Deserialize, Serialize};
use crate::config::DbConfig;
use crate::conversion::{bytes_to_str, current_ms, Conversion, FromBytes, IntoBytes};
use crate::errors::DbError;
use std::fmt::Debug;

const ROOT_DB_NAME: &str = "root";

Expand Down Expand Up @@ -153,10 +154,14 @@ impl DbManager {
} else {
if let Some(db) = self.w_lock().remove(&db_name) {
info!("Closing db = {} ...", &db_name);
let path = self.db_cfg.db_path(&db_name);
self.root_db.w_lock().delete(&db_name)?;
//possible expensive call moved to separate thread TODO channels
self.tp_mutex().execute(move || match db.close(&db_name) {
Ok(_) => info!("Db = {} closed", &db_name),
Ok(_) => {
info!("Db = {} closed. Deleting db files...", &db_name);
remove_files(path);
}
Err(e) => error!("Error closing db = {}, e = {}", &db_name, e),
});
}
Expand Down Expand Up @@ -233,10 +238,7 @@ impl DbManager {
}

fn open_root_db(db_cfg: &DbConfig) -> DbResult<Db> {
Db::new(
format!("{}/{}", db_cfg.path(), ROOT_DB_NAME),
&db_cfg.root_db_options(),
)
Db::new(db_cfg.db_path(ROOT_DB_NAME), &db_cfg.root_db_options())
}

fn not_exists(db_name: &str) -> DbError {
Expand All @@ -251,6 +253,20 @@ fn is_expired(ttl: u128) -> Conversion<bool> {
}
}

fn remove_files<P>(path: P)
where
P: AsRef<Path> + Debug,
{
if let Err(e) = fs::remove_dir_all(&path) {
error!(
"Failed to remove db files on path = {:?}, err = {}",
path, e
);
} else {
info!("Removed files for db on path {:?}", path);
}
}

fn compaction_filter(_level: u32, _key: &[u8], value: &[u8]) -> CompactionDecision {
info!(
"Running compaction filter in thread {:?}",
Expand Down
13 changes: 2 additions & 11 deletions src/integration_tests.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@

use std::time::Duration;
use std::{fs, io, thread};
use std::{thread};

use actix_web::dev::{ServiceResponse};
use actix_web::dev::ServiceResponse;
use actix_web::http::StatusCode;
use actix_web::{test, web, App, Error};

Expand Down Expand Up @@ -57,7 +56,6 @@ async fn should_open_and_close_db() -> Result<(), Error> {
"Received payload:: {:?}",
response_as_str(res)
);
cleanup()?;
Ok(())
}

Expand Down Expand Up @@ -119,7 +117,6 @@ async fn should_add_and_delete_record() -> Result<(), Error> {
"Received payload:: {:?}",
response_as_str(res)
);
cleanup()?;
Ok(())
}

Expand Down Expand Up @@ -184,15 +181,9 @@ async fn should_expire_record() -> Result<(), Error> {
"Received payload:: {:?}",
response_as_str(res)
);
cleanup()?;
Ok(())
}

fn cleanup() -> io::Result<()> {
let config = DbConfig::new_with_defaults();
fs::remove_dir_all(config.path())
}

fn response_as_str(res: ServiceResponse<Body>) -> Conversion<String> {
match res.response().body().as_ref() {
Some(Body::Bytes(bytes)) => bytes_to_str(bytes),
Expand Down

0 comments on commit 0dcd2c6

Please sign in to comment.