Skip to content

Commit

Permalink
use set of pop/set_vt (#223)
Browse files Browse the repository at this point in the history
* use set of pop/set_vt

* add sql migration

* add pop and set_vt tests
  • Loading branch information
ChuckHend authored May 23, 2024
1 parent 4644699 commit 6f3e1e5
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 8 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "1.2.0"
version = "1.2.1"
edition = "2021"
authors = ["Tembo.io"]
description = "Postgres extension for PGMQ"
Expand Down
47 changes: 47 additions & 0 deletions sql/pgmq--1.2.0--1.2.1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
DROP FUNCTION pgmq.set_vt(text, bigint, integer);
CREATE FUNCTION pgmq.set_vt(queue_name TEXT, msg_id BIGINT, vt INTEGER)
RETURNS SETOF pgmq.message_record AS $$
DECLARE
sql TEXT;
result pgmq.message_record;
BEGIN
sql := FORMAT(
$QUERY$
UPDATE pgmq.q_%s
SET vt = (now() + interval '%s seconds')
WHERE msg_id = %s
RETURNING *;
$QUERY$,
queue_name, vt, msg_id
);
RETURN QUERY EXECUTE sql;
END;
$$ LANGUAGE plpgsql;

DROP FUNCTION pgmq.pop(text);
CREATE FUNCTION pgmq.pop(queue_name TEXT)
RETURNS SETOF pgmq.message_record AS $$
DECLARE
sql TEXT;
result pgmq.message_record;
BEGIN
sql := FORMAT(
$QUERY$
WITH cte AS
(
SELECT msg_id
FROM pgmq.q_%s
WHERE vt <= now()
ORDER BY msg_id ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
)
DELETE from pgmq.q_%s
WHERE msg_id = (select msg_id from cte)
RETURNING *;
$QUERY$,
queue_name, queue_name
);
RETURN QUERY EXECUTE sql;
END;
$$ LANGUAGE plpgsql;
10 changes: 4 additions & 6 deletions src/sql_src.sql
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ $$ LANGUAGE plpgsql;

-- pop a single message
CREATE FUNCTION pgmq.pop(queue_name TEXT)
RETURNS pgmq.message_record AS $$
RETURNS SETOF pgmq.message_record AS $$
DECLARE
sql TEXT;
result pgmq.message_record;
Expand All @@ -397,14 +397,13 @@ BEGIN
$QUERY$,
queue_name, queue_name
);
EXECUTE sql INTO result;
RETURN result;
RETURN QUERY EXECUTE sql;
END;
$$ LANGUAGE plpgsql;

-- Sets vt of a message, returns it
CREATE FUNCTION pgmq.set_vt(queue_name TEXT, msg_id BIGINT, vt INTEGER)
RETURNS pgmq.message_record AS $$
RETURNS SETOF pgmq.message_record AS $$
DECLARE
sql TEXT;
result pgmq.message_record;
Expand All @@ -418,8 +417,7 @@ BEGIN
$QUERY$,
queue_name, vt, msg_id
);
EXECUTE sql INTO result;
RETURN result;
RETURN QUERY EXECUTE sql;
END;
$$ LANGUAGE plpgsql;

Expand Down
61 changes: 61 additions & 0 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,12 @@ async fn test_pop() {
let queue_name = format!("test_pop_{test_num}");
create_queue(&queue_name.to_string(), &conn).await;

let none_popped = sqlx::query(&format!("SELECT * FROM {PGMQ_SCHEMA}.pop('{queue_name}')"))
.fetch_optional(&conn)
.await
.expect("failed to get pop result");
assert!(none_popped.is_none());

let first_msg_id = send_sample_message(&queue_name, &conn).await;
send_sample_message(&queue_name, &conn).await;
send_sample_message(&queue_name, &conn).await;
Expand All @@ -485,6 +491,61 @@ async fn test_pop() {
assert_eq!(get_queue_size(&queue_name, &conn).await, 2);
}

// Integration tests are ignored by default
#[ignore]
#[tokio::test]
async fn test_set_vt() {
let conn = init_database().await;
let mut rng = rand::thread_rng();
let test_num = rng.gen_range(0..100000);

let queue_name = format!("test_set_vt_{test_num}");
create_queue(&queue_name.to_string(), &conn).await;

// set a non-existing message must return no records
let none_set = sqlx::query(&format!(
"SELECT * FROM {PGMQ_SCHEMA}.set_vt('{queue_name}', 9999, 0)"
))
.fetch_optional(&conn)
.await
.expect("failed to execute set command");
assert!(none_set.is_none());

let first_msg_id = send_sample_message(&queue_name, &conn).await;

// set message invisible for 100 seconds
let set_first_id = sqlx::query(&format!(
"SELECT * FROM {PGMQ_SCHEMA}.set_vt('{queue_name}', {first_msg_id}, 100)"
))
.fetch_one(&conn)
.await
.expect("failed to execute set command");
let set_first_id = set_first_id.get::<i64, usize>(0);
assert_eq!(set_first_id, first_msg_id);

// read message, it should not be visible
let query = &format!("SELECT * from {PGMQ_SCHEMA}.read('{queue_name}', 1, 1);");
let none_message = fetch_one_message::<serde_json::Value>(query, &conn)
.await
.expect("failed reading message");
assert!(none_message.is_none());

// make it visible
let _set_first_id = sqlx::query(&format!(
"SELECT * FROM {PGMQ_SCHEMA}.set_vt('{queue_name}', {first_msg_id}, 0)"
))
.fetch_one(&conn)
.await
.expect("failed to execute set command");

// set vt works if message is readable
let some_message = fetch_one_message::<serde_json::Value>(query, &conn)
.await
.expect("failed reading message")
.expect("no messages returned");
assert_eq!(some_message.msg_id, first_msg_id);
}

// Integration tests are ignored by default
#[ignore]
#[tokio::test]
Expand Down

0 comments on commit 6f3e1e5

Please sign in to comment.