Skip to content

Commit

Permalink
Add function for testing a single job.
Browse files Browse the repository at this point in the history
  • Loading branch information
Diggsey committed Jun 30, 2022
1 parent 069cfd7 commit bca15c5
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 4 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "sqlxmq"
version = "0.4.0"
version = "0.4.1"
authors = ["Diggory Blake <diggsey@googlemail.com>"]
edition = "2018"
license = "MIT OR Apache-2.0"
Expand All @@ -23,7 +23,7 @@ uuid = { version = "1.1.2", features = ["v4"] }
log = "0.4.14"
serde_json = "1.0.64"
serde = "1.0.124"
sqlxmq_macros = { version = "0.4.0", path = "sqlxmq_macros" }
sqlxmq_macros = { version = "0.4.1", path = "sqlxmq_macros" }
anymap2 = "0.13.0"

[features]
Expand Down
2 changes: 1 addition & 1 deletion sqlxmq_macros/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "sqlxmq_macros"
version = "0.4.0"
version = "0.4.1"
authors = ["Diggory Blake <diggsey@googlemail.com>"]
edition = "2018"
license = "MIT OR Apache-2.0"
Expand Down
54 changes: 53 additions & 1 deletion src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};
use sqlx::postgres::types::PgInterval;
use sqlx::postgres::PgListener;
use sqlx::{Pool, Postgres};
use tokio::sync::Notify;
use tokio::sync::{oneshot, Notify};
use tokio::task;
use uuid::Uuid;

Expand Down Expand Up @@ -266,6 +266,58 @@ impl JobRunnerOptions {
listener_task,
))))
}

/// Run a single job and then return. Intended for use by tests. The job should
/// have been spawned normally and be ready to run.
pub async fn test_one(&self) -> Result<(), sqlx::Error> {
let options = self.clone();
let job_runner = Arc::new(JobRunner {
options,
running_jobs: AtomicUsize::new(0),
notify: Notify::new(),
});

log::info!("Polling for single message");
let mut messages = sqlx::query_as::<_, PolledMessage>("SELECT * FROM mq_poll($1, 1)")
.bind(&self.channel_names)
.fetch_all(&self.pool)
.await?;

assert_eq!(messages.len(), 1, "Expected one message to be ready");
let msg = messages.pop().unwrap();

if let PolledMessage {
id: Some(id),
is_committed: Some(true),
name: Some(name),
payload_json,
payload_bytes,
..
} = msg
{
let (tx, rx) = oneshot::channel::<()>();
let keep_alive = Some(OwnedHandle::new(task::spawn(async move {
let _tx = tx;
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
}
})));
let current_job = CurrentJob {
id,
name,
payload_json,
payload_bytes,
job_runner: job_runner.clone(),
keep_alive,
};
job_runner.running_jobs.fetch_add(1, Ordering::SeqCst);
(self.dispatch)(current_job);

// Wait for job to complete
let _ = rx.await;
}
Ok(())
}
}

async fn start_listener(job_runner: Arc<JobRunner>) -> Result<OwnedHandle, sqlx::Error> {
Expand Down

0 comments on commit bca15c5

Please sign in to comment.