Skip to content

Commit

Permalink
Make cache an object
Browse files Browse the repository at this point in the history
  • Loading branch information
kesmit13 committed Nov 22, 2023
1 parent ceba7a5 commit 690016c
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 85 deletions.
4 changes: 1 addition & 3 deletions singlestoredb/fusion/cache/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
#!/usr/bin/env python3
from .sqlite import connect # noqa
from .sqlite import invalidate # noqa
from .sqlite import update # noqa
from .sqlite import Cache # noqa
144 changes: 81 additions & 63 deletions singlestoredb/fusion/cache/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@

CACHE_NAME = 'file:fusion?mode=memory&cache=shared'
SCHEMA = r'''
CREATE TABLE fusion.Regions (
BEGIN;
CREATE TABLE IF NOT EXISTS fusion.Regions (
regionID TEXT PRIMARY KEY NOT NULL,
region TEXT NOT NULL,
provider TEXT NOT NULL
);
CREATE TABLE fusion.WorkspaceGroups (
CREATE TABLE IF NOT EXISTS fusion.WorkspaceGroups (
workspaceGroupID TEXT PRIMARY KEY NOT NULL,
name TEXT NOT NULL,
regionID TEXT NOT NULL,
Expand All @@ -43,7 +45,7 @@
updateWindow JSON
);
CREATE TABLE fusion.Workspaces (
CREATE TABLE IF NOT EXISTS fusion.Workspaces (
workspaceID TEXT PRIMARY KEY NOT NULL,
name TEXT NOT NULL,
workspaceGroupID TEXT NOT NULL,
Expand All @@ -55,6 +57,13 @@
terminatedAt TEXT,
scalingProgress INTEGER
);
CREATE TABLE IF NOT EXISTS fusion.FusionStatistics (
tableName TEXT PRIMARY KEY NOT NULL,
lastModifiedTime DATETIME NOT NULL
);
COMMIT;
'''
CACHE_TIMEOUTS = dict(
Regions=datetime.timedelta(hours=12),
Expand Down Expand Up @@ -98,10 +107,8 @@ def adapt_bool(val: Optional[Any]) -> Optional[bool]:
return bool(val)


sqlite3.register_adapter(datetime.date, adapt_date_iso)
sqlite3.register_adapter(datetime.date, adapt_date_iso)
sqlite3.register_adapter(datetime.datetime, adapt_datetime_iso)
sqlite3.register_adapter(datetime.datetime, adapt_datetime_epoch)
sqlite3.register_adapter(bool, adapt_bool)
sqlite3.register_adapter(list, adapt_json)
sqlite3.register_adapter(dict, adapt_json)
Expand Down Expand Up @@ -142,73 +149,84 @@ def convert_json(val: Optional[bytes]) -> Optional[Any]:


def dict_factory(cursor: Any, row: Tuple[Any, ...]) -> Dict[str, Any]:
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d


def connect() -> sqlite3.Connection:
conn = sqlite3.connect(':memory:', uri=True, detect_types=sqlite3.PARSE_DECLTYPES)
conn.cursor().execute(f'ATTACH "{CACHE_NAME}" AS fusion')
return conn

"""Return row as a dictionary."""
return {k[0]: v for k, v in zip(cursor.description, row)}

def invalidate(table: str) -> None:
_last_modified_time.pop(table, None)

class Cache(object):

def update(table: str, func: Callable[[], Any]) -> List[Dict[str, Any]]:
with connect() as conn:
table_fields: Dict[str, List[str]] = {}

# If we are within the cache timeout, return the current results
if table in _last_modified_time:
if datetime.datetime.now() - _last_modified_time[table] \
< CACHE_TIMEOUTS[table]:
conn.row_factory = dict_factory
return list(conn.cursor().execute(f'SELECT * FROM fusion.{table}'))

cur = conn.cursor()

# Get column names and primary key
columns = _table_fields.get(table)
if columns is None:
columns = [x[1] for x in cur.execute(f'PRAGMA fusion.table_info({table})')]
_table_fields[table] = columns
def __init__(self) -> None:
self.connection = self.connect()
cur = self.connection.cursor()
cur.executescript(SCHEMA)

# Build query components
values = func()
fields = []
field_subs = []
conflict = []
for k, v in values[0].items():
if k not in columns:
continue
fields.append(k)
field_subs.append(f':{k}')
conflict.append(f'{k}=excluded.{k}')
# Cache table fields
if not self.table_fields:
cur.execute('SELECT name FROM fusion.sqlite_master WHERE type = "table"')
for table in cur.fetchall():
info = cur.execute(f'PRAGMA fusion.table_info({table[0]})').fetchall()
self.table_fields[table[0]] = info

query = f'INSERT INTO fusion.{table}({", ".join(fields)}) ' \
f' VALUES ({", ".join(field_subs)}) ' \
f' ON CONFLICT({columns[0]}) DO UPDATE SET {", ".join(conflict)}' \
def connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(':memory:', uri=True, detect_types=sqlite3.PARSE_DECLTYPES)
conn.cursor().execute(f'ATTACH "{CACHE_NAME}" AS fusion')
return conn

_last_modified_time[table] = datetime.datetime.now()
def invalidate(self, table: str) -> None:
with self.connect() as conn:
cur = conn.cursor()
cur.execute('DELETE FROM fusion.FusionStatistics WHERE tableName = "{table}"')

cur.executemany(query, values)
def update(self, table: str, func: Callable[[], Any]) -> List[Dict[str, Any]]:
with self.connect() as conn:

return values
cur = conn.cursor()

stats = cur.execute(
'SELECT tableName, lastModifiedTime FROM fusion.FusionStatistics '
f'where tableName = "{table}"',
).fetchall()

conn = sqlite3.connect(':memory:')
cur = conn.cursor()
cur.execute(f'ATTACH "{CACHE_NAME}" AS fusion')
cur.executescript(SCHEMA)

# Make sure tha database stays for the length of the process
_main_connection = dict(fusion=conn)

# Last modified times of tables
_last_modified_time: Dict[str, datetime.datetime] = {}
# If we are within the cache timeout, return the current results
if stats and (datetime.datetime.now() - stats[0][1]) < CACHE_TIMEOUTS[table]:
conn.row_factory = dict_factory
return list(conn.cursor().execute(f'SELECT * FROM fusion.{table}'))

# Table fields cache
_table_fields: Dict[str, List[str]] = {}
# Build query components
columns = [x[1] for x in self.table_fields[table]]
values = func()
fields = []
field_subs = []
conflict = []
for k, v in values[0].items():
if k not in columns:
continue
fields.append(k)
field_subs.append(f':{k}')
conflict.append(f'{k}=excluded.{k}')

try:
cur.execute('BEGIN')

# Insert the new value
query = f'INSERT INTO fusion.{table}({", ".join(fields)}) ' \
f' VALUES({", ".join(field_subs)}) ' \
f' ON CONFLICT({columns[0]}) DO UPDATE SET {", ".join(conflict)}'
cur.executemany(query, values)

# Update the last modified time
query = 'INSERT INTO fusion.FusionStatistics ' \
' (tableName, lastModifiedTime) ' \
' VALUES(:tableName, :lastModifiedTime) ' \
' ON CONFLICT(tableName) DO UPDATE SET ' \
' lastModifiedTime=excluded.lastModifiedTime'
cur.execute(query, (table, datetime.datetime.now()))

cur.execute('COMMIT')

except Exception:
cur.execute('ROLLBACK')

return values
3 changes: 3 additions & 0 deletions singlestoredb/management/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ def __init__(
) + '/'
self._params: Dict[str, str] = {}

from ..fusion.cache import Cache
self.cache = Cache()

def _check(
self, res: requests.Response, url: str, params: Dict[str, Any],
) -> requests.Response:
Expand Down
33 changes: 14 additions & 19 deletions singlestoredb/management/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -1009,19 +1009,18 @@ def terminate(
If timeout is reached
"""
from ..fusion import cache
if self._manager is None:
raise ManagementError(
msg='No workspace manager is associated with this object.',
)
self._manager._delete(f'workspaces/{self.id}')
cache.invalidate('Workspaces')
self._manager.cache.invalidate('Workspaces')
if wait_on_terminated:
self._manager._wait_on_state(
self._manager.get_workspace(self.id),
'Terminated', interval=wait_interval, timeout=wait_timeout,
)
cache.invalidate('Workspaces')
self._manager.cache.invalidate('Workspaces')
self.refresh()

def connect(self, **kwargs: Any) -> connection.Connection:
Expand Down Expand Up @@ -1172,7 +1171,6 @@ def update(
List of allowed incoming IP addresses
"""
from ..fusion import cache
if self._manager is None:
raise ManagementError(
msg='No workspace manager is associated with this object.',
Expand All @@ -1184,7 +1182,7 @@ def update(
).items() if v is not None
}
self._manager._patch(f'workspaceGroups/{self.id}', json=data)
cache.invalidate('WorkspaceGroups')
self._manager.cache.invalidate('WorkspaceGroups')
self.refresh()

def terminate(
Expand Down Expand Up @@ -1213,13 +1211,12 @@ def terminate(
If timeout is reached
"""
from ..fusion import cache
if self._manager is None:
raise ManagementError(
msg='No workspace manager is associated with this object.',
)
self._manager._delete(f'workspaceGroups/{self.id}', params=dict(force=force))
cache.invalidate('WorkspaceGroups')
self._manager.cache.invalidate('WorkspaceGroups')
if wait_on_terminated:
while True:
self.refresh()
Expand All @@ -1231,7 +1228,7 @@ def terminate(
)
time.sleep(wait_interval)
wait_timeout -= wait_interval
cache.invalidate('WorkspaceGroups')
self._manager.cache.invalidate('WorkspaceGroups')

def create_workspace(
self, name: str, size: Optional[str] = None,
Expand Down Expand Up @@ -1272,12 +1269,11 @@ def create_workspace(
@property
def workspaces(self) -> NamedList[Workspace]:
"""Return a list of available workspaces."""
from ..fusion import cache
if self._manager is None:
raise ManagementError(
msg='No workspace manager is associated with this object.',
)
res = cache.update(
res = self._manager.cache.update(
'Workspaces',
lambda: self._manager._get( # type: ignore
'workspaces',
Expand Down Expand Up @@ -1393,8 +1389,10 @@ class WorkspaceManager(Manager):
@ property
def workspace_groups(self) -> NamedList[WorkspaceGroup]:
"""Return a list of available workspace groups."""
from ..fusion import cache
res = cache.update('WorkspaceGroups', lambda: self._get('workspaceGroups').json())
res = self.cache.update(
'WorkspaceGroups',
lambda: self._get('workspaceGroups').json(),
)
return NamedList([WorkspaceGroup.from_dict(item, self) for item in res])

@ property
Expand All @@ -1410,8 +1408,7 @@ def billing(self) -> Billing:
@ property
def regions(self) -> NamedList[Region]:
"""Return a list of available regions."""
from ..fusion import cache
res = cache.update('Regions', lambda: self._get('regions').json())
res = self.cache.update('Regions', lambda: self._get('regions').json())
return NamedList([Region.from_dict(item, self) for item in res])

def create_workspace_group(
Expand Down Expand Up @@ -1453,7 +1450,6 @@ def create_workspace_group(
:class:`WorkspaceGroup`
"""
from ..fusion import cache
if isinstance(region, Region):
region = region.id
res = self._post(
Expand All @@ -1466,7 +1462,7 @@ def create_workspace_group(
updateWindow=update_window,
),
)
cache.invalidate('WorkspaceGroups')
self.cache.invalidate('WorkspaceGroups')
return self.get_workspace_group(res.json()['workspaceGroupID'])

def create_workspace(
Expand Down Expand Up @@ -1498,7 +1494,6 @@ def create_workspace(
:class:`Workspace`
"""
from ..fusion import cache
if isinstance(workspace_group, WorkspaceGroup):
workspace_group = workspace_group.id
res = self._post(
Expand All @@ -1507,14 +1502,14 @@ def create_workspace(
size=size,
),
)
cache.invalidate('Workspaces')
self.cache.invalidate('Workspaces')
out = self.get_workspace(res.json()['workspaceID'])
if wait_on_active:
out = self._wait_on_state(
out, 'Active', interval=wait_interval,
timeout=wait_timeout,
)
cache.invalidate('Workspaces')
self.cache.invalidate('Workspaces')
return out

def get_workspace_group(self, id: str) -> WorkspaceGroup:
Expand Down

0 comments on commit 690016c

Please sign in to comment.