feat(cubestore): Cache/Queue - implement migrations via truncating
ovr committed Jan 27, 2023
1 parent 28ea47c commit 85dc04b
Showing 11 changed files with 175 additions and 33 deletions.
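
Summary of the change: the cache/queue row types (CacheItem, QueueItem, QueueResult) now implement RocksEntity, their tables switch from the rocks_table_impl! macro to rocks_table_new!, and each table implements BaseRocksTable::migrate_table by delegating to migrate_table_by_truncate, i.e. a schema change empties the table instead of converting rows one by one. The remaining metastore entities shown below (Schema, Index, Partition, Chunk, WAL, Job, MultiIndex, MultiPartition, ReplayHandle) only gain empty RocksEntity impls. The sketch below only illustrates the truncate-on-migrate idea with hypothetical names; cubestore's real BaseRocksTable/RocksTable machinery is not part of this commit.

// Illustration only (hypothetical types): a migration hook that resolves any layout
// change by truncating, which is acceptable because cache and queue rows are disposable.
use std::collections::BTreeMap;

type Row = Vec<u8>;

trait MigratableTable {
    // Called on startup when the stored layout no longer matches the code.
    fn migrate_table(&mut self);
    // Shared helper: drop every row instead of writing per-version upgrade code.
    fn migrate_table_by_truncate(&mut self);
}

struct CacheTable {
    rows: BTreeMap<u64, Row>,
}

impl MigratableTable for CacheTable {
    fn migrate_table(&mut self) {
        // Cached values can always be recomputed, so truncation is safe and cheap.
        self.migrate_table_by_truncate()
    }
    fn migrate_table_by_truncate(&mut self) {
        self.rows.clear();
    }
}

fn main() {
    let mut table = CacheTable {
        rows: BTreeMap::from([(1, b"row written with the old layout".to_vec())]),
    };
    table.migrate_table();
    assert!(table.rows.is_empty());
}
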
25 changes: 22 additions & 3 deletions rust/cubestore/cubestore/src/cachestore/cache_item.rs
@@ -1,5 +1,7 @@
-use crate::metastore::{IndexId, RocksSecondaryIndex, TableId};
-use crate::{base_rocks_secondary_index, rocks_table_impl};
+use crate::metastore::{
+    BaseRocksTable, IndexId, RocksEntity, RocksSecondaryIndex, RocksTable, TableId, TableInfo,
+};
+use crate::{base_rocks_secondary_index, rocks_table_new, CubeError};
 use chrono::serde::ts_seconds_option;
 use chrono::{DateTime, Duration, Utc};
 use serde::{Deserialize, Deserializer, Serialize};
@@ -13,6 +15,8 @@ pub struct CacheItem {
     pub(crate) expire: Option<DateTime<Utc>>,
 }
 
+impl RocksEntity for CacheItem {}
+
 impl CacheItem {
     pub fn parse_path_to_prefix(mut path: String) -> String {
         if path.ends_with(":*") {
@@ -77,8 +81,23 @@ pub(crate) enum CacheItemRocksIndex {
     ByPath = 1,
     ByPrefix = 2,
 }
+pub struct CacheItemRocksTable<'a> {
+    db: crate::metastore::DbTableRef<'a>,
+}
+
+impl<'a> CacheItemRocksTable<'a> {
+    pub fn new(db: crate::metastore::DbTableRef<'a>) -> Self {
+        Self { db }
+    }
+}
+
+impl<'a> BaseRocksTable for CacheItemRocksTable<'a> {
+    fn migrate_table(&self, _table_info: TableInfo) -> Result<(), CubeError> {
+        self.migrate_table_by_truncate()
+    }
+}
 
-rocks_table_impl!(CacheItem, CacheItemRocksTable, TableId::CacheItems, {
+rocks_table_new!(CacheItem, CacheItemRocksTable, TableId::CacheItems, {
     vec![
         Box::new(CacheItemRocksIndex::ByPath),
         Box::new(CacheItemRocksIndex::ByPrefix),
@@ -87,6 +87,7 @@ impl RocksStoreDetails for RocksCacheStoreDetails {
     fn migrate(&self, table_ref: DbTableRef) -> Result<(), CubeError> {
         CacheItemRocksTable::new(table_ref.clone()).migrate()?;
         QueueItemRocksTable::new(table_ref.clone()).migrate()?;
+        QueueResultRocksTable::new(table_ref.clone()).migrate()?;
 
         table_ref
             .db
33 changes: 29 additions & 4 deletions rust/cubestore/cubestore/src/cachestore/queue_item.rs
@@ -1,8 +1,11 @@
-use crate::metastore::{IndexId, RocksSecondaryIndex, TableId};
+use crate::metastore::{
+    BaseRocksTable, IndexId, RocksEntity, RocksSecondaryIndex, RocksTable, TableId, TableInfo,
+};
 use crate::table::{Row, TableValue};
-use crate::{base_rocks_secondary_index, rocks_table_impl, CubeError};
-use chrono::serde::ts_seconds;
+use crate::{base_rocks_secondary_index, rocks_table_new, CubeError};
+use chrono::serde::{ts_seconds};
 use chrono::{DateTime, Duration, Utc};
+
 use serde::{Deserialize, Deserializer, Serialize};
 
 fn merge(a: serde_json::Value, b: serde_json::Value) -> Option<serde_json::Value> {
@@ -72,6 +75,12 @@ pub struct QueueItem {
     pub(crate) expire: DateTime<Utc>,
 }
 
+impl RocksEntity for QueueItem {
+    fn version() -> u32 {
+        1
+    }
+}
+
 impl QueueItem {
     pub fn new(path: String, value: String, status: QueueItemStatus, priority: i64) -> Self {
         let parts: Vec<&str> = path.rsplitn(2, ":").collect();
@@ -224,7 +233,23 @@ pub(crate) enum QueueItemRocksIndex {
     ByPrefix = 3,
 }
 
-rocks_table_impl!(QueueItem, QueueItemRocksTable, TableId::QueueItems, {
+pub struct QueueItemRocksTable<'a> {
+    db: crate::metastore::DbTableRef<'a>,
+}
+
+impl<'a> QueueItemRocksTable<'a> {
+    pub fn new(db: crate::metastore::DbTableRef<'a>) -> Self {
+        Self { db }
+    }
+}
+
+impl<'a> BaseRocksTable for QueueItemRocksTable<'a> {
+    fn migrate_table(&self, _table_info: TableInfo) -> Result<(), CubeError> {
+        self.migrate_table_by_truncate()
+    }
+}
+
+rocks_table_new!(QueueItem, QueueItemRocksTable, TableId::QueueItems, {
     vec![
         Box::new(QueueItemRocksIndex::ByPath),
         Box::new(QueueItemRocksIndex::ByPrefixAndStatus),
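
Note on the version bump above: of the row types touched in this commit, only QueueItem spells out RocksEntity::version() (returning 1); the others rely on the trait's default, which this diff does not show. A per-entity version lets a mismatch between what is stored on disk and what the code expects be detected at startup and resolved by the truncate-based migrate_table. A hypothetical, self-contained sketch of that check (trait name, default value and helper are assumptions, not cubestore's actual definitions):

// Hypothetical sketch: a per-entity schema version, where a bump marks stored rows as outdated.
trait Entity {
    fn version() -> u32 {
        0 // assumed default; the real RocksEntity default is not part of this diff
    }
}

struct QueueItem;

impl Entity for QueueItem {
    fn version() -> u32 {
        1 // bumped, so rows persisted with the previous layout should be truncated
    }
}

fn needs_truncate<E: Entity>(stored_version: u32) -> bool {
    stored_version != E::version()
}

fn main() {
    assert!(needs_truncate::<QueueItem>(0)); // old data on disk -> truncate
    assert!(!needs_truncate::<QueueItem>(1)); // already on the current layout -> keep
}
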
25 changes: 22 additions & 3 deletions rust/cubestore/cubestore/src/cachestore/queue_result.rs
@@ -1,5 +1,7 @@
-use crate::metastore::{IndexId, RocksSecondaryIndex, TableId};
-use crate::{base_rocks_secondary_index, rocks_table_impl};
+use crate::metastore::{
+    BaseRocksTable, IndexId, RocksEntity, RocksSecondaryIndex, RocksTable, TableId, TableInfo,
+};
+use crate::{base_rocks_secondary_index, rocks_table_new, CubeError};
 use chrono::serde::ts_seconds;
 use chrono::{DateTime, Duration, Utc};
 use serde::{Deserialize, Deserializer, Serialize};
@@ -12,6 +14,8 @@ pub struct QueueResult {
     pub(crate) expire: DateTime<Utc>,
 }
 
+impl RocksEntity for QueueResult {}
+
 impl QueueResult {
     pub fn new(path: String, value: String) -> Self {
         QueueResult {
@@ -34,8 +38,23 @@ impl QueueResult {
 pub(crate) enum QueueResultRocksIndex {
     ByPath = 1,
 }
+pub struct QueueResultRocksTable<'a> {
+    db: crate::metastore::DbTableRef<'a>,
+}
+
+impl<'a> QueueResultRocksTable<'a> {
+    pub fn new(db: crate::metastore::DbTableRef<'a>) -> Self {
+        Self { db }
+    }
+}
+
+impl<'a> BaseRocksTable for QueueResultRocksTable<'a> {
+    fn migrate_table(&self, _table_info: TableInfo) -> Result<(), CubeError> {
+        self.migrate_table_by_truncate()
+    }
+}
 
-rocks_table_impl!(QueueResult, QueueResultRocksTable, TableId::QueueResults, {
+rocks_table_new!(QueueResult, QueueResultRocksTable, TableId::QueueResults, {
     vec![Box::new(QueueResultRocksIndex::ByPath)]
 });
 
4 changes: 3 additions & 1 deletion rust/cubestore/cubestore/src/metastore/job.rs
@@ -1,7 +1,7 @@
 use super::{IndexId, RocksSecondaryIndex, TableId};
 use crate::base_rocks_secondary_index;
 use crate::metastore::table::Table;
-use crate::metastore::RowKey;
+use crate::metastore::{RocksEntity, RowKey};
 use crate::rocks_table_impl;
 use byteorder::{BigEndian, WriteBytesExt};
 use chrono::{DateTime, Utc};
@@ -53,6 +53,8 @@ pub struct Job {
     status: JobStatus,
 }
 
+impl RocksEntity for Job {}
+
 impl Job {
     pub fn new(row_reference: RowKey, job_type: JobType, shard: String) -> Job {
         Job {
10 changes: 10 additions & 0 deletions rust/cubestore/cubestore/src/metastore/mod.rs
@@ -546,6 +546,8 @@ pub struct Schema {
 }
 }
 
+impl RocksEntity for Schema {}
+
 #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
 pub enum IndexType {
     Regular = 1,
@@ -568,6 +570,8 @@ pub struct Index {
 }
 }
 
+impl RocksEntity for Index {}
+
 #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
 pub enum AggregateFunction {
     SUM = 1,
@@ -657,6 +661,8 @@ pub struct Partition {
 }
 }
 
+impl RocksEntity for Partition {}
+
 data_frame_from! {
 #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
 pub struct Chunk {
@@ -684,13 +690,17 @@ pub struct Chunk {
 }
 }
 
+impl RocksEntity for Chunk {}
+
 #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
 pub struct WAL {
     table_id: u64,
     row_count: u64,
     uploaded: bool,
 }
 
+impl RocksEntity for WAL {}
+
 meta_store_table_impl!(SchemaMetaStoreTable, Schema, SchemaRocksTable);
 meta_store_table_impl!(ChunkMetaStoreTable, Chunk, ChunkRocksTable);
 meta_store_table_impl!(IndexMetaStoreTable, Index, IndexRocksTable);
6 changes: 5 additions & 1 deletion rust/cubestore/cubestore/src/metastore/multi_index.rs
@@ -6,7 +6,7 @@
 //! Multi-partitioned are compacted and repartitioned by applying the same operation to ordinary
 //! partitions they own.
 use crate::data_frame_from;
-use crate::metastore::{Column, IdRow, IndexId, RocksSecondaryIndex, TableId};
+use crate::metastore::{Column, IdRow, IndexId, RocksEntity, RocksSecondaryIndex, TableId};
 use crate::rocks_table_impl;
 use crate::table::Row;
 use byteorder::{BigEndian, WriteBytesExt};
@@ -24,6 +24,8 @@ pub struct MultiIndex {
 }
 }
 
+impl RocksEntity for MultiIndex {}
+
 impl MultiIndex {
     pub fn new(schema_id: u64, name: String, key_columns: Vec<Column>) -> MultiIndex {
         MultiIndex {
@@ -114,6 +116,8 @@ pub struct MultiPartition {
 }
 }
 
+impl RocksEntity for MultiPartition {}
+
 impl MultiPartition {
     // Note that roots are active by default.
     pub fn new_root(multi_index_id: u64) -> MultiPartition {
4 changes: 3 additions & 1 deletion rust/cubestore/cubestore/src/metastore/replay_handle.rs
@@ -1,6 +1,6 @@
 use super::{IndexId, RocksSecondaryIndex, TableId};
 use crate::metastore::table::Table;
-use crate::metastore::IdRow;
+use crate::metastore::{IdRow, RocksEntity};
 use crate::rocks_table_impl;
 use crate::{base_rocks_secondary_index, CubeError};
 use byteorder::{BigEndian, WriteBytesExt};
@@ -20,6 +20,8 @@ pub struct ReplayHandle {
 }
 }
 
+impl RocksEntity for ReplayHandle {}
+
 #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
 pub struct SeqPointer {
     start_seq: Option<i64>,
