Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

doc updates: rust transaction example, readme updates #289

Merged
merged 6 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ Trunk.toml
/results/
/regression.*
/output_iso/
.env
113 changes: 91 additions & 22 deletions pgmq-extension/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,25 @@ Postgres 12-16.
- [Archive a message](#archive-a-message)
- [Delete a message](#delete-a-message)
- [Drop a queue](#drop-a-queue)
- [Configuration](#configuration)
- [Partitioned Queues](#partitioned-queues)
- [Configuration](#configuration)
- [Partitioned Queues](#partitioned-queues)
- [Visibility Timeout (vt)](#visibility-timeout-vt)
- [Who uses pgmq?](#who-uses-pgmq)
- [✨ Contributors](#-contributors)

## Installation

The fastest way to get started is by running the Tembo Docker image, where PGMQ comes pre-installed in Postgres.

```bash
docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest
docker run -d --name pgmq-postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest
```

If you'd like to build from source, you can follow the instructions in [CONTRIBUTING.md](CONTRIBUTING.md).

### Updating

To update PGMQ versions, follow the instructions in [UPDATING.md](UPDATING.md).
To update PGMQ versions, follow the instructions in [UPDATING.md](pgmq-extension/UPDATING.md).
ChuckHend marked this conversation as resolved.
Show resolved Hide resolved

## Client Libraries

Expand Down Expand Up @@ -110,8 +111,10 @@ SELECT pgmq.create('my_queue');

```sql
-- messages are sent as JSON
SELECT * from pgmq.send('my_queue', '{"foo": "bar1"}');
SELECT * from pgmq.send('my_queue', '{"foo": "bar2"}');
SELECT * from pgmq.send(
queue_name => 'my_queue',
msg => '{"foo": "bar1"}'
);
```

The message id is returned from the send function.
Expand All @@ -121,7 +124,19 @@ The message id is returned from the send function.
-----------
1
(1 row)
```

```sql
-- Optionally provide a delay
-- this message will be on the queue but unable to be consumed for 5 seconds
SELECT * from pgmq.send(
queue_name => 'my_queue',
msg => '{"foo": "bar2"}',
delay => 5
);
```

```text
send
-----------
2
Expand All @@ -135,7 +150,11 @@ If the messages are not deleted or archived within 30 seconds, they will become
and can be read by another consumer.

```sql
SELECT * FROM pgmq.read('my_queue', 30, 2);
SELECT * FROM pgmq.read(
queue_name => 'my_queue',
vt => 30,
qty => 2
);
```

```text
Expand All @@ -148,7 +167,11 @@ SELECT * FROM pgmq.read('my_queue', 30, 2);
If the queue is empty, or if all messages are currently invisible, no rows will be returned.

```sql
SELECT pgmq.read('my_queue', 30, 1);
SELECT * FROM pgmq.read(
queue_name => 'my_queue',
vt => 30,
qty => 1
);
```

```text
Expand All @@ -159,8 +182,8 @@ SELECT pgmq.read('my_queue', 30, 1);
### Pop a message

```sql
-- Read a message and immediately delete it from the queue. Returns `None` if the queue is empty.
SELECT pgmq.pop('my_queue');
-- Read a message and immediately delete it from the queue. Returns an empty record if the queue is empty or all messages are invisible.
SELECT * FROM pgmq.pop('my_queue');
```

```text
Expand All @@ -175,7 +198,10 @@ Archiving a message removes it from the queue and inserts it to the archive tabl

```sql
-- Archive message with msg_id=2.
SELECT pgmq.archive('my_queue', 2);
SELECT pgmq.archive(
queue_name => 'my_queue',
msg_id => 2
);
```

```text
Expand All @@ -185,36 +211,79 @@ SELECT pgmq.archive('my_queue', 2);
(1 row)
```

Or archive several messages in one operation using `msg_ids` (plural) parameter:

First, send a batch of messages

```sql
SELECT pgmq.send_batch(
queue_name => 'my_queue',
msgs => ARRAY['{"foo": "bar3"}','{"foo": "bar4"}','{"foo": "bar5"}']::jsonb[]
);
```

```text
send_batch
------------
3
4
5
(3 rows)
```

Then archive them by using the msg_ids (plural) parameter.

```sql
SELECT pgmq.archive(
queue_name => 'my_queue',
msg_ids => ARRAY[3, 4, 5]
);
```

```text
archive
---------
3
4
5
(3 rows)
```

Archive tables can be inspected directly with SQL.
Archive tables have the prefix `a_` in the `pgmq` schema.

```sql
-- Archive tables have the prefix `a_`:
SELECT * FROM pgmq.a_my_queue;
```

```text
msg_id | read_ct | enqueued_at | archived_at | vt | message
--------+---------+------------------------------+-------------------------------+-------------------------------+-----------------
2 | 1 | 2023-04-25 00:55:40.68417-05 | 2023-04-25 00:56:35.937594-05 | 2023-04-25 00:56:20.532012-05 | {"foo": "bar2"}
msg_id | read_ct | enqueued_at | archived_at | vt | message
--------+---------+-------------------------------+-------------------------------+-------------------------------+-----------------
2 | 0 | 2024-08-06 16:03:41.531556+00 | 2024-08-06 16:03:52.811063+00 | 2024-08-06 16:03:46.532246+00 | {"foo": "bar2"}
3 | 0 | 2024-08-06 16:03:58.586444+00 | 2024-08-06 16:04:02.85799+00 | 2024-08-06 16:03:58.587272+00 | {"foo": "bar3"}
4 | 0 | 2024-08-06 16:03:58.586444+00 | 2024-08-06 16:04:02.85799+00 | 2024-08-06 16:03:58.587508+00 | {"foo": "bar4"}
5 | 0 | 2024-08-06 16:03:58.586444+00 | 2024-08-06 16:04:02.85799+00 | 2024-08-06 16:03:58.587543+00 | {"foo": "bar5"}
```

### Delete a message

Send another message, so that we can delete it.

```sql
SELECT pgmq.send('my_queue', '{"foo": "bar3"}');
SELECT pgmq.send('my_queue', '{"foo": "bar6"}');
```

```text
send
-----------
3
6
(1 row)
```

Delete the message with id `3` from the queue named `my_queue`.
Delete the message with id `6` from the queue named `my_queue`.

```sql
SELECT pgmq.delete('my_queue', 3);
SELECT pgmq.delete('my_queue', 6);
```

```text
Expand All @@ -239,9 +308,9 @@ SELECT pgmq.drop_queue('my_queue');
(1 row)
```

# Configuration
## Configuration

## Partitioned Queues
### Partitioned Queues

You will need to install [pg_partman](https://github.com/pgpartman/pg_partman/) if you want to use `pgmq` partitioned queues.

Expand All @@ -264,7 +333,7 @@ Add the following to `postgresql.conf`. Note, changing `shared_preload_libraries

`pg_partman_bgw.interval` sets the interval at which `pg_partman` conducts maintenance. This creates new partitions and dropping of partitions falling out of the `retention_interval`. By default, `pg_partman` will keep 4 partitions "ahead" of the currently active partition.

```
```text
shared_preload_libraries = 'pg_partman_bgw' # requires restart of Postgres
pg_partman_bgw.interval = 60
pg_partman_bgw.role = 'postgres'
Expand Down
2 changes: 1 addition & 1 deletion pgmq-rs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.29.0"
version = "0.29.1"
edition = "2021"
authors = ["Tembo.io"]
description = "A distributed message queue for Rust applications, on Postgres."
Expand Down
55 changes: 21 additions & 34 deletions pgmq-rs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,60 +13,40 @@ PGMQ was created by [Tembo](https://tembo.io/). Our goal is to make the full Pos
We're building a radically simplified Postgres platform designed to be developer-first and easily extensible.
PGMQ is a part of that project.

This project contains two APIs, a pure Rust client side library and the Rust SDK wrapped around the Postgres extrension. The APIs aim to be identical, but the extension wrapper has advantages including;
- performance
- support for partitioned queues
- metrics
This project contains two APIs, a pure Rust client side library and the Rust SDK wrapped around the Postgres extension.

The pure Rust client
```rust
use pgmq::PGMQueue;
```

And a Rust SDK wrapped around the Postgres extension.
`The Rust client for the Postgres extension`. This gives the you the an ORM-like experience with the Postgres extension and makes managing connection pools, transactions, and serialization/deserialization much easier.

```rust
use pgmq::PGMQueueExt;
```

Not building in Rust? Try the [Tembo pgmq Postgres extension](https://pgt.dev/extensions/pgmq).

## Features
`The pure Rust client`. This provides minimal functionality but can be used on any existing Postgres instance.

- Lightweight - Rust and Postgres only
- Guaranteed delivery of messages to exactly one consumer within a visibility timeout
- API parity with [AWS SQS](https://aws.amazon.com/sqs/) and [RSMQ](https://github.com/smrchy/rsmq)
- Messages stay in the queue until deleted
- Messages can be archived, instead of deleted, for long-term retention and replayability
- Completely asynchronous API
```rust
use pgmq::PGMQueue;
```

## Quick start

- First, you will need Postgres. We use a container in this example.

```bash
docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 postgres
docker run -d --name pgmq-postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest
```

- If you don't have Docker installed, it can be found [here](https://docs.docker.com/get-docker/).

- Make sure you have the Rust toolchain installed:
- Make sure you have the [Rust toolchain](https://www.rust-lang.org/tools/install) installed:

```bash
cargo --version
```
- Clone the project and run the [basic example](./examples/basic.rs):

- This example was written with version 1.67.0, but the latest stable should work. You can go [here](https://www.rust-lang.org/tools/install) to install Rust if you don't have it already, then run `rustup install stable` to install the latest, stable toolchain.

- Change directory to the example project:
```bash
cd examples/basic
```
git clone https://github.com/tembo-io/pgmq.git

- Run the project!
cd pgmq-rs

```bash
cargo run
cargo run --example basic
```

## Minimal example at a glance
Expand Down Expand Up @@ -131,6 +111,14 @@ async fn main() -> Result<(), PgmqError> {
}
```

## Transactions

You can execute all of PGMQ's operations within a transaction along with other database operations. See the [transaction example](./examples/transaction.rs) or run the example with:

```bash
cargo run --example transactions
```

## Sending messages

You can send one message at a time with `queue.send()` or several with `queue.send_batch()`.
Expand All @@ -155,8 +143,7 @@ Read messages from the queue archive with SQL:

```sql
SELECT *
FROM pgmq_{your_queue_name}_archive;
FROM pgmq.a_{your_queue_name};
```


License: [PostgreSQL](LICENSE)
Loading
Loading