Skip to content

Commit

Permalink
basic prometheus integration
Browse files Browse the repository at this point in the history
  • Loading branch information
dejankos committed Aug 30, 2020
1 parent 83d6353 commit d0194d2
Showing 6 changed files with 75 additions and 23 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -12,6 +12,9 @@ description = "RocksDb as a service"
structopt = "0.3"
actix-web = "2.0"
actix-rt = "1.0"
actix-web-prom = "0.2"
prometheus = "0.8"


serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,16 @@
# Rocky
RocksDb as a service
Rocky is a http service based on RocksDb with support for multiple databases and expiration.
Written in Rust on top of actix-web.




## Open db

## Close db

## Store record
#### TTL support
## Read record
## Delete record

6 changes: 0 additions & 6 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -151,9 +151,3 @@ fn get_compaction_style(s: &str) -> DBCompactionStyle {
}
}
}

#[cfg(test)]
mod tests {
#[test]
fn load() {}
}
29 changes: 15 additions & 14 deletions src/db.rs
Original file line number Diff line number Diff line change
@@ -5,13 +5,13 @@ use std::thread;

use actix_web::web::Bytes;
use crossbeam::sync::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard};
use executors::Executor;
use executors::threadpool_executor::ThreadPoolExecutor;
use rocksdb::{CompactionDecision, DB, IteratorMode, Options};
use executors::Executor;
use rocksdb::{CompactionDecision, IteratorMode, Options, DB};
use serde::{Deserialize, Serialize};

use crate::config::DbConfig;
use crate::conversion::{bytes_to_str, Conversion, current_ms, FromBytes, IntoBytes};
use crate::conversion::{bytes_to_str, current_ms, Conversion, FromBytes, IntoBytes};
use crate::errors::DbError;

const ROOT_DB_NAME: &str = "root";
@@ -45,8 +45,8 @@ impl Data {

impl Db {
fn new<P>(path: P, opts: &Options) -> DbResult<Self>
where
P: AsRef<Path>,
where
P: AsRef<Path>,
{
let rock = DB::open(&opts, path)?;
Ok(Db {
@@ -55,8 +55,8 @@ impl Db {
}

fn put<V>(&self, key: &str, val: V) -> DbResult<()>
where
V: AsRef<[u8]>,
where
V: AsRef<[u8]>,
{
Ok(self.w_lock().put(key, val)?)
}
@@ -70,8 +70,8 @@ impl Db {
}

fn close<P>(&self, path: P) -> DbResult<()>
where
P: AsRef<Path>,
where
P: AsRef<Path>,
{
Ok(DB::destroy(&Options::default(), path)?)
}
@@ -267,7 +267,6 @@ fn compaction_filter(_level: u32, _key: &[u8], value: &[u8]) -> CompactionDecisi
}
}


#[cfg(test)]
mod tests {
use super::*;
@@ -280,17 +279,19 @@ mod tests {

match compaction_filter(0, &[0], &bytes) {
CompactionDecision::Remove => {}
_ => panic!("Should have removed expired record")
_ => panic!("Should have removed expired record"),
}
}

#[test]
fn should_keep_not_expired() {
let bytes = Data::new(current_ms().unwrap() + ONE_DAY_MS, b"data".to_vec()).as_bytes().unwrap();
let bytes = Data::new(current_ms().unwrap() + ONE_DAY_MS, b"data".to_vec())
.as_bytes()
.unwrap();

match compaction_filter(0, &[0], &bytes) {
CompactionDecision::Keep => {}
_ => panic!("Should have kept non expired record")
_ => panic!("Should have kept non expired record"),
}
}
}
}
2 changes: 1 addition & 1 deletion src/errors.rs
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@ pub enum ApiError {
Msg(String),
}

// TODO remove dyn
// TODO impl conversion errors
impl From<Box<dyn std::error::Error>> for DbError {
fn from(boxed: Box<dyn error::Error>) -> Self {
DbError::Conversion(boxed.to_string())
42 changes: 41 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -20,6 +20,10 @@ use crate::conversion::{convert, current_ms, Conversion};
use crate::db::DbManager;
use crate::errors::{ApiError, DbError};

use actix_web_prom::PrometheusMetrics;
use prometheus::core::{AtomicI64, GenericCounterVec};
use prometheus::{opts, IntCounterVec};

mod errors;

mod config;
@@ -108,6 +112,20 @@ async fn open(db_name: web::Path<String>, db_man: web::Data<DbManager>) -> Respo
Ok(HttpResponse::Ok().finish())
}

#[get("/{db_name}")]
async fn db_size(
db_name: web::Path<String>,
c: web::Data<GenericCounterVec<AtomicI64>>,
) -> HttpResponse {
let r = c
.get_metric_with_label_values(&[&db_name])
.map_or(0, |gc| gc.get());

HttpResponse::Ok()
.set(ContentType::plaintext())
.body(r.to_string())
}

#[delete("/{db_name}")]
async fn close(db_name: web::Path<String>, db_man: web::Data<DbManager>) -> Response<HttpResponse> {
db_man.close(db_name.into_inner()).await?;
@@ -155,6 +173,11 @@ async fn remove(p_val: web::Path<PathVal>, db_man: web::Data<DbManager>) -> Resp
Ok(HttpResponse::Ok().finish())
}

#[get("/health")]
async fn health() -> HttpResponse {
HttpResponse::Ok().finish()
}

// main thread will panic! if config can't be initialized
#[actix_rt::main]
async fn main() -> std::io::Result<()> {
@@ -173,17 +196,34 @@ async fn main() -> std::io::Result<()> {

let db_manager = DbManager::new(db_cfg)?;
db_manager.init();

let db_manager = web::Data::new(db_manager);

let prometheus = PrometheusMetrics::new("api", Some("/metrics"), None);

let counter_opts = opts!("db_size_counter", "Database size").namespace("db");
let counter = IntCounterVec::new(counter_opts, &["db_name"]).unwrap();
prometheus
.registry
.register(Box::new(counter.clone()))
.unwrap();

counter.with_label_values(&["baza1"]).inc();

let c = web::Data::new(counter);

HttpServer::new(move || {
App::new()
.wrap(ErrorHandlers::new().handler(http::StatusCode::NOT_FOUND, not_found))
.wrap(prometheus.clone())
.app_data(db_manager.clone())
.app_data(c.clone())
.service(open)
.service(close)
.service(store)
.service(read)
.service(remove)
.service(health)
.service(db_size)
})
.bind(service_cfg.bind_address())?
.workers(service_cfg.workers())

0 comments on commit d0194d2

Please sign in to comment.