From ecfc0e6492cc4bdd79b579f6d8cbd6d822179dc5 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Thu, 3 Nov 2022 19:05:06 +0300 Subject: [PATCH] review chore: serialize expire as i64 review: $cfn -> $column_family better name for nx remove dead code --- rust/cubestore/cubestore/src/config/mod.rs | 26 ++++++----- .../cubestore/src/metastore/compaction.rs | 44 ++++++++++++------- rust/cubestore/cubestore/src/metastore/mod.rs | 29 ++++++------ 3 files changed, 58 insertions(+), 41 deletions(-) diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs index 61c71115dfe08..30984d51afce7 100644 --- a/rust/cubestore/cubestore/src/config/mod.rs +++ b/rust/cubestore/cubestore/src/config/mod.rs @@ -596,6 +596,20 @@ lazy_static! { tokio::sync::RwLock::new(false); } +pub async fn init_test_logger() { + if !*TEST_LOGGING_INITIALIZED.read().await { + let mut initialized = TEST_LOGGING_INITIALIZED.write().await; + if !*initialized { + SimpleLogger::new() + .with_level(Level::Error.to_level_filter()) + .with_module_level("cubestore", Level::Trace.to_level_filter()) + .init() + .unwrap(); + } + *initialized = true; + } +} + fn env_bool(name: &str, default: bool) -> bool { env::var(name) .ok() @@ -898,17 +912,7 @@ impl Config { I: FnOnce(Arc) -> T1, F: FnOnce(CubeServices) -> T2, { - if !*TEST_LOGGING_INITIALIZED.read().await { - let mut initialized = TEST_LOGGING_INITIALIZED.write().await; - if !*initialized { - SimpleLogger::new() - .with_level(Level::Error.to_level_filter()) - .with_module_level("cubestore", Level::Trace.to_level_filter()) - .init() - .unwrap(); - } - *initialized = true; - } + init_test_logger().await; let store_path = self.local_dir().clone(); let remote_fs = self.remote_fs().await.unwrap(); diff --git a/rust/cubestore/cubestore/src/metastore/compaction.rs b/rust/cubestore/cubestore/src/metastore/compaction.rs index 5f50c90aad844..8d95f50a20798 100644 --- a/rust/cubestore/cubestore/src/metastore/compaction.rs +++ b/rust/cubestore/cubestore/src/metastore/compaction.rs @@ -38,6 +38,7 @@ impl Drop for MetaStoreCacheCompactionFilter { fn drop(&mut self) { let elapsed = Utc::now() - self.current; + #[cfg(debug_assertions)] println!( "Compaction finished in {}.{} secs (is_full: {}), scanned: {}, removed: {}, orphaned: {}", elapsed.num_seconds(), @@ -81,9 +82,9 @@ impl MetaStoreCacheCompactionFilter { return CompactionDecision::Keep; } - match chrono::DateTime::parse_from_rfc3339(expire.as_str()) { - Ok(expire) => { - if expire <= self.current { + match chrono::NaiveDateTime::from_timestamp_opt(expire.as_i64(), 0) { + Some(expire) => { + if DateTime::::from_utc(expire, Utc) <= self.current { self.removed += 1; CompactionDecision::Remove @@ -91,10 +92,10 @@ impl MetaStoreCacheCompactionFilter { CompactionDecision::Keep } } - Err(err) => { + None => { warn!( - r#"Unable to parser date from expire field with value "{}", error: {}"#, - expire, err + r#"Unable to parser date from expire field with value "{}""#, + expire ); self.orphaned += 1; @@ -216,6 +217,7 @@ impl CompactionFilterFactory for MetaStoreCacheCompactionFactory { #[cfg(test)] mod tests { use super::*; + use crate::config::init_test_logger; use crate::metastore::cache::{CacheItemRocksIndex, CacheItemRocksTable}; use crate::metastore::{get_compaction_state, BaseRocksSecondaryIndex, CacheItem, RocksTable}; use crate::TableId; @@ -230,8 +232,10 @@ mod tests { } } - #[test] - fn test_compaction_table_no_ttl_keep() { + #[tokio::test] + async fn test_compaction_table_no_ttl_keep() { + init_test_logger().await; + let mut filter = MetaStoreCacheCompactionFilter::new( Some(get_compaction_state()), get_test_filter_context(), @@ -250,8 +254,10 @@ mod tests { } } - #[test] - fn test_compaction_table_tll_not_expired_keep() { + #[tokio::test] + async fn test_compaction_table_tll_not_expired_keep() { + init_test_logger().await; + let mut filter = MetaStoreCacheCompactionFilter::new( Some(get_compaction_state()), get_test_filter_context(), @@ -270,8 +276,10 @@ mod tests { } } - #[test] - fn test_compaction_table_tll_expired_remove() { + #[tokio::test] + async fn test_compaction_table_tll_expired_remove() { + init_test_logger().await; + let mut filter = MetaStoreCacheCompactionFilter::new( Some(get_compaction_state()), get_test_filter_context(), @@ -291,8 +299,10 @@ mod tests { } } - #[test] - fn test_compaction_index_ttl_expired_remove() { + #[tokio::test] + async fn test_compaction_index_ttl_expired_remove() { + init_test_logger().await; + let mut filter = MetaStoreCacheCompactionFilter::new( Some(get_compaction_state()), get_test_filter_context(), @@ -314,8 +324,10 @@ mod tests { } } - #[test] - fn test_compaction_index_ttl_not_expired_remove() { + #[tokio::test] + async fn test_compaction_index_ttl_not_expired_remove() { + init_test_logger().await; + let mut filter = MetaStoreCacheCompactionFilter::new( Some(get_compaction_state()), get_test_filter_context(), diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index 0d9603b7ebd0e..88cbdd7d4b3b8 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -50,6 +50,7 @@ use crate::CubeError; use arrow::datatypes::TimeUnit::Microsecond; use arrow::datatypes::{DataType, Field}; use cache::{CacheItemRocksIndex, CacheItemRocksTable}; +use chrono::serde::ts_seconds_option; use chrono::{DateTime, NaiveDateTime, Utc}; use chunks::ChunkRocksTable; use core::{fmt, mem}; @@ -170,7 +171,7 @@ macro_rules! base_rocks_secondary_index { #[macro_export] macro_rules! rocks_table_impl { - ($table: ty, $rocks_table: ident, $table_id: expr, $indexes: block, $cfn: expr) => { + ($table: ty, $rocks_table: ident, $table_id: expr, $indexes: block, $column_family: expr) => { pub(crate) struct $rocks_table<'a> { db: crate::metastore::DbTableRef<'a>, } @@ -185,7 +186,7 @@ macro_rules! rocks_table_impl { type T = $table; fn cf_name(&self) -> ColumnFamilyName { - $cfn + $column_family } fn db(&self) -> &DB { @@ -744,6 +745,7 @@ pub struct CacheItem { prefix: Option, key: String, value: String, + #[serde(with = "ts_seconds_option")] expire: Option> } } @@ -1202,7 +1204,11 @@ pub trait MetaStore: DIService + Send + Sync { async fn debug_dump(&self, out_path: String) -> Result<(), CubeError>; async fn all_cache(&self) -> Result>, CubeError>; - async fn cache_set(&self, item: CacheItem, nx: bool) -> Result; + async fn cache_set( + &self, + item: CacheItem, + update_if_not_exists: bool, + ) -> Result; async fn cache_truncate(&self) -> Result<(), CubeError>; async fn cache_delete(&self, key: String) -> Result<(), CubeError>; async fn cache_get(&self, key: String) -> Result>, CubeError>; @@ -2056,15 +2062,6 @@ trait RocksTable: Debug + Send + Sync { if let Some(row) = self.get_row(id)? { res.push(row); } else { - if RocksSecondaryIndex::is_ttl(secondary_index) { - trace!( - "Row exists in secondary index (with TTL) however missing in {:?} table: {}. Compaction problem?", - self, id - ); - - continue; - } - let index = self.get_index_by_id(BaseRocksSecondaryIndex::get_id(secondary_index)); self.rebuild_index(&index)?; @@ -4056,7 +4053,11 @@ impl MetaStore for RocksMetaStore { .await } - async fn cache_set(&self, item: CacheItem, nx: bool) -> Result { + async fn cache_set( + &self, + item: CacheItem, + update_if_not_exists: bool, + ) -> Result { self.write_operation_cache(move |db_ref, batch_pipe| { let cache_schema = CacheItemRocksTable::new(db_ref.clone()); let index_key = CacheItemIndexKey::ByPath(item.get_path()); @@ -4064,7 +4065,7 @@ impl MetaStore for RocksMetaStore { .get_single_opt_row_by_index(&index_key, &CacheItemRocksIndex::ByPath)?; if let Some(id_row) = id_row_opt { - if nx { + if update_if_not_exists { return Ok(false); };