feat(cubestore): Initial queue support
ovr committed Jan 16, 2023
1 parent: 2d12eff · commit: 90e174f
Showing 18 changed files with 1,940 additions and 109 deletions.
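The main addition is an end-to-end test, queue_full_workflow, covering the queue commands introduced to CubeStore's SQL interface. As a quick orientation, a minimal sketch of the lifecycle the test walks through is given below; the command shapes are copied from the test itself, and the one-line descriptions are inferred from its assertions rather than from separate documentation.

QUEUE ADD PRIORITY 100 "STANDALONE#queue:3" "payload3";  -- enqueue a payload under a key, with a priority
QUEUE PENDING "STANDALONE#queue";                        -- list waiting items, highest priority first
QUEUE ACTIVE "STANDALONE#queue";                         -- list items currently being processed
QUEUE RETRIEVE CONCURRENCY 2 "STANDALONE#queue:3";       -- claim an item, moving it from pending to active
QUEUE RESULT_BLOCKING 5000 "STANDALONE#queue:3";         -- wait up to 5000 ms for the item's result
QUEUE ACK "STANDALONE#queue:3" "result:3";               -- complete the item and publish its result
QUEUE GET "STANDALONE#queue:2";                          -- read an item's payload without claiming it
QUEUE CANCEL "STANDALONE#queue:2";                       -- remove an item, returning its payload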
rust/cubestore/cubestore-sql-tests/src/tests.rs (185 additions, 0 deletions)
@@ -23,6 +23,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::join;

pub type TestFn = Box<
dyn Fn(Box<dyn SqlClient>) -> Pin<Box<dyn Future<Output = ()> + Send>>
@@ -226,6 +227,7 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
t("cache_compaction", cache_compaction),
t("cache_set_nx", cache_set_nx),
t("cache_prefix_keys", cache_prefix_keys),
t("queue_full_workflow", queue_full_workflow),
];

fn t<F>(name: &'static str, f: fn(Box<dyn SqlClient>) -> F) -> (&'static str, TestFn)
@@ -6411,6 +6413,189 @@ async fn cache_prefix_keys(service: Box<dyn SqlClient>) {
);
}

async fn queue_full_workflow(service: Box<dyn SqlClient>) {
service
.exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#)
.await
.unwrap();

service
.exec_query(r#"QUEUE ADD PRIORITY 10 "STANDALONE#queue:2" "payload2";"#)
.await
.unwrap();

service
.exec_query(r#"QUEUE ADD PRIORITY 100 "STANDALONE#queue:3" "payload3";"#)
.await
.unwrap();

service
.exec_query(r#"QUEUE ADD PRIORITY 50 "STANDALONE#queue:4" "payload3";"#)
.await
.unwrap();

{
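// all four items should be pending, ordered by priority (highest first)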
let pending_response = service
.exec_query(r#"QUEUE PENDING "STANDALONE#queue""#)
.await
.unwrap();
assert_eq!(
pending_response.get_columns(),
&vec![
Column::new("id".to_string(), ColumnType::String, 0),
Column::new("status".to_string(), ColumnType::String, 1),
Column::new("extra".to_string(), ColumnType::String, 2),
]
);
assert_eq!(
pending_response.get_rows(),
&vec![
Row::new(vec![
TableValue::String("3".to_string()),
TableValue::String("pending".to_string()),
TableValue::Null
]),
Row::new(vec![
TableValue::String("4".to_string()),
TableValue::String("pending".to_string()),
TableValue::Null
]),
Row::new(vec![
TableValue::String("2".to_string()),
TableValue::String("pending".to_string()),
TableValue::Null
]),
Row::new(vec![
TableValue::String("1".to_string()),
TableValue::String("pending".to_string()),
TableValue::Null
]),
]
);
}

{
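// nothing has been retrieved yet, so the active list is empty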
let active_response = service
.exec_query(r#"QUEUE ACTIVE "STANDALONE#queue""#)
.await
.unwrap();
assert_eq!(active_response.get_rows().len(), 0);
}

{
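// retrieve item 3 for processing; the response carries its payload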
let retrieve_response = service
.exec_query(r#"QUEUE RETRIEVE CONCURRENCY 2 "STANDALONE#queue:3""#)
.await
.unwrap();
assert_eq!(
retrieve_response.get_columns(),
&vec![
Column::new("payload".to_string(), ColumnType::String, 0),
Column::new("extra".to_string(), ColumnType::String, 1),
]
);
assert_eq!(
retrieve_response.get_rows(),
&vec![Row::new(vec![
TableValue::String("payload3".to_string()),
TableValue::Null,
]),]
);
}

{
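// item 3 is now reported as active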
let active_response = service
.exec_query(r#"QUEUE ACTIVE "STANDALONE#queue""#)
.await
.unwrap();
assert_eq!(
active_response.get_rows(),
&vec![Row::new(vec![
TableValue::String("3".to_string()),
TableValue::String("active".to_string()),
TableValue::Null
]),]
);
}

let service = Arc::new(service);

{
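// wait for item 3's result while a concurrent task acks it after a short delay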
let service_to_move = service.clone();
let blocking = async move {
service_to_move
.exec_query(r#"QUEUE RESULT_BLOCKING 5000 "STANDALONE#queue:3""#)
.await
.unwrap()
};

let service_to_move = service.clone();
let ack = async move {
tokio::time::sleep(Duration::from_millis(1000)).await;

service_to_move
.exec_query(r#"QUEUE ACK "STANDALONE#queue:3" "result:3""#)
.await
.unwrap()
};

let (blocking_res, _ack_res) = join!(blocking, ack);
assert_eq!(
blocking_res.get_rows(),
&vec![Row::new(vec![
TableValue::String("result:3".to_string()),
TableValue::String("success".to_string())
]),]
);
}

// the previous job has finished, so the active list is empty again
{
let active_response = service
.exec_query(r#"QUEUE ACTIVE "STANDALONE#queue""#)
.await
.unwrap();
assert_eq!(active_response.get_rows().len(), 0);
}

// get the payload of item 2, which is still pending
{
let get_response = service
.exec_query(r#"QUEUE GET "STANDALONE#queue:2""#)
.await
.unwrap();
assert_eq!(
get_response.get_rows(),
&vec![Row::new(vec![
TableValue::String("payload2".to_string()),
TableValue::Null
]),]
);
}

// cancel job
{
let cancel_response = service
.exec_query(r#"QUEUE CANCEL "STANDALONE#queue:2""#)
.await
.unwrap();
assert_eq!(
cancel_response.get_rows(),
&vec![Row::new(vec![
TableValue::String("payload2".to_string()),
TableValue::Null
]),]
);

// assert that the job was removed
let get_response = service
.exec_query(r#"QUEUE GET "STANDALONE#queue:2""#)
.await
.unwrap();
assert_eq!(get_response.get_rows().len(), 0);
}
}

pub fn to_rows(d: &DataFrame) -> Vec<Vec<TableValue>> {
return d
.get_rows()
rust/cubestore/cubestore/src/cachestore/cache_item.rs (2 additions, 2 deletions)
@@ -125,8 +125,8 @@ impl RocksSecondaryIndex<CacheItem, CacheItemIndexKey> for CacheItemRocksIndex {
true
}

-    fn get_expire<'a>(&self, row: &'a CacheItem) -> &'a Option<DateTime<Utc>> {
-        row.get_expire()
+    fn get_expire(&self, row: &CacheItem) -> Option<DateTime<Utc>> {
+        row.get_expire().clone()
}

fn version(&self) -> u32 {
(The remaining 16 changed files are not shown.)
