From 1852b682fd94c53f5057b9aba00fe0eb6c8bf10b Mon Sep 17 00:00:00 2001 From: Alex Dunn Date: Mon, 10 Apr 2023 14:29:37 -0700 Subject: [PATCH 1/8] examine/cleanup provenance get_historic() --- support/provenance/provenance.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/support/provenance/provenance.py b/support/provenance/provenance.py index 0a7e9797..b5f4bb09 100755 --- a/support/provenance/provenance.py +++ b/support/provenance/provenance.py @@ -115,24 +115,28 @@ def get_historic(provenance: {str: str}, reset: bool) -> {str: str}: # TODO: po log.info('starting search for history') log.info(' reduce lidvids to unique lids') - lids = sorted({lidvid.split('::')[0] for lidvid in provenance}) + sorted_lids = sorted({lidvid.split('::')[0] for lidvid in provenance}) # todo: what does sorting accomplish here? Seems to be nothing. log.info(' aggregate lidvids into lid buckets') - aggregates = {lid: [] for lid in lids} - for lidvid in provenance: aggregates[lidvid.split('::')[0]].append(lidvid) + lidvids_by_lid = {lid: [] for lid in sorted_lids} + for lidvid in provenance: + lid = lidvid.split('::')[0] + lidvids_by_lid[lid].append(lidvid) log.info(' process those with history') - count = 0 + updated_products_count = 0 history = {} - for lidvids in filter(lambda l: 1 < len(l), aggregates.values()): - count += len(lidvids) + lidvid_aggregates_with_multiple_versions = filter(lambda l: 1 < len(l), lidvids_by_lid.values()) + for lidvids in lidvid_aggregates_with_multiple_versions: + updated_products_count += len(lidvids) # todo: technically this isn't true - should be len(lidvids) - 1 as earliest version requires no update lidvids.sort(key=_vid_as_tuple_of_int, reverse=True) + for index, lidvid in enumerate(lidvids[1:]): - if reset or not provenance[lidvid]: + if reset or not provenance[lidvid]: # todo: this seems to result in an error if (for example) v1, v3 exist, and v2 is added later history[lidvid] = lidvids[index] log.info( - f'found {len(history)} products needing update of a {count} full history of {len(provenance)} total products') + f'found {len(history)} products needing update of a {updated_products_count} full history of {len(provenance)} total products') if log.isEnabledFor(logging.DEBUG): for lidvid in history.keys(): log.debug(f'{lidvid}') From ad73a9f88b64e00a52f4b0831b87c637f25839c6 Mon Sep 17 00:00:00 2001 From: Alex Dunn Date: Mon, 10 Apr 2023 15:37:00 -0700 Subject: [PATCH 2/8] remove unnecessary sort in provenance.py --- support/provenance/provenance.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/support/provenance/provenance.py b/support/provenance/provenance.py index b5f4bb09..1126da4a 100755 --- a/support/provenance/provenance.py +++ b/support/provenance/provenance.py @@ -115,10 +115,10 @@ def get_historic(provenance: {str: str}, reset: bool) -> {str: str}: # TODO: po log.info('starting search for history') log.info(' reduce lidvids to unique lids') - sorted_lids = sorted({lidvid.split('::')[0] for lidvid in provenance}) # todo: what does sorting accomplish here? Seems to be nothing. + lids = {lidvid.split('::')[0] for lidvid in provenance} log.info(' aggregate lidvids into lid buckets') - lidvids_by_lid = {lid: [] for lid in sorted_lids} + lidvids_by_lid = {lid: [] for lid in lids} for lidvid in provenance: lid = lidvid.split('::')[0] lidvids_by_lid[lid].append(lidvid) From 7fddd5d5336e41170e273a995d706d5b44cc4e34 Mon Sep 17 00:00:00 2001 From: Alex Dunn Date: Mon, 10 Apr 2023 15:37:44 -0700 Subject: [PATCH 3/8] correct product update count in provenance.py --- support/provenance/provenance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/support/provenance/provenance.py b/support/provenance/provenance.py index 1126da4a..45f6a5ec 100755 --- a/support/provenance/provenance.py +++ b/support/provenance/provenance.py @@ -128,7 +128,7 @@ def get_historic(provenance: {str: str}, reset: bool) -> {str: str}: # TODO: po history = {} lidvid_aggregates_with_multiple_versions = filter(lambda l: 1 < len(l), lidvids_by_lid.values()) for lidvids in lidvid_aggregates_with_multiple_versions: - updated_products_count += len(lidvids) # todo: technically this isn't true - should be len(lidvids) - 1 as earliest version requires no update + updated_products_count += len(lidvids) - 1 lidvids.sort(key=_vid_as_tuple_of_int, reverse=True) for index, lidvid in enumerate(lidvids[1:]): From bdd8a658c3b7723bc01bbffcfe275b57e0c2f3eb Mon Sep 17 00:00:00 2001 From: Alex Dunn Date: Mon, 10 Apr 2023 15:39:49 -0700 Subject: [PATCH 4/8] remove --reset option (force true) --- support/provenance/provenance.py | 11 +++-------- support/provenance/provenance_driver.py | 2 +- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/support/provenance/provenance.py b/support/provenance/provenance.py index 45f6a5ec..3c211164 100755 --- a/support/provenance/provenance.py +++ b/support/provenance/provenance.py @@ -93,7 +93,6 @@ def run( username: str, password: str, verify_host_certs: bool = False, - reset: bool = False, log_filepath: Union[str, None] = None, log_level: int = logging.INFO): configure_logging(filepath=log_filepath, log_level=log_level) @@ -103,7 +102,7 @@ def run( host = HOST(cluster_nodes, password, base_url, username, verify_host_certs) provenance = trawl_registry(host) - updates = get_historic(provenance, reset) + updates = get_historic(provenance) if updates: update_docs(host, updates) @@ -111,7 +110,7 @@ def run( log.info('completed CLI processing') -def get_historic(provenance: {str: str}, reset: bool) -> {str: str}: # TODO: populate comment and rename for clarity +def get_historic(provenance: {str: str}) -> {str: str}: # TODO: populate comment and rename for clarity log.info('starting search for history') log.info(' reduce lidvids to unique lids') @@ -132,8 +131,7 @@ def get_historic(provenance: {str: str}, reset: bool) -> {str: str}: # TODO: po lidvids.sort(key=_vid_as_tuple_of_int, reverse=True) for index, lidvid in enumerate(lidvids[1:]): - if reset or not provenance[lidvid]: # todo: this seems to result in an error if (for example) v1, v3 exist, and v2 is added later - history[lidvid] = lidvids[index] + history[lidvid] = lidvids[index] log.info( f'found {len(history)} products needing update of a {updated_products_count} full history of {len(provenance)} total products') @@ -245,8 +243,6 @@ def update_docs(host: HOST, history: {str: str}): help='Python logging level as an int or string like INFO for logging.INFO [%(default)s]') ap.add_argument('-p', '--password', default=None, required=False, help='password to login to opensearch leaving it blank if opensearch does not require login') - ap.add_argument('-r', '--reset', action='store_true', default=False, - help='ignore existing provenance building it from scratch') ap.add_argument('-u', '--username', default=None, required=False, help='username to login to opensearch leaving it blank if opensearch does not require login') ap.add_argument('-v', '--verify', action='store_true', default=False, @@ -259,6 +255,5 @@ def update_docs(host: HOST, history: {str: str}): username=args.username, password=args.password, verify_host_certs=args.verify, - reset=args.reset, log_level=args.log_level, log_filepath=args.log_file) diff --git a/support/provenance/provenance_driver.py b/support/provenance/provenance_driver.py index 684b8275..15ca99f1 100755 --- a/support/provenance/provenance_driver.py +++ b/support/provenance/provenance_driver.py @@ -72,7 +72,7 @@ if remotesStr is not None and remotesStr.strip() != '': remotesLists = json.loads(remotesStr) -command = f'provenance.py -b {opensearch_endpoint} -l provenance.log -L DEBUG --reset ' +command = f'provenance.py -b {opensearch_endpoint} -l provenance.log -L DEBUG' if username is not None: command += f' -u {username} -p {passwd}' From cabc9b010a81092a471a6773c11bfa5f7ee8476d Mon Sep 17 00:00:00 2001 From: Alex Dunn Date: Mon, 10 Apr 2023 16:41:09 -0700 Subject: [PATCH 5/8] tweak log messages also renames some variables --- support/provenance/provenance.py | 73 +++++++++++++++++--------------- 1 file changed, 40 insertions(+), 33 deletions(-) diff --git a/support/provenance/provenance.py b/support/provenance/provenance.py index 3c211164..c288d39c 100755 --- a/support/provenance/provenance.py +++ b/support/provenance/provenance.py @@ -45,7 +45,7 @@ import collections import json import logging -from typing import Union, List +from typing import Union, List, Mapping log = logging.getLogger('provenance') import requests @@ -101,8 +101,8 @@ def run( host = HOST(cluster_nodes, password, base_url, username, verify_host_certs) - provenance = trawl_registry(host) - updates = get_historic(provenance) + provenance = get_lidvids_and_direct_successors(host) + updates = get_successors_by_lidvid(provenance) if updates: update_docs(host, updates) @@ -110,48 +110,54 @@ def run( log.info('completed CLI processing') -def get_historic(provenance: {str: str}) -> {str: str}: # TODO: populate comment and rename for clarity - log.info('starting search for history') +def get_successors_by_lidvid(provenance: Mapping[str, str]) -> Mapping[str, str]: + """ + Given a collection of LIDVIDs mapped onto their current direct successors (current successors no longer used by this + function), return a new mapping to their updated direct successors. + """ - log.info(' reduce lidvids to unique lids') - lids = {lidvid.split('::')[0] for lidvid in provenance} + log.info('Generating updated history...') - log.info(' aggregate lidvids into lid buckets') - lidvids_by_lid = {lid: [] for lid in lids} + unique_lids = {lidvid.split('::')[0] for lidvid in provenance} + + log.info(' ...binning LIDVIDs by LID...') + lidvid_aggregates_by_lid = {lid: [] for lid in unique_lids} for lidvid in provenance: lid = lidvid.split('::')[0] - lidvids_by_lid[lid].append(lidvid) + lidvid_aggregates_by_lid[lid].append(lidvid) - log.info(' process those with history') - updated_products_count = 0 - history = {} - lidvid_aggregates_with_multiple_versions = filter(lambda l: 1 < len(l), lidvids_by_lid.values()) + log.info(' ...determining updated successors for LIDVIDs...') + successors_by_lidvid = {} + lidvid_aggregates_with_multiple_versions = filter(lambda l: 1 < len(l), lidvid_aggregates_by_lid.values()) for lidvids in lidvid_aggregates_with_multiple_versions: - updated_products_count += len(lidvids) - 1 lidvids.sort(key=_vid_as_tuple_of_int, reverse=True) - for index, lidvid in enumerate(lidvids[1:]): - history[lidvid] = lidvids[index] + for successor_idx, lidvid in enumerate(lidvids[1:]): + successors_by_lidvid[lidvid] = lidvids[successor_idx] + + log.info(f'Successors will be updated for {len(successors_by_lidvid)} LIDVIDs!') - log.info( - f'found {len(history)} products needing update of a {updated_products_count} full history of {len(provenance)} total products') if log.isEnabledFor(logging.DEBUG): - for lidvid in history.keys(): + for lidvid in successors_by_lidvid.keys(): log.debug(f'{lidvid}') - return history + return successors_by_lidvid -def trawl_registry(host: HOST) -> {str: str}: # TODO: populate comment and rename for clarity - log.info('start trawling') +def get_lidvids_and_direct_successors(host: HOST) -> Mapping[str, str]: + """ + Given an OpenSearch host, return a collection of all extant LIDVIDs, mapped onto their immediate successors (or None) + """ - cluster = [node + ":registry" for node in host.nodes] - key = 'ops:Provenance/ops:superseded_by' - path = ','.join(['registry'] + cluster) + '/_search?scroll=10m' + log.info('Retrieving LIDVIDs with current direct successors') + + clusters = [node + ":registry" for node in host.nodes] + successor_key = 'ops:Provenance/ops:superseded_by' + path = ','.join(['registry'] + clusters) + '/_search?scroll=10m' provenance = {} query = {'query': {'bool': {'must_not': [ {'term': {'ops:Tracking_Meta/ops:archive_status': 'staged'}}]}}, - '_source': {'includes': ['lidvid', key]}, + '_source': {'includes': ['lidvid', successor_key]}, 'size': 10000} more_data_exists = True @@ -164,12 +170,12 @@ def trawl_registry(host: HOST) -> {str: str}: # TODO: populate comment and rena data = resp.json() path = '_search/scroll' query = {'scroll': '10m', 'scroll_id': data['_scroll_id']} - provenance.update({hit['_source']['lidvid']: hit['_source'].get(key, None) for hit in data['hits']['hits']}) + provenance.update({hit['_source']['lidvid']: hit['_source'].get(successor_key, None) for hit in data['hits']['hits']}) more_data_exists = len(provenance) < data['hits']['total']['value'] hits = data['hits']['total']['value'] percent_hit = int(round(len(provenance) / hits * 100)) - log.info(f' progress: {len(provenance)} of {hits} ({percent_hit}%)') + log.info(f' ...{len(provenance)} of {hits} retrieved ({percent_hit}%)...') if 'scroll_id' in query: path = '_search/scroll/' + query['scroll_id'] @@ -177,7 +183,7 @@ def trawl_registry(host: HOST) -> {str: str}: # TODO: populate comment and rena auth=(host.username, host.password), verify=host.verify) - log.info('finished trawling') + log.info('Finished retrieving LIDVIDs with current direct successors!') return provenance @@ -214,9 +220,10 @@ def update_docs(host: HOST, history: {str: str}): if __name__ == '__main__': - ap = argparse.ArgumentParser(description='''Update the provenance of products with more than one VID - - The program sweeps through the registry index to find all the lidvids and existing provenance. It then builds up the updates necessary to mark the newly superseded products accordingly. + ap = argparse.ArgumentParser(description=''' + Update registry records for non-latest LIDVIDs with up-to-date direct successor metadata (ops:Provenance/ops:superseded_by). + + Retrieves existing published LIDVIDs from the registry, determines history for each LID, and writes updated docs back to OpenSearch ''', epilog='''EXAMPLES: From 6d7b313b1c904084e350d967f2db2e81685bdfe2 Mon Sep 17 00:00:00 2001 From: Alex Dunn Date: Mon, 10 Apr 2023 16:47:24 -0700 Subject: [PATCH 6/8] extract OpenSearch provenance key to global constant --- support/provenance/provenance.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/support/provenance/provenance.py b/support/provenance/provenance.py index c288d39c..0a3bba14 100755 --- a/support/provenance/provenance.py +++ b/support/provenance/provenance.py @@ -56,6 +56,8 @@ HOST = collections.namedtuple('HOST', ['nodes', 'password', 'url', 'username', 'verify']) +METADATA_SUCCESSOR_KEY = 'ops:Provenance/ops:superseded_by' + def parse_log_level(input: str) -> int: """Given a numeric or uppercase descriptive log level, return the associated int""" @@ -152,7 +154,7 @@ def get_lidvids_and_direct_successors(host: HOST) -> Mapping[str, str]: log.info('Retrieving LIDVIDs with current direct successors') clusters = [node + ":registry" for node in host.nodes] - successor_key = 'ops:Provenance/ops:superseded_by' + successor_key = METADATA_SUCCESSOR_KEY path = ','.join(['registry'] + clusters) + '/_search?scroll=10m' provenance = {} query = {'query': {'bool': {'must_not': [ @@ -199,7 +201,7 @@ def update_docs(host: HOST, history: {str: str}): for lidvid, supersede in history.items(): bulk_updates.append(json.dumps({'update': {'_id': lidvid}})) - bulk_updates.append(json.dumps({'doc': {'ops:Provenance/ops:superseded_by': supersede}})) + bulk_updates.append(json.dumps({'doc': {METADATA_SUCCESSOR_KEY: supersede}})) bulk_data = '\n'.join(bulk_updates) + '\n' @@ -220,8 +222,8 @@ def update_docs(host: HOST, history: {str: str}): if __name__ == '__main__': - ap = argparse.ArgumentParser(description=''' - Update registry records for non-latest LIDVIDs with up-to-date direct successor metadata (ops:Provenance/ops:superseded_by). + ap = argparse.ArgumentParser(description=f''' + Update registry records for non-latest LIDVIDs with up-to-date direct successor metadata ({METADATA_SUCCESSOR_KEY}). Retrieves existing published LIDVIDs from the registry, determines history for each LID, and writes updated docs back to OpenSearch ''', From 0ac3685cd1c2e8d089b880f0098b711d4ee4d8f3 Mon Sep 17 00:00:00 2001 From: Alex Dunn Date: Mon, 10 Apr 2023 16:57:28 -0700 Subject: [PATCH 7/8] final minor rename and comment tweak --- support/provenance/provenance.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/support/provenance/provenance.py b/support/provenance/provenance.py index 0a3bba14..dc68f476 100755 --- a/support/provenance/provenance.py +++ b/support/provenance/provenance.py @@ -107,7 +107,7 @@ def run( updates = get_successors_by_lidvid(provenance) if updates: - update_docs(host, updates) + write_updated_docs(host, updates) log.info('completed CLI processing') @@ -190,18 +190,21 @@ def get_lidvids_and_direct_successors(host: HOST) -> Mapping[str, str]: return provenance -def update_docs(host: HOST, history: {str: str}): - """Write provenance history updates to documents in db""" - log.info('Bulk update %d documents', len(history)) +def write_updated_docs(host: HOST, lidvids_and_successors: Mapping[str, str]): + """ + Given an OpenSearch host and a mapping of LIDVIDs onto their direct successors, write provenance history updates + to documents in db. + """ + log.info('Bulk update %d documents', len(lidvids_and_successors)) bulk_updates = [] cluster = [node + ":registry" for node in host.nodes] headers = {'Content-Type': 'application/x-ndjson'} path = ','.join(['registry'] + cluster) + '/_bulk' - for lidvid, supersede in history.items(): + for lidvid, direct_successor in lidvids_and_successors.items(): bulk_updates.append(json.dumps({'update': {'_id': lidvid}})) - bulk_updates.append(json.dumps({'doc': {METADATA_SUCCESSOR_KEY: supersede}})) + bulk_updates.append(json.dumps({'doc': {METADATA_SUCCESSOR_KEY: direct_successor}})) bulk_data = '\n'.join(bulk_updates) + '\n' From e42bd98f8131f5809febafdafac8209b8c11677d Mon Sep 17 00:00:00 2001 From: Alex Dunn Date: Mon, 10 Apr 2023 17:08:49 -0700 Subject: [PATCH 8/8] remove unnecessary retrieval of existing provenance metadata --- support/provenance/provenance.py | 36 +++++++++++++++----------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/support/provenance/provenance.py b/support/provenance/provenance.py index dc68f476..3eda2fd7 100755 --- a/support/provenance/provenance.py +++ b/support/provenance/provenance.py @@ -45,7 +45,7 @@ import collections import json import logging -from typing import Union, List, Mapping +from typing import Union, List, Mapping, Iterable log = logging.getLogger('provenance') import requests @@ -103,8 +103,8 @@ def run( host = HOST(cluster_nodes, password, base_url, username, verify_host_certs) - provenance = get_lidvids_and_direct_successors(host) - updates = get_successors_by_lidvid(provenance) + extant_lidvids = get_extant_lidvids(host) + updates = get_successors_by_lidvid(extant_lidvids) if updates: write_updated_docs(host, updates) @@ -112,19 +112,18 @@ def run( log.info('completed CLI processing') -def get_successors_by_lidvid(provenance: Mapping[str, str]) -> Mapping[str, str]: +def get_successors_by_lidvid(extant_lidvids: Iterable[str]) -> Mapping[str, str]: """ - Given a collection of LIDVIDs mapped onto their current direct successors (current successors no longer used by this - function), return a new mapping to their updated direct successors. + Given a collection of LIDVIDs, return a new mapping to their updated direct successors. """ log.info('Generating updated history...') - unique_lids = {lidvid.split('::')[0] for lidvid in provenance} + unique_lids = {lidvid.split('::')[0] for lidvid in extant_lidvids} log.info(' ...binning LIDVIDs by LID...') lidvid_aggregates_by_lid = {lid: [] for lid in unique_lids} - for lidvid in provenance: + for lidvid in extant_lidvids: lid = lidvid.split('::')[0] lidvid_aggregates_by_lid[lid].append(lidvid) @@ -146,20 +145,19 @@ def get_successors_by_lidvid(provenance: Mapping[str, str]) -> Mapping[str, str] return successors_by_lidvid -def get_lidvids_and_direct_successors(host: HOST) -> Mapping[str, str]: +def get_extant_lidvids(host: HOST) -> Iterable[str]: """ - Given an OpenSearch host, return a collection of all extant LIDVIDs, mapped onto their immediate successors (or None) + Given an OpenSearch host, return all extant LIDVIDs """ - log.info('Retrieving LIDVIDs with current direct successors') + log.info('Retrieving extant LIDVIDs') clusters = [node + ":registry" for node in host.nodes] - successor_key = METADATA_SUCCESSOR_KEY path = ','.join(['registry'] + clusters) + '/_search?scroll=10m' - provenance = {} + extant_lidvids = [] query = {'query': {'bool': {'must_not': [ {'term': {'ops:Tracking_Meta/ops:archive_status': 'staged'}}]}}, - '_source': {'includes': ['lidvid', successor_key]}, + '_source': {'includes': ['lidvid']}, 'size': 10000} more_data_exists = True @@ -172,12 +170,12 @@ def get_lidvids_and_direct_successors(host: HOST) -> Mapping[str, str]: data = resp.json() path = '_search/scroll' query = {'scroll': '10m', 'scroll_id': data['_scroll_id']} - provenance.update({hit['_source']['lidvid']: hit['_source'].get(successor_key, None) for hit in data['hits']['hits']}) - more_data_exists = len(provenance) < data['hits']['total']['value'] + extant_lidvids.extend([hit['_source']['lidvid'] for hit in data['hits']['hits']]) + more_data_exists = len(extant_lidvids) < data['hits']['total']['value'] hits = data['hits']['total']['value'] - percent_hit = int(round(len(provenance) / hits * 100)) - log.info(f' ...{len(provenance)} of {hits} retrieved ({percent_hit}%)...') + percent_hit = int(round(len(extant_lidvids) / hits * 100)) + log.info(f' ...{len(extant_lidvids)} of {hits} retrieved ({percent_hit}%)...') if 'scroll_id' in query: path = '_search/scroll/' + query['scroll_id'] @@ -187,7 +185,7 @@ def get_lidvids_and_direct_successors(host: HOST) -> Mapping[str, str]: log.info('Finished retrieving LIDVIDs with current direct successors!') - return provenance + return extant_lidvids def write_updated_docs(host: HOST, lidvids_and_successors: Mapping[str, str]):