Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
chore: serialize expire as i64

review: $cfn -> $column_family

better name for nx

remove dead code
  • Loading branch information
ovr committed Nov 8, 2022
1 parent fed5346 commit ecfc0e6
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 41 deletions.
26 changes: 15 additions & 11 deletions rust/cubestore/cubestore/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,20 @@ lazy_static! {
tokio::sync::RwLock::new(false);
}

pub async fn init_test_logger() {
if !*TEST_LOGGING_INITIALIZED.read().await {
let mut initialized = TEST_LOGGING_INITIALIZED.write().await;
if !*initialized {
SimpleLogger::new()
.with_level(Level::Error.to_level_filter())
.with_module_level("cubestore", Level::Trace.to_level_filter())
.init()
.unwrap();
}
*initialized = true;
}
}

fn env_bool(name: &str, default: bool) -> bool {
env::var(name)
.ok()
Expand Down Expand Up @@ -898,17 +912,7 @@ impl Config {
I: FnOnce(Arc<Injector>) -> T1,
F: FnOnce(CubeServices) -> T2,
{
if !*TEST_LOGGING_INITIALIZED.read().await {
let mut initialized = TEST_LOGGING_INITIALIZED.write().await;
if !*initialized {
SimpleLogger::new()
.with_level(Level::Error.to_level_filter())
.with_module_level("cubestore", Level::Trace.to_level_filter())
.init()
.unwrap();
}
*initialized = true;
}
init_test_logger().await;

let store_path = self.local_dir().clone();
let remote_fs = self.remote_fs().await.unwrap();
Expand Down
44 changes: 28 additions & 16 deletions rust/cubestore/cubestore/src/metastore/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ impl Drop for MetaStoreCacheCompactionFilter {
fn drop(&mut self) {
let elapsed = Utc::now() - self.current;

#[cfg(debug_assertions)]
println!(
"Compaction finished in {}.{} secs (is_full: {}), scanned: {}, removed: {}, orphaned: {}",
elapsed.num_seconds(),
Expand Down Expand Up @@ -81,20 +82,20 @@ impl MetaStoreCacheCompactionFilter {
return CompactionDecision::Keep;
}

match chrono::DateTime::parse_from_rfc3339(expire.as_str()) {
Ok(expire) => {
if expire <= self.current {
match chrono::NaiveDateTime::from_timestamp_opt(expire.as_i64(), 0) {
Some(expire) => {
if DateTime::<Utc>::from_utc(expire, Utc) <= self.current {
self.removed += 1;

CompactionDecision::Remove
} else {
CompactionDecision::Keep
}
}
Err(err) => {
None => {
warn!(
r#"Unable to parser date from expire field with value "{}", error: {}"#,
expire, err
r#"Unable to parser date from expire field with value "{}""#,
expire
);

self.orphaned += 1;
Expand Down Expand Up @@ -216,6 +217,7 @@ impl CompactionFilterFactory for MetaStoreCacheCompactionFactory {
#[cfg(test)]
mod tests {
use super::*;
use crate::config::init_test_logger;
use crate::metastore::cache::{CacheItemRocksIndex, CacheItemRocksTable};
use crate::metastore::{get_compaction_state, BaseRocksSecondaryIndex, CacheItem, RocksTable};
use crate::TableId;
Expand All @@ -230,8 +232,10 @@ mod tests {
}
}

#[test]
fn test_compaction_table_no_ttl_keep() {
#[tokio::test]
async fn test_compaction_table_no_ttl_keep() {
init_test_logger().await;

let mut filter = MetaStoreCacheCompactionFilter::new(
Some(get_compaction_state()),
get_test_filter_context(),
Expand All @@ -250,8 +254,10 @@ mod tests {
}
}

#[test]
fn test_compaction_table_tll_not_expired_keep() {
#[tokio::test]
async fn test_compaction_table_tll_not_expired_keep() {
init_test_logger().await;

let mut filter = MetaStoreCacheCompactionFilter::new(
Some(get_compaction_state()),
get_test_filter_context(),
Expand All @@ -270,8 +276,10 @@ mod tests {
}
}

#[test]
fn test_compaction_table_tll_expired_remove() {
#[tokio::test]
async fn test_compaction_table_tll_expired_remove() {
init_test_logger().await;

let mut filter = MetaStoreCacheCompactionFilter::new(
Some(get_compaction_state()),
get_test_filter_context(),
Expand All @@ -291,8 +299,10 @@ mod tests {
}
}

#[test]
fn test_compaction_index_ttl_expired_remove() {
#[tokio::test]
async fn test_compaction_index_ttl_expired_remove() {
init_test_logger().await;

let mut filter = MetaStoreCacheCompactionFilter::new(
Some(get_compaction_state()),
get_test_filter_context(),
Expand All @@ -314,8 +324,10 @@ mod tests {
}
}

#[test]
fn test_compaction_index_ttl_not_expired_remove() {
#[tokio::test]
async fn test_compaction_index_ttl_not_expired_remove() {
init_test_logger().await;

let mut filter = MetaStoreCacheCompactionFilter::new(
Some(get_compaction_state()),
get_test_filter_context(),
Expand Down
29 changes: 15 additions & 14 deletions rust/cubestore/cubestore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use crate::CubeError;
use arrow::datatypes::TimeUnit::Microsecond;
use arrow::datatypes::{DataType, Field};
use cache::{CacheItemRocksIndex, CacheItemRocksTable};
use chrono::serde::ts_seconds_option;
use chrono::{DateTime, NaiveDateTime, Utc};
use chunks::ChunkRocksTable;
use core::{fmt, mem};
Expand Down Expand Up @@ -170,7 +171,7 @@ macro_rules! base_rocks_secondary_index {

#[macro_export]
macro_rules! rocks_table_impl {
($table: ty, $rocks_table: ident, $table_id: expr, $indexes: block, $cfn: expr) => {
($table: ty, $rocks_table: ident, $table_id: expr, $indexes: block, $column_family: expr) => {
pub(crate) struct $rocks_table<'a> {
db: crate::metastore::DbTableRef<'a>,
}
Expand All @@ -185,7 +186,7 @@ macro_rules! rocks_table_impl {
type T = $table;

fn cf_name(&self) -> ColumnFamilyName {
$cfn
$column_family
}

fn db(&self) -> &DB {
Expand Down Expand Up @@ -744,6 +745,7 @@ pub struct CacheItem {
prefix: Option<String>,
key: String,
value: String,
#[serde(with = "ts_seconds_option")]
expire: Option<DateTime<Utc>>
}
}
Expand Down Expand Up @@ -1202,7 +1204,11 @@ pub trait MetaStore: DIService + Send + Sync {
async fn debug_dump(&self, out_path: String) -> Result<(), CubeError>;

async fn all_cache(&self) -> Result<Vec<IdRow<CacheItem>>, CubeError>;
async fn cache_set(&self, item: CacheItem, nx: bool) -> Result<bool, CubeError>;
async fn cache_set(
&self,
item: CacheItem,
update_if_not_exists: bool,
) -> Result<bool, CubeError>;
async fn cache_truncate(&self) -> Result<(), CubeError>;
async fn cache_delete(&self, key: String) -> Result<(), CubeError>;
async fn cache_get(&self, key: String) -> Result<Option<IdRow<CacheItem>>, CubeError>;
Expand Down Expand Up @@ -2056,15 +2062,6 @@ trait RocksTable: Debug + Send + Sync {
if let Some(row) = self.get_row(id)? {
res.push(row);
} else {
if RocksSecondaryIndex::is_ttl(secondary_index) {
trace!(
"Row exists in secondary index (with TTL) however missing in {:?} table: {}. Compaction problem?",
self, id
);

continue;
}

let index = self.get_index_by_id(BaseRocksSecondaryIndex::get_id(secondary_index));
self.rebuild_index(&index)?;

Expand Down Expand Up @@ -4056,15 +4053,19 @@ impl MetaStore for RocksMetaStore {
.await
}

async fn cache_set(&self, item: CacheItem, nx: bool) -> Result<bool, CubeError> {
async fn cache_set(
&self,
item: CacheItem,
update_if_not_exists: bool,
) -> Result<bool, CubeError> {
self.write_operation_cache(move |db_ref, batch_pipe| {
let cache_schema = CacheItemRocksTable::new(db_ref.clone());
let index_key = CacheItemIndexKey::ByPath(item.get_path());
let id_row_opt = cache_schema
.get_single_opt_row_by_index(&index_key, &CacheItemRocksIndex::ByPath)?;

if let Some(id_row) = id_row_opt {
if nx {
if update_if_not_exists {
return Ok(false);
};

Expand Down

0 comments on commit ecfc0e6

Please sign in to comment.