diff --git a/tembo-pgmq-python/tembo_pgmq_python/sqlalchemy/__init__.py b/tembo-pgmq-python/tembo_pgmq_python/sqlalchemy/__init__.py index dae49ed0..3bef335d 100644 --- a/tembo-pgmq-python/tembo_pgmq_python/sqlalchemy/__init__.py +++ b/tembo-pgmq-python/tembo_pgmq_python/sqlalchemy/__init__.py @@ -1,3 +1,3 @@ -from tembo_pgmq_python.sqlalchemy.queue import PGMQueue +from tembo_pgmq_python.sqlalchemy.queue import PGMQueue -__all__ = ["PGMQueue"] \ No newline at end of file +__all__ = ["PGMQueue"] diff --git a/tembo-pgmq-python/tembo_pgmq_python/sqlalchemy/_db_api_interface.py b/tembo-pgmq-python/tembo_pgmq_python/sqlalchemy/_db_api_interface.py index 2fa859f6..a7ab018c 100644 --- a/tembo-pgmq-python/tembo_pgmq_python/sqlalchemy/_db_api_interface.py +++ b/tembo-pgmq-python/tembo_pgmq_python/sqlalchemy/_db_api_interface.py @@ -18,53 +18,42 @@ class DBAPIConnection(Protocol): - def close(self) -> object: - ... + def close(self) -> object: ... - def commit(self) -> object: - ... + def commit(self) -> object: ... # optional: # def rollback(self) -> Any: ... - def cursor(self) -> "DBAPICursor": - ... + def cursor(self) -> "DBAPICursor": ... class DBAPICursor(Protocol): @property - def description(self) -> Union[Sequence[DBAPIColumnDescription], None]: - ... + def description(self) -> Union[Sequence[DBAPIColumnDescription], None]: ... @property - def rowcount(self) -> int: - ... + def rowcount(self) -> int: ... # optional: # def callproc(self, procname: str, parameters: Sequence[Any] = ..., /) -> Sequence[Any]: ... - def close(self) -> object: - ... + def close(self) -> object: ... def execute( self, operation: str, parameters: Union[Sequence[Any], Mapping[str, Any]] = ..., /, - ) -> object: - ... + ) -> object: ... def executemany( self, operation: str, seq_of_parameters: Sequence[Sequence[Any]], / - ) -> object: - ... + ) -> object: ... - def fetchone(self) -> Union[Sequence[Any], None]: - ... + def fetchone(self) -> Union[Sequence[Any], None]: ... - def fetchmany(self, size: int = ..., /) -> Sequence[Sequence[Any]]: - ... + def fetchmany(self, size: int = ..., /) -> Sequence[Sequence[Any]]: ... - def fetchall(self) -> Sequence[Sequence[Any]]: - ... + def fetchall(self) -> Sequence[Sequence[Any]]: ... # optional: # def nextset(self) -> None | Literal[True]: ... @@ -72,8 +61,6 @@ def fetchall(self) -> Sequence[Sequence[Any]]: def setinputsizes( self, sizes: Sequence[Union[DBAPITypeCode, int, None]], / - ) -> object: - ... + ) -> object: ... - def setoutputsize(self, size: int, column: int = ..., /) -> object: - ... + def setoutputsize(self, size: int, column: int = ..., /) -> object: ... diff --git a/tembo-pgmq-python/tembo_pgmq_python/sqlalchemy/_types.py b/tembo-pgmq-python/tembo_pgmq_python/sqlalchemy/_types.py index 21865480..0a469526 100644 --- a/tembo-pgmq-python/tembo_pgmq_python/sqlalchemy/_types.py +++ b/tembo-pgmq-python/tembo_pgmq_python/sqlalchemy/_types.py @@ -11,21 +11,21 @@ SESSION_TYPE = Union[Session, AsyncSession] PARAM_STYLE_TYPE = Literal["qmark", "numeric", "named", "format", "pyformat"] DIALECTS_TYPE = Literal[ - 'sqlalchemy', - 'asyncpg', - 'psycopg2', - 'psycopg3', + "sqlalchemy", + "asyncpg", + "psycopg2", + "psycopg3", ] STATEMENT_TYPE = Union[Tuple[TextClause, Dict[str, Any]], TextClause] + class AsyncDBAPICursor(DBAPICursor): async def execute( self, operation: str, parameters: Union[Sequence[Any], Mapping[str, Any]] = ..., /, - ) -> object: - ... + ) -> object: ... 
__all__ = [ diff --git a/tembo-pgmq-python/tembo_pgmq_python/sqlalchemy/queue.py b/tembo-pgmq-python/tembo_pgmq_python/sqlalchemy/queue.py index 0ae48cb1..8cd0c706 100644 --- a/tembo-pgmq-python/tembo_pgmq_python/sqlalchemy/queue.py +++ b/tembo-pgmq-python/tembo_pgmq_python/sqlalchemy/queue.py @@ -156,9 +156,7 @@ def _initialize_sqlalchemy( elif engine: self.engine = engine self.is_async = self.engine.dialect.is_async - self.session_maker = sessionmaker( - bind=self.engine, class_=get_session_ - ) + self.session_maker = sessionmaker(bind=self.engine, class_=get_session_) else: self.engine = ( create_async_engine(dsn) if is_async_dsn(dsn) else create_engine(dsn) @@ -384,20 +382,22 @@ def create_partitioned_queue( ) @inject_session - def _validate_queue_name_sync(self, queue_name: str, session: Optional[Session] = None): + def _validate_queue_name_sync( + self, queue_name: str, session: Optional[Session] = None + ): """Validate the length of a queue name.""" - session.execute( - *_statement.validate_queue_name(queue_name) - ) + session.execute(*_statement.validate_queue_name(queue_name)) @inject_async_session - async def _validate_queue_name_async(self, queue_name: str, session: Optional[AsyncSession] = None): + async def _validate_queue_name_async( + self, queue_name: str, session: Optional[AsyncSession] = None + ): """Validate the length of a queue name.""" - await session.execute( - *_statement.validate_queue_name(queue_name) - ) + await session.execute(*_statement.validate_queue_name(queue_name)) - def validate_queue_name(self, queue_name: str, session: Optional[Union[Session, AsyncSession]] = None): + def validate_queue_name( + self, queue_name: str, session: Optional[Union[Session, AsyncSession]] = None + ): """ * Will raise an error if the ``queue_name`` is more than 48 characters. """ @@ -414,7 +414,13 @@ def validate_queue_name(self, queue_name: str, session: Optional[Union[Session, ) @inject_session - def _drop_queue_sync(self, queue_name: str, partitioned: bool = False, session: Optional[Session] = None,commit: bool = True) -> bool: + def _drop_queue_sync( + self, + queue_name: str, + partitioned: bool = False, + session: Optional[Session] = None, + commit: bool = True, + ) -> bool: """Drop a queue.""" row = session.execute( *_statement.drop_queue(queue_name, partitioned) @@ -424,18 +430,28 @@ def _drop_queue_sync(self, queue_name: str, partitioned: bool = False, session: return row[0] @inject_async_session - async def _drop_queue_async(self, queue_name: str, partitioned: bool = False, session: Optional[AsyncSession] = None, commit: bool = True) -> bool: + async def _drop_queue_async( + self, + queue_name: str, + partitioned: bool = False, + session: Optional[AsyncSession] = None, + commit: bool = True, + ) -> bool: """Drop a queue.""" row = ( - await session.execute( - *_statement.drop_queue(queue_name, partitioned) - ) + await session.execute(*_statement.drop_queue(queue_name, partitioned)) ).fetchone() if commit: await session.commit() return row[0] - def drop_queue(self, queue_name: str, partitioned: bool = False, session: Optional[Union[Session, AsyncSession]] = None, commit: bool = True) -> bool: + def drop_queue( + self, + queue_name: str, + partitioned: bool = False, + session: Optional[Union[Session, AsyncSession]] = None, + commit: bool = True, + ) -> bool: """Drop a queue. .. 
_drop_queue_method: ref:`pgmq_sqlalchemy.PGMQueue.drop_queue` @@ -474,7 +490,7 @@ def drop_queue(self, queue_name: str, partitioned: bool = False, session: Option ) @inject_session - def _list_queues_sync(self, session: Optional[Session] = None ) -> List[str]: + def _list_queues_sync(self, session: Optional[Session] = None) -> List[str]: """List all queues.""" rows = session.execute( text("select queue_name from pgmq.list_queues();") @@ -482,16 +498,18 @@ def _list_queues_sync(self, session: Optional[Session] = None ) -> List[str]: return [row[0] for row in rows] @inject_async_session - async def _list_queues_async(self,session: Optional[AsyncSession] = None) -> List[str]: + async def _list_queues_async( + self, session: Optional[AsyncSession] = None + ) -> List[str]: """List all queues.""" rows = ( - await session.execute( - text("select queue_name from pgmq.list_queues();") - ) + await session.execute(text("select queue_name from pgmq.list_queues();")) ).fetchall() return [row[0] for row in rows] - def list_queues(self, session: Optional[Union[Session, AsyncSession]] = None) -> List[str]: + def list_queues( + self, session: Optional[Union[Session, AsyncSession]] = None + ) -> List[str]: """List all queues. .. code-block:: python @@ -500,36 +518,49 @@ def list_queues(self, session: Optional[Union[Session, AsyncSession]] = None) -> print(queue_list) """ if self.is_async: - return self.loop.run_until_complete(self._list_queues_async( - session=session - )) - return self._list_queues_sync( - session=session - ) + return self.loop.run_until_complete( + self._list_queues_async(session=session) + ) + return self._list_queues_sync(session=session) @inject_session - def _send_sync(self, queue_name: str, message: str, delay: int = 0, session: Optional[Session] = None, commit: bool = True) -> int: - row = ( - session.execute( - _statement.send(queue_name, message, delay) - ) - ).fetchone() + def _send_sync( + self, + queue_name: str, + message: str, + delay: int = 0, + session: Optional[Session] = None, + commit: bool = True, + ) -> int: + row = (session.execute(_statement.send(queue_name, message, delay))).fetchone() if commit: session.commit() return row[0] @inject_async_session - async def _send_async(self, queue_name: str, message: str, delay: int = 0, session: Optional[AsyncSession] = None, commit: bool = True) -> int: + async def _send_async( + self, + queue_name: str, + message: str, + delay: int = 0, + session: Optional[AsyncSession] = None, + commit: bool = True, + ) -> int: row = ( - await session.execute( - _statement.send(queue_name, message, delay) - ) + await session.execute(_statement.send(queue_name, message, delay)) ).fetchone() if commit: await session.commit() return row[0] - def send(self, queue_name: str, message: dict, delay: int = 0, session: Optional[Union[Session, AsyncSession]] = None, commit: bool = True) -> int: + def send( + self, + queue_name: str, + message: dict, + delay: int = 0, + session: Optional[Union[Session, AsyncSession]] = None, + commit: bool = True, + ) -> int: """Send a message to a queue. .. 
code-block:: python @@ -568,12 +599,15 @@ def send(self, queue_name: str, message: dict, delay: int = 0, session: Optional @inject_session def _send_batch_sync( - self, queue_name: str, messages: str, delay: int = 0 , session: Optional[Session] = None, commit: bool = True + self, + queue_name: str, + messages: str, + delay: int = 0, + session: Optional[Session] = None, + commit: bool = True, ) -> List[int]: rows = ( - session.execute( - _statement.send_batch(queue_name, messages, delay) - ) + session.execute(_statement.send_batch(queue_name, messages, delay)) ).fetchall() if commit: session.commit() @@ -581,19 +615,27 @@ def _send_batch_sync( @inject_async_session async def _send_batch_async( - self, queue_name: str, messages: str, delay: int = 0, session: Optional[AsyncSession] = None, commit: bool = True + self, + queue_name: str, + messages: str, + delay: int = 0, + session: Optional[AsyncSession] = None, + commit: bool = True, ) -> List[int]: rows = ( - await session.execute( - _statement.send_batch(queue_name, messages, delay) - ) + await session.execute(_statement.send_batch(queue_name, messages, delay)) ).fetchall() if commit: await session.commit() return [row[0] for row in rows] def send_batch( - self, queue_name: str, messages: List[dict], delay: int = 0, session: Optional[Union[Session, AsyncSession]] = None, commit: bool = True + self, + queue_name: str, + messages: List[dict], + delay: int = 0, + session: Optional[Union[Session, AsyncSession]] = None, + commit: bool = True, ) -> List[int]: """ Send a batch of messages to a queue. @@ -626,10 +668,14 @@ def send_batch( ) @inject_session - def _read_sync(self, queue_name: str, vt: int, session: Optional[Session] = None, commit: bool = True) -> Optional[Message]: - row = session.execute( - *_statement.read(queue_name, vt) - ).fetchone() + def _read_sync( + self, + queue_name: str, + vt: int, + session: Optional[Session] = None, + commit: bool = True, + ) -> Optional[Message]: + row = session.execute(*_statement.read(queue_name, vt)).fetchone() if commit: session.commit() if row is None: @@ -639,12 +685,14 @@ def _read_sync(self, queue_name: str, vt: int, session: Optional[Session] = None ) @inject_async_session - async def _read_async(self, queue_name: str, vt: int, session: Optional[AsyncSession] = None, commit: bool = True) -> Optional[Message]: - row = ( - await session.execute( - *_statement.read(queue_name, vt) - ) - ).fetchone() + async def _read_async( + self, + queue_name: str, + vt: int, + session: Optional[AsyncSession] = None, + commit: bool = True, + ) -> Optional[Message]: + row = (await session.execute(*_statement.read(queue_name, vt))).fetchone() if commit: await session.commit() if row is None: @@ -653,7 +701,13 @@ async def _read_async(self, queue_name: str, vt: int, session: Optional[AsyncSes msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4] ) - def read(self, queue_name: str, vt: Optional[int] = None, session: Optional[Union[Session, AsyncSession]] = None, commit: bool = True) -> Optional[Message]: + def read( + self, + queue_name: str, + vt: Optional[int] = None, + session: Optional[Union[Session, AsyncSession]] = None, + commit: bool = True, + ) -> Optional[Message]: """ .. _for_update_skip_locked: https://www.postgresql.org/docs/current/sql-select.html#SQL-FOR-UPDATE-SHARE .. 
|for_update_skip_locked| replace:: **FOR UPDATE SKIP LOCKED** @@ -716,12 +770,14 @@ def read(self, queue_name: str, vt: Optional[int] = None, session: Optional[Unio if vt is None: vt = self.vt if self.is_async: - return self.loop.run_until_complete(self._read_async( - queue_name=queue_name, - vt=vt, - session=session, - commit=commit, - )) + return self.loop.run_until_complete( + self._read_async( + queue_name=queue_name, + vt=vt, + session=session, + commit=commit, + ) + ) return self._read_sync( queue_name=queue_name, vt=vt, @@ -766,9 +822,7 @@ async def _read_batch_async( commit: bool = True, ) -> Optional[List[Message]]: rows = ( - await session.execute( - *_statement.read_batch(queue_name, vt, batch_size) - ) + await session.execute(*_statement.read_batch(queue_name, vt, batch_size)) ).fetchall() if commit: await session.commit() @@ -822,12 +876,12 @@ def read_batch( ) ) return self._read_batch_sync( - queue_name=queue_name, - vt=vt, - batch_size=batch_size, - session=session, - commit=commit, - ) + queue_name=queue_name, + vt=vt, + batch_size=batch_size, + session=session, + commit=commit, + ) @inject_session def _read_with_poll_sync( @@ -968,18 +1022,23 @@ def read_with_poll( ) ) return self._read_with_poll_sync( - queue_name=queue_name, - vt=vt, - qty=qty, - max_poll_seconds=max_poll_seconds, - poll_interval_ms=poll_interval_ms, - session=session, - commit=commit, - ) + queue_name=queue_name, + vt=vt, + qty=qty, + max_poll_seconds=max_poll_seconds, + poll_interval_ms=poll_interval_ms, + session=session, + commit=commit, + ) @inject_session def _set_vt_sync( - self, queue_name: str, msg_id: int, vt_offset: int, session: Optional[Session] = None, commit: bool = True + self, + queue_name: str, + msg_id: int, + vt_offset: int, + session: Optional[Session] = None, + commit: bool = True, ) -> Optional[Message]: """Set the visibility timeout for a message.""" row = session.execute( @@ -994,13 +1053,16 @@ def _set_vt_sync( ) async def _set_vt_async( - self, queue_name: str, msg_id: int, vt_offset: int, session: Optional[AsyncSession] = None, commit: bool = True + self, + queue_name: str, + msg_id: int, + vt_offset: int, + session: Optional[AsyncSession] = None, + commit: bool = True, ) -> Optional[Message]: """Set the visibility timeout for a message.""" row = ( - await session.execute( - *_statement.set_vt(queue_name, msg_id, vt_offset) - ) + await session.execute(*_statement.set_vt(queue_name, msg_id, vt_offset)) ).fetchone() if commit: await session.commit() @@ -1010,7 +1072,14 @@ async def _set_vt_async( msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4] ) - def set_vt(self, queue_name: str, msg_id: int, vt_offset: int, session: Optional[Union[Session, AsyncSession]] = None, commit: bool = True) -> Optional[Message]: + def set_vt( + self, + queue_name: str, + msg_id: int, + vt_offset: int, + session: Optional[Union[Session, AsyncSession]] = None, + commit: bool = True, + ) -> Optional[Message]: """ .. _set_vt_method: ref:`pgmq_sqlalchemy.PGMQueue.set_vt` .. 
|set_vt_method| replace:: :py:meth:`~pgmq_sqlalchemy.PGMQueue.set_vt` @@ -1086,10 +1155,10 @@ def consumer_with_backoff_retry(pgmq_client: PGMQueue, queue_name: str): ) @inject_session - def _pop_sync(self, queue_name: str, session: Optional[Session] = None, commit: bool = True) -> Optional[Message]: - row = session.execute( - *_statement.pop(queue_name) - ).fetchone() + def _pop_sync( + self, queue_name: str, session: Optional[Session] = None, commit: bool = True + ) -> Optional[Message]: + row = session.execute(*_statement.pop(queue_name)).fetchone() if commit: session.commit() if row is None: @@ -1099,12 +1168,13 @@ def _pop_sync(self, queue_name: str, session: Optional[Session] = None, commit: ) @inject_async_session - async def _pop_async(self, queue_name: str, session: Optional[AsyncSession] = None, commit: bool = True) -> Optional[Message]: - row = ( - await session.execute( - *_statement.pop(queue_name) - ) - ).fetchone() + async def _pop_async( + self, + queue_name: str, + session: Optional[AsyncSession] = None, + commit: bool = True, + ) -> Optional[Message]: + row = (await session.execute(*_statement.pop(queue_name))).fetchone() if commit: await session.commit() if row is None: @@ -1113,7 +1183,12 @@ async def _pop_async(self, queue_name: str, session: Optional[AsyncSession] = No msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4] ) - def pop(self, queue_name: str, session: Optional[Union[Session, AsyncSession]] = None, commit: bool = True) -> Optional[Message]: + def pop( + self, + queue_name: str, + session: Optional[Union[Session, AsyncSession]] = None, + commit: bool = True, + ) -> Optional[Message]: """ Reads a single message from a queue and deletes it upon read. @@ -1133,11 +1208,10 @@ def pop(self, queue_name: str, session: Optional[Union[Session, AsyncSession]] = ) ) return self._pop_sync( - queue_name=queue_name, - session=session, - commit=commit, - ) - + queue_name=queue_name, + session=session, + commit=commit, + ) @inject_session def _delete_sync( @@ -1148,9 +1222,7 @@ def _delete_sync( commit: bool = True, ) -> bool: # should add explicit type casts to choose the correct candidate function - row = session.execute( - _statement.delete(queue_name, msg_id) - ).fetchone() + row = session.execute(_statement.delete(queue_name, msg_id)).fetchone() if commit: session.commit() return row[0] @@ -1173,7 +1245,13 @@ async def _delete_async( await session.commit() return row[0] - def delete(self, queue_name: str, msg_id: int, session: Optional[Union[Session, AsyncSession]] = None, commit: bool = True) -> bool: + def delete( + self, + queue_name: str, + msg_id: int, + session: Optional[Union[Session, AsyncSession]] = None, + commit: bool = True, + ) -> bool: """ Delete a message from the queue. 
@@ -1192,18 +1270,20 @@ def delete(self, queue_name: str, msg_id: int, session: Optional[Union[Session, """ if self.is_async: - return self.loop.run_until_complete(self._delete_async( - queue_name=queue_name, - msg_id=msg_id, - session=session, - commit=commit, - )) - return self._delete_sync( - queue_name=queue_name, - msg_id=msg_id, - session=session, - commit=commit, + return self.loop.run_until_complete( + self._delete_async( + queue_name=queue_name, + msg_id=msg_id, + session=session, + commit=commit, + ) ) + return self._delete_sync( + queue_name=queue_name, + msg_id=msg_id, + session=session, + commit=commit, + ) @inject_session def _delete_batch_sync( @@ -1213,9 +1293,7 @@ def _delete_batch_sync( session: Optional[Session] = None, commit: bool = True, ) -> List[int]: - rows = session.execute( - _statement.delete_batch(queue_name, msg_ids) - ).fetchall() + rows = session.execute(_statement.delete_batch(queue_name, msg_ids)).fetchall() if commit: session.commit() return [row[0] for row in rows] @@ -1237,7 +1315,13 @@ async def _delete_batch_async( await session.commit() return [row[0] for row in rows] - def delete_batch(self, queue_name: str, msg_ids: List[int], session: Optional[Union[Session, AsyncSession]] = None, commit: bool = True) -> List[int]: + def delete_batch( + self, + queue_name: str, + msg_ids: List[int], + session: Optional[Union[Session, AsyncSession]] = None, + commit: bool = True, + ) -> List[int]: """ Delete a batch of messages from the queue. @@ -1271,28 +1355,40 @@ def delete_batch(self, queue_name: str, msg_ids: List[int], session: Optional[Un ) @inject_session - def _archive_sync(self, queue_name: str, msg_id: int, session: Optional[Session] = None, commit: bool = True) -> bool: + def _archive_sync( + self, + queue_name: str, + msg_id: int, + session: Optional[Session] = None, + commit: bool = True, + ) -> bool: """Archive a message from a queue synchronously.""" - row = session.execute( - _statement.archive(queue_name, msg_id) - ).fetchone() + row = session.execute(_statement.archive(queue_name, msg_id)).fetchone() if commit: session.commit() return row[0] @inject_async_session - async def _archive_async(self, queue_name: str, msg_id: int, session: Optional[AsyncSession] = None, commit: bool = True) -> bool: + async def _archive_async( + self, + queue_name: str, + msg_id: int, + session: Optional[AsyncSession] = None, + commit: bool = True, + ) -> bool: """Archive a message from a queue asynchronously.""" - row = ( - await session.execute( - _statement.archive(queue_name, msg_id) - ) - ).fetchone() + row = (await session.execute(_statement.archive(queue_name, msg_id))).fetchone() if commit: await session.commit() return row[0] - def archive(self, queue_name: str, msg_id: int, session: Optional[Union[Session, AsyncSession]] = None, commit: bool = True) -> bool: + def archive( + self, + queue_name: str, + msg_id: int, + session: Optional[Union[Session, AsyncSession]] = None, + commit: bool = True, + ) -> bool: """ Archive a message from a queue. 
@@ -1330,30 +1426,42 @@ def archive(self, queue_name: str, msg_id: int, session: Optional[Union[Session, ) @inject_session - def _archive_batch_sync(self, queue_name: str, msg_ids: List[int], session: Optional[Session] = None, commit: bool = True) -> List[int]: + def _archive_batch_sync( + self, + queue_name: str, + msg_ids: List[int], + session: Optional[Session] = None, + commit: bool = True, + ) -> List[int]: """Archive multiple messages from a queue synchronously.""" - rows = session.execute( - _statement.archive_batch(queue_name, msg_ids) - ).fetchall() + rows = session.execute(_statement.archive_batch(queue_name, msg_ids)).fetchall() if commit: session.commit() return [row[0] for row in rows] @inject_async_session async def _archive_batch_async( - self, queue_name: str, msg_ids: List[int], session: Optional[AsyncSession] = None, commit: bool = True + self, + queue_name: str, + msg_ids: List[int], + session: Optional[AsyncSession] = None, + commit: bool = True, ) -> List[int]: """Archive multiple messages from a queue asynchronously.""" rows = ( - await session.execute( - _statement.archive_batch(queue_name, msg_ids) - ) + await session.execute(_statement.archive_batch(queue_name, msg_ids)) ).fetchall() if commit: await session.commit() return [row[0] for row in rows] - def archive_batch(self, queue_name: str, msg_ids: List[int], session: Optional[Union[Session, AsyncSession]] = None, commit: bool = True) -> List[int]: + def archive_batch( + self, + queue_name: str, + msg_ids: List[int], + session: Optional[Union[Session, AsyncSession]] = None, + commit: bool = True, + ) -> List[int]: """ Archive multiple messages from a queue. @@ -1377,35 +1485,41 @@ def archive_batch(self, queue_name: str, msg_ids: List[int], session: Optional[U ) ) return self._archive_batch_sync( - queue_name=queue_name, - msg_ids=msg_ids, - session=session, - commit=commit, - ) + queue_name=queue_name, + msg_ids=msg_ids, + session=session, + commit=commit, + ) @inject_session - def _purge_sync(self, queue_name: str, session: Optional[Session] = None, commit: bool = True) -> int: + def _purge_sync( + self, queue_name: str, session: Optional[Session] = None, commit: bool = True + ) -> int: """Purge a queue synchronously,return deleted_count.""" - row = session.execute( - *_statement.purge(queue_name) - ).fetchone() + row = session.execute(*_statement.purge(queue_name)).fetchone() if commit: session.commit() return row[0] @inject_async_session - async def _purge_async(self, queue_name: str, session: Optional[AsyncSession] = None, commit: bool = True) -> int: + async def _purge_async( + self, + queue_name: str, + session: Optional[AsyncSession] = None, + commit: bool = True, + ) -> int: """Purge a queue asynchronously,return deleted_count.""" - row = ( - await session.execute( - *_statement.purge(queue_name) - ) - ).fetchone() + row = (await session.execute(*_statement.purge(queue_name))).fetchone() if commit: await session.commit() return row[0] - def purge(self, queue_name: str, session: Optional[Union[Session, AsyncSession]] = None, commit: bool = True) -> int: + def purge( + self, + queue_name: str, + session: Optional[Union[Session, AsyncSession]] = None, + commit: bool = True, + ) -> int: """ * Delete all messages from a queue, return the number of messages deleted. * Archive tables will **not** be affected. 
@@ -1432,11 +1546,11 @@ def purge(self, queue_name: str, session: Optional[Union[Session, AsyncSession]] ) @inject_session - def _metrics_sync(self, queue_name: str, session: Optional[Session] = None) -> Optional[QueueMetrics]: + def _metrics_sync( + self, queue_name: str, session: Optional[Session] = None + ) -> Optional[QueueMetrics]: """Get queue metrics synchronously.""" - row = session.execute( - *_statement.metrics(queue_name) - ).fetchone() + row = session.execute(*_statement.metrics(queue_name)).fetchone() if row is None: return None return QueueMetrics( @@ -1449,13 +1563,11 @@ def _metrics_sync(self, queue_name: str, session: Optional[Session] = None) -> O ) @inject_async_session - async def _metrics_async(self, queue_name: str, session: Optional[AsyncSession] = None) -> Optional[QueueMetrics]: + async def _metrics_async( + self, queue_name: str, session: Optional[AsyncSession] = None + ) -> Optional[QueueMetrics]: """Get queue metrics asynchronously.""" - row = ( - await session.execute( - *_statement.metrics(queue_name) - ) - ).fetchone() + row = (await session.execute(*_statement.metrics(queue_name))).fetchone() if row is None: return None return QueueMetrics( @@ -1467,7 +1579,9 @@ async def _metrics_async(self, queue_name: str, session: Optional[AsyncSession] scrape_time=row[5], ) - def metrics(self, queue_name: str, session: Optional[Union[Session, AsyncSession]] = None) -> Optional[QueueMetrics]: + def metrics( + self, queue_name: str, session: Optional[Union[Session, AsyncSession]] = None + ) -> Optional[QueueMetrics]: """ Get metrics for a queue. @@ -1487,17 +1601,21 @@ def metrics(self, queue_name: str, session: Optional[Union[Session, AsyncSession """ if self.is_async: - return self.loop.run_until_complete(self._metrics_async( - queue_name=queue_name, - session=session, - )) + return self.loop.run_until_complete( + self._metrics_async( + queue_name=queue_name, + session=session, + ) + ) return self._metrics_sync( queue_name=queue_name, session=session, ) @inject_session - def _metrics_all_sync(self, session: Optional[Session] = None) -> Optional[List[QueueMetrics]]: + def _metrics_all_sync( + self, session: Optional[Session] = None + ) -> Optional[List[QueueMetrics]]: """Get metrics for all queues synchronously.""" rows = session.execute(_statement.metrics_all()).fetchall() if not rows: @@ -1515,11 +1633,11 @@ def _metrics_all_sync(self, session: Optional[Session] = None) -> Optional[List[ ] @inject_async_session - async def _metrics_all_async(self, session: Optional[AsyncSession] = None) -> Optional[List[QueueMetrics]]: + async def _metrics_all_async( + self, session: Optional[AsyncSession] = None + ) -> Optional[List[QueueMetrics]]: """Get metrics for all queues asynchronously.""" - rows = ( - await session.execute(_statement.metrics_all()) - ).fetchall() + rows = (await session.execute(_statement.metrics_all())).fetchall() if not rows: return None return [ @@ -1534,7 +1652,9 @@ async def _metrics_all_async(self, session: Optional[AsyncSession] = None) -> Op for row in rows ] - def metrics_all(self, session: Optional[Union[Session, AsyncSession]] = None) -> Optional[List[QueueMetrics]]: + def metrics_all( + self, session: Optional[Union[Session, AsyncSession]] = None + ) -> Optional[List[QueueMetrics]]: """ .. 
_read_committed_isolation_level: https://www.postgresql.org/docs/current/transaction-iso.html#XACT-READ-COMMITTED @@ -1569,9 +1689,11 @@ def metrics_all(self, session: Optional[Union[Session, AsyncSession]] = None) -> """ if self.is_async: - return self.loop.run_until_complete(self._metrics_all_async( - session=session, - )) + return self.loop.run_until_complete( + self._metrics_all_async( + session=session, + ) + ) return self._metrics_all_sync( session=session, )
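
Note for reviewers: the hunks above are formatting-only (Black-style rewrapping of signatures and ``session.execute(...)`` calls); no behavior change is intended. The sketch below is an illustrative, non-authoritative walk through the ``PGMQueue`` surface whose signatures appear in this diff, assuming a synchronous DSN and a reachable PostgreSQL server with the pgmq extension installed. The ``dsn`` constructor keyword and the ``create_queue`` helper are assumptions — only DSN-based engine creation and ``create_partitioned_queue`` appear in the hunks above.

.. code-block:: python

    # Illustrative sketch only -- not part of this diff.
    # Assumptions (not shown in the hunks above): the PGMQueue constructor takes a
    # ``dsn`` keyword, a ``create_queue`` helper exists for non-partitioned queues,
    # and a pgmq-enabled PostgreSQL server is reachable at DSN.
    from tembo_pgmq_python.sqlalchemy import PGMQueue

    DSN = "postgresql://postgres:postgres@localhost:5432/postgres"  # hypothetical


    def demo() -> None:
        pgmq = PGMQueue(dsn=DSN)  # assumed constructor signature
        queue = "demo_queue"      # hypothetical name (<= 48 chars, see validate_queue_name)

        pgmq.create_queue(queue)  # assumed helper; only create_partitioned_queue is in this diff

        # send() takes a dict payload and returns the new msg_id.
        msg_id = pgmq.send(queue, {"event": "signup", "user_id": 42}, delay=0)
        print("sent", msg_id)

        # read() hides the message for ``vt`` seconds and returns a Message or None.
        msg = pgmq.read(queue, vt=30)
        if msg is not None:
            print(msg.msg_id, msg.read_ct, msg.enqueued_at, msg.message)

            # Push the visibility timeout out while the message is being processed.
            pgmq.set_vt(queue, msg_id=msg.msg_id, vt_offset=60)

            # Move the finished message to the archive table (or delete() it).
            pgmq.archive(queue, msg.msg_id)

        # Batch variants mirror the scalar calls and return lists of msg_ids.
        ids = pgmq.send_batch(queue, [{"n": i} for i in range(3)])
        pgmq.delete_batch(queue, ids)

        # Introspection.
        print(pgmq.list_queues())
        print(pgmq.metrics(queue))  # QueueMetrics or None
        print(pgmq.metrics_all())   # list of QueueMetrics or None

        # Cleanup: purge() deletes all messages, drop_queue() removes the queue.
        pgmq.purge(queue)
        pgmq.drop_queue(queue)


    if __name__ == "__main__":
        demo()

Every method shown also accepts the ``session=`` and ``commit=`` keyword arguments visible in the rewrapped signatures above, so callers that manage their own transaction can pass an existing ``Session``/``AsyncSession`` and use ``commit=False`` to commit themselves.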