From 65badcd21c740784344f2dd704827ed3031e4d1b Mon Sep 17 00:00:00 2001 From: vsemenov Date: Mon, 21 May 2018 08:33:24 +0300 Subject: [PATCH 1/4] ability to execute precompiled sqlalchemy queries --- CHANGES.txt | 6 ++++++ aiomysql/__init__.py | 2 +- aiomysql/sa/connection.py | 16 +++++++++++++--- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index c7ef84ac..a0ff186b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,12 @@ Changes ------- +0.0.16 (2018-05-21) +^^^^^^^^^^^^^^^^^^^ + +* Added ability to execute precompiled sqlalchemy queries + + 0.0.15 (2018-05-20) ^^^^^^^^^^^^^^^^^^^ diff --git a/aiomysql/__init__.py b/aiomysql/__init__.py index 95321cec..89118e6f 100644 --- a/aiomysql/__init__.py +++ b/aiomysql/__init__.py @@ -33,7 +33,7 @@ from .cursors import Cursor, SSCursor, DictCursor, SSDictCursor from .pool import create_pool, Pool -__version__ = '0.0.15' +__version__ = '0.0.16' __all__ = [ diff --git a/aiomysql/sa/connection.py b/aiomysql/sa/connection.py index d8bca3e9..1d5133ac 100644 --- a/aiomysql/sa/connection.py +++ b/aiomysql/sa/connection.py @@ -3,6 +3,7 @@ import weakref from sqlalchemy.sql import ClauseElement +from sqlalchemy.sql.compiler import SQLCompiler from sqlalchemy.sql.dml import UpdateBase from sqlalchemy.sql.ddl import DDLElement @@ -23,10 +24,15 @@ def __init__(self, connection, engine): self._engine = engine self._dialect = engine.dialect + @property + def engine(self): + return self._engine + def execute(self, query, *multiparams, **params): """Executes a SQL query with optional parameters. - query - a SQL query string or any sqlalchemy expression. + query - a SQL query string or any sqlalchemy expression + (optionally it could be compiled). *multiparams/**params - represent bound parameter values to be used in the execution. Typically, the format is a dictionary @@ -73,10 +79,14 @@ async def _execute(self, query, *multiparams, **params): result_map = None + compiled = None + if isinstance(query, SQLCompiler): + compiled = query + if isinstance(query, str): await cursor.execute(query, dp or None) - elif isinstance(query, ClauseElement): - compiled = query.compile(dialect=self._dialect) + elif compiled or isinstance(query, ClauseElement): + compiled = compiled or query.compile(dialect=self._dialect) # parameters = compiled.params if not isinstance(query, DDLElement): if dp and isinstance(dp, (list, tuple)): From 6ab6ff488b8e405a4b03133bd8253c7c9a2b0070 Mon Sep 17 00:00:00 2001 From: vsemenov Date: Sat, 26 May 2018 22:48:33 +0300 Subject: [PATCH 2/4] global cache for compiled queries --- aiomysql/sa/connection.py | 34 +++---- aiomysql/sa/engine.py | 16 ++-- tests/sa/test_sa_compiled_cache.py | 138 +++++++++++++++++++++++++++++ 3 files changed, 167 insertions(+), 21 deletions(-) create mode 100644 tests/sa/test_sa_compiled_cache.py diff --git a/aiomysql/sa/connection.py b/aiomysql/sa/connection.py index 1d5133ac..7f16bb6a 100644 --- a/aiomysql/sa/connection.py +++ b/aiomysql/sa/connection.py @@ -3,7 +3,6 @@ import weakref from sqlalchemy.sql import ClauseElement -from sqlalchemy.sql.compiler import SQLCompiler from sqlalchemy.sql.dml import UpdateBase from sqlalchemy.sql.ddl import DDLElement @@ -16,23 +15,19 @@ class SAConnection: - def __init__(self, connection, engine): + def __init__(self, connection, engine, compiled_cache=None): self._connection = connection self._transaction = None self._savepoint_seq = 0 self._weak_results = weakref.WeakSet() self._engine = engine self._dialect = engine.dialect - - @property - def engine(self): - return self._engine + self._compiled_cache = compiled_cache def execute(self, query, *multiparams, **params): """Executes a SQL query with optional parameters. - query - a SQL query string or any sqlalchemy expression - (optionally it could be compiled). + query - a SQL query string or any sqlalchemy expression. *multiparams/**params - represent bound parameter values to be used in the execution. Typically, the format is a dictionary @@ -79,15 +74,24 @@ async def _execute(self, query, *multiparams, **params): result_map = None - compiled = None - if isinstance(query, SQLCompiler): - compiled = query - if isinstance(query, str): await cursor.execute(query, dp or None) - elif compiled or isinstance(query, ClauseElement): - compiled = compiled or query.compile(dialect=self._dialect) - # parameters = compiled.params + elif isinstance(query, ClauseElement): + if self._compiled_cache is not None: + key = (self._dialect, query) + compiled = self._compiled_cache.get(key) + if not compiled: + compiled = query.compile(dialect=self._dialect) + if ( + dp and dp.keys() == compiled.params.keys() + or + not (dp or compiled.params) + ): + # we only want queries with bound params in cache + self._compiled_cache[key] = compiled + else: + compiled = query.compile(dialect=self._dialect) + if not isinstance(query, DDLElement): if dp and isinstance(dp, (list, tuple)): if isinstance(query, UpdateBase): diff --git a/aiomysql/sa/engine.py b/aiomysql/sa/engine.py index c2ccfd18..fc61121d 100644 --- a/aiomysql/sa/engine.py +++ b/aiomysql/sa/engine.py @@ -20,7 +20,8 @@ def create_engine(minsize=1, maxsize=10, loop=None, - dialect=_dialect, pool_recycle=-1, **kwargs): + dialect=_dialect, pool_recycle=-1, compiled_cache=None, + **kwargs): """A coroutine for Engine creation. Returns Engine instance with embedded connection pool. @@ -28,7 +29,8 @@ def create_engine(minsize=1, maxsize=10, loop=None, The pool has *minsize* opened connections to PostgreSQL server. """ coro = _create_engine(minsize=minsize, maxsize=maxsize, loop=loop, - dialect=dialect, pool_recycle=pool_recycle, **kwargs) + dialect=dialect, pool_recycle=pool_recycle, + compiled_cache=compiled_cache, **kwargs) compatible_cursor_classes = [Cursor] # Without provided kwarg, default is default cursor from Connection class if kwargs.get('cursorclass', Cursor) not in compatible_cursor_classes: @@ -38,7 +40,8 @@ def create_engine(minsize=1, maxsize=10, loop=None, async def _create_engine(minsize=1, maxsize=10, loop=None, - dialect=_dialect, pool_recycle=-1, **kwargs): + dialect=_dialect, pool_recycle=-1, + compiled_cache=None, **kwargs): if loop is None: loop = asyncio.get_event_loop() @@ -47,7 +50,7 @@ async def _create_engine(minsize=1, maxsize=10, loop=None, pool_recycle=pool_recycle, **kwargs) conn = await pool.acquire() try: - return Engine(dialect, pool, **kwargs) + return Engine(dialect, pool, compiled_cache=compiled_cache, **kwargs) finally: pool.release(conn) @@ -61,9 +64,10 @@ class Engine: create_engine coroutine. """ - def __init__(self, dialect, pool, **kwargs): + def __init__(self, dialect, pool, compiled_cache=None, **kwargs): self._dialect = dialect self._pool = pool + self._compiled_cache = compiled_cache self._conn_kw = kwargs @property @@ -124,7 +128,7 @@ def acquire(self): async def _acquire(self): raw = await self._pool.acquire() - conn = SAConnection(raw, self) + conn = SAConnection(raw, self, compiled_cache=self._compiled_cache) return conn def release(self, conn): diff --git a/tests/sa/test_sa_compiled_cache.py b/tests/sa/test_sa_compiled_cache.py new file mode 100644 index 00000000..905b637d --- /dev/null +++ b/tests/sa/test_sa_compiled_cache.py @@ -0,0 +1,138 @@ +import asyncio +from aiomysql import sa +from sqlalchemy import bindparam + +import os +import unittest + +from sqlalchemy import MetaData, Table, Column, Integer, String + +meta = MetaData() +tbl = Table('sa_tbl_cache_test', meta, + Column('id', Integer, nullable=False, + primary_key=True), + Column('val', String(255))) + + +class TestCompiledCache(unittest.TestCase): + def setUp(self): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(None) + self.host = os.environ.get('MYSQL_HOST', 'localhost') + self.port = int(os.environ.get('MYSQL_PORT', 3306)) + self.user = os.environ.get('MYSQL_USER', 'root') + self.db = os.environ.get('MYSQL_DB', 'test_pymysql') + self.password = os.environ.get('MYSQL_PASSWORD', '') + self.engine = self.loop.run_until_complete(self.make_engine()) + self.loop.run_until_complete(self.start()) + + def tearDown(self): + self.engine.terminate() + self.loop.run_until_complete(self.engine.wait_closed()) + self.loop.close() + + async def make_engine(self, **kwargs): + return (await sa.create_engine(db=self.db, + user=self.user, + password=self.password, + host=self.host, + port=self.port, + loop=self.loop, + minsize=10, + **kwargs)) + + async def start(self): + async with self.engine.acquire() as conn: + tx = await conn.begin() + await conn.execute("DROP TABLE IF EXISTS " + "sa_tbl_cache_test") + await conn.execute("CREATE TABLE sa_tbl_cache_test" + "(id serial, val varchar(255))") + await conn.execute(tbl.insert().values(val='some_val_1')) + await conn.execute(tbl.insert().values(val='some_val_2')) + await conn.execute(tbl.insert().values(val='some_val_3')) + await tx.commit() + + def test_cache(self): + async def go(): + cache = dict() + engine = await self.make_engine(compiled_cache=cache) + async with engine.acquire() as conn: + # check select with params not added to cache + q = tbl.select().where(tbl.c.val == 'some_val_1') + cursor = await conn.execute(q) + row = await cursor.fetchone() + self.assertEqual('some_val_1', row.val) + self.assertEqual(0, len(cache)) + + # check select with bound params added to cache + select_by_val = tbl.select().where( + tbl.c.val == bindparam('value') + ) + cursor = await conn.execute( + select_by_val, {'value': 'some_val_3'} + ) + row = await cursor.fetchone() + self.assertEqual('some_val_3', row.val) + self.assertEqual(1, len(cache)) + + cursor = await conn.execute( + select_by_val, value='some_val_2' + ) + row = await cursor.fetchone() + self.assertEqual('some_val_2', row.val) + self.assertEqual(1, len(cache)) + + select_all = tbl.select() + cursor = await conn.execute(select_all) + rows = await cursor.fetchall() + self.assertEqual(3, len(rows)) + self.assertEqual(2, len(cache)) + + # check insert with bound params not added to cache + await conn.execute(tbl.insert().values(val='some_val_4')) + self.assertEqual(2, len(cache)) + + # check insert with bound params added to cache + q = tbl.insert().values(val=bindparam('value')) + await conn.execute(q, value='some_val_5') + self.assertEqual(3, len(cache)) + + await conn.execute(q, value='some_val_6') + self.assertEqual(3, len(cache)) + + await conn.execute(q, {'value': 'some_val_7'}) + self.assertEqual(3, len(cache)) + + cursor = await conn.execute(select_all) + rows = await cursor.fetchall() + self.assertEqual(7, len(rows)) + self.assertEqual(3, len(cache)) + + # check update with params not added to cache + q = tbl.update().where( + tbl.c.val == 'some_val_1' + ).values(val='updated_val_1') + await conn.execute(q) + self.assertEqual(3, len(cache)) + cursor = await conn.execute( + select_by_val, value='updated_val_1' + ) + row = await cursor.fetchone() + self.assertEqual('updated_val_1', row.val) + + # check update with bound params added to cache + q = tbl.update().where( + tbl.c.val == bindparam('value') + ).values(val=bindparam('update')) + await conn.execute( + q, value='some_val_2', update='updated_val_2' + ) + self.assertEqual(4, len(cache)) + cursor = await conn.execute( + select_by_val, value='updated_val_2' + ) + row = await cursor.fetchone() + self.assertEqual('updated_val_2', row.val) + + self.loop.run_until_complete(go()) From f4633aa8c989307c131b1087d50a13c1f84b09d8 Mon Sep 17 00:00:00 2001 From: vsemenov Date: Sat, 26 May 2018 23:05:03 +0300 Subject: [PATCH 3/4] update formatting --- aiomysql/sa/connection.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/aiomysql/sa/connection.py b/aiomysql/sa/connection.py index 7f16bb6a..e73192ed 100644 --- a/aiomysql/sa/connection.py +++ b/aiomysql/sa/connection.py @@ -82,11 +82,8 @@ async def _execute(self, query, *multiparams, **params): compiled = self._compiled_cache.get(key) if not compiled: compiled = query.compile(dialect=self._dialect) - if ( - dp and dp.keys() == compiled.params.keys() - or - not (dp or compiled.params) - ): + if dp and dp.keys() == compiled.params.keys() \ + or not (dp or compiled.params): # we only want queries with bound params in cache self._compiled_cache[key] = compiled else: From 784432fd8c21ef21b19981de53ad2f0cbaa937dc Mon Sep 17 00:00:00 2001 From: vsemenov Date: Sun, 3 Jun 2018 10:45:23 +0300 Subject: [PATCH 4/4] use only query as a key in compiled cache --- aiomysql/sa/connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiomysql/sa/connection.py b/aiomysql/sa/connection.py index e73192ed..d9597aef 100644 --- a/aiomysql/sa/connection.py +++ b/aiomysql/sa/connection.py @@ -78,7 +78,7 @@ async def _execute(self, query, *multiparams, **params): await cursor.execute(query, dp or None) elif isinstance(query, ClauseElement): if self._compiled_cache is not None: - key = (self._dialect, query) + key = query compiled = self._compiled_cache.get(key) if not compiled: compiled = query.compile(dialect=self._dialect)