Cloning fixes #172

Merged: 5 commits, Oct 19, 2023
171 changes: 112 additions & 59 deletions deriva/core/ermrest_catalog.py
@@ -619,11 +619,17 @@ def clone_catalog(self,
         "tag:isrd.isi.edu,2018:clone-state" to save progress markers
         which help it restart efficiently if interrupted.
 
-        Cloning preserves source row RID values so that any RID-based
-        foreign keys are still valid. It is not generally advisable to
-        try to merge more than one source into the same clone, nor to
-        clone on top of rows generated locally in the destination,
-        since this could cause duplicate RID conflicts.
+        Cloning preserves source row RID values for application tables
+        so that any RID-based foreign keys are still valid. It is not
+        generally advisable to try to merge more than one source into
+        the same clone, nor to clone on top of rows generated locally
+        in the destination, since this could cause duplicate RID
+        conflicts.
+
+        Cloning does not preserve all RID values for special ERMrest
+        tables in the public schema (e.g. ERMrest_Client,
+        ERMrest_Group) but normal applications should only consider
+        the ID key of these tables.
 
         Truncation after cloning avoids retaining incremental
         snapshots which contain partial clones.
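
For orientation, a minimal usage sketch of the entry point this docstring describes, assuming deriva-py's DerivaServer connection helper; the hostname and catalog ID are hypothetical, and the keyword arguments are the ones visible in this diff:

from deriva.core import DerivaServer

# connect to the source catalog (host and catalog ID are placeholders)
src_catalog = DerivaServer('https', 'example.org').connect_ermrest('1')

# clone into a fresh catalog; re-running the same call after an interruption
# resumes from the clone-state progress markers described above
dst_catalog = src_catalog.clone_catalog(
    copy_data=True,
    copy_annotations=True,
    copy_policy=True,
    truncate_after=True,
    exclude_schemas=None,
)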
@@ -658,41 +664,37 @@ def clone_catalog(self,
         fkeys_deferred = {}
         exclude_schemas = [] if exclude_schemas is None else exclude_schemas
 
-        def prune_parts(d):
-            if not copy_annotations and 'annotations' in d:
-                del d['annotations']
+        def prune_parts(d, *extra_victims):
+            victims = set(extra_victims)
+            # we will apply config as a second pass after extending dest model
+            # but loading bulk first may speed that up
+            if not copy_annotations:
+                victims |= {'annotations',}
             if not copy_policy:
-                if 'acls' in d:
-                    del d['acls']
-                if 'acl_bindings' in d:
-                    del d['acl_bindings']
+                victims |= {'acls', 'acl_bindings'}
+            for k in victims:
+                d.pop(k, None)
             return d
 
         def copy_sdef(s):
             """Copy schema definition structure with conditional parts for cloning."""
-            d = prune_parts(s.prejson())
-            if 'tables' in d:
-                del d['tables']
+            d = prune_parts(s.prejson(), 'tables')
             return d
 
         def copy_tdef_core(t):
             """Copy table definition structure with conditional parts excluding fkeys."""
-            d = prune_parts(t.prejson())
+            d = prune_parts(t.prejson(), 'foreign_keys')
             d['column_definitions'] = [ prune_parts(c) for c in d['column_definitions'] ]
             d['keys'] = [ prune_parts(c) for c in d.get('keys', []) ]
-            if 'foreign_keys' in d:
-                del d['foreign_keys']
-            if 'annotations' not in d:
-                d['annotations'] = {}
-            d['annotations'][_clone_state_url] = 1 if copy_data else None
+            d.setdefault('annotations', {})[_clone_state_url] = 1 if copy_data else None
             return d
 
         def copy_tdef_fkeys(t):
             """Copy table fkeys structure."""
             def check(fkdef):
                 for fkc in fkdef['referenced_columns']:
                     if fkc['schema_name'] == 'public' \
-                       and fkc['table_name'] in {'ERMrest_Client', 'ERMrest_Group'} \
+                       and fkc['table_name'] in {'ERMrest_Client', 'ERMrest_Group', 'ERMrest_RID_Lease'} \
                        and fkc['column_name'] == 'RID':
                         raise ValueError("Cannot clone catalog with foreign key reference to %(schema_name)s:%(table_name)s:%(column_name)s" % fkc)
                 return fkdef
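
The refactored prune_parts above folds the repeated "if k in d: del d[k]" checks into one set-driven pass; dict.pop(k, None) is what makes deletion tolerant of absent keys. A self-contained sketch of the same idiom (the flags are parameters here only for illustration; the real helper closes over clone_catalog's arguments):

def prune_parts(d, *extra_victims, copy_annotations=False, copy_policy=False):
    # standalone illustration of the patched helper's behavior
    victims = set(extra_victims)
    if not copy_annotations:
        victims |= {'annotations'}
    if not copy_policy:
        victims |= {'acls', 'acl_bindings'}
    for k in victims:
        d.pop(k, None)  # pop with a default never raises KeyError
    return d

tdef = {'table_name': 'T', 'annotations': {'tag:x': 1}, 'foreign_keys': []}
print(prune_parts(tdef, 'foreign_keys'))  # -> {'table_name': 'T'}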
@@ -739,6 +741,12 @@ def copy_kdef(k):
                     clone_states[(sname, tname)] = 1 if copy_data else None
                     fkeys_deferred[(sname, tname)] = copy_tdef_fkeys(table)
                 else:
+                    if dst_model.schemas[sname].tables[tname].foreign_keys:
+                        # assume that presence of any destination foreign keys means we already loaded deferred_fkeys
+                        copy_data = False
+                    else:
+                        fkeys_deferred[(sname, tname)] = copy_tdef_fkeys(table)
+
                     src_columns = { c.name: c for c in table.column_definitions }
                     dst_columns = { c.name: c for c in dst_model.schemas[sname].tables[tname].column_definitions }
 
@@ -764,11 +772,8 @@ def copy_kdef(k):
                         raise ValueError("Destination key %s.%s(%s) does not exist in source catalog." % (sname, tname, ', '.join(utuple)))
 
                     clone_states[(sname, tname)] = dst_model.schemas[sname].tables[tname].annotations.get(_clone_state_url)
-                    if dst_model.schemas[sname].tables[tname].foreign_keys:
-                        # assume that presence of any destination foreign keys means we already completed
-                        return dst_catalog
-                    else:
-                        fkeys_deferred[(sname, tname)] = copy_tdef_fkeys(table)
+
+        clone_states[('public', 'ERMrest_RID_Lease')] = None # never try to sync leases
 
         # apply the stage 1 model to the destination in bulk
         if new_model:
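
The clone_states map driving stage 2 uses the clone-state annotation as a per-table resume marker, and the added line above pins ERMrest_RID_Lease to None so lease rows are never synced. A hedged sketch of the marker semantics as this diff uses them (helper name is illustrative, not library API):

_clone_state_url = "tag:isrd.isi.edu,2018:clone-state"

def table_needs_data_copy(dst_table_annotations):
    # illustrative helper: stage 2 copies rows only for tables whose
    # destination clone-state marker is still 1
    return dst_table_annotations.get(_clone_state_url) == 1

print(table_needs_data_copy({_clone_state_url: 1}))  # True: copy or resume rows
print(table_needs_data_copy({}))                     # False: done, or copy_data was off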
@@ -822,38 +827,8 @@ def copy_kdef(k):
                     }:
                         # special sync behavior for magic ermrest tables
                         # HACK: these are assumed small enough to join via local merge of arrays
-                        want = sorted(self.get("/entity/%s?limit=none" % tname_uri).json(), key=lambda r: r['ID'])
-                        have = sorted(dst_catalog.get("/entity/%s?limit=none" % tname_uri).json(), key=lambda r: r['ID'])
-                        create = []
-                        update = []
-
-                        pos_want = 0
-                        pos_have = 0
-                        while pos_want < len(want):
-                            while pos_have < len(have) and have[pos_have]['ID'] < want[pos_want]['ID']:
-                                # dst-only rows will be retained as is
-                                pos_have += 1
-                            if pos_have >= len(have) or have[pos_have]['ID'] > want[pos_want]['ID']:
-                                # src-only rows will be inserted
-                                create.append(want[pos_want])
-                                pos_want += 1
-                            else:
-                                # overlapping rows will be updated
-                                update.append(want[pos_want])
-                                pos_want += 1
-
-                        dst_catalog.post("/entity/%s?nondefaults=RCT,RCB" % tname_uri, json=create)
-                        dst_catalog.put(
-                            "/attributegroup/%s/ID;%s" % (
-                                tname_uri,
-                                ",".join([
-                                    urlquote(c.name)
-                                    for c in src_model.schemas[sname].tables[tname].column_definitions
-                                    if c.name not in {'RID', 'RMT', 'RMB', 'RCB', 'RCT', 'ID'}
-                                ])
-                            ),
-                            json=update
-                        )
+                        page = self.get("/entity/%s?limit=none" % tname_uri).json()
+                        dst_catalog.post("/entity/%s?onconflict=skip" % tname_uri, json=page)
 
                     # record our progress on catalog in case we fail part way through
                     dst_catalog.put(
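
This hunk replaces the sorted merge-join of the want/have arrays with a single idempotent bulk insert: ERMrest's onconflict=skip modifier discards incoming rows whose keys already exist, so destination-only rows survive and re-runs cannot raise conflicts, at the cost of no longer updating overlapping rows in place. The pattern spelled out, with hypothetical hosts and catalog IDs:

from deriva.core import DerivaServer

src_catalog = DerivaServer('https', 'source.example.org').connect_ermrest('1')
dst_catalog = DerivaServer('https', 'replica.example.org').connect_ermrest('7')

table = "public:ERMrest_Client"
rows = src_catalog.get("/entity/%s?limit=none" % table).json()
# onconflict=skip tells ERMrest to discard incoming rows whose key values
# already exist, so destination-only rows survive and re-runs are harmless
dst_catalog.post("/entity/%s?onconflict=skip" % table, json=rows)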
@@ -873,6 +848,84 @@ def copy_kdef(k):
         if new_fkeys:
             dst_catalog.post("/schema", json=new_fkeys)
 
+        # copy over configuration in stage 3
+        # we need to do this after deferred_fkeys to handle acl_bindings projections with joins
+        dst_model = dst_catalog.getCatalogModel()
+
+        for sname, src_schema in src_model.schemas.items():
+            if sname in exclude_schemas:
+                continue
+            dst_schema = dst_model.schemas[sname]
+
+            if copy_annotations:
+                dst_schema.annotations.clear()
+                dst_schema.annotations.update(src_schema.annotations)
+
+            if copy_policy:
+                dst_schema.acls.clear()
+                dst_schema.acls.update(src_schema.acls)
+
+            for tname, src_table in src_schema.tables.items():
+                dst_table = dst_schema.tables[tname]
+
+                if copy_annotations:
+                    merged = dict(src_table.annotations)
+                    if _clone_state_url in dst_table.annotations:
+                        merged[_clone_state_url] = dst_table.annotations[_clone_state_url]
+                    dst_table.annotations.clear()
+                    dst_table.annotations.update(merged)
+
+                if copy_policy:
+                    dst_table.acls.clear()
+                    dst_table.acls.update(src_table.acls)
+                    dst_table.acl_bindings.clear()
+                    dst_table.acl_bindings.update(src_table.acl_bindings)
+
+                for cname, src_col in src_table.columns.elements.items():
+                    dst_col = dst_table.columns[cname]
+
+                    if copy_annotations:
+                        dst_col.annotations.clear()
+                        dst_col.annotations.update(src_col.annotations)
+
+                    if copy_policy:
+                        dst_col.acls.clear()
+                        dst_col.acls.update(src_col.acls)
+                        dst_col.acl_bindings.clear()
+                        dst_col.acl_bindings.update(src_col.acl_bindings)
+
+                for src_key in src_table.keys:
+                    dst_key = dst_table.key_by_columns([ col.name for col in src_key.unique_columns ])
+
+                    if copy_annotations:
+                        dst_key.annotations.clear()
+                        dst_key.annotations.update(src_key.annotations)
+
+                def xlate_column_map(fkey):
+                    dst_from_table = dst_table
+                    dst_to_schema = dst_model.schemas[fkey.pk_table.schema.name]
+                    dst_to_table = dst_to_schema.tables[fkey.pk_table.name]
+                    return {
+                        dst_from_table._own_column(from_col.name): dst_to_table._own_column(to_col.name)
+                        for from_col, to_col in fkey.column_map.items()
+                    }
+
+                for src_fkey in src_table.foreign_keys:
+                    dst_fkey = dst_table.fkey_by_column_map(xlate_column_map(src_fkey))
+
+                    if copy_annotations:
+                        dst_fkey.annotations.clear()
+                        dst_fkey.annotations.update(src_fkey.annotations)
+
+                    if copy_policy:
+                        dst_fkey.acls.clear()
+                        dst_fkey.acls.update(src_fkey.acls)
+                        dst_fkey.acl_bindings.clear()
+                        dst_fkey.acl_bindings.update(src_fkey.acl_bindings)
+
+        # send all the config changes to the server
+        dst_model.apply()
+
         # truncate cloning history
         if truncate_after:
             snaptime = dst_catalog.get("/").json()["snaptime"]
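
The stage 3 comment above notes that configuration must be applied after the deferred fkeys because acl_bindings may project through joins. A hypothetical binding illustrating the dependency (schema, table, and constraint names are invented for the example):

# ERMrest validates the constraint named in the "outbound" projection step,
# so applying this binding before the deferred fkeys exist would fail;
# hence the config copy runs as stage 3
acl_binding = {
    "visible_if_owned": {
        "types": ["select"],
        "projection": [{"outbound": ["MySchema", "MyTable_Owner_fkey"]}, "RID"],
        "projection_type": "nonnull",
        "scope_acl": ["*"],
    }
}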