From 690016c6ad15b32f61a998970631132162e2491f Mon Sep 17 00:00:00 2001
From: Kevin Smith
Date: Wed, 22 Nov 2023 15:42:18 -0600
Subject: [PATCH] Make cache an object

---
 singlestoredb/fusion/cache/__init__.py |   4 +-
 singlestoredb/fusion/cache/sqlite.py   | 150 ++++++++++++++-----------
 singlestoredb/management/manager.py    |   3 +
 singlestoredb/management/workspace.py  |  33 +++---
 4 files changed, 105 insertions(+), 85 deletions(-)

diff --git a/singlestoredb/fusion/cache/__init__.py b/singlestoredb/fusion/cache/__init__.py
index ff7a52b7..636ffb64 100644
--- a/singlestoredb/fusion/cache/__init__.py
+++ b/singlestoredb/fusion/cache/__init__.py
@@ -1,4 +1,2 @@
 #!/usr/bin/env python3
-from .sqlite import connect  # noqa
-from .sqlite import invalidate  # noqa
-from .sqlite import update  # noqa
+from .sqlite import Cache  # noqa
diff --git a/singlestoredb/fusion/cache/sqlite.py b/singlestoredb/fusion/cache/sqlite.py
index 80b4bbae..7a4f9b30 100644
--- a/singlestoredb/fusion/cache/sqlite.py
+++ b/singlestoredb/fusion/cache/sqlite.py
@@ -24,13 +24,15 @@ CACHE_NAME = 'file:fusion?mode=memory&cache=shared'
 
 
 SCHEMA = r'''
-    CREATE TABLE fusion.Regions (
+    BEGIN;
+
+    CREATE TABLE IF NOT EXISTS fusion.Regions (
         regionID TEXT PRIMARY KEY NOT NULL,
         region TEXT NOT NULL,
         provider TEXT NOT NULL
     );
 
-    CREATE TABLE fusion.WorkspaceGroups (
+    CREATE TABLE IF NOT EXISTS fusion.WorkspaceGroups (
         workspaceGroupID TEXT PRIMARY KEY NOT NULL,
         name TEXT NOT NULL,
         regionID TEXT NOT NULL,
@@ -43,7 +45,7 @@
         updateWindow JSON
     );
 
-    CREATE TABLE fusion.Workspaces (
+    CREATE TABLE IF NOT EXISTS fusion.Workspaces (
         workspaceID TEXT PRIMARY KEY NOT NULL,
         name TEXT NOT NULL,
         workspaceGroupID TEXT NOT NULL,
@@ -55,6 +57,13 @@
         terminatedAt TEXT,
         scalingProgress INTEGER
     );
+
+    CREATE TABLE IF NOT EXISTS fusion.FusionStatistics (
+        tableName TEXT PRIMARY KEY NOT NULL,
+        lastModifiedTime DATETIME NOT NULL
+    );
+
+    COMMIT;
 '''
 CACHE_TIMEOUTS = dict(
     Regions=datetime.timedelta(hours=12),
@@ -98,10 +107,8 @@ def adapt_bool(val: Optional[Any]) -> Optional[bool]:
     return bool(val)
 
 
-sqlite3.register_adapter(datetime.date, adapt_date_iso)
 sqlite3.register_adapter(datetime.date, adapt_date_iso)
 sqlite3.register_adapter(datetime.datetime, adapt_datetime_iso)
-sqlite3.register_adapter(datetime.datetime, adapt_datetime_epoch)
 sqlite3.register_adapter(bool, adapt_bool)
 sqlite3.register_adapter(list, adapt_json)
 sqlite3.register_adapter(dict, adapt_json)
@@ -142,73 +149,90 @@ def convert_json(val: Optional[bytes]) -> Optional[Any]:
 
 
 def dict_factory(cursor: Any, row: Tuple[Any, ...]) -> Dict[str, Any]:
-    d = {}
-    for idx, col in enumerate(cursor.description):
-        d[col[0]] = row[idx]
-    return d
-
-
-def connect() -> sqlite3.Connection:
-    conn = sqlite3.connect(':memory:', uri=True, detect_types=sqlite3.PARSE_DECLTYPES)
-    conn.cursor().execute(f'ATTACH "{CACHE_NAME}" AS fusion')
-    return conn
-
+    """Return row as a dictionary."""
+    return {k[0]: v for k, v in zip(cursor.description, row)}
 
-def invalidate(table: str) -> None:
-    _last_modified_time.pop(table, None)
 
+class Cache(object):
+    """Cache of management API results in a shared in-memory SQLite database."""
 
-def update(table: str, func: Callable[[], Any]) -> List[Dict[str, Any]]:
-    with connect() as conn:
+    table_fields: Dict[str, List[str]] = {}
 
-        # If we are within the cache timeout, return the current results
-        if table in _last_modified_time:
-            if datetime.datetime.now() - _last_modified_time[table] \
-                    < CACHE_TIMEOUTS[table]:
-                conn.row_factory = dict_factory
-                return list(conn.cursor().execute(f'SELECT * FROM fusion.{table}'))
-
-        cur = conn.cursor()
-
-        # Get column names and primary key
-        columns = _table_fields.get(table)
-        if columns is None:
-            columns = [x[1] for x in cur.execute(f'PRAGMA fusion.table_info({table})')]
-            _table_fields[table] = columns
+    def __init__(self) -> None:
+        """Open the anchor connection, create the schema, and cache column info."""
+        self.connection = self.connect()
+        cur = self.connection.cursor()
+        cur.executescript(SCHEMA)
 
-        # Build query components
-        values = func()
-        fields = []
-        field_subs = []
-        conflict = []
-        for k, v in values[0].items():
-            if k not in columns:
-                continue
-            fields.append(k)
-            field_subs.append(f':{k}')
-            conflict.append(f'{k}=excluded.{k}')
+        # Cache table fields
+        if not self.table_fields:
+            cur.execute('SELECT name FROM fusion.sqlite_master WHERE type = ?', ('table',))
+            for table in cur.fetchall():
+                info = cur.execute(f'PRAGMA fusion.table_info({table[0]})').fetchall()
+                self.table_fields[table[0]] = info
 
-        query = f'INSERT INTO fusion.{table}({", ".join(fields)}) ' \
-                f' VALUES ({", ".join(field_subs)}) ' \
-                f' ON CONFLICT({columns[0]}) DO UPDATE SET {", ".join(conflict)}' \
+    def connect(self) -> sqlite3.Connection:
+        """Return a new connection with the shared cache attached as ``fusion``."""
+        conn = sqlite3.connect(':memory:', uri=True, detect_types=sqlite3.PARSE_DECLTYPES)
+        conn.cursor().execute(f'ATTACH "{CACHE_NAME}" AS fusion')
+        return conn
 
-        _last_modified_time[table] = datetime.datetime.now()
+    def invalidate(self, table: str) -> None:
+        """Drop the freshness record for ``table`` so the next update refetches."""
+        with self.connect() as conn:
+            cur = conn.cursor()
+            cur.execute('DELETE FROM fusion.FusionStatistics WHERE tableName = ?', (table,))
 
-        cur.executemany(query, values)
+    def update(self, table: str, func: Callable[[], Any]) -> List[Dict[str, Any]]:
+        """Return rows for ``table``, calling ``func`` only when the cache is stale."""
+        with self.connect() as conn:
 
-        return values
+            cur = conn.cursor()
 
+            stats = cur.execute(
+                'SELECT tableName, lastModifiedTime FROM fusion.FusionStatistics '
+                'WHERE tableName = ?', (table,),
+            ).fetchall()
 
-conn = sqlite3.connect(':memory:')
-cur = conn.cursor()
-cur.execute(f'ATTACH "{CACHE_NAME}" AS fusion')
-cur.executescript(SCHEMA)
-
-# Make sure tha database stays for the length of the process
-_main_connection = dict(fusion=conn)
-
-# Last modified times of tables
-_last_modified_time: Dict[str, datetime.datetime] = {}
+            # If we are within the cache timeout, return the current results
+            if stats and (datetime.datetime.now() - stats[0][1]) < CACHE_TIMEOUTS[table]:
+                conn.row_factory = dict_factory
+                return list(conn.cursor().execute(f'SELECT * FROM fusion.{table}'))
 
-# Table fields cache
-_table_fields: Dict[str, List[str]] = {}
+            # Build query components
+            columns = [x[1] for x in self.table_fields[table]]
+            values = func()
+            fields = []
+            field_subs = []
+            conflict = []
+            for k, v in values[0].items():
+                if k not in columns:
+                    continue
+                fields.append(k)
+                field_subs.append(f':{k}')
+                conflict.append(f'{k}=excluded.{k}')
+
+            try:
+                cur.execute('BEGIN')
+
+                # Insert the new value
+                query = f'INSERT INTO fusion.{table}({", ".join(fields)}) ' \
+                        f' VALUES({", ".join(field_subs)}) ' \
+                        f' ON CONFLICT({columns[0]}) DO UPDATE SET {", ".join(conflict)}'
+                cur.executemany(query, values)
+
+                # Update the last modified time
+                query = 'INSERT INTO fusion.FusionStatistics ' \
+                        ' (tableName, lastModifiedTime) ' \
+                        ' VALUES(?, ?) ' \
+                        ' ON CONFLICT(tableName) DO UPDATE SET ' \
+                        ' lastModifiedTime=excluded.lastModifiedTime'
+                cur.execute(query, (table, datetime.datetime.now()))
+
+                cur.execute('COMMIT')
+
+            except Exception:
+                # Best effort: roll back a failed cache write and still return values
+                cur.execute('ROLLBACK')
+
+            return values
diff --git a/singlestoredb/management/manager.py b/singlestoredb/management/manager.py
index 1cfd9daf..a9173fff 100644
--- a/singlestoredb/management/manager.py
+++ b/singlestoredb/management/manager.py
@@ -52,6 +52,9 @@ def __init__(
         ) + '/'
         self._params: Dict[str, str] = {}
 
+        from ..fusion.cache import Cache
+        self.cache = Cache()
+
     def _check(
         self, res: requests.Response, url: str, params: Dict[str, Any],
     ) -> requests.Response:
diff --git a/singlestoredb/management/workspace.py b/singlestoredb/management/workspace.py
index 97ff9f57..9954e74a 100644
--- a/singlestoredb/management/workspace.py
+++ b/singlestoredb/management/workspace.py
@@ -1009,19 +1009,18 @@ def terminate(
             If timeout is reached
 
         """
-        from ..fusion import cache
         if self._manager is None:
             raise ManagementError(
                 msg='No workspace manager is associated with this object.',
             )
         self._manager._delete(f'workspaces/{self.id}')
-        cache.invalidate('Workspaces')
+        self._manager.cache.invalidate('Workspaces')
         if wait_on_terminated:
             self._manager._wait_on_state(
                 self._manager.get_workspace(self.id),
                 'Terminated', interval=wait_interval, timeout=wait_timeout,
             )
-            cache.invalidate('Workspaces')
+            self._manager.cache.invalidate('Workspaces')
             self.refresh()
 
     def connect(self, **kwargs: Any) -> connection.Connection:
@@ -1172,7 +1171,6 @@ def update(
             List of allowed incoming IP addresses
 
         """
-        from ..fusion import cache
         if self._manager is None:
             raise ManagementError(
                 msg='No workspace manager is associated with this object.',
@@ -1184,7 +1182,7 @@ def update(
             ).items() if v is not None
         }
         self._manager._patch(f'workspaceGroups/{self.id}', json=data)
-        cache.invalidate('WorkspaceGroups')
+        self._manager.cache.invalidate('WorkspaceGroups')
         self.refresh()
 
     def terminate(
@@ -1213,13 +1211,12 @@ def terminate(
             If timeout is reached
 
         """
-        from ..fusion import cache
         if self._manager is None:
             raise ManagementError(
                 msg='No workspace manager is associated with this object.',
             )
         self._manager._delete(f'workspaceGroups/{self.id}', params=dict(force=force))
-        cache.invalidate('WorkspaceGroups')
+        self._manager.cache.invalidate('WorkspaceGroups')
         if wait_on_terminated:
             while True:
                 self.refresh()
@@ -1231,7 +1228,7 @@ def terminate(
                     )
                 time.sleep(wait_interval)
                 wait_timeout -= wait_interval
-            cache.invalidate('WorkspaceGroups')
+            self._manager.cache.invalidate('WorkspaceGroups')
 
     def create_workspace(
         self, name: str, size: Optional[str] = None,
@@ -1272,12 +1269,11 @@ def create_workspace(
     @property
     def workspaces(self) -> NamedList[Workspace]:
         """Return a list of available workspaces."""
-        from ..fusion import cache
         if self._manager is None:
             raise ManagementError(
                 msg='No workspace manager is associated with this object.',
             )
-        res = cache.update(
+        res = self._manager.cache.update(
             'Workspaces',
             lambda: self._manager._get(  # type: ignore
                 'workspaces',
@@ -1393,8 +1389,10 @@ class WorkspaceManager(Manager):
     @ property
     def workspace_groups(self) -> NamedList[WorkspaceGroup]:
         """Return a list of available workspace groups."""
-        from ..fusion import cache
-        res = cache.update('WorkspaceGroups', lambda: self._get('workspaceGroups').json())
+        res = self.cache.update(
+            'WorkspaceGroups',
+            lambda: self._get('workspaceGroups').json(),
+        )
         return NamedList([WorkspaceGroup.from_dict(item, self) for item in res])
 
     @ property
@@ -1410,8 +1408,7 @@ def billing(self) -> Billing:
     @ property
     def regions(self) -> NamedList[Region]:
         """Return a list of available regions."""
-        from ..fusion import cache
-        res = cache.update('Regions', lambda: self._get('regions').json())
+        res = self.cache.update('Regions', lambda: self._get('regions').json())
         return NamedList([Region.from_dict(item, self) for item in res])
 
     def create_workspace_group(
@@ -1453,7 +1450,6 @@ def create_workspace_group(
         :class:`WorkspaceGroup`
 
         """
-        from ..fusion import cache
         if isinstance(region, Region):
             region = region.id
         res = self._post(
@@ -1466,7 +1462,7 @@ def create_workspace_group(
                 updateWindow=update_window,
             ),
         )
-        cache.invalidate('WorkspaceGroups')
+        self.cache.invalidate('WorkspaceGroups')
         return self.get_workspace_group(res.json()['workspaceGroupID'])
 
     def create_workspace(
@@ -1498,7 +1494,6 @@ def create_workspace(
         :class:`Workspace`
 
         """
-        from ..fusion import cache
         if isinstance(workspace_group, WorkspaceGroup):
             workspace_group = workspace_group.id
         res = self._post(
@@ -1507,14 +1502,14 @@ def create_workspace(
                 size=size,
             ),
         )
-        cache.invalidate('Workspaces')
+        self.cache.invalidate('Workspaces')
         out = self.get_workspace(res.json()['workspaceID'])
         if wait_on_active:
             out = self._wait_on_state(
                 out, 'Active',
                 interval=wait_interval, timeout=wait_timeout,
             )
-            cache.invalidate('Workspaces')
+            self.cache.invalidate('Workspaces')
         return out
 
     def get_workspace_group(self, id: str) -> WorkspaceGroup: