Repository pattern implementation - isolate db manipulation from domain models
Via pip:

```
pip install repka
```

Via poetry:

```
poetry add repka
```
`BaseRepository` is used to execute SQL queries (via aiopg & sqlalchemy) and convert SQL rows to/from pydantic models:
```python
import sqlalchemy as sa

from repka.api import AiopgRepository, IdModel
from repka.utils import create_async_db_connection

# Define a pydantic model
# It should inherit repka.api.IdModel
# to set id on entity insert, to update an entity by id and more
# IdModel inherits pydantic.BaseModel and defines an int id field
class Task(IdModel):
    title: str

# Define a sqlalchemy table with the same columns as the model
metadata = sa.MetaData()
tasks_table = sa.Table(
    "tasks", metadata,
    sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
    sa.Column("title", sa.String),
)

# Define a repository
# You should inherit repka.api.AiopgRepository and
# set the sqlalchemy table via the `table` property
# (the kwargs passed to deserialize are the sql-row data returned by sqlalchemy)
class TaskRepo(AiopgRepository[Task]):
    table = tasks_table

# To use the repository, instantiate it with an async sqlalchemy connection
db_url = "postgresql://postgres@localhost/test"
async with create_async_db_connection(db_url) as conn:
    repo = TaskRepo(conn)

    # Now you can use the repo
    # Here we select the first task with a matching title
    task = await repo.first(tasks_table.c.title == "My first task")
```
`T` is the generic type passed to `BaseRepository` (e.g. `BaseRepository[Task]` means that the type of `T` is `Task`).
- `repo.first(*filters: BinaryExpression, orders: Optional[Columns])` - get the first entity matching sqlalchemy {filters} and {orders}; if no entity matches {filters}, `None` is returned
  - Example of {filters}: `table.c.title == 'test task'` - equal to the SQL where clause `where title = 'test task'`
  - Example of {orders}: `table.c.title` - equal to the SQL order by clause `order by title`
- `repo.get_by_ids(entity_ids: List[int])` - get all entities whose id is in {entity_ids} (same as SQL `where id in ({entity_ids})`)
- `repo.get_by_id(entity_id: int)` - get the entity with id = {entity_id}
- `repo.get_or_create(filters: Optional[List[BinaryExpression]], defaults: Optional[Dict])` - get the entity that matches {filters}; if no entity is found, create a new one with {defaults}; return a tuple of the entity and an existence flag
- `repo.get_all(filters: Optional[List[BinaryExpression]], orders: Optional[Columns])` - return all entities matching {filters} and {orders}
- `repo.get_all_ids(filters: Optional[List[BinaryExpression]], orders: Optional[Columns])` - return the ids of entities matching {filters} and {orders}
- `repo.exists(*filters: BinaryExpression)` - check that an entity matching {filters} exists using a SQL `count` statement (a combined usage sketch follows this list)
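For illustration, a minimal sketch of how these select helpers might be combined, assuming the signatures above; `Task`, `TaskRepo` and `tasks_table` come from the earlier example:

```python
async def select_examples(repo: TaskRepo) -> None:
    # All tasks whose title starts with "My", ordered by title
    tasks = await repo.get_all(
        filters=[tasks_table.c.title.startswith("My")],
        orders=[tasks_table.c.title],
    )

    # Get the task matching the filters, or create it from the defaults
    task, created = await repo.get_or_create(
        filters=[tasks_table.c.title == "My first task"],
        defaults={"title": "My first task"},
    )

    # Check existence without loading full rows
    has_any = await repo.exists(tasks_table.c.id.in_([1, 2, 3]))
```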
- `repo.insert(entity: T)` - insert the entity into the db table and set the id field on the entity
- `repo.insert_many(entities: List[T])` - insert multiple entities and set their ids in a single transaction
- `repo.update(entity: T)` - update the entity in the db
- `repo.update_partial(entity: T, **updated_values)` - update the entity's fields via kwargs and update the same fields in the db (see the sketch after this list)
- `repo.update_many(entities: List[T])` - update multiple entities in a single transaction
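As a sketch (assuming the signatures above), inserting and then partially updating an entity might look like this:

```python
async def create_and_rename(repo: TaskRepo) -> None:
    task = Task(title="Draft")
    await repo.insert(task)  # sets task.id

    # Change the title both on the entity and in the db
    await repo.update_partial(task, title="Final title")
```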
- `repo.delete(*filters: BinaryExpression)` - delete entities matching {filters} via a SQL `delete` statement (example below). To delete all entities, pass `None` as an arg: `repo.delete(None)`
- `repo.delete_by_id(entity_id: int)` - delete the entity with {entity_id}
- `repo.delete_by_ids(entity_ids: List[int])` - delete entities whose id is in {entity_ids}
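For example (a sketch based on the descriptions above):

```python
async def cleanup(repo: TaskRepo) -> None:
    await repo.delete(tasks_table.c.title == "Draft")  # delete by filter
    await repo.delete_by_ids([1, 2, 3])                # delete by ids
    await repo.delete(None)                            # delete all entities
```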
- `repo.serialize(entity: T)` - convert {entity} to a dict (used e.g. in the `insert` and `update` methods)
- `repo.deserialize(**kwargs)` - convert {kwargs} to an entity (used e.g. in the `first` and `get_all` methods); an override sketch follows this list
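Since both methods are plain instance methods, they can be overridden to customize the entity/row mapping. A hedged sketch, assuming `serialize` returns a plain dict (the title normalization is just an illustration):

```python
class NormalizingTaskRepo(AiopgRepository[Task]):
    table = tasks_table

    def serialize(self, entity: Task) -> dict:
        # Normalize the data before it is written to the db
        data = super().serialize(entity)
        data["title"] = data["title"].strip()
        return data
```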
- `repo.execute_in_transaction()` - context manager that allows executing multiple queries in a transaction

  Example: delete all old entities and insert a new one in a single transaction:

  ```python
  async with repo.execute_in_transaction():
      await repo.delete(None)
      await repo.insert(Task(title="New task"))
  ```
- `repo.ignore_default` - list of entity fields that will be ignored on insert and then set on the entity after insert if they equal the field's default value. Useful for auto-incremented / db-default fields like dates or sequence numbers (see the sketch below).
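A minimal sketch of the idea, assuming a hypothetical `notes_table` whose `created_at` column has a server-side default:

```python
from datetime import datetime
from typing import Optional

class Note(IdModel):
    text: str
    created_at: Optional[datetime] = None  # filled in by the db default

class NoteRepo(AiopgRepository[Note]):
    table = notes_table  # hypothetical table with a server_default on created_at
    ignore_default = ["created_at"]  # skipped on insert, set on the entity afterwards
```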
You can create lazy-connection repositories with context vars:

```python
from contextvars import ContextVar

from repka.utils import create_async_db_connection

# Create a context var and instantiate the repository with it
db_connection = ContextVar("db_connection")
repo = TaskRepo(db_connection)

# Now you should set the context var somewhere (e.g. in a middleware)
# and start using the repository
async with create_async_db_connection(db_url) as conn:
    db_connection.set(conn)
    await repo.insert(Task(title="New task"))
```
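For instance, a hedged sketch of an aiohttp middleware that opens a connection per request and sets the context var (aiohttp and this middleware are an illustration, not part of repka):

```python
from aiohttp import web

@web.middleware
async def db_connection_middleware(request: web.Request, handler):
    # Open a connection for this request and expose it via the context var
    async with create_async_db_connection(db_url) as conn:
        db_connection.set(conn)
        return await handler(request)
```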
The following repositories have the same API as `AiopgRepository` (select methods, insert methods, etc.):

- `repka.api.FakeRepo` - repository that uses lists instead of database tables; can be used as a mock (see the test sketch below). This repository is implemented only partially, because implementing sqlalchemy features (like filters or orders) is hard and pointless for python lists.
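As an illustration, a test might swap in `FakeRepo` to avoid a real database (the no-argument constructor here is an assumption):

```python
from repka.api import FakeRepo

async def test_insert_sets_id() -> None:
    repo = FakeRepo()  # stores entities in an in-memory list, no db needed
    task = Task(title="Test")
    await repo.insert(task)
    assert task.id is not None
```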
This kind of repository is used to save/load JSON objects from files:
```python
from repka.json_ import DictJsonRepo

songs = [
    {"artist": "Pig Destroyer", "title": "Thumbsucker"},
    {"artist": "Da Menace", "title": "Bag of Funk"},
]

repo = DictJsonRepo()

repo.write(songs, "songs.json")
assert repo.read("songs.json") == songs
```
- `repo.read(filename: str)` - read the JSON file {filename} and return its content as a JSON primitive (list, dict, str, etc.)
- `repo.write(data: T, filename: str)` - serialize the JSON primitive {data} and save it to the file {filename}
- `repo.read_or_write_default(filename: str, default_factory: Callable[[], T])` - check that the file {filename} exists; read its content if it does, otherwise execute {default_factory} and write the result to {filename}

  Example: read data from `test.json`, or create `test.json` with `[{"field": "value"}]` if there is no such file:

  ```python
  repo = DictJsonRepo()
  repo.read_or_write_default("test.json", lambda: [{"field": "value"}])
  ```

- `DictJsonRepo(directory: str)` - set the directory where files will be read / written; if not set, the current working directory is used

  Example: read files from the `data/` dir:

  ```python
  repo = DictJsonRepo("data")
  repo.read("test.json")  # will read "./data/test.json"
  ```
Install production and development dependencies via poetry:

```
poetry install
```
To run tests:

- Set the database url via the `DB_URL` environment variable (e.g. via an `.env` file; see the example below).
  WARNING: every test run will drop all tables from the db
- Run the tests via `pytest`
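A minimal `.env` for this might look like the following (the database name is just an assumption, matching the url used earlier):

```
DB_URL=postgresql://postgres@localhost/test
```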
- Create a fork/branch for a new feature/fix/whatever
- [Optional] Install pre-commit hooks: `pre-commit install` (for a manual pre-commit run, use `pre-commit run -a`)
- When you are done, create a pull request and wait for approval
To deploy a new version, increment the version via bump2version and publish it to PyPI via poetry:

```
bump2version major/minor/patch
poetry publish --build
```

Don't forget to fill in CHANGELOG.md before the release.