diff --git a/singlestoredb/fusion/cache/__init__.py b/singlestoredb/fusion/cache/__init__.py
new file mode 100644
index 00000000..ff7a52b7
--- /dev/null
+++ b/singlestoredb/fusion/cache/__init__.py
@@ -0,0 +1,4 @@
+#!/usr/bin/env python3
+from .sqlite import connect  # noqa
+from .sqlite import invalidate  # noqa
+from .sqlite import update  # noqa
diff --git a/singlestoredb/fusion/cache/sqlite.py b/singlestoredb/fusion/cache/sqlite.py
new file mode 100644
index 00000000..80b4bbae
--- /dev/null
+++ b/singlestoredb/fusion/cache/sqlite.py
@@ -0,0 +1,217 @@
+#!/usr/bin/env python3
+import datetime
+import json
+import sqlite3
+from sqlite3 import apilevel  # noqa
+from sqlite3 import DatabaseError  # noqa
+from sqlite3 import DataError  # noqa
+from sqlite3 import Error  # noqa
+from sqlite3 import IntegrityError  # noqa
+from sqlite3 import InterfaceError  # noqa
+from sqlite3 import InternalError  # noqa
+from sqlite3 import NotSupportedError  # noqa
+from sqlite3 import OperationalError  # noqa
+from sqlite3 import paramstyle  # noqa
+from sqlite3 import ProgrammingError  # noqa
+from sqlite3 import threadsafety  # noqa
+from sqlite3 import Warning  # noqa
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+CACHE_NAME = 'file:fusion?mode=memory&cache=shared'
+SCHEMA = r'''
+    CREATE TABLE fusion.Regions (
+        regionID TEXT PRIMARY KEY NOT NULL,
+        region TEXT NOT NULL,
+        provider TEXT NOT NULL
+    );
+
+    CREATE TABLE fusion.WorkspaceGroups (
+        workspaceGroupID TEXT PRIMARY KEY NOT NULL,
+        name TEXT NOT NULL,
+        regionID TEXT NOT NULL,
+        state TEXT NOT NULL,
+        createdAt DATETIME NOT NULL,
+        firewallRanges JSON,
+        expiresAt DATETIME,
+        terminatedAt DATETIME,
+        allowAllTraffic BOOL,
+        updateWindow JSON
+    );
+
+    CREATE TABLE fusion.Workspaces (
+        workspaceID TEXT PRIMARY KEY NOT NULL,
+        name TEXT NOT NULL,
+        workspaceGroupID TEXT NOT NULL,
+        state TEXT NOT NULL,
+        size TEXT NOT NULL,
+        createdAt TEXT NOT NULL,
+        endpoint TEXT,
+        lastResumedAt TEXT,
+        terminatedAt TEXT,
+        scalingProgress INTEGER
+    );
+'''
+CACHE_TIMEOUTS = dict(
+    Regions=datetime.timedelta(hours=12),
+    WorkspaceGroups=datetime.timedelta(hours=1),
+    Workspaces=datetime.timedelta(minutes=1),
+)
+
+
+def adapt_date_iso(val: Optional[datetime.date]) -> Optional[str]:
+    """Adapt datetime.date to ISO 8601 date."""
+    if val is None:
+        return None
+    return val.isoformat()
+
+
+def adapt_datetime_iso(val: Optional[datetime.datetime]) -> Optional[str]:
+    """Adapt datetime.datetime to ISO 8601 datetime."""
+    if val is None:
+        return None
+    return val.isoformat()
+
+
+def adapt_datetime_epoch(val: Optional[datetime.datetime]) -> Optional[int]:
+    """Adapt datetime.datetime to Unix timestamp."""
+    if val is None:
+        return None
+    return int(val.timestamp())
+
+
+def adapt_json(val: Optional[Any]) -> Optional[str]:
+    """Adapt JSON to TEXT."""
+    if val is None:
+        return None
+    return json.dumps(val)
+
+
+def adapt_bool(val: Optional[Any]) -> Optional[bool]:
+    """Adapt any to bool."""
+    if val is None:
+        return None
+    return bool(val)
+
+
+sqlite3.register_adapter(datetime.date, adapt_date_iso)
+sqlite3.register_adapter(datetime.datetime, adapt_datetime_iso)
+sqlite3.register_adapter(bool, adapt_bool)
+sqlite3.register_adapter(list, adapt_json)
+sqlite3.register_adapter(dict, adapt_json)
+
+
+def convert_date(val: Optional[bytes]) -> Optional[datetime.date]:
+    """Convert ISO 8601 date to datetime.date object."""
+    if val is None:
+        return None
+    return datetime.date.fromisoformat(val.decode())
+
+
+def convert_datetime(val: Optional[bytes]) -> Optional[datetime.datetime]:
+    """Convert ISO 8601 datetime to datetime.datetime object."""
+    if val is None:
+        return None
+    return datetime.datetime.fromisoformat(val.decode())
+
+
+def convert_timestamp(val: Optional[bytes]) -> Optional[datetime.datetime]:
+    """Convert Unix epoch timestamp to datetime.datetime object."""
+    if val is None:
+        return None
+    return datetime.datetime.fromtimestamp(int(val))
+
+
+def convert_json(val: Optional[bytes]) -> Optional[Any]:
+    """Convert JSON text to object."""
+    if val is None:
+        return None
+    return json.loads(val)
+
+
+sqlite3.register_converter('date', convert_date)
+sqlite3.register_converter('datetime', convert_datetime)
+sqlite3.register_converter('timestamp', convert_timestamp)
+sqlite3.register_converter('json', convert_json)
+
+
+def dict_factory(cursor: Any, row: Tuple[Any, ...]) -> Dict[str, Any]:
+    d = {}
+    for idx, col in enumerate(cursor.description):
+        d[col[0]] = row[idx]
+    return d
+
+
+def connect() -> sqlite3.Connection:
+    """Return a new connection with the shared cache attached as `fusion`."""
+    conn = sqlite3.connect(':memory:', uri=True, detect_types=sqlite3.PARSE_DECLTYPES)
+    conn.cursor().execute(f'ATTACH "{CACHE_NAME}" AS fusion')
+    return conn
+
+
+def invalidate(table: str) -> None:
+    """Force the next `update` of `table` to refresh from the source."""
+    _last_modified_time.pop(table, None)
+
+
+def update(table: str, func: Callable[[], Any]) -> List[Dict[str, Any]]:
+    """Return rows for `table`, refreshing the cache with `func` when stale."""
+    with connect() as conn:
+
+        # If we are within the cache timeout, return the current results
+        if table in _last_modified_time:
+            if datetime.datetime.now() - _last_modified_time[table] \
+                    < CACHE_TIMEOUTS[table]:
+                conn.row_factory = dict_factory
+                return list(conn.cursor().execute(f'SELECT * FROM fusion.{table}'))
+
+        cur = conn.cursor()
+
+        # Get column names and primary key
+        columns = _table_fields.get(table)
+        if columns is None:
+            columns = [x[1] for x in cur.execute(f'PRAGMA fusion.table_info({table})')]
+            _table_fields[table] = columns
+
+        # Build query components
+        values = func()
+        if not values:
+            return values
+        fields = []
+        field_subs = []
+        conflict = []
+        for k, v in values[0].items():
+            if k not in columns:
+                continue
+            fields.append(k)
+            field_subs.append(f':{k}')
+            conflict.append(f'{k}=excluded.{k}')
+
+        query = f'INSERT INTO fusion.{table}({", ".join(fields)}) ' \
+                f' VALUES ({", ".join(field_subs)}) ' \
+                f' ON CONFLICT({columns[0]}) DO UPDATE SET {", ".join(conflict)}'
+
+        _last_modified_time[table] = datetime.datetime.now()
+
+        cur.executemany(query, values)
+
+        return values
+
+
+conn = sqlite3.connect(':memory:')
+cur = conn.cursor()
+cur.execute(f'ATTACH "{CACHE_NAME}" AS fusion')
+cur.executescript(SCHEMA)
+
+# Make sure the database stays alive for the length of the process
+_main_connection = dict(fusion=conn)
+
+# Last modified times of tables
+_last_modified_time: Dict[str, datetime.datetime] = {}
+
+# Table fields cache
+_table_fields: Dict[str, List[str]] = {}
diff --git a/singlestoredb/management/workspace.py b/singlestoredb/management/workspace.py
index ea73236d..97ff9f57 100644
--- a/singlestoredb/management/workspace.py
+++ b/singlestoredb/management/workspace.py
@@ -1009,16 +1009,19 @@ def terminate(
             If timeout is reached
 
         """
+        from ..fusion import cache
         if self._manager is None:
             raise ManagementError(
                 msg='No workspace manager is associated with this object.',
             )
         self._manager._delete(f'workspaces/{self.id}')
+        cache.invalidate('Workspaces')
         if wait_on_terminated:
             self._manager._wait_on_state(
                 self._manager.get_workspace(self.id), 'Terminated',
                 interval=wait_interval, timeout=wait_timeout,
             )
+            cache.invalidate('Workspaces')
         self.refresh()
 
     def connect(self, **kwargs: Any) -> connection.Connection:
@@ -1169,6 +1172,7 @@ def update(
             List of allowed incoming IP addresses
 
         """
+        from ..fusion import cache
         if self._manager is None:
             raise ManagementError(
                 msg='No workspace manager is associated with this object.',
@@ -1180,6 +1184,7 @@ def update(
             ).items() if v is not None
         }
         self._manager._patch(f'workspaceGroups/{self.id}', json=data)
+        cache.invalidate('WorkspaceGroups')
         self.refresh()
 
     def terminate(
@@ -1208,11 +1213,13 @@ def terminate(
             If timeout is reached
 
         """
+        from ..fusion import cache
         if self._manager is None:
             raise ManagementError(
                 msg='No workspace manager is associated with this object.',
             )
         self._manager._delete(f'workspaceGroups/{self.id}', params=dict(force=force))
+        cache.invalidate('WorkspaceGroups')
         if wait_on_terminated:
             while True:
                 self.refresh()
@@ -1224,6 +1231,7 @@ def terminate(
                 )
                 time.sleep(wait_interval)
                 wait_timeout -= wait_interval
+        cache.invalidate('WorkspaceGroups')
 
     def create_workspace(
         self, name: str, size: Optional[str] = None,
@@ -1264,13 +1272,20 @@ def create_workspace(
     @property
     def workspaces(self) -> NamedList[Workspace]:
         """Return a list of available workspaces."""
+        from ..fusion import cache
         if self._manager is None:
             raise ManagementError(
                 msg='No workspace manager is associated with this object.',
             )
-        res = self._manager._get('workspaces', params=dict(workspaceGroupID=self.id))
+        res = cache.update(
+            'Workspaces',
+            lambda: self._manager._get(  # type: ignore
+                'workspaces',
+                params=dict(workspaceGroupID=self.id),
+            ).json(),
+        )
         return NamedList(
-            [Workspace.from_dict(item, self._manager) for item in res.json()],
+            [Workspace.from_dict(item, self._manager) for item in res],
         )
 
 
@@ -1378,8 +1393,9 @@ class WorkspaceManager(Manager):
     @ property
     def workspace_groups(self) -> NamedList[WorkspaceGroup]:
         """Return a list of available workspace groups."""
-        res = self._get('workspaceGroups')
-        return NamedList([WorkspaceGroup.from_dict(item, self) for item in res.json()])
+        from ..fusion import cache
+        res = cache.update('WorkspaceGroups', lambda: self._get('workspaceGroups').json())
+        return NamedList([WorkspaceGroup.from_dict(item, self) for item in res])
 
     @ property
     def organizations(self) -> Organizations:
@@ -1394,8 +1410,9 @@ def billing(self) -> Billing:
     @ property
     def regions(self) -> NamedList[Region]:
         """Return a list of available regions."""
-        res = self._get('regions')
-        return NamedList([Region.from_dict(item, self) for item in res.json()])
+        from ..fusion import cache
+        res = cache.update('Regions', lambda: self._get('regions').json())
+        return NamedList([Region.from_dict(item, self) for item in res])
 
     def create_workspace_group(
         self, name: str, region: Union[str, Region],
@@ -1436,6 +1453,7 @@ def create_workspace_group(
             :class:`WorkspaceGroup`
 
         """
+        from ..fusion import cache
         if isinstance(region, Region):
             region = region.id
         res = self._post(
@@ -1448,6 +1466,7 @@ def create_workspace_group(
                 updateWindow=update_window,
             ),
         )
+        cache.invalidate('WorkspaceGroups')
         return self.get_workspace_group(res.json()['workspaceGroupID'])
 
     def create_workspace(
@@ -1479,6 +1498,7 @@ def create_workspace(
             :class:`Workspace`
 
         """
+        from ..fusion import cache
         if isinstance(workspace_group, WorkspaceGroup):
             workspace_group = workspace_group.id
         res = self._post(
@@ -1487,12 +1507,14 @@ def create_workspace(
                 size=size,
             ),
         )
+        cache.invalidate('Workspaces')
         out = self.get_workspace(res.json()['workspaceID'])
         if wait_on_active:
             out = self._wait_on_state(
                 out, 'Active',
                 interval=wait_interval, timeout=wait_timeout,
             )
+            cache.invalidate('Workspaces')
         return out
 
     def get_workspace_group(self, id: str) -> WorkspaceGroup:
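
For reference, a minimal usage sketch of the cache layer added above: `update()` runs the fetcher and upserts its rows when the table is stale, `invalidate()` forces the next refresh, and `connect()` attaches to the shared in-memory database. The `fetch_regions` function is a hypothetical stand-in for the Management API call that the workspace manager code passes in as a lambda.

# Usage sketch (illustrative only; `fetch_regions` is a hypothetical fetcher).
from singlestoredb.fusion import cache


def fetch_regions():
    # Stand-in for `lambda: self._get('regions').json()`; rows must be dicts
    # whose keys match the fusion.Regions columns defined in the schema.
    return [dict(regionID='r-1', region='us-east-1', provider='AWS')]


# First call invokes the fetcher and upserts the rows into the shared
# in-memory SQLite cache; repeat calls within the 12-hour Regions timeout
# return the cached rows without calling the fetcher again.
rows = cache.update('Regions', fetch_regions)

# Mutating operations call invalidate() so the next update() refetches.
cache.invalidate('Regions')

# Ad-hoc queries can attach to the same shared cache database.
with cache.connect() as conn:
    print(list(conn.cursor().execute('SELECT * FROM fusion.Regions')))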