Skip to content

Commit

Permalink
pgmq result over panic (#129)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChuckHend authored Feb 17, 2023
1 parent d20b252 commit f44c0f9
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 117 deletions.
11 changes: 8 additions & 3 deletions crates/pgmq/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.7.4"
version = "0.7.5"
edition = "2021"
authors = ["CoreDB.io"]
description = "A distributed message queue for Rust applications, on Postgres."
Expand All @@ -20,9 +20,14 @@ thiserror = "1.0.38"
tokio = { version = "1", features = ["macros"] }
log = "0.4.17"
url = "2.3.1"
regex = "1.7.1"
lazy_static = "1.4.0"

[dev-dependencies]
cargo-readme = "3.2.0"
criterion = "0.4"
rand = "0.8.5"
regex = "1.5.4"
lazy_static = "1.4.0"

[[bench]]
name = "parsing"
harness = false
26 changes: 26 additions & 0 deletions crates/pgmq/benches/parsing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};

use lazy_static::lazy_static;
use pgmq::query::check_input;
use regex::Regex;

pub fn check_regex(input: &str) {
lazy_static! {
static ref RE: Regex = Regex::new(r#"^[a-zA-Z0-9_]+$"#).unwrap();
}
if !RE.is_match(input) {
panic!("Invalid queue name: {input}")
}
}

fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("check bytes", |b| {
b.iter(|| check_input(black_box("myqueue_123_longername")))
});
c.bench_function("check regex", |b| {
b.iter(|| check_regex(black_box("myqueue_123_longername")))
});
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
5 changes: 5 additions & 0 deletions crates/pgmq/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,9 @@ pub enum PgmqError {
/// a database error
#[error("database error {0}")]
DatabaseError(#[from] sqlx::Error),

/// a queue name error
/// queue names must be alphanumeric and start with a letter
#[error("naming error: {name}")]
InvalidQueueName { name: String },
}
29 changes: 15 additions & 14 deletions crates/pgmq/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@

#![doc(html_root_url = "https://docs.rs/pgmq/")]

use errors::PgmqError;
use log::LevelFilter;
use serde::{Deserialize, Serialize};
use sqlx::error::Error;
Expand Down Expand Up @@ -233,7 +234,7 @@ impl PGMQueue {
/// }
pub async fn create(&self, queue_name: &str) -> Result<(), errors::PgmqError> {
let mut tx = self.connection.begin().await?;
let setup = query::init_queue(queue_name);
let setup = query::init_queue(queue_name)?;
for q in setup {
sqlx::query(&q).execute(&mut tx).await?;
}
Expand Down Expand Up @@ -267,7 +268,7 @@ impl PGMQueue {
/// }
pub async fn destroy(&self, queue_name: &str) -> Result<(), errors::PgmqError> {
let mut tx = self.connection.begin().await?;
let setup = query::destory_queue(queue_name);
let setup = query::destory_queue(queue_name)?;
for q in setup {
sqlx::query(&q).execute(&mut tx).await?;
}
Expand Down Expand Up @@ -333,7 +334,7 @@ impl PGMQueue {
let mut msgs: Vec<serde_json::Value> = Vec::new();
let msg = serde_json::json!(&message);
msgs.push(msg);
let row: PgRow = sqlx::query(&query::enqueue(queue_name, &msgs))
let row: PgRow = sqlx::query(&query::enqueue(queue_name, &msgs)?)
.fetch_one(&self.connection)
.await?;
let msg_id: i64 = row.get("msg_id");
Expand Down Expand Up @@ -391,7 +392,7 @@ impl PGMQueue {
let binding = serde_json::json!(&msg);
msgs.push(binding)
}
let rows: Vec<PgRow> = sqlx::query(&query::enqueue(queue_name, &msgs))
let rows: Vec<PgRow> = sqlx::query(&query::enqueue(queue_name, &msgs)?)
.fetch_all(&self.connection)
.await?;
for row in rows.iter() {
Expand Down Expand Up @@ -474,7 +475,7 @@ impl PGMQueue {
None => &VT_DEFAULT,
};
let limit = &READ_LIMIT_DEFAULT;
let query = &query::read(queue_name, vt_, limit);
let query = &query::read(queue_name, vt_, limit)?;
let message = fetch_one_message::<T>(query, &self.connection).await?;
Ok(message)
}
Expand Down Expand Up @@ -555,7 +556,7 @@ impl PGMQueue {
Some(t) => t,
None => &VT_DEFAULT,
};
let query = &query::read(queue_name, vt_, num_msgs);
let query = &query::read(queue_name, vt_, num_msgs)?;
let messages = fetch_messages::<T>(query, &self.connection).await?;
Ok(messages)
}
Expand Down Expand Up @@ -604,8 +605,8 @@ impl PGMQueue {
///
/// Ok(())
/// }
pub async fn delete(&self, queue_name: &str, msg_id: &i64) -> Result<u64, Error> {
let query = &query::delete(queue_name, msg_id);
pub async fn delete(&self, queue_name: &str, msg_id: &i64) -> Result<u64, PgmqError> {
let query = &query::delete(queue_name, msg_id)?;
let row = sqlx::query(query).execute(&self.connection).await?;
let num_deleted = row.rows_affected();
Ok(num_deleted)
Expand Down Expand Up @@ -653,8 +654,8 @@ impl PGMQueue {
///
/// Ok(())
/// }
pub async fn delete_batch(&self, queue_name: &str, msg_ids: &[i64]) -> Result<u64, Error> {
let query = &query::delete_batch(queue_name, msg_ids);
pub async fn delete_batch(&self, queue_name: &str, msg_ids: &[i64]) -> Result<u64, PgmqError> {
let query = &query::delete_batch(queue_name, msg_ids)?;
let row = sqlx::query(query).execute(&self.connection).await?;
let num_deleted = row.rows_affected();
Ok(num_deleted)
Expand Down Expand Up @@ -696,8 +697,8 @@ impl PGMQueue {
///
/// Ok(())
/// }
pub async fn archive(&self, queue_name: &str, msg_id: &i64) -> Result<u64, Error> {
let query = query::archive(queue_name, msg_id);
pub async fn archive(&self, queue_name: &str, msg_id: &i64) -> Result<u64, PgmqError> {
let query = query::archive(queue_name, msg_id)?;
let row = sqlx::query(&query).execute(&self.connection).await?;
let num_deleted = row.rows_affected();
Ok(num_deleted)
Expand Down Expand Up @@ -748,8 +749,8 @@ impl PGMQueue {
pub async fn pop<T: for<'de> Deserialize<'de>>(
&self,
queue_name: &str,
) -> Result<Option<Message<T>>, errors::PgmqError> {
let query = &query::pop(queue_name);
) -> Result<Option<Message<T>>, PgmqError> {
let query = &query::pop(queue_name)?;
let message = fetch_one_message::<T>(query, &self.connection).await?;
Ok(message)
}
Expand Down
Loading

0 comments on commit f44c0f9

Please sign in to comment.