diff --git a/oc_meta/core/creator.py b/oc_meta/core/creator.py
index ad3ffb9..04cf6f1 100644
--- a/oc_meta/core/creator.py
+++ b/oc_meta/core/creator.py
@@ -24,9 +24,12 @@
 from oc_meta.core.curator import get_edited_br_metaid
 from oc_meta.lib.finder import ResourceFinder
-from oc_meta.lib.master_of_regex import (comma_and_spaces, name_and_ids,
-                                         one_or_more_spaces,
-                                         semicolon_in_people_field)
+from oc_meta.lib.master_of_regex import (
+    comma_and_spaces,
+    name_and_ids,
+    one_or_more_spaces,
+    semicolon_in_people_field,
+)
 from rdflib import Graph, URIRef
 from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler
@@ -37,14 +40,54 @@ class Creator(object):
-    def __init__(self, data:list, endpoint:str, base_iri:str, counter_handler:RedisCounterHandler, supplier_prefix:str, resp_agent:str, ra_index:dict, br_index:dict, re_index_csv:dict, ar_index_csv:dict, vi_index:dict, preexisting_entities: set, everything_everywhere_allatonce: Graph, settings: dict = None, meta_config_path: str = None):
+    def __init__(
+        self,
+        data: list,
+        endpoint: str,
+        base_iri: str,
+        counter_handler: RedisCounterHandler,
+        supplier_prefix: str,
+        resp_agent: str,
+        ra_index: dict,
+        br_index: dict,
+        re_index_csv: dict,
+        ar_index_csv: dict,
+        vi_index: dict,
+        preexisting_entities: set,
+        everything_everywhere_allatonce: Graph,
+        settings: dict = None,
+        meta_config_path: str = None,
+    ):
         self.url = base_iri
-        self.setgraph = GraphSet(self.url, supplier_prefix=supplier_prefix, wanted_label=False, custom_counter_handler=counter_handler)
+        self.setgraph = GraphSet(
+            self.url,
+            supplier_prefix=supplier_prefix,
+            wanted_label=False,
+            custom_counter_handler=counter_handler,
+        )
         self.resp_agent = resp_agent
-        self.finder = ResourceFinder(ts_url = endpoint, base_iri = base_iri, local_g=everything_everywhere_allatonce, settings=settings, meta_config_path=meta_config_path)
+        self.finder = ResourceFinder(
+            ts_url=endpoint,
+            base_iri=base_iri,
+            local_g=everything_everywhere_allatonce,
+            settings=settings,
+            meta_config_path=meta_config_path,
+        )
 
-        self.ra_id_schemas = {'crossref', 'orcid', 'viaf', 'wikidata', 'ror'}
-        self.br_id_schemas = {'arxiv', 'doi', 'issn', 'isbn', 'jid', 'openalex', 'pmid', 'pmcid', 'url', 'wikidata', 'wikipedia'}
+        self.ra_id_schemas = {"crossref", "orcid", "viaf", "wikidata", "ror"}
+        self.br_id_schemas = {
+            "arxiv",
+            "doi",
+            "issn",
+            "isbn",
+            "jid",
+            "openalex",
+            "pmid",
+            "pmcid",
+            "url",
+            "wikidata",
+            "wikipedia",
+        }
         self.schemas = self.ra_id_schemas.union(self.br_id_schemas)
 
         self.ra_index = self.indexer_id(ra_index)
@@ -60,19 +103,19 @@ def __init__(self, data:list, endpoint:str, base_iri:str, counter_handler:RedisC
     def creator(self, source=None):
         self.src = source
         for row in self.data:
-            self.row_meta = ''
-            self.venue_meta = ''
-            ids = row['id']
-            title = row['title']
-            authors = row['author']
-            pub_date = row['pub_date']
-            venue = row['venue']
-            vol = row['volume']
-            issue = row['issue']
-            page = row['page']
-            self.type = row['type']
-            publisher = row['publisher']
-            editor = row['editor']
+            self.row_meta = ""
+            self.venue_meta = ""
+            ids = row["id"]
+            title = row["title"]
+            authors = row["author"]
+            pub_date = row["pub_date"]
+            venue = row["venue"]
+            vol = row["volume"]
+            issue = row["issue"]
+            page = row["page"]
+            self.type = row["type"]
+            publisher = row["publisher"]
+            editor = row["editor"]
             self.venue_graph = None
             self.vol_graph = None
             self.issue_graph = None
@@ -91,26 +134,26 @@ def creator(self, source=None):
     def index_re(id_index):
         index = dict()
         for row in id_index:
-            index[row['br']] = row['re']
+            index[row["br"]] = row["re"]
         return index
 
     @staticmethod
     def index_ar(id_index):
         index = dict()
         for row in id_index:
-            index[row['meta']] = dict()
-            index[row['meta']]['author'] = Creator.__ar_worker(row['author'])
-            index[row['meta']]['editor'] = Creator.__ar_worker(row['editor'])
-            index[row['meta']]['publisher'] = Creator.__ar_worker(row['publisher'])
+            index[row["meta"]] = dict()
+            index[row["meta"]]["author"] = Creator.__ar_worker(row["author"])
+            index[row["meta"]]["editor"] = Creator.__ar_worker(row["editor"])
+            index[row["meta"]]["publisher"] = Creator.__ar_worker(row["publisher"])
         return index
 
     @staticmethod
-    def __ar_worker(s:str) -> dict:
+    def __ar_worker(s: str) -> dict:
         if s:
             ar_dict = dict()
-            couples = s.split('; ')
+            couples = s.split("; ")
             for c in couples:
-                cou = c.split(', ')
+                cou = c.split(", ")
                 ar_dict[cou[1]] = cou[0]
             return ar_dict
         else:
@@ -122,22 +165,33 @@ def indexer_id(self, csv_index):
             index[schema] = dict()
         for row in csv_index:
             for schema in self.schemas:
-                if row['id'].startswith(schema):
-                    identifier = row['id'].replace(f'{schema}:', '')
-                    index[schema][identifier] = row['meta']
+                if row["id"].startswith(schema):
+                    identifier = row["id"].replace(f"{schema}:", "")
+                    index[schema][identifier] = row["meta"]
         return index
 
     def id_action(self, ids):
         idslist = re.split(one_or_more_spaces, ids)
         # publication id
         for identifier in idslist:
-            if 'omid:' in identifier:
-                identifier = identifier.replace('omid:', '')
-                preexisting_entity = True if identifier in self.preexisting_entities else False
-                self.row_meta = identifier.replace('br/', '')
+            if "omid:" in identifier:
+                identifier = identifier.replace("omid:", "")
+                preexisting_entity = (
+                    True if identifier in self.preexisting_entities else False
+                )
+                self.row_meta = identifier.replace("br/", "")
                 url = URIRef(self.url + identifier)
-                preexisting_graph = self.finder.get_subgraph(url, self.preexisting_graphs) if preexisting_entity else None
-                self.br_graph = self.setgraph.add_br(self.resp_agent, source=self.src, res=url, preexisting_graph=preexisting_graph)
+                preexisting_graph = (
+                    self.finder.get_subgraph(url, self.preexisting_graphs)
+                    if preexisting_entity
+                    else None
+                )
+                self.br_graph = self.setgraph.add_br(
+                    self.resp_agent,
+                    source=self.src,
+                    res=url,
+                    preexisting_graph=preexisting_graph,
+                )
         for identifier in idslist:
             self.id_creator(self.br_graph, identifier, ra=False)
@@ -152,18 +206,31 @@ def author_action(self, authors):
         for aut in authorslist:
             aut_and_ids = re.search(name_and_ids, aut)
             aut_id = aut_and_ids.group(2)
-            aut_id_list = aut_id.split(' ')
+            aut_id_list = aut_id.split(" ")
             for identifier in aut_id_list:
-                if 'omid:' in identifier:
-                    identifier = str(identifier).replace('omid:', '')
-                    preexisting_entity = True if identifier in self.preexisting_entities else False
+                if "omid:" in identifier:
+                    identifier = str(identifier).replace("omid:", "")
+                    preexisting_entity = (
+                        True if identifier in self.preexisting_entities else False
+                    )
                     url = URIRef(self.url + identifier)
-                    aut_meta = identifier.replace('ra/', '')
-                    preexisting_graph = self.finder.get_subgraph(url, self.preexisting_graphs) if preexisting_entity else None
-                    pub_aut = self.setgraph.add_ra(self.resp_agent, source=self.src, res=url, preexisting_graph=preexisting_graph)
+                    aut_meta = identifier.replace("ra/", "")
+                    preexisting_graph = (
+                        self.finder.get_subgraph(url, self.preexisting_graphs)
+                        if preexisting_entity
+                        else None
+                    )
+                    pub_aut = self.setgraph.add_ra(
+                        self.resp_agent,
+                        source=self.src,
+                        res=url,
+                        preexisting_graph=preexisting_graph,
+                    )
             author_name = aut_and_ids.group(1)
-            if ',' in author_name:
-                author_name_splitted = re.split(comma_and_spaces, author_name)
+            if "," in author_name:
+                author_name_splitted = re.split(
+                    comma_and_spaces, author_name
+                )
                 first_name = author_name_splitted[1]
                 last_name = author_name_splitted[0]
                 if first_name.strip():
@@ -175,23 +242,36 @@ def author_action(self, authors):
             for identifier in aut_id_list:
                 self.id_creator(pub_aut, identifier, ra=True)
             # Author ROLE
-            AR = self.ar_index[self.row_meta]['author'][aut_meta]
-            ar_id = 'ar/' + str(AR)
-            preexisting_entity = True if ar_id in self.preexisting_entities else False
+            AR = self.ar_index[self.row_meta]["author"][aut_meta]
+            ar_id = "ar/" + str(AR)
+            preexisting_entity = (
+                True if ar_id in self.preexisting_entities else False
+            )
             url_ar = URIRef(self.url + ar_id)
-            preexisting_graph = self.finder.get_subgraph(url_ar, self.preexisting_graphs) if preexisting_entity else None
-            pub_aut_role = self.setgraph.add_ar(self.resp_agent, source=self.src, res=url_ar, preexisting_graph=preexisting_graph)
+            preexisting_graph = (
+                self.finder.get_subgraph(url_ar, self.preexisting_graphs)
+                if preexisting_entity
+                else None
+            )
+            pub_aut_role = self.setgraph.add_ar(
+                self.resp_agent,
+                source=self.src,
+                res=url_ar,
+                preexisting_graph=preexisting_graph,
+            )
             pub_aut_role.create_author()
             self.br_graph.has_contributor(pub_aut_role)
             pub_aut_role.is_held_by(pub_aut)
             aut_role_list.append(pub_aut_role)
             if len(aut_role_list) > 1:
-                aut_role_list[aut_role_list.index(pub_aut_role)-1].has_next(pub_aut_role)
+                aut_role_list[aut_role_list.index(pub_aut_role) - 1].has_next(
+                    pub_aut_role
+                )
 
     def pub_date_action(self, pub_date):
         if pub_date:
             datelist = list()
-            datesplit = pub_date.split('-')
+            datesplit = pub_date.split("-")
             if datesplit:
                 for x in datesplit:
                     datelist.append(int(x))
@@ -206,45 +286,82 @@ def vvi_action(self, venue, vol, issue):
             venue_ids = venue_and_ids.group(2)
             venue_ids_list = venue_ids.split()
             for identifier in venue_ids_list:
-                if 'omid:' in identifier:
-                    ven_id = str(identifier).replace('omid:', '')
-                    self.venue_meta = ven_id.replace('br/', '')
-                    preexisting_entity = True if ven_id in self.preexisting_entities else False
+                if "omid:" in identifier:
+                    ven_id = str(identifier).replace("omid:", "")
+                    self.venue_meta = ven_id.replace("br/", "")
+                    preexisting_entity = (
+                        True if ven_id in self.preexisting_entities else False
+                    )
                     url = URIRef(self.url + ven_id)
                     venue_title = venue_and_ids.group(1)
-                    preexisting_graph = self.finder.get_subgraph(url, self.preexisting_graphs) if preexisting_entity else None
-                    self.venue_graph = self.setgraph.add_br(self.resp_agent, source=self.src, res=url, preexisting_graph=preexisting_graph)
+                    preexisting_graph = (
+                        self.finder.get_subgraph(url, self.preexisting_graphs)
+                        if preexisting_entity
+                        else None
+                    )
+                    self.venue_graph = self.setgraph.add_br(
+                        self.resp_agent,
+                        source=self.src,
+                        res=url,
+                        preexisting_graph=preexisting_graph,
+                    )
             try:
                 venue_type = self.get_venue_type(self.type, venue_ids_list)
             except UnboundLocalError:
                 error_message = f"[INFO:Creator] I found the venue {venue} for the resource of type {self.type}, but I don't know how to handle it"
                 raise UnboundLocalError(error_message)
             if venue_type:
-                venue_type = venue_type.replace(' ', '_')
-                getattr(self.venue_graph, f'create_{venue_type}')()
+                venue_type = venue_type.replace(" ", "_")
+                getattr(self.venue_graph, f"create_{venue_type}")()
             self.venue_graph.has_title(venue_title)
             for identifier in venue_ids_list:
                 self.id_creator(self.venue_graph, identifier, ra=False)
-            if self.type in {'journal article', 'journal volume', 'journal issue'}:
+            if self.type in {"journal article", "journal volume", "journal issue"}:
                 if vol:
-                    vol_meta = self.vi_index[self.venue_meta]['volume'][vol]['id']
-                    vol_meta = 'br/' + vol_meta
-                    preexisting_entity = True if vol_meta in self.preexisting_entities else False
+                    vol_meta = self.vi_index[self.venue_meta]["volume"][vol]["id"]
+                    vol_meta = "br/" + vol_meta
+                    preexisting_entity = (
+                        True if vol_meta in self.preexisting_entities else False
+                    )
                     vol_url = URIRef(self.url + vol_meta)
-                    preexisting_graph = self.finder.get_subgraph(vol_url, self.preexisting_graphs) if preexisting_entity else None
-                    self.vol_graph = self.setgraph.add_br(self.resp_agent, source=self.src, res=vol_url, preexisting_graph=preexisting_graph)
+                    preexisting_graph = (
+                        self.finder.get_subgraph(vol_url, self.preexisting_graphs)
+                        if preexisting_entity
+                        else None
+                    )
+                    self.vol_graph = self.setgraph.add_br(
+                        self.resp_agent,
+                        source=self.src,
+                        res=vol_url,
+                        preexisting_graph=preexisting_graph,
+                    )
                     self.vol_graph.create_volume()
                     self.vol_graph.has_number(vol)
                 if issue:
                     if vol:
-                        issue_meta = self.vi_index[self.venue_meta]['volume'][vol]['issue'][issue]['id']
+                        issue_meta = self.vi_index[self.venue_meta]["volume"][vol][
+                            "issue"
+                        ][issue]["id"]
                     else:
-                        issue_meta = self.vi_index[self.venue_meta]['issue'][issue]['id']
-                    issue_meta = 'br/' + issue_meta
-                    preexisting_entity = True if issue_meta in self.preexisting_entities else False
+                        issue_meta = self.vi_index[self.venue_meta]["issue"][issue][
+                            "id"
+                        ]
+                    issue_meta = "br/" + issue_meta
+                    preexisting_entity = (
+                        True if issue_meta in self.preexisting_entities else False
+                    )
                     issue_url = URIRef(self.url + issue_meta)
-                    preexisting_graph = self.finder.get_subgraph(issue_url, self.preexisting_graphs) if preexisting_entity else None
-                    self.issue_graph = self.setgraph.add_br(self.resp_agent, source=self.src, res=issue_url, preexisting_graph=preexisting_graph)
+                    preexisting_graph = (
+                        self.finder.get_subgraph(issue_url, self.preexisting_graphs)
+                        if preexisting_entity
+                        else None
+                    )
+                    self.issue_graph = self.setgraph.add_br(
+                        self.resp_agent,
+                        source=self.src,
+                        res=issue_url,
+                        preexisting_graph=preexisting_graph,
+                    )
                     self.issue_graph.create_issue()
                     self.issue_graph.has_number(issue)
         if venue and vol and issue:
@@ -261,133 +378,144 @@ def vvi_action(self, venue, vol, issue):
             self.issue_graph.is_part_of(self.venue_graph)
 
     @classmethod
-    def get_venue_type(cls, br_type:str, venue_ids:list) -> str:
-        schemas = {venue_id.split(':', maxsplit=1)[0] for venue_id in venue_ids}
-        venue_type = ''
-        if br_type in {'journal article', 'journal volume', 'journal issue'}:
-            venue_type = 'journal'
-        elif br_type in {'book chapter', 'book part', 'book section', 'book track'}:
-            venue_type = 'book'
-        elif br_type in {'book', 'edited book', 'monograph', 'reference book'}:
-            venue_type = 'book series'
-        elif br_type == 'proceedings article':
-            venue_type = 'proceedings'
-        elif br_type in {'proceedings', 'report', 'standard', 'series'}:
-            venue_type = 'series'
-        elif br_type == 'reference entry':
-            venue_type = 'reference book'
-        elif br_type == 'report series':
-            venue_type = 'report series'
-        elif not br_type or br_type in {'dataset', 'data file'}:
-            venue_type = ''
+    def get_venue_type(cls, br_type: str, venue_ids: list) -> str:
+        schemas = {venue_id.split(":", maxsplit=1)[0] for venue_id in venue_ids}
+        venue_type = ""
+        if br_type in {"journal article", "journal volume", "journal issue"}:
+            venue_type = "journal"
+        elif br_type in {"book chapter", "book part", "book section", "book track"}:
+            venue_type = "book"
+        elif br_type in {"book", "edited book", "monograph", "reference book"}:
+            venue_type = "book series"
+        elif br_type == "proceedings article":
+            venue_type = "proceedings"
+        elif br_type in {"proceedings", "report", "standard", "series"}:
+            venue_type = "series"
+        elif br_type == "reference entry":
+            venue_type = "reference book"
+        elif br_type == "report series":
+            venue_type = "report series"
+        elif not br_type or br_type in {"dataset", "data file"}:
+            venue_type = ""
         # Check the type based on the identifier scheme
-        if any(identifier for identifier in venue_ids if not identifier.startswith('omid:')):
-            if venue_type in {'journal', 'book series', 'series', 'report series'}:
-                if 'isbn' in schemas or 'issn' not in schemas:
+        if any(
+            identifier for identifier in venue_ids if not identifier.startswith("omid:")
+        ):
+            if venue_type in {"journal", "book series", "series", "report series"}:
+                if "isbn" in schemas or "issn" not in schemas:
                     # It is undecidable
-                    venue_type = ''
-            elif venue_type in {'book', 'proceedings'}:
-                if 'issn' in schemas or 'isbn' not in schemas:
-                    venue_type = ''
-            elif venue_type == 'reference book':
-                if 'isbn' in schemas and 'issn' not in schemas:
-                    venue_type = 'reference book'
-                elif 'issn' in schemas and 'isbn' not in schemas:
-                    venue_type = 'journal'
-                elif 'issn' in schemas and 'isbn' in schemas:
-                    venue_type = ''
+                    venue_type = ""
+            elif venue_type in {"book", "proceedings"}:
+                if "issn" in schemas or "isbn" not in schemas:
+                    venue_type = ""
+            elif venue_type == "reference book":
+                if "isbn" in schemas and "issn" not in schemas:
+                    venue_type = "reference book"
+                elif "issn" in schemas and "isbn" not in schemas:
+                    venue_type = "journal"
+                elif "issn" in schemas and "isbn" in schemas:
+                    venue_type = ""
         return venue_type
 
     def page_action(self, page):
         if page:
            res_em = self.re_index[self.row_meta]
-            re_id = 're/' + str(res_em)
+            re_id = "re/" + str(res_em)
             preexisting_entity = True if re_id in self.preexisting_entities else False
             url_re = URIRef(self.url + re_id)
-            preexisting_graph = self.finder.get_subgraph(url_re, self.preexisting_graphs) if preexisting_entity else None
-            form = self.setgraph.add_re(self.resp_agent, source=self.src, res=url_re, preexisting_graph=preexisting_graph)
+            preexisting_graph = (
+                self.finder.get_subgraph(url_re, self.preexisting_graphs)
+                if preexisting_entity
+                else None
+            )
+            form = self.setgraph.add_re(
+                self.resp_agent,
+                source=self.src,
+                res=url_re,
+                preexisting_graph=preexisting_graph,
+            )
             form.has_starting_page(page)
             form.has_ending_page(page)
             self.br_graph.has_format(form)
 
     def type_action(self, entity_type):
-        if entity_type == 'abstract':
+        if entity_type == "abstract":
             self.br_graph.create_abstract()
-        if entity_type == 'archival document':
+        if entity_type == "archival document":
             self.br_graph.create_archival_document()
-        elif entity_type == 'audio document':
+        elif entity_type == "audio document":
             self.br_graph.create_audio_document()
-        elif entity_type == 'book':
+        elif entity_type == "book":
             self.br_graph.create_book()
-        elif entity_type == 'book chapter':
+        elif entity_type == "book chapter":
             self.br_graph.create_book_chapter()
-        elif entity_type == 'book part':
+        elif entity_type == "book part":
             self.br_graph.create_book_part()
-        elif entity_type == 'book section':
+        elif entity_type == "book section":
"book section": self.br_graph.create_book_section() - elif entity_type == 'book series': + elif entity_type == "book series": self.br_graph.create_book_series() - elif entity_type == 'book set': + elif entity_type == "book set": self.br_graph.create_book_set() - elif entity_type == 'computer program': + elif entity_type == "computer program": self.br_graph.create_computer_program() - elif entity_type in {'data file', 'dataset'}: + elif entity_type in {"data file", "dataset"}: self.br_graph.create_dataset() - elif entity_type == 'data management plan': + elif entity_type == "data management plan": self.br_graph.create_data_management_plan() - elif entity_type == 'dissertation': + elif entity_type == "dissertation": self.br_graph.create_dissertation() - elif entity_type == 'editorial': + elif entity_type == "editorial": self.br_graph.create_editorial() # elif entity_type == 'edited book': # self.br_graph.create_edited_book() - elif entity_type == 'journal': + elif entity_type == "journal": self.br_graph.create_journal() - elif entity_type == 'journal article': + elif entity_type == "journal article": self.br_graph.create_journal_article() - elif entity_type == 'journal editorial': + elif entity_type == "journal editorial": self.br_graph.create_journal_editorial() - elif entity_type == 'journal issue': + elif entity_type == "journal issue": self.br_graph.create_issue() - elif entity_type == 'journal volume': + elif entity_type == "journal volume": self.br_graph.create_volume() - elif entity_type == 'newspaper': + elif entity_type == "newspaper": self.br_graph.create_newspaper() - elif entity_type == 'newspaper article': + elif entity_type == "newspaper article": self.br_graph.create_newspaper_article() - elif entity_type == 'newspaper issue': + elif entity_type == "newspaper issue": self.br_graph.create_newspaper_issue() # elif entity_type == 'monograph': # self.br_graph.create_monograph() - elif entity_type == 'peer review': + elif entity_type == "peer review": self.br_graph.create_peer_review() - elif entity_type == 'preprint': + elif entity_type == "preprint": self.br_graph.create_preprint() - elif entity_type == 'presentation': + elif entity_type == "presentation": self.br_graph.create_presentation() - elif entity_type == 'proceedings': + elif entity_type == "proceedings": self.br_graph.create_proceedings() - elif entity_type == 'proceedings article': + elif entity_type == "proceedings article": self.br_graph.create_proceedings_article() # elif entity_type == 'proceedings series': # self.br_graph.create_proceedings_series() - elif entity_type == 'reference book': + elif entity_type == "reference book": self.br_graph.create_reference_book() - elif entity_type == 'reference entry': + elif entity_type == "reference entry": self.br_graph.create_reference_entry() - elif entity_type == 'report': + elif entity_type == "report": self.br_graph.create_report() - elif entity_type == 'report series': + elif entity_type == "report series": self.br_graph.create_report_series() - elif entity_type == 'retraction notice': + elif entity_type == "retraction notice": self.br_graph.create_retraction_notice() - elif entity_type == 'standard': + elif entity_type == "standard": self.br_graph.create_standard() - elif entity_type == 'series': + elif entity_type == "series": self.br_graph.create_series() # elif entity_type == 'standard series': # self.br_graph.create_standard_series()() - elif entity_type == 'web content': + elif entity_type == "web content": self.br_graph.create_web_content() def 
@@ -399,30 +527,52 @@ def publisher_action(self, publisher):
             publ_id = publ_and_ids.group(2)
             publ_id_list = publ_id.split()
             for identifier in publ_id_list:
-                if 'omid:' in identifier:
-                    identifier = str(identifier).replace('omid:', '')
-                    preexisting_entity = True if identifier in self.preexisting_entities else False
-                    pub_meta = identifier.replace('ra/', '')
+                if "omid:" in identifier:
+                    identifier = str(identifier).replace("omid:", "")
+                    preexisting_entity = (
+                        True if identifier in self.preexisting_entities else False
+                    )
+                    pub_meta = identifier.replace("ra/", "")
                     url = URIRef(self.url + identifier)
                     publ_name = publ_and_ids.group(1)
-                    preexisting_graph = self.finder.get_subgraph(url, self.preexisting_graphs) if preexisting_entity else None
-                    publ = self.setgraph.add_ra(self.resp_agent, source=self.src, res=url, preexisting_graph=preexisting_graph)
+                    preexisting_graph = (
+                        self.finder.get_subgraph(url, self.preexisting_graphs)
+                        if preexisting_entity
+                        else None
+                    )
+                    publ = self.setgraph.add_ra(
+                        self.resp_agent,
+                        source=self.src,
+                        res=url,
+                        preexisting_graph=preexisting_graph,
+                    )
             publ.has_name(publ_name)
             for identifier in publ_id_list:
                 self.id_creator(publ, identifier, ra=True)
             # publisherRole
-            AR = self.ar_index[self.row_meta]['publisher'][pub_meta]
-            ar_id = 'ar/' + str(AR)
-            preexisting_entity = True if ar_id in self.preexisting_entities else False
+            AR = self.ar_index[self.row_meta]["publisher"][pub_meta]
+            ar_id = "ar/" + str(AR)
+            preexisting_entity = (
+                True if ar_id in self.preexisting_entities else False
+            )
             url_ar = URIRef(self.url + ar_id)
-            preexisting_graph = self.finder.get_subgraph(url_ar, self.preexisting_graphs)
-            publ_role = self.setgraph.add_ar(self.resp_agent, source=self.src, res=url_ar, preexisting_graph=preexisting_graph)
+            preexisting_graph = self.finder.get_subgraph(
+                url_ar, self.preexisting_graphs
+            )
+            publ_role = self.setgraph.add_ar(
+                self.resp_agent,
+                source=self.src,
+                res=url_ar,
+                preexisting_graph=preexisting_graph,
+            )
             publ_role.create_publisher()
             self.br_graph.has_contributor(publ_role)
             publ_role.is_held_by(publ)
             pub_role_list.append(publ_role)
             if len(pub_role_list) > 1:
-                pub_role_list[pub_role_list.index(publ_role)-1].has_next(publ_role)
+                pub_role_list[pub_role_list.index(publ_role) - 1].has_next(
+                    publ_role
+                )
 
     def editor_action(self, editor, row):
         if editor:
@@ -431,18 +581,31 @@ def editor_action(self, editor, row):
             for ed in editorslist:
                 ed_and_ids = re.search(name_and_ids, ed)
                 ed_id = ed_and_ids.group(2)
-                ed_id_list = ed_id.split(' ')
+                ed_id_list = ed_id.split(" ")
                 for identifier in ed_id_list:
-                    if 'omid:' in identifier:
-                        identifier = str(identifier).replace('omid:', '')
-                        preexisting_entity = True if identifier in self.preexisting_entities else False
-                        ed_meta = identifier.replace('ra/', '')
+                    if "omid:" in identifier:
+                        identifier = str(identifier).replace("omid:", "")
+                        preexisting_entity = (
+                            True if identifier in self.preexisting_entities else False
+                        )
+                        ed_meta = identifier.replace("ra/", "")
                         url = URIRef(self.url + identifier)
-                        preexisting_graph = self.finder.get_subgraph(url, self.preexisting_graphs) if preexisting_entity else None
-                        pub_ed = self.setgraph.add_ra(self.resp_agent, source=self.src, res=url, preexisting_graph=preexisting_graph)
+                        preexisting_graph = (
+                            self.finder.get_subgraph(url, self.preexisting_graphs)
+                            if preexisting_entity
+                            else None
+                        )
+                        pub_ed = self.setgraph.add_ra(
+                            self.resp_agent,
+                            source=self.src,
+                            res=url,
+                            preexisting_graph=preexisting_graph,
+                        )
                 editor_name = ed_and_ids.group(1)
-                if ',' in editor_name:
-                    editor_name_splitted = re.split(comma_and_spaces, editor_name)
+                if "," in editor_name:
+                    editor_name_splitted = re.split(
+                        comma_and_spaces, editor_name
+                    )
                     firstName = editor_name_splitted[1]
                     lastName = editor_name_splitted[0]
                     if firstName.strip():
@@ -455,14 +618,30 @@ def editor_action(self, editor, row):
                     self.id_creator(pub_ed, identifier, ra=True)
                 # editorRole
                 br_key = get_edited_br_metaid(row, self.row_meta, self.venue_meta)
-                AR = self.ar_index[br_key]['editor'][ed_meta]
-                ar_id = 'ar/' + str(AR)
-                preexisting_entity = True if ar_id in self.preexisting_entities else False
+                AR = self.ar_index[br_key]["editor"][ed_meta]
+                ar_id = "ar/" + str(AR)
+                preexisting_entity = (
+                    True if ar_id in self.preexisting_entities else False
+                )
                 url_ar = URIRef(self.url + ar_id)
-                preexisting_graph = self.finder.get_subgraph(url_ar, self.preexisting_graphs) if preexisting_entity else None
-                pub_ed_role = self.setgraph.add_ar(self.resp_agent, source=self.src, res=url_ar, preexisting_graph=preexisting_graph)
+                preexisting_graph = (
+                    self.finder.get_subgraph(url_ar, self.preexisting_graphs)
+                    if preexisting_entity
+                    else None
+                )
+                pub_ed_role = self.setgraph.add_ar(
+                    self.resp_agent,
+                    source=self.src,
+                    res=url_ar,
+                    preexisting_graph=preexisting_graph,
+                )
                 pub_ed_role.create_editor()
-                br_graphs:List[BibliographicResource] = [self.br_graph, self.issue_graph, self.vol_graph, self.venue_graph]
+                br_graphs: List[BibliographicResource] = [
+                    self.br_graph,
+                    self.issue_graph,
+                    self.vol_graph,
+                    self.venue_graph,
+                ]
                 for graph in br_graphs:
                     if br_key == self.__res_metaid(graph):
                         graph.has_contributor(pub_ed_role)
@@ -470,33 +649,55 @@ def editor_action(self, editor, row):
                 edit_role_list.append(pub_ed_role)
             for i, edit_role in enumerate(edit_role_list):
                 if i > 0:
-                    edit_role_list[i-1].has_next(edit_role)
-
-    def __res_metaid(self, graph:BibliographicResource):
+                    edit_role_list[i - 1].has_next(edit_role)
+
+    def __res_metaid(self, graph: BibliographicResource):
         if graph:
-            return graph.res.replace(f'{self.url}br/','')
+            return graph.res.replace(f"{self.url}br/", "")
 
-    def id_creator(self, graph:BibliographicEntity, identifier:str, ra:bool) -> None:
+    def id_creator(self, graph: BibliographicEntity, identifier: str, ra: bool) -> None:
         new_id = None
         if ra:
             for ra_id_schema in self.ra_id_schemas:
                 if identifier.startswith(ra_id_schema):
-                    identifier = identifier.replace(f'{ra_id_schema}:', '')
+                    identifier = identifier.replace(f"{ra_id_schema}:", "")
                     res = self.ra_index[ra_id_schema][identifier]
-                    preexisting_entity = True if f'id/{res}' in self.preexisting_entities else False
-                    url = URIRef(self.url + 'id/' + res)
-                    preexisting_graph = self.finder.get_subgraph(url, self.preexisting_graphs) if preexisting_entity else None
-                    new_id = self.setgraph.add_id(self.resp_agent, source=self.src, res=url, preexisting_graph=preexisting_graph)
-                    getattr(new_id, f'create_{ra_id_schema}')(identifier)
+                    preexisting_entity = (
+                        True if f"id/{res}" in self.preexisting_entities else False
+                    )
+                    url = URIRef(self.url + "id/" + res)
+                    preexisting_graph = (
+                        self.finder.get_subgraph(url, self.preexisting_graphs)
+                        if preexisting_entity
+                        else None
+                    )
+                    new_id = self.setgraph.add_id(
+                        self.resp_agent,
+                        source=self.src,
+                        res=url,
+                        preexisting_graph=preexisting_graph,
+                    )
+                    getattr(new_id, f"create_{ra_id_schema}")(identifier)
         else:
             for br_id_schema in self.br_id_schemas:
                 if identifier.startswith(br_id_schema):
-                    identifier = identifier.replace(f'{br_id_schema}:', '')
+                    identifier = identifier.replace(f"{br_id_schema}:", "")
                     res = self.br_index[br_id_schema][identifier]
-                    preexisting_entity = True if f'id/{res}' in self.preexisting_entities else False
-                    url = URIRef(self.url + 'id/' + res)
-                    preexisting_graph = self.finder.get_subgraph(url, self.preexisting_graphs) if preexisting_entity else None
-                    new_id = self.setgraph.add_id(self.resp_agent, source=self.src, res=url, preexisting_graph=preexisting_graph)
-                    getattr(new_id, f'create_{br_id_schema}')(identifier)
+                    preexisting_entity = (
+                        True if f"id/{res}" in self.preexisting_entities else False
+                    )
+                    url = URIRef(self.url + "id/" + res)
+                    preexisting_graph = (
+                        self.finder.get_subgraph(url, self.preexisting_graphs)
+                        if preexisting_entity
+                        else None
+                    )
+                    new_id = self.setgraph.add_id(
+                        self.resp_agent,
+                        source=self.src,
+                        res=url,
+                        preexisting_graph=preexisting_graph,
+                    )
+                    getattr(new_id, f"create_{br_id_schema}")(identifier)
         if new_id:
-            graph.has_identifier(new_id)
\ No newline at end of file
+            graph.has_identifier(new_id)
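Note (outside the patch): the `get_venue_type` hunk above is a pure reformatting, so its behaviour can be checked directly against the branches shown there. A minimal usage sketch, assuming an installed `oc_meta` package; the identifiers are hypothetical, and the method is a classmethod, so no `Creator` instance is needed:

    from oc_meta.core.creator import Creator

    # br_type drives the first guess; the ISSN/ISBN schemas in the venue IDs
    # then confirm or veto it, falling back to "" when the evidence conflicts.
    print(Creator.get_venue_type("journal article", ["issn:0000-0019"]))   # "journal"
    print(Creator.get_venue_type("journal article", ["isbn:9780000000002"]))  # "" (undecidable)
    print(Creator.get_venue_type("book chapter", ["isbn:9780000000002"]))  # "book"
    print(Creator.get_venue_type("reference entry", ["issn:0000-0019"]))   # "journal"
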
diff --git a/oc_meta/core/curator.py b/oc_meta/core/curator.py
index 4a874b2..ba2795b 100644
--- a/oc_meta/core/curator.py
+++ b/oc_meta/core/curator.py
@@ -35,28 +35,51 @@ class Curator:
-    def __init__(self, data:List[dict], ts:str, prov_config:str, counter_handler:RedisCounterHandler, base_iri:str='https://w3id.org/oc/meta', prefix:str='060', separator:str=None, valid_dois_cache:dict=dict(), settings:dict|None = None, silencer:list = [], meta_config_path: str = None):
+    def __init__(
+        self,
+        data: List[dict],
+        ts: str,
+        prov_config: str,
+        counter_handler: RedisCounterHandler,
+        base_iri: str = "https://w3id.org/oc/meta",
+        prefix: str = "060",
+        separator: str = None,
+        valid_dois_cache: dict = dict(),
+        settings: dict | None = None,
+        silencer: list = [],
+        meta_config_path: str = None,
+    ):
         self.settings = settings or {}
         self.everything_everywhere_allatonce = Graph()
-        self.finder = ResourceFinder(ts, base_iri, self.everything_everywhere_allatonce, settings=settings, meta_config_path=meta_config_path)
+        self.finder = ResourceFinder(
+            ts,
+            base_iri,
+            self.everything_everywhere_allatonce,
+            settings=settings,
+            meta_config_path=meta_config_path,
+        )
         self.base_iri = base_iri
         self.prov_config = prov_config
         self.separator = separator
         # Preliminary pass to clear volume and issue if id is present but venue is missing
         for row in data:
-            if row['id'] and (row['volume'] or row['issue']):
-                if not row['venue']:
-                    row['volume'] = ''
-                    row['issue'] = ''
-            if not row['type']:
-                row['type'] = 'journal article'
-        self.data = [{field:value.strip() for field,value in row.items()} for row in data if is_a_valid_row(row)]
+            if row["id"] and (row["volume"] or row["issue"]):
+                if not row["venue"]:
+                    row["volume"] = ""
+                    row["issue"] = ""
+            if not row["type"]:
+                row["type"] = "journal article"
+        self.data = [
+            {field: value.strip() for field, value in row.items()}
+            for row in data
+            if is_a_valid_row(row)
+        ]
         self.prefix = prefix
         # Redis counter handler
         self.counter_handler = counter_handler
         self.brdict = {}
-        self.radict:Dict[str, Dict[str, list]] = {}
-        self.ardict:Dict[str, Dict[str, list]] = {}
+        self.radict: Dict[str, Dict[str, list]] = {}
+        self.ardict: Dict[str, Dict[str, list]] = {}
         self.vvi = {}  # Venue, Volume, Issue
         self.idra = {}  # key id; value metaid of id related to ra
         self.idbr = {}  # key id; value metaid of id related to br
@@ -64,7 +87,7 @@ def __init__(self, data:List[dict], ts:str, prov_config:str, counter_handler:Red
         self.brmeta = dict()
         self.armeta = dict()
         self.remeta = dict()
-        self.wnb_cnt = 0 # wannabe counter
+        self.wnb_cnt = 0  # wannabe counter
         self.rowcnt = 0
         self.log = dict()
         self.valid_dois_cache = valid_dois_cache
@@ -76,67 +99,91 @@ def collect_identifiers(self, valid_dois_cache):
         all_idslist = set()
         all_vvis = set()
         for row in self.data:
-            metavals, idslist, vvis = self.extract_identifiers_and_metavals(row, valid_dois_cache=valid_dois_cache)
+            metavals, idslist, vvis = self.extract_identifiers_and_metavals(
+                row, valid_dois_cache=valid_dois_cache
+            )
             all_metavals.update(metavals)
             all_idslist.update(idslist)
             all_vvis.update(vvis)
         return all_metavals, all_idslist, all_vvis
 
-    def extract_identifiers_and_metavals(self, row, valid_dois_cache) -> Tuple[set, set, set]:
+    def extract_identifiers_and_metavals(
+        self, row, valid_dois_cache
+    ) -> Tuple[set, set, set]:
         metavals = set()
         all_idslist = set()
         vvis = set()
-        if row['id']:
-            idslist, metaval = self.clean_id_list(self.split_identifiers(row['id']), br=True, valid_dois_cache=valid_dois_cache)
-            id_metaval = f'omid:br/{metaval}' if metaval else ''
+        if row["id"]:
+            idslist, metaval = self.clean_id_list(
+                self.split_identifiers(row["id"]),
+                br=True,
+                valid_dois_cache=valid_dois_cache,
+            )
+            id_metaval = f"omid:br/{metaval}" if metaval else ""
             if id_metaval:
                 metavals.add(id_metaval)
             if idslist:
                 all_idslist.update(idslist)
         venue_metaid = None
-        fields_with_an_id = [(field, re.search(name_and_ids, row[field]).group(2).split()) for field in ['author', 'editor', 'publisher', 'venue', 'volume', 'issue'] if re.search(name_and_ids, row[field])]
+        fields_with_an_id = [
+            (field, re.search(name_and_ids, row[field]).group(2).split())
+            for field in ["author", "editor", "publisher", "venue", "volume", "issue"]
+            if re.search(name_and_ids, row[field])
+        ]
         for field, field_ids in fields_with_an_id:
-            br = field in ['venue', 'volume', 'issue']
-            field_idslist, field_metaval = self.clean_id_list(field_ids, br=br, valid_dois_cache=valid_dois_cache)
-            if field == 'venue':
+            br = field in ["venue", "volume", "issue"]
+            field_idslist, field_metaval = self.clean_id_list(
+                field_ids, br=br, valid_dois_cache=valid_dois_cache
+            )
+            if field == "venue":
                 venue_metaid = field_metaval
             if field_metaval:
-                field_metaval = f'omid:br/{field_metaval}' if br else f'omid:ra/{field_metaval}'
+                field_metaval = (
+                    f"omid:br/{field_metaval}" if br else f"omid:ra/{field_metaval}"
+                )
             else:
-                field_metaval = ''
+                field_metaval = ""
            if field_metaval:
                 metavals.add(field_metaval)
             if field_idslist:
                 all_idslist.update(field_idslist)
         vvi = None
-        if not row['id'] and venue_metaid and (row['volume'] or row['issue']):
-            vvi = (row['volume'], row['issue'], venue_metaid)
-            vvis.add(vvi)
+        if not row["id"] and venue_metaid and (row["volume"] or row["issue"]):
+            vvi = (row["volume"], row["issue"], venue_metaid)
+            vvis.add(vvi)
         return metavals, all_idslist, vvis
 
     def split_identifiers(self, field_value):
         if self.separator:
-            return re.sub(colon_and_spaces, ':', field_value).split(self.separator)
+            return re.sub(colon_and_spaces, ":", field_value).split(self.separator)
         else:
-            return re.split(one_or_more_spaces, re.sub(colon_and_spaces, ':', field_value))
+            return re.split(
+                one_or_more_spaces, re.sub(colon_and_spaces, ":", field_value)
+            )
 
-    def curator(self, filename: str = None, path_csv: str = None, path_index: str = None):
-        metavals, identifiers, vvis = self.collect_identifiers(valid_dois_cache=self.valid_dois_cache)
-        self.finder.get_everything_about_res(metavals=metavals, identifiers=identifiers, vvis=vvis)
+    def curator(
+        self, filename: str = None, path_csv: str = None, path_index: str = None
+    ):
+        metavals, identifiers, vvis = self.collect_identifiers(
+            valid_dois_cache=self.valid_dois_cache
+        )
+        self.finder.get_everything_about_res(
+            metavals=metavals, identifiers=identifiers, vvis=vvis
+        )
         for row in self.data:
             self.log[self.rowcnt] = {
-                'id': {},
-                'title': {},
-                'author': {},
-                'venue': {},
-                'editor': {},
-                'publisher': {},
-                'page': {},
-                'volume': {},
-                'issue': {},
-                'pub_date': {},
-                'type': {}
+                "id": {},
+                "title": {},
+                "author": {},
+                "venue": {},
+                "editor": {},
+                "publisher": {},
+                "page": {},
+                "volume": {},
+                "issue": {},
+                "pub_date": {},
+                "type": {},
             }
             self.clean_id(row)
             self.rowcnt += 1
@@ -148,102 +195,159 @@ def curator(self, filename: str = None, path_csv: str = None, path_index: str =
             self.rowcnt += 1
         self.rowcnt = 0
         for row in self.data:
-            self.clean_ra(row, 'author')
-            self.clean_ra(row, 'publisher')
-            self.clean_ra(row, 'editor')
+            self.clean_ra(row, "author")
+            self.clean_ra(row, "publisher")
+            self.clean_ra(row, "editor")
             self.rowcnt += 1
         self.get_preexisting_entities()
         self.meta_maker()
         self.log = self.log_update()
         self.enrich()
         # Remove duplicates
-        self.data = list({v['id']: v for v in self.data}.values())
+        self.data = list({v["id"]: v for v in self.data}.values())
         if path_index:
             path_index = os.path.join(path_index, filename)
         self.filename = filename
         self.indexer(path_index, path_csv)
+
     # ID
-    def clean_id(self, row:Dict[str,str]) -> None:
-        '''
-        The 'clean id()' function is executed for each CSV row.
-        In this process, any duplicates are detected by the IDs in the 'id' column.
-        For each line, a wannabeID or, if the bibliographic resource was found in the triplestore,
-        a MetaID is assigned.
-        Finally, this method enrich and clean the fields related to the
+    def clean_id(self, row: Dict[str, str]) -> None:
+        """
+        The 'clean id()' function is executed for each CSV row.
+        In this process, any duplicates are detected by the IDs in the 'id' column.
+        For each line, a wannabeID or, if the bibliographic resource was found in the triplestore,
+        a MetaID is assigned.
+        Finally, this method enrich and clean the fields related to the
         title, venue, volume, issue, page, publication date and type.

        :params row: a dictionary representing a CSV row
        :type row: Dict[str, str]
        :returns: None -- This method modifies the input CSV row without returning it.
-        '''
-        if row['title']:
-            name = Cleaner(row['title']).clean_title(self.settings.get('normalize_titles'))
+        """
+        if row["title"]:
+            name = Cleaner(row["title"]).clean_title(
+                self.settings.get("normalize_titles")
+            )
         else:
-            name = ''
+            name = ""
         metaval_ids_list = []
-        if row['id']:
+        if row["id"]:
             if self.separator:
-                idslist = re.sub(colon_and_spaces, ':', row['id']).split(self.separator)
+                idslist = re.sub(colon_and_spaces, ":", row["id"]).split(self.separator)
             else:
-                idslist = re.split(one_or_more_spaces, re.sub(colon_and_spaces, ':', row['id']))
-            idslist, metaval = self.clean_id_list(idslist, br=True, valid_dois_cache=self.valid_dois_cache)
-            id_metaval = f'omid:br/{metaval}' if metaval else ''
+                idslist = re.split(
+                    one_or_more_spaces, re.sub(colon_and_spaces, ":", row["id"])
+                )
+            idslist, metaval = self.clean_id_list(
+                idslist, br=True, valid_dois_cache=self.valid_dois_cache
+            )
+            id_metaval = f"omid:br/{metaval}" if metaval else ""
             metaval_ids_list.append((id_metaval, idslist))
-        fields_with_an_id = [(field, re.search(name_and_ids, row[field]).group(2).split()) for field in ['author', 'editor', 'publisher', 'venue', 'volume', 'issue'] if re.search(name_and_ids, row[field])]
+        fields_with_an_id = [
+            (field, re.search(name_and_ids, row[field]).group(2).split())
+            for field in ["author", "editor", "publisher", "venue", "volume", "issue"]
+            if re.search(name_and_ids, row[field])
+        ]
         for field, field_ids in fields_with_an_id:
-            if field in ['author', 'editor', 'publisher']:
+            if field in ["author", "editor", "publisher"]:
                 br = False
-            elif field in ['venue', 'volume', 'issue']:
+            elif field in ["venue", "volume", "issue"]:
                 br = True
-            field_idslist, field_metaval = self.clean_id_list(field_ids, br=br, valid_dois_cache=self.valid_dois_cache)
+            field_idslist, field_metaval = self.clean_id_list(
+                field_ids, br=br, valid_dois_cache=self.valid_dois_cache
+            )
             if field_metaval:
-                field_metaval = f'omid:br/{field_metaval}' if br else f'omid:ra/{field_metaval}'
+                field_metaval = (
+                    f"omid:br/{field_metaval}" if br else f"omid:ra/{field_metaval}"
+                )
             else:
-                field_metaval = ''
+                field_metaval = ""
             metaval_ids_list.append((field_metaval, field_idslist))
-        if row['id']:
-            metaval = self.id_worker('id', name, idslist, metaval, ra_ent=False, br_ent=True, vvi_ent=False, publ_entity=False)
+        if row["id"]:
+            metaval = self.id_worker(
+                "id",
+                name,
+                idslist,
+                metaval,
+                ra_ent=False,
+                br_ent=True,
+                vvi_ent=False,
+                publ_entity=False,
+            )
         else:
             metaval = self.new_entity(self.brdict, name)
-        row['title'] = self.brdict[metaval]['title']
-        row['id'] = metaval
-
+        row["title"] = self.brdict[metaval]["title"]
+        row["id"] = metaval
+
     def clean_metadata_without_id(self):
         for row in self.data:
             # page
-            if row['page']:
-                row['page'] = Cleaner(row['page']).normalize_hyphens()
+            if row["page"]:
+                row["page"] = Cleaner(row["page"]).normalize_hyphens()
             # date
-            if row['pub_date']:
-                date = Cleaner(row['pub_date']).normalize_hyphens()
+            if row["pub_date"]:
+                date = Cleaner(row["pub_date"]).normalize_hyphens()
                 date = Cleaner(date).clean_date()
-                row['pub_date'] = date
+                row["pub_date"] = date
             # type
-            if row['type']:
-                entity_type = ' '.join((row['type'].lower()).split())
-                if entity_type == 'edited book' or entity_type == 'monograph':
-                    entity_type = 'book'
-                elif entity_type == 'report series' or entity_type == 'standard series' or entity_type == 'proceedings series':
-                    entity_type = 'series'
-                elif entity_type == 'posted content':
-                    entity_type = 'web content'
-                if entity_type in {'abstract', 'archival document', 'audio document', 'book',
-                                   'book chapter', 'book part', 'book section', 'book series',
-                                   'book set', 'computer program', 'data file', 'data management plan',
-                                   'dataset', 'dissertation', 'editorial', 'journal', 'journal article',
-                                   'journal editorial', 'journal issue', 'journal volume',
-                                   'newspaper', 'newspaper article', 'newspaper editorial',
-                                   'newspaper issue', 'peer review', 'preprint', 'presentation',
-                                   'proceedings', 'proceedings article', 'proceedings series',
-                                   'reference book', 'reference entry', 'retraction notice',
-                                   'series', 'report', 'standard', 'web content'}:
-                    row['type'] = entity_type
+                if row["type"]:
+                    entity_type = " ".join((row["type"].lower()).split())
+                if entity_type == "edited book" or entity_type == "monograph":
+                    entity_type = "book"
+                elif (
+                    entity_type == "report series"
+                    or entity_type == "standard series"
+                    or entity_type == "proceedings series"
+                ):
+                    entity_type = "series"
+                elif entity_type == "posted content":
+                    entity_type = "web content"
+                if entity_type in {
+                    "abstract",
+                    "archival document",
+                    "audio document",
+                    "book",
+                    "book chapter",
+                    "book part",
+                    "book section",
+                    "book series",
+                    "book set",
+                    "computer program",
+                    "data file",
+                    "data management plan",
+                    "dataset",
+                    "dissertation",
+                    "editorial",
+                    "journal",
+                    "journal article",
+                    "journal editorial",
+                    "journal issue",
+                    "journal volume",
+                    "newspaper",
+                    "newspaper article",
+                    "newspaper editorial",
+                    "newspaper issue",
+                    "peer review",
+                    "preprint",
+                    "presentation",
+                    "proceedings",
+                    "proceedings article",
+                    "proceedings series",
+                    "reference book",
+                    "reference entry",
+                    "retraction notice",
+                    "series",
+                    "report",
+                    "standard",
+                    "web content",
+                }:
+                    row["type"] = entity_type
                 else:
-                    row['type'] = ''
+                    row["type"] = ""
 
     # VVI
     def clean_vvi(self, row: Dict[str, str]) -> None:
-        '''
+        """
         This method performs the deduplication process for venues, volumes and issues.
         The acquired information is stored in the 'vvi' dictionary, that has the following format: ::
 
             {
                 '4416': {
-                    'issue': {},
+                    'issue': {},
                     'volume': {
-                        '166': {'id': '4388', 'issue': {'4': {'id': '4389'}}},
-                        '172': {'id': '4434',
+                        '166': {'id': '4388', 'issue': {'4': {'id': '4389'}}},
+                        '172': {'id': '4434',
                             'issue': {
-                                '22': {'id': '4435'},
-                                '20': {'id': '4436'},
-                                '21': {'id': '4437'},
+                                '22': {'id': '4435'},
+                                '20': {'id': '4436'},
+                                '21': {'id': '4437'},
                                 '19': {'id': '4438'}
                             }
                         }
                    }
                }
-            }
+            }
 
         :params row: a dictionary representing a CSV row
         :type row: Dict[str, str]
        :returns: None -- This method modifies the input CSV row without returning it.
-        '''
-        if row['type'] not in {'journal article', 'journal volume', 'journal issue'} and (row['volume'] or row['issue']):
-            row['volume'] = ''
-            row['issue'] = ''
+        """
+        if row["type"] not in {
+            "journal article",
+            "journal volume",
+            "journal issue",
+        } and (row["volume"] or row["issue"]):
+            row["volume"] = ""
+            row["issue"] = ""
         Cleaner.clean_volume_and_issue(row=row)
         vol_meta = None
-        br_type = row['type']
-        volume = row['volume']
-        issue = row['issue']
-        br_id = row['id']
-        venue = row['venue']
-
+        br_type = row["type"]
+        volume = row["volume"]
+        issue = row["issue"]
+        br_id = row["id"]
+        venue = row["venue"]
+
         # Venue
         if venue:
             # The data must be invalidated, because the resource is journal but a volume or an issue have also been specified
-            if br_type == 'journal' and (volume or issue):
-                row['venue'] = ''
-                row['volume'] = ''
-                row['issue'] = ''
+            if br_type == "journal" and (volume or issue):
+                row["venue"] = ""
+                row["volume"] = ""
+                row["issue"] = ""
             venue_id = re.search(name_and_ids, venue)
             if venue_id:
-                name = Cleaner(venue_id.group(1)).clean_title(self.settings.get('normalize_titles'))
+                name = Cleaner(venue_id.group(1)).clean_title(
+                    self.settings.get("normalize_titles")
+                )
                 venue_id = venue_id.group(2)
                 if self.separator:
-                    idslist = re.sub(colon_and_spaces, ':', venue_id).split(self.separator)
+                    idslist = re.sub(colon_and_spaces, ":", venue_id).split(
+                        self.separator
+                    )
                 else:
-                    idslist = re.split(one_or_more_spaces, re.sub(colon_and_spaces, ':', venue_id))
-                idslist, metaval = self.clean_id_list(idslist, br=True, valid_dois_cache=self.valid_dois_cache)
-
-                metaval = self.id_worker('venue', name, idslist, metaval, ra_ent=False, br_ent=True, vvi_ent=True, publ_entity=False)
+                    idslist = re.split(
+                        one_or_more_spaces, re.sub(colon_and_spaces, ":", venue_id)
+                    )
+                idslist, metaval = self.clean_id_list(
+                    idslist, br=True, valid_dois_cache=self.valid_dois_cache
+                )
+
+                metaval = self.id_worker(
+                    "venue",
+                    name,
+                    idslist,
+                    metaval,
+                    ra_ent=False,
+                    br_ent=True,
+                    vvi_ent=True,
+                    publ_entity=False,
+                )
                 if metaval not in self.vvi:
                     ts_vvi = None
-                    if 'wannabe' not in metaval:
+                    if "wannabe" not in metaval:
                         ts_vvi = self.finder.retrieve_venue_from_meta(metaval)
-                    if 'wannabe' in metaval or not ts_vvi:
+                    if "wannabe" in metaval or not ts_vvi:
                         self.vvi[metaval] = dict()
-                        self.vvi[metaval]['volume'] = dict()
-                        self.vvi[metaval]['issue'] = dict()
+                        self.vvi[metaval]["volume"] = dict()
+                        self.vvi[metaval]["issue"] = dict()
                     elif ts_vvi:
                         self.vvi[metaval] = ts_vvi
             else:
-                name = Cleaner(venue).clean_title(self.settings.get('normalize_titles'))
+                name = Cleaner(venue).clean_title(self.settings.get("normalize_titles"))
                 metaval = self.new_entity(self.brdict, name)
                 self.vvi[metaval] = dict()
-                self.vvi[metaval]['volume'] = dict()
-                self.vvi[metaval]['issue'] = dict()
-            row['venue'] = metaval
-
+                self.vvi[metaval]["volume"] = dict()
+                self.vvi[metaval]["issue"] = dict()
+            row["venue"] = metaval
+
             # Volume
-            if volume and (br_type == 'journal issue' or br_type == 'journal article'):
-                if volume in self.vvi[metaval]['volume']:
-                    vol_meta = self.vvi[metaval]['volume'][volume]['id']
+            if volume and (br_type == "journal issue" or br_type == "journal article"):
+                if volume in self.vvi[metaval]["volume"]:
+                    vol_meta = self.vvi[metaval]["volume"][volume]["id"]
                 else:
-                    vol_meta = self.new_entity(self.brdict, '')
-                    self.vvi[metaval]['volume'][volume] = dict()
-                    self.vvi[metaval]['volume'][volume]['id'] = vol_meta
-                    self.vvi[metaval]['volume'][volume]['issue'] = dict()
-            elif volume and br_type == 'journal volume':
+                    vol_meta = self.new_entity(self.brdict, "")
+                    self.vvi[metaval]["volume"][volume] = dict()
+                    self.vvi[metaval]["volume"][volume]["id"] = vol_meta
+                    self.vvi[metaval]["volume"][volume]["issue"] = dict()
+            elif volume and br_type == "journal volume":
                 # The data must be invalidated, because the resource is a journal volume but an issue has also been specified
                 if issue:
-                    row['volume'] = ''
-                    row['issue'] = ''
+                    row["volume"] = ""
+                    row["issue"] = ""
                 else:
                     vol_meta = br_id
-                    self.volume_issue(vol_meta, self.vvi[metaval]['volume'], volume, row)
-
+                    self.volume_issue(
+                        vol_meta, self.vvi[metaval]["volume"], volume, row
+                    )
+
             # Issue
-            if issue and br_type == 'journal article':
-                row['issue'] = issue
+            if issue and br_type == "journal article":
+                row["issue"] = issue
                 if vol_meta:
-                    if issue not in self.vvi[metaval]['volume'][volume]['issue']:
-                        issue_meta = self.new_entity(self.brdict, '')
-                        self.vvi[metaval]['volume'][volume]['issue'][issue] = dict()
-                        self.vvi[metaval]['volume'][volume]['issue'][issue]['id'] = issue_meta
+                    if issue not in self.vvi[metaval]["volume"][volume]["issue"]:
+                        issue_meta = self.new_entity(self.brdict, "")
+                        self.vvi[metaval]["volume"][volume]["issue"][issue] = dict()
+                        self.vvi[metaval]["volume"][volume]["issue"][issue][
+                            "id"
+                        ] = issue_meta
                 else:
-                    if issue not in self.vvi[metaval]['issue']:
-                        issue_meta = self.new_entity(self.brdict, '')
-                        self.vvi[metaval]['issue'][issue] = dict()
-                        self.vvi[metaval]['issue'][issue]['id'] = issue_meta
-            elif issue and br_type == 'journal issue':
+                    if issue not in self.vvi[metaval]["issue"]:
+                        issue_meta = self.new_entity(self.brdict, "")
+                        self.vvi[metaval]["issue"][issue] = dict()
+                        self.vvi[metaval]["issue"][issue]["id"] = issue_meta
+            elif issue and br_type == "journal issue":
                 issue_meta = br_id
                 if vol_meta:
-                    self.volume_issue(issue_meta, self.vvi[metaval]['volume'][volume]['issue'], issue, row)
+                    self.volume_issue(
+                        issue_meta,
+                        self.vvi[metaval]["volume"][volume]["issue"],
+                        issue,
+                        row,
+                    )
                 else:
-                    self.volume_issue(issue_meta, self.vvi[metaval]['issue'], issue, row)
-
+                    self.volume_issue(
+                        issue_meta, self.vvi[metaval]["issue"], issue, row
+                    )
+
         else:
-            row['venue'] = ''
-            row['volume'] = ''
-            row['issue'] = ''
+            row["venue"] = ""
+            row["volume"] = ""
+            row["issue"] = ""
 
     # RA
     def clean_ra(self, row, col_name):
-        '''
+        """
         This method performs the deduplication process for responsible agents (authors, publishers and editors).
 
         :params row: a dictionary representing a CSV row
        :params col_name: the CSV column name. It can be 'author', 'publisher', or 'editor'
        :type col_name: str
        :returns: None -- This method modifies self.ardict, self.radict, and self.idra, and returns None.
-        '''
+        """
+
         def get_br_metaval_to_check(row, col_name):
-            if col_name == 'editor':
-                return get_edited_br_metaid(row, row['id'], row['venue'])
+            if col_name == "editor":
+                return get_edited_br_metaid(row, row["id"], row["venue"])
             else:
-                return row['id']
+                return row["id"]
 
         def get_br_metaval(br_metaval_to_check):
             if br_metaval_to_check in self.brdict or br_metaval_to_check in self.vvi:
                 return br_metaval_to_check
-            return [id for id in self.brdict if br_metaval_to_check in self.brdict[id]['others']][0]
+            return [
+                id
+                for id in self.brdict
+                if br_metaval_to_check in self.brdict[id]["others"]
+            ][0]
 
         def initialize_ardict_entry(br_metaval):
             if br_metaval not in self.ardict:
-                self.ardict[br_metaval] = {'author': [], 'editor': [], 'publisher': []}
+                self.ardict[br_metaval] = {"author": [], "editor": [], "publisher": []}
 
         def initialize_sequence(br_metaval, col_name):
             sequence = []
-            if 'wannabe' in br_metaval:
+            if "wannabe" in br_metaval:
                 sequence = []
             else:
-                sequence_found = self.finder.retrieve_ra_sequence_from_br_meta(br_metaval, col_name)
+                sequence_found = self.finder.retrieve_ra_sequence_from_br_meta(
+                    br_metaval, col_name
+                )
                 if sequence_found:
                     sequence = []
                     for agent in sequence_found:
@@ -410,17 +553,17 @@ def initialize_sequence(br_metaval, col_name):
                         sequence.append(tuple((ar_metaid, ra_metaid)))
                         if ra_metaid not in self.radict:
                             self.radict[ra_metaid] = dict()
-                            self.radict[ra_metaid]['ids'] = list()
-                            self.radict[ra_metaid]['others'] = list()
-                            self.radict[ra_metaid]['title'] = agent[ar_metaid][0]
+                            self.radict[ra_metaid]["ids"] = list()
+                            self.radict[ra_metaid]["others"] = list()
+                            self.radict[ra_metaid]["title"] = agent[ar_metaid][0]
                         for identifier in agent[ar_metaid][1]:  # other ids after meta
                             id_metaid = identifier[0]
                             literal = identifier[1]
                             if id_metaid not in self.idra:
                                 self.idra[literal] = id_metaid
-                            if literal not in self.radict[ra_metaid]['ids']:
-                                self.radict[ra_metaid]['ids'].append(literal)
+                            if literal not in self.radict[ra_metaid]["ids"]:
+                                self.radict[ra_metaid]["ids"].append(literal)
                     self.ardict[br_metaval][col_name].extend(sequence)
                 else:
                     sequence = []
@@ -431,7 +574,7 @@ def parse_ra_list(row):
             ra_list = Cleaner.clean_ra_list(ra_list)
             return ra_list
 
-        def process_individual_ra(ra, sequence):
+        def process_individual_ra(ra, sequence):
             new_elem_seq = True
             ra_id = None
             ra_id_match = re.search(name_and_ids, ra)
@@ -444,15 +587,15 @@ def process_individual_ra(ra, sequence):
                 name = cleaner.clean_name()
             if not ra_id and sequence:
                 for _, ra_metaid in sequence:
-                    if self.radict[ra_metaid]['title'] == name:
-                        ra_id = 'omid:ra/' + str(ra_metaid)
+                    if self.radict[ra_metaid]["title"] == name:
+                        ra_id = "omid:ra/" + str(ra_metaid)
                         new_elem_seq = False
                         break
             return ra_id, name, new_elem_seq
 
         if not row[col_name]:
             return
-
+
         br_metaval_to_check = get_br_metaval_to_check(row, col_name)
         br_metaval = get_br_metaval(br_metaval_to_check)
         initialize_ardict_entry(br_metaval)
@@ -471,65 +614,99 @@ def process_individual_ra(ra, sequence):
             ra_id, name, new_elem_seq = process_individual_ra(ra, sequence)
             if ra_id:
                 if self.separator:
-                    ra_id_list = re.sub(colon_and_spaces, ':', ra_id).split(self.separator)
+                    ra_id_list = re.sub(colon_and_spaces, ":", ra_id).split(
+                        self.separator
+                    )
                 else:
-                    ra_id_list = re.split(one_or_more_spaces, re.sub(colon_and_spaces, ':', ra_id))
+                    ra_id_list = re.split(
+                        one_or_more_spaces, re.sub(colon_and_spaces, ":", ra_id)
+                    )
                 if sequence:
                     ar_ra = None
                     for ps, el in enumerate(sequence):
                         ra_metaid = el[1]
                         for literal in ra_id_list:
-                            if literal in self.radict[ra_metaid]['ids']:
+                            if literal in self.radict[ra_metaid]["ids"]:
                                 if ps != pos:
                                     change_order = True
                                 new_elem_seq = False
-                                if 'wannabe' not in ra_metaid:
+                                if "wannabe" not in ra_metaid:
                                     ar_ra = ra_metaid
                                     for pos, literal_value in enumerate(ra_id_list):
-                                        if 'omid' in literal_value:
-                                            ra_id_list[pos] = ''
+                                        if "omid" in literal_value:
+                                            ra_id_list[pos] = ""
                                             break
                                     ra_id_list = list(filter(None, ra_id_list))
-                                    ra_id_list.append('omid:ra/' + ar_ra)
+                                    ra_id_list.append("omid:ra/" + ar_ra)
                     if not ar_ra:
                         # new element
                         for ar_metaid, ra_metaid in sequence:
-                            if self.radict[ra_metaid]['title'] == name:
+                            if self.radict[ra_metaid]["title"] == name:
                                 new_elem_seq = False
-                                if 'wannabe' not in ra_metaid:
+                                if "wannabe" not in ra_metaid:
                                     ar_ra = ra_metaid
                                     for pos, i in enumerate(ra_id_list):
-                                        if 'omid' in i:
-                                            ra_id_list[pos] = ''
+                                        if "omid" in i:
+                                            ra_id_list[pos] = ""
                                             break
                                     ra_id_list = list(filter(None, ra_id_list))
-                                    ra_id_list.append('omid:ra/' + ar_ra)
-                if col_name == 'publisher':
-                    ra_id_list, metaval = self.clean_id_list(ra_id_list, br=False, valid_dois_cache=self.valid_dois_cache)
-                    metaval = self.id_worker('publisher', name, ra_id_list, metaval, ra_ent=True, br_ent=False, vvi_ent=False, publ_entity=True)
+                                    ra_id_list.append("omid:ra/" + ar_ra)
+                if col_name == "publisher":
+                    ra_id_list, metaval = self.clean_id_list(
+                        ra_id_list, br=False, valid_dois_cache=self.valid_dois_cache
+                    )
+                    metaval = self.id_worker(
+                        "publisher",
+                        name,
+                        ra_id_list,
+                        metaval,
+                        ra_ent=True,
+                        br_ent=False,
+                        vvi_ent=False,
+                        publ_entity=True,
+                    )
                 else:
-                    ra_id_list, metaval = self.clean_id_list(ra_id_list, br=False, valid_dois_cache=self.valid_dois_cache)
-                    metaval = self.id_worker(col_name, name, ra_id_list, metaval, ra_ent=True, br_ent=False, vvi_ent=False, publ_entity=False)
-                if col_name != 'publisher' and metaval in self.radict:
-                    full_name:str = self.radict[metaval]['title']
-                    if ',' in name and ',' in full_name:
-                        first_name = name.split(',')[1].strip()
-                        if not full_name.split(',')[1].strip() and first_name: # first name found!
-                            given_name = full_name.split(',')[0]
-                            self.radict[metaval]['title'] = given_name + ', ' + first_name
+                    ra_id_list, metaval = self.clean_id_list(
+                        ra_id_list, br=False, valid_dois_cache=self.valid_dois_cache
+                    )
+                    metaval = self.id_worker(
+                        col_name,
+                        name,
+                        ra_id_list,
+                        metaval,
+                        ra_ent=True,
+                        br_ent=False,
+                        vvi_ent=False,
+                        publ_entity=False,
+                    )
+                if col_name != "publisher" and metaval in self.radict:
+                    full_name: str = self.radict[metaval]["title"]
+                    if "," in name and "," in full_name:
+                        first_name = name.split(",")[1].strip()
+                        if (
+                            not full_name.split(",")[1].strip() and first_name
+                        ):  # first name found!
+                            given_name = full_name.split(",")[0]
+                            self.radict[metaval]["title"] = (
+                                given_name + ", " + first_name
+                            )
             else:
                 metaval = self.new_entity(self.radict, name)
             if new_elem_seq:
-                role = self.prefix + str(self._add_number('ar'))
+                role = self.prefix + str(self._add_number("ar"))
                 new_sequence.append(tuple((role, metaval)))
         if change_order:
-            self.log[self.rowcnt][col_name]['Info'] = 'New RA sequence proposed: refused'
+            self.log[self.rowcnt][col_name][
+                "Info"
+            ] = "New RA sequence proposed: refused"
         sequence.extend(new_sequence)
         self.ardict[br_metaval][col_name] = sequence
 
     @staticmethod
-    def clean_id_list(id_list:List[str], br:bool, valid_dois_cache:dict=dict()) -> Tuple[list, str]:
-        '''
+    def clean_id_list(
+        id_list: List[str], br: bool, valid_dois_cache: dict = dict()
+    ) -> Tuple[list, str]:
+        """
         Clean IDs in the input list and check if there is a MetaID.
 
         :params: id_list: a list of IDs
        :params: br: True if the IDs in id_list refer to bibliographic resources, False otherwise
        :type: br: bool
        :returns: Tuple[list, str]: -- it returns a two-elements tuple, where the first element is the list of cleaned IDs, while the second is a MetaID if any was found.
-        '''
-        pattern = 'br/' if br else 'ra/'
-        metaid = ''
+        """
+        pattern = "br/" if br else "ra/"
+        metaid = ""
         id_list = list(filter(None, id_list))
         clean_list = list()
         for elem in id_list:
             if elem in clean_list:
                 continue
             elem = Cleaner(elem).normalize_hyphens()
-            identifier = elem.split(':', 1)
+            identifier = elem.split(":", 1)
             schema = identifier[0].lower()
             value = identifier[1]
-            if schema == 'omid':
-                metaid = value.replace(pattern, '')
+            if schema == "omid":
+                metaid = value.replace(pattern, "")
             else:
-                normalized_id = Cleaner(elem).normalize_id(valid_dois_cache=valid_dois_cache)
+                normalized_id = Cleaner(elem).normalize_id(
+                    valid_dois_cache=valid_dois_cache
+                )
                 if normalized_id:
                     clean_list.append(normalized_id)
-        how_many_meta = [i for i in id_list if i.lower().startswith('omid')]
+        how_many_meta = [i for i in id_list if i.lower().startswith("omid")]
         if len(how_many_meta) > 1:
-            clean_list = [i for i in clean_list if not i.lower().startswith('omid')]
+            clean_list = [i for i in clean_list if not i.lower().startswith("omid")]
         return clean_list, metaid
 
-    def conflict(self, idslist:List[str], name:str, id_dict:dict, col_name:str) -> str:
-        if col_name == 'id' or col_name == 'venue':
+    def conflict(
+        self, idslist: List[str], name: str, id_dict: dict, col_name: str
+    ) -> str:
+        if col_name == "id" or col_name == "venue":
             entity_dict = self.brdict
-        elif col_name == 'author' or col_name == 'editor' or col_name == 'publisher':
+        elif col_name == "author" or col_name == "editor" or col_name == "publisher":
             entity_dict = self.radict
         metaval = self.new_entity(entity_dict, name)
-        entity_dict[metaval] = {
-            'ids': list(),
-            'others': list(),
-            'title': name
-        }
-        self.log[self.rowcnt][col_name]['Conflict entity'] = metaval
+        entity_dict[metaval] = {"ids": list(), "others": list(), "title": name}
+        self.log[self.rowcnt][col_name]["Conflict entity"] = metaval
         for identifier in idslist:
-            entity_dict[metaval]['ids'].append(identifier)
+            entity_dict[metaval]["ids"].append(identifier)
             if identifier not in id_dict:
-                schema_value = identifier.split(':', maxsplit=1)
-                found_metaid = self.finder.retrieve_metaid_from_id(schema_value[0], schema_value[1])
self.finder.retrieve_metaid_from_id( + schema_value[0], schema_value[1] + ) if found_metaid: id_dict[identifier] = found_metaid else: @@ -589,7 +768,7 @@ def finder_sparql(self, list_to_find, br=True, ra=False, vvi=False, publ=False): res = None for elem in list_to_find: if len(match_elem) < 2: - identifier = elem.split(':', maxsplit=1) + identifier = elem.split(":", maxsplit=1) value = identifier[1] schema = identifier[0] if br: @@ -603,16 +782,16 @@ def finder_sparql(self, list_to_find, br=True, ra=False, vvi=False, publ=False): id_set.add(f[0]) return match_elem - def ra_update(self, row:dict, br_key:str, col_name:str) -> None: + def ra_update(self, row: dict, br_key: str, col_name: str) -> None: if row[col_name]: sequence = self.armeta[br_key][col_name] ras_list = list() for _, ra_id in sequence: - ra_name = self.rameta[ra_id]['title'] - ra_ids = self.rameta[ra_id]['ids'] + ra_name = self.rameta[ra_id]["title"] + ra_ids = self.rameta[ra_id]["ids"] ra = self.build_name_ids_string(ra_name, ra_ids) ras_list.append(ra) - row[col_name] = '; '.join(ras_list) + row[col_name] = "; ".join(ras_list) @staticmethod def build_name_ids_string(name, ids): @@ -623,214 +802,245 @@ def build_name_ids_string(name, ids): elif ids and not name: ra_string = f"[{' '.join(ids)}]" elif not ids and not name: - ra_string = '' + ra_string = "" return ra_string @staticmethod - def __local_match(list_to_match, dict_to_match:dict): + def __local_match(list_to_match, dict_to_match: dict): match_elem = dict() - match_elem['existing'] = list() - match_elem['wannabe'] = list() + match_elem["existing"] = list() + match_elem["wannabe"] = list() for elem in list_to_match: for k, va in dict_to_match.items(): - if elem in va['ids']: - if 'wannabe' in k: - if k not in match_elem['wannabe']: - match_elem['wannabe'].append(k) + if elem in va["ids"]: + if "wannabe" in k: + if k not in match_elem["wannabe"]: + match_elem["wannabe"].append(k) else: - if k not in match_elem['existing']: - match_elem['existing'].append(k) + if k not in match_elem["existing"]: + match_elem["existing"].append(k) return match_elem def __meta_ar(self, newkey, oldkey, role): for x, k in self.ardict[oldkey][role]: - if 'wannabe' in k: + if "wannabe" in k: for m in self.rameta: - if k in self.rameta[m]['others']: + if k in self.rameta[m]["others"]: new_v = m break else: new_v = k self.armeta[newkey][role].append(tuple((x, new_v))) - def __tree_traverse(self, tree:dict, key:str, values:List[Tuple]) -> None: + def __tree_traverse(self, tree: dict, key: str, values: List[Tuple]) -> None: for k, v in tree.items(): if k == key: values.append(v) elif isinstance(v, dict): found = self.__tree_traverse(v, key, values) - if found is not None: + if found is not None: values.append(found) - + def get_preexisting_entities(self) -> None: - for entity_type in {'br', 'ra'}: - for entity_metaid, data in getattr(self, f'{entity_type}dict').items(): - if not entity_metaid.startswith('wannabe'): - self.preexisting_entities.add(f'{entity_type}/{entity_metaid}') - for entity_id_literal in data['ids']: - preexisting_entity_id_metaid = getattr(self, f'id{entity_type}')[entity_id_literal] - self.preexisting_entities.add(f'id/{preexisting_entity_id_metaid}') + for entity_type in {"br", "ra"}: + for entity_metaid, data in getattr(self, f"{entity_type}dict").items(): + if not entity_metaid.startswith("wannabe"): + self.preexisting_entities.add(f"{entity_type}/{entity_metaid}") + for entity_id_literal in data["ids"]: + preexisting_entity_id_metaid = getattr( + self, f"id{entity_type}" 
+ )[entity_id_literal] + self.preexisting_entities.add( + f"id/{preexisting_entity_id_metaid}" + ) for _, roles in self.ardict.items(): for _, ar_ras in roles.items(): for ar_ra in ar_ras: - if not ar_ra[1].startswith('wannabe'): - self.preexisting_entities.add(f'ar/{ar_ra[0]}') + if not ar_ra[1].startswith("wannabe"): + self.preexisting_entities.add(f"ar/{ar_ra[0]}") for venue_metaid, vi in self.vvi.items(): - if not venue_metaid.startswith('wannabe'): + if not venue_metaid.startswith("wannabe"): wannabe_preexisting_vis = list() - self.__tree_traverse(vi, 'id', wannabe_preexisting_vis) - self.preexisting_entities.update({f'br/{vi_metaid}' for vi_metaid in wannabe_preexisting_vis if not vi_metaid.startswith('wannabe')}) + self.__tree_traverse(vi, "id", wannabe_preexisting_vis) + self.preexisting_entities.update( + { + f"br/{vi_metaid}" + for vi_metaid in wannabe_preexisting_vis + if not vi_metaid.startswith("wannabe") + } + ) for _, re_metaid in self.remeta.items(): - self.preexisting_entities.add(f're/{re_metaid[0]}') + self.preexisting_entities.add(f"re/{re_metaid[0]}") def meta_maker(self): - ''' + """ For each dictionary ('brdict', 'ardict', 'radict', 'vvi') the corresponding MetaID dictionary is created ('brmeta', 'armeta', 'rameta', and 'vvi'). - ''' + """ for identifier in self.brdict: - if 'wannabe' in identifier: + if "wannabe" in identifier: other = identifier - count = self._add_number('br') + count = self._add_number("br") meta = self.prefix + str(count) self.brmeta[meta] = self.brdict[identifier] - self.brmeta[meta]['others'].append(other) - self.brmeta[meta]['ids'].append('omid:br/' + meta) + self.brmeta[meta]["others"].append(other) + self.brmeta[meta]["ids"].append("omid:br/" + meta) else: self.brmeta[identifier] = self.brdict[identifier] - self.brmeta[identifier]['ids'].append('omid:br/' + identifier) + self.brmeta[identifier]["ids"].append("omid:br/" + identifier) for identifier in self.radict: - if 'wannabe' in identifier: + if "wannabe" in identifier: other = identifier - count = self._add_number('ra') + count = self._add_number("ra") meta = self.prefix + str(count) self.rameta[meta] = self.radict[identifier] - self.rameta[meta]['others'].append(other) - self.rameta[meta]['ids'].append('omid:ra/' + meta) + self.rameta[meta]["others"].append(other) + self.rameta[meta]["ids"].append("omid:ra/" + meta) else: self.rameta[identifier] = self.radict[identifier] - self.rameta[identifier]['ids'].append('omid:ra/' + identifier) + self.rameta[identifier]["ids"].append("omid:ra/" + identifier) for ar_id in self.ardict: - if 'wannabe' in ar_id: + if "wannabe" in ar_id: for br_id in self.brmeta: - if ar_id in self.brmeta[br_id]['others']: + if ar_id in self.brmeta[br_id]["others"]: br_key = br_id break else: br_key = ar_id self.armeta[br_key] = dict() - self.armeta[br_key]['author'] = list() - self.armeta[br_key]['editor'] = list() - self.armeta[br_key]['publisher'] = list() - self.__meta_ar(br_key, ar_id, 'author') - self.__meta_ar(br_key, ar_id, 'editor') - self.__meta_ar(br_key, ar_id, 'publisher') + self.armeta[br_key]["author"] = list() + self.armeta[br_key]["editor"] = list() + self.armeta[br_key]["publisher"] = list() + self.__meta_ar(br_key, ar_id, "author") + self.__meta_ar(br_key, ar_id, "editor") + self.__meta_ar(br_key, ar_id, "publisher") self.VolIss = dict() if self.vvi: for venue_meta in self.vvi: - venue_issue = self.vvi[venue_meta]['issue'] + venue_issue = self.vvi[venue_meta]["issue"] if venue_issue: for issue in venue_issue: - issue_id = venue_issue[issue]['id'] - 
if 'wannabe' in issue_id: + issue_id = venue_issue[issue]["id"] + if "wannabe" in issue_id: for br_meta in self.brmeta: - if issue_id in self.brmeta[br_meta]['others']: - self.vvi[venue_meta]['issue'][issue]['id'] = str(br_meta) + if issue_id in self.brmeta[br_meta]["others"]: + self.vvi[venue_meta]["issue"][issue]["id"] = str( + br_meta + ) break - - venue_volume = self.vvi[venue_meta]['volume'] + + venue_volume = self.vvi[venue_meta]["volume"] if venue_volume: for volume in venue_volume: - volume_id = venue_volume[volume]['id'] - if 'wannabe' in volume_id: + volume_id = venue_volume[volume]["id"] + if "wannabe" in volume_id: for br_meta in self.brmeta: - if volume_id in self.brmeta[br_meta]['others']: - self.vvi[venue_meta]['volume'][volume]['id'] = str(br_meta) + if volume_id in self.brmeta[br_meta]["others"]: + self.vvi[venue_meta]["volume"][volume]["id"] = str( + br_meta + ) break - if venue_volume[volume]['issue']: - volume_issue = venue_volume[volume]['issue'] + if venue_volume[volume]["issue"]: + volume_issue = venue_volume[volume]["issue"] for issue in volume_issue: - volume_issue_id = volume_issue[issue]['id'] - if 'wannabe' in volume_issue_id: + volume_issue_id = volume_issue[issue]["id"] + if "wannabe" in volume_issue_id: for br_meta in self.brmeta: - if volume_issue_id in self.brmeta[br_meta]['others']: - self.vvi[venue_meta]['volume'][volume]['issue'][issue]['id'] = str(br_meta) + if ( + volume_issue_id + in self.brmeta[br_meta]["others"] + ): + self.vvi[venue_meta]["volume"][volume][ + "issue" + ][issue]["id"] = str(br_meta) break - if 'wannabe' in venue_meta: + if "wannabe" in venue_meta: for br_meta in self.brmeta: - if venue_meta in self.brmeta[br_meta]['others']: + if venue_meta in self.brmeta[br_meta]["others"]: self.__merge_VolIss_with_vvi(br_meta, venue_meta) else: self.__merge_VolIss_with_vvi(venue_meta, venue_meta) def enrich(self): - ''' + """ This method replaces the wannabeID placeholders with the actual data and MetaIDs as a result of the deduplication process. 
- ''' + """ for row in self.data: - if 'wannabe' in row['id']: + if "wannabe" in row["id"]: for br_metaid in self.brmeta: - if row['id'] in self.brmeta[br_metaid]['others']: + if row["id"] in self.brmeta[br_metaid]["others"]: metaid = br_metaid else: - metaid = row['id'] - if row['page'] and (metaid not in self.remeta): + metaid = row["id"] + if row["page"] and (metaid not in self.remeta): re_meta = self.finder.retrieve_re_from_br_meta(metaid) if re_meta: self.remeta[metaid] = re_meta - row['page'] = re_meta[1] + row["page"] = re_meta[1] else: - count = self.prefix + str(self._add_number('re')) - page = row['page'] + count = self.prefix + str(self._add_number("re")) + page = row["page"] self.remeta[metaid] = (count, page) - row['page'] = page + row["page"] = page elif metaid in self.remeta: - row['page'] = self.remeta[metaid][1] - row['id'] = ' '.join(self.brmeta[metaid]['ids']) - row['title'] = self.brmeta[metaid]['title'] + row["page"] = self.remeta[metaid][1] + row["id"] = " ".join(self.brmeta[metaid]["ids"]) + row["title"] = self.brmeta[metaid]["title"] venue_metaid = None - if row['venue']: - venue = row['venue'] - if 'wannabe' in venue: + if row["venue"]: + venue = row["venue"] + if "wannabe" in venue: for i in self.brmeta: - if venue in self.brmeta[i]['others']: + if venue in self.brmeta[i]["others"]: venue_metaid = i else: venue_metaid = venue - row['venue'] = self.build_name_ids_string(self.brmeta[venue_metaid]['title'], self.brmeta[venue_metaid]['ids']) + row["venue"] = self.build_name_ids_string( + self.brmeta[venue_metaid]["title"], self.brmeta[venue_metaid]["ids"] + ) br_key_for_editor = get_edited_br_metaid(row, metaid, venue_metaid) - self.ra_update(row, metaid, 'author') - self.ra_update(row, metaid, 'publisher') - self.ra_update(row, br_key_for_editor, 'editor') + self.ra_update(row, metaid, "author") + self.ra_update(row, metaid, "publisher") + self.ra_update(row, br_key_for_editor, "editor") @staticmethod def name_check(ts_name, name): - if ',' in ts_name: - names = ts_name.split(',') + if "," in ts_name: + names = ts_name.split(",") if names[0] and not names[1].strip(): # there isn't a given name in ts - if ',' in name: - gname = name.split(', ')[1] + if "," in name: + gname = name.split(", ")[1] if gname.strip(): - ts_name = names[0] + ', ' + gname + ts_name = names[0] + ", " + gname return ts_name - def _read_number(self, entity_type:str) -> int: - return self.counter_handler.read_counter(entity_type, supplier_prefix=self.prefix) + def _read_number(self, entity_type: str) -> int: + return self.counter_handler.read_counter( + entity_type, supplier_prefix=self.prefix + ) + + def _add_number(self, entity_type: str) -> int: + return self.counter_handler.increment_counter( + entity_type, supplier_prefix=self.prefix + ) - def _add_number(self, entity_type:str) -> int: - return self.counter_handler.increment_counter(entity_type, supplier_prefix=self.prefix) - - def __update_id_and_entity_dict(self, existing_ids:list, id_dict:dict, entity_dict:Dict[str, Dict[str, list]], metaval:str) -> None: + def __update_id_and_entity_dict( + self, + existing_ids: list, + id_dict: dict, + entity_dict: Dict[str, Dict[str, list]], + metaval: str, + ) -> None: for identifier in existing_ids: if identifier[1] not in id_dict: id_dict[identifier[1]] = identifier[0] - if identifier[1] not in entity_dict[metaval]['ids']: - entity_dict[metaval]['ids'].append(identifier[1]) + if identifier[1] not in entity_dict[metaval]["ids"]: + entity_dict[metaval]["ids"].append(identifier[1]) - def indexer(self, 
path_index:str, path_csv:str) -> None:
- '''
+ def indexer(self, path_index: str, path_csv: str) -> None:
+ """
This method is used to transform idra, idbr, armeta, remeta, brmeta and vvi in such a way as to be saved as csv and json files. As for venue, volume and issues, this method also takes care of replacing any wannabe_id with a meta_id. Finally, it generates the enriched CSV and saves it.
@@ -839,135 +1049,165 @@ def indexer(self, path_index:str, path_csv:str) -> None:
:type path_index: str
:params path_csv: a file path. It will be the output enriched CSV
:type path_csv: str
- '''
+ """
# ID
self.index_id_ra = list()
self.index_id_br = list()
- for entity_type in {'ra', 'br'}:
- cur_index = getattr(self, f'id{entity_type}')
+ for entity_type in {"ra", "br"}:
+ cur_index = getattr(self, f"id{entity_type}")
if cur_index:
for literal in cur_index:
row = dict()
- row['id'] = str(literal)
- row['meta'] = str(cur_index[literal])
- getattr(self, f'index_id_{entity_type}').append(row)
+ row["id"] = str(literal)
+ row["meta"] = str(cur_index[literal])
+ getattr(self, f"index_id_{entity_type}").append(row)
else:
row = dict()
- row['id'] = ''
- row['meta'] = ''
- getattr(self, f'index_id_{entity_type}').append(row)
+ row["id"] = ""
+ row["meta"] = ""
+ getattr(self, f"index_id_{entity_type}").append(row)
# AR
self.ar_index = list()
if self.armeta:
for metaid in self.armeta:
index = dict()
- index['meta'] = metaid
+ index["meta"] = metaid
for role in self.armeta[metaid]:
list_ar = list()
for ar, ra in self.armeta[metaid][role]:
- list_ar.append(str(ar) + ', ' + str(ra))
- index[role] = '; '.join(list_ar)
+ list_ar.append(str(ar) + ", " + str(ra))
+ index[role] = "; ".join(list_ar)
self.ar_index.append(index)
else:
row = dict()
- row['meta'] = ''
- row['author'] = ''
- row['editor'] = ''
- row['publisher'] = ''
+ row["meta"] = ""
+ row["author"] = ""
+ row["editor"] = ""
+ row["publisher"] = ""
self.ar_index.append(row)
# RE
self.re_index = list()
if self.remeta:
for x in self.remeta:
r = dict()
- r['br'] = x
- r['re'] = str(self.remeta[x][0])
+ r["br"] = x
+ r["re"] = str(self.remeta[x][0])
self.re_index.append(r)
else:
row = dict()
- row['br'] = ''
- row['re'] = ''
+ row["br"] = ""
+ row["re"] = ""
self.re_index.append(row)
if self.filename:
if not os.path.exists(path_index):
os.makedirs(path_index)
- ra_path = os.path.join(path_index, 'index_id_ra.csv')
+ ra_path = os.path.join(path_index, "index_id_ra.csv")
write_csv(ra_path, self.index_id_ra)
- br_path = os.path.join(path_index, 'index_id_br.csv')
+ br_path = os.path.join(path_index, "index_id_br.csv")
write_csv(br_path, self.index_id_br)
- ar_path = os.path.join(path_index, 'index_ar.csv')
+ ar_path = os.path.join(path_index, "index_ar.csv")
write_csv(ar_path, self.ar_index)
- re_path = os.path.join(path_index, 'index_re.csv')
+ re_path = os.path.join(path_index, "index_re.csv")
write_csv(re_path, self.re_index)
- vvi_file = os.path.join(path_index, 'index_vi.json')
- with open(vvi_file, 'w') as fp:
+ vvi_file = os.path.join(path_index, "index_vi.json")
+ with open(vvi_file, "w") as fp:
json.dump(self.VolIss, fp)
if self.log:
- log_file = os.path.join(path_index + 'log.json')
- with open(log_file, 'w') as lf:
+ log_file = os.path.join(path_index, "log.json")
+ with open(log_file, "w") as lf:
json.dump(self.log, lf)
if self.data:
- name = self.filename + '.csv'
+ name = self.filename + ".csv"
data_file = os.path.join(path_csv, name)
write_csv(data_file, self.data)
-
+
- def __merge_VolIss_with_vvi(self, VolIss_venue_meta:str, vvi_venue_meta:str) -> None:
+ def __merge_VolIss_with_vvi(
+ self, VolIss_venue_meta: str, vvi_venue_meta: str
+ ) -> None:
if VolIss_venue_meta in self.VolIss:
- for vvi_v in self.vvi[vvi_venue_meta]['volume']:
- if vvi_v in self.VolIss[VolIss_venue_meta]['volume']:
- self.VolIss[VolIss_venue_meta]['volume'][vvi_v]['issue'].update(self.vvi[vvi_venue_meta]['volume'][vvi_v]['issue'])
+ for vvi_v in self.vvi[vvi_venue_meta]["volume"]:
+ if vvi_v in self.VolIss[VolIss_venue_meta]["volume"]:
+ self.VolIss[VolIss_venue_meta]["volume"][vvi_v]["issue"].update(
+ self.vvi[vvi_venue_meta]["volume"][vvi_v]["issue"]
+ )
else:
- self.VolIss[VolIss_venue_meta]['volume'][vvi_v] = self.vvi[vvi_venue_meta]['volume'][vvi_v]
- self.VolIss[VolIss_venue_meta]['issue'].update(self.vvi[vvi_venue_meta]['issue'])
+ self.VolIss[VolIss_venue_meta]["volume"][vvi_v] = self.vvi[
+ vvi_venue_meta
+ ]["volume"][vvi_v]
+ self.VolIss[VolIss_venue_meta]["issue"].update(
+ self.vvi[vvi_venue_meta]["issue"]
+ )
else:
self.VolIss[VolIss_venue_meta] = self.vvi[vvi_venue_meta]
-
+
def __update_id_count(self, id_dict, identifier):
-
+
# Before creating a new ID, check whether it already exists in the triplestore
- schema, value = identifier.split(':', maxsplit=1)
+ schema, value = identifier.split(":", maxsplit=1)
existing_metaid = self.finder.retrieve_metaid_from_id(schema, value)
-
+
if existing_metaid:
id_dict[identifier] = existing_metaid
else:
- count = self._add_number('id')
+ count = self._add_number("id")
id_dict[identifier] = self.prefix + str(count)
@staticmethod
- def merge(dict_to_match:Dict[str, Dict[str, list]], metaval:str, old_meta:str, temporary_name:str) -> None:
- for x in dict_to_match[old_meta]['ids']:
- if x not in dict_to_match[metaval]['ids']:
- dict_to_match[metaval]['ids'].append(x)
- for x in dict_to_match[old_meta]['others']:
- if x not in dict_to_match[metaval]['others']:
- dict_to_match[metaval]['others'].append(x)
- dict_to_match[metaval]['others'].append(old_meta)
- if not dict_to_match[metaval]['title']:
- if dict_to_match[old_meta]['title']:
- dict_to_match[metaval]['title'] = dict_to_match[old_meta]['title']
+ def merge(
+ dict_to_match: Dict[str, Dict[str, list]],
+ metaval: str,
+ old_meta: str,
+ temporary_name: str,
+ ) -> None:
+ for x in dict_to_match[old_meta]["ids"]:
+ if x not in dict_to_match[metaval]["ids"]:
+ dict_to_match[metaval]["ids"].append(x)
+ for x in dict_to_match[old_meta]["others"]:
+ if x not in dict_to_match[metaval]["others"]:
+ dict_to_match[metaval]["others"].append(x)
+ dict_to_match[metaval]["others"].append(old_meta)
+ if not dict_to_match[metaval]["title"]:
+ if dict_to_match[old_meta]["title"]:
+ dict_to_match[metaval]["title"] = dict_to_match[old_meta]["title"]
else:
- dict_to_match[metaval]['title'] = temporary_name
+ dict_to_match[metaval]["title"] = temporary_name
del dict_to_match[old_meta]
-
+
- def merge_entities_in_csv(self, idslist:list, metaval:str, name:str, entity_dict:Dict[str, Dict[str, list]], id_dict:dict) -> None:
+ def merge_entities_in_csv(
+ self,
+ idslist: list,
+ metaval: str,
+ name: str,
+ entity_dict: Dict[str, Dict[str, list]],
+ id_dict: dict,
+ ) -> None:
found_others = self.__local_match(idslist, entity_dict)
- if found_others['wannabe']:
- for old_meta in found_others['wannabe']:
+ if found_others["wannabe"]:
+ for old_meta in found_others["wannabe"]:
self.merge(entity_dict, metaval, old_meta, name)
for identifier in idslist:
- if identifier not in entity_dict[metaval]['ids']:
- entity_dict[metaval]['ids'].append(identifier)
+ if
identifier not in entity_dict[metaval]["ids"]: + entity_dict[metaval]["ids"].append(identifier) if identifier not in id_dict: self.__update_id_count(id_dict, identifier) self.__update_title(entity_dict, metaval, name) - - def __update_title(self, entity_dict:dict, metaval:str, name:str) -> None: - if not entity_dict[metaval]['title'] and name: - entity_dict[metaval]['title'] = name - self.log[self.rowcnt]['title']['status'] = 'New value proposed' - - def id_worker(self, col_name, name, idslist:List[str], metaval: str, ra_ent=False, br_ent=False, vvi_ent=False, publ_entity=False): + + def __update_title(self, entity_dict: dict, metaval: str, name: str) -> None: + if not entity_dict[metaval]["title"] and name: + entity_dict[metaval]["title"] = name + self.log[self.rowcnt]["title"]["status"] = "New value proposed" + + def id_worker( + self, + col_name, + name, + idslist: List[str], + metaval: str, + ra_ent=False, + br_ent=False, + vvi_ent=False, + publ_entity=False, + ): if not ra_ent: id_dict = self.idbr entity_dict = self.brdict @@ -989,56 +1229,72 @@ def id_worker(self, col_name, name, idslist:List[str], metaval: str, ra_ent=Fals # 2 Retrieve EntityA data in triplestore to update EntityA inside CSV if found_meta_ts[2]: entity_dict[metaval] = dict() - entity_dict[metaval]['ids'] = list() - if col_name == 'author' or col_name == 'editor': - entity_dict[metaval]['title'] = self.name_check(found_meta_ts[0], name) + entity_dict[metaval]["ids"] = list() + if col_name == "author" or col_name == "editor": + entity_dict[metaval]["title"] = self.name_check( + found_meta_ts[0], name + ) else: - entity_dict[metaval]['title'] = found_meta_ts[0] - entity_dict[metaval]['others'] = list() + entity_dict[metaval]["title"] = found_meta_ts[0] + entity_dict[metaval]["others"] = list() existing_ids = found_meta_ts[1] - self.__update_id_and_entity_dict(existing_ids, id_dict, entity_dict, metaval) - self.merge_entities_in_csv(idslist, metaval, name, entity_dict, id_dict) + self.__update_id_and_entity_dict( + existing_ids, id_dict, entity_dict, metaval + ) + self.merge_entities_in_csv( + idslist, metaval, name, entity_dict, id_dict + ) # Look for MetaId in the provenance else: - entity_type = 'br' if br_ent or vvi_ent else 'ra' - metaid_uri = f'{self.base_iri}/{entity_type}/{str(metaval)}' + entity_type = "br" if br_ent or vvi_ent else "ra" + metaid_uri = f"{self.base_iri}/{entity_type}/{str(metaval)}" # The entity MetaId after merge if it was merged, None otherwise. If None, the MetaId is considered invalid - metaval = self.finder.retrieve_metaid_from_merged_entity(metaid_uri=metaid_uri, prov_config=self.prov_config) + metaval = self.finder.retrieve_metaid_from_merged_entity( + metaid_uri=metaid_uri, prov_config=self.prov_config + ) # there's no meta or there was one but it didn't exist # Are there other IDs? if idslist and not metaval: local_match = self.__local_match(idslist, entity_dict) # IDs already exist among data? # check in entity_dict - if local_match['existing']: + if local_match["existing"]: # ids refer to multiple existing entities - if len(local_match['existing']) > 1: + if len(local_match["existing"]) > 1: # ! 
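+ # the suspect IDs resolve to more than one existing entity: record a conflict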
return self.conflict(idslist, name, id_dict, col_name) # ids refer to ONE existing entity - elif len(local_match['existing']) == 1: - metaval = str(local_match['existing'][0]) + elif len(local_match["existing"]) == 1: + metaval = str(local_match["existing"][0]) suspect_ids = list() for identifier in idslist: - if identifier not in entity_dict[metaval]['ids']: + if identifier not in entity_dict[metaval]["ids"]: suspect_ids.append(identifier) if suspect_ids: - sparql_match = self.finder_sparql(suspect_ids, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity) + sparql_match = self.finder_sparql( + suspect_ids, + br=br_ent, + ra=ra_ent, + vvi=vvi_ent, + publ=publ_entity, + ) if len(sparql_match) > 1: # ! return self.conflict(idslist, name, id_dict, col_name) # ids refers to 1 or more wannabe entities - elif local_match['wannabe']: - metaval = str(local_match['wannabe'].pop(0)) + elif local_match["wannabe"]: + metaval = str(local_match["wannabe"].pop(0)) # 5 Merge data from entityA (CSV) with data from EntityX (CSV) - for old_meta in local_match['wannabe']: + for old_meta in local_match["wannabe"]: self.merge(entity_dict, metaval, old_meta, name) suspect_ids = list() for identifier in idslist: - if identifier not in entity_dict[metaval]['ids']: + if identifier not in entity_dict[metaval]["ids"]: suspect_ids.append(identifier) if suspect_ids: - sparql_match = self.finder_sparql(suspect_ids, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity) + sparql_match = self.finder_sparql( + suspect_ids, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity + ) if sparql_match: # if 'wannabe' not in metaval or len(sparql_match) > 1: # # Two entities previously disconnected on the triplestore now become connected @@ -1049,7 +1305,7 @@ def id_worker(self, col_name, name, idslist:List[str], metaval: str, ra_ent=Fals existing_ids = [] for match in sparql_match: existing_ids.extend(match[2]) - + # new_idslist = [x[1] for x in existing_ids] # new_sparql_match = self.finder_sparql(new_idslist, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity) # if len(new_sparql_match) > 1: @@ -1061,13 +1317,21 @@ def id_worker(self, col_name, name, idslist:List[str], metaval: str, ra_ent=Fals old_metaval = metaval metaval = sparql_match[0][0] entity_dict[metaval] = dict() - entity_dict[metaval]['ids'] = list() - entity_dict[metaval]['others'] = list() - entity_dict[metaval]['title'] = sparql_match[0][1] if sparql_match[0][1] else '' - self.__update_id_and_entity_dict(existing_ids, id_dict, entity_dict, metaval) - self.merge(entity_dict, metaval, old_metaval, sparql_match[0][1]) + entity_dict[metaval]["ids"] = list() + entity_dict[metaval]["others"] = list() + entity_dict[metaval]["title"] = ( + sparql_match[0][1] if sparql_match[0][1] else "" + ) + self.__update_id_and_entity_dict( + existing_ids, id_dict, entity_dict, metaval + ) + self.merge( + entity_dict, metaval, old_metaval, sparql_match[0][1] + ) else: - sparql_match = self.finder_sparql(idslist, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity) + sparql_match = self.finder_sparql( + idslist, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity + ) # if len(sparql_match) > 1: # # ! 
# return self.conflict(idslist, name, id_dict, col_name) @@ -1077,7 +1341,7 @@ def id_worker(self, col_name, name, idslist:List[str], metaval: str, ra_ent=Fals existing_ids = [] for match in sparql_match: existing_ids.extend(match[2]) - + # new_idslist = [x[1] for x in existing_ids] # new_sparql_match = self.finder_sparql(new_idslist, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity) # if len(new_sparql_match) > 1: @@ -1089,22 +1353,26 @@ def id_worker(self, col_name, name, idslist:List[str], metaval: str, ra_ent=Fals # elif len(new_sparql_match) == 1: metaval = sparql_match[0][0] entity_dict[metaval] = dict() - entity_dict[metaval]['ids'] = list() - entity_dict[metaval]['others'] = list() - if col_name == 'author' or col_name == 'editor': - entity_dict[metaval]['title'] = self.name_check(sparql_match[0][1], name) + entity_dict[metaval]["ids"] = list() + entity_dict[metaval]["others"] = list() + if col_name == "author" or col_name == "editor": + entity_dict[metaval]["title"] = self.name_check( + sparql_match[0][1], name + ) else: - entity_dict[metaval]['title'] = sparql_match[0][1] + entity_dict[metaval]["title"] = sparql_match[0][1] self.__update_title(entity_dict, metaval, name) - self.__update_id_and_entity_dict(existing_ids, id_dict, entity_dict, metaval) + self.__update_id_and_entity_dict( + existing_ids, id_dict, entity_dict, metaval + ) else: # 1 EntityA is a new one metaval = self.new_entity(entity_dict, name) for identifier in idslist: if identifier not in id_dict: self.__update_id_count(id_dict, identifier) - if identifier not in entity_dict[metaval]['ids']: - entity_dict[metaval]['ids'].append(identifier) + if identifier not in entity_dict[metaval]["ids"]: + entity_dict[metaval]["ids"].append(identifier) self.__update_title(entity_dict, metaval, name) # 1 EntityA is a new one if not idslist and not metaval: @@ -1112,116 +1380,127 @@ def id_worker(self, col_name, name, idslist:List[str], metaval: str, ra_ent=Fals return metaval def new_entity(self, entity_dict, name): - metaval = 'wannabe_' + str(self.wnb_cnt) + metaval = "wannabe_" + str(self.wnb_cnt) self.wnb_cnt += 1 entity_dict[metaval] = dict() - entity_dict[metaval]['ids'] = list() - entity_dict[metaval]['others'] = list() - entity_dict[metaval]['title'] = name + entity_dict[metaval]["ids"] = list() + entity_dict[metaval]["others"] = list() + entity_dict[metaval]["title"] = name return metaval - def volume_issue(self, meta:str, path:Dict[str, Dict[str, str]], value:str, row:Dict[str, str]) -> None: - if 'wannabe' not in meta: + def volume_issue( + self, + meta: str, + path: Dict[str, Dict[str, str]], + value: str, + row: Dict[str, str], + ) -> None: + if "wannabe" not in meta: if value in path: - if 'wannabe' in path[value]['id']: - old_meta = path[value]['id'] - self.merge(self.brdict, meta, old_meta, row['title']) - path[value]['id'] = meta + if "wannabe" in path[value]["id"]: + old_meta = path[value]["id"] + self.merge(self.brdict, meta, old_meta, row["title"]) + path[value]["id"] = meta else: path[value] = dict() - path[value]['id'] = meta - if 'issue' not in path: - path[value]['issue'] = dict() + path[value]["id"] = meta + if "issue" not in path: + path[value]["issue"] = dict() else: if value in path: - if 'wannabe' in path[value]['id']: - old_meta = path[value]['id'] + if "wannabe" in path[value]["id"]: + old_meta = path[value]["id"] if meta != old_meta: - self.merge(self.brdict, meta, old_meta, row['title']) - path[value]['id'] = meta + self.merge(self.brdict, meta, old_meta, row["title"]) + path[value]["id"] = 
meta
else:
- old_meta = path[value]['id']
- if 'wannabe' not in old_meta and old_meta not in self.brdict:
+ old_meta = path[value]["id"]
+ if "wannabe" not in old_meta and old_meta not in self.brdict:
br4dict = self.finder.retrieve_br_from_meta(old_meta)
self.brdict[old_meta] = dict()
- self.brdict[old_meta]['ids'] = list()
- self.brdict[old_meta]['others'] = list()
- self.brdict[old_meta]['title'] = br4dict[0] if br4dict else None
+ self.brdict[old_meta]["ids"] = list()
+ self.brdict[old_meta]["others"] = list()
+ self.brdict[old_meta]["title"] = br4dict[0] if br4dict else None
if br4dict:
for x in br4dict[1]:
identifier = x[1]
- self.brdict[old_meta]['ids'].append(identifier)
+ self.brdict[old_meta]["ids"].append(identifier)
if identifier not in self.idbr:
self.idbr[identifier] = x[0]
- self.merge(self.brdict, old_meta, meta, row['title'])
+ self.merge(self.brdict, old_meta, meta, row["title"])
else:
path[value] = dict()
- path[value]['id'] = meta
- if 'issue' not in path: # it's a Volume
- path[value]['issue'] = dict()
+ path[value]["id"] = meta
+ if "issue" not in path: # it's a Volume
+ path[value]["issue"] = dict()
def log_update(self):
new_log = dict()
for x in self.log:
if any(self.log[x][y].values() for y in self.log[x]):
for y in self.log[x]:
- if 'Conflict entity' in self.log[x][y]:
- v = self.log[x][y]['Conflict entity']
- if 'wannabe' in v:
- if y == 'id' or y == 'venue':
+ if "Conflict entity" in self.log[x][y]:
+ v = self.log[x][y]["Conflict entity"]
+ if "wannabe" in v:
+ if y == "id" or y == "venue":
for brm in self.brmeta:
- if v in self.brmeta[brm]['others']:
- m = 'br/' + str(brm)
- elif y == 'author' or y == 'editor' or y == 'publisher':
+ if v in self.brmeta[brm]["others"]:
+ m = "br/" + str(brm)
+ elif y == "author" or y == "editor" or y == "publisher":
for ram in self.rameta:
- if v in self.rameta[ram]['others']:
- m = 'ra/' + str(ram)
+ if v in self.rameta[ram]["others"]:
+ m = "ra/" + str(ram)
else:
m = v
- self.log[x][y]['Conflict entity'] = m
+ self.log[x][y]["Conflict entity"] = m
new_log[x] = self.log[x]
- if 'wannabe' in self.data[x]['id']:
+ if "wannabe" in self.data[x]["id"]:
for brm in self.brmeta:
- if self.data[x]['id'] in self.brmeta[brm]['others']:
- met = 'br/' + str(brm)
+ if self.data[x]["id"] in self.brmeta[brm]["others"]:
+ met = "br/" + str(brm)
else:
- met = 'br/' + str(self.data[x]['id'])
- new_log[x]['id']['meta'] = met
+ met = "br/" + str(self.data[x]["id"])
+ new_log[x]["id"]["meta"] = met
return new_log
def merge_duplicate_entities(self) -> None:
- '''
- The 'merge_duplicate_entities()' function merge duplicate entities.
- Moreover, it modifies the CSV cells, giving precedence to the first found information
- or data in the triplestore in the case of already existing entities.
+ """
+ The 'merge_duplicate_entities()' function merges duplicate entities.
+ Moreover, it modifies the CSV cells, giving precedence to the first found information
+ or data in the triplestore in the case of already existing entities.
:returns: None -- This method updates the CSV rows and returns None.
- ''' + """ self.rowcnt = 0 for row in self.data: - id = row['id'] - if 'wannabe' not in id: + id = row["id"] + if "wannabe" not in id: self.equalizer(row, id) other_rowcnt = 0 for other_row in self.data: - if (other_row['id'] in self.brdict[id]['others'] or other_row['id'] == id) and self.rowcnt != other_rowcnt: - for field,_ in row.items(): + if ( + other_row["id"] in self.brdict[id]["others"] + or other_row["id"] == id + ) and self.rowcnt != other_rowcnt: + for field, _ in row.items(): if row[field] and row[field] != other_row[field]: if other_row[field]: - self.log[other_rowcnt][field]['status'] = 'New value proposed' + self.log[other_rowcnt][field][ + "status" + ] = "New value proposed" other_row[field] = row[field] other_rowcnt += 1 self.rowcnt += 1 def extract_name_and_ids(self, venue_str: str) -> Tuple[str, List[str]]: - ''' + """ Extracts the name and IDs from the venue string. :params venue_str: the venue string :type venue_str: str :returns: Tuple[str, List[str]] -- the name and list of IDs extracted from the venue string - ''' + """ match = re.search(name_and_ids, venue_str) if match: name = match.group(1).strip() @@ -1231,8 +1510,8 @@ def extract_name_and_ids(self, venue_str: str) -> Tuple[str, List[str]]: ids = [] return name, ids - def equalizer(self, row:Dict[str, str], metaval:str) -> None: - ''' + def equalizer(self, row: Dict[str, str], metaval: str) -> None: + """ Given a CSV row and its MetaID, this function equates the information present in the CSV with that present on the triplestore. :params row: a dictionary representing a CSV row @@ -1240,107 +1519,147 @@ def equalizer(self, row:Dict[str, str], metaval:str) -> None: :params metaval: the MetaID identifying the bibliographic resource contained in the input CSV row :type metaval: str :returns: None -- This method modifies the input CSV row without returning it. 
- ''' - self.log[self.rowcnt]['id']['status'] = 'Entity already exists' + """ + self.log[self.rowcnt]["id"]["status"] = "Entity already exists" known_data = self.finder.retrieve_br_info_from_meta(metaval) try: - known_data['author'] = self.__get_resp_agents(metaval, 'author') + known_data["author"] = self.__get_resp_agents(metaval, "author") except ValueError: print(row) - raise(ValueError) - known_data['editor'] = self.__get_resp_agents(metaval, 'editor') - known_data['publisher'] = self.finder.retrieve_publisher_from_br_metaid(metaval) - for datum in ['pub_date', 'type', 'volume', 'issue']: + raise (ValueError) + known_data["editor"] = self.__get_resp_agents(metaval, "editor") + known_data["publisher"] = self.finder.retrieve_publisher_from_br_metaid(metaval) + for datum in ["pub_date", "type", "volume", "issue"]: if known_data[datum]: if row[datum] and row[datum] != known_data[datum]: - self.log[self.rowcnt][datum]['status'] = 'New value proposed' + self.log[self.rowcnt][datum]["status"] = "New value proposed" row[datum] = known_data[datum] - for datum in ['author', 'editor', 'publisher']: + for datum in ["author", "editor", "publisher"]: if known_data[datum] and not row[datum]: row[datum] = known_data[datum] - if known_data['venue']: - current_venue = row['venue'] - known_venue = known_data['venue'] - + if known_data["venue"]: + current_venue = row["venue"] + known_venue = known_data["venue"] + if current_venue: # Extract the IDs from the current venue - current_venue_name, current_venue_ids = self.extract_name_and_ids(current_venue) - known_venue_name, known_venue_ids = self.extract_name_and_ids(known_venue) - + current_venue_name, current_venue_ids = self.extract_name_and_ids( + current_venue + ) + known_venue_name, known_venue_ids = self.extract_name_and_ids( + known_venue + ) + current_venue_ids_set = set(current_venue_ids) known_venue_ids_set = set(known_venue_ids) - + common_ids = current_venue_ids_set.intersection(known_venue_ids_set) - + if common_ids: # Merge the IDs and use the title from the known venue merged_ids = current_venue_ids_set.union(known_venue_ids_set) - row['venue'] = f"{known_venue_name} [{' '.join(sorted(merged_ids))}]" + row["venue"] = ( + f"{known_venue_name} [{' '.join(sorted(merged_ids))}]" + ) else: # Use the known venue information entirely - row['venue'] = known_venue + row["venue"] = known_venue else: - row['venue'] = known_venue - if known_data['page']: - if row['page'] and row['page'] != known_data['page'][1]: - self.log[self.rowcnt]['page']['status'] = 'New value proposed' - row['page'] = known_data['page'][1] - self.remeta[metaval] = known_data['page'] - - def __get_resp_agents(self, metaid:str, column:str) -> str: + row["venue"] = known_venue + if known_data["page"]: + if row["page"] and row["page"] != known_data["page"][1]: + self.log[self.rowcnt]["page"]["status"] = "New value proposed" + row["page"] = known_data["page"][1] + self.remeta[metaval] = known_data["page"] + + def __get_resp_agents(self, metaid: str, column: str) -> str: resp_agents = self.finder.retrieve_ra_sequence_from_br_meta(metaid, column) - output = '' + output = "" if resp_agents: full_resp_agents = list() for item in resp_agents: for _, resp_agent in item.items(): author_name = resp_agent[0] - ids = [f'omid:ra/{resp_agent[2]}'] + ids = [f"omid:ra/{resp_agent[2]}"] ids.extend([id[1] for id in resp_agent[1]]) - author_ids = '[' + ' '.join(ids) + ']' - full_resp_agent = author_name + ' ' + author_ids + author_ids = "[" + " ".join(ids) + "]" + full_resp_agent = author_name + " " + 
author_ids full_resp_agents.append(full_resp_agent) - output = '; '.join(full_resp_agents) + output = "; ".join(full_resp_agents) return output - -def is_a_valid_row(row:Dict[str, str]) -> bool: - ''' + + +def is_a_valid_row(row: Dict[str, str]) -> bool: + """ This method discards invalid rows in the input CSV file. :params row: a dictionary representing a CSV row :type row: Dict[str, str] :returns: bool -- This method returns True if the row is valid, False if it is invalid. - ''' - br_type = ' '.join((row['type'].lower()).split()) - br_title = row['title'] - br_volume = row['volume'] - br_issue = row['issue'] - br_venue = row['venue'] - if row['id']: + """ + br_type = " ".join((row["type"].lower()).split()) + br_title = row["title"] + br_volume = row["volume"] + br_issue = row["issue"] + br_venue = row["venue"] + if row["id"]: if (br_volume or br_issue) and (not br_type or not br_venue): return False return True if all(not row[value] for value in row): return False - br_author = row['author'] - br_editor = row['editor'] - br_pub_date = row['pub_date'] - if not br_type or br_type in {'book', 'data file', 'dataset', 'dissertation', 'edited book', 'journal article', 'monograph', - 'other', 'peer review', 'posted content', 'web content', 'proceedings article', 'report', 'reference book'}: - is_a_valid_row = True if br_title and br_pub_date and (br_author or br_editor) else False - elif br_type in {'book chapter', 'book part', 'book section', 'book track', 'component', 'reference entry'}: + br_author = row["author"] + br_editor = row["editor"] + br_pub_date = row["pub_date"] + if not br_type or br_type in { + "book", + "data file", + "dataset", + "dissertation", + "edited book", + "journal article", + "monograph", + "other", + "peer review", + "posted content", + "web content", + "proceedings article", + "report", + "reference book", + }: + is_a_valid_row = ( + True if br_title and br_pub_date and (br_author or br_editor) else False + ) + elif br_type in { + "book chapter", + "book part", + "book section", + "book track", + "component", + "reference entry", + }: is_a_valid_row = True if br_title and br_venue else False - elif br_type in {'book series', 'book set', 'journal', 'proceedings', 'proceedings series', 'report series', 'standard', 'standard series'}: + elif br_type in { + "book series", + "book set", + "journal", + "proceedings", + "proceedings series", + "report series", + "standard", + "standard series", + }: is_a_valid_row = True if br_title else False - elif br_type == 'journal volume': + elif br_type == "journal volume": is_a_valid_row = True if br_venue and (br_volume or br_title) else False - elif br_type == 'journal issue': + elif br_type == "journal issue": is_a_valid_row = True if br_venue and (br_issue or br_title) else False return is_a_valid_row -def get_edited_br_metaid(row:dict, metaid:str, venue_metaid:str) -> Tuple[str, bool]: - if row['author'] and row['venue'] and row['type'] in CONTAINER_EDITOR_TYPES: + +def get_edited_br_metaid(row: dict, metaid: str, venue_metaid: str) -> Tuple[str, bool]: + if row["author"] and row["venue"] and row["type"] in CONTAINER_EDITOR_TYPES: edited_br_metaid = venue_metaid else: edited_br_metaid = metaid - return edited_br_metaid \ No newline at end of file + return edited_br_metaid
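
A minimal sketch of how the row-validation helper above behaves (illustrative only, not part of the patch; the dict mirrors the CSV fields the code reads, and the expected outcomes follow the branches of is_a_valid_row):

    # hypothetical rows exercising is_a_valid_row
    row = {
        "id": "", "title": "A Study", "author": "Doe, Jane", "pub_date": "2020",
        "venue": "", "volume": "", "issue": "", "page": "",
        "type": "journal article", "publisher": "", "editor": "",
    }
    assert is_a_valid_row(row)      # a journal article needs title, date, and author/editor
    row["pub_date"] = ""
    assert not is_a_valid_row(row)  # dropping the date invalidates it
    row.update({"type": "journal", "title": "A Journal"})
    assert is_a_valid_row(row)      # container-level types require only a title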