Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds cascading drop to ermrest model and unit tests #181

Merged
merged 4 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 38 additions & 4 deletions deriva/core/ermrest_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -686,11 +686,18 @@ def create_table(self, table_def):
self.model.digest_fkeys()
return newtable

def drop(self):
def drop(self, cascade=False):
"""Remove this schema from the remote database.

:param cascade: drop dependent objects.
"""
if self.name not in self.model.schemas:
raise ValueError('Schema %s does not appear to belong to model.' % (self,))

if cascade:
for table in list(self.tables.values()):
table.drop(cascade=True)

self.catalog.delete(self.uri_path).raise_for_status()
del self.model.schemas[self.name]
for table in self.tables.values():
Expand Down Expand Up @@ -1572,11 +1579,18 @@ def add_fkey(fkey):
return fkey
return self._create_table_part('foreignkey', add_fkey, ForeignKey, fkey_def)

def drop(self):
def drop(self, cascade=False):
"""Remove this table from the remote database.

:param cascade: drop dependent objects.
"""
if self.name not in self.schema.tables:
raise ValueError('Table %s does not appear to belong to schema %s.' % (self, self.schema))

if cascade:
for fkey in list(self.referenced_by):
fkey.drop()

self.catalog.delete(self.uri_path).raise_for_status()
del self.schema.tables[self.name]
for fkey in self.foreign_keys:
Expand Down Expand Up @@ -2030,11 +2044,22 @@ def alter(

return self

def drop(self):
def drop(self, cascade=False):
"""Remove this column from the remote database.

:param cascade: drop dependent objects.
"""
if self.name not in self.table.column_definitions.elements:
raise ValueError('Column %s does not appear to belong to table %s.' % (self, self.table))

if cascade:
for fkey in list(self.table.foreign_keys):
if self in fkey.foreign_key_columns:
fkey.drop()
for key in list(self.table.keys):
karlcz marked this conversation as resolved.
Show resolved Hide resolved
if self in key.unique_columns:
key.drop(cascade=True)

self.catalog.delete(self.uri_path).raise_for_status()
del self.table.column_definitions[self.name]

Expand Down Expand Up @@ -2240,11 +2265,20 @@ def alter(

return self

def drop(self):
def drop(self, cascade=False):
"""Remove this key from the remote database.

:param cascade: drop dependent objects.
"""
if self.name not in self.table.keys.elements:
raise ValueError('Key %s does not appear to belong to table %s.' % (self, self.table))

if cascade:
for fkey in list(self.table.referenced_by):
assert self.table == fkey.pk_table, "Expected key.table and foreign_key.pk_table to match"
if set(self.unique_columns) == set(fkey.referenced_columns):
fkey.drop()

self.catalog.delete(self.uri_path).raise_for_status()
del self.table.keys[self.name]

Expand Down
139 changes: 139 additions & 0 deletions tests/deriva/core/test_ermrest_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# Tests for the datapath module.
#
# Environment variables:
# DERIVA_PY_TEST_HOSTNAME: hostname of the test server
# DERIVA_PY_CATALOG: catalog identifier of the reusable test catalog (optional)
# DERIVA_PY_TEST_CREDENTIAL: user credential, if none, it will attempt to get credential for given hostname (optional)
# DERIVA_PY_TEST_VERBOSE: set for verbose logging output to stdout (optional)

import logging
import os
import unittest
from deriva.core import DerivaServer, get_credential, ermrest_model

logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
if os.getenv("DERIVA_PY_TEST_VERBOSE"):
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)

hostname = os.getenv("DERIVA_PY_TEST_HOSTNAME")


@unittest.skipUnless(hostname, "Test host not specified")
class ErmrestModelTests (unittest.TestCase):

catalog = None

@classmethod
def _purgeCatalog(cls):
model = cls.catalog.getCatalogModel()

# exclude the 'public' schema
schemas = [s for s in model.schemas.values() if s.name != 'public']

# drop all fkeys
for s in schemas:
for t in s.tables.values():
for fk in list(t.foreign_keys):
fk.drop()

# drop all tables and schemas
for s in list(schemas):
for t in list(s.tables.values()):
t.drop()
s.drop()

@classmethod
def setUpClass(cls):
credential = os.getenv("DERIVA_PY_TEST_CREDENTIAL") or get_credential(hostname)
server = DerivaServer('https', hostname, credentials=credential)
catalog_id = os.getenv("DERIVA_PY_TEST_CATALOG")
if catalog_id is not None:
logger.info(f"Reusing catalog {catalog_id} on host {hostname}")
cls.catalog = server.connect_ermrest(catalog_id)
cls._purgeCatalog()
else:
cls.catalog = server.create_ermrest_catalog()
logger.info(f"Created catalog {cls.catalog.catalog_id} on host {hostname}")

@classmethod
def tearDownClass(cls):
if cls.catalog and os.getenv("DERIVA_PY_TEST_CATALOG") is None:
logger.info(f"Deleting catalog {cls.catalog.catalog_id} on host {hostname}")
cls.catalog.delete_ermrest_catalog(really=True)

def setUp(self):
self.model = self.catalog.getCatalogModel()

def tearDown(self):
self._purgeCatalog()

def _create_schema_with_fkeys(self):
"""Creates a simple schema of two tables with a fkey relationship from child to parent."""

# build a single, low-level catalog /schema POST operation
# should be (slightly) faster and avoids using the client APIs under test in this module
schema_def = ermrest_model.Schema.define('schema_with_fkeys')
schema_def["tables"] = {
"parent": ermrest_model.Table.define(
'parent',
column_defs=[
ermrest_model.Column.define('id', ermrest_model.builtin_types.text),
ermrest_model.Column.define('id_extra', ermrest_model.builtin_types.text),
],
key_defs=[
ermrest_model.Key.define(['id'], constraint_name='parent_id_key'),
ermrest_model.Key.define(['id', 'id_extra'], constraint_name='parent_compound_key'),
]
),
"child": ermrest_model.Table.define(
'child',
column_defs=[
ermrest_model.Column.define('parent_id', ermrest_model.builtin_types.text),
ermrest_model.Column.define('parent_id_extra', ermrest_model.builtin_types.text),
],
fkey_defs=[
ermrest_model.ForeignKey.define(
['parent_id'], 'schema_with_fkeys', 'parent', ['id']
),
ermrest_model.ForeignKey.define(
['parent_id_extra', 'parent_id'], 'schema_with_fkeys', 'parent', ['id_extra', 'id']
)
]
),
}
self.catalog.post('/schema', json=[schema_def])
# refresh the local state of the model
self.model = self.catalog.getCatalogModel()

def test_key_drop_cascading(self):
self._create_schema_with_fkeys()
schema = self.model.schemas['schema_with_fkeys']
self.model.schemas['schema_with_fkeys'].tables['parent'].keys[(schema, 'parent_id_key')].drop(cascade=True)

def test_key_reordered_drop_cascading(self):
self._create_schema_with_fkeys()
schema = self.model.schemas['schema_with_fkeys']
self.model.schemas['schema_with_fkeys'].tables['parent'].keys[(schema, 'parent_compound_key')].drop(cascade=True)

def test_key_column_drop_cascading(self):
self._create_schema_with_fkeys()
self.model.schemas['schema_with_fkeys'].tables['parent'].columns['id'].drop(cascade=True)

def test_fkey_column_drop_cascading(self):
self._create_schema_with_fkeys()
self.model.schemas['schema_with_fkeys'].tables['child'].columns['parent_id_extra'].drop(cascade=True)

def test_table_drop_cascading(self):
self._create_schema_with_fkeys()
self.model.schemas['schema_with_fkeys'].tables['parent'].drop(cascade=True)

def test_schema_drop_cascading(self):
self._create_schema_with_fkeys()
self.model.schemas['schema_with_fkeys'].drop(cascade=True)


if __name__ == '__main__':
unittest.main()