From 90e174ffeefe39899b4b55ca4e09e9b7aa382790 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Mon, 31 Oct 2022 17:21:06 +0300 Subject: [PATCH] feat(cubestore): Initial queue support --- .../cubestore-sql-tests/src/tests.rs | 185 ++++++ .../cubestore/src/cachestore/cache_item.rs | 4 +- .../src/cachestore/cache_rocksstore.rs | 609 ++++++++++++++++-- .../cubestore/src/cachestore/lazy.rs | 98 ++- .../cubestore/src/cachestore/listener.rs | 28 + .../cubestore/cubestore/src/cachestore/mod.rs | 7 +- .../cubestore/src/cachestore/queue_item.rs | 301 +++++++++ .../cubestore/src/cachestore/queue_result.rs | 85 +++ rust/cubestore/cubestore/src/config/mod.rs | 27 +- rust/cubestore/cubestore/src/metastore/mod.rs | 23 +- .../cubestore/src/metastore/rocks_store.rs | 8 +- .../cubestore/src/metastore/rocks_table.rs | 38 +- .../src/queryplanner/info_schema/mod.rs | 2 + .../queryplanner/info_schema/system_queue.rs | 115 ++++ .../cubestore/src/queryplanner/mod.rs | 9 +- rust/cubestore/cubestore/src/scheduler/mod.rs | 49 +- rust/cubestore/cubestore/src/sql/mod.rs | 212 +++++- rust/cubestore/cubestore/src/sql/parser.rs | 249 +++++++ 18 files changed, 1940 insertions(+), 109 deletions(-) create mode 100644 rust/cubestore/cubestore/src/cachestore/listener.rs create mode 100644 rust/cubestore/cubestore/src/cachestore/queue_item.rs create mode 100644 rust/cubestore/cubestore/src/cachestore/queue_result.rs create mode 100644 rust/cubestore/cubestore/src/queryplanner/info_schema/system_queue.rs diff --git a/rust/cubestore/cubestore-sql-tests/src/tests.rs b/rust/cubestore/cubestore-sql-tests/src/tests.rs index 9d75db1712a09..307a5e9ea15ec 100644 --- a/rust/cubestore/cubestore-sql-tests/src/tests.rs +++ b/rust/cubestore/cubestore-sql-tests/src/tests.rs @@ -23,6 +23,7 @@ use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, SystemTime}; use tokio::io::{AsyncWriteExt, BufWriter}; +use tokio::join; pub type TestFn = Box< dyn Fn(Box) -> Pin + Send>> @@ -226,6 +227,7 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> { t("cache_compaction", cache_compaction), t("cache_set_nx", cache_set_nx), t("cache_prefix_keys", cache_prefix_keys), + t("queue_full_workflow", queue_full_workflow), ]; fn t(name: &'static str, f: fn(Box) -> F) -> (&'static str, TestFn) @@ -6411,6 +6413,189 @@ async fn cache_prefix_keys(service: Box) { ); } +async fn queue_full_workflow(service: Box) { + service + .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#) + .await + .unwrap(); + + service + .exec_query(r#"QUEUE ADD PRIORITY 10 "STANDALONE#queue:2" "payload2";"#) + .await + .unwrap(); + + service + .exec_query(r#"QUEUE ADD PRIORITY 100 "STANDALONE#queue:3" "payload3";"#) + .await + .unwrap(); + + service + .exec_query(r#"QUEUE ADD PRIORITY 50 "STANDALONE#queue:4" "payload3";"#) + .await + .unwrap(); + + { + let pending_response = service + .exec_query(r#"QUEUE PENDING "STANDALONE#queue""#) + .await + .unwrap(); + assert_eq!( + pending_response.get_columns(), + &vec![ + Column::new("id".to_string(), ColumnType::String, 0), + Column::new("status".to_string(), ColumnType::String, 1), + Column::new("extra".to_string(), ColumnType::String, 2), + ] + ); + assert_eq!( + pending_response.get_rows(), + &vec![ + Row::new(vec![ + TableValue::String("3".to_string()), + TableValue::String("pending".to_string()), + TableValue::Null + ]), + Row::new(vec![ + TableValue::String("4".to_string()), + TableValue::String("pending".to_string()), + TableValue::Null + ]), + Row::new(vec![ + TableValue::String("2".to_string()), + 
TableValue::String("pending".to_string()), + TableValue::Null + ]), + Row::new(vec![ + TableValue::String("1".to_string()), + TableValue::String("pending".to_string()), + TableValue::Null + ]), + ] + ); + } + + { + let active_response = service + .exec_query(r#"QUEUE ACTIVE "STANDALONE#queue""#) + .await + .unwrap(); + assert_eq!(active_response.get_rows().len(), 0); + } + + { + let retrieve_response = service + .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 2 "STANDALONE#queue:3""#) + .await + .unwrap(); + assert_eq!( + retrieve_response.get_columns(), + &vec![ + Column::new("payload".to_string(), ColumnType::String, 0), + Column::new("extra".to_string(), ColumnType::String, 1), + ] + ); + assert_eq!( + retrieve_response.get_rows(), + &vec![Row::new(vec![ + TableValue::String("payload3".to_string()), + TableValue::Null, + ]),] + ); + } + + { + let active_response = service + .exec_query(r#"QUEUE ACTIVE "STANDALONE#queue""#) + .await + .unwrap(); + assert_eq!( + active_response.get_rows(), + &vec![Row::new(vec![ + TableValue::String("3".to_string()), + TableValue::String("active".to_string()), + TableValue::Null + ]),] + ); + } + + let service = Arc::new(service); + + { + let service_to_move = service.clone(); + let blocking = async move { + service_to_move + .exec_query(r#"QUEUE RESULT_BLOCKING 5000 "STANDALONE#queue:3""#) + .await + .unwrap() + }; + + let service_to_move = service.clone(); + let ack = async move { + tokio::time::sleep(Duration::from_millis(1000)).await; + + service_to_move + .exec_query(r#"QUEUE ACK "STANDALONE#queue:3" "result:3""#) + .await + .unwrap() + }; + + let (blocking_res, _ack_res) = join!(blocking, ack); + assert_eq!( + blocking_res.get_rows(), + &vec![Row::new(vec![ + TableValue::String("result:3".to_string()), + TableValue::String("success".to_string()) + ]),] + ); + } + + // previous job was finished + { + let active_response = service + .exec_query(r#"QUEUE ACTIVE "STANDALONE#queue""#) + .await + .unwrap(); + assert_eq!(active_response.get_rows().len(), 0); + } + + // get + { + let get_response = service + .exec_query(r#"QUEUE GET "STANDALONE#queue:2""#) + .await + .unwrap(); + assert_eq!( + get_response.get_rows(), + &vec![Row::new(vec![ + TableValue::String("payload2".to_string()), + TableValue::Null + ]),] + ); + } + + // cancel job + { + let cancel_response = service + .exec_query(r#"QUEUE CANCEL "STANDALONE#queue:2""#) + .await + .unwrap(); + assert_eq!( + cancel_response.get_rows(), + &vec![Row::new(vec![ + TableValue::String("payload2".to_string()), + TableValue::Null + ]),] + ); + + // assertion that job was removed + let get_response = service + .exec_query(r#"QUEUE GET "STANDALONE#queue:2""#) + .await + .unwrap(); + assert_eq!(get_response.get_rows().len(), 0); + } +} + pub fn to_rows(d: &DataFrame) -> Vec> { return d .get_rows() diff --git a/rust/cubestore/cubestore/src/cachestore/cache_item.rs b/rust/cubestore/cubestore/src/cachestore/cache_item.rs index 939481603b7a1..92000e3a41012 100644 --- a/rust/cubestore/cubestore/src/cachestore/cache_item.rs +++ b/rust/cubestore/cubestore/src/cachestore/cache_item.rs @@ -125,8 +125,8 @@ impl RocksSecondaryIndex for CacheItemRocksIndex { true } - fn get_expire<'a>(&self, row: &'a CacheItem) -> &'a Option> { - row.get_expire() + fn get_expire(&self, row: &CacheItem) -> Option> { + row.get_expire().clone() } fn version(&self) -> u32 { diff --git a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs index 499b0faf8e9c1..ad030ed919a88 100644 --- 
a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs +++ b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs @@ -1,6 +1,14 @@ -use crate::cachestore::cache_item::{CacheItemIndexKey, CacheItemRocksIndex, CacheItemRocksTable}; -use crate::cachestore::compaction; -use crate::cachestore::CacheItem; +use crate::cachestore::cache_item::{ + CacheItem, CacheItemIndexKey, CacheItemRocksIndex, CacheItemRocksTable, +}; +use crate::cachestore::queue_item::{ + QueueItem, QueueItemIndexKey, QueueItemRocksIndex, QueueItemRocksTable, QueueItemStatus, + QueueResultAckEvent, +}; +use crate::cachestore::queue_result::{ + QueueResultIndexKey, QueueResultRocksIndex, QueueResultRocksTable, +}; +use crate::cachestore::{compaction, QueueResult}; use crate::config::injection::DIService; use crate::config::{Config, ConfigObj}; use std::collections::HashMap; @@ -19,6 +27,10 @@ use futures_timer::Delay; use rocksdb::{Options, DB}; use crate::cachestore::compaction::CompactionPreloadedState; +use crate::cachestore::listener::RocksCacheStoreListener; +use chrono::Utc; +use itertools::Itertools; +use serde_derive::{Deserialize, Serialize}; use std::path::Path; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -45,6 +57,7 @@ impl RocksCacheStoreDetails { } populate_indexes!(CacheItemRocksTable); + populate_indexes!(QueueItemRocksTable); CompactionPreloadedState::new(indexes) } @@ -71,6 +84,7 @@ impl RocksStoreDetails for RocksCacheStoreDetails { fn migrate(&self, table_ref: DbTableRef) -> Result<(), CubeError> { CacheItemRocksTable::new(table_ref.clone()).migrate()?; + QueueItemRocksTable::new(table_ref.clone()).migrate()?; table_ref .db @@ -90,6 +104,18 @@ pub struct RocksCacheStore { } impl RocksCacheStore { + pub async fn get_listener(&self) -> RocksCacheStoreListener { + let listeners = self.store.listeners.read().await; + + let sender = if listeners.len() > 0 { + listeners.first().unwrap() + } else { + panic!("Unable to get listener for CacheStore"); + }; + + RocksCacheStoreListener::new(sender.subscribe()) + } + pub fn new( path: &Path, metastore_fs: Arc, @@ -211,8 +237,40 @@ impl RocksCacheStore { } } +impl RocksCacheStore { + async fn lookup_queue_result( + &self, + key: String, + ) -> Result, CubeError> { + self.store + .write_operation(move |db_ref, batch_pipe| { + let result_schema = QueueResultRocksTable::new(db_ref.clone()); + let index_key = QueueResultIndexKey::ByPath(key); + let queue_result = result_schema + .get_single_opt_row_by_index(&index_key, &QueueResultRocksIndex::ByPath)?; + + if let Some(queue_result) = queue_result { + result_schema.try_delete(queue_result.get_id(), batch_pipe)?; + + Ok(Some(QueueResultResponse::Success { + value: queue_result.row.value, + })) + } else { + Ok(None) + } + }) + .await + } +} + +#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] +pub enum QueueResultResponse { + Success { value: String }, +} + #[cuberpc::service] pub trait CacheStore: DIService + Send + Sync { + // cache async fn cache_all(&self) -> Result>, CubeError>; async fn cache_set( &self, @@ -224,53 +282,129 @@ pub trait CacheStore: DIService + Send + Sync { async fn cache_get(&self, key: String) -> Result>, CubeError>; async fn cache_keys(&self, prefix: String) -> Result>, CubeError>; async fn cache_incr(&self, key: String) -> Result, CubeError>; + + // queue + async fn queue_all(&self) -> Result>, CubeError>; + async fn queue_add(&self, item: QueueItem) -> Result; + async fn queue_truncate(&self) -> Result<(), CubeError>; + async fn queue_get(&self, key: 
String) -> Result>, CubeError>; + async fn queue_to_cancel( + &self, + prefix: String, + orphaned_timeout: Option, + stalled_timeout: Option, + ) -> Result>, CubeError>; + async fn queue_list( + &self, + prefix: String, + status_filter: Option, + priority_sort: bool, + ) -> Result>, CubeError>; + async fn queue_cancel(&self, key: String) -> Result>, CubeError>; + async fn queue_heartbeat(&self, key: String) -> Result<(), CubeError>; + async fn queue_retrieve( + &self, + key: String, + allow_concurrency: u32, + ) -> Result>, CubeError>; + async fn queue_ack(&self, key: String, result: String) -> Result<(), CubeError>; + async fn queue_result(&self, key: String) -> Result, CubeError>; + async fn queue_result_blocking( + &self, + key: String, + timeout: u64, + ) -> Result, CubeError>; + async fn queue_merge_extra(&self, key: String, payload: String) -> Result<(), CubeError>; + // Force compaction for the whole RocksDB async fn compaction(&self) -> Result<(), CubeError>; } #[async_trait] impl CacheStore for RocksCacheStore { - async fn cache_get(&self, key: String) -> Result>, CubeError> { + async fn cache_all(&self) -> Result>, CubeError> { self.store - .read_operation(move |db_ref| { - let cache_schema = CacheItemRocksTable::new(db_ref.clone()); - let index_key = CacheItemIndexKey::ByPath(key); - let id_row_opt = cache_schema - .get_single_opt_row_by_index(&index_key, &CacheItemRocksIndex::ByPath)?; - - Ok(id_row_opt) + .read_operation_out_of_queue(move |db_ref| { + Ok(CacheItemRocksTable::new(db_ref).all_rows()?) }) .await } - async fn cache_incr(&self, path: String) -> Result, CubeError> { + async fn cache_set( + &self, + item: CacheItem, + update_if_not_exists: bool, + ) -> Result { self.store .write_operation(move |db_ref, batch_pipe| { let cache_schema = CacheItemRocksTable::new(db_ref.clone()); - let index_key = CacheItemIndexKey::ByPath(path.clone()); + let index_key = CacheItemIndexKey::ByPath(item.get_path()); let id_row_opt = cache_schema .get_single_opt_row_by_index(&index_key, &CacheItemRocksIndex::ByPath)?; - // TODO: Merge operator? if let Some(id_row) = id_row_opt { - let mut new = id_row.row.clone(); + if update_if_not_exists { + return Ok(false); + }; - let last_val = id_row.row.value.parse::()?; - new.value = (last_val + 1).to_string(); + let mut new = id_row.row.clone(); + new.value = item.value; + new.expire = item.expire; - cache_schema.update(id_row.id, new, &id_row.row, batch_pipe) + cache_schema.update(id_row.id, new, &id_row.row, batch_pipe)?; } else { - let item = CacheItem::new(path, None, "1".to_string()); - cache_schema.insert(item, batch_pipe) + cache_schema.insert(item, batch_pipe)?; } + + Ok(true) }) .await } - async fn cache_all(&self) -> Result>, CubeError> { + async fn cache_truncate(&self) -> Result<(), CubeError> { self.store - .read_operation_out_of_queue(move |db_ref| { - Ok(CacheItemRocksTable::new(db_ref).all_rows()?) 
+ .write_operation(move |db_ref, batch_pipe| { + let cache_schema = CacheItemRocksTable::new(db_ref); + let rows = cache_schema.all_rows()?; + for row in rows.iter() { + cache_schema.delete(row.get_id(), batch_pipe)?; + } + + Ok(()) + }) + .await?; + + Ok(()) + } + + async fn cache_delete(&self, key: String) -> Result<(), CubeError> { + self.store + .write_operation(move |db_ref, batch_pipe| { + let cache_schema = CacheItemRocksTable::new(db_ref.clone()); + let index_key = CacheItemIndexKey::ByPath(key); + let row_opt = cache_schema + .get_single_opt_row_by_index(&index_key, &CacheItemRocksIndex::ByPath)?; + + if let Some(row) = row_opt { + cache_schema.delete(row.id, batch_pipe)?; + } + + Ok(()) + }) + .await?; + + Ok(()) + } + + async fn cache_get(&self, key: String) -> Result>, CubeError> { + self.store + .read_operation(move |db_ref| { + let cache_schema = CacheItemRocksTable::new(db_ref.clone()); + let index_key = CacheItemIndexKey::ByPath(key); + let id_row_opt = cache_schema + .get_single_opt_row_by_index(&index_key, &CacheItemRocksIndex::ByPath)?; + + Ok(id_row_opt) }) .await } @@ -289,30 +423,46 @@ impl CacheStore for RocksCacheStore { .await } - async fn cache_set( - &self, - item: CacheItem, - update_if_not_exists: bool, - ) -> Result { + async fn cache_incr(&self, path: String) -> Result, CubeError> { self.store .write_operation(move |db_ref, batch_pipe| { let cache_schema = CacheItemRocksTable::new(db_ref.clone()); - let index_key = CacheItemIndexKey::ByPath(item.get_path()); + let index_key = CacheItemIndexKey::ByPath(path.clone()); let id_row_opt = cache_schema .get_single_opt_row_by_index(&index_key, &CacheItemRocksIndex::ByPath)?; + // TODO: Merge operator? if let Some(id_row) = id_row_opt { - if update_if_not_exists { - return Ok(false); - }; - let mut new = id_row.row.clone(); - new.value = item.value; - new.expire = item.expire; - cache_schema.update(id_row.id, new, &id_row.row, batch_pipe)?; + let last_val = id_row.row.value.parse::()?; + new.value = (last_val + 1).to_string(); + + cache_schema.update(id_row.id, new, &id_row.row, batch_pipe) } else { - cache_schema.insert(item, batch_pipe)?; + let item = CacheItem::new(path, None, "1".to_string()); + cache_schema.insert(item, batch_pipe) + } + }) + .await + } + + async fn queue_all(&self) -> Result>, CubeError> { + self.store + .read_operation(move |db_ref| Ok(QueueItemRocksTable::new(db_ref).all_rows()?)) + .await + } + + async fn queue_add(&self, item: QueueItem) -> Result { + self.store + .write_operation(move |db_ref, batch_pipe| { + let queue_schema = QueueItemRocksTable::new(db_ref.clone()); + let index_key = QueueItemIndexKey::ByPath(item.get_path()); + let id_row_opt = queue_schema + .get_single_opt_row_by_index(&index_key, &QueueItemRocksIndex::ByPath)?; + + if id_row_opt.is_none() { + queue_schema.insert(item, batch_pipe)?; } Ok(true) @@ -320,13 +470,13 @@ impl CacheStore for RocksCacheStore { .await } - async fn cache_truncate(&self) -> Result<(), CubeError> { + async fn queue_truncate(&self) -> Result<(), CubeError> { self.store .write_operation(move |db_ref, batch_pipe| { - let cache_schema = CacheItemRocksTable::new(db_ref); - let rows = cache_schema.all_rows()?; + let queue_schema = QueueItemRocksTable::new(db_ref); + let rows = queue_schema.all_rows()?; for row in rows.iter() { - cache_schema.delete(row.get_id(), batch_pipe)?; + queue_schema.delete(row.get_id(), batch_pipe)?; } Ok(()) @@ -336,23 +486,294 @@ impl CacheStore for RocksCacheStore { Ok(()) } - async fn cache_delete(&self, key: String) -> 
Result<(), CubeError> { + async fn queue_get(&self, key: String) -> Result>, CubeError> { + self.store + .read_operation(move |db_ref| { + let queue_schema = QueueItemRocksTable::new(db_ref.clone()); + let index_key = QueueItemIndexKey::ByPath(key); + queue_schema.get_single_opt_row_by_index(&index_key, &QueueItemRocksIndex::ByPath) + }) + .await + } + + async fn queue_to_cancel( + &self, + prefix: String, + orphaned_timeout: Option, + stalled_timeout: Option, + ) -> Result>, CubeError> { + self.store + .read_operation(move |db_ref| { + let queue_schema = QueueItemRocksTable::new(db_ref.clone()); + let index_key = QueueItemIndexKey::ByPrefix(prefix); + let items = + queue_schema.get_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefix)?; + + let now = Utc::now(); + + let res = items.into_iter().filter(|item| { + if item.get_row().get_status() == &QueueItemStatus::Pending { + if let Some(stalled_timeout) = stalled_timeout { + let elapsed = now - item.get_row().get_created().clone(); + if elapsed.num_milliseconds() > stalled_timeout as i64 { + return true; + } + } + } + + if item.get_row().get_status() == &QueueItemStatus::Active { + if let Some(orphaned_timeout) = orphaned_timeout { + if let Some(heartbeat) = item.get_row().get_heartbeat() { + let elapsed = now - heartbeat.clone(); + if elapsed.num_milliseconds() > orphaned_timeout as i64 { + return true; + } + } + } + } + + false + }); + + Ok(res.collect()) + }) + .await + } + + async fn queue_list( + &self, + prefix: String, + status_filter: Option, + priority_sort: bool, + ) -> Result>, CubeError> { + self.store + .read_operation(move |db_ref| { + let queue_schema = QueueItemRocksTable::new(db_ref.clone()); + let index_key = QueueItemIndexKey::ByPrefix(prefix); + let items = + queue_schema.get_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefix)?; + + let items = if let Some(status_filter) = status_filter { + items + .into_iter() + .filter(|item| item.get_row().status == status_filter) + .collect() + } else { + items + }; + + if priority_sort { + Ok(items + .into_iter() + .sorted_by(|a, b| { + b.get_row().get_priority().cmp(a.get_row().get_priority()) + }) + .collect()) + } else { + Ok(items) + } + }) + .await + } + + async fn queue_cancel(&self, key: String) -> Result>, CubeError> { self.store .write_operation(move |db_ref, batch_pipe| { - let cache_schema = CacheItemRocksTable::new(db_ref.clone()); - let index_key = CacheItemIndexKey::ByPath(key); - let row_opt = cache_schema - .get_single_opt_row_by_index(&index_key, &CacheItemRocksIndex::ByPath)?; + let queue_schema = QueueItemRocksTable::new(db_ref.clone()); + let index_key = QueueItemIndexKey::ByPath(key); + let id_row_opt = queue_schema + .get_single_opt_row_by_index(&index_key, &QueueItemRocksIndex::ByPath)?; - if let Some(row) = row_opt { - cache_schema.delete(row.id, batch_pipe)?; + if let Some(id_row) = id_row_opt { + Ok(Some(queue_schema.delete(id_row.get_id(), batch_pipe)?)) + } else { + Ok(None) } + }) + .await + } - Ok(()) + async fn queue_heartbeat(&self, key: String) -> Result<(), CubeError> { + self.store + .write_operation(move |db_ref, batch_pipe| { + let queue_schema = QueueItemRocksTable::new(db_ref.clone()); + let index_key = QueueItemIndexKey::ByPath(key.clone()); + let id_row_opt = queue_schema + .get_single_opt_row_by_index(&index_key, &QueueItemRocksIndex::ByPath)?; + + if let Some(id_row) = id_row_opt { + queue_schema.update_with_fn( + id_row.id, + |item| item.update_heartbeat(), + batch_pipe, + )?; + Ok(()) + } else { + Ok(()) + } }) - .await?; + 
.await + } - Ok(()) + async fn queue_retrieve( + &self, + key: String, + allow_concurrency: u32, + ) -> Result>, CubeError> { + self.store + .write_operation(move |db_ref, batch_pipe| { + let queue_schema = QueueItemRocksTable::new(db_ref.clone()); + let index_key = QueueItemIndexKey::ByPath(key.clone()); + let id_row_opt = queue_schema + .get_single_opt_row_by_index(&index_key, &QueueItemRocksIndex::ByPath)?; + + if let Some(id_row) = id_row_opt { + if id_row.get_row().get_status() == &QueueItemStatus::Pending { + // TODO: Introduce count + Active index? + let index_key = QueueItemIndexKey::ByPrefix( + if let Some(prefix) = id_row.get_row().get_prefix() { + prefix.clone() + } else { + "".to_string() + }, + ); + let in_queue = queue_schema + .get_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefix)?; + + let mut current_active = 0; + + for item in in_queue { + if item.get_row().get_status() == &QueueItemStatus::Active { + current_active += 1; + } + } + + if current_active >= allow_concurrency { + return Ok(None); + } + + let new = queue_schema.update_with_fn( + id_row.id, + |item| { + let mut new = item.clone(); + new.status = QueueItemStatus::Active; + + new + }, + batch_pipe, + )?; + + Ok(Some(new)) + } else { + Ok(None) + } + } else { + Ok(None) + } + }) + .await + } + + async fn queue_ack(&self, path: String, result: String) -> Result<(), CubeError> { + self.store + .write_operation(move |db_ref, batch_pipe| { + let queue_schema = QueueItemRocksTable::new(db_ref.clone()); + let result_schema = QueueResultRocksTable::new(db_ref.clone()); + let index_key = QueueItemIndexKey::ByPath(path.clone()); + let item_row = queue_schema + .get_single_opt_row_by_index(&index_key, &QueueItemRocksIndex::ByPath)?; + + if let Some(item_row) = item_row { + queue_schema.delete(item_row.get_id(), batch_pipe)?; + + let queue_result = QueueResult::new(path.clone(), result); + let result_row = result_schema.insert(queue_result, batch_pipe)?; + + batch_pipe.add_event(MetaStoreEvent::AckQueueItem(QueueResultAckEvent { + row_id: result_row.get_id(), + path, + })); + + Ok(()) + } else { + Err(CubeError::user(format!( + "Unable ack queue, unknown id: {}", + path + ))) + } + }) + .await + } + + async fn queue_result(&self, key: String) -> Result, CubeError> { + self.lookup_queue_result(key).await + } + + async fn queue_result_blocking( + &self, + key: String, + timeout: u64, + ) -> Result, CubeError> { + let store_in_result = self.lookup_queue_result(key.clone()).await?; + if store_in_result.is_some() { + return Ok(store_in_result); + } + + let listener = self.get_listener().await; + let fut = tokio::time::timeout( + Duration::from_millis(timeout), + listener.wait_for_queue_ack(key), + ); + + if let Ok(res) = fut.await { + match res { + Ok(Some(ack_event)) => { + self.store + .write_operation(move |db_ref, batch_pipe| { + let queue_schema = QueueResultRocksTable::new(db_ref.clone()); + let queue_result = + queue_schema.try_delete(ack_event.row_id, batch_pipe)?; + + if let Some(queue_result) = queue_result { + Ok(Some(QueueResultResponse::Success { + value: queue_result.row.value, + })) + } else { + Ok(None) + } + }) + .await + } + Ok(None) => Ok(None), + Err(e) => Err(e), + } + } else { + Ok(None) + } + } + + async fn queue_merge_extra(&self, key: String, payload: String) -> Result<(), CubeError> { + self.store + .write_operation(move |db_ref, batch_pipe| { + let queue_schema = QueueItemRocksTable::new(db_ref.clone()); + let index_key = QueueItemIndexKey::ByPath(key.clone()); + let id_row_opt = queue_schema + 
.get_single_opt_row_by_index(&index_key, &QueueItemRocksIndex::ByPath)?; + + if let Some(id_row) = id_row_opt { + let new = id_row.get_row().merge_extra(payload)?; + + queue_schema.update(id_row.id, new, id_row.get_row(), batch_pipe)?; + + Ok(()) + } else { + Err(CubeError::user(format!( + "Unable to find queue with id: {}", + key + ))) + } + }) + .await } async fn compaction(&self) -> Result<(), CubeError> { @@ -378,18 +799,10 @@ pub struct ClusterCacheStoreClient {} #[async_trait] impl CacheStore for ClusterCacheStoreClient { - async fn cache_incr(&self, _: String) -> Result, CubeError> { - panic!("CacheStore cannot be used on the worker node! cache_incr was used.") - } - async fn cache_all(&self) -> Result>, CubeError> { panic!("CacheStore cannot be used on the worker node! cache_all was used.") } - async fn compaction(&self) -> Result<(), CubeError> { - panic!("CacheStore cannot be used on the worker node! compaction was used.") - } - async fn cache_set( &self, _item: CacheItem, @@ -413,6 +826,84 @@ impl CacheStore for ClusterCacheStoreClient { async fn cache_keys(&self, _prefix: String) -> Result>, CubeError> { panic!("CacheStore cannot be used on the worker node! cache_keys was used.") } + + async fn cache_incr(&self, _: String) -> Result, CubeError> { + panic!("CacheStore cannot be used on the worker node! cache_incr was used.") + } + + async fn queue_all(&self) -> Result>, CubeError> { + panic!("CacheStore cannot be used on the worker node! queue_all was used.") + } + + async fn queue_add(&self, _item: QueueItem) -> Result { + panic!("CacheStore cannot be used on the worker node! queue_add was used.") + } + + async fn queue_truncate(&self) -> Result<(), CubeError> { + panic!("CacheStore cannot be used on the worker node! queue_truncate was used.") + } + + async fn queue_get(&self, _key: String) -> Result>, CubeError> { + panic!("CacheStore cannot be used on the worker node! queue_get was used.") + } + + async fn queue_to_cancel( + &self, + _prefix: String, + _orphaned_timeout: Option, + _stalled_timeout: Option, + ) -> Result>, CubeError> { + panic!("CacheStore cannot be used on the worker node! queue_to_cancel was used.") + } + + async fn queue_list( + &self, + _prefix: String, + _status_filter: Option, + _priority_sort: bool, + ) -> Result>, CubeError> { + panic!("CacheStore cannot be used on the worker node! queue_list was used.") + } + + async fn queue_cancel(&self, _key: String) -> Result>, CubeError> { + panic!("CacheStore cannot be used on the worker node! queue_cancel was used.") + } + + async fn queue_heartbeat(&self, _key: String) -> Result<(), CubeError> { + panic!("CacheStore cannot be used on the worker node! queue_heartbeat was used.") + } + + async fn queue_retrieve( + &self, + _key: String, + _allow_concurrency: u32, + ) -> Result>, CubeError> { + panic!("CacheStore cannot be used on the worker node! queue_retrieve was used.") + } + + async fn queue_ack(&self, _key: String, _result: String) -> Result<(), CubeError> { + panic!("CacheStore cannot be used on the worker node! queue_ack was used.") + } + + async fn queue_result(&self, _key: String) -> Result, CubeError> { + panic!("CacheStore cannot be used on the worker node! queue_result was used.") + } + + async fn queue_result_blocking( + &self, + _key: String, + _timeout: u64, + ) -> Result, CubeError> { + panic!("CacheStore cannot be used on the worker node! 
queue_result_blocking was used.") + } + + async fn queue_merge_extra(&self, _key: String, _payload: String) -> Result<(), CubeError> { + panic!("CacheStore cannot be used on the worker node! queue_merge_extra was used.") + } + + async fn compaction(&self) -> Result<(), CubeError> { + panic!("CacheStore cannot be used on the worker node! compaction was used.") + } } crate::di_service!(ClusterCacheStoreClient, [CacheStore]); diff --git a/rust/cubestore/cubestore/src/cachestore/lazy.rs b/rust/cubestore/cubestore/src/cachestore/lazy.rs index 53d1e9da5a30d..70907953cb859 100644 --- a/rust/cubestore/cubestore/src/cachestore/lazy.rs +++ b/rust/cubestore/cubestore/src/cachestore/lazy.rs @@ -1,6 +1,8 @@ -use crate::cachestore::{CacheItem, CacheStore, RocksCacheStore}; +use crate::cachestore::{ + CacheItem, CacheStore, QueueItem, QueueItemStatus, QueueResultResponse, RocksCacheStore, +}; use crate::config::ConfigObj; -use crate::metastore::{IdRow, MetaStoreFs}; +use crate::metastore::{IdRow, MetaStoreEvent, MetaStoreFs}; use crate::CubeError; use async_trait::async_trait; use log::trace; @@ -13,6 +15,7 @@ pub enum LazyRocksCacheStoreState { path: String, metastore_fs: Arc, config: Arc, + listeners: Vec>, init_flag: Sender, }, Closed {}, @@ -32,9 +35,14 @@ impl LazyRocksCacheStore { dump_path: &Path, metastore_fs: Arc, config: Arc, + listeners: Vec>, ) -> Result, CubeError> { let store = RocksCacheStore::load_from_dump(path, dump_path, metastore_fs, config).await?; + for listener in listeners { + store.add_listener(listener).await; + } + Ok(Arc::new(Self { init_signal: None, state: tokio::sync::RwLock::new(LazyRocksCacheStoreState::Initialized { store }), @@ -45,6 +53,7 @@ impl LazyRocksCacheStore { path: &str, metastore_fs: Arc, config: Arc, + listeners: Vec>, ) -> Result, CubeError> { let (init_flag, init_signal) = tokio::sync::watch::channel::(false); @@ -54,6 +63,7 @@ impl LazyRocksCacheStore { path: path.to_string(), metastore_fs, config, + listeners, init_flag, }), })) @@ -81,6 +91,7 @@ impl LazyRocksCacheStore { path, metastore_fs, config, + listeners, // receiver will be closed on drop init_flag: _, } => { @@ -88,6 +99,10 @@ impl LazyRocksCacheStore { RocksCacheStore::load_from_remote(&path, metastore_fs.clone(), config.clone()) .await?; + for listener in listeners { + store.add_listener(listener.clone()).await; + } + *guard = LazyRocksCacheStoreState::Initialized { store: store.clone(), }; @@ -184,6 +199,85 @@ impl CacheStore for LazyRocksCacheStore { self.init().await?.cache_incr(path).await } + async fn queue_all(&self) -> Result>, CubeError> { + self.init().await?.queue_all().await + } + + async fn queue_add(&self, item: QueueItem) -> Result { + self.init().await?.queue_add(item).await + } + + async fn queue_truncate(&self) -> Result<(), CubeError> { + self.init().await?.queue_truncate().await + } + + async fn queue_get(&self, key: String) -> Result>, CubeError> { + self.init().await?.queue_get(key).await + } + + async fn queue_to_cancel( + &self, + prefix: String, + orphaned_timeout: Option, + stalled_timeout: Option, + ) -> Result>, CubeError> { + self.init() + .await? + .queue_to_cancel(prefix, orphaned_timeout, stalled_timeout) + .await + } + + async fn queue_list( + &self, + prefix: String, + status_filter: Option, + priority_sort: bool, + ) -> Result>, CubeError> { + self.init() + .await? 
+ .queue_list(prefix, status_filter, priority_sort) + .await + } + + async fn queue_cancel(&self, key: String) -> Result>, CubeError> { + self.init().await?.queue_cancel(key).await + } + + async fn queue_heartbeat(&self, key: String) -> Result<(), CubeError> { + self.init().await?.queue_heartbeat(key).await + } + + async fn queue_retrieve( + &self, + key: String, + allow_concurrency: u32, + ) -> Result>, CubeError> { + self.init() + .await? + .queue_retrieve(key, allow_concurrency) + .await + } + + async fn queue_ack(&self, key: String, result: String) -> Result<(), CubeError> { + self.init().await?.queue_ack(key, result).await + } + + async fn queue_result(&self, key: String) -> Result, CubeError> { + self.init().await?.queue_result(key).await + } + + async fn queue_result_blocking( + &self, + key: String, + timeout: u64, + ) -> Result, CubeError> { + self.init().await?.queue_result_blocking(key, timeout).await + } + + async fn queue_merge_extra(&self, key: String, payload: String) -> Result<(), CubeError> { + self.init().await?.queue_merge_extra(key, payload).await + } + async fn compaction(&self) -> Result<(), CubeError> { self.init().await?.compaction().await } diff --git a/rust/cubestore/cubestore/src/cachestore/listener.rs b/rust/cubestore/cubestore/src/cachestore/listener.rs new file mode 100644 index 0000000000000..63f93a13decf4 --- /dev/null +++ b/rust/cubestore/cubestore/src/cachestore/listener.rs @@ -0,0 +1,28 @@ +use crate::cachestore::QueueResultAckEvent; +use crate::metastore::MetaStoreEvent; +use crate::CubeError; +use tokio::sync::broadcast::Receiver; + +pub struct RocksCacheStoreListener { + receiver: Receiver, +} + +impl RocksCacheStoreListener { + pub fn new(receiver: Receiver) -> Self { + Self { receiver } + } + + pub async fn wait_for_queue_ack( + mut self, + path: String, + ) -> Result, CubeError> { + loop { + let event = self.receiver.recv().await?; + if let MetaStoreEvent::AckQueueItem(payload) = event { + if payload.path == path { + return Ok(Some(payload)); + } + } + } + } +} diff --git a/rust/cubestore/cubestore/src/cachestore/mod.rs b/rust/cubestore/cubestore/src/cachestore/mod.rs index 0ca050ca20b71..fad5f4aa1b5e1 100644 --- a/rust/cubestore/cubestore/src/cachestore/mod.rs +++ b/rust/cubestore/cubestore/src/cachestore/mod.rs @@ -2,9 +2,14 @@ mod cache_item; mod cache_rocksstore; mod compaction; mod lazy; +mod listener; +mod queue_item; +mod queue_result; pub use cache_item::CacheItem; pub use cache_rocksstore::{ - CacheStore, CacheStoreRpcClient, ClusterCacheStoreClient, RocksCacheStore, + CacheStore, CacheStoreRpcClient, ClusterCacheStoreClient, QueueResultResponse, RocksCacheStore, }; pub use lazy::LazyRocksCacheStore; +pub use queue_item::{QueueItem, QueueItemStatus, QueueResultAckEvent}; +pub use queue_result::QueueResult; diff --git a/rust/cubestore/cubestore/src/cachestore/queue_item.rs b/rust/cubestore/cubestore/src/cachestore/queue_item.rs new file mode 100644 index 0000000000000..bb2f79a2ab4f5 --- /dev/null +++ b/rust/cubestore/cubestore/src/cachestore/queue_item.rs @@ -0,0 +1,301 @@ +use crate::metastore::{IndexId, RocksSecondaryIndex, TableId}; +use crate::table::{Row, TableValue}; +use crate::{base_rocks_secondary_index, rocks_table_impl, CubeError}; +use chrono::serde::ts_seconds; +use chrono::{DateTime, Duration, Utc}; +use serde::{Deserialize, Deserializer, Serialize}; + +fn merge(a: serde_json::Value, b: serde_json::Value) -> Option { + match (a, b) { + (mut root @ serde_json::Value::Object(_), serde_json::Value::Object(b)) => { + let r = 
root.as_object_mut().unwrap(); + for (k, v) in b { + if r.contains_key(&k) { + r.remove(&k); + } + + r.insert(k, v); + } + + Some(root) + } + // Special case to truncate extra + (_a, serde_json::Value::Null) => None, + (_a, b) => Some(b), + } +} + +#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] +pub struct QueueResultAckEvent { + pub path: String, + pub row_id: u64, +} + +#[repr(u8)] +#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] +pub enum QueueItemStatus { + Pending = 0, + Active = 1, + Finished = 2, +} + +impl ToString for QueueItemStatus { + fn to_string(&self) -> String { + match self { + QueueItemStatus::Pending => "pending".to_string(), + QueueItemStatus::Active => "active".to_string(), + QueueItemStatus::Finished => "finished".to_string(), + } + } +} + +#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq)] +pub struct QueueItem { + prefix: Option, + key: String, + // Immutable field + value: String, + extra: Option, + #[serde(default = "QueueItem::status_default")] + pub(crate) status: QueueItemStatus, + #[serde(default)] + priority: i64, + created: DateTime, + heartbeat: Option>, + #[serde(with = "ts_seconds")] + pub(crate) expire: DateTime, +} + +impl QueueItem { + pub fn new(path: String, value: String, status: QueueItemStatus, priority: i64) -> Self { + let parts: Vec<&str> = path.rsplitn(2, ":").collect(); + + let (prefix, key) = match parts.len() { + 2 => (Some(parts[1].to_string()), parts[0].to_string()), + _ => (None, path), + }; + + QueueItem { + prefix, + key, + value, + status, + priority, + extra: None, + created: Utc::now(), + heartbeat: None, + expire: Utc::now() + Duration::days(1), + } + } + + pub fn into_queue_cancel_row(self) -> Row { + let res = vec![ + TableValue::String(self.value), + if let Some(extra) = self.extra { + TableValue::String(extra) + } else { + TableValue::Null + }, + ]; + + Row::new(res) + } + + pub fn into_queue_retrieve_row(self) -> Row { + let res = vec![ + TableValue::String(self.value), + if let Some(extra) = self.extra { + TableValue::String(extra) + } else { + TableValue::Null + }, + ]; + + Row::new(res) + } + + pub fn into_queue_get_row(self) -> Row { + let res = vec![ + TableValue::String(self.value), + if let Some(extra) = self.extra { + TableValue::String(extra) + } else { + TableValue::Null + }, + ]; + + Row::new(res) + } + + pub fn into_queue_list_row(self, with_payload: bool) -> Row { + let mut res = vec![ + TableValue::String(self.key), + TableValue::String(self.status.to_string()), + if let Some(extra) = self.extra { + TableValue::String(extra) + } else { + TableValue::Null + }, + ]; + + if with_payload { + res.push(TableValue::String(self.value)); + } + + Row::new(res) + } + + pub fn get_key(&self) -> &String { + &self.key + } + + pub fn get_prefix(&self) -> &Option { + &self.prefix + } + + pub fn get_path(&self) -> String { + if let Some(prefix) = &self.prefix { + format!("{}:{}", prefix, self.key) + } else { + self.key.clone() + } + } + + pub fn get_value(&self) -> &String { + &self.value + } + + pub fn get_priority(&self) -> &i64 { + &self.priority + } + + pub fn get_extra(&self) -> &Option { + &self.extra + } + + pub fn get_status(&self) -> &QueueItemStatus { + &self.status + } + + pub fn get_heartbeat(&self) -> &Option> { + &self.heartbeat + } + + pub fn get_created(&self) -> &DateTime { + &self.created + } + + pub fn status_default() -> QueueItemStatus { + QueueItemStatus::Pending + } + + pub fn update_heartbeat(&self) -> Self { + let mut new = self.clone(); + 
new.heartbeat = Some(Utc::now()); + + new + } + + pub fn merge_extra(&self, payload: String) -> Result { + let mut new = self.clone(); + + if let Some(extra) = &self.extra { + let prev = serde_json::from_str(&extra)?; + let next = serde_json::from_str(&payload)?; + + let extra = merge(prev, next); + + new.extra = extra.map(|v| v.to_string()) + } else { + new.extra = Some(payload); + } + + Ok(new) + } +} + +#[derive(Clone, Copy, Debug)] +pub(crate) enum QueueItemRocksIndex { + ByPath = 1, + ByStatus = 2, + ByPrefix = 3, +} + +rocks_table_impl!(QueueItem, QueueItemRocksTable, TableId::QueueItems, { + vec![ + Box::new(QueueItemRocksIndex::ByPath), + Box::new(QueueItemRocksIndex::ByStatus), + Box::new(QueueItemRocksIndex::ByPrefix), + ] +}); + +#[derive(Hash, Clone, Debug)] +pub enum QueueItemIndexKey { + ByPath(String), + ByStatus(QueueItemStatus), + ByPrefix(String), +} + +base_rocks_secondary_index!(QueueItem, QueueItemRocksIndex); + +impl RocksSecondaryIndex for QueueItemRocksIndex { + fn typed_key_by(&self, row: &QueueItem) -> QueueItemIndexKey { + match self { + QueueItemRocksIndex::ByPath => QueueItemIndexKey::ByPath(row.get_path()), + QueueItemRocksIndex::ByStatus => QueueItemIndexKey::ByStatus(row.get_status().clone()), + QueueItemRocksIndex::ByPrefix => { + QueueItemIndexKey::ByPrefix(if let Some(prefix) = row.get_prefix() { + prefix.clone() + } else { + "".to_string() + }) + } + } + } + + fn key_to_bytes(&self, key: &QueueItemIndexKey) -> Vec { + match key { + QueueItemIndexKey::ByPath(s) => s.as_bytes().to_vec(), + QueueItemIndexKey::ByPrefix(s) => s.as_bytes().to_vec(), + QueueItemIndexKey::ByStatus(s) => { + let mut r = Vec::with_capacity(1); + + match s { + QueueItemStatus::Pending => r.push(0_u8), + QueueItemStatus::Active => r.push(1_u8), + QueueItemStatus::Finished => r.push(2_u8), + } + + r + } + } + } + + fn is_unique(&self) -> bool { + match self { + QueueItemRocksIndex::ByPath => true, + QueueItemRocksIndex::ByStatus => false, + QueueItemRocksIndex::ByPrefix => false, + } + } + + fn version(&self) -> u32 { + match self { + QueueItemRocksIndex::ByPath => 1, + QueueItemRocksIndex::ByStatus => 1, + QueueItemRocksIndex::ByPrefix => 1, + } + } + + fn is_ttl(&self) -> bool { + true + } + + fn get_expire(&self, row: &QueueItem) -> Option> { + Some(row.expire.clone()) + } + + fn get_id(&self) -> IndexId { + *self as IndexId + } +} diff --git a/rust/cubestore/cubestore/src/cachestore/queue_result.rs b/rust/cubestore/cubestore/src/cachestore/queue_result.rs new file mode 100644 index 0000000000000..6a00659b83fec --- /dev/null +++ b/rust/cubestore/cubestore/src/cachestore/queue_result.rs @@ -0,0 +1,85 @@ +use crate::metastore::{IndexId, RocksSecondaryIndex, TableId}; +use crate::{base_rocks_secondary_index, rocks_table_impl}; +use chrono::serde::ts_seconds; +use chrono::{DateTime, Duration, Utc}; +use serde::{Deserialize, Deserializer, Serialize}; + +#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq)] +pub struct QueueResult { + path: String, + pub(crate) value: String, + #[serde(with = "ts_seconds")] + pub(crate) expire: DateTime, +} + +impl QueueResult { + pub fn new(path: String, value: String) -> Self { + QueueResult { + path, + value, + expire: Utc::now() + Duration::minutes(10), + } + } + + pub fn get_path(&self) -> &String { + &self.path + } + + pub fn get_value(&self) -> &String { + &self.value + } +} + +#[derive(Clone, Copy, Debug)] +pub(crate) enum QueueResultRocksIndex { + ByPath = 1, +} + +rocks_table_impl!(QueueResult, QueueResultRocksTable, 
TableId::QueueResults, { + vec![Box::new(QueueResultRocksIndex::ByPath)] +}); + +#[derive(Hash, Clone, Debug)] +pub enum QueueResultIndexKey { + ByPath(String), +} + +base_rocks_secondary_index!(QueueResult, QueueResultRocksIndex); + +impl RocksSecondaryIndex for QueueResultRocksIndex { + fn typed_key_by(&self, row: &QueueResult) -> QueueResultIndexKey { + match self { + QueueResultRocksIndex::ByPath => QueueResultIndexKey::ByPath(row.get_path().clone()), + } + } + + fn key_to_bytes(&self, key: &QueueResultIndexKey) -> Vec { + match key { + QueueResultIndexKey::ByPath(s) => s.as_bytes().to_vec(), + } + } + + fn is_unique(&self) -> bool { + match self { + QueueResultRocksIndex::ByPath => true, + } + } + + fn version(&self) -> u32 { + match self { + QueueResultRocksIndex::ByPath => 1, + } + } + + fn is_ttl(&self) -> bool { + true + } + + fn get_expire(&self, row: &QueueResult) -> Option> { + Some(row.expire.clone()) + } + + fn get_id(&self) -> IndexId { + *self as IndexId + } +} diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs index c04d4bbdd9e35..338e3c7c295a2 100644 --- a/rust/cubestore/cubestore/src/config/mod.rs +++ b/rust/cubestore/cubestore/src/config/mod.rs @@ -1175,8 +1175,11 @@ impl Config { }) .await; - let (event_sender, _) = broadcast::channel(10000); // TODO config - let event_sender_to_move = event_sender.clone(); + let (metastore_event_sender, _) = broadcast::channel(8192); // TODO config + let (cachestore_event_sender, _) = broadcast::channel(2048); // TODO config + + let metastore_event_sender_to_move = metastore_event_sender.clone(); + let cachestore_event_sender_to_move = cachestore_event_sender.clone(); self.injector .register_typed::(async move |i| { @@ -1233,7 +1236,7 @@ impl Config { .await .unwrap() }; - meta_store.add_listener(event_sender).await; + meta_store.add_listener(metastore_event_sender).await; meta_store }, ) @@ -1272,15 +1275,20 @@ impl Config { dump_dir, cachestore_fs, config, + vec![cachestore_event_sender], ) .await .unwrap() } else { - LazyRocksCacheStore::load_from_remote(&path, cachestore_fs, config) - .await - .unwrap() + LazyRocksCacheStore::load_from_remote( + &path, + cachestore_fs, + config, + vec![cachestore_event_sender], + ) + .await + .unwrap() }; - cache_store }, ) @@ -1406,7 +1414,7 @@ impl Config { }) .await; - let cluster_meta_store_sender = event_sender_to_move.clone(); + let cluster_meta_store_sender = metastore_event_sender_to_move.clone(); self.injector .register_typed_with_default::(async move |i| { @@ -1457,7 +1465,8 @@ impl Config { i.get_service_typed().await, i.get_service_typed().await, i.get_service_typed().await, - event_sender_to_move.subscribe(), + metastore_event_sender_to_move.subscribe(), + cachestore_event_sender_to_move.subscribe(), i.get_service_typed().await, )) }) diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index fc38dd9b26880..a7601b1a0efb2 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -78,7 +78,7 @@ use std::mem::take; use std::path::Path; use std::str::FromStr; -use crate::cachestore::CacheItem; +use crate::cachestore::{CacheItem, QueueItem, QueueItemStatus, QueueResult, QueueResultAckEvent}; use crate::remotefs::LocalDirRemoteFs; use snapshot_info::SnapshotInfo; use std::time::Duration; @@ -160,7 +160,7 @@ macro_rules! 
base_rocks_secondary_index { RocksSecondaryIndex::is_ttl(self) } - fn get_expire<'a>(&self, row: &'a $table) -> &'a Option> { + fn get_expire(&self, row: &$table) -> Option> { RocksSecondaryIndex::get_expire(self, row) } } @@ -183,6 +183,12 @@ impl DataFrameValue for u64 { } } +impl DataFrameValue for i64 { + fn value(v: &Self) -> String { + format!("{}", v) + } +} + impl DataFrameValue for bool { fn value(v: &Self) -> String { format!("{}", v) @@ -247,6 +253,12 @@ impl DataFrameValue for IndexType { } } +impl DataFrameValue for QueueItemStatus { + fn value(v: &Self) -> String { + format!("{:?}", v) + } +} + impl DataFrameValue for Option { fn value(v: &Self) -> String { v.as_ref() @@ -1058,6 +1070,13 @@ pub enum MetaStoreEvent { // TODO: Split to CacheStoreEvent UpdateCacheItem(IdRow, IdRow), DeleteCacheItem(IdRow), + + UpdateQueueItem(IdRow, IdRow), + DeleteQueueItem(IdRow), + AckQueueItem(QueueResultAckEvent), + + UpdateQueueResult(IdRow, IdRow), + DeleteQueueResult(IdRow), } fn meta_store_merge( diff --git a/rust/cubestore/cubestore/src/metastore/rocks_store.rs b/rust/cubestore/cubestore/src/metastore/rocks_store.rs index e0719faa5f945..cc1ca2d2fba45 100644 --- a/rust/cubestore/cubestore/src/metastore/rocks_store.rs +++ b/rust/cubestore/cubestore/src/metastore/rocks_store.rs @@ -72,7 +72,9 @@ enum_from_primitive! { MultiIndexes = 0x0900, MultiPartitions = 0x0A00, ReplayHandles = 0x0B00, - CacheItems = 0x0C00 + CacheItems = 0x0C00, + QueueItems = 0x0D00, + QueueResults = 0x0E00 } } @@ -91,6 +93,8 @@ impl TableId { TableId::MultiPartitions => false, TableId::ReplayHandles => false, TableId::CacheItems => true, + TableId::QueueItems => true, + TableId::QueueResults => true, } } } @@ -500,7 +504,7 @@ pub struct RocksStore { pub db: Arc, pub config: Arc, seq_store: Arc>>, - listeners: Arc>>>, + pub listeners: Arc>>>, metastore_fs: Arc, last_checkpoint_time: Arc>, write_notify: Arc, diff --git a/rust/cubestore/cubestore/src/metastore/rocks_table.rs b/rust/cubestore/cubestore/src/metastore/rocks_table.rs index 33e513e85e0a5..734e5b7f42867 100644 --- a/rust/cubestore/cubestore/src/metastore/rocks_table.rs +++ b/rust/cubestore/cubestore/src/metastore/rocks_table.rs @@ -118,7 +118,7 @@ pub trait BaseRocksSecondaryIndex: Debug { fn is_ttl(&self) -> bool; - fn get_expire<'a>(&self, _row: &'a T) -> &'a Option>; + fn get_expire(&self, _row: &T) -> Option>; fn version(&self) -> u32; @@ -141,7 +141,7 @@ pub trait RocksSecondaryIndex: BaseRocksSecondaryIndex { if RocksSecondaryIndex::is_ttl(self) { let expire = RocksSecondaryIndex::get_expire(self, row); - RocksSecondaryIndexValue::HashAndTTL(&hash, expire.clone()) + RocksSecondaryIndexValue::HashAndTTL(&hash, expire) .to_bytes(RocksSecondaryIndex::value_version(self)) } else { RocksSecondaryIndexValue::Hash(&hash).to_bytes(RocksSecondaryIndex::value_version(self)) @@ -170,8 +170,8 @@ pub trait RocksSecondaryIndex: BaseRocksSecondaryIndex { false } - fn get_expire<'a>(&self, _row: &'a T) -> &'a Option> { - &None + fn get_expire(&self, _row: &T) -> Option> { + None } } @@ -199,7 +199,7 @@ where RocksSecondaryIndex::is_ttl(self) } - fn get_expire<'a>(&self, row: &'a T) -> &'a Option> { + fn get_expire(&self, row: &T) -> Option> { RocksSecondaryIndex::get_expire(self, row) } @@ -636,14 +636,36 @@ pub trait RocksTable: Debug + Send + Sync { fn delete(&self, row_id: u64, batch_pipe: &mut BatchPipe) -> Result, CubeError> { let row = self.get_row_or_not_found(row_id)?; - let deleted_row = self.delete_index_row(row.get_row(), row_id)?; - 
batch_pipe.add_event(MetaStoreEvent::Delete(Self::table_id(), row_id)); + self.delete_impl(row, batch_pipe) + } + + fn try_delete( + &self, + row_id: u64, + batch_pipe: &mut BatchPipe, + ) -> Result>, CubeError> { + if let Some(row) = self.get_row(row_id)? { + Ok(Some(self.delete_impl(row, batch_pipe)?)) + } else { + Ok(None) + } + } + + fn delete_impl( + &self, + row: IdRow, + batch_pipe: &mut BatchPipe, + ) -> Result, CubeError> { + let deleted_row = self.delete_index_row(row.get_row(), row.get_id())?; + batch_pipe.add_event(MetaStoreEvent::Delete(Self::table_id(), row.get_id())); batch_pipe.add_event(self.delete_event(row.clone())); for row in deleted_row { batch_pipe.batch().delete(row.key); } - batch_pipe.batch().delete(self.delete_row(row_id)?.key); + batch_pipe + .batch() + .delete(self.delete_row(row.get_id())?.key); Ok(row) } diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/mod.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/mod.rs index 8da722434b5ad..309c074574d73 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/mod.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/mod.rs @@ -5,6 +5,7 @@ mod system_chunks; mod system_indexes; mod system_jobs; mod system_partitions; +mod system_queue; mod system_replay_handles; mod system_snapshots; mod system_tables; @@ -16,6 +17,7 @@ pub use system_chunks::*; pub use system_indexes::*; pub use system_jobs::*; pub use system_partitions::*; +pub use system_queue::*; pub use system_replay_handles::*; pub use system_snapshots::*; pub use system_tables::*; diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_queue.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_queue.rs new file mode 100644 index 0000000000000..10eb1fd222980 --- /dev/null +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_queue.rs @@ -0,0 +1,115 @@ +use crate::cachestore::QueueItem; +use crate::metastore::IdRow; +use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext}; +use crate::CubeError; +use arrow::array::{ArrayRef, Int64Array, StringArray, TimestampNanosecondArray}; +use arrow::datatypes::{DataType, Field, TimeUnit}; +use async_trait::async_trait; +use std::sync::Arc; + +pub struct SystemQueueTableDef; + +#[async_trait] +impl InfoSchemaTableDef for SystemQueueTableDef { + type T = IdRow; + + async fn rows(&self, ctx: InfoSchemaTableDefContext) -> Result>, CubeError> { + Ok(Arc::new(ctx.cache_store.queue_all().await?)) + } + + fn columns(&self) -> Vec<(Field, Box>) -> ArrayRef>)> { + vec![ + ( + Field::new("id", DataType::Utf8, false), + Box::new(|items| { + Arc::new(StringArray::from_iter( + items.iter().map(|row| Some(row.get_row().get_key())), + )) + }), + ), + ( + Field::new("prefix", DataType::Utf8, false), + Box::new(|items| { + Arc::new(StringArray::from_iter( + items.iter().map(|row| row.get_row().get_prefix().clone()), + )) + }), + ), + ( + Field::new( + "created", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + Box::new(|items| { + Arc::new(TimestampNanosecondArray::from( + items + .iter() + .map(|row| row.get_row().get_created().timestamp_nanos()) + .collect::>(), + )) + }), + ), + ( + Field::new("status", DataType::Utf8, false), + Box::new(|items| { + Arc::new(StringArray::from( + items + .iter() + .map(|row| format!("{:?}", row.get_row().get_status())) + .collect::>(), + )) + }), + ), + ( + Field::new("priority", DataType::Int64, false), + Box::new(|items| { + Arc::new(Int64Array::from( + items + .iter() + .map(|row| 
row.get_row().get_priority().clone()) + .collect::>(), + )) + }), + ), + ( + Field::new( + "heartbeat", + DataType::Timestamp(TimeUnit::Nanosecond, None), + true, + ), + Box::new(|items| { + Arc::new(TimestampNanosecondArray::from( + items + .iter() + .map(|row| { + row.get_row() + .get_heartbeat() + .as_ref() + .map(|v| v.timestamp_nanos()) + }) + .collect::>(), + )) + }), + ), + ( + Field::new("value", DataType::Utf8, false), + Box::new(|items| { + Arc::new(StringArray::from_iter( + items.iter().map(|row| Some(row.get_row().get_value())), + )) + }), + ), + ( + Field::new("extra", DataType::Utf8, true), + Box::new(|items| { + Arc::new(StringArray::from_iter( + items.iter().map(|row| row.get_row().get_extra().clone()), + )) + }), + ), + ] + } +} + +crate::base_info_schema_table_def!(SystemQueueTableDef); diff --git a/rust/cubestore/cubestore/src/queryplanner/mod.rs b/rust/cubestore/cubestore/src/queryplanner/mod.rs index dbf797da8b8fb..5b4afff7ef977 100644 --- a/rust/cubestore/cubestore/src/queryplanner/mod.rs +++ b/rust/cubestore/cubestore/src/queryplanner/mod.rs @@ -25,7 +25,7 @@ use crate::metastore::{IdRow, MetaStore}; use crate::queryplanner::flatten_union::FlattenUnion; use crate::queryplanner::info_schema::{ SchemataInfoSchemaTableDef, SystemCacheTableDef, SystemChunksTableDef, SystemIndexesTableDef, - SystemJobsTableDef, SystemPartitionsTableDef, SystemReplayHandlesTableDef, + SystemJobsTableDef, SystemPartitionsTableDef, SystemQueueTableDef, SystemReplayHandlesTableDef, SystemSnapshotsTableDef, SystemTablesTableDef, TablesInfoSchemaTableDef, }; use crate::queryplanner::now::MaterializeNow; @@ -331,6 +331,11 @@ impl ContextProvider for MetaStoreSchemaProvider { self.cache_store.clone(), InfoSchemaTable::SystemChunks, ))), + ("system", "queue") => Some(Arc::new(InfoSchemaTableProvider::new( + self.meta_store.clone(), + self.cache_store.clone(), + InfoSchemaTable::SystemQueue, + ))), ("system", "replay_handles") => Some(Arc::new(InfoSchemaTableProvider::new( self.meta_store.clone(), self.cache_store.clone(), @@ -383,6 +388,7 @@ pub enum InfoSchemaTable { SystemIndexes, SystemPartitions, SystemChunks, + SystemQueue, SystemReplayHandles, SystemCache, SystemSnapshots, @@ -448,6 +454,7 @@ impl InfoSchemaTable { InfoSchemaTable::SystemTables => Box::new(SystemTablesTableDef), InfoSchemaTable::SystemIndexes => Box::new(SystemIndexesTableDef), InfoSchemaTable::SystemChunks => Box::new(SystemChunksTableDef), + InfoSchemaTable::SystemQueue => Box::new(SystemQueueTableDef), InfoSchemaTable::SystemReplayHandles => Box::new(SystemReplayHandlesTableDef), InfoSchemaTable::SystemPartitions => Box::new(SystemPartitionsTableDef), InfoSchemaTable::SystemJobs => Box::new(SystemJobsTableDef), diff --git a/rust/cubestore/cubestore/src/scheduler/mod.rs b/rust/cubestore/cubestore/src/scheduler/mod.rs index a775e8205f8ee..003669955e35f 100644 --- a/rust/cubestore/cubestore/src/scheduler/mod.rs +++ b/rust/cubestore/cubestore/src/scheduler/mod.rs @@ -36,6 +36,7 @@ pub struct SchedulerImpl { cluster: Arc, remote_fs: Arc, event_receiver: Mutex>, + cachestore_event_receiver: Mutex>, cancel_token: CancellationToken, gc_loop: Arc, config: Arc, @@ -50,6 +51,7 @@ impl SchedulerImpl { cluster: Arc, remote_fs: Arc, event_receiver: Receiver, + cachestore_event_receiver: Receiver, config: Arc, ) -> SchedulerImpl { let cancel_token = CancellationToken::new(); @@ -64,6 +66,7 @@ impl SchedulerImpl { cluster, remote_fs, event_receiver: Mutex::new(event_receiver), + cachestore_event_receiver: 
Mutex::new(cachestore_event_receiver), cancel_token, gc_loop, config, @@ -76,6 +79,8 @@ impl SchedulerImpl { ) -> Vec>> { let scheduler2 = scheduler.clone(); let scheduler3 = scheduler.clone(); + let scheduler4 = scheduler.clone(); + vec![ cube_ext::spawn(async move { let gc_loop = scheduler.gc_loop.clone(); @@ -83,14 +88,18 @@ impl SchedulerImpl { Ok(()) }), cube_ext::spawn(async move { - Self::run_scheduler(scheduler2).await; + scheduler2.run_meta_event_processor().await; + Ok(()) + }), + cube_ext::spawn(async move { + scheduler3.run_cache_event_processor().await; Ok(()) }), cube_ext::spawn(async move { - scheduler3 + scheduler4 .reconcile_loop .process( - scheduler3.clone(), + scheduler4.clone(), async move |_| Ok(Delay::new(Duration::from_secs(30)).await), async move |s, _| s.reconcile().await, ) @@ -100,11 +109,11 @@ impl SchedulerImpl { ] } - async fn run_scheduler(scheduler: Arc) { + async fn run_meta_event_processor(self: Arc) { loop { - let mut event_receiver = scheduler.event_receiver.lock().await; + let mut event_receiver = self.event_receiver.lock().await; let event = tokio::select! { - _ = scheduler.cancel_token.cancelled() => { + _ = self.cancel_token.cancelled() => { return; } event = event_receiver.recv() => { @@ -120,7 +129,8 @@ impl SchedulerImpl { } } }; - let scheduler_to_move = scheduler.clone(); + + let scheduler_to_move = self.clone(); cube_ext::spawn(async move { let res = scheduler_to_move.process_event(event.clone()).await; if let Err(e) = res { @@ -130,6 +140,31 @@ impl SchedulerImpl { } } + async fn run_cache_event_processor(self: Arc) { + loop { + let mut event_receiver = self.cachestore_event_receiver.lock().await; + let _ = tokio::select! { + _ = self.cancel_token.cancelled() => { + return; + } + event = event_receiver.recv() => { + match event { + Err(broadcast::error::RecvError::Lagged(messages)) => { + error!("Scheduler is lagging on cache store event processing for {} messages", messages); + continue; + }, + Err(broadcast::error::RecvError::Closed) => { + return; + }, + Ok(event) => event, + } + } + }; + + // Right now, it's used to free channel + } + } + pub async fn reconcile(&self) -> Result<(), CubeError> { if let Err(e) = warn_long_fut( "Removing orphaned jobs", diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index af4b181d1ff4d..df55f667d77be 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -35,7 +35,7 @@ use tracing_futures::WithSubscriber; use cubehll::HllSketch; use parser::Statement as CubeStoreStatement; -use crate::cachestore::{CacheItem, CacheStore}; +use crate::cachestore::{CacheItem, CacheStore, QueueItem, QueueResultResponse}; use crate::cluster::{Cluster, JobEvent, JobResultListener}; use crate::config::injection::DIService; use crate::config::ConfigObj; @@ -1162,6 +1162,189 @@ impl SqlService for SqlServiceImpl { .await?; Ok(Arc::new(DataFrame::new(vec![], vec![]))) } + CubeStoreStatement::QueueAdd { + key, + priority, + value, + } => { + self.cachestore + .queue_add(QueueItem::new( + key.value, + value, + QueueItem::status_default(), + priority, + )) + .await?; + + Ok(Arc::new(DataFrame::new(vec![], vec![]))) + } + CubeStoreStatement::QueueTruncate {} => { + self.cachestore.queue_truncate().await?; + + Ok(Arc::new(DataFrame::new(vec![], vec![]))) + } + CubeStoreStatement::QueueCancel { key } => { + let columns = vec![ + Column::new("payload".to_string(), ColumnType::String, 0), + Column::new("extra".to_string(), ColumnType::String, 1), + 
+            CubeStoreStatement::QueueCancel { key } => {
+                let columns = vec![
+                    Column::new("payload".to_string(), ColumnType::String, 0),
+                    Column::new("extra".to_string(), ColumnType::String, 1),
+                ];
+
+                let result = self.cachestore.queue_cancel(key.value).await?;
+                if let Some(result) = result {
+                    Ok(Arc::new(DataFrame::new(
+                        columns,
+                        vec![result.into_row().into_queue_cancel_row()],
+                    )))
+                } else {
+                    Ok(Arc::new(DataFrame::new(columns, vec![])))
+                }
+            }
+            CubeStoreStatement::QueueHeartbeat { key } => {
+                self.cachestore.queue_heartbeat(key.value).await?;
+
+                Ok(Arc::new(DataFrame::new(vec![], vec![])))
+            }
+            CubeStoreStatement::QueueMergeExtra { key, payload } => {
+                self.cachestore
+                    .queue_merge_extra(key.value, payload)
+                    .await?;
+
+                Ok(Arc::new(DataFrame::new(vec![], vec![])))
+            }
+            CubeStoreStatement::QueueAck { key, result } => {
+                self.cachestore.queue_ack(key.value, result).await?;
+
+                Ok(Arc::new(DataFrame::new(vec![], vec![])))
+            }
+            CubeStoreStatement::QueueGet { key } => {
+                let columns = vec![
+                    Column::new("payload".to_string(), ColumnType::String, 0),
+                    Column::new("extra".to_string(), ColumnType::String, 1),
+                ];
+
+                let result = self.cachestore.queue_get(key.value).await?;
+                if let Some(result) = result {
+                    Ok(Arc::new(DataFrame::new(
+                        columns,
+                        vec![result.into_row().into_queue_get_row()],
+                    )))
+                } else {
+                    Ok(Arc::new(DataFrame::new(columns, vec![])))
+                }
+            }
+            CubeStoreStatement::QueueToCancel {
+                prefix,
+                orphaned_timeout,
+                stalled_timeout,
+            } => {
+                let rows = self
+                    .cachestore
+                    .queue_to_cancel(prefix.value, orphaned_timeout, stalled_timeout)
+                    .await?;
+
+                let columns = vec![Column::new("id".to_string(), ColumnType::String, 0)];
+
+                Ok(Arc::new(DataFrame::new(
+                    columns,
+                    rows.into_iter()
+                        .map(|item| {
+                            Row::new(vec![TableValue::String(item.get_row().get_key().clone())])
+                        })
+                        .collect(),
+                )))
+            }
+            CubeStoreStatement::QueueList {
+                prefix,
+                with_payload,
+                status_filter,
+                sort_by_priority,
+            } => {
+                let rows = self
+                    .cachestore
+                    .queue_list(prefix.value, status_filter, sort_by_priority)
+                    .await?;
+
+                let mut columns = vec![
+                    Column::new("id".to_string(), ColumnType::String, 0),
+                    Column::new("status".to_string(), ColumnType::String, 1),
+                    Column::new("extra".to_string(), ColumnType::String, 2),
+                ];
+
+                if with_payload {
+                    columns.push(Column::new("payload".to_string(), ColumnType::String, 3));
+                }
+
+                Ok(Arc::new(DataFrame::new(
+                    columns,
+                    rows.into_iter()
+                        .map(|item| item.into_row().into_queue_list_row(with_payload))
+                        .collect(),
+                )))
+            }
+            CubeStoreStatement::QueueRetrieve { key, concurrency } => {
+                let result = self
+                    .cachestore
+                    .queue_retrieve(key.value, concurrency)
+                    .await?;
+                let rows = if let Some(result) = result {
+                    vec![result.into_row().into_queue_retrieve_row()]
+                } else {
+                    vec![]
+                };
+
+                Ok(Arc::new(DataFrame::new(
+                    vec![
+                        Column::new("payload".to_string(), ColumnType::String, 0),
+                        Column::new("extra".to_string(), ColumnType::String, 1),
+                    ],
+                    rows,
+                )))
+            }
+            CubeStoreStatement::QueueResult { key } => {
+                let columns = vec![
+                    Column::new("payload".to_string(), ColumnType::String, 0),
+                    Column::new("type".to_string(), ColumnType::String, 1),
+                ];
+
+                let ack_result = self.cachestore.queue_result(key.value).await?;
+                if let Some(ack_result) = ack_result {
+                    match ack_result {
+                        QueueResultResponse::Success { value } => Ok(Arc::new(DataFrame::new(
+                            columns,
+                            vec![Row::new(vec![
+                                TableValue::String(value),
+                                TableValue::String("success".to_string()),
+                            ])],
+                        ))),
+                    }
+                } else {
+                    Ok(Arc::new(DataFrame::new(columns, vec![])))
+                }
+            }
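+            // QUEUE RESULT_BLOCKING is the long-polling variant of QUEUE RESULT:
+            // it waits up to `timeout` (milliseconds, going by the 5000 ms timeout
+            // and 1000 ms sleep in the queue_full_workflow test) for the matching
+            // QUEUE ACK, e.g.
+            //   QUEUE RESULT_BLOCKING 5000 "STANDALONE#queue:3"
+            // resolves to ("result:3", "success") once
+            //   QUEUE ACK "STANDALONE#queue:3" "result:3"
+            // is executed on another connection.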
Column::new("type".to_string(), ColumnType::String, 1), + ]; + + let ack_result = self + .cachestore + .queue_result_blocking(key.value, timeout) + .await?; + if let Some(ack_result) = ack_result { + match ack_result { + QueueResultResponse::Success { value } => Ok(Arc::new(DataFrame::new( + columns, + vec![Row::new(vec![ + TableValue::String(value), + TableValue::String("success".to_string()), + ])], + ))), + } + } else { + Ok(Arc::new(DataFrame::new(vec![], vec![]))) + } + } CubeStoreStatement::Statement(Statement::Query(q)) => { let logical_plan = self .query_planner @@ -1245,20 +1428,17 @@ impl SqlService for SqlServiceImpl { ))) } CubeStoreStatement::CacheGet { key } => { - let row = self.cachestore.cache_get(key.value).await?; - if let Some(r) = row { - Ok(Arc::new(DataFrame::new( - vec![Column::new("value".to_string(), ColumnType::String, 0)], - vec![Row::new(vec![TableValue::String( - r.get_row().get_value().clone(), - )])], - ))) + let result = self.cachestore.cache_get(key.value).await?; + let value = if let Some(result) = result { + TableValue::String(result.into_row().value) } else { - Ok(Arc::new(DataFrame::new( - vec![Column::new("value".to_string(), ColumnType::String, 0)], - vec![Row::new(vec![TableValue::Null])], - ))) - } + TableValue::Null + }; + + Ok(Arc::new(DataFrame::new( + vec![Column::new("value".to_string(), ColumnType::String, 0)], + vec![Row::new(vec![value])], + ))) } CubeStoreStatement::CacheKeys { prefix } => { let rows = self.cachestore.cache_keys(prefix.value).await?; @@ -3363,7 +3543,7 @@ mod tests { ).await.unwrap(); let result = service.exec_query( - "EXPLAIN SELECT platform, sum(amount) from foo.orders where age > 15 group by platform" + "EXPLAIN SELECT platform, sum(amount) from foo.orders where age > 15 group by platform" ).await.unwrap(); assert_eq!(result.len(), 1); assert_eq!(result.get_columns().len(), 1); @@ -3415,7 +3595,7 @@ mod tests { ).await.unwrap(); let result = service.exec_query( - "EXPLAIN ANALYZE SELECT platform, sum(amount) from foo.orders where age > 15 group by platform" + "EXPLAIN ANALYZE SELECT platform, sum(amount) from foo.orders where age > 15 group by platform" ).await.unwrap(); assert_eq!(result.len(), 2); diff --git a/rust/cubestore/cubestore/src/sql/parser.rs b/rust/cubestore/cubestore/src/sql/parser.rs index 2fbdcd8931e24..e836557a08555 100644 --- a/rust/cubestore/cubestore/src/sql/parser.rs +++ b/rust/cubestore/cubestore/src/sql/parser.rs @@ -1,3 +1,4 @@ +use crate::cachestore::QueueItemStatus; use sqlparser::ast::{ HiveDistributionStyle, Ident, ObjectName, Query, SqlOption, Statement as SQLStatement, Value, }; @@ -73,6 +74,52 @@ pub enum Statement { CacheIncr { path: Ident, }, + // queue + QueueAdd { + priority: i64, + key: Ident, + value: String, + }, + QueueGet { + key: Ident, + }, + QueueToCancel { + prefix: Ident, + orphaned_timeout: Option, + stalled_timeout: Option, + }, + QueueList { + prefix: Ident, + with_payload: bool, + status_filter: Option, + sort_by_priority: bool, + }, + QueueCancel { + key: Ident, + }, + QueueHeartbeat { + key: Ident, + }, + QueueAck { + key: Ident, + result: String, + }, + QueueMergeExtra { + key: Ident, + payload: String, + }, + QueueRetrieve { + key: Ident, + concurrency: u32, + }, + QueueResult { + key: Ident, + }, + QueueResultBlocking { + key: Ident, + timeout: u64, + }, + QueueTruncate {}, System(SystemCommand), Dump(Box), } @@ -118,6 +165,10 @@ impl<'a> CubeStoreParser<'a> { self.parser.next_token(); self.parse_system() } + _ if w.value.eq_ignore_ascii_case("queue") => { + 
+            _ if w.value.eq_ignore_ascii_case("queue") => {
+                self.parser.next_token();
+                self.parse_queue()
+            }
             Keyword::CACHE => {
                 self.parser.next_token();
                 self.parse_cache()
@@ -223,6 +274,23 @@ impl<'a> CubeStoreParser<'a> {
         }
     }
 
+    fn parse_number(&mut self, var_name: &str) -> Result<u64, ParserError> {
+        match self.parser.parse_number_value()? {
+            Value::Number(var, false) => var.parse::<u64>().map_err(|err| {
+                ParserError::ParserError(format!(
+                    "{} must be a positive integer, error: {}",
+                    var_name, err
+                ))
+            }),
+            x => Err(ParserError::ParserError(format!(
+                "{} must be a positive integer, actual: {:?}",
+                var_name, x
+            ))),
+        }
+    }
+
     pub fn parse_metastore(&mut self) -> Result<Statement, ParserError> {
         if self.parse_custom_token("set_current") {
             match self.parser.parse_number_value()? {
@@ -248,6 +316,187 @@ impl<'a> CubeStoreParser<'a> {
         }
     }
 
+    fn parse_queue(&mut self) -> Result<Statement, ParserError> {
+        let command = match self.parser.next_token() {
+            Token::Word(w) => w.value.to_ascii_lowercase(),
+            _ => {
+                return Err(ParserError::ParserError(
+                    "Unknown queue command, available: ADD|CANCEL|HEARTBEAT|ACK|MERGE_EXTRA|GET|STALLED|ORPHANED|TO_CANCEL|PENDING|ACTIVE|LIST|RETRIEVE|RESULT|RESULT_BLOCKING|TRUNCATE"
+                        .to_string(),
+                ))
+            }
+        };
+
+        match command.as_str() {
+            "add" => {
+                let priority = if self.parse_custom_token("priority") {
+                    match self.parser.parse_number_value()? {
+                        Value::Number(priority, _) => priority.parse::<i64>().map_err(|err| {
+                            ParserError::ParserError(format!(
+                                "priority must be a positive integer, error: {}",
+                                err
+                            ))
+                        })?,
+                        x => {
+                            return Err(ParserError::ParserError(format!(
+                                "priority must be a positive integer, actual: {:?}",
+                                x
+                            )))
+                        }
+                    }
+                } else {
+                    0
+                };
+
+                Ok(Statement::QueueAdd {
+                    priority,
+                    key: self.parser.parse_identifier()?,
+                    value: self.parser.parse_literal_string()?,
+                })
+            }
+            "cancel" => Ok(Statement::QueueCancel {
+                key: self.parser.parse_identifier()?,
+            }),
+            "heartbeat" => Ok(Statement::QueueHeartbeat {
+                key: self.parser.parse_identifier()?,
+            }),
+            "ack" => Ok(Statement::QueueAck {
+                key: self.parser.parse_identifier()?,
+                result: self.parser.parse_literal_string()?,
+            }),
+            "merge_extra" => Ok(Statement::QueueMergeExtra {
+                key: self.parser.parse_identifier()?,
+                payload: self.parser.parse_literal_string()?,
+            }),
+            "get" => Ok(Statement::QueueGet {
+                key: self.parser.parse_identifier()?,
+            }),
+            "stalled" => {
+                let stalled_timeout = self.parse_number("stalled timeout")?;
+
+                Ok(Statement::QueueToCancel {
+                    prefix: self.parser.parse_identifier()?,
+                    orphaned_timeout: None,
+                    stalled_timeout: Some(stalled_timeout),
+                })
+            }
+            "orphaned" => {
+                let orphaned_timeout = self.parse_number("orphaned timeout")?;
+
+                Ok(Statement::QueueToCancel {
+                    prefix: self.parser.parse_identifier()?,
+                    orphaned_timeout: Some(orphaned_timeout),
+                    stalled_timeout: None,
+                })
+            }
+            "to_cancel" => {
+                let stalled_timeout = self.parse_number("stalled timeout")?;
+                let orphaned_timeout = self.parse_number("orphaned timeout")?;
+
+                Ok(Statement::QueueToCancel {
+                    prefix: self.parser.parse_identifier()?,
+                    orphaned_timeout: Some(orphaned_timeout),
+                    stalled_timeout: Some(stalled_timeout),
+                })
+            }
+            "pending" => {
+                let with_payload = self.parse_custom_token("with_payload");
+
+                Ok(Statement::QueueList {
+                    prefix: self.parser.parse_identifier()?,
+                    with_payload,
+                    status_filter: Some(QueueItemStatus::Pending),
+                    sort_by_priority: true,
+                })
+            }
+            "active" => {
+                let with_payload = self.parse_custom_token("with_payload");
+
+                Ok(Statement::QueueList {
+                    prefix: self.parser.parse_identifier()?,
+                    with_payload,
+                    status_filter: Some(QueueItemStatus::Active),
+                    sort_by_priority: false,
+                })
+            }
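+            // PENDING, ACTIVE and LIST all lower to Statement::QueueList; they
+            // differ only in the status filter and the sort_by_priority flag.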
self.parse_custom_token(&"with_payload"); + + Ok(Statement::QueueList { + prefix: self.parser.parse_identifier()?, + with_payload, + status_filter: None, + sort_by_priority: true, + }) + } + "retrieve" => { + let concurrency = if self.parse_custom_token(&"concurrency") { + match self.parser.parse_number_value()? { + Value::Number(concurrency, false) => { + let r = concurrency.parse::().map_err(|err| { + ParserError::ParserError(format!( + "CONCURRENCY must be a positive integer, error: {}", + err + )) + })?; + + r + } + x => { + return Err(ParserError::ParserError(format!( + "CONCURRENCY must be a positive integer, actual: {:?}", + x + ))) + } + } + } else { + 1 + }; + + Ok(Statement::QueueRetrieve { + key: self.parser.parse_identifier()?, + concurrency, + }) + } + "result" => Ok(Statement::QueueResult { + key: self.parser.parse_identifier()?, + }), + "result_blocking" => { + let timeout = match self.parser.parse_number_value()? { + Value::Number(concurrency, false) => { + let r = concurrency.parse::().map_err(|err| { + ParserError::ParserError(format!( + "TIMEOUT must be a positive integer, error: {}", + err + )) + })?; + + r + } + x => { + return Err(ParserError::ParserError(format!( + "TIMEOUT must be a positive integer, actual: {:?}", + x + ))) + } + }; + + Ok(Statement::QueueResultBlocking { + timeout, + key: self.parser.parse_identifier()?, + }) + } + "truncate" => Ok(Statement::QueueTruncate {}), + command => Err(ParserError::ParserError(format!( + "Unknown queue command: {}", + command + ))), + } + } + fn parse_system(&mut self) -> Result { if self.parse_custom_token("kill") && self.parser.parse_keywords(&[Keyword::ALL])