The goal of this project is to explore using PostgreSQL as a queue backend. The idea is to use PostgreSQL's `FOR UPDATE SKIP LOCKED` feature to lock messages, so that concurrent workers each claim a different message and every message is processed exactly once.
The consumer script should process every message: remove it from the queue, route it by topic, and store it in that topic's table.
The final sum of rows across the topic tables should equal the number of messages that were put on the queue.
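The core of that loop can be sketched as a single transaction that claims one row with `FOR UPDATE SKIP LOCKED`, copies it into its topic table, and deletes it from the queue. A minimal sketch, assuming a `queue(id, topic, payload)` table and the `psycopg` driver (both are assumptions, not necessarily what this repo uses):

```python
import psycopg
from psycopg import sql

def consume_one(conn: psycopg.Connection) -> bool:
    """Claim, route, and delete one message; return False when the queue is empty."""
    with conn.transaction():
        with conn.cursor() as cur:
            # SKIP LOCKED lets concurrent workers each claim a different row
            # instead of blocking on (or double-processing) the same one.
            cur.execute(
                """
                DELETE FROM queue
                WHERE id = (
                    SELECT id FROM queue
                    FOR UPDATE SKIP LOCKED
                    LIMIT 1
                )
                RETURNING topic, payload
                """
            )
            row = cur.fetchone()
            if row is None:
                return False  # nothing left to claim
            topic, payload = row
            # Store the message in its (assumed) per-topic table.
            cur.execute(
                sql.SQL("INSERT INTO {} (payload) VALUES (%s)").format(
                    sql.Identifier(topic)
                ),
                (payload,),
            )
    return True
```

Because the claim, the insert, and the delete happen in one transaction, a worker that crashes mid-message rolls back and the message becomes claimable again, which is what keeps the final row counts consistent.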
Two benchmark scenarios are measured:
- processing 1000 messages with 5 async workers
- processing 1000 messages using 5 processes with 5 async workers each

To run the experiment yourself:
- Get a Postgres database, e.g. using Docker; publish port 5432 so the scripts can reach it on localhost:

  ```shell
  docker run --name postgres -p 5432:5432 -e POSTGRES_PASSWORD=postgres -d postgres
  ```

  Create a database in the container:

  ```shell
  docker exec -it postgres psql -U postgres -c "CREATE DATABASE queue_db;"
  ```
- Create a `.env` file containing the database credentials (`POSTGRES_DB` must name the database created above):

  ```shell
  POSTGRES_HOST=localhost
  POSTGRES_PORT=5432
  POSTGRES_USER=postgres
  POSTGRES_PASSWORD=postgres
  POSTGRES_DB=queue_db
  ```
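  One way the scripts might read these values is with the `python-dotenv` package (an assumption; the repo may load them differently):

  ```python
  import os

  from dotenv import load_dotenv  # assumed dependency: python-dotenv

  load_dotenv()  # copy the .env entries into os.environ

  DSN = (
      f"host={os.environ['POSTGRES_HOST']} "
      f"port={os.environ['POSTGRES_PORT']} "
      f"user={os.environ['POSTGRES_USER']} "
      f"password={os.environ['POSTGRES_PASSWORD']} "
      f"dbname={os.environ['POSTGRES_DB']}"
  )
  ```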
- Initialize the database:

  ```shell
  make migrate
  ```
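  The migration presumably creates the queue table plus one table per topic; a rough sketch under the same schema assumptions (the topic names here are hypothetical):

  ```python
  import psycopg

  TOPICS = ["topic_a", "topic_b"]  # hypothetical topic names

  def migrate(dsn: str) -> None:
      with psycopg.connect(dsn) as conn:  # commits on clean exit
          conn.execute(
              """
              CREATE TABLE IF NOT EXISTS queue (
                  id BIGSERIAL PRIMARY KEY,
                  topic TEXT NOT NULL,
                  payload TEXT NOT NULL
              )
              """
          )
          for topic in TOPICS:
              # Safe to interpolate: topic comes from the fixed list above.
              conn.execute(
                  f"""
                  CREATE TABLE IF NOT EXISTS {topic} (
                      id BIGSERIAL PRIMARY KEY,
                      payload TEXT NOT NULL
                  )
                  """
              )
  ```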
- Start the generate script to put messages on the queue:

  ```shell
  make generate
  ```

  To generate messages in parallel, run:

  ```shell
  make generate-parallel
  ```
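  The generator only needs plain inserts; a minimal sketch, again assuming the hypothetical schema above:

  ```python
  import random

  import psycopg

  TOPICS = ["topic_a", "topic_b"]  # must match the tables created by migrate

  def generate(dsn: str, n: int = 1000) -> None:
      with psycopg.connect(dsn) as conn:
          with conn.cursor() as cur:
              cur.executemany(
                  "INSERT INTO queue (topic, payload) VALUES (%s, %s)",
                  [(random.choice(TOPICS), f"message-{i}") for i in range(n)],
              )
  ```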
- Start the worker to consume messages from the queue:

  ```shell
  make consume
  ```

  To consume messages in parallel with multiple processes, run:

  ```shell
  make consume-parallel
  ```
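  Running several async workers over a shared pool is where `SKIP LOCKED` pays off; a sketch of 5 workers draining the queue, assuming the `psycopg_pool` package (an assumption, like the schema above):

  ```python
  import asyncio

  from psycopg import sql
  from psycopg_pool import AsyncConnectionPool

  async def worker(pool: AsyncConnectionPool) -> None:
      while True:
          async with pool.connection() as conn:
              async with conn.transaction():
                  cur = await conn.execute(
                      """
                      DELETE FROM queue
                      WHERE id = (
                          SELECT id FROM queue
                          FOR UPDATE SKIP LOCKED
                          LIMIT 1
                      )
                      RETURNING topic, payload
                      """
                  )
                  row = await cur.fetchone()
                  if row is None:
                      return  # queue drained; stop this worker
                  topic, payload = row
                  await conn.execute(
                      sql.SQL("INSERT INTO {} (payload) VALUES (%s)").format(
                          sql.Identifier(topic)
                      ),
                      (payload,),
                  )

  async def main(dsn: str) -> None:
      async with AsyncConnectionPool(dsn, max_size=5) as pool:
          await asyncio.gather(*(worker(pool) for _ in range(5)))
  ```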
- Clean the database for the next run:

  ```shell
  make clean
  ```