Skip to content

Commit

Permalink
Python Client for PGMQ (#248)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChuckHend authored Apr 14, 2023
1 parent 90ec61d commit 66c0bd3
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 0 deletions.
3 changes: 3 additions & 0 deletions extensions/pgmq/coredb-pgmq-python/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
poetry.lock
**/*.pyc
.vscode
2 changes: 2 additions & 0 deletions extensions/pgmq/coredb-pgmq-python/.tool-versions
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
python 3.11.3
poetry 1.2.2
11 changes: 11 additions & 0 deletions extensions/pgmq/coredb-pgmq-python/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
SCOPE=src/

format:
poetry run black ${SCOPE}
poetry run isort --atomic .

lints:
poetry run black --check ${SCOPE}
poetry run isort --check-only ${SCOPE}
poetry run flake8 ${SCOPE}
poetry run mypy ${SCOPE}
4 changes: 4 additions & 0 deletions extensions/pgmq/coredb-pgmq-python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Coredb's Python Client for PGMQ

## Installation

33 changes: 33 additions & 0 deletions extensions/pgmq/coredb-pgmq-python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[tool.poetry]
name = "coredb-pgmq-python"
version = "0.0.1"
description = "Python client for the PGMQ Postgres extension."
authors = ["Adam Hendel <adam@coredb.io>"]
license = "Apache 2.0"
readme = "README.md"
packages = [{include = "src/coredb_pgmq_python"}]

[tool.poetry.dependencies]
python = "^3.9"
psycopg = {extras = ["binary", "pool"], version = "^3.1.8"}
pydantic = "^1.10.7"
orjson = "^3.8.10"

[tool.poetry.group.dev.dependencies]
mypy = "1.1.1"
pytest = "^7.3.0"
debugpy = "^1.6.7"
black = "^23.3.0"
isort = "^5.12.0"
flake8 = "^6.0.0"

[tool.black]
line-length = 120
target-version = ['py311', 'py310', 'py39']

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.mypy]
namespace_packages = false
6 changes: 6 additions & 0 deletions extensions/pgmq/coredb-pgmq-python/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[flake8]
format = pylint
ignore = E203, W503
max-complexity = 10
max-line-length = 120
statistics = True
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from src.coredb_pgmq_python.queue import PGMQueue # type: ignore

__all__ = ["PGMQueue"]
102 changes: 102 additions & 0 deletions extensions/pgmq/coredb-pgmq-python/src/coredb_pgmq_python/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Union

from psycopg.types.json import Jsonb
from psycopg_pool import ConnectionPool


@dataclass
class Message:
msg_id: int
read_ct: int
enqueued_at: datetime
vt: datetime
message: dict


@dataclass
class PGMQueue:
"""Base class for interacting with a queue"""

host: str = "localhost"
port: str = "5432"
database: str = "postgres"
delay: int = 0
vt: int = 30
partition_size: int = 5000

username: str = "postgres"
password: str = "postgres"

pool_size: int = 10

kwargs: dict = field(default_factory=dict)

pool: ConnectionPool = field(init=False)

def __post_init__(self) -> None:
conninfo = f"""
host={self.host}
port={self.port}
dbname={self.database}
user={self.username}
password={self.password}
"""
self.pool = ConnectionPool(conninfo, **self.kwargs)

with self.pool.connection() as conn:
conn.execute("create extension if not exists pgmq cascade;")

def create_queue(self, queue: str) -> None:
"""Create a queue"""
with self.pool.connection() as conn:
conn.execute("select pgmq_create(%s);", [queue])

def create_partitioned_queue(self, queue: str, partition_size: Optional[int] = None) -> None:
"""Create a partitioned queue"""
with self.pool.connection() as conn:
conn.execute("select pgmq_create_partitioned(%s, %s);", [queue, partition_size])

def send(self, queue: str, message: dict, delay: Optional[int] = None) -> int:
"""Send a message to a queue"""

with self.pool.connection() as conn:
if delay is not None:
# TODO(chuckend): implement send_delay in pgmq
raise NotImplementedError("send_delay is not implemented in pgmq")
message = conn.execute(
"select * from pgmq_send(%s, %s);",
[queue, Jsonb(message)], # type: ignore
).fetchall()
return message[0][0]

def read(self, queue: str, vt: Optional[int] = None, limit: int = 1) -> Union[Message, list[Message]]:
"""Read a message from a queue"""
with self.pool.connection() as conn:
rows = conn.execute("select * from pgmq_read(%s, %s, %s);", [queue, vt or self.vt, limit]).fetchall()

messages = [Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4]) for x in rows]
return messages[0] if len(messages) == 1 else messages

def pop(self, queue: str) -> Message:
"""Read a message from a queue"""
with self.pool.connection() as conn:
rows = conn.execute("select * from pgmq_pop(%s);", [queue]).fetchall()

messages = [Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4]) for x in rows]
return messages[0]

def delete(self, queue: str, msg_id: int) -> bool:
"""Delete a message from a queue"""
with self.pool.connection() as conn:
row = conn.execute("select pgmq_delete(%s, %s);", [queue, msg_id]).fetchall()

return row[0][0]

def archive(self, queue: str, msg_id: int) -> bool:
"""Archive a message from a queue"""
with self.pool.connection() as conn:
row = conn.execute("select pgmq_archive(%s, %s);", [queue, msg_id]).fetchall()

return row[0][0]

0 comments on commit 66c0bd3

Please sign in to comment.