diff --git a/support/provenance/provenance.py b/support/provenance/provenance.py index 0a7e9797..3eda2fd7 100755 --- a/support/provenance/provenance.py +++ b/support/provenance/provenance.py @@ -45,7 +45,7 @@ import collections import json import logging -from typing import Union, List +from typing import Union, List, Mapping, Iterable log = logging.getLogger('provenance') import requests @@ -56,6 +56,8 @@ HOST = collections.namedtuple('HOST', ['nodes', 'password', 'url', 'username', 'verify']) +METADATA_SUCCESSOR_KEY = 'ops:Provenance/ops:superseded_by' + def parse_log_level(input: str) -> int: """Given a numeric or uppercase descriptive log level, return the associated int""" @@ -93,7 +95,6 @@ def run( username: str, password: str, verify_host_certs: bool = False, - reset: bool = False, log_filepath: Union[str, None] = None, log_level: int = logging.INFO): configure_logging(filepath=log_filepath, log_level=log_level) @@ -102,54 +103,61 @@ def run( host = HOST(cluster_nodes, password, base_url, username, verify_host_certs) - provenance = trawl_registry(host) - updates = get_historic(provenance, reset) + extant_lidvids = get_extant_lidvids(host) + updates = get_successors_by_lidvid(extant_lidvids) if updates: - update_docs(host, updates) + write_updated_docs(host, updates) log.info('completed CLI processing') -def get_historic(provenance: {str: str}, reset: bool) -> {str: str}: # TODO: populate comment and rename for clarity - log.info('starting search for history') +def get_successors_by_lidvid(extant_lidvids: Iterable[str]) -> Mapping[str, str]: + """ + Given a collection of LIDVIDs, return a new mapping to their updated direct successors. + """ + + log.info('Generating updated history...') - log.info(' reduce lidvids to unique lids') - lids = sorted({lidvid.split('::')[0] for lidvid in provenance}) + unique_lids = {lidvid.split('::')[0] for lidvid in extant_lidvids} - log.info(' aggregate lidvids into lid buckets') - aggregates = {lid: [] for lid in lids} - for lidvid in provenance: aggregates[lidvid.split('::')[0]].append(lidvid) + log.info(' ...binning LIDVIDs by LID...') + lidvid_aggregates_by_lid = {lid: [] for lid in unique_lids} + for lidvid in extant_lidvids: + lid = lidvid.split('::')[0] + lidvid_aggregates_by_lid[lid].append(lidvid) - log.info(' process those with history') - count = 0 - history = {} - for lidvids in filter(lambda l: 1 < len(l), aggregates.values()): - count += len(lidvids) + log.info(' ...determining updated successors for LIDVIDs...') + successors_by_lidvid = {} + lidvid_aggregates_with_multiple_versions = filter(lambda l: 1 < len(l), lidvid_aggregates_by_lid.values()) + for lidvids in lidvid_aggregates_with_multiple_versions: lidvids.sort(key=_vid_as_tuple_of_int, reverse=True) - for index, lidvid in enumerate(lidvids[1:]): - if reset or not provenance[lidvid]: - history[lidvid] = lidvids[index] - log.info( - f'found {len(history)} products needing update of a {count} full history of {len(provenance)} total products') + for successor_idx, lidvid in enumerate(lidvids[1:]): + successors_by_lidvid[lidvid] = lidvids[successor_idx] + + log.info(f'Successors will be updated for {len(successors_by_lidvid)} LIDVIDs!') + if log.isEnabledFor(logging.DEBUG): - for lidvid in history.keys(): + for lidvid in successors_by_lidvid.keys(): log.debug(f'{lidvid}') - return history + return successors_by_lidvid -def trawl_registry(host: HOST) -> {str: str}: # TODO: populate comment and rename for clarity - log.info('start trawling') +def get_extant_lidvids(host: HOST) -> Iterable[str]: + """ + Given an OpenSearch host, return all extant LIDVIDs + """ - cluster = [node + ":registry" for node in host.nodes] - key = 'ops:Provenance/ops:superseded_by' - path = ','.join(['registry'] + cluster) + '/_search?scroll=10m' - provenance = {} + log.info('Retrieving extant LIDVIDs') + + clusters = [node + ":registry" for node in host.nodes] + path = ','.join(['registry'] + clusters) + '/_search?scroll=10m' + extant_lidvids = [] query = {'query': {'bool': {'must_not': [ {'term': {'ops:Tracking_Meta/ops:archive_status': 'staged'}}]}}, - '_source': {'includes': ['lidvid', key]}, + '_source': {'includes': ['lidvid']}, 'size': 10000} more_data_exists = True @@ -162,12 +170,12 @@ def trawl_registry(host: HOST) -> {str: str}: # TODO: populate comment and rena data = resp.json() path = '_search/scroll' query = {'scroll': '10m', 'scroll_id': data['_scroll_id']} - provenance.update({hit['_source']['lidvid']: hit['_source'].get(key, None) for hit in data['hits']['hits']}) - more_data_exists = len(provenance) < data['hits']['total']['value'] + extant_lidvids.extend([hit['_source']['lidvid'] for hit in data['hits']['hits']]) + more_data_exists = len(extant_lidvids) < data['hits']['total']['value'] hits = data['hits']['total']['value'] - percent_hit = int(round(len(provenance) / hits * 100)) - log.info(f' progress: {len(provenance)} of {hits} ({percent_hit}%)') + percent_hit = int(round(len(extant_lidvids) / hits * 100)) + log.info(f' ...{len(extant_lidvids)} of {hits} retrieved ({percent_hit}%)...') if 'scroll_id' in query: path = '_search/scroll/' + query['scroll_id'] @@ -175,23 +183,26 @@ def trawl_registry(host: HOST) -> {str: str}: # TODO: populate comment and rena auth=(host.username, host.password), verify=host.verify) - log.info('finished trawling') + log.info('Finished retrieving LIDVIDs with current direct successors!') - return provenance + return extant_lidvids -def update_docs(host: HOST, history: {str: str}): - """Write provenance history updates to documents in db""" - log.info('Bulk update %d documents', len(history)) +def write_updated_docs(host: HOST, lidvids_and_successors: Mapping[str, str]): + """ + Given an OpenSearch host and a mapping of LIDVIDs onto their direct successors, write provenance history updates + to documents in db. + """ + log.info('Bulk update %d documents', len(lidvids_and_successors)) bulk_updates = [] cluster = [node + ":registry" for node in host.nodes] headers = {'Content-Type': 'application/x-ndjson'} path = ','.join(['registry'] + cluster) + '/_bulk' - for lidvid, supersede in history.items(): + for lidvid, direct_successor in lidvids_and_successors.items(): bulk_updates.append(json.dumps({'update': {'_id': lidvid}})) - bulk_updates.append(json.dumps({'doc': {'ops:Provenance/ops:superseded_by': supersede}})) + bulk_updates.append(json.dumps({'doc': {METADATA_SUCCESSOR_KEY: direct_successor}})) bulk_data = '\n'.join(bulk_updates) + '\n' @@ -212,9 +223,10 @@ def update_docs(host: HOST, history: {str: str}): if __name__ == '__main__': - ap = argparse.ArgumentParser(description='''Update the provenance of products with more than one VID - - The program sweeps through the registry index to find all the lidvids and existing provenance. It then builds up the updates necessary to mark the newly superseded products accordingly. + ap = argparse.ArgumentParser(description=f''' + Update registry records for non-latest LIDVIDs with up-to-date direct successor metadata ({METADATA_SUCCESSOR_KEY}). + + Retrieves existing published LIDVIDs from the registry, determines history for each LID, and writes updated docs back to OpenSearch ''', epilog='''EXAMPLES: @@ -241,8 +253,6 @@ def update_docs(host: HOST, history: {str: str}): help='Python logging level as an int or string like INFO for logging.INFO [%(default)s]') ap.add_argument('-p', '--password', default=None, required=False, help='password to login to opensearch leaving it blank if opensearch does not require login') - ap.add_argument('-r', '--reset', action='store_true', default=False, - help='ignore existing provenance building it from scratch') ap.add_argument('-u', '--username', default=None, required=False, help='username to login to opensearch leaving it blank if opensearch does not require login') ap.add_argument('-v', '--verify', action='store_true', default=False, @@ -255,6 +265,5 @@ def update_docs(host: HOST, history: {str: str}): username=args.username, password=args.password, verify_host_certs=args.verify, - reset=args.reset, log_level=args.log_level, log_filepath=args.log_file) diff --git a/support/provenance/provenance_driver.py b/support/provenance/provenance_driver.py index 684b8275..15ca99f1 100755 --- a/support/provenance/provenance_driver.py +++ b/support/provenance/provenance_driver.py @@ -72,7 +72,7 @@ if remotesStr is not None and remotesStr.strip() != '': remotesLists = json.loads(remotesStr) -command = f'provenance.py -b {opensearch_endpoint} -l provenance.log -L DEBUG --reset ' +command = f'provenance.py -b {opensearch_endpoint} -l provenance.log -L DEBUG' if username is not None: command += f' -u {username} -p {passwd}'