Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pgmq crate in pgmq_ext #76

Merged
merged 12 commits into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/pgx-init/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ inputs:
pgx_version:
description: 'The version of pgx to use'
required: false
default: "0.6.1"
default: "0.7.1"
outputs: {}
runs:
using: "composite"
Expand Down
2 changes: 1 addition & 1 deletion crates/pgmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn main() {
let msg_id2: i64 = queue.enqueue(&myqueue, &msg2).await.expect("Failed to enqueue message");

// READ A MESSAGE as `serde_json::Value`
let vt: u32 = 30;
let vt: i32 = 30;
let read_msg1: Message<Value> = queue.read::<Value>(&myqueue, Some(&vt)).await.unwrap().expect("no messages in the queue!");
assert_eq!(read_msg1.msg_id, msg_id1);

Expand Down
2 changes: 1 addition & 1 deletion crates/pgmq/examples/basic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn main() {
.expect("Failed to enqueue message");

// READ A MESSAGE as `serde_json::Value`
let vt: u32 = 30;
let vt: i32 = 30;
let read_msg1: Message<Value> = queue
.read::<Value>(&myqueue, Some(&vt))
.await
Expand Down
8 changes: 4 additions & 4 deletions crates/pgmq/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
//! let msg_id2: i64 = queue.enqueue(&myqueue, &msg2).await.expect("Failed to enqueue message");
//!
//! // READ A MESSAGE as `serde_json::Value`
//! let vt: u32 = 30;
//! let vt: i32 = 30;
//! let read_msg1: Message<Value> = queue.read::<Value>(&myqueue, Some(&vt)).await.unwrap().expect("no messages in the queue!");
//! assert_eq!(read_msg1.msg_id, msg_id1);
//!
Expand Down Expand Up @@ -91,10 +91,10 @@ use sqlx::FromRow;
use sqlx::{Pool, Postgres, Row};

pub mod errors;
mod query;
pub mod query;
use chrono::serde::ts_seconds::deserialize as from_ts;

const VT_DEFAULT: u32 = 30;
const VT_DEFAULT: i32 = 30;

#[derive(Debug, Deserialize, FromRow)]
pub struct Message<T = serde_json::Value> {
Expand Down Expand Up @@ -157,7 +157,7 @@ impl PGMQueue {
pub async fn read<T: for<'de> Deserialize<'de>>(
&self,
queue_name: &str,
vt: Option<&u32>,
vt: Option<&i32>,
) -> Result<Option<Message<T>>, errors::PgmqError> {
// map vt or default VT
let vt_ = match vt {
Expand Down
16 changes: 13 additions & 3 deletions crates/pgmq/src/query.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const TABLE_PREFIX: &str = r#"pgmq"#;
pub const TABLE_PREFIX: &str = r#"pgmq"#;

pub fn create(name: &str) -> String {
format!(
Expand Down Expand Up @@ -30,8 +30,18 @@ pub fn enqueue(name: &str, message: &serde_json::Value) -> String {
"
)
}
pub fn enqueue_str(name: &str, message: &str) -> String {
// TOOO: vt should be now() + delay
format!(
"
INSERT INTO {TABLE_PREFIX}_{name} (vt, message)
VALUES (now() at time zone 'utc', '{message}'::json)
RETURNING msg_id;
"
)
}

pub fn read(name: &str, vt: &u32) -> String {
pub fn read(name: &str, vt: &i32) -> String {
format!(
"
WITH cte AS
Expand Down Expand Up @@ -102,7 +112,7 @@ mod tests {
#[test]
fn test_read() {
let qname = "myqueue";
let vt: u32 = 20;
let vt: i32 = 20;

let query = read(&qname, &vt);

Expand Down
16 changes: 8 additions & 8 deletions crates/pgmq/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ async fn test_fifo() {
let msg_id3 = queue.enqueue(&test_queue, &msg).await.unwrap();
assert_eq!(msg_id3, 3);

let vt: u32 = 1;
let vt: i32 = 1;
// READ FIRST TWO MESSAGES
let read1 = queue
.read::<Value>(&test_queue, Some(&vt))
Expand Down Expand Up @@ -187,7 +187,7 @@ async fn test_serde() {
assert_eq!(msg1, 1);

let msg_read = queue
.read::<MyMessage>(&test_queue, Some(&30_u32))
.read::<MyMessage>(&test_queue, Some(&30_i32))
.await
.unwrap()
.unwrap();
Expand All @@ -204,7 +204,7 @@ async fn test_serde() {
assert_eq!(msg2, 2);

let msg_read = queue
.read::<Value>(&test_queue, Some(&30_u32))
.read::<Value>(&test_queue, Some(&30_i32))
.await
.unwrap()
.unwrap();
Expand All @@ -223,7 +223,7 @@ async fn test_serde() {
assert_eq!(msg3, 3);

let msg_read = queue
.read::<MyMessage>(&test_queue, Some(&30_u32))
.read::<MyMessage>(&test_queue, Some(&30_i32))
.await
.unwrap()
.unwrap();
Expand All @@ -239,7 +239,7 @@ async fn test_serde() {
let msg4 = queue.enqueue(&test_queue, &msg).await.unwrap();
assert_eq!(msg4, 4);
let msg_read = queue
.read::<Value>(&test_queue, Some(&30_u32))
.read::<Value>(&test_queue, Some(&30_i32))
.await
.unwrap()
.unwrap();
Expand All @@ -257,7 +257,7 @@ async fn test_serde() {
let msg5 = queue.enqueue(&test_queue, &msg).await.unwrap();
assert_eq!(msg5, 5);
let msg_read: crate::pgmq::Message = queue
.read(&test_queue, Some(&30_u32)) // no turbofish on this line
.read(&test_queue, Some(&30_i32)) // no turbofish on this line
.await
.unwrap()
.unwrap();
Expand Down Expand Up @@ -295,7 +295,7 @@ async fn test_database_error_modes() {
assert!(msg_id.is_err());

// read from a queue that does not exist should error
let read_msg = queue.read::<Message>("doesNotExist", Some(&10_u32)).await;
let read_msg = queue.read::<Message>("doesNotExist", Some(&10_i32)).await;
assert!(read_msg.is_err());

// connect to a postgres instance that doesnt exist should error
Expand Down Expand Up @@ -324,7 +324,7 @@ async fn test_parsing_error_modes() {
let _ = queue.enqueue(&test_queue, &msg).await.unwrap();

// we sent MyMessage, so trying to parse into YoloMessage should error
let read_msg = queue.read::<YoloMessage>(&test_queue, Some(&10_u32)).await;
let read_msg = queue.read::<YoloMessage>(&test_queue, Some(&10_i32)).await;

// we expect a parse error
match read_msg {
Expand Down
3 changes: 0 additions & 3 deletions extensions/pgmq/.cargo/config.toml

This file was deleted.

7 changes: 4 additions & 3 deletions extensions/pgmq/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.0.0"
version = "0.0.1"
edition = "2021"

[lib]
Expand All @@ -16,12 +16,13 @@ pg15 = ["pgx/pg15", "pgx-tests/pg15" ]
pg_test = []

[dependencies]
pgx = "=0.6.1"
pgx = "0.7.1"
serde = "1.0.152"
pgmq = { path = "../../crates/pgmq", version = "0.2.0" }
serde_json = "1.0.91"

[dev-dependencies]
pgx-tests = "=0.6.1"
pgx-tests = "0.7.1"

[profile.dev]
panic = "unwind"
Expand Down
2 changes: 1 addition & 1 deletion extensions/pgmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Then, clone this repo and change into this directory.

```
git clone git@github.com:CoreDB-io/coredb.git
cd coredb/extenesions/pgmq/
cd coredb/extensions/pgmq/
```

Run the dev environment
Expand Down
Loading