Skip to content

Commit

Permalink
Merge pull request #311 from NASA-PDS/edunn-provenance-wip
Browse files Browse the repository at this point in the history
Remove --reset option from provenance.py (force to always write history from scratch)
  • Loading branch information
nutjob4life authored Apr 11, 2023
2 parents 31be38b + e42bd98 commit f558ddc
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 49 deletions.
105 changes: 57 additions & 48 deletions support/provenance/provenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import collections
import json
import logging
from typing import Union, List
from typing import Union, List, Mapping, Iterable

log = logging.getLogger('provenance')
import requests
Expand All @@ -56,6 +56,8 @@
HOST = collections.namedtuple('HOST', ['nodes', 'password', 'url', 'username',
'verify'])

METADATA_SUCCESSOR_KEY = 'ops:Provenance/ops:superseded_by'


def parse_log_level(input: str) -> int:
"""Given a numeric or uppercase descriptive log level, return the associated int"""
Expand Down Expand Up @@ -93,7 +95,6 @@ def run(
username: str,
password: str,
verify_host_certs: bool = False,
reset: bool = False,
log_filepath: Union[str, None] = None,
log_level: int = logging.INFO):
configure_logging(filepath=log_filepath, log_level=log_level)
Expand All @@ -102,54 +103,61 @@ def run(

host = HOST(cluster_nodes, password, base_url, username, verify_host_certs)

provenance = trawl_registry(host)
updates = get_historic(provenance, reset)
extant_lidvids = get_extant_lidvids(host)
updates = get_successors_by_lidvid(extant_lidvids)

if updates:
update_docs(host, updates)
write_updated_docs(host, updates)

log.info('completed CLI processing')


def get_historic(provenance: {str: str}, reset: bool) -> {str: str}: # TODO: populate comment and rename for clarity
log.info('starting search for history')
def get_successors_by_lidvid(extant_lidvids: Iterable[str]) -> Mapping[str, str]:
"""
Given a collection of LIDVIDs, return a new mapping to their updated direct successors.
"""

log.info('Generating updated history...')

log.info(' reduce lidvids to unique lids')
lids = sorted({lidvid.split('::')[0] for lidvid in provenance})
unique_lids = {lidvid.split('::')[0] for lidvid in extant_lidvids}

log.info(' aggregate lidvids into lid buckets')
aggregates = {lid: [] for lid in lids}
for lidvid in provenance: aggregates[lidvid.split('::')[0]].append(lidvid)
log.info(' ...binning LIDVIDs by LID...')
lidvid_aggregates_by_lid = {lid: [] for lid in unique_lids}
for lidvid in extant_lidvids:
lid = lidvid.split('::')[0]
lidvid_aggregates_by_lid[lid].append(lidvid)

log.info(' process those with history')
count = 0
history = {}
for lidvids in filter(lambda l: 1 < len(l), aggregates.values()):
count += len(lidvids)
log.info(' ...determining updated successors for LIDVIDs...')
successors_by_lidvid = {}
lidvid_aggregates_with_multiple_versions = filter(lambda l: 1 < len(l), lidvid_aggregates_by_lid.values())
for lidvids in lidvid_aggregates_with_multiple_versions:
lidvids.sort(key=_vid_as_tuple_of_int, reverse=True)
for index, lidvid in enumerate(lidvids[1:]):
if reset or not provenance[lidvid]:
history[lidvid] = lidvids[index]

log.info(
f'found {len(history)} products needing update of a {count} full history of {len(provenance)} total products')
for successor_idx, lidvid in enumerate(lidvids[1:]):
successors_by_lidvid[lidvid] = lidvids[successor_idx]

log.info(f'Successors will be updated for {len(successors_by_lidvid)} LIDVIDs!')

if log.isEnabledFor(logging.DEBUG):
for lidvid in history.keys():
for lidvid in successors_by_lidvid.keys():
log.debug(f'{lidvid}')

return history
return successors_by_lidvid


def trawl_registry(host: HOST) -> {str: str}: # TODO: populate comment and rename for clarity
log.info('start trawling')
def get_extant_lidvids(host: HOST) -> Iterable[str]:
"""
Given an OpenSearch host, return all extant LIDVIDs
"""

cluster = [node + ":registry" for node in host.nodes]
key = 'ops:Provenance/ops:superseded_by'
path = ','.join(['registry'] + cluster) + '/_search?scroll=10m'
provenance = {}
log.info('Retrieving extant LIDVIDs')

clusters = [node + ":registry" for node in host.nodes]
path = ','.join(['registry'] + clusters) + '/_search?scroll=10m'
extant_lidvids = []
query = {'query': {'bool': {'must_not': [
{'term': {'ops:Tracking_Meta/ops:archive_status': 'staged'}}]}},
'_source': {'includes': ['lidvid', key]},
'_source': {'includes': ['lidvid']},
'size': 10000}

more_data_exists = True
Expand All @@ -162,36 +170,39 @@ def trawl_registry(host: HOST) -> {str: str}: # TODO: populate comment and rena
data = resp.json()
path = '_search/scroll'
query = {'scroll': '10m', 'scroll_id': data['_scroll_id']}
provenance.update({hit['_source']['lidvid']: hit['_source'].get(key, None) for hit in data['hits']['hits']})
more_data_exists = len(provenance) < data['hits']['total']['value']
extant_lidvids.extend([hit['_source']['lidvid'] for hit in data['hits']['hits']])
more_data_exists = len(extant_lidvids) < data['hits']['total']['value']

hits = data['hits']['total']['value']
percent_hit = int(round(len(provenance) / hits * 100))
log.info(f' progress: {len(provenance)} of {hits} ({percent_hit}%)')
percent_hit = int(round(len(extant_lidvids) / hits * 100))
log.info(f' ...{len(extant_lidvids)} of {hits} retrieved ({percent_hit}%)...')

if 'scroll_id' in query:
path = '_search/scroll/' + query['scroll_id']
requests.delete(urllib.parse.urljoin(host.url, path),
auth=(host.username, host.password),
verify=host.verify)

log.info('finished trawling')
log.info('Finished retrieving LIDVIDs with current direct successors!')

return provenance
return extant_lidvids


def update_docs(host: HOST, history: {str: str}):
"""Write provenance history updates to documents in db"""
log.info('Bulk update %d documents', len(history))
def write_updated_docs(host: HOST, lidvids_and_successors: Mapping[str, str]):
"""
Given an OpenSearch host and a mapping of LIDVIDs onto their direct successors, write provenance history updates
to documents in db.
"""
log.info('Bulk update %d documents', len(lidvids_and_successors))

bulk_updates = []
cluster = [node + ":registry" for node in host.nodes]
headers = {'Content-Type': 'application/x-ndjson'}
path = ','.join(['registry'] + cluster) + '/_bulk'

for lidvid, supersede in history.items():
for lidvid, direct_successor in lidvids_and_successors.items():
bulk_updates.append(json.dumps({'update': {'_id': lidvid}}))
bulk_updates.append(json.dumps({'doc': {'ops:Provenance/ops:superseded_by': supersede}}))
bulk_updates.append(json.dumps({'doc': {METADATA_SUCCESSOR_KEY: direct_successor}}))

bulk_data = '\n'.join(bulk_updates) + '\n'

Expand All @@ -212,9 +223,10 @@ def update_docs(host: HOST, history: {str: str}):


if __name__ == '__main__':
ap = argparse.ArgumentParser(description='''Update the provenance of products with more than one VID
The program sweeps through the registry index to find all the lidvids and existing provenance. It then builds up the updates necessary to mark the newly superseded products accordingly.
ap = argparse.ArgumentParser(description=f'''
Update registry records for non-latest LIDVIDs with up-to-date direct successor metadata ({METADATA_SUCCESSOR_KEY}).
Retrieves existing published LIDVIDs from the registry, determines history for each LID, and writes updated docs back to OpenSearch
''',
epilog='''EXAMPLES:
Expand All @@ -241,8 +253,6 @@ def update_docs(host: HOST, history: {str: str}):
help='Python logging level as an int or string like INFO for logging.INFO [%(default)s]')
ap.add_argument('-p', '--password', default=None, required=False,
help='password to login to opensearch leaving it blank if opensearch does not require login')
ap.add_argument('-r', '--reset', action='store_true', default=False,
help='ignore existing provenance building it from scratch')
ap.add_argument('-u', '--username', default=None, required=False,
help='username to login to opensearch leaving it blank if opensearch does not require login')
ap.add_argument('-v', '--verify', action='store_true', default=False,
Expand All @@ -255,6 +265,5 @@ def update_docs(host: HOST, history: {str: str}):
username=args.username,
password=args.password,
verify_host_certs=args.verify,
reset=args.reset,
log_level=args.log_level,
log_filepath=args.log_file)
2 changes: 1 addition & 1 deletion support/provenance/provenance_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
if remotesStr is not None and remotesStr.strip() != '':
remotesLists = json.loads(remotesStr)

command = f'provenance.py -b {opensearch_endpoint} -l provenance.log -L DEBUG --reset '
command = f'provenance.py -b {opensearch_endpoint} -l provenance.log -L DEBUG'
if username is not None:
command += f' -u {username} -p {passwd}'

Expand Down

0 comments on commit f558ddc

Please sign in to comment.