Skip to content

Commit

Permalink
chore(cubestore): Improve queue
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr committed Jan 17, 2023
1 parent f4744bf commit 38e8eda
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 42 deletions.
12 changes: 11 additions & 1 deletion packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,22 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
addedToQueueTime: new Date().getTime()
};

const _rows = await this.driver.query('QUEUE ADD PRIORITY ? ? ?', [
const rows = await this.driver.query('QUEUE ADD PRIORITY ? ? ?', [
priority,
this.prefixKey(this.redisHash(queryKey)),
JSON.stringify(data)
]);
if (rows.length > 0) {
return [
rows[0].added === 'true' ? 1 : 0,
null,
null,
parseInt(rows[0].pending, 10),
data.addedToQueueTime
];
}

// TODO: Throw error in near time
return [
1,
null,
Expand Down
68 changes: 42 additions & 26 deletions rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,23 @@ impl RocksCacheStore {
})
.await
}

fn queue_count_by_prefix_and_status(
db_ref: DbTableRef,
prefix: &Option<String>,
status: QueueItemStatus,
) -> Result<u64, CubeError> {
let queue_schema = QueueItemRocksTable::new(db_ref.clone());
let index_key =
QueueItemIndexKey::ByPrefixAndStatus(prefix.clone().unwrap_or("".to_string()), status);
queue_schema.count_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefixAndStatus)
}
}

#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
pub struct QueueAddResponse {
added: bool,
pending: u64,
}

#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
Expand All @@ -285,7 +302,7 @@ pub trait CacheStore: DIService + Send + Sync {

// queue
async fn queue_all(&self) -> Result<Vec<IdRow<QueueItem>>, CubeError>;
async fn queue_add(&self, item: QueueItem) -> Result<bool, CubeError>;
async fn queue_add(&self, item: QueueItem) -> Result<QueueAddResponse, CubeError>;
async fn queue_truncate(&self) -> Result<(), CubeError>;
async fn queue_get(&self, key: String) -> Result<Option<IdRow<QueueItem>>, CubeError>;
async fn queue_to_cancel(
Expand Down Expand Up @@ -453,19 +470,32 @@ impl CacheStore for RocksCacheStore {
.await
}

async fn queue_add(&self, item: QueueItem) -> Result<bool, CubeError> {
async fn queue_add(&self, item: QueueItem) -> Result<QueueAddResponse, CubeError> {
self.store
.write_operation(move |db_ref, batch_pipe| {
let queue_schema = QueueItemRocksTable::new(db_ref.clone());
let index_key = QueueItemIndexKey::ByPath(item.get_path());
let id_row_opt = queue_schema
.get_single_opt_row_by_index(&index_key, &QueueItemRocksIndex::ByPath)?;

if id_row_opt.is_none() {
let pending = Self::queue_count_by_prefix_and_status(
db_ref,
item.get_prefix(),
QueueItemStatus::Pending,
)?;

let added = if id_row_opt.is_none() {
queue_schema.insert(item, batch_pipe)?;
}

Ok(true)
true
} else {
false
};

Ok(QueueAddResponse {
added,
pending: if added { pending + 1 } else { pending },
})
})
.await
}
Expand Down Expand Up @@ -629,26 +659,12 @@ impl CacheStore for RocksCacheStore {

if let Some(id_row) = id_row_opt {
if id_row.get_row().get_status() == &QueueItemStatus::Pending {
// TODO: Introduce count + Active index?
let index_key = QueueItemIndexKey::ByPrefix(
if let Some(prefix) = id_row.get_row().get_prefix() {
prefix.clone()
} else {
"".to_string()
},
);
let in_queue = queue_schema
.get_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefix)?;

let mut current_active = 0;

for item in in_queue {
if item.get_row().get_status() == &QueueItemStatus::Active {
current_active += 1;
}
}

if current_active >= allow_concurrency {
let current_active = Self::queue_count_by_prefix_and_status(
db_ref,
id_row.get_row().get_prefix(),
QueueItemStatus::Active,
)?;
if current_active >= (allow_concurrency as u64) {
return Ok(None);
}

Expand Down Expand Up @@ -835,7 +851,7 @@ impl CacheStore for ClusterCacheStoreClient {
panic!("CacheStore cannot be used on the worker node! queue_all was used.")
}

async fn queue_add(&self, _item: QueueItem) -> Result<bool, CubeError> {
async fn queue_add(&self, _item: QueueItem) -> Result<QueueAddResponse, CubeError> {
panic!("CacheStore cannot be used on the worker node! queue_add was used.")
}

Expand Down
1 change: 0 additions & 1 deletion rust/cubestore/cubestore/src/cachestore/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ impl MetaStoreCacheCompactionFilter {
}
}

#[cfg(debug_assertions)]
impl Drop for MetaStoreCacheCompactionFilter {
fn drop(&mut self) {
let elapsed = Utc::now() - self.current;
Expand Down
3 changes: 2 additions & 1 deletion rust/cubestore/cubestore/src/cachestore/lazy.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::cachestore::cache_rocksstore::QueueAddResponse;
use crate::cachestore::{
CacheItem, CacheStore, QueueItem, QueueItemStatus, QueueResultResponse, RocksCacheStore,
};
Expand Down Expand Up @@ -203,7 +204,7 @@ impl CacheStore for LazyRocksCacheStore {
self.init().await?.queue_all().await
}

async fn queue_add(&self, item: QueueItem) -> Result<bool, CubeError> {
async fn queue_add(&self, item: QueueItem) -> Result<QueueAddResponse, CubeError> {
self.init().await?.queue_add(item).await
}

Expand Down
26 changes: 13 additions & 13 deletions rust/cubestore/cubestore/src/cachestore/queue_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,22 +217,22 @@ impl QueueItem {
#[derive(Clone, Copy, Debug)]
pub(crate) enum QueueItemRocksIndex {
ByPath = 1,
ByStatus = 2,
ByPrefixAndStatus = 2,
ByPrefix = 3,
}

rocks_table_impl!(QueueItem, QueueItemRocksTable, TableId::QueueItems, {
vec![
Box::new(QueueItemRocksIndex::ByPath),
Box::new(QueueItemRocksIndex::ByStatus),
Box::new(QueueItemRocksIndex::ByPrefixAndStatus),
Box::new(QueueItemRocksIndex::ByPrefix),
]
});

#[derive(Hash, Clone, Debug)]
pub enum QueueItemIndexKey {
ByPath(String),
ByStatus(QueueItemStatus),
ByPrefixAndStatus(String, QueueItemStatus),
ByPrefix(String),
}

Expand All @@ -242,13 +242,12 @@ impl RocksSecondaryIndex<QueueItem, QueueItemIndexKey> for QueueItemRocksIndex {
fn typed_key_by(&self, row: &QueueItem) -> QueueItemIndexKey {
match self {
QueueItemRocksIndex::ByPath => QueueItemIndexKey::ByPath(row.get_path()),
QueueItemRocksIndex::ByStatus => QueueItemIndexKey::ByStatus(row.get_status().clone()),
QueueItemRocksIndex::ByPrefixAndStatus => QueueItemIndexKey::ByPrefixAndStatus(
row.get_prefix().clone().unwrap_or("".to_string()),
row.get_status().clone(),
),
QueueItemRocksIndex::ByPrefix => {
QueueItemIndexKey::ByPrefix(if let Some(prefix) = row.get_prefix() {
prefix.clone()
} else {
"".to_string()
})
QueueItemIndexKey::ByPrefix(row.get_prefix().clone().unwrap_or("".to_string()))
}
}
}
Expand All @@ -257,8 +256,9 @@ impl RocksSecondaryIndex<QueueItem, QueueItemIndexKey> for QueueItemRocksIndex {
match key {
QueueItemIndexKey::ByPath(s) => s.as_bytes().to_vec(),
QueueItemIndexKey::ByPrefix(s) => s.as_bytes().to_vec(),
QueueItemIndexKey::ByStatus(s) => {
let mut r = Vec::with_capacity(1);
QueueItemIndexKey::ByPrefixAndStatus(prefix, s) => {
let mut r = Vec::with_capacity(prefix.len() + 1);
r.extend_from_slice(&prefix.as_bytes());

match s {
QueueItemStatus::Pending => r.push(0_u8),
Expand All @@ -274,15 +274,15 @@ impl RocksSecondaryIndex<QueueItem, QueueItemIndexKey> for QueueItemRocksIndex {
fn is_unique(&self) -> bool {
match self {
QueueItemRocksIndex::ByPath => true,
QueueItemRocksIndex::ByStatus => false,
QueueItemRocksIndex::ByPrefixAndStatus => false,
QueueItemRocksIndex::ByPrefix => false,
}
}

fn version(&self) -> u32 {
match self {
QueueItemRocksIndex::ByPath => 1,
QueueItemRocksIndex::ByStatus => 1,
QueueItemRocksIndex::ByPrefixAndStatus => 2,
QueueItemRocksIndex::ByPrefix => 1,
}
}
Expand Down
17 changes: 17 additions & 0 deletions rust/cubestore/cubestore/src/metastore/rocks_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,23 @@ pub trait RocksTable: Debug + Send + Sync {
Ok(existing_keys)
}

fn count_rows_by_index<K: Debug + Hash>(
&self,
row_key: &K,
secondary_index: &impl RocksSecondaryIndex<Self::T, K>,
) -> Result<u64, CubeError> {
#[cfg(debug_assertions)]
if RocksSecondaryIndex::is_unique(secondary_index) {
return Err(CubeError::internal(format!(
"Wrong usage of count_rows_by_index, called on unique index for {:?} table",
self
)));
}

let rows_ids = self.get_row_ids_by_index(row_key, secondary_index)?;
Ok(rows_ids.len() as u64)
}

fn get_rows_by_index<K: Debug>(
&self,
row_key: &K,
Expand Down

0 comments on commit 38e8eda

Please sign in to comment.