From 85dc04bf200fddc8908ec97897e01447baea9707 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Fri, 27 Jan 2023 20:11:19 +0300 Subject: [PATCH] feat(cubestore): Cache/Queue - implement migrations via truncating --- .../cubestore/src/cachestore/cache_item.rs | 25 ++++- .../src/cachestore/cache_rocksstore.rs | 1 + .../cubestore/src/cachestore/queue_item.rs | 33 ++++++- .../cubestore/src/cachestore/queue_result.rs | 25 ++++- rust/cubestore/cubestore/src/metastore/job.rs | 4 +- rust/cubestore/cubestore/src/metastore/mod.rs | 10 ++ .../cubestore/src/metastore/multi_index.rs | 6 +- .../cubestore/src/metastore/replay_handle.rs | 4 +- .../cubestore/src/metastore/rocks_table.rs | 92 +++++++++++++++---- .../cubestore/src/metastore/source.rs | 4 +- .../cubestore/src/metastore/table.rs | 4 +- 11 files changed, 175 insertions(+), 33 deletions(-) diff --git a/rust/cubestore/cubestore/src/cachestore/cache_item.rs b/rust/cubestore/cubestore/src/cachestore/cache_item.rs index 92000e3a41012..64f9743ac9310 100644 --- a/rust/cubestore/cubestore/src/cachestore/cache_item.rs +++ b/rust/cubestore/cubestore/src/cachestore/cache_item.rs @@ -1,5 +1,7 @@ -use crate::metastore::{IndexId, RocksSecondaryIndex, TableId}; -use crate::{base_rocks_secondary_index, rocks_table_impl}; +use crate::metastore::{ + BaseRocksTable, IndexId, RocksEntity, RocksSecondaryIndex, RocksTable, TableId, TableInfo, +}; +use crate::{base_rocks_secondary_index, rocks_table_new, CubeError}; use chrono::serde::ts_seconds_option; use chrono::{DateTime, Duration, Utc}; use serde::{Deserialize, Deserializer, Serialize}; @@ -13,6 +15,8 @@ pub struct CacheItem { pub(crate) expire: Option>, } +impl RocksEntity for CacheItem {} + impl CacheItem { pub fn parse_path_to_prefix(mut path: String) -> String { if path.ends_with(":*") { @@ -77,8 +81,23 @@ pub(crate) enum CacheItemRocksIndex { ByPath = 1, ByPrefix = 2, } +pub struct CacheItemRocksTable<'a> { + db: crate::metastore::DbTableRef<'a>, +} + +impl<'a> CacheItemRocksTable<'a> { + pub fn new(db: crate::metastore::DbTableRef<'a>) -> Self { + Self { db } + } +} + +impl<'a> BaseRocksTable for CacheItemRocksTable<'a> { + fn migrate_table(&self, _table_info: TableInfo) -> Result<(), CubeError> { + self.migrate_table_by_truncate() + } +} -rocks_table_impl!(CacheItem, CacheItemRocksTable, TableId::CacheItems, { +rocks_table_new!(CacheItem, CacheItemRocksTable, TableId::CacheItems, { vec![ Box::new(CacheItemRocksIndex::ByPath), Box::new(CacheItemRocksIndex::ByPrefix), diff --git a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs index df7f4be241b4b..9d576d9f8e0b1 100644 --- a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs +++ b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs @@ -87,6 +87,7 @@ impl RocksStoreDetails for RocksCacheStoreDetails { fn migrate(&self, table_ref: DbTableRef) -> Result<(), CubeError> { CacheItemRocksTable::new(table_ref.clone()).migrate()?; QueueItemRocksTable::new(table_ref.clone()).migrate()?; + QueueResultRocksTable::new(table_ref.clone()).migrate()?; table_ref .db diff --git a/rust/cubestore/cubestore/src/cachestore/queue_item.rs b/rust/cubestore/cubestore/src/cachestore/queue_item.rs index 58c1b656ed77e..2bf9611669b61 100644 --- a/rust/cubestore/cubestore/src/cachestore/queue_item.rs +++ b/rust/cubestore/cubestore/src/cachestore/queue_item.rs @@ -1,8 +1,11 @@ -use crate::metastore::{IndexId, RocksSecondaryIndex, TableId}; +use crate::metastore::{ + BaseRocksTable, IndexId, RocksEntity, RocksSecondaryIndex, RocksTable, TableId, TableInfo, +}; use crate::table::{Row, TableValue}; -use crate::{base_rocks_secondary_index, rocks_table_impl, CubeError}; -use chrono::serde::ts_seconds; +use crate::{base_rocks_secondary_index, rocks_table_new, CubeError}; +use chrono::serde::{ts_seconds}; use chrono::{DateTime, Duration, Utc}; + use serde::{Deserialize, Deserializer, Serialize}; fn merge(a: serde_json::Value, b: serde_json::Value) -> Option { @@ -72,6 +75,12 @@ pub struct QueueItem { pub(crate) expire: DateTime, } +impl RocksEntity for QueueItem { + fn version() -> u32 { + 1 + } +} + impl QueueItem { pub fn new(path: String, value: String, status: QueueItemStatus, priority: i64) -> Self { let parts: Vec<&str> = path.rsplitn(2, ":").collect(); @@ -224,7 +233,23 @@ pub(crate) enum QueueItemRocksIndex { ByPrefix = 3, } -rocks_table_impl!(QueueItem, QueueItemRocksTable, TableId::QueueItems, { +pub struct QueueItemRocksTable<'a> { + db: crate::metastore::DbTableRef<'a>, +} + +impl<'a> QueueItemRocksTable<'a> { + pub fn new(db: crate::metastore::DbTableRef<'a>) -> Self { + Self { db } + } +} + +impl<'a> BaseRocksTable for QueueItemRocksTable<'a> { + fn migrate_table(&self, _table_info: TableInfo) -> Result<(), CubeError> { + self.migrate_table_by_truncate() + } +} + +rocks_table_new!(QueueItem, QueueItemRocksTable, TableId::QueueItems, { vec![ Box::new(QueueItemRocksIndex::ByPath), Box::new(QueueItemRocksIndex::ByPrefixAndStatus), diff --git a/rust/cubestore/cubestore/src/cachestore/queue_result.rs b/rust/cubestore/cubestore/src/cachestore/queue_result.rs index 6a00659b83fec..d48f31371f304 100644 --- a/rust/cubestore/cubestore/src/cachestore/queue_result.rs +++ b/rust/cubestore/cubestore/src/cachestore/queue_result.rs @@ -1,5 +1,7 @@ -use crate::metastore::{IndexId, RocksSecondaryIndex, TableId}; -use crate::{base_rocks_secondary_index, rocks_table_impl}; +use crate::metastore::{ + BaseRocksTable, IndexId, RocksEntity, RocksSecondaryIndex, RocksTable, TableId, TableInfo, +}; +use crate::{base_rocks_secondary_index, rocks_table_new, CubeError}; use chrono::serde::ts_seconds; use chrono::{DateTime, Duration, Utc}; use serde::{Deserialize, Deserializer, Serialize}; @@ -12,6 +14,8 @@ pub struct QueueResult { pub(crate) expire: DateTime, } +impl RocksEntity for QueueResult {} + impl QueueResult { pub fn new(path: String, value: String) -> Self { QueueResult { @@ -34,8 +38,23 @@ impl QueueResult { pub(crate) enum QueueResultRocksIndex { ByPath = 1, } +pub struct QueueResultRocksTable<'a> { + db: crate::metastore::DbTableRef<'a>, +} + +impl<'a> QueueResultRocksTable<'a> { + pub fn new(db: crate::metastore::DbTableRef<'a>) -> Self { + Self { db } + } +} + +impl<'a> BaseRocksTable for QueueResultRocksTable<'a> { + fn migrate_table(&self, _table_info: TableInfo) -> Result<(), CubeError> { + self.migrate_table_by_truncate() + } +} -rocks_table_impl!(QueueResult, QueueResultRocksTable, TableId::QueueResults, { +rocks_table_new!(QueueResult, QueueResultRocksTable, TableId::QueueResults, { vec![Box::new(QueueResultRocksIndex::ByPath)] }); diff --git a/rust/cubestore/cubestore/src/metastore/job.rs b/rust/cubestore/cubestore/src/metastore/job.rs index f85a85044dab9..a6606a56b3581 100644 --- a/rust/cubestore/cubestore/src/metastore/job.rs +++ b/rust/cubestore/cubestore/src/metastore/job.rs @@ -1,7 +1,7 @@ use super::{IndexId, RocksSecondaryIndex, TableId}; use crate::base_rocks_secondary_index; use crate::metastore::table::Table; -use crate::metastore::RowKey; +use crate::metastore::{RocksEntity, RowKey}; use crate::rocks_table_impl; use byteorder::{BigEndian, WriteBytesExt}; use chrono::{DateTime, Utc}; @@ -53,6 +53,8 @@ pub struct Job { status: JobStatus, } +impl RocksEntity for Job {} + impl Job { pub fn new(row_reference: RowKey, job_type: JobType, shard: String) -> Job { Job { diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index a7601b1a0efb2..537afd010ae4f 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -546,6 +546,8 @@ pub struct Schema { } } +impl RocksEntity for Schema {} + #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] pub enum IndexType { Regular = 1, @@ -568,6 +570,8 @@ pub struct Index { } } +impl RocksEntity for Index {} + #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] pub enum AggregateFunction { SUM = 1, @@ -657,6 +661,8 @@ pub struct Partition { } } +impl RocksEntity for Partition {} + data_frame_from! { #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] pub struct Chunk { @@ -684,6 +690,8 @@ pub struct Chunk { } } +impl RocksEntity for Chunk {} + #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] pub struct WAL { table_id: u64, @@ -691,6 +699,8 @@ pub struct WAL { uploaded: bool, } +impl RocksEntity for WAL {} + meta_store_table_impl!(SchemaMetaStoreTable, Schema, SchemaRocksTable); meta_store_table_impl!(ChunkMetaStoreTable, Chunk, ChunkRocksTable); meta_store_table_impl!(IndexMetaStoreTable, Index, IndexRocksTable); diff --git a/rust/cubestore/cubestore/src/metastore/multi_index.rs b/rust/cubestore/cubestore/src/metastore/multi_index.rs index eebb092dfff13..abac5836f1d40 100644 --- a/rust/cubestore/cubestore/src/metastore/multi_index.rs +++ b/rust/cubestore/cubestore/src/metastore/multi_index.rs @@ -6,7 +6,7 @@ //! Multi-partitioned are compacted and repartitioned by applying the same operation to ordinary //! partitions they own. use crate::data_frame_from; -use crate::metastore::{Column, IdRow, IndexId, RocksSecondaryIndex, TableId}; +use crate::metastore::{Column, IdRow, IndexId, RocksEntity, RocksSecondaryIndex, TableId}; use crate::rocks_table_impl; use crate::table::Row; use byteorder::{BigEndian, WriteBytesExt}; @@ -24,6 +24,8 @@ pub struct MultiIndex { } } +impl RocksEntity for MultiIndex {} + impl MultiIndex { pub fn new(schema_id: u64, name: String, key_columns: Vec) -> MultiIndex { MultiIndex { @@ -114,6 +116,8 @@ pub struct MultiPartition { } } +impl RocksEntity for MultiPartition {} + impl MultiPartition { // Note that roots are active by default. pub fn new_root(multi_index_id: u64) -> MultiPartition { diff --git a/rust/cubestore/cubestore/src/metastore/replay_handle.rs b/rust/cubestore/cubestore/src/metastore/replay_handle.rs index 1554af64ff43f..d3d07dfbd6004 100644 --- a/rust/cubestore/cubestore/src/metastore/replay_handle.rs +++ b/rust/cubestore/cubestore/src/metastore/replay_handle.rs @@ -1,6 +1,6 @@ use super::{IndexId, RocksSecondaryIndex, TableId}; use crate::metastore::table::Table; -use crate::metastore::IdRow; +use crate::metastore::{IdRow, RocksEntity}; use crate::rocks_table_impl; use crate::{base_rocks_secondary_index, CubeError}; use byteorder::{BigEndian, WriteBytesExt}; @@ -20,6 +20,8 @@ pub struct ReplayHandle { } } +impl RocksEntity for ReplayHandle {} + #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] pub struct SeqPointer { start_seq: Option, diff --git a/rust/cubestore/cubestore/src/metastore/rocks_table.rs b/rust/cubestore/cubestore/src/metastore/rocks_table.rs index c806855ea2bda..3af8e6e86c75e 100644 --- a/rust/cubestore/cubestore/src/metastore/rocks_table.rs +++ b/rust/cubestore/cubestore/src/metastore/rocks_table.rs @@ -25,11 +25,30 @@ macro_rules! rocks_table_impl { } impl<'a> $rocks_table<'a> { - pub fn new(db: crate::metastore::DbTableRef<'a>) -> $rocks_table { - $rocks_table { db } + pub fn new(db: crate::metastore::DbTableRef<'a>) -> Self { + Self { db } } } + impl<'a> crate::metastore::BaseRocksTable for $rocks_table<'a> { + fn migrate_table( + &self, + table_info: crate::metastore::TableInfo, + ) -> Result<(), crate::CubeError> { + Err(crate::CubeError::internal(format!( + "Unable to migrate table from {}. There is no support for auto migrations. Please implement migration.", + table_info.version + ))) + } + } + + crate::rocks_table_new!($table, $rocks_table, $table_id, $indexes); + }; +} + +#[macro_export] +macro_rules! rocks_table_new { + ($table: ty, $rocks_table: ident, $table_id: expr, $indexes: block) => { impl<'a> crate::metastore::RocksTable for $rocks_table<'a> { type T = $table; @@ -278,8 +297,18 @@ where } } -pub trait RocksTable: Debug + Send + Sync { - type T: Serialize + Clone + Debug + Send; +pub trait RocksEntity { + fn version() -> u32 { + 1 + } +} + +pub trait BaseRocksTable { + fn migrate_table(&self, table_info: TableInfo) -> Result<(), CubeError>; +} + +pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { + type T: Serialize + Clone + Debug + Send + RocksEntity; fn delete_event(&self, row: IdRow) -> MetaStoreEvent; fn update_event(&self, old_row: IdRow, new_row: IdRow) -> MetaStoreEvent; fn db(&self) -> &DB; @@ -293,6 +322,19 @@ pub trait RocksTable: Debug + Send + Sync { D: Deserializer<'de>; fn indexes() -> Vec>>; + fn migrate_table_by_truncate(&self) -> Result<(), CubeError> { + let mut batch = WriteBatch::default(); + self.delete_all_rows_from_table(Self::table_id(), &mut batch)?; + + for index in Self::indexes() { + self.delete_all_rows_from_index(index.get_id(), &mut batch)?; + } + + self.db().write(batch)?; + + Ok(()) + } + fn insert( &self, row: Self::T, @@ -356,7 +398,7 @@ pub trait RocksTable: Debug + Send + Sync { if let Some(table_info) = table_info { let table_info = self.deserialize_table_info(table_info.as_slice())?; - if table_info.version != self.version() + if table_info.version != Self::T::version() || table_info.value_version != self.value_version() { self.migrate_table(table_info)? @@ -368,7 +410,7 @@ pub trait RocksTable: Debug + Send + Sync { } .to_bytes(), self.serialize_table_info(TableInfo { - version: self.version(), + version: Self::T::version(), value_version: self.value_version(), })? .as_slice(), @@ -378,14 +420,6 @@ pub trait RocksTable: Debug + Send + Sync { Ok(()) } - fn migrate_table(&self, table_info: TableInfo) -> Result<(), CubeError> { - Err(CubeError::internal(format!( - "Unable to migrate table from {} to {}. There is no support for auto migrations. Please implement migration.", - table_info.version, - self.value_version() - ))) - } - fn migration_check_indexes(&self) -> Result<(), CubeError> { let snapshot = self.snapshot(); for index in Self::indexes().into_iter() { @@ -418,10 +452,6 @@ pub trait RocksTable: Debug + Send + Sync { Ok(()) } - fn version(&self) -> u32 { - 1 - } - fn value_version(&self) -> u32 { 1 } @@ -870,6 +900,32 @@ pub trait RocksTable: Debug + Send + Sync { Ok(res) } + fn delete_all_rows_from_table( + &self, + table_id: TableId, + batch: &mut WriteBatch, + ) -> Result<(), CubeError> { + let ref db = self.snapshot(); + let key_min = RowKey::Table(table_id, 0); + + let iter = db.iterator(IteratorMode::From( + &key_min.to_bytes()[0..get_fixed_prefix()], + Direction::Forward, + )); + + for (key, _) in iter { + let row_key = RowKey::from_bytes(&key); + if let RowKey::Table(row_table_id, _) = row_key { + if row_table_id == table_id { + batch.delete(key); + } else { + return Ok(()); + } + } + } + Ok(()) + } + fn delete_all_rows_from_index( &self, secondary_id: u32, diff --git a/rust/cubestore/cubestore/src/metastore/source.rs b/rust/cubestore/cubestore/src/metastore/source.rs index 24a7f7655652d..ef19af2895dcb 100644 --- a/rust/cubestore/cubestore/src/metastore/source.rs +++ b/rust/cubestore/cubestore/src/metastore/source.rs @@ -1,6 +1,6 @@ use super::{IndexId, RocksSecondaryIndex, TableId}; use crate::base_rocks_secondary_index; -use crate::metastore::DataFrameValue; +use crate::metastore::{DataFrameValue, RocksEntity}; use crate::rocks_table_impl; use byteorder::{BigEndian, WriteBytesExt}; @@ -36,6 +36,8 @@ pub struct Source { } } +impl RocksEntity for Source {} + impl Source { pub fn new(name: String, source_credentials: SourceCredentials) -> Self { Self { diff --git a/rust/cubestore/cubestore/src/metastore/table.rs b/rust/cubestore/cubestore/src/metastore/table.rs index 47dbce5686c4a..bb5016237b7b9 100644 --- a/rust/cubestore/cubestore/src/metastore/table.rs +++ b/rust/cubestore/cubestore/src/metastore/table.rs @@ -2,7 +2,7 @@ use super::{ AggregateFunction, Column, ColumnType, DataFrameValue, IndexId, RocksSecondaryIndex, TableId, }; use crate::data_frame_from; -use crate::metastore::{IdRow, ImportFormat, Schema}; +use crate::metastore::{IdRow, ImportFormat, RocksEntity, Schema}; use crate::queryplanner::udfs::aggregate_udf_by_kind; use crate::queryplanner::udfs::CubeAggregateUDFKind; use crate::rocks_table_impl; @@ -155,6 +155,8 @@ pub struct Table { } } +impl RocksEntity for Table {} + #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] pub struct TablePath { pub table: IdRow,