Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix ext readme #103

Merged
merged 1 commit into from
Feb 9, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 31 additions & 121 deletions extensions/pgx_pgmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

A lightweight distributed message queue. Like [AWS SQS](https://aws.amazon.com/sqs/) and [RSMQ](https://github.com/smrchy/rsmq) but on Postgres.


## Features

- Lightweight - Rust and Postgres only
Expand All @@ -12,23 +11,21 @@ A lightweight distributed message queue. Like [AWS SQS](https://aws.amazon.com/s
- Messages can be archived, instead of deleted, for long-term retention and replayability
- Completely asynchronous API

- [Postgres Message Queue](#postgres-message-queue)
- [Installation](#installation)
## Table of Contents
- [Postgres Message Queue (PGMQ)](#postgres-message-queue-pgmq)
- [Features](#features)
- [Table of Contents](#table-of-contents)
- [Start CoreDB Postgres](#start-coredb-postgres)
- [Python Examples](#python-examples)
- [Connect to postgres](#connect-to-postgres)
- [Create and list queues](#create-and-list-queues)
- [Send a message to the queue](#send-a-message-to-the-queue)
- [Read a message from the queue](#read-a-message-from-the-queue)
- [Delete a message from the queue](#delete-a-message-from-the-queue)
- [SQL Examples](#sql-examples)
- [Creating a queue](#creating-a-queue)
- [Send a message](#send-a-message)
- [Read a message](#read-a-message)
- [Send two message](#send-two-message)
- [Read messages](#read-messages)
- [Pop a message](#pop-a-message)
- [Archive a message](#archive-a-message)
- [Delete a message](#delete-a-message)
- [Development](#development)
- [Packaging](#packaging)
- [Development](#development)
- [Packaging](#packaging)

## Start CoreDB Postgres

Expand All @@ -41,131 +38,43 @@ docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io

## Python Examples

### Connect to postgres

```python
import json
import pprint

from sqlalchemy import create_engine, text

engine = create_engine("postgresql://postgres:postrgres@0.0.0.0:5432/postgres")
```

### Create and list queues

```python
with engine.connect() as con:
# create a queue
created = con.execute(text( "select * from pgmq_create('myqueue');"))
# list queues
list_queues = con.execute(text( "select * from pgmq_list_queues()"))
column_names = list_queues.keys()
rows = list_queues.fetchall()
print("### Queues ###")
for row in rows:
pprint.pprint(dict(zip(column_names, row)))
```

```python
'### Queues ###'
{
'created_at': datetime.datetime(2023, 2, 7, 2, 5, 39, 946356,
tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800))),
'queue_name': 'myqueue'
}
```


### Send a message to the queue

```python
with engine.connect() as con:
# send a message
msg = json.dumps({"yolo": 42})
msg_id = con.execute(text(f"select * from pgmq_send('x', '{msg}') as msg_id;"))
column_names = msg_id.keys()
rows = msg_id.fetchall()
print("### Message ID ###")
for row in rows:
pprint.pprint(dict(zip(column_names, row)))
```

```python
'### Message ID ###'
{'msg_id': 1}
```

### Read a message from the queue

```python
with engine.connect() as con:
# read a message, make it unavailable to be read again for 5 seconds
read = con.execute(text("select * from pgmq_read('x', 5);"))
column_names = read.keys()
rows = read.fetchall()
print("### Read Message ###")
for row in rows:
pprint.pprint(dict(zip(column_names, row)))
```

```python
'### Read Message ###'
{
'enqueued_at': datetime.datetime(2023, 2, 7, 2, 51, 50, 468837, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800))),
'message': {'myqueue': 42},
'msg_id': 1,
'read_ct': 1,
'vt': datetime.datetime(2023, 2, 7, 16, 9, 4, 826669, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800)))
}
```

### Delete a message from the queue

```python
with engine.connect() as con:
# delete a message
deleted = con.execute(text("select pgmq_delete('x', 1);"))
column_names = deleted.keys()
rows = deleted.fetchall()
print("### Message Deleted ###")
for row in rows:
pprint.pprint(dict(zip(column_names, row)))
```
```
'### Message Deleted ###'
{'pgmq_delete': True}
```
See python examples in [examples/python.py](examples/python.py)

## SQL Examples


```bash
# Connect to Postgres
psql postgres://postgres:postgres@0.0.0.0:5432/postgres
```

```sql
-- create the extension
CREATE EXTENSION pgmq;
```

### Creating a queue

```sql
-- creates the queue.
SELECT pgmq_create('my_queue');

pgmq_create
-------------

```

### Send two message

```sql
-- messages are sent as JSON
pgmq=#
SELECT * from pgmq_send('my_queue', '{"foo": "bar"}');
SELECT * from pgmq_send('my_queue', '{"foo": "bar"}');
SELECT * from pgmq_send('my_queue', '{"foo": "bar1"}');
SELECT * from pgmq_send('my_queue', '{"foo": "bar2"}');
```

```
```sql
-- the message id is returned from the send function
pgmq_send
-----------
1
Expand All @@ -182,7 +91,8 @@ SELECT * from pgmq_send('my_queue', '{"foo": "bar"}');
Read two message from the queue. Make them invisible for 30 seconds.

```sql
pgmq=# SELECT * from pgmq_read('my_queue', 30, 1);
-- parameters are queue name, visibility timeout, and number of messages to read
pgmq=# SELECT * from pgmq_read('my_queue', 30, 2);

msg_id | read_ct | vt | enqueued_at | message
--------+---------+-------------------------------+-------------------------------+---------------
Expand All @@ -196,14 +106,13 @@ If the queue is empty, or if all messages are currently invisible, no rows will
pgx_pgmq=# SELECT * from pgmq_read('my_queue', 30, 1);
msg_id | read_ct | vt | enqueued_at | message
--------+---------+----+-------------+---------

```

### Pop a message

Read a message and immediately delete it from the queue. Returns `None` if the queue is empty.

```sql
-- Read a message and immediately delete it from the queue. Returns `None` if the queue is empty.
pgmq=# SELECT * from pgmq_pop('my_queue');

msg_id | read_ct | vt | enqueued_at | message
Expand All @@ -213,23 +122,24 @@ pgmq=# SELECT * from pgmq_pop('my_queue');

### Archive a message

Archiving a message removes it from the queue, and inserts it to the archive table.
TODO:

```sql
-- Archiving a message removes it from the queue, and inserts it to the archive table.
-- TODO: implement this in the extension

```

### Delete a message
Delete a message with id `1` from queue named `my_queue`.

```sql
-- Delete a message id `1` from queue named `my_queue`.
pgmq=# select pgmq_delete('my_queue', 1);
pgmq_delete
-------------
t
```




## Development
# Development

Setup `pgx`.

Expand All @@ -251,7 +161,7 @@ Run the dev environment
cargo pgx run pg14
```

## Packaging
# Packaging

Run this script to package into a `.deb` file, which can be installed on Ubuntu.

Expand Down