Skip to content

Commit

Permalink
SQLite cache implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
kesmit13 committed Nov 14, 2023
1 parent ae86e44 commit ceba7a5
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 6 deletions.
4 changes: 4 additions & 0 deletions singlestoredb/fusion/cache/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/usr/bin/env python3
from .sqlite import connect # noqa
from .sqlite import invalidate # noqa
from .sqlite import update # noqa
214 changes: 214 additions & 0 deletions singlestoredb/fusion/cache/sqlite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
#!/usr/bin/env python3
import datetime
import json
import sqlite3
from sqlite3 import apilevel # noqa
from sqlite3 import DatabaseError # noqa
from sqlite3 import DataError # noqa
from sqlite3 import Error # noqa
from sqlite3 import IntegrityError # noqa
from sqlite3 import InterfaceError # noqa
from sqlite3 import InternalError # noqa
from sqlite3 import NotSupportedError # noqa
from sqlite3 import OperationalError # noqa
from sqlite3 import paramstyle # noqa
from sqlite3 import ProgrammingError # noqa
from sqlite3 import threadsafety # noqa
from sqlite3 import Warning # noqa
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple

CACHE_NAME = 'file:fusion?mode=memory&cache=shared'
SCHEMA = r'''
CREATE TABLE fusion.Regions (
regionID TEXT PRIMARY KEY NOT NULL,
region TEXT NOT NULL,
provider TEXT NOT NULL
);
CREATE TABLE fusion.WorkspaceGroups (
workspaceGroupID TEXT PRIMARY KEY NOT NULL,
name TEXT NOT NULL,
regionID TEXT NOT NULL,
state TEXT NOT NULL,
createdAt DATETIME NOT NULL,
firewallRanges JSON,
expiresAt DATETIME,
terminatedAt DATETIME,
allowAllTraffic BOOL,
updateWindow JSON
);
CREATE TABLE fusion.Workspaces (
workspaceID TEXT PRIMARY KEY NOT NULL,
name TEXT NOT NULL,
workspaceGroupID TEXT NOT NULL,
state TEXT NOT NULL,
size TEXT NOT NULL,
createdAt TEXT NOT NULL,
endpoint TEXT,
lastResumedAt TEXT,
terminatedAt TEXT,
scalingProgress INTEGER
);
'''
CACHE_TIMEOUTS = dict(
Regions=datetime.timedelta(hours=12),
WorkspaceGroups=datetime.timedelta(hours=1),
Workspaces=datetime.timedelta(minutes=1),
)


def adapt_date_iso(val: Optional[datetime.date]) -> Optional[str]:
"""Adapt datetime.date to ISO 8601 date."""
if val is None:
return None
return val.isoformat()


def adapt_datetime_iso(val: Optional[datetime.datetime]) -> Optional[str]:
"""Adapt datetime.datetime to timezone-naive ISO 8601 date."""
if val is None:
return None
return val.isoformat()


def adapt_datetime_epoch(val: Optional[datetime.datetime]) -> Optional[int]:
"""Adapt datetime.datetime to Unix timestamp."""
if val is None:
return None
return int(val.timestamp())


def adapt_json(val: Optional[Any]) -> Optional[str]:
"""Adapt JSON to TEXT."""
if val is None:
return None
return json.dumps(val)


def adapt_bool(val: Optional[Any]) -> Optional[bool]:
"""Adapt any to bool."""
if val is None:
return None
return bool(val)


sqlite3.register_adapter(datetime.date, adapt_date_iso)
sqlite3.register_adapter(datetime.date, adapt_date_iso)
sqlite3.register_adapter(datetime.datetime, adapt_datetime_iso)
sqlite3.register_adapter(datetime.datetime, adapt_datetime_epoch)
sqlite3.register_adapter(bool, adapt_bool)
sqlite3.register_adapter(list, adapt_json)
sqlite3.register_adapter(dict, adapt_json)


def convert_date(val: Optional[bytes]) -> Optional[datetime.date]:
"""Convert ISO 8601 date to datetime.date object."""
if val is None:
return None
return datetime.date.fromisoformat(val.decode())


def convert_datetime(val: Optional[bytes]) -> Optional[datetime.datetime]:
"""Convert ISO 8601 datetime to datetime.datetime object."""
if val is None:
return None
return datetime.datetime.fromisoformat(val.decode())


def convert_timestamp(val: Optional[bytes]) -> Optional[datetime.datetime]:
"""Convert Unix epoch timestamp to datetime.datetime object."""
if val is None:
return None
return datetime.datetime.fromtimestamp(int(val))


def convert_json(val: Optional[bytes]) -> Optional[Any]:
"""Convert JSON text to object."""
if val is None:
return None
return json.loads(val)


sqlite3.register_converter('date', convert_date)
sqlite3.register_converter('datetime', convert_datetime)
sqlite3.register_converter('timestamp', convert_timestamp)
sqlite3.register_converter('json', convert_json)


def dict_factory(cursor: Any, row: Tuple[Any, ...]) -> Dict[str, Any]:
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d


def connect() -> sqlite3.Connection:
conn = sqlite3.connect(':memory:', uri=True, detect_types=sqlite3.PARSE_DECLTYPES)
conn.cursor().execute(f'ATTACH "{CACHE_NAME}" AS fusion')
return conn


def invalidate(table: str) -> None:
_last_modified_time.pop(table, None)


def update(table: str, func: Callable[[], Any]) -> List[Dict[str, Any]]:
with connect() as conn:

# If we are within the cache timeout, return the current results
if table in _last_modified_time:
if datetime.datetime.now() - _last_modified_time[table] \
< CACHE_TIMEOUTS[table]:
conn.row_factory = dict_factory
return list(conn.cursor().execute(f'SELECT * FROM fusion.{table}'))

cur = conn.cursor()

# Get column names and primary key
columns = _table_fields.get(table)
if columns is None:
columns = [x[1] for x in cur.execute(f'PRAGMA fusion.table_info({table})')]
_table_fields[table] = columns

# Build query components
values = func()
fields = []
field_subs = []
conflict = []
for k, v in values[0].items():
if k not in columns:
continue
fields.append(k)
field_subs.append(f':{k}')
conflict.append(f'{k}=excluded.{k}')

query = f'INSERT INTO fusion.{table}({", ".join(fields)}) ' \
f' VALUES ({", ".join(field_subs)}) ' \
f' ON CONFLICT({columns[0]}) DO UPDATE SET {", ".join(conflict)}' \

_last_modified_time[table] = datetime.datetime.now()

cur.executemany(query, values)

return values


conn = sqlite3.connect(':memory:')
cur = conn.cursor()
cur.execute(f'ATTACH "{CACHE_NAME}" AS fusion')
cur.executescript(SCHEMA)

# Make sure tha database stays for the length of the process
_main_connection = dict(fusion=conn)

# Last modified times of tables
_last_modified_time: Dict[str, datetime.datetime] = {}

# Table fields cache
_table_fields: Dict[str, List[str]] = {}
34 changes: 28 additions & 6 deletions singlestoredb/management/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -1009,16 +1009,19 @@ def terminate(
If timeout is reached
"""
from ..fusion import cache
if self._manager is None:
raise ManagementError(
msg='No workspace manager is associated with this object.',
)
self._manager._delete(f'workspaces/{self.id}')
cache.invalidate('Workspaces')
if wait_on_terminated:
self._manager._wait_on_state(
self._manager.get_workspace(self.id),
'Terminated', interval=wait_interval, timeout=wait_timeout,
)
cache.invalidate('Workspaces')
self.refresh()

def connect(self, **kwargs: Any) -> connection.Connection:
Expand Down Expand Up @@ -1169,6 +1172,7 @@ def update(
List of allowed incoming IP addresses
"""
from ..fusion import cache
if self._manager is None:
raise ManagementError(
msg='No workspace manager is associated with this object.',
Expand All @@ -1180,6 +1184,7 @@ def update(
).items() if v is not None
}
self._manager._patch(f'workspaceGroups/{self.id}', json=data)
cache.invalidate('WorkspaceGroups')
self.refresh()

def terminate(
Expand Down Expand Up @@ -1208,11 +1213,13 @@ def terminate(
If timeout is reached
"""
from ..fusion import cache
if self._manager is None:
raise ManagementError(
msg='No workspace manager is associated with this object.',
)
self._manager._delete(f'workspaceGroups/{self.id}', params=dict(force=force))
cache.invalidate('WorkspaceGroups')
if wait_on_terminated:
while True:
self.refresh()
Expand All @@ -1224,6 +1231,7 @@ def terminate(
)
time.sleep(wait_interval)
wait_timeout -= wait_interval
cache.invalidate('WorkspaceGroups')

def create_workspace(
self, name: str, size: Optional[str] = None,
Expand Down Expand Up @@ -1264,13 +1272,20 @@ def create_workspace(
@property
def workspaces(self) -> NamedList[Workspace]:
"""Return a list of available workspaces."""
from ..fusion import cache
if self._manager is None:
raise ManagementError(
msg='No workspace manager is associated with this object.',
)
res = self._manager._get('workspaces', params=dict(workspaceGroupID=self.id))
res = cache.update(
'Workspaces',
lambda: self._manager._get( # type: ignore
'workspaces',
params=dict(workspaceGroupID=self.id),
).json(),
)
return NamedList(
[Workspace.from_dict(item, self._manager) for item in res.json()],
[Workspace.from_dict(item, self._manager) for item in res],
)


Expand Down Expand Up @@ -1378,8 +1393,9 @@ class WorkspaceManager(Manager):
@ property
def workspace_groups(self) -> NamedList[WorkspaceGroup]:
"""Return a list of available workspace groups."""
res = self._get('workspaceGroups')
return NamedList([WorkspaceGroup.from_dict(item, self) for item in res.json()])
from ..fusion import cache
res = cache.update('WorkspaceGroups', lambda: self._get('workspaceGroups').json())
return NamedList([WorkspaceGroup.from_dict(item, self) for item in res])

@ property
def organizations(self) -> Organizations:
Expand All @@ -1394,8 +1410,9 @@ def billing(self) -> Billing:
@ property
def regions(self) -> NamedList[Region]:
"""Return a list of available regions."""
res = self._get('regions')
return NamedList([Region.from_dict(item, self) for item in res.json()])
from ..fusion import cache
res = cache.update('Regions', lambda: self._get('regions').json())
return NamedList([Region.from_dict(item, self) for item in res])

def create_workspace_group(
self, name: str, region: Union[str, Region],
Expand Down Expand Up @@ -1436,6 +1453,7 @@ def create_workspace_group(
:class:`WorkspaceGroup`
"""
from ..fusion import cache
if isinstance(region, Region):
region = region.id
res = self._post(
Expand All @@ -1448,6 +1466,7 @@ def create_workspace_group(
updateWindow=update_window,
),
)
cache.invalidate('WorkspaceGroups')
return self.get_workspace_group(res.json()['workspaceGroupID'])

def create_workspace(
Expand Down Expand Up @@ -1479,6 +1498,7 @@ def create_workspace(
:class:`Workspace`
"""
from ..fusion import cache
if isinstance(workspace_group, WorkspaceGroup):
workspace_group = workspace_group.id
res = self._post(
Expand All @@ -1487,12 +1507,14 @@ def create_workspace(
size=size,
),
)
cache.invalidate('Workspaces')
out = self.get_workspace(res.json()['workspaceID'])
if wait_on_active:
out = self._wait_on_state(
out, 'Active', interval=wait_interval,
timeout=wait_timeout,
)
cache.invalidate('Workspaces')
return out

def get_workspace_group(self, id: str) -> WorkspaceGroup:
Expand Down

0 comments on commit ceba7a5

Please sign in to comment.