Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Remove --reset option from provenance.py (force it to always write history from scratch) #311

Merged
merged 8 commits into from
Apr 11, 2023
105 changes: 57 additions & 48 deletions support/provenance/provenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import collections
import json
import logging
from typing import Union, List
from typing import Union, List, Mapping, Iterable

log = logging.getLogger('provenance')
import requests
Expand All @@ -56,6 +56,8 @@
# Connection parameters for the registry's OpenSearch service.
#   nodes:    cluster node names (each is addressed as "<node>:registry")
#   password: login password; left blank when OpenSearch does not require login
#   url:      base URL of the OpenSearch endpoint
#   username: login username; left blank when OpenSearch does not require login
#   verify:   whether to verify the host's TLS certificate on requests
HOST = collections.namedtuple('HOST', ['nodes', 'password', 'url', 'username',
                                       'verify'])

# Registry metadata field written onto a superseded product, holding the LIDVID
# of that product's direct successor.
METADATA_SUCCESSOR_KEY = 'ops:Provenance/ops:superseded_by'


def parse_log_level(input: str) -> int:
"""Given a numeric or uppercase descriptive log level, return the associated int"""
Expand Down Expand Up @@ -93,7 +95,6 @@ def run(
username: str,
password: str,
verify_host_certs: bool = False,
reset: bool = False,
log_filepath: Union[str, None] = None,
log_level: int = logging.INFO):
configure_logging(filepath=log_filepath, log_level=log_level)
Expand All @@ -102,54 +103,61 @@ def run(

host = HOST(cluster_nodes, password, base_url, username, verify_host_certs)

provenance = trawl_registry(host)
updates = get_historic(provenance, reset)
extant_lidvids = get_extant_lidvids(host)
updates = get_successors_by_lidvid(extant_lidvids)

if updates:
update_docs(host, updates)
write_updated_docs(host, updates)

log.info('completed CLI processing')


def get_historic(provenance: {str: str}, reset: bool) -> {str: str}: # TODO: populate comment and rename for clarity
log.info('starting search for history')
def get_successors_by_lidvid(extant_lidvids: Iterable[str]) -> Mapping[str, str]:
    """
    Given a collection of LIDVIDs, return a new mapping from each non-latest
    LIDVID onto its direct successor (the next-higher version of the same LID).

    LIDVIDs are strings of the form ``<lid>::<vid>``.  A LID with only one
    extant version has no superseded products and contributes nothing to the
    result.
    """
    # Fetch the module logger by name so this function is self-contained; this
    # is the same logger object as the module-level `log`.
    log = logging.getLogger('provenance')

    log.info('Generating updated history...')

    unique_lids = {lidvid.split('::')[0] for lidvid in extant_lidvids}

    log.info(' ...binning LIDVIDs by LID...')
    lidvid_aggregates_by_lid = {lid: [] for lid in unique_lids}
    for lidvid in extant_lidvids:
        lid = lidvid.split('::')[0]
        lidvid_aggregates_by_lid[lid].append(lidvid)

    log.info(' ...determining updated successors for LIDVIDs...')
    successors_by_lidvid = {}
    # Only LIDs with more than one version contain superseded products.
    lidvid_aggregates_with_multiple_versions = filter(lambda l: 1 < len(l), lidvid_aggregates_by_lid.values())
    for lidvids in lidvid_aggregates_with_multiple_versions:
        # Sort newest-first so lidvids[i] is the direct successor of lidvids[i+1].
        lidvids.sort(key=_vid_as_tuple_of_int, reverse=True)
        # Skip the newest (index 0) — it has no successor; every other LIDVID
        # maps onto the element immediately before it in the sorted list.
        for successor_idx, lidvid in enumerate(lidvids[1:]):
            successors_by_lidvid[lidvid] = lidvids[successor_idx]

    log.info(f'Successors will be updated for {len(successors_by_lidvid)} LIDVIDs!')

    if log.isEnabledFor(logging.DEBUG):
        for lidvid in successors_by_lidvid.keys():
            log.debug(f'{lidvid}')

    return successors_by_lidvid


def trawl_registry(host: HOST) -> {str: str}: # TODO: populate comment and rename for clarity
log.info('start trawling')
def get_extant_lidvids(host: HOST) -> Iterable[str]:
"""
Given an OpenSearch host, return all extant LIDVIDs
"""

cluster = [node + ":registry" for node in host.nodes]
key = 'ops:Provenance/ops:superseded_by'
path = ','.join(['registry'] + cluster) + '/_search?scroll=10m'
provenance = {}
log.info('Retrieving extant LIDVIDs')

clusters = [node + ":registry" for node in host.nodes]
path = ','.join(['registry'] + clusters) + '/_search?scroll=10m'
extant_lidvids = []
query = {'query': {'bool': {'must_not': [
{'term': {'ops:Tracking_Meta/ops:archive_status': 'staged'}}]}},
'_source': {'includes': ['lidvid', key]},
'_source': {'includes': ['lidvid']},
'size': 10000}

more_data_exists = True
Expand All @@ -162,36 +170,39 @@ def trawl_registry(host: HOST) -> {str: str}: # TODO: populate comment and rena
data = resp.json()
path = '_search/scroll'
query = {'scroll': '10m', 'scroll_id': data['_scroll_id']}
provenance.update({hit['_source']['lidvid']: hit['_source'].get(key, None) for hit in data['hits']['hits']})
more_data_exists = len(provenance) < data['hits']['total']['value']
extant_lidvids.extend([hit['_source']['lidvid'] for hit in data['hits']['hits']])
more_data_exists = len(extant_lidvids) < data['hits']['total']['value']

hits = data['hits']['total']['value']
percent_hit = int(round(len(provenance) / hits * 100))
log.info(f' progress: {len(provenance)} of {hits} ({percent_hit}%)')
percent_hit = int(round(len(extant_lidvids) / hits * 100))
log.info(f' ...{len(extant_lidvids)} of {hits} retrieved ({percent_hit}%)...')

if 'scroll_id' in query:
path = '_search/scroll/' + query['scroll_id']
requests.delete(urllib.parse.urljoin(host.url, path),
auth=(host.username, host.password),
verify=host.verify)

log.info('finished trawling')
log.info('Finished retrieving LIDVIDs with current direct successors!')

return provenance
return extant_lidvids


def update_docs(host: HOST, history: {str: str}):
"""Write provenance history updates to documents in db"""
log.info('Bulk update %d documents', len(history))
def write_updated_docs(host: HOST, lidvids_and_successors: Mapping[str, str]):
"""
Given an OpenSearch host and a mapping of LIDVIDs onto their direct successors, write provenance history updates
to documents in db.
"""
log.info('Bulk update %d documents', len(lidvids_and_successors))

bulk_updates = []
cluster = [node + ":registry" for node in host.nodes]
headers = {'Content-Type': 'application/x-ndjson'}
path = ','.join(['registry'] + cluster) + '/_bulk'

for lidvid, supersede in history.items():
for lidvid, direct_successor in lidvids_and_successors.items():
bulk_updates.append(json.dumps({'update': {'_id': lidvid}}))
bulk_updates.append(json.dumps({'doc': {'ops:Provenance/ops:superseded_by': supersede}}))
bulk_updates.append(json.dumps({'doc': {METADATA_SUCCESSOR_KEY: direct_successor}}))

bulk_data = '\n'.join(bulk_updates) + '\n'

Expand All @@ -212,9 +223,10 @@ def update_docs(host: HOST, history: {str: str}):


if __name__ == '__main__':
ap = argparse.ArgumentParser(description='''Update the provenance of products with more than one VID

The program sweeps through the registry index to find all the lidvids and existing provenance. It then builds up the updates necessary to mark the newly superseded products accordingly.
ap = argparse.ArgumentParser(description=f'''
Update registry records for non-latest LIDVIDs with up-to-date direct successor metadata ({METADATA_SUCCESSOR_KEY}).

alexdunnjpl marked this conversation as resolved.
Show resolved Hide resolved
Retrieves existing published LIDVIDs from the registry, determines history for each LID, and writes updated docs back to OpenSearch
''',
epilog='''EXAMPLES:

Expand All @@ -241,8 +253,6 @@ def update_docs(host: HOST, history: {str: str}):
help='Python logging level as an int or string like INFO for logging.INFO [%(default)s]')
ap.add_argument('-p', '--password', default=None, required=False,
help='password to login to opensearch leaving it blank if opensearch does not require login')
ap.add_argument('-r', '--reset', action='store_true', default=False,
help='ignore existing provenance building it from scratch')
ap.add_argument('-u', '--username', default=None, required=False,
help='username to login to opensearch leaving it blank if opensearch does not require login')
ap.add_argument('-v', '--verify', action='store_true', default=False,
Expand All @@ -255,6 +265,5 @@ def update_docs(host: HOST, history: {str: str}):
username=args.username,
password=args.password,
verify_host_certs=args.verify,
reset=args.reset,
log_level=args.log_level,
log_filepath=args.log_file)
2 changes: 1 addition & 1 deletion support/provenance/provenance_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
if remotesStr is not None and remotesStr.strip() != '':
remotesLists = json.loads(remotesStr)

command = f'provenance.py -b {opensearch_endpoint} -l provenance.log -L DEBUG --reset '
command = f'provenance.py -b {opensearch_endpoint} -l provenance.log -L DEBUG'
if username is not None:
command += f' -u {username} -p {passwd}'

Expand Down