Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pgmq result over panic #129

Merged
merged 7 commits into from
Feb 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions crates/pgmq/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.7.4"
version = "0.7.5"
edition = "2021"
authors = ["CoreDB.io"]
description = "A distributed message queue for Rust applications, on Postgres."
Expand All @@ -20,9 +20,14 @@ thiserror = "1.0.38"
tokio = { version = "1", features = ["macros"] }
log = "0.4.17"
url = "2.3.1"
regex = "1.7.1"
lazy_static = "1.4.0"

[dev-dependencies]
cargo-readme = "3.2.0"
criterion = "0.4"
rand = "0.8.5"
regex = "1.5.4"
lazy_static = "1.4.0"

[[bench]]
name = "parsing"
harness = false
26 changes: 26 additions & 0 deletions crates/pgmq/benches/parsing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};

use lazy_static::lazy_static;
use pgmq::query::check_input;
use regex::Regex;

pub fn check_regex(input: &str) {
lazy_static! {
static ref RE: Regex = Regex::new(r#"^[a-zA-Z0-9_]+$"#).unwrap();
}
if !RE.is_match(input) {
panic!("Invalid queue name: {input}")
}
}

fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("check bytes", |b| {
b.iter(|| check_input(black_box("myqueue_123_longername")))
});
c.bench_function("check regex", |b| {
b.iter(|| check_regex(black_box("myqueue_123_longername")))
});
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
5 changes: 5 additions & 0 deletions crates/pgmq/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,9 @@ pub enum PgmqError {
/// a database error
#[error("database error {0}")]
DatabaseError(#[from] sqlx::Error),

/// a queue name error
/// queue names must be alphanumeric and start with a letter
#[error("naming error: {name}")]
InvalidQueueName { name: String },
}
29 changes: 15 additions & 14 deletions crates/pgmq/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@

#![doc(html_root_url = "https://docs.rs/pgmq/")]

use errors::PgmqError;
use log::LevelFilter;
use serde::{Deserialize, Serialize};
use sqlx::error::Error;
Expand Down Expand Up @@ -233,7 +234,7 @@ impl PGMQueue {
/// }
pub async fn create(&self, queue_name: &str) -> Result<(), errors::PgmqError> {
let mut tx = self.connection.begin().await?;
let setup = query::init_queue(queue_name);
let setup = query::init_queue(queue_name)?;
for q in setup {
sqlx::query(&q).execute(&mut tx).await?;
}
Expand Down Expand Up @@ -267,7 +268,7 @@ impl PGMQueue {
/// }
pub async fn destroy(&self, queue_name: &str) -> Result<(), errors::PgmqError> {
let mut tx = self.connection.begin().await?;
let setup = query::destory_queue(queue_name);
let setup = query::destory_queue(queue_name)?;
for q in setup {
sqlx::query(&q).execute(&mut tx).await?;
}
Expand Down Expand Up @@ -333,7 +334,7 @@ impl PGMQueue {
let mut msgs: Vec<serde_json::Value> = Vec::new();
let msg = serde_json::json!(&message);
msgs.push(msg);
let row: PgRow = sqlx::query(&query::enqueue(queue_name, &msgs))
let row: PgRow = sqlx::query(&query::enqueue(queue_name, &msgs)?)
.fetch_one(&self.connection)
.await?;
let msg_id: i64 = row.get("msg_id");
Expand Down Expand Up @@ -391,7 +392,7 @@ impl PGMQueue {
let binding = serde_json::json!(&msg);
msgs.push(binding)
}
let rows: Vec<PgRow> = sqlx::query(&query::enqueue(queue_name, &msgs))
let rows: Vec<PgRow> = sqlx::query(&query::enqueue(queue_name, &msgs)?)
.fetch_all(&self.connection)
.await?;
for row in rows.iter() {
Expand Down Expand Up @@ -474,7 +475,7 @@ impl PGMQueue {
None => &VT_DEFAULT,
};
let limit = &READ_LIMIT_DEFAULT;
let query = &query::read(queue_name, vt_, limit);
let query = &query::read(queue_name, vt_, limit)?;
let message = fetch_one_message::<T>(query, &self.connection).await?;
Ok(message)
}
Expand Down Expand Up @@ -555,7 +556,7 @@ impl PGMQueue {
Some(t) => t,
None => &VT_DEFAULT,
};
let query = &query::read(queue_name, vt_, num_msgs);
let query = &query::read(queue_name, vt_, num_msgs)?;
let messages = fetch_messages::<T>(query, &self.connection).await?;
Ok(messages)
}
Expand Down Expand Up @@ -604,8 +605,8 @@ impl PGMQueue {
///
/// Ok(())
/// }
pub async fn delete(&self, queue_name: &str, msg_id: &i64) -> Result<u64, Error> {
let query = &query::delete(queue_name, msg_id);
pub async fn delete(&self, queue_name: &str, msg_id: &i64) -> Result<u64, PgmqError> {
let query = &query::delete(queue_name, msg_id)?;
let row = sqlx::query(query).execute(&self.connection).await?;
let num_deleted = row.rows_affected();
Ok(num_deleted)
Expand Down Expand Up @@ -653,8 +654,8 @@ impl PGMQueue {
///
/// Ok(())
/// }
pub async fn delete_batch(&self, queue_name: &str, msg_ids: &[i64]) -> Result<u64, Error> {
let query = &query::delete_batch(queue_name, msg_ids);
pub async fn delete_batch(&self, queue_name: &str, msg_ids: &[i64]) -> Result<u64, PgmqError> {
let query = &query::delete_batch(queue_name, msg_ids)?;
let row = sqlx::query(query).execute(&self.connection).await?;
let num_deleted = row.rows_affected();
Ok(num_deleted)
Expand Down Expand Up @@ -696,8 +697,8 @@ impl PGMQueue {
///
/// Ok(())
/// }
pub async fn archive(&self, queue_name: &str, msg_id: &i64) -> Result<u64, Error> {
let query = query::archive(queue_name, msg_id);
pub async fn archive(&self, queue_name: &str, msg_id: &i64) -> Result<u64, PgmqError> {
let query = query::archive(queue_name, msg_id)?;
let row = sqlx::query(&query).execute(&self.connection).await?;
let num_deleted = row.rows_affected();
Ok(num_deleted)
Expand Down Expand Up @@ -748,8 +749,8 @@ impl PGMQueue {
pub async fn pop<T: for<'de> Deserialize<'de>>(
&self,
queue_name: &str,
) -> Result<Option<Message<T>>, errors::PgmqError> {
let query = &query::pop(queue_name);
) -> Result<Option<Message<T>>, PgmqError> {
let query = &query::pop(queue_name)?;
let message = fetch_one_message::<T>(query, &self.connection).await?;
Ok(message)
}
Expand Down
Loading