diff --git a/wn/_core.py b/wn/_core.py index 63dd24e..58fb356 100644 --- a/wn/_core.py +++ b/wn/_core.py @@ -3,7 +3,7 @@ import textwrap import warnings from collections.abc import Callable, Iterator, Sequence -from typing import Optional, TypeVar +from typing import Optional, TypeVar, Union import wn from wn._types import ( @@ -406,7 +406,7 @@ def senses(self) -> list['Sense']: """ lexids = self._get_lexicon_ids() iterable = get_entry_senses(self._id, lexids) - return [Sense(*sense_data, self._wordnet) for sense_data in iterable] + return [Sense(*sense_data, _wordnet=self._wordnet) for sense_data in iterable] def metadata(self) -> Metadata: """Return the word's metadata.""" @@ -463,21 +463,91 @@ def translate( return result +class _Relation(_LexiconElement): + __slots__ = 'name', 'source_id', 'target_id' + __module__ = 'wn' + + def __init__( + self, + name: str, + source_id: str, + target_id: str, + _lexid: int = NON_ROWID, + _id: int = NON_ROWID, + _wordnet: Optional['Wordnet'] = None + ): + super().__init__(_lexid=_lexid, _id=_id, _wordnet=_wordnet) + self.name = name + self.source_id = source_id + self.target_id = target_id + + def __repr__(self) -> str: + return ( + self.__class__.__name__ + + f"({self.name!r}, {self.source_id!r}, {self.target_id!r})" + ) + + def source(self) -> '_Relatable': + raise NotImplementedError + + def target(self) -> '_Relatable': + raise NotImplementedError + + def metadata(self) -> Metadata: + """Return the synset's metadata.""" + return get_metadata(self._id, self._ENTITY_TYPE) + + +class SynsetRelation(_Relation): + _ENTITY_TYPE = _EntityType.SYNSET_RELATIONS + + def source(self) -> 'Synset': + return self._wordnet.synset(id=self.source_id) + + def target(self) -> 'Synset': + return self._wordnet.synset(id=self.target_id) + + +class SenseRelation(_Relation): + _ENTITY_TYPE = _EntityType.SENSE_RELATIONS + + def source(self) -> 'Sense': + return self._wordnet.sense(id=self.source_id) + + def target(self) -> 'Sense': + return self._wordnet.sense(id=self.target_id) + + +class SenseSynsetRelation(_Relation): + _ENTITY_TYPE = _EntityType.SENSE_SYNSET_RELATIONS + + def source(self) -> 'Sense': + return self._wordnet.sense(id=self.source_id) + + def target(self) -> 'Synset': + return self._wordnet.synset(id=self.target_id) + + T = TypeVar('T', bound='_Relatable') class _Relatable(_LexiconElement): - __slots__ = 'id', + __slots__ = 'id', '_in_rel' def __init__( self, id: str, _lexid: int = NON_ROWID, _id: int = NON_ROWID, + _in_rel: Optional[_Relation] = None, _wordnet: Optional['Wordnet'] = None ): super().__init__(_lexid=_lexid, _id=_id, _wordnet=_wordnet) self.id = id + self._in_rel = _in_rel + + def incoming_relation(self) -> Optional[_Relation]: + return self._in_rel def relations(self: T, *args: str) -> dict[str, list[T]]: raise NotImplementedError @@ -528,6 +598,8 @@ class Synset(_Relatable): _ENTITY_TYPE = _EntityType.SYNSETS + _in_rel: Union[SynsetRelation, SenseSynsetRelation, None] + def __init__( self, id: str, @@ -535,9 +607,12 @@ def __init__( ili: Optional[str] = None, _lexid: int = NON_ROWID, _id: int = NON_ROWID, - _wordnet: Optional['Wordnet'] = None + _in_rel: Union[SynsetRelation, SenseSynsetRelation, None] = None, + _wordnet: Optional['Wordnet'] = None, ): - super().__init__(id=id, _lexid=_lexid, _id=_id, _wordnet=_wordnet) + super().__init__( + id=id, _lexid=_lexid, _id=_id, _in_rel=_in_rel, _wordnet=_wordnet + ) self.pos = pos self._ili = ili @@ -547,9 +622,12 @@ def empty( id: str, ili: Optional[str] = None, _lexid: int = NON_ROWID, + _in_rel: Union[SynsetRelation, SenseSynsetRelation, None] = None, _wordnet: Optional['Wordnet'] = None ): - return cls(id, pos='', ili=ili, _lexid=_lexid, _wordnet=_wordnet) + return cls( + id, pos='', ili=ili, _lexid=_lexid, _in_rel=_in_rel, _wordnet=_wordnet + ) @property def ili(self): @@ -608,7 +686,7 @@ def senses(self) -> list['Sense']: """ lexids = self._get_lexicon_ids() iterable = get_synset_members(self._id, lexids) - return [Sense(*sense_data, self._wordnet) for sense_data in iterable] + return [Sense(*sense_data, _wordnet=self._wordnet) for sense_data in iterable] def lexicalized(self) -> bool: """Return True if the synset is lexicalized.""" @@ -644,6 +722,9 @@ def lemmas(self) -> list[Form]: """ return [w.lemma() for w in self.words()] + def incoming_relation(self) -> Union[SynsetRelation, SenseSynsetRelation, None]: + return self._in_rel + def relations(self, *args: str) -> dict[str, list['Synset']]: """Return a mapping of relation names to lists of synsets. @@ -666,11 +747,13 @@ def relations(self, *args: str) -> dict[str, list['Synset']]: """ d: dict[str, list[Synset]] = {} - for relname, ss in self._get_relations(args): - if relname in d: - d[relname].append(ss) - else: - d[relname] = [ss] + for ss in self.get_related(*args): + if relation := ss.incoming_relation(): + relname = relation.name + if relname in d: + d[relname].append(ss) + else: + d[relname] = [ss] return d def get_related(self, *args: str) -> list['Synset']: @@ -691,56 +774,69 @@ def get_related(self, *args: str) -> list['Synset']: >>> [ss.lemmas() for ss in fulcrum.get_related()] [['pin', 'pivot'], ['lever']] """ - return [ss for _, ss in self._get_relations(args)] - - def _get_relations(self, args: Sequence[str]) -> list[tuple[str, 'Synset']]: - targets: list[tuple[str, Synset]] = [] - - lexids = self._get_lexicon_ids() - + targets: list[Synset] = [] # first get relations from the current lexicon(s) if self._id != NON_ROWID: - targets.extend( - (row[0], Synset(*row[2:], self._wordnet)) - for row in get_synset_relations({self._id}, args, lexids) - if row[5] in lexids - ) - + targets.extend(self._get_local_relations(args)) # then attempt to expand via ILI - if self._ili is not None and self._wordnet and self._wordnet._expanded_ids: - expids = self._wordnet._expanded_ids - - # get expanded relation - expss = find_synsets(ili=self._ili, lexicon_rowids=expids) - rowids = {rowid for _, _, _, _, rowid in expss} - {self._id, NON_ROWID} - relations: dict[str, set[str]] = {} - for rel_row in get_synset_relations(rowids, args, expids): - rel_type, ili = rel_row[0], rel_row[4] - if ili is not None: - relations.setdefault(rel_type, set()).add(ili) - - # map back to target lexicons - seen = {ss._id for _, ss in targets} - for rel_type, ilis in relations.items(): - for row in get_synsets_for_ilis(ilis, lexicon_rowids=lexids): - if row[-1] not in seen: - targets.append((rel_type, Synset(*row, self._wordnet))) - - # add empty synsets for ILIs without a target in lexids - seen_ilis = {tgt._ili for _, tgt in targets} - for rel_type, ilis in relations.items(): - unseen_ilis = ilis - seen_ilis - for ili in unseen_ilis: - ss = Synset.empty( - id=_INFERRED_SYNSET, - ili=ili, - _lexid=self._lexid, - _wordnet=self._wordnet - ) - targets.append((rel_type, ss)) - + if self._ili is not None and self._wordnet._expanded_ids: + targets.extend(self._get_expanded_relations(args)) return targets + def _get_local_relations(self, args: Sequence[str]) -> Iterator['Synset']: + _wn = self._wordnet + lexids = self._get_lexicon_ids() + iterable = get_synset_relations({self._id}, args, lexids) + for relname, rellexid, relrowid, _, ssid, pos, ili, lexid, rowid in iterable: + if lexid not in lexids: + continue + synset_rel = SynsetRelation( + relname, self.id, ssid, rellexid, relrowid, _wordnet=_wn + ) + yield Synset( + ssid, + pos, + ili, + _lexid=lexid, + _id=rowid, + _in_rel=synset_rel, + _wordnet=_wn + ) + + def _get_expanded_relations(self, args: Sequence[str]) -> Iterator['Synset']: + _wn = self._wordnet + lexids = self._get_lexicon_ids() + expids = self._wordnet._expanded_ids + + # find any relations from expanded synsets + srcids = { + rowid: ssid + for ssid, _, _, _, rowid + in find_synsets(ili=self._ili, lexicon_rowids=expids) + if rowid not in (self._id, NON_ROWID) + } + + iterable = get_synset_relations(set(srcids), args, expids) + for relname, rellexid, relrowid, srcrowid, ssid, _, ili, *_ in iterable: + if ili is None: + continue + synset_rel = SynsetRelation( + relname, srcids[srcrowid], ssid, rellexid, relrowid, _wn + ) + local_ss_rows = list(get_synsets_for_ilis([ili], lexicon_rowids=lexids)) + + for row in local_ss_rows: + yield Synset(*row, _in_rel=synset_rel, _wordnet=_wn) + + if not local_ss_rows: + yield Synset.empty( + id=_INFERRED_SYNSET, + ili=ili, + _lexid=self._lexid, + _in_rel=synset_rel, + _wordnet=_wn, + ) + def hypernym_paths(self, simulate_root: bool = False) -> list[list['Synset']]: """Return the list of hypernym paths to a root synset.""" return taxonomy.hypernym_paths(self, simulate_root=simulate_root) @@ -884,6 +980,8 @@ class Sense(_Relatable): _ENTITY_TYPE = _EntityType.SENSES + _in_rel: Optional[SenseRelation] + def __init__( self, id: str, @@ -891,9 +989,12 @@ def __init__( synset_id: str, _lexid: int = NON_ROWID, _id: int = NON_ROWID, + _in_rel: Optional[SenseRelation] = None, _wordnet: Optional['Wordnet'] = None ): - super().__init__(id=id, _lexid=_lexid, _id=_id, _wordnet=_wordnet) + super().__init__( + id=id, _lexid=_lexid, _id=_id, _in_rel=_in_rel, _wordnet=_wordnet + ) self._entry_id = entry_id self._synset_id = synset_id @@ -960,6 +1061,9 @@ def metadata(self) -> Metadata: """Return the sense's metadata.""" return get_metadata(self._id, 'senses') + def incoming_relation(self) -> Optional[SenseRelation]: + return self._in_rel + def relations(self, *args: str) -> dict[str, list['Sense']]: """Return a mapping of relation names to lists of senses. @@ -973,11 +1077,13 @@ def relations(self, *args: str) -> dict[str, list['Sense']]: """ d: dict[str, list[Sense]] = {} - for relname, s in self._get_relations(args): - if relname in d: - d[relname].append(s) - else: - d[relname] = [s] + for s in self.get_related(*args): + if relation := s.incoming_relation(): + relname = relation.name + if relname in d: + d[relname].append(s) + else: + d[relname] = [s] return d def get_related(self, *args: str) -> list['Sense']: @@ -997,22 +1103,35 @@ def get_related(self, *args: str) -> list['Sense']: incoherent """ - return [s for _, s in self._get_relations(args)] - - def _get_relations(self, args: Sequence[str]) -> list[tuple[str, 'Sense']]: lexids = self._get_lexicon_ids() iterable = get_sense_relations(self._id, args, lexids) - return [(relname, Sense(sid, eid, ssid, lexid, rowid, self._wordnet)) - for relname, _, sid, eid, ssid, lexid, rowid in iterable - if lexids is None or lexid in lexids] + targets: list['Sense'] = [] + for relname, rellexid, relrowid, sid, eid, ssid, lexid, rowid in iterable: + if lexid not in lexids: + continue + sense_rel = SenseRelation( + relname, self.id, sid, rellexid, relrowid, _wordnet=self._wordnet + ) + target = Sense( + sid, eid, ssid, lexid, rowid, _in_rel=sense_rel, _wordnet=self._wordnet + ) + targets.append(target) + return targets def get_related_synsets(self, *args: str) -> list[Synset]: """Return a list of related synsets.""" lexids = self._get_lexicon_ids() iterable = get_sense_synset_relations(self._id, args, lexids) - return [Synset(ssid, pos, ili, lexid, rowid, self._wordnet) - for _, _, ssid, pos, ili, lexid, rowid in iterable - if lexids is None or lexid in lexids] + targets: list[Synset] = [] + for relname, rellexid, relrowid, _, ssid, pos, ili, lexid, rowid in iterable: + relation = SenseSynsetRelation( + relname, self.id, ssid, rellexid, relrowid, _wordnet=self._wordnet + ) + target = Synset( + ssid, pos, ili, lexid, rowid, _in_rel=relation, _wordnet=self._wordnet + ) + targets.append(target) + return targets def translate( self, @@ -1185,7 +1304,7 @@ def synset(self, id: str) -> Synset: """Return the first synset in this wordnet with identifier *id*.""" iterable = find_synsets(id=id, lexicon_rowids=self._lexicon_ids) try: - return Synset(*next(iterable), self) + return Synset(*next(iterable), _wordnet=self) except StopIteration: raise wn.Error(f'no such synset: {id}') from None @@ -1212,7 +1331,7 @@ def sense(self, id: str) -> Sense: """Return the first sense in this wordnet with identifier *id*.""" iterable = find_senses(id=id, lexicon_rowids=self._lexicon_ids) try: - return Sense(*next(iterable), self) + return Sense(*next(iterable), _wordnet=self) except StopIteration: raise wn.Error(f'no such sense: {id}') from None @@ -1320,7 +1439,7 @@ def _find_helper( # easy case is when there is no form if form is None: - return [cls(*data, w) # type: ignore + return [cls(*data, _wordnet=w) # type: ignore for data in query_func(pos=pos, **kwargs)] # if there's a form, we may need to lemmatize and normalize @@ -1337,13 +1456,13 @@ def _find_helper( # we want unique results here, but a set can make the order # erratic, so filter manually later results = [ - cls(*data, w) # type: ignore + cls(*data, _wordnet=w) # type: ignore for _pos, _forms in forms.items() for data in query_func(forms=_forms, pos=_pos, **kwargs) ] if not results and normalize: results = [ - cls(*data, w) # type: ignore + cls(*data, _wordnet=w) # type: ignore for _pos, _forms in forms.items() for data in query_func( forms=[normalize(f) for f in _forms], pos=_pos, **kwargs diff --git a/wn/_export.py b/wn/_export.py index 0571bcd..921ddfd 100644 --- a/wn/_export.py +++ b/wn/_export.py @@ -213,14 +213,14 @@ def _export_sense_relations( {'target': id, 'relType': type, 'meta': _export_metadata(rowid, 'sense_relations')} - for type, rowid, id, *_ + for type, _, rowid, id, *_ in get_sense_relations(sense_rowid, '*', lexids) ] relations.extend( {'target': id, 'relType': type, 'meta': _export_metadata(rowid, 'sense_synset_relations')} - for type, rowid, id, *_ + for type, _, rowid, _, id, *_ in get_sense_synset_relations(sense_rowid, '*', lexids) ) return relations @@ -304,7 +304,7 @@ def _export_synset_relations( {'target': id, 'relType': type, 'meta': _export_metadata(rowid, 'synset_relations')} - for type, rowid, id, *_ + for type, _, rowid, _, id, *_ in get_synset_relations((synset_rowid,), '*', lexids) ] diff --git a/wn/_queries.py b/wn/_queries.py index 5df04e3..84ca67b 100644 --- a/wn/_queries.py +++ b/wn/_queries.py @@ -42,7 +42,17 @@ int, # lexid int, # rowid ] -_Synset_Relation = tuple[str, int, str, str, str, int, int] # relname, relid, *_Synset +_Synset_Relation = tuple[ + str, # rel_name + int, # rel_lexid + int, # rel_rowid + int, # src_rowid + str, # _Synset... + str, + str, + int, + int, +] _Sense = tuple[ str, # id str, # entry_id @@ -50,7 +60,16 @@ int, # lexid int, # rowid ] -_Sense_Relation = tuple[str, int, str, str, str, int, int] # relname, relid, *_Sense +_Sense_Relation = tuple[ + str, # rel_name + int, # rel_lexid + int, # rel_rowid + str, # _Sense... + str, + str, + int, + int, +] _Count = tuple[int, int] # count, count_id _SyntacticBehaviour = tuple[ str, # id @@ -444,10 +463,11 @@ def get_synset_relations( query = f''' WITH rt(rowid, type) AS (SELECT rowid, type FROM relation_types {constraint}) - SELECT DISTINCT rel.type, rel.rowid, tgt.id, tgt.pos, + SELECT DISTINCT rel.type, rel.lexicon_rowid, rel.rowid, + rel.source_rowid, tgt.id, tgt.pos, (SELECT ilis.id FROM ilis WHERE ilis.rowid = tgt.ili_rowid), tgt.lexicon_rowid, tgt.rowid - FROM (SELECT rt.type, target_rowid, srel.rowid + FROM (SELECT rt.type, lexicon_rowid, source_rowid, target_rowid, srel.rowid FROM synset_relations AS srel JOIN rt ON srel.type_rowid = rt.rowid WHERE source_rowid IN ({_qs(source_rowids)}) @@ -590,10 +610,10 @@ def get_sense_relations( query = f''' WITH rt(rowid, type) AS (SELECT rowid, type FROM relation_types {constraint}) - SELECT DISTINCT rel.type, rel.rowid, + SELECT DISTINCT rel.type, rel.lexicon_rowid, rel.rowid, s.id, e.id, ss.id, s.lexicon_rowid, s.rowid - FROM (SELECT rt.type, target_rowid, srel.rowid + FROM (SELECT rt.type, lexicon_rowid, target_rowid, srel.rowid FROM sense_relations AS srel JOIN rt ON srel.type_rowid = rt.rowid WHERE source_rowid = ? @@ -625,10 +645,11 @@ def get_sense_synset_relations( query = f''' WITH rt(rowid, type) AS (SELECT rowid, type FROM relation_types {constraint}) - SELECT DISTINCT rel.type, rel.rowid, ss.id, ss.pos, + SELECT DISTINCT rel.type, rel.lexicon_rowid, rel.rowid, + rel.source_rowid, ss.id, ss.pos, (SELECT ilis.id FROM ilis WHERE ilis.rowid = ss.ili_rowid), ss.lexicon_rowid, ss.rowid - FROM (SELECT rt.type, target_rowid, srel.rowid + FROM (SELECT rt.type, lexicon_rowid, source_rowid, target_rowid, srel.rowid FROM sense_synset_relations AS srel JOIN rt ON srel.type_rowid = rt.rowid WHERE source_rowid = ?