diff --git a/extensions/pgx_pgmq/README.md b/extensions/pgx_pgmq/README.md index ccbbbc003..257f63612 100644 --- a/extensions/pgx_pgmq/README.md +++ b/extensions/pgx_pgmq/README.md @@ -2,7 +2,6 @@ A lightweight distributed message queue. Like [AWS SQS](https://aws.amazon.com/sqs/) and [RSMQ](https://github.com/smrchy/rsmq) but on Postgres. - ## Features - Lightweight - Rust and Postgres only @@ -12,23 +11,21 @@ A lightweight distributed message queue. Like [AWS SQS](https://aws.amazon.com/s - Messages can be archived, instead of deleted, for long-term retention and replayability - Completely asynchronous API -- [Postgres Message Queue](#postgres-message-queue) - - [Installation](#installation) +## Table of Contents +- [Postgres Message Queue (PGMQ)](#postgres-message-queue-pgmq) + - [Features](#features) + - [Table of Contents](#table-of-contents) + - [Start CoreDB Postgres](#start-coredb-postgres) - [Python Examples](#python-examples) - - [Connect to postgres](#connect-to-postgres) - - [Create and list queues](#create-and-list-queues) - - [Send a message to the queue](#send-a-message-to-the-queue) - - [Read a message from the queue](#read-a-message-from-the-queue) - - [Delete a message from the queue](#delete-a-message-from-the-queue) - [SQL Examples](#sql-examples) - [Creating a queue](#creating-a-queue) - - [Send a message](#send-a-message) - - [Read a message](#read-a-message) + - [Send two message](#send-two-message) + - [Read messages](#read-messages) - [Pop a message](#pop-a-message) - [Archive a message](#archive-a-message) - [Delete a message](#delete-a-message) - - [Development](#development) - - [Packaging](#packaging) +- [Development](#development) +- [Packaging](#packaging) ## Start CoreDB Postgres @@ -41,131 +38,43 @@ docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io ## Python Examples -### Connect to postgres - -```python -import json -import pprint - -from sqlalchemy import create_engine, text - -engine = create_engine("postgresql://postgres:postrgres@0.0.0.0:5432/postgres") -``` - -### Create and list queues - -```python -with engine.connect() as con: - # create a queue - created = con.execute(text( "select * from pgmq_create('myqueue');")) - # list queues - list_queues = con.execute(text( "select * from pgmq_list_queues()")) - column_names = list_queues.keys() - rows = list_queues.fetchall() - print("### Queues ###") - for row in rows: - pprint.pprint(dict(zip(column_names, row))) -``` - -```python -'### Queues ###' -{ - 'created_at': datetime.datetime(2023, 2, 7, 2, 5, 39, 946356, - tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800))), - 'queue_name': 'myqueue' -} - ``` - - -### Send a message to the queue - -```python -with engine.connect() as con: - # send a message - msg = json.dumps({"yolo": 42}) - msg_id = con.execute(text(f"select * from pgmq_send('x', '{msg}') as msg_id;")) - column_names = msg_id.keys() - rows = msg_id.fetchall() - print("### Message ID ###") - for row in rows: - pprint.pprint(dict(zip(column_names, row))) -``` - -```python -'### Message ID ###' -{'msg_id': 1} -``` - -### Read a message from the queue - -```python -with engine.connect() as con: - # read a message, make it unavailable to be read again for 5 seconds - read = con.execute(text("select * from pgmq_read('x', 5);")) - column_names = read.keys() - rows = read.fetchall() - print("### Read Message ###") - for row in rows: - pprint.pprint(dict(zip(column_names, row))) -``` - -```python -'### Read Message ###' -{ - 'enqueued_at': datetime.datetime(2023, 2, 7, 2, 51, 50, 468837, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800))), - 'message': {'myqueue': 42}, - 'msg_id': 1, - 'read_ct': 1, - 'vt': datetime.datetime(2023, 2, 7, 16, 9, 4, 826669, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800))) -} - ``` -### Delete a message from the queue - -```python -with engine.connect() as con: - # delete a message - deleted = con.execute(text("select pgmq_delete('x', 1);")) - column_names = deleted.keys() - rows = deleted.fetchall() - print("### Message Deleted ###") - for row in rows: - pprint.pprint(dict(zip(column_names, row))) -``` -``` -'### Message Deleted ###' -{'pgmq_delete': True} -``` +See python examples in [examples/python.py](examples/python.py) ## SQL Examples + ```bash +# Connect to Postgres psql postgres://postgres:postgres@0.0.0.0:5432/postgres ``` ```sql +-- create the extension CREATE EXTENSION pgmq; ``` ### Creating a queue ```sql +-- creates the queue. SELECT pgmq_create('my_queue'); pgmq_create ------------- - ``` ### Send two message ```sql +-- messages are sent as JSON pgmq=# -SELECT * from pgmq_send('my_queue', '{"foo": "bar"}'); -SELECT * from pgmq_send('my_queue', '{"foo": "bar"}'); +SELECT * from pgmq_send('my_queue', '{"foo": "bar1"}'); +SELECT * from pgmq_send('my_queue', '{"foo": "bar2"}'); ``` -``` +```sql +-- the message id is returned from the send function pgmq_send ----------- 1 @@ -182,7 +91,8 @@ SELECT * from pgmq_send('my_queue', '{"foo": "bar"}'); Read two message from the queue. Make them invisible for 30 seconds. ```sql -pgmq=# SELECT * from pgmq_read('my_queue', 30, 1); +-- parameters are queue name, visibility timeout, and number of messages to read +pgmq=# SELECT * from pgmq_read('my_queue', 30, 2); msg_id | read_ct | vt | enqueued_at | message --------+---------+-------------------------------+-------------------------------+--------------- @@ -196,14 +106,13 @@ If the queue is empty, or if all messages are currently invisible, no rows will pgx_pgmq=# SELECT * from pgmq_read('my_queue', 30, 1); msg_id | read_ct | vt | enqueued_at | message --------+---------+----+-------------+--------- - ``` ### Pop a message -Read a message and immediately delete it from the queue. Returns `None` if the queue is empty. ```sql +-- Read a message and immediately delete it from the queue. Returns `None` if the queue is empty. pgmq=# SELECT * from pgmq_pop('my_queue'); msg_id | read_ct | vt | enqueued_at | message @@ -213,23 +122,24 @@ pgmq=# SELECT * from pgmq_pop('my_queue'); ### Archive a message -Archiving a message removes it from the queue, and inserts it to the archive table. -TODO: +```sql +-- Archiving a message removes it from the queue, and inserts it to the archive table. +-- TODO: implement this in the extension + +``` ### Delete a message -Delete a message with id `1` from queue named `my_queue`. + ```sql +-- Delete a message id `1` from queue named `my_queue`. pgmq=# select pgmq_delete('my_queue', 1); pgmq_delete ------------- t ``` - - - -## Development +# Development Setup `pgx`. @@ -251,7 +161,7 @@ Run the dev environment cargo pgx run pg14 ``` -## Packaging +# Packaging Run this script to package into a `.deb` file, which can be installed on Ubuntu.