Skip to content

Commit

Permalink
feat: postgres integration - create database (#932)
Browse files Browse the repository at this point in the history
1. Add support for creating database
```
CREATE DATABASE db
WITH ENGINE = "postgres",
PARAMETERS = {
    "user": "user",
    "password": "password",
    "host": "127.0.0.1",
    "port": "5432",
    "database": "demo"
};
```
2. Persist the info in the catalog
3. Todo: Verify if the provided info is valid - next PR
  • Loading branch information
gaurav274 authored Aug 16, 2023
1 parent 24bbbc3 commit 0f1bcf6
Show file tree
Hide file tree
Showing 18 changed files with 459 additions and 25 deletions.
45 changes: 45 additions & 0 deletions evadb/catalog/catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
)
from evadb.catalog.models.utils import (
ColumnCatalogEntry,
DatabaseCatalogEntry,
IndexCatalogEntry,
TableCatalogEntry,
UdfCacheCatalogEntry,
Expand All @@ -45,6 +46,7 @@
truncate_catalog_tables,
)
from evadb.catalog.services.column_catalog_service import ColumnCatalogService
from evadb.catalog.services.database_catalog_service import DatabaseCatalogService
from evadb.catalog.services.index_catalog_service import IndexCatalogService
from evadb.catalog.services.table_catalog_service import TableCatalogService
from evadb.catalog.services.udf_cache_catalog_service import UdfCacheCatalogService
Expand All @@ -70,6 +72,7 @@ def __init__(self, db_uri: str, config: ConfigurationManager):
self._sql_config = SQLConfig(db_uri)
self._config = config
self._bootstrap_catalog()
self._db_catalog_service = DatabaseCatalogService(self._sql_config.session)
self._table_catalog_service = TableCatalogService(self._sql_config.session)
self._column_service = ColumnCatalogService(self._sql_config.session)
self._udf_service = UdfCatalogService(self._sql_config.session)
Expand Down Expand Up @@ -121,6 +124,48 @@ def _clear_catalog_contents(self):
# clean up the dataset, index, and cache directories
cleanup_storage(self._config)

"Database catalog services"

def insert_database_catalog_entry(self, name: str, engine: str, params: dict):
"""A new entry is persisted in the database catalog."
Args:
name: database name
engine: engine name
params: required params as a dictionary for the database
"""
self._db_catalog_service.insert_entry(name, engine, params)

def get_database_catalog_entry(self, database_name: str) -> DatabaseCatalogEntry:
"""
Returns the database catalog entry for the given database_name
Arguments:
database_name (str): name of the database
Returns:
DatabaseCatalogEntry
"""

table_entry = self._db_catalog_service.get_entry_by_name(database_name)

return table_entry

def delete_database_catalog_entry(
self, database_entry: DatabaseCatalogEntry
) -> bool:
"""
This method deletes the database from catalog.
Arguments:
database_entry: database catalog entry to remove
Returns:
True if successfully deleted else False
"""
# todo: do we need to remove also the associated tables etc or that will be
# taken care by the underlying db
return self._db_catalog_service.delete_entry(database_entry)

"Table catalog services"

def insert_table_catalog_entry(
Expand Down
13 changes: 0 additions & 13 deletions evadb/catalog/catalog_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,6 @@
from evadb.parser.create_statement import ColConstraintInfo, ColumnDefinition
from evadb.utils.generic_utils import get_str_hash, remove_directory_contents

CATALOG_TABLES = [
"column_catalog",
"table_catalog",
"depend_column_and_udf_cache",
"udf_cache",
"udf_catalog",
"depend_udf_and_udf_cache",
"index_catalog",
"udfio_catalog",
"udf_cost_catalog",
"udf_metadata_catalog",
]


def is_video_table(table: TableCatalogEntry):
return table.table_type == TableType.VIDEO_DATA
Expand Down
47 changes: 47 additions & 0 deletions evadb/catalog/models/database_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from sqlalchemy import Column, String

from evadb.catalog.models.base_model import BaseModel
from evadb.catalog.models.utils import DatabaseCatalogEntry, TextPickleType


class DatabaseCatalog(BaseModel):
"""The `DatabaseCatalog` catalog stores information about all databases.
`_row_id:` an autogenerated unique identifier.
`_name:` the database name.
`_engine:` database engine
`_param:` parameters specific to the database engine
"""

__tablename__ = "database_catalog"

_name = Column("name", String(100), unique=True)
_engine = Column("engine", String(100))
_params = Column("params", TextPickleType())

def __init__(self, name: str, engine: str, params: dict):
self._name = name
self._engine = engine
self._params = params

def as_dataclass(self) -> "DatabaseCatalogEntry":
return DatabaseCatalogEntry(
row_id=self._row_id,
name=self._name,
engine=self._engine,
params=self._params,
)
40 changes: 40 additions & 0 deletions evadb/catalog/models/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import json
from dataclasses import dataclass, field
from typing import List, Tuple

import sqlalchemy
from sqlalchemy.engine import Engine
from sqlalchemy.types import TypeDecorator
from sqlalchemy_utils import create_database, database_exists

from evadb.catalog.catalog_type import (
Expand All @@ -31,6 +33,25 @@
from evadb.utils.logging_manager import logger


class TextPickleType(TypeDecorator):
"""Used to handle serialization and deserialization to Text
https://stackoverflow.com/questions/1378325/python-dicts-in-sqlalchemy
"""

impl = sqlalchemy.String(1024)

def process_bind_param(self, value, dialect):
if value is not None:
value = json.dumps(value)

return value

def process_result_value(self, value, dialect):
if value is not None:
value = json.loads(value)
return value


def init_db(engine: Engine):
"""Create database if doesn't exist and create all tables."""
if not database_exists(engine.url):
Expand Down Expand Up @@ -209,3 +230,22 @@ def _to_str(col):
"impl": self.impl_file_path,
"metadata": self.metadata,
}


@dataclass(unsafe_hash=True)
class DatabaseCatalogEntry:
"""Dataclass representing an entry in the `DatabaseCatalog`.
This is done to ensure we don't expose the sqlalchemy dependencies beyond catalog service. Further, sqlalchemy does not allow sharing of objects across threads.
"""

name: str
engine: str
params: dict
row_id: int = None

def display_format(self):
return {
"name": self.name,
"engine": self.engine,
"params": self.params,
}
83 changes: 83 additions & 0 deletions evadb/catalog/services/database_catalog_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import select

from evadb.catalog.models.database_catalog import DatabaseCatalog
from evadb.catalog.models.utils import DatabaseCatalogEntry
from evadb.catalog.services.base_service import BaseService
from evadb.utils.errors import CatalogError
from evadb.utils.logging_manager import logger


class DatabaseCatalogService(BaseService):
def __init__(self, db_session: Session):
super().__init__(DatabaseCatalog, db_session)

def insert_entry(
self,
name: str,
engine: str,
params: dict,
):
try:
db_catalog_obj = self.model(
name=name,
engine=engine,
params=params,
)
db_catalog_obj = db_catalog_obj.save(self.session)

except Exception as e:
logger.exception(
f"Failed to insert entry into database catalog with exception {str(e)}"
)
raise CatalogError(e)

def get_entry_by_name(self, database_name: str) -> DatabaseCatalogEntry:
"""
Get the table catalog entry with given table name.
Arguments:
database_name (str): Database name
Returns:
DatabaseCatalogEntry - catalog entry for given database name
"""
entry = self.session.execute(
select(self.model).filter(self.model._name == database_name)
).scalar_one_or_none()
if entry:
return entry.as_dataclass()
return entry

def delete_entry(self, database_entry: DatabaseCatalogEntry):
"""Delete database from the catalog
Arguments:
database (DatabaseCatalogEntry): database to delete
Returns:
True if successfully removed else false
"""
try:
db_catalog_obj = self.session.execute(
select(self.model).filter(self.model._row_id == database_entry.row_id)
).scalar_one_or_none()
db_catalog_obj.delete(self.session)
return True
except Exception as e:
err_msg = (
f"Delete database failed for {database_entry} with error {str(e)}."
)
logger.exception(err_msg)
raise CatalogError(err_msg)
1 change: 1 addition & 0 deletions evadb/catalog/sql_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
CATALOG_TABLES = [
"column_catalog",
"table_catalog",
"database_catalog",
"depend_column_and_udf_cache",
"udf_cache",
"udf_catalog",
Expand Down
48 changes: 48 additions & 0 deletions evadb/executor/create_database_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pandas as pd

from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.models.storage.batch import Batch
from evadb.parser.create_statement import CreateDatabaseStatement
from evadb.utils.logging_manager import logger


class CreateDatabaseExecutor(AbstractExecutor):
def __init__(self, db: EvaDBDatabase, node: CreateDatabaseStatement):
super().__init__(db, node)

def exec(self, *args, **kwargs):
# todo handle if_not_exists

logger.debug(
f"Trying to connect to the provided engine {self.node.engine} with params {self.node.param_dict}"
)
# todo handle if the provided database params are valid

logger.debug(f"Creating database {self.node}")

self.catalog().insert_database_catalog_entry(
self.node.database_name, self.node.engine, self.node.param_dict
)

yield Batch(
pd.DataFrame(
[
f"The database {self.node.database_name} has been successfully created."
]
)
)
13 changes: 11 additions & 2 deletions evadb/executor/plan_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterator
from typing import Iterator, Union

from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.executor.apply_and_merge_executor import ApplyAndMergeExecutor
from evadb.executor.create_database_executor import CreateDatabaseExecutor
from evadb.executor.create_executor import CreateExecutor
from evadb.executor.create_index_executor import CreateIndexExecutor
from evadb.executor.create_udf_executor import CreateUDFExecutor
Expand Down Expand Up @@ -46,6 +47,8 @@
from evadb.executor.union_executor import UnionExecutor
from evadb.executor.vector_index_scan_executor import VectorIndexScanExecutor
from evadb.models.storage.batch import Batch
from evadb.parser.create_statement import CreateDatabaseStatement
from evadb.parser.statement import AbstractStatement
from evadb.plan_nodes.abstract_plan import AbstractPlan
from evadb.plan_nodes.types import PlanOprType
from evadb.utils.logging_manager import logger
Expand All @@ -65,7 +68,9 @@ def __init__(self, evadb: EvaDBDatabase, plan: AbstractPlan):
self._db = evadb
self._plan = plan

def _build_execution_tree(self, plan: AbstractPlan) -> AbstractExecutor:
def _build_execution_tree(
self, plan: Union[AbstractPlan, AbstractStatement]
) -> AbstractExecutor:
"""build the execution tree from plan tree
Arguments:
Expand All @@ -78,6 +83,10 @@ def _build_execution_tree(self, plan: AbstractPlan) -> AbstractExecutor:
if plan is None:
return root

# First handle cases when the plan is actually a parser statement
if isinstance(plan, CreateDatabaseStatement):
return CreateDatabaseExecutor(db=self._db, node=plan)

# Get plan node type
plan_opr_type = plan.opr_type

Expand Down
Loading

0 comments on commit 0f1bcf6

Please sign in to comment.