From 578a8f9a88d12ae1960947138c48e0b70d7b078b Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Fri, 10 Mar 2023 14:53:49 +0300 Subject: [PATCH 1/9] feat(cubestore): Use separate column family for queue --- .../cubestore/src/cachestore/cache_item.rs | 18 +++++++++----- .../cubestore/src/cachestore/queue_item.rs | 20 ++++++++++------ .../cubestore/src/cachestore/queue_result.rs | 10 +++++--- .../cubestore/src/metastore/chunks.rs | 18 +++++++++----- .../cubestore/src/metastore/index.rs | 20 ++++++++++------ rust/cubestore/cubestore/src/metastore/job.rs | 18 +++++++++----- .../cubestore/src/metastore/multi_index.rs | 13 ++++++---- .../cubestore/src/metastore/partition.rs | 24 ++++++++++++------- .../cubestore/src/metastore/replay_handle.rs | 3 ++- .../cubestore/src/metastore/rocks_table.rs | 20 ++++++++++++---- .../cubestore/src/metastore/schema.rs | 10 +++++--- .../cubestore/src/metastore/source.rs | 10 +++++--- .../cubestore/src/metastore/table.rs | 10 +++++--- rust/cubestore/cubestore/src/metastore/wal.rs | 10 +++++--- 14 files changed, 139 insertions(+), 65 deletions(-) diff --git a/rust/cubestore/cubestore/src/cachestore/cache_item.rs b/rust/cubestore/cubestore/src/cachestore/cache_item.rs index 393f87524b708..74e07af4ff349 100644 --- a/rust/cubestore/cubestore/src/cachestore/cache_item.rs +++ b/rust/cubestore/cubestore/src/cachestore/cache_item.rs @@ -110,12 +110,18 @@ impl<'a> BaseRocksTable for CacheItemRocksTable<'a> { } } -rocks_table_new!(CacheItem, CacheItemRocksTable, TableId::CacheItems, { - vec![ - Box::new(CacheItemRocksIndex::ByPath), - Box::new(CacheItemRocksIndex::ByPrefix), - ] -}); +rocks_table_new!( + CacheItem, + CacheItemRocksTable, + TableId::CacheItems, + { + vec![ + Box::new(CacheItemRocksIndex::ByPath), + Box::new(CacheItemRocksIndex::ByPrefix), + ] + }, + rocksdb::DEFAULT_COLUMN_FAMILY_NAME +); #[derive(Hash, Clone, Debug)] pub enum CacheItemIndexKey { diff --git a/rust/cubestore/cubestore/src/cachestore/queue_item.rs b/rust/cubestore/cubestore/src/cachestore/queue_item.rs index c0ed08be49919..29a6c88da06bb 100644 --- a/rust/cubestore/cubestore/src/cachestore/queue_item.rs +++ b/rust/cubestore/cubestore/src/cachestore/queue_item.rs @@ -364,13 +364,19 @@ impl<'a> BaseRocksTable for QueueItemRocksTable<'a> { } } -rocks_table_new!(QueueItem, QueueItemRocksTable, TableId::QueueItems, { - vec![ - Box::new(QueueItemRocksIndex::ByPath), - Box::new(QueueItemRocksIndex::ByPrefixAndStatus), - Box::new(QueueItemRocksIndex::ByPrefix), - ] -}); +rocks_table_new!( + QueueItem, + QueueItemRocksTable, + TableId::QueueItems, + { + vec![ + Box::new(QueueItemRocksIndex::ByPath), + Box::new(QueueItemRocksIndex::ByPrefixAndStatus), + Box::new(QueueItemRocksIndex::ByPrefix), + ] + }, + rocksdb::DEFAULT_COLUMN_FAMILY_NAME +); #[derive(Hash, Clone, Debug)] pub enum QueueItemIndexKey { diff --git a/rust/cubestore/cubestore/src/cachestore/queue_result.rs b/rust/cubestore/cubestore/src/cachestore/queue_result.rs index e9666396c6c41..da83730b02b3f 100644 --- a/rust/cubestore/cubestore/src/cachestore/queue_result.rs +++ b/rust/cubestore/cubestore/src/cachestore/queue_result.rs @@ -67,9 +67,13 @@ impl<'a> BaseRocksTable for QueueResultRocksTable<'a> { } } -rocks_table_new!(QueueResult, QueueResultRocksTable, TableId::QueueResults, { - vec![Box::new(QueueResultRocksIndex::ByPath)] -}); +rocks_table_new!( + QueueResult, + QueueResultRocksTable, + TableId::QueueResults, + { vec![Box::new(QueueResultRocksIndex::ByPath)] }, + rocksdb::DEFAULT_COLUMN_FAMILY_NAME +); #[derive(Hash, Clone, Debug)] pub enum QueueResultIndexKey { diff --git a/rust/cubestore/cubestore/src/metastore/chunks.rs b/rust/cubestore/cubestore/src/metastore/chunks.rs index 1dc22887ea9b7..4940aab2f411d 100644 --- a/rust/cubestore/cubestore/src/metastore/chunks.rs +++ b/rust/cubestore/cubestore/src/metastore/chunks.rs @@ -159,12 +159,18 @@ pub(crate) enum ChunkRocksIndex { ReplayHandleId = 2, } -rocks_table_impl!(Chunk, ChunkRocksTable, TableId::Chunks, { - vec![ - Box::new(ChunkRocksIndex::PartitionId), - Box::new(ChunkRocksIndex::ReplayHandleId), - ] -}); +rocks_table_impl!( + Chunk, + ChunkRocksTable, + TableId::Chunks, + { + vec![ + Box::new(ChunkRocksIndex::PartitionId), + Box::new(ChunkRocksIndex::ReplayHandleId), + ] + }, + rocksdb::DEFAULT_COLUMN_FAMILY_NAME +); base_rocks_secondary_index!(Chunk, ChunkRocksIndex); diff --git a/rust/cubestore/cubestore/src/metastore/index.rs b/rust/cubestore/cubestore/src/metastore/index.rs index 82c1da9f3dbd5..5501b89d992f7 100644 --- a/rust/cubestore/cubestore/src/metastore/index.rs +++ b/rust/cubestore/cubestore/src/metastore/index.rs @@ -80,13 +80,19 @@ pub(crate) enum IndexRocksIndex { crate::base_rocks_secondary_index!(Index, IndexRocksIndex); -rocks_table_impl!(Index, IndexRocksTable, TableId::Indexes, { - vec![ - Box::new(IndexRocksIndex::TableID), - Box::new(IndexRocksIndex::Name), - Box::new(IndexRocksIndex::MultiIndexId), - ] -}); +rocks_table_impl!( + Index, + IndexRocksTable, + TableId::Indexes, + { + vec![ + Box::new(IndexRocksIndex::TableID), + Box::new(IndexRocksIndex::Name), + Box::new(IndexRocksIndex::MultiIndexId), + ] + }, + rocksdb::DEFAULT_COLUMN_FAMILY_NAME +); #[derive(Hash, Clone, Debug)] pub enum IndexIndexKey { diff --git a/rust/cubestore/cubestore/src/metastore/job.rs b/rust/cubestore/cubestore/src/metastore/job.rs index a6606a56b3581..c2af32163206a 100644 --- a/rust/cubestore/cubestore/src/metastore/job.rs +++ b/rust/cubestore/cubestore/src/metastore/job.rs @@ -118,12 +118,18 @@ pub enum JobRocksIndex { base_rocks_secondary_index!(Job, JobRocksIndex); -rocks_table_impl!(Job, JobRocksTable, TableId::Jobs, { - vec![ - Box::new(JobRocksIndex::RowReference), - Box::new(JobRocksIndex::ByShard), - ] -}); +rocks_table_impl!( + Job, + JobRocksTable, + TableId::Jobs, + { + vec![ + Box::new(JobRocksIndex::RowReference), + Box::new(JobRocksIndex::ByShard), + ] + }, + rocksdb::DEFAULT_COLUMN_FAMILY_NAME +); #[derive(Hash, Clone, Debug)] pub enum JobIndexKey { diff --git a/rust/cubestore/cubestore/src/metastore/multi_index.rs b/rust/cubestore/cubestore/src/metastore/multi_index.rs index abac5836f1d40..fd8f88743b815 100644 --- a/rust/cubestore/cubestore/src/metastore/multi_index.rs +++ b/rust/cubestore/cubestore/src/metastore/multi_index.rs @@ -55,9 +55,13 @@ pub(crate) enum MultiIndexRocksIndex { crate::base_rocks_secondary_index!(MultiIndex, MultiIndexRocksIndex); -rocks_table_impl!(MultiIndex, MultiIndexRocksTable, TableId::MultiIndexes, { - vec![Box::new(MultiIndexRocksIndex::ByName)] -}); +rocks_table_impl!( + MultiIndex, + MultiIndexRocksTable, + TableId::MultiIndexes, + { vec![Box::new(MultiIndexRocksIndex::ByName)] }, + rocksdb::DEFAULT_COLUMN_FAMILY_NAME +); #[derive(Hash, Clone, Debug)] pub enum MultiIndexIndexKey { @@ -210,7 +214,8 @@ rocks_table_impl!( Box::new(MultiPartitionRocksIndex::ByMultiIndexId), Box::new(MultiPartitionRocksIndex::ByParentId), ] - } + }, + rocksdb::DEFAULT_COLUMN_FAMILY_NAME ); #[derive(Hash, Clone, Debug)] diff --git a/rust/cubestore/cubestore/src/metastore/partition.rs b/rust/cubestore/cubestore/src/metastore/partition.rs index 8b1c63b23159f..bc8c265e14f0a 100644 --- a/rust/cubestore/cubestore/src/metastore/partition.rs +++ b/rust/cubestore/cubestore/src/metastore/partition.rs @@ -199,15 +199,21 @@ pub(crate) enum PartitionRocksIndex { ParentPartitionId = 5, } -rocks_table_impl!(Partition, PartitionRocksTable, TableId::Partitions, { - vec![ - Box::new(PartitionRocksIndex::IndexId), - Box::new(PartitionRocksIndex::MultiPartitionId), - Box::new(PartitionRocksIndex::Active), - Box::new(PartitionRocksIndex::JustCreated), - Box::new(PartitionRocksIndex::ParentPartitionId), - ] -}); +rocks_table_impl!( + Partition, + PartitionRocksTable, + TableId::Partitions, + { + vec![ + Box::new(PartitionRocksIndex::IndexId), + Box::new(PartitionRocksIndex::MultiPartitionId), + Box::new(PartitionRocksIndex::Active), + Box::new(PartitionRocksIndex::JustCreated), + Box::new(PartitionRocksIndex::ParentPartitionId), + ] + }, + rocksdb::DEFAULT_COLUMN_FAMILY_NAME +); #[derive(Hash, Clone, Debug)] pub enum PartitionIndexKey { diff --git a/rust/cubestore/cubestore/src/metastore/replay_handle.rs b/rust/cubestore/cubestore/src/metastore/replay_handle.rs index d3d07dfbd6004..7baf2af6dca6d 100644 --- a/rust/cubestore/cubestore/src/metastore/replay_handle.rs +++ b/rust/cubestore/cubestore/src/metastore/replay_handle.rs @@ -324,7 +324,8 @@ rocks_table_impl!( ReplayHandle, ReplayHandleRocksTable, TableId::ReplayHandles, - { vec![Box::new(ReplayHandleRocksIndex::ByTableId),] } + { vec![Box::new(ReplayHandleRocksIndex::ByTableId),] }, + rocksdb::DEFAULT_COLUMN_FAMILY_NAME ); #[derive(Hash, Clone, Debug)] diff --git a/rust/cubestore/cubestore/src/metastore/rocks_table.rs b/rust/cubestore/cubestore/src/metastore/rocks_table.rs index 37ee35c969ea0..f3635a3d3e1b7 100644 --- a/rust/cubestore/cubestore/src/metastore/rocks_table.rs +++ b/rust/cubestore/cubestore/src/metastore/rocks_table.rs @@ -7,7 +7,9 @@ use crate::CubeError; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use chrono::{DateTime, Utc}; use itertools::Itertools; -use rocksdb::{DBIterator, Direction, IteratorMode, ReadOptions, Snapshot, WriteBatch, DB}; +use rocksdb::{ + ColumnFamily, DBIterator, Direction, IteratorMode, ReadOptions, Snapshot, WriteBatch, DB, +}; use serde::{Deserialize, Deserializer, Serialize}; use std::collections::hash_map::DefaultHasher; use std::collections::HashMap; @@ -19,7 +21,7 @@ use std::time::SystemTime; #[macro_export] macro_rules! rocks_table_impl { - ($table: ty, $rocks_table: ident, $table_id: expr, $indexes: block) => { + ($table: ty, $rocks_table: ident, $table_id: expr, $indexes: block, $column_family: expr) => { pub(crate) struct $rocks_table<'a> { db: crate::metastore::DbTableRef<'a>, } @@ -43,16 +45,20 @@ macro_rules! rocks_table_impl { } } - crate::rocks_table_new!($table, $rocks_table, $table_id, $indexes); + crate::rocks_table_new!($table, $rocks_table, $table_id, $indexes, $column_family); }; } #[macro_export] macro_rules! rocks_table_new { - ($table: ty, $rocks_table: ident, $table_id: expr, $indexes: block) => { + ($table: ty, $rocks_table: ident, $table_id: expr, $indexes: block, $column_family: expr) => { impl<'a> crate::metastore::RocksTable for $rocks_table<'a> { type T = $table; + fn cf_name(&self) -> &'static str { + $column_family + } + fn db(&self) -> &rocksdb::DB { self.db.db } @@ -363,6 +369,12 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { where D: Deserializer<'de>; fn indexes() -> Vec>>; + fn cf(&self) -> Result<&ColumnFamily, CubeError> { + self.db() + .cf_handle(&self.cf_name().to_string()) + .ok_or_else(|| CubeError::internal(format!("cf {:?} not found", self.cf_name()))) + } + fn cf_name(&self) -> &'static str; fn migrate_table_by_truncate(&self, mut batch: &mut WriteBatch) -> Result<(), CubeError> { log::trace!("Truncating rows from {:?} table", self); diff --git a/rust/cubestore/cubestore/src/metastore/schema.rs b/rust/cubestore/cubestore/src/metastore/schema.rs index e0c177592cfe3..b3776a7d61bd7 100644 --- a/rust/cubestore/cubestore/src/metastore/schema.rs +++ b/rust/cubestore/cubestore/src/metastore/schema.rs @@ -23,9 +23,13 @@ pub(crate) enum SchemaRocksIndex { Name = 1, } -rocks_table_impl!(Schema, SchemaRocksTable, TableId::Schemas, { - vec![Box::new(SchemaRocksIndex::Name)] -}); +rocks_table_impl!( + Schema, + SchemaRocksTable, + TableId::Schemas, + { vec![Box::new(SchemaRocksIndex::Name)] }, + rocksdb::DEFAULT_COLUMN_FAMILY_NAME +); impl RocksSecondaryIndex for SchemaRocksIndex { fn typed_key_by(&self, row: &Schema) -> String { diff --git a/rust/cubestore/cubestore/src/metastore/source.rs b/rust/cubestore/cubestore/src/metastore/source.rs index ef19af2895dcb..804a3e9197dcd 100644 --- a/rust/cubestore/cubestore/src/metastore/source.rs +++ b/rust/cubestore/cubestore/src/metastore/source.rs @@ -62,9 +62,13 @@ pub enum SourceRocksIndex { base_rocks_secondary_index!(Source, SourceRocksIndex); -rocks_table_impl!(Source, SourceRocksTable, TableId::Sources, { - vec![Box::new(SourceRocksIndex::Name)] -}); +rocks_table_impl!( + Source, + SourceRocksTable, + TableId::Sources, + { vec![Box::new(SourceRocksIndex::Name)] }, + rocksdb::DEFAULT_COLUMN_FAMILY_NAME +); #[derive(Hash, Clone, Debug)] pub enum SourceIndexKey { diff --git a/rust/cubestore/cubestore/src/metastore/table.rs b/rust/cubestore/cubestore/src/metastore/table.rs index bb5016237b7b9..a05cb55588763 100644 --- a/rust/cubestore/cubestore/src/metastore/table.rs +++ b/rust/cubestore/cubestore/src/metastore/table.rs @@ -412,9 +412,13 @@ impl Column { } } -rocks_table_impl!(Table, TableRocksTable, TableId::Tables, { - vec![Box::new(TableRocksIndex::Name)] -}); +rocks_table_impl!( + Table, + TableRocksTable, + TableId::Tables, + { vec![Box::new(TableRocksIndex::Name)] }, + rocksdb::DEFAULT_COLUMN_FAMILY_NAME +); #[derive(Clone, Copy, Debug)] pub(crate) enum TableRocksIndex { diff --git a/rust/cubestore/cubestore/src/metastore/wal.rs b/rust/cubestore/cubestore/src/metastore/wal.rs index 04fa25d22d263..d6a1340262fb7 100644 --- a/rust/cubestore/cubestore/src/metastore/wal.rs +++ b/rust/cubestore/cubestore/src/metastore/wal.rs @@ -45,9 +45,13 @@ pub(crate) enum WALRocksIndex { TableID = 1, } -rocks_table_impl!(WAL, WALRocksTable, TableId::WALs, { - vec![Box::new(WALRocksIndex::TableID)] -}); +rocks_table_impl!( + WAL, + WALRocksTable, + TableId::WALs, + { vec![Box::new(WALRocksIndex::TableID)] }, + rocksdb::DEFAULT_COLUMN_FAMILY_NAME +); #[derive(Hash, Clone, Debug)] pub enum WALIndexKey { From ec07445403d68f8605d9726f5cdda2d29a9e25d4 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Fri, 10 Mar 2023 15:03:03 +0300 Subject: [PATCH 2/9] chore: declare cf --- .../src/cachestore/cache_rocksstore.rs | 21 +++++++++++++++++-- .../cubestore/src/cachestore/queue_item.rs | 3 ++- .../cubestore/src/cachestore/queue_result.rs | 3 ++- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs index 61399cd410dd0..9f4bfc9861946 100644 --- a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs +++ b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs @@ -24,7 +24,7 @@ use crate::CubeError; use async_trait::async_trait; use futures_timer::Delay; -use rocksdb::{BlockBasedOptions, Options, DB}; +use rocksdb::{BlockBasedOptions, ColumnFamilyDescriptor, Options, DB}; use crate::cachestore::compaction::CompactionPreloadedState; use crate::cachestore::listener::RocksCacheStoreListener; @@ -38,6 +38,8 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::sync::broadcast::Sender; +pub const CACHESTORE_QUEUE_COLUMN_FAMILY_NAME: &str = "queue"; + pub(crate) struct RocksCacheStoreDetails {} impl RocksCacheStoreDetails { @@ -74,6 +76,7 @@ impl RocksStoreDetails for RocksCacheStoreDetails { let mut opts = Options::default(); opts.create_if_missing(true); + opts.create_missing_column_families(true); opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(13)); opts.set_compaction_filter_factory(compaction::MetaStoreCacheCompactionFactory::new( compaction_state, @@ -90,7 +93,21 @@ impl RocksStoreDetails for RocksCacheStoreDetails { opts.set_block_based_table_factory(&block_opts); - DB::open(&opts, path) + let default_cf = { + let mut opts = Options::default(); + opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(13)); + + ColumnFamilyDescriptor::new(rocksdb::DEFAULT_COLUMN_FAMILY_NAME, opts) + }; + + let queue_cf = { + let mut opts = Options::default(); + opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(13)); + + ColumnFamilyDescriptor::new(CACHESTORE_QUEUE_COLUMN_FAMILY_NAME, opts) + }; + + DB::open_cf_descriptors(&opts, path, vec![default_cf, queue_cf]) .map_err(|err| CubeError::internal(format!("DB::open error for cachestore: {}", err))) } diff --git a/rust/cubestore/cubestore/src/cachestore/queue_item.rs b/rust/cubestore/cubestore/src/cachestore/queue_item.rs index 29a6c88da06bb..91e26ef1cad36 100644 --- a/rust/cubestore/cubestore/src/cachestore/queue_item.rs +++ b/rust/cubestore/cubestore/src/cachestore/queue_item.rs @@ -1,3 +1,4 @@ +use super::cache_rocksstore::CACHESTORE_QUEUE_COLUMN_FAMILY_NAME; use crate::metastore::{ BaseRocksTable, IndexId, RocksEntity, RocksSecondaryIndex, RocksTable, TableId, TableInfo, }; @@ -375,7 +376,7 @@ rocks_table_new!( Box::new(QueueItemRocksIndex::ByPrefix), ] }, - rocksdb::DEFAULT_COLUMN_FAMILY_NAME + CACHESTORE_QUEUE_COLUMN_FAMILY_NAME ); #[derive(Hash, Clone, Debug)] diff --git a/rust/cubestore/cubestore/src/cachestore/queue_result.rs b/rust/cubestore/cubestore/src/cachestore/queue_result.rs index da83730b02b3f..74009e572ad24 100644 --- a/rust/cubestore/cubestore/src/cachestore/queue_result.rs +++ b/rust/cubestore/cubestore/src/cachestore/queue_result.rs @@ -1,3 +1,4 @@ +use super::cache_rocksstore::CACHESTORE_QUEUE_COLUMN_FAMILY_NAME; use crate::metastore::{ BaseRocksTable, IndexId, RocksEntity, RocksSecondaryIndex, RocksTable, TableId, TableInfo, }; @@ -72,7 +73,7 @@ rocks_table_new!( QueueResultRocksTable, TableId::QueueResults, { vec![Box::new(QueueResultRocksIndex::ByPath)] }, - rocksdb::DEFAULT_COLUMN_FAMILY_NAME + CACHESTORE_QUEUE_COLUMN_FAMILY_NAME ); #[derive(Hash, Clone, Debug)] From 988a604a4146e2bb6bb6dd5e549443dda7c5271b Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Fri, 10 Mar 2023 15:11:08 +0300 Subject: [PATCH 3/9] chore: use cf api --- .../cubestore/src/metastore/rocks_table.rs | 45 +++++++++++-------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/rust/cubestore/cubestore/src/metastore/rocks_table.rs b/rust/cubestore/cubestore/src/metastore/rocks_table.rs index f3635a3d3e1b7..6008fa2235a54 100644 --- a/rust/cubestore/cubestore/src/metastore/rocks_table.rs +++ b/rust/cubestore/cubestore/src/metastore/rocks_table.rs @@ -458,7 +458,8 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { self.migrate_table(&mut batch, table_info)?; - batch.put( + batch.put_cf( + self.cf()?, &RowKey::TableInfo { table_id: Self::table_id(), } @@ -473,7 +474,8 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { self.db().write(batch)?; } } else { - self.db().put( + self.db().put_cf( + self.cf()?, &RowKey::TableInfo { table_id: Self::table_id(), } @@ -570,9 +572,10 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { } let row = row?; let index_row = self.index_key_val(row.get_row(), row.get_id(), index); - batch.put(index_row.key, index_row.val); + batch.put_cf(self.cf()?, index_row.key, index_row.val); } - batch.put( + batch.put_cf( + self.cf()?, &RowKey::SecondaryIndexInfo { index_id: Self::index_id(index.get_id()), } @@ -931,9 +934,10 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { let mut opts = ReadOptions::default(); opts.set_prefix_same_as_start(true); - let iter = db.iterator_opt( - IteratorMode::From(&key_min.to_bytes()[0..(key_len + 5)], Direction::Forward), + let iter = db.iterator_cf_opt( + self.cf()?, opts, + IteratorMode::From(&key_min.to_bytes()[0..(key_len + 5)], Direction::Forward), ); let index = self.get_index_by_id(secondary_id); @@ -982,10 +986,13 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { let ref db = self.snapshot(); let key_min = RowKey::Table(table_id, 0); - let iter = db.iterator(IteratorMode::From( - &key_min.to_bytes()[0..get_fixed_prefix()], - Direction::Forward, - )); + let iter = db.iterator_cf( + self.cf()?, + IteratorMode::From( + &key_min.to_bytes()[0..get_fixed_prefix()], + Direction::Forward, + ), + ); for kv_res in iter { let (key, _) = kv_res?; @@ -1011,10 +1018,10 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { let key_len = zero_vec.len(); let key_min = RowKey::SecondaryIndex(Self::index_id(secondary_id), zero_vec.clone(), 0); - let iter = db.iterator(IteratorMode::From( - &key_min.to_bytes()[0..(key_len + 5)], - Direction::Forward, - )); + let iter = db.iterator_cf( + self.cf()?, + IteratorMode::From(&key_min.to_bytes()[0..(key_len + 5)], Direction::Forward), + ); for kv_res in iter { let (key, _) = kv_res?; @@ -1065,9 +1072,10 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { let mut opts = ReadOptions::default(); opts.set_prefix_same_as_start(true); - let iter = db.iterator_opt( - IteratorMode::From(&key_min.to_bytes()[0..(key_len + 5)], Direction::Forward), + let iter = db.iterator_cf_opt( + self.cf()?, opts, + IteratorMode::From(&key_min.to_bytes()[0..(key_len + 5)], Direction::Forward), ); Ok(IndexScanIter { @@ -1085,12 +1093,13 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { let mut opts = ReadOptions::default(); opts.set_prefix_same_as_start(true); - let iterator = db.iterator_opt( + let iterator = db.iterator_cf_opt( + self.cf()?, + opts, IteratorMode::From( &key_min.to_bytes()[0..get_fixed_prefix()], Direction::Forward, ), - opts, ); Ok(TableScanIter { From c0f8d44224bf5d0cbe0e545a6a9cfd9cd3313ef7 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Fri, 10 Mar 2023 15:56:51 +0300 Subject: [PATCH 4/9] chore: use cf api --- .../cubestore/src/metastore/rocks_table.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/rust/cubestore/cubestore/src/metastore/rocks_table.rs b/rust/cubestore/cubestore/src/metastore/rocks_table.rs index 6008fa2235a54..07e4ff9b0f71a 100644 --- a/rust/cubestore/cubestore/src/metastore/rocks_table.rs +++ b/rust/cubestore/cubestore/src/metastore/rocks_table.rs @@ -415,14 +415,22 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { let (row_id, inserted_row) = self.insert_row(serialized_row)?; batch_pipe.add_event(MetaStoreEvent::Insert(Self::table_id(), row_id)); - if self.snapshot().get(&inserted_row.key)?.is_some() { + if self + .snapshot() + .get_cf(self.cf()?, &inserted_row.key)? + .is_some() + { return Err(CubeError::internal(format!("Primary key constraint violation. Primary key already exists for a row id {}: {:?}", row_id, &row))); } batch_pipe.batch().put(inserted_row.key, inserted_row.val); let index_row = self.insert_index_row(&row, row_id)?; for to_insert in index_row { - if self.snapshot().get(&to_insert.key)?.is_some() { + if self + .snapshot() + .get_cf(self.cf()?, &to_insert.key)? + .is_some() + { return Err(CubeError::internal(format!("Primary key constraint violation in secondary index. Primary key already exists for a row id {}: {:?}", row_id, &row))); } batch_pipe.batch().put(to_insert.key, to_insert.val); @@ -850,7 +858,10 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { fn get_row(&self, row_id: u64) -> Result>, CubeError> { let ref db = self.snapshot(); - let res = db.get(RowKey::Table(Self::table_id(), row_id).to_bytes())?; + let res = db.get_cf( + self.cf()?, + RowKey::Table(Self::table_id(), row_id).to_bytes(), + )?; if let Some(buffer) = res { let row = self.deserialize_id_row(row_id, buffer.as_slice())?; From 6610cec366758a5547c159bfaa26f26ce83a0f0f Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Fri, 10 Mar 2023 16:50:55 +0300 Subject: [PATCH 5/9] chore: use cf api --- rust/cubestore/cubestore/src/metastore/rocks_table.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/rust/cubestore/cubestore/src/metastore/rocks_table.rs b/rust/cubestore/cubestore/src/metastore/rocks_table.rs index 07e4ff9b0f71a..cb5e5c4164c93 100644 --- a/rust/cubestore/cubestore/src/metastore/rocks_table.rs +++ b/rust/cubestore/cubestore/src/metastore/rocks_table.rs @@ -735,7 +735,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { ) -> Result, CubeError> { let deleted_row = self.delete_index_row(&old_row, row_id)?; for row in deleted_row { - batch_pipe.batch().delete(row.key); + batch_pipe.batch().delete_cf(self.cf()?, row.key); } let mut ser = flexbuffers::FlexbufferSerializer::new(); @@ -791,12 +791,12 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { } for row in deleted_row { - batch_pipe.batch().delete(row.key); + batch_pipe.batch().delete_cf(self.cf()?, row.key); } batch_pipe .batch() - .delete(self.delete_row(row.get_id())?.key); + .delete_cf(self.cf()?, self.delete_row(row.get_id())?.key); Ok(row) } @@ -1010,7 +1010,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { let row_key = RowKey::from_bytes(&key); if let RowKey::Table(row_table_id, _) = row_key { if row_table_id == table_id { - batch.delete(key); + batch.delete_cf(self.cf()?, key); } else { return Ok(()); } @@ -1039,7 +1039,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { let row_key = RowKey::from_bytes(&key); if let RowKey::SecondaryIndex(index_id, _, _) = row_key { if index_id == Self::index_id(secondary_id) { - batch.delete(key); + batch.delete_cf(self.cf()?, key); } else { return Ok(()); } From 98fc49d4a0246d6a6c08081980a79599036070e2 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Fri, 10 Mar 2023 16:51:49 +0300 Subject: [PATCH 6/9] chore: use cf api --- .../cubestore/src/metastore/rocks_table.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/rust/cubestore/cubestore/src/metastore/rocks_table.rs b/rust/cubestore/cubestore/src/metastore/rocks_table.rs index cb5e5c4164c93..3e01028bededc 100644 --- a/rust/cubestore/cubestore/src/metastore/rocks_table.rs +++ b/rust/cubestore/cubestore/src/metastore/rocks_table.rs @@ -422,7 +422,9 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { { return Err(CubeError::internal(format!("Primary key constraint violation. Primary key already exists for a row id {}: {:?}", row_id, &row))); } - batch_pipe.batch().put(inserted_row.key, inserted_row.val); + batch_pipe + .batch() + .put_cf(self.cf()?, inserted_row.key, inserted_row.val); let index_row = self.insert_index_row(&row, row_id)?; for to_insert in index_row { @@ -433,7 +435,9 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { { return Err(CubeError::internal(format!("Primary key constraint violation in secondary index. Primary key already exists for a row id {}: {:?}", row_id, &row))); } - batch_pipe.batch().put(to_insert.key, to_insert.val); + batch_pipe + .batch() + .put_cf(self.cf()?, to_insert.key, to_insert.val); } Ok(IdRow::new(row_id, row)) @@ -752,11 +756,13 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { )); } - batch_pipe.batch().put(updated_row.key, updated_row.val); + batch_pipe + .batch() + .put_cf(self.cf()?, updated_row.key, updated_row.val); let index_row = self.insert_index_row(&new_row, row_id)?; for row in index_row { - batch_pipe.batch().put(row.key, row.val); + batch_pipe.batch().put_cf(self.cf()?, row.key, row.val); } Ok(IdRow::new(row_id, new_row)) } @@ -816,7 +822,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { let mut to_write = vec![]; to_write.write_u64::(next_seq)?; - db.put(seq_key.to_bytes(), to_write)?; + db.put_cf(self.cf()?, seq_key.to_bytes(), to_write)?; Ok(next_seq) } From 896769625bda2fc813dba78ca09a147600e25367 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Sat, 11 Mar 2023 00:21:03 +0300 Subject: [PATCH 7/9] chore: declare default cf for metastore --- rust/cubestore/cubestore/src/metastore/mod.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index f0c98eb68ca87..4b2bd2d06bb29 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -21,7 +21,7 @@ pub use rocks_table::*; use crate::cluster::node_name_by_partition; use async_trait::async_trait; use log::info; -use rocksdb::{BlockBasedOptions, Env, MergeOperands, Options, DB}; +use rocksdb::{BlockBasedOptions, ColumnFamilyDescriptor, Env, MergeOperands, Options, DB}; use serde::{Deserialize, Serialize}; use std::hash::Hash; use std::{env, io::Cursor, sync::Arc}; @@ -1160,7 +1160,14 @@ impl RocksStoreDetails for RocksMetaStoreDetails { opts.set_block_based_table_factory(&block_opts); - DB::open(&opts, path) + let default_cf = { + let mut opts = Options::default(); + opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(13)); + + ColumnFamilyDescriptor::new(rocksdb::DEFAULT_COLUMN_FAMILY_NAME, opts) + }; + + DB::open_cf_descriptors(&opts, path, vec![default_cf]) .map_err(|err| CubeError::internal(format!("DB::open error for metastore: {}", err))) } From 84bca617ebda7d495a8d587590b3146037804a77 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Sat, 11 Mar 2023 01:12:21 +0300 Subject: [PATCH 8/9] chore: fix compaction --- .../cubestore/src/cachestore/cache_rocksstore.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs index 9f4bfc9861946..ec75e0baf9f5b 100644 --- a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs +++ b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs @@ -78,9 +78,6 @@ impl RocksStoreDetails for RocksCacheStoreDetails { opts.create_if_missing(true); opts.create_missing_column_families(true); opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(13)); - opts.set_compaction_filter_factory(compaction::MetaStoreCacheCompactionFactory::new( - compaction_state, - )); // Disable automatic compaction before migration, will be enabled later in after_migration opts.set_disable_auto_compactions(true); @@ -96,6 +93,9 @@ impl RocksStoreDetails for RocksCacheStoreDetails { let default_cf = { let mut opts = Options::default(); opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(13)); + opts.set_compaction_filter_factory(compaction::MetaStoreCacheCompactionFactory::new( + compaction_state.clone(), + )); ColumnFamilyDescriptor::new(rocksdb::DEFAULT_COLUMN_FAMILY_NAME, opts) }; @@ -103,6 +103,9 @@ impl RocksStoreDetails for RocksCacheStoreDetails { let queue_cf = { let mut opts = Options::default(); opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(13)); + opts.set_compaction_filter_factory(compaction::MetaStoreCacheCompactionFactory::new( + compaction_state, + )); ColumnFamilyDescriptor::new(CACHESTORE_QUEUE_COLUMN_FAMILY_NAME, opts) }; From a27d68ff2a60158a6e825ffa485743e8878754d9 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Sat, 11 Mar 2023 01:14:31 +0300 Subject: [PATCH 9/9] chore: use cf api --- .../cubestore/src/metastore/rocks_table.rs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/rust/cubestore/cubestore/src/metastore/rocks_table.rs b/rust/cubestore/cubestore/src/metastore/rocks_table.rs index 3e01028bededc..094f55f852c0b 100644 --- a/rust/cubestore/cubestore/src/metastore/rocks_table.rs +++ b/rust/cubestore/cubestore/src/metastore/rocks_table.rs @@ -369,6 +369,11 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { where D: Deserializer<'de>; fn indexes() -> Vec>>; + fn cf_default(&self) -> Result<&ColumnFamily, CubeError> { + self.db() + .cf_handle(&rocksdb::DEFAULT_COLUMN_FAMILY_NAME) + .ok_or_else(|| CubeError::internal(format!("cf {:?} not found", self.cf_name()))) + } fn cf(&self) -> Result<&ColumnFamily, CubeError> { self.db() .cf_handle(&self.cf_name().to_string()) @@ -453,7 +458,8 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { fn migration_check_table(&self) -> Result<(), CubeError> { let snapshot = self.snapshot(); - let table_info = snapshot.get( + let table_info = snapshot.get_cf( + self.cf_default()?, &RowKey::TableInfo { table_id: Self::table_id(), } @@ -471,7 +477,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { self.migrate_table(&mut batch, table_info)?; batch.put_cf( - self.cf()?, + self.cf_default()?, &RowKey::TableInfo { table_id: Self::table_id(), } @@ -487,7 +493,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { } } else { self.db().put_cf( - self.cf()?, + self.cf_default()?, &RowKey::TableInfo { table_id: Self::table_id(), } @@ -506,7 +512,8 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { fn migration_check_indexes(&self) -> Result<(), CubeError> { let snapshot = self.snapshot(); for index in Self::indexes().into_iter() { - let index_info = snapshot.get( + let index_info = snapshot.get_cf( + self.cf_default()?, &RowKey::SecondaryIndexInfo { index_id: Self::index_id(index.get_id()), } @@ -587,7 +594,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { batch.put_cf(self.cf()?, index_row.key, index_row.val); } batch.put_cf( - self.cf()?, + self.cf_default()?, &RowKey::SecondaryIndexInfo { index_id: Self::index_id(index.get_id()), } @@ -812,7 +819,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { let seq_key = RowKey::Sequence(Self::table_id()); let before_merge = self .snapshot() - .get(seq_key.to_bytes())? + .get_cf(self.cf()?, seq_key.to_bytes())? .map(|v| Cursor::new(v).read_u64::().unwrap()); // TODO revert back merge operator if locking works