Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change catalog / storage parameters to arbitrary JSON strings #43

Merged
merged 3 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions singlestoredb/fusion/handlers/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
from typing import Optional

from .. import result
from ...management.export import Catalog
from ...management.export import ExportService
from ...management.export import ExportStatus
from ...management.export import Link
from ..handler import SQLHandler
from ..result import FusionSQLResult
from .utils import get_workspace_group
Expand Down Expand Up @@ -89,14 +87,14 @@ def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
wsg,
'none',
'none',
Catalog.from_config_and_creds(catalog_config, catalog_creds, wsg._manager),
Link.from_config_and_creds('S3', storage_config, storage_creds, wsg._manager),
dict(**catalog_config, **catalog_creds),
dict(**storage_config, **storage_creds),
columns=None,
).create_cluster_identity()

res = FusionSQLResult()
res.add_field('RoleARN', result.STRING)
res.set_rows([(out['roleARN'],)])
res.add_field('Identity', result.STRING)
res.set_rows([(out['identity'],)])

return res

Expand Down Expand Up @@ -191,8 +189,8 @@ def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
wsg,
from_database,
from_table,
Catalog.from_config_and_creds(catalog_config, catalog_creds, wsg._manager),
Link.from_config_and_creds('S3', storage_config, storage_creds, wsg._manager),
dict(**catalog_config, **catalog_creds),
dict(**storage_config, **storage_creds),
columns=None,
).start()

Expand Down
202 changes: 19 additions & 183 deletions singlestoredb/management/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,194 +2,36 @@
"""SingleStoreDB export service."""
from __future__ import annotations

import abc
import re
import copy
import json
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

from .. import ManagementError
from .utils import vars_to_str
from .workspace import WorkspaceGroup
from .workspace import WorkspaceManager


class Link(object):
    """
    Generic storage link base class.

    Concrete links set ``scheme`` and override ``to_storage_info`` and
    ``from_config_and_creds``.

    NOTE: this class does not use ``abc.ABCMeta``, so the
    ``@abc.abstractmethod`` marker below is not enforced when the class is
    instantiated; the ``NotImplementedError`` raised by the base
    ``to_storage_info`` is the only guard.

    """

    #: Storage scheme handled by the class; matched against the upper-cased
    #: scheme passed to ``from_config_and_creds``
    scheme: str = 'unknown'

    def __str__(self) -> str:
        """Return string representation."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return string representation."""
        return str(self)

    @abc.abstractmethod
    def to_storage_info(self) -> Dict[str, Any]:
        """Return a storage info dictionary."""
        raise NotImplementedError

    @classmethod
    def from_config_and_creds(
        cls,
        scheme: str,
        config: Dict[str, Any],
        credentials: Dict[str, Any],
        manager: 'WorkspaceManager',
    ) -> 'Link':
        """
        Build the link object registered for the given scheme.

        The lookup walks every descendant of ``cls`` (not only the direct
        subclasses) and delegates to the first class whose ``scheme``
        equals ``scheme.upper()``.

        Parameters
        ----------
        scheme : str
            Storage scheme name; compared after upper-casing
        config : Dict[str, Any]
            Configuration parameters forwarded to the selected class
        credentials : Dict[str, Any]
            Credential parameters forwarded to the selected class
        manager : WorkspaceManager
            Manager forwarded to the selected class

        Raises
        ------
        TypeError
            If no descendant class declares the requested scheme

        Returns
        -------
        Link

        """
        wanted = scheme.upper()

        # Breadth-first: direct subclasses are checked before deeper
        # descendants, so any lookup that already resolved to a direct
        # subclass keeps resolving to that same class.
        level = cls.__subclasses__()
        while level:
            for candidate in level:
                if candidate.scheme == wanted:
                    return candidate.from_config_and_creds(
                        scheme, config, credentials, manager,
                    )
            level = [sub for parent in level for sub in parent.__subclasses__()]

        raise TypeError(f'No link class found for given information: {scheme}')


class S3Link(Link):
    """
    S3 storage link.

    Holds the region and base URL of the storage location and converts
    them to the ``storageRegion`` / ``storageBaseURL`` info dictionary.

    """

    scheme: str = 'S3'

    #: Storage region
    region: str

    #: Base URL of the storage location
    storage_base_url: str

    def __init__(self, region: str, storage_base_url: str):
        self.region = region
        self.storage_base_url = storage_base_url
        self._manager: Optional[WorkspaceManager] = None

    def to_storage_info(self) -> Dict[str, Any]:
        """Return a storage info dictionary."""
        return dict(
            storageBaseURL=self.storage_base_url,
            storageRegion=self.region,
        )

    @classmethod
    def from_config_and_creds(
        cls,
        scheme: str,
        config: Dict[str, Any],
        credentials: Dict[str, Any],
        manager: 'WorkspaceManager',
    ) -> 'S3Link':
        """
        Build an S3 link from configuration and credential parameters.

        The two dictionaries are merged (credentials win on key clashes)
        and must provide non-empty ``region`` and ``endpoint_url`` values.

        Parameters
        ----------
        scheme : str
            Storage scheme name; must match ``S3`` case-insensitively
        config : Dict[str, Any]
            Configuration parameters
        credentials : Dict[str, Any]
            Credential parameters
        manager : WorkspaceManager
            Manager stored on the new object

        Raises
        ------
        TypeError
            If ``scheme`` is not the scheme handled by this class
        ValueError
            If ``region`` or ``endpoint_url`` is missing or empty

        Returns
        -------
        S3Link

        """
        # Explicit raises instead of ``assert``: assertions are removed
        # when Python runs with -O, which would let bad input through.
        if scheme.upper() != cls.scheme:
            raise TypeError(
                f'Scheme {scheme!r} is not handled by {cls.__name__}',
            )

        params: Dict[str, Any] = {}
        params.update(config)
        params.update(credentials)

        for key in ('region', 'endpoint_url'):
            if not params.get(key):
                raise ValueError(f'{key} is required')

        out = cls(params['region'], params['endpoint_url'])
        out._manager = manager
        return out


class Catalog(object):
    """
    Generic catalog base class.

    Concrete catalogs set ``catalog_type`` and ``table_format`` and
    override ``to_catalog_info`` and ``from_config_and_creds``.

    NOTE: this class does not use ``abc.ABCMeta``, so the
    ``@abc.abstractmethod`` marker below is not enforced when the class is
    instantiated; the ``NotImplementedError`` raised by the base
    ``to_catalog_info`` is the only guard.

    """

    #: Catalog type handled by the class; matched in upper case
    catalog_type: str = 'UNKNOWN'

    #: Table format handled by the class; matched in upper case
    table_format: str = 'UNKNOWN'

    def __str__(self) -> str:
        """Return string representation."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return string representation."""
        return str(self)

    @classmethod
    def from_config_and_creds(
        cls,
        config: Dict[str, Any],
        credentials: Dict[str, Any],
        manager: 'WorkspaceManager',
    ) -> 'Catalog':
        """
        Build the catalog object matching the given configuration.

        ``config['type']`` and ``config['table_format']`` are upper-cased
        and compared with the ``catalog_type`` / ``table_format`` of every
        descendant of ``cls`` (not only the direct subclasses); the first
        match receives the call.

        Parameters
        ----------
        config : Dict[str, Any]
            Configuration parameters; must contain ``type`` and
            ``table_format``
        credentials : Dict[str, Any]
            Credential parameters forwarded to the selected class
        manager : WorkspaceManager
            Manager forwarded to the selected class

        Raises
        ------
        KeyError
            If ``type`` or ``table_format`` is absent from ``config``
        TypeError
            If no descendant class matches the type / format pair

        Returns
        -------
        Catalog

        """
        catalog_type = config['type'].upper()
        table_format = config['table_format'].upper()

        # Breadth-first: direct subclasses are checked before deeper
        # descendants, so any lookup that already resolved to a direct
        # subclass keeps resolving to that same class.
        level = cls.__subclasses__()
        while level:
            for candidate in level:
                if (
                    candidate.catalog_type == catalog_type
                    and candidate.table_format == table_format
                ):
                    return candidate.from_config_and_creds(
                        config, credentials, manager,
                    )
            level = [sub for parent in level for sub in parent.__subclasses__()]

        raise TypeError(f'No catalog class found for given information: {config}')

    @abc.abstractmethod
    def to_catalog_info(self) -> Dict[str, Any]:
        """Return a catalog info dictionary."""
        raise NotImplementedError


class IcebergGlueCatalog(Catalog):
    """
    Catalog for Iceberg tables registered in a Glue catalog.

    Stores the region and catalog ID and reports them as ``glueRegion``
    and ``glueCatalogID`` in the catalog info dictionary.

    """

    table_format = 'ICEBERG'
    catalog_type = 'GLUE'

    #: Region of the catalog
    region: str

    #: Catalog identifier
    catalog_id: str

    def __init__(self, region: str, catalog_id: str):
        self._manager: Optional[WorkspaceManager] = None
        self.catalog_id = catalog_id
        self.region = region

    @classmethod
    def from_config_and_creds(
        cls,
        config: Dict[str, Any],
        credentials: Dict[str, Any],
        manager: 'WorkspaceManager',
    ) -> 'IcebergGlueCatalog':
        """
        Build the catalog from configuration and credential parameters.

        The two dictionaries are merged with credentials taking precedence
        on key clashes; the merged result must contain ``region`` and
        ``id`` (a missing key raises ``KeyError``).

        """
        merged = dict(config)
        merged.update(credentials)

        catalog = cls(catalog_id=merged['id'], region=merged['region'])
        catalog._manager = manager
        return catalog

    def to_catalog_info(self) -> Dict[str, Any]:
        """Return a catalog info dictionary."""
        return {
            'catalogType': self.catalog_type,
            'tableFormat': self.table_format,
            'glueRegion': self.region,
            'glueCatalogID': self.catalog_id,
        }


class ExportService(object):
"""Export service."""

database: str
table: str
catalog: Catalog
storage_link: Link
catalog_info: Dict[str, Any]
storage_info: Dict[str, Any]
columns: Optional[List[str]]

def __init__(
self,
workspace_group: WorkspaceGroup,
database: str,
table: str,
catalog: Catalog,
storage_link: Link,
catalog_info: Union[str, Dict[str, Any]],
storage_info: Union[str, Dict[str, Any]],
columns: Optional[List[str]],
):
#: Workspace group
Expand All @@ -205,10 +47,16 @@ def __init__(
self.columns = columns

#: Catalog
self.catalog = catalog
if isinstance(catalog_info, str):
self.catalog_info = json.loads(catalog_info)
else:
self.catalog_info = copy.copy(catalog_info)

#: Storage
self.storage_link = storage_link
if isinstance(storage_info, str):
self.storage_info = json.loads(storage_info)
else:
self.storage_info = copy.copy(storage_info)

self._manager: Optional[WorkspaceManager] = workspace_group._manager

Expand All @@ -227,21 +75,12 @@ def create_cluster_identity(self) -> Dict[str, Any]:
msg='No workspace manager is associated with this object.',
)

if not isinstance(self.catalog, IcebergGlueCatalog):
raise TypeError('Only Iceberg Glue catalog is supported at this time.')

if not isinstance(self.storage_link, S3Link):
raise TypeError('Only S3 links are supported at this time.')

out = self._manager._post(
f'workspaceGroups/{self.workspace_group.id}/'
'egress/createEgressClusterIdentity',
json=dict(
storageBucketName=re.split(
r'/+', self.storage_link.storage_base_url,
)[1],
glueRegion=self.catalog.region,
glueCatalogID=self.catalog.catalog_id,
catalogInfo=self.catalog_info,
storageInfo=self.storage_info,
),
)

Expand All @@ -254,16 +93,13 @@ def start(self, tags: Optional[List[str]] = None) -> 'ExportStatus':
msg='No workspace manager is associated with this object.',
)

if not isinstance(self.storage_link, S3Link):
raise TypeError('Only S3 links are supported at this time.')

out = self._manager._post(
f'workspaceGroups/{self.workspace_group.id}/egress/startTableEgress',
json=dict(
databaseName=self.database,
tableName=self.table,
storageInfo=self.storage_link.to_storage_info(),
catalogInfo=self.catalog.to_catalog_info(),
storageInfo=self.storage_info,
catalogInfo=self.catalog_info,
),
)

Expand Down