Skip to content

Commit

Permalink
channels
Browse files Browse the repository at this point in the history
  • Loading branch information
dejankos committed Sep 10, 2020
1 parent b0f7002 commit fd50672
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 24 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,4 @@ rocksdb = "0.15.0"
confy = "0.4.0"
num_cpus = "1.0"
crossbeam = "0.7.3"
executors = "0.7.0"
bincode = "1.3.1"
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.

Copyright [yyyy] [name of copyright owner]
Copyright 2020 Dejan Kos

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ Example configuration is provided under ```project_root/config```

### TODO
- [ ] support for more rocksDb options in config (bloom, block cache..)
- [ ] channel for expire
- [X] channel for expire
- [ ] fix running test in parallel (TODO per test db root path)
- [ ] db iterator for on startup init
- [ ] test compaction
Expand Down
80 changes: 59 additions & 21 deletions src/db.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::path::Path;
use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{mpsc, Arc, Mutex, MutexGuard};
use std::{fs, thread};

use actix_web::web::Bytes;
use crossbeam::sync::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard};
use executors::threadpool_executor::ThreadPoolExecutor;
use executors::Executor;
use rocksdb::{CompactionDecision, IteratorMode, Options, DB};
use serde::{Deserialize, Serialize};

use crate::config::DbConfig;
use crate::conversion::{bytes_to_str, current_ms, Conversion, FromBytes, IntoBytes};
use crate::errors::DbError;
use std::fmt::Debug;

const ROOT_DB_NAME: &str = "root";

Expand All @@ -35,7 +34,26 @@ pub struct DbManager {
db_cfg: DbConfig,
root_db: Db,
dbs: SafeRW<HashMap<String, Db>>,
executor: Mutex<ThreadPoolExecutor>,
tx: Mutex<Sender<BoxedFnOnce>>,
}

pub struct BoxedFnOnce {
data: Box<dyn FnOnce() + Send + 'static>,
}

impl BoxedFnOnce {
fn new<T>(data: T) -> BoxedFnOnce
where
T: FnOnce() + Send + 'static,
{
BoxedFnOnce {
data: Box::new(data),
}
}

fn invoke(self) {
(self.data)()
}
}

impl Data {
Expand Down Expand Up @@ -93,19 +111,22 @@ impl DbManager {
.set_compaction_filter("expiration-filter", compaction_filter);

let root_db = open_root_db(&db_cfg)?;
let (tx, rx) = mpsc::channel::<BoxedFnOnce>();

let db_manager = DbManager {
db_cfg,
root_db,
dbs: Arc::new(ShardedLock::new(HashMap::new())),
executor: Mutex::new(ThreadPoolExecutor::new(1)),
tx: Mutex::new(tx),
};
db_manager.init();
db_manager.open_dbs();
db_manager.reg_receiver_thread(rx);

Ok(db_manager)
}

// will panic in main thread and prevent startup
fn init(&self) {
fn open_dbs(&self) {
info!("Initializing dbs from root ...");
//TODO db iterator
self.root_db
Expand All @@ -123,6 +144,18 @@ impl DbManager {
});
}

fn reg_receiver_thread(&self, rx: Receiver<BoxedFnOnce>) {
thread::Builder::new()
.name("async-expire-thread".into())
.spawn(move || {
for boxed in rx {
info!("expiring key in thead {:?}", thread::current());
boxed.invoke()
}
})
.expect("Failed to register receiver thread");
}

pub async fn open(&self, db_name: String) -> DbResult<()> {
if self.contains(&db_name) {
warn!("Db {} already exists", &db_name);
Expand Down Expand Up @@ -156,20 +189,25 @@ impl DbManager {
info!("Closing db = {} ...", &db_name);
let path = self.db_cfg.db_path(&db_name);
self.root_db.w_lock().delete(&db_name)?;
//possible expensive call moved to separate thread TODO channels
self.tp_mutex().execute(move || match db.close(&db_name) {
Ok(_) => {
info!("Db = {} closed. Deleting db files...", &db_name);
remove_files(path);
}
Err(e) => error!("Error closing db = {}, e = {}", &db_name, e),
});
self.try_close_async(db, db_name, path);
}

Ok(())
}
}

fn try_close_async(&self, db: Db, db_name: String, path: String) {
let _ = self
.tx_mutex()
.send(BoxedFnOnce::new(move || match db.close(&db_name) {
Ok(_) => {
info!("Db = {} closed. Deleting db files...", &db_name);
remove_files(path);
}
Err(e) => error!("Error closing db = {}, e = {}", &db_name, e),
}));
}

pub async fn store(&self, db_name: &str, key: &str, val: Bytes, ttl: u128) -> DbResult<()> {
let bytes = Data::new(ttl, val.to_vec()).as_bytes()?;
match self.w_lock().get(db_name) {
Expand Down Expand Up @@ -200,11 +238,11 @@ impl DbManager {
fn expire(&self, db: &Db, key: &str) {
let db = db.clone();
let key = key.to_string();
self.tp_mutex().execute(move || {
let _ = self.tx_mutex().send(BoxedFnOnce::new(move || {
if let Err(e) = db.w_lock().delete(&key) {
error!("Failed to expire key = {}, e = {}", key, e);
}
});
}));
}

pub async fn remove(&self, db_name: &str, key: &str) -> DbResult<()> {
Expand All @@ -222,10 +260,10 @@ impl DbManager {
!self.contains(db_name)
}

fn tp_mutex(&self) -> MutexGuard<'_, ThreadPoolExecutor> {
self.executor
fn tx_mutex(&self) -> MutexGuard<'_, Sender<BoxedFnOnce>> {
self.tx
.lock()
.expect("Failed to acquire executor lock")
.expect("Failed to acquire channel sender lock")
}

fn r_lock(&self) -> ShardedLockReadGuard<'_, HashMap<String, Db>> {
Expand Down

0 comments on commit fd50672

Please sign in to comment.