Skip to content

Commit

Permalink
Add copy_ wrappers to Pool
Browse files Browse the repository at this point in the history
The `copy_to_table()` and friends are currently missing from the `Pool`
interface, add them in.

Fixes: #641.
  • Loading branch information
elprans committed Nov 26, 2020
1 parent 690048d commit 659904a
Showing 1 changed file with 182 additions and 14 deletions.
196 changes: 182 additions & 14 deletions asyncpg/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ async def execute(self, query: str, *args, timeout: float=None) -> str:
Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.execute() <connection.Connection.execute>`.
:meth:`Connection.execute() <asyncpg.connection.Connection.execute>`.
.. versionadded:: 0.10.0
"""
Expand All @@ -534,7 +534,8 @@ async def executemany(self, command: str, args, *, timeout: float=None):
Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.executemany() <connection.Connection.executemany>`.
:meth:`Connection.executemany()
<asyncpg.connection.Connection.executemany>`.
.. versionadded:: 0.10.0
"""
Expand All @@ -546,7 +547,7 @@ async def fetch(self, query, *args, timeout=None) -> list:
Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.fetch() <connection.Connection.fetch>`.
:meth:`Connection.fetch() <asyncpg.connection.Connection.fetch>`.
.. versionadded:: 0.10.0
"""
Expand All @@ -558,7 +559,8 @@ async def fetchval(self, query, *args, column=0, timeout=None):
Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.fetchval() <connection.Connection.fetchval>`.
:meth:`Connection.fetchval()
<asyncpg.connection.Connection.fetchval>`.
.. versionadded:: 0.10.0
"""
Expand All @@ -571,13 +573,178 @@ async def fetchrow(self, query, *args, timeout=None):
Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.fetchrow() <connection.Connection.fetchrow>`.
:meth:`Connection.fetchrow() <asyncpg.connection.Connection.fetchrow>`.
.. versionadded:: 0.10.0
"""
async with self.acquire() as con:
return await con.fetchrow(query, *args, timeout=timeout)

async def copy_from_table(
self,
table_name,
*,
output,
columns=None,
schema_name=None,
timeout=None,
format=None,
oids=None,
delimiter=None,
null=None,
header=None,
quote=None,
escape=None,
force_quote=None,
encoding=None
):
"""Copy table contents to a file or file-like object.
Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.copy_from_table()
<asyncpg.connection.Connection.copy_from_table>`.
.. versionadded:: 0.22.0
"""
async with self.acquire() as con:
return await con.copy_from_table(
table_name,
output=output,
columns=columns,
schema_name=schema_name,
timeout=timeout,
format=format,
oids=oids,
delimiter=delimiter,
null=null,
header=header,
quote=quote,
escape=escape,
force_quote=force_quote,
encoding=encoding
)

async def copy_from_query(
self,
query,
*args,
output,
timeout=None,
format=None,
oids=None,
delimiter=None,
null=None,
header=None,
quote=None,
escape=None,
force_quote=None,
encoding=None
):
"""Copy the results of a query to a file or file-like object.
Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.copy_from_query()
<asyncpg.connection.Connection.copy_from_query>`.
.. versionadded:: 0.22.0
"""
async with self.acquire() as con:
return await con.copy_from_query(
query,
*args,
output=output,
timeout=timeout,
format=format,
oids=oids,
delimiter=delimiter,
null=null,
header=header,
quote=quote,
escape=escape,
force_quote=force_quote,
encoding=encoding
)

async def copy_to_table(
self,
table_name,
*,
source,
columns=None,
schema_name=None,
timeout=None,
format=None,
oids=None,
freeze=None,
delimiter=None,
null=None,
header=None,
quote=None,
escape=None,
force_quote=None,
force_not_null=None,
force_null=None,
encoding=None
):
"""Copy data to the specified table.
Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.copy_to_table()
<asyncpg.connection.Connection.copy_to_table>`.
.. versionadded:: 0.22.0
"""
async with self.acquire() as con:
return await con.copy_to_table(
table_name,
source=source,
columns=columns,
schema_name=schema_name,
timeout=timeout,
format=format,
oids=oids,
freeze=freeze,
delimiter=delimiter,
null=null,
header=header,
quote=quote,
escape=escape,
force_quote=force_quote,
force_not_null=force_not_null,
force_null=force_null,
encoding=encoding
)

async def copy_records_to_table(
self,
table_name,
*,
records,
columns=None,
schema_name=None,
timeout=None
):
"""Copy a list of records to the specified table using binary COPY.
Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.copy_records_to_table()
<asyncpg.connection.Connection.copy_records_to_table>`.
.. versionadded:: 0.22.0
"""
async with self.acquire() as con:
return await con.copy_to_table(
table_name,
records=records,
columns=columns,
schema_name=schema_name,
timeout=timeout
)

def acquire(self, *, timeout=None):
"""Acquire a database connection from the pool.
Expand Down Expand Up @@ -844,12 +1011,12 @@ def create_pool(dsn=None, *,
.. warning::
Prepared statements and cursors returned by
:meth:`Connection.prepare() <connection.Connection.prepare>` and
:meth:`Connection.cursor() <connection.Connection.cursor>` become
invalid once the connection is released. Likewise, all notification
and log listeners are removed, and ``asyncpg`` will issue a warning
if there are any listener callbacks registered on a connection that
is being released to the pool.
:meth:`Connection.prepare() <asyncpg.connection.Connection.prepare>`
and :meth:`Connection.cursor() <asyncpg.connection.Connection.cursor>`
become invalid once the connection is released. Likewise, all
notification and log listeners are removed, and ``asyncpg`` will
issue a warning if there are any listener callbacks registered on a
connection that is being released to the pool.
:param str dsn:
Connection arguments specified using as a single string in
Expand Down Expand Up @@ -915,10 +1082,11 @@ def create_pool(dsn=None, *,
.. versionchanged:: 0.13.0
An :exc:`~asyncpg.exceptions.InterfaceWarning` will be produced
if there are any active listeners (added via
:meth:`Connection.add_listener() <connection.Connection.add_listener>`
:meth:`Connection.add_listener()
<asyncpg.connection.Connection.add_listener>`
or :meth:`Connection.add_log_listener()
<connection.Connection.add_log_listener>`) present on the connection
at the moment of its release to the pool.
<asyncpg.connection.Connection.add_log_listener>`) present on the
connection at the moment of its release to the pool.
.. versionchanged:: 0.22.0
Added the *record_class* parameter.
Expand Down

0 comments on commit 659904a

Please sign in to comment.