Skip to content

Commit

Permalink
chore: wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr committed Jan 18, 2023
1 parent c201def commit a25898b
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 13 deletions.
11 changes: 2 additions & 9 deletions packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
this.prefixKey(this.redisHash(queryKey)),
JSON.stringify(data)
]);
if (rows.length > 0) {
if (rows && rows.length) {
return [
rows[0].added === 'true' ? 1 : 0,
null,
Expand All @@ -67,14 +67,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
];
}

// TODO: Throw error in near time
return [
1,
null,
null,
1,
data.addedToQueueTime
];
throw new Error('Empty response on QUEUE ADD');
}

// TODO: Looks useless, because we can do it in one step - getQueriesToCancel
Expand Down
4 changes: 2 additions & 2 deletions rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ impl RocksCacheStore {

#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
pub struct QueueAddResponse {
added: bool,
pending: u64,
pub added: bool,
pub pending: u64,
}

#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
Expand Down
14 changes: 12 additions & 2 deletions rust/cubestore/cubestore/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,8 @@ impl SqlService for SqlServiceImpl {
priority,
value,
} => {
self.cachestore
let response = self
.cachestore
.queue_add(QueueItem::new(
key.value,
value,
Expand All @@ -1176,7 +1177,16 @@ impl SqlService for SqlServiceImpl {
))
.await?;

Ok(Arc::new(DataFrame::new(vec![], vec![])))
Ok(Arc::new(DataFrame::new(
vec![
Column::new("added".to_string(), ColumnType::Boolean, 0),
Column::new("pending".to_string(), ColumnType::Int, 1),
],
vec![Row::new(vec![
TableValue::Boolean(response.added),
TableValue::Int(response.pending as i64),
])],
)))
}
CubeStoreStatement::QueueTruncate {} => {
self.cachestore.queue_truncate().await?;
Expand Down

0 comments on commit a25898b

Please sign in to comment.