From fd50672825f27f571467591be338475cf3708902 Mon Sep 17 00:00:00 2001 From: dkos Date: Thu, 10 Sep 2020 18:36:26 +0200 Subject: [PATCH] channels --- Cargo.toml | 1 - LICENSE | 2 +- README.md | 2 +- src/db.rs | 80 ++++++++++++++++++++++++++++++++++++++++-------------- 4 files changed, 61 insertions(+), 24 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 136f4a1..71a539a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,5 +19,4 @@ rocksdb = "0.15.0" confy = "0.4.0" num_cpus = "1.0" crossbeam = "0.7.3" -executors = "0.7.0" bincode = "1.3.1" diff --git a/LICENSE b/LICENSE index 261eeb9..4a3a37a 100644 --- a/LICENSE +++ b/LICENSE @@ -186,7 +186,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright [yyyy] [name of copyright owner] + Copyright 2020 Dejan Kos Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/README.md b/README.md index dd861f8..065bf5c 100644 --- a/README.md +++ b/README.md @@ -76,7 +76,7 @@ Example configuration is provided under ```project_root/config``` ### TODO - [ ] support for more rocksDb options in config (bloom, block cache..) - - [ ] channel for expire + - [X] channel for expire - [ ] fix running test in parallel (TODO per test db root path) - [ ] db iterator for on startup init - [ ] test compaction diff --git a/src/db.rs b/src/db.rs index b7e16ad..0ef0b5c 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,19 +1,18 @@ use std::collections::HashMap; +use std::fmt::Debug; use std::path::Path; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::mpsc::{Receiver, Sender}; +use std::sync::{mpsc, Arc, Mutex, MutexGuard}; use std::{fs, thread}; use actix_web::web::Bytes; use crossbeam::sync::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard}; -use executors::threadpool_executor::ThreadPoolExecutor; -use executors::Executor; use rocksdb::{CompactionDecision, IteratorMode, Options, DB}; use serde::{Deserialize, Serialize}; use crate::config::DbConfig; use crate::conversion::{bytes_to_str, current_ms, Conversion, FromBytes, IntoBytes}; use crate::errors::DbError; -use std::fmt::Debug; const ROOT_DB_NAME: &str = "root"; @@ -35,7 +34,26 @@ pub struct DbManager { db_cfg: DbConfig, root_db: Db, dbs: SafeRW>, - executor: Mutex, + tx: Mutex>, +} + +pub struct BoxedFnOnce { + data: Box, +} + +impl BoxedFnOnce { + fn new(data: T) -> BoxedFnOnce + where + T: FnOnce() + Send + 'static, + { + BoxedFnOnce { + data: Box::new(data), + } + } + + fn invoke(self) { + (self.data)() + } } impl Data { @@ -93,19 +111,22 @@ impl DbManager { .set_compaction_filter("expiration-filter", compaction_filter); let root_db = open_root_db(&db_cfg)?; + let (tx, rx) = mpsc::channel::(); + let db_manager = DbManager { db_cfg, root_db, dbs: Arc::new(ShardedLock::new(HashMap::new())), - executor: Mutex::new(ThreadPoolExecutor::new(1)), + tx: Mutex::new(tx), }; - db_manager.init(); + db_manager.open_dbs(); + db_manager.reg_receiver_thread(rx); Ok(db_manager) } // will panic in main thread and prevent startup - fn init(&self) { + fn open_dbs(&self) { info!("Initializing dbs from root ..."); //TODO db iterator self.root_db @@ -123,6 +144,18 @@ impl DbManager { }); } + fn reg_receiver_thread(&self, rx: Receiver) { + thread::Builder::new() + .name("async-expire-thread".into()) + .spawn(move || { + for boxed in rx { + info!("expiring key in thead {:?}", thread::current()); + boxed.invoke() + } + }) + .expect("Failed to register receiver thread"); + } + pub async fn open(&self, db_name: String) -> DbResult<()> { if self.contains(&db_name) { warn!("Db {} already exists", &db_name); @@ -156,20 +189,25 @@ impl DbManager { info!("Closing db = {} ...", &db_name); let path = self.db_cfg.db_path(&db_name); self.root_db.w_lock().delete(&db_name)?; - //possible expensive call moved to separate thread TODO channels - self.tp_mutex().execute(move || match db.close(&db_name) { - Ok(_) => { - info!("Db = {} closed. Deleting db files...", &db_name); - remove_files(path); - } - Err(e) => error!("Error closing db = {}, e = {}", &db_name, e), - }); + self.try_close_async(db, db_name, path); } Ok(()) } } + fn try_close_async(&self, db: Db, db_name: String, path: String) { + let _ = self + .tx_mutex() + .send(BoxedFnOnce::new(move || match db.close(&db_name) { + Ok(_) => { + info!("Db = {} closed. Deleting db files...", &db_name); + remove_files(path); + } + Err(e) => error!("Error closing db = {}, e = {}", &db_name, e), + })); + } + pub async fn store(&self, db_name: &str, key: &str, val: Bytes, ttl: u128) -> DbResult<()> { let bytes = Data::new(ttl, val.to_vec()).as_bytes()?; match self.w_lock().get(db_name) { @@ -200,11 +238,11 @@ impl DbManager { fn expire(&self, db: &Db, key: &str) { let db = db.clone(); let key = key.to_string(); - self.tp_mutex().execute(move || { + let _ = self.tx_mutex().send(BoxedFnOnce::new(move || { if let Err(e) = db.w_lock().delete(&key) { error!("Failed to expire key = {}, e = {}", key, e); } - }); + })); } pub async fn remove(&self, db_name: &str, key: &str) -> DbResult<()> { @@ -222,10 +260,10 @@ impl DbManager { !self.contains(db_name) } - fn tp_mutex(&self) -> MutexGuard<'_, ThreadPoolExecutor> { - self.executor + fn tx_mutex(&self) -> MutexGuard<'_, Sender> { + self.tx .lock() - .expect("Failed to acquire executor lock") + .expect("Failed to acquire channel sender lock") } fn r_lock(&self) -> ShardedLockReadGuard<'_, HashMap> {