diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts index 06684a78951fe..1ef775048ba3b 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts @@ -52,19 +52,22 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface { addedToQueueTime: new Date().getTime() }; - const _rows = await this.driver.query('QUEUE ADD PRIORITY ? ? ?', [ + const rows = await this.driver.query('QUEUE ADD PRIORITY ? ? ?', [ priority, this.prefixKey(this.redisHash(queryKey)), JSON.stringify(data) ]); + if (rows && rows.length) { + return [ + rows[0].added === 'true' ? 1 : 0, + null, + null, + parseInt(rows[0].pending, 10), + data.addedToQueueTime + ]; + } - return [ - 1, - null, - null, - 1, - data.addedToQueueTime - ]; + throw new Error('Empty response on QUEUE ADD'); } // TODO: Looks useless, because we can do it in one step - getQueriesToCancel diff --git a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs index ad030ed919a88..bb840c35c59bb 100644 --- a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs +++ b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs @@ -261,6 +261,23 @@ impl RocksCacheStore { }) .await } + + fn queue_count_by_prefix_and_status( + db_ref: DbTableRef, + prefix: &Option, + status: QueueItemStatus, + ) -> Result { + let queue_schema = QueueItemRocksTable::new(db_ref.clone()); + let index_key = + QueueItemIndexKey::ByPrefixAndStatus(prefix.clone().unwrap_or("".to_string()), status); + queue_schema.count_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefixAndStatus) + } +} + +#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] +pub struct QueueAddResponse { + pub added: bool, + pub pending: u64, } #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] @@ -285,7 +302,7 @@ pub trait CacheStore: DIService + Send + Sync { // queue async fn queue_all(&self) -> Result>, CubeError>; - async fn queue_add(&self, item: QueueItem) -> Result; + async fn queue_add(&self, item: QueueItem) -> Result; async fn queue_truncate(&self) -> Result<(), CubeError>; async fn queue_get(&self, key: String) -> Result>, CubeError>; async fn queue_to_cancel( @@ -453,7 +470,7 @@ impl CacheStore for RocksCacheStore { .await } - async fn queue_add(&self, item: QueueItem) -> Result { + async fn queue_add(&self, item: QueueItem) -> Result { self.store .write_operation(move |db_ref, batch_pipe| { let queue_schema = QueueItemRocksTable::new(db_ref.clone()); @@ -461,11 +478,24 @@ impl CacheStore for RocksCacheStore { let id_row_opt = queue_schema .get_single_opt_row_by_index(&index_key, &QueueItemRocksIndex::ByPath)?; - if id_row_opt.is_none() { + let pending = Self::queue_count_by_prefix_and_status( + db_ref, + item.get_prefix(), + QueueItemStatus::Pending, + )?; + + let added = if id_row_opt.is_none() { queue_schema.insert(item, batch_pipe)?; - } - Ok(true) + true + } else { + false + }; + + Ok(QueueAddResponse { + added, + pending: if added { pending + 1 } else { pending }, + }) }) .await } @@ -629,26 +659,12 @@ impl CacheStore for RocksCacheStore { if let Some(id_row) = id_row_opt { if id_row.get_row().get_status() == &QueueItemStatus::Pending { - // TODO: Introduce count + Active index? - let index_key = QueueItemIndexKey::ByPrefix( - if let Some(prefix) = id_row.get_row().get_prefix() { - prefix.clone() - } else { - "".to_string() - }, - ); - let in_queue = queue_schema - .get_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefix)?; - - let mut current_active = 0; - - for item in in_queue { - if item.get_row().get_status() == &QueueItemStatus::Active { - current_active += 1; - } - } - - if current_active >= allow_concurrency { + let current_active = Self::queue_count_by_prefix_and_status( + db_ref, + id_row.get_row().get_prefix(), + QueueItemStatus::Active, + )?; + if current_active >= (allow_concurrency as u64) { return Ok(None); } @@ -835,7 +851,7 @@ impl CacheStore for ClusterCacheStoreClient { panic!("CacheStore cannot be used on the worker node! queue_all was used.") } - async fn queue_add(&self, _item: QueueItem) -> Result { + async fn queue_add(&self, _item: QueueItem) -> Result { panic!("CacheStore cannot be used on the worker node! queue_add was used.") } diff --git a/rust/cubestore/cubestore/src/cachestore/compaction.rs b/rust/cubestore/cubestore/src/cachestore/compaction.rs index 759e9f4a6b431..71ac9b0891d36 100644 --- a/rust/cubestore/cubestore/src/cachestore/compaction.rs +++ b/rust/cubestore/cubestore/src/cachestore/compaction.rs @@ -35,7 +35,6 @@ impl MetaStoreCacheCompactionFilter { } } -#[cfg(debug_assertions)] impl Drop for MetaStoreCacheCompactionFilter { fn drop(&mut self) { let elapsed = Utc::now() - self.current; diff --git a/rust/cubestore/cubestore/src/cachestore/lazy.rs b/rust/cubestore/cubestore/src/cachestore/lazy.rs index 70907953cb859..1821b22d1143a 100644 --- a/rust/cubestore/cubestore/src/cachestore/lazy.rs +++ b/rust/cubestore/cubestore/src/cachestore/lazy.rs @@ -1,3 +1,4 @@ +use crate::cachestore::cache_rocksstore::QueueAddResponse; use crate::cachestore::{ CacheItem, CacheStore, QueueItem, QueueItemStatus, QueueResultResponse, RocksCacheStore, }; @@ -203,7 +204,7 @@ impl CacheStore for LazyRocksCacheStore { self.init().await?.queue_all().await } - async fn queue_add(&self, item: QueueItem) -> Result { + async fn queue_add(&self, item: QueueItem) -> Result { self.init().await?.queue_add(item).await } diff --git a/rust/cubestore/cubestore/src/cachestore/queue_item.rs b/rust/cubestore/cubestore/src/cachestore/queue_item.rs index bb2f79a2ab4f5..c9de48d38d322 100644 --- a/rust/cubestore/cubestore/src/cachestore/queue_item.rs +++ b/rust/cubestore/cubestore/src/cachestore/queue_item.rs @@ -217,14 +217,14 @@ impl QueueItem { #[derive(Clone, Copy, Debug)] pub(crate) enum QueueItemRocksIndex { ByPath = 1, - ByStatus = 2, + ByPrefixAndStatus = 2, ByPrefix = 3, } rocks_table_impl!(QueueItem, QueueItemRocksTable, TableId::QueueItems, { vec![ Box::new(QueueItemRocksIndex::ByPath), - Box::new(QueueItemRocksIndex::ByStatus), + Box::new(QueueItemRocksIndex::ByPrefixAndStatus), Box::new(QueueItemRocksIndex::ByPrefix), ] }); @@ -232,7 +232,7 @@ rocks_table_impl!(QueueItem, QueueItemRocksTable, TableId::QueueItems, { #[derive(Hash, Clone, Debug)] pub enum QueueItemIndexKey { ByPath(String), - ByStatus(QueueItemStatus), + ByPrefixAndStatus(String, QueueItemStatus), ByPrefix(String), } @@ -242,13 +242,12 @@ impl RocksSecondaryIndex for QueueItemRocksIndex { fn typed_key_by(&self, row: &QueueItem) -> QueueItemIndexKey { match self { QueueItemRocksIndex::ByPath => QueueItemIndexKey::ByPath(row.get_path()), - QueueItemRocksIndex::ByStatus => QueueItemIndexKey::ByStatus(row.get_status().clone()), + QueueItemRocksIndex::ByPrefixAndStatus => QueueItemIndexKey::ByPrefixAndStatus( + row.get_prefix().clone().unwrap_or("".to_string()), + row.get_status().clone(), + ), QueueItemRocksIndex::ByPrefix => { - QueueItemIndexKey::ByPrefix(if let Some(prefix) = row.get_prefix() { - prefix.clone() - } else { - "".to_string() - }) + QueueItemIndexKey::ByPrefix(row.get_prefix().clone().unwrap_or("".to_string())) } } } @@ -257,8 +256,9 @@ impl RocksSecondaryIndex for QueueItemRocksIndex { match key { QueueItemIndexKey::ByPath(s) => s.as_bytes().to_vec(), QueueItemIndexKey::ByPrefix(s) => s.as_bytes().to_vec(), - QueueItemIndexKey::ByStatus(s) => { - let mut r = Vec::with_capacity(1); + QueueItemIndexKey::ByPrefixAndStatus(prefix, s) => { + let mut r = Vec::with_capacity(prefix.len() + 1); + r.extend_from_slice(&prefix.as_bytes()); match s { QueueItemStatus::Pending => r.push(0_u8), @@ -274,7 +274,7 @@ impl RocksSecondaryIndex for QueueItemRocksIndex { fn is_unique(&self) -> bool { match self { QueueItemRocksIndex::ByPath => true, - QueueItemRocksIndex::ByStatus => false, + QueueItemRocksIndex::ByPrefixAndStatus => false, QueueItemRocksIndex::ByPrefix => false, } } @@ -282,7 +282,7 @@ impl RocksSecondaryIndex for QueueItemRocksIndex { fn version(&self) -> u32 { match self { QueueItemRocksIndex::ByPath => 1, - QueueItemRocksIndex::ByStatus => 1, + QueueItemRocksIndex::ByPrefixAndStatus => 2, QueueItemRocksIndex::ByPrefix => 1, } } diff --git a/rust/cubestore/cubestore/src/metastore/rocks_table.rs b/rust/cubestore/cubestore/src/metastore/rocks_table.rs index 734e5b7f42867..c806855ea2bda 100644 --- a/rust/cubestore/cubestore/src/metastore/rocks_table.rs +++ b/rust/cubestore/cubestore/src/metastore/rocks_table.rs @@ -519,6 +519,23 @@ pub trait RocksTable: Debug + Send + Sync { Ok(existing_keys) } + fn count_rows_by_index( + &self, + row_key: &K, + secondary_index: &impl RocksSecondaryIndex, + ) -> Result { + #[cfg(debug_assertions)] + if RocksSecondaryIndex::is_unique(secondary_index) { + return Err(CubeError::internal(format!( + "Wrong usage of count_rows_by_index, called on unique index for {:?} table", + self + ))); + } + + let rows_ids = self.get_row_ids_by_index(row_key, secondary_index)?; + Ok(rows_ids.len() as u64) + } + fn get_rows_by_index( &self, row_key: &K, diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index 859969a01edf9..a034a5c01ea21 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -1167,7 +1167,8 @@ impl SqlService for SqlServiceImpl { priority, value, } => { - self.cachestore + let response = self + .cachestore .queue_add(QueueItem::new( key.value, value, @@ -1176,7 +1177,16 @@ impl SqlService for SqlServiceImpl { )) .await?; - Ok(Arc::new(DataFrame::new(vec![], vec![]))) + Ok(Arc::new(DataFrame::new( + vec![ + Column::new("added".to_string(), ColumnType::Boolean, 0), + Column::new("pending".to_string(), ColumnType::Int, 1), + ], + vec![Row::new(vec![ + TableValue::Boolean(response.added), + TableValue::Int(response.pending as i64), + ])], + ))) } CubeStoreStatement::QueueTruncate {} => { self.cachestore.queue_truncate().await?;