Dev.ej/lexicon tokenizer #405

Merged · 7 commits · Nov 12, 2024
6 changes: 3 additions & 3 deletions docs/package.md
@@ -32,10 +32,10 @@ Basic usage for the language-aware tokenizer:
 from g2p import make_tokenizer
 tokenizer = make_tokenizer("dan")
 for token in tokenizer.tokenize_text("Åh, hvordan har du det, Åbenrå?"):
-    if token["is_word"]:
-        word = token["text"]
+    if token.is_word:
+        word = token.text
     else:
-        interword_punctuation_and_spaces = token["text"]
+        interword_punctuation_and_spaces = token.text
 ```

 Note that selecting the tokenizer language is important to make sure punctuation-like letters are handled correctly. For example `:` and `'` are punctuation in English but they will be part of the word tokens in Kanien'kéha (moh):
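The example that the sentence above introduces is collapsed in this diff. A minimal sketch of the idea (not part of the PR; the printed tokens are illustrative, assuming an `eng` tokenizer that treats `'` as punctuation and a `moh` tokenizer that does not):

```python
from g2p import make_tokenizer

# English tokenizer: the apostrophe is punctuation, so the word is split around it.
eng_tokens = make_tokenizer("eng").tokenize_text("Kanien'kéha")
print([(t.text, t.is_word) for t in eng_tokens])

# Mohawk (moh) tokenizer: the apostrophe is part of the word,
# so the whole string remains a single word token.
moh_tokens = make_tokenizer("moh").tokenize_text("Kanien'kéha")
print([(t.text, t.is_word) for t in moh_tokens])
```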
35 changes: 25 additions & 10 deletions g2p/__init__.py
@@ -16,10 +16,10 @@
     from g2p import make_tokenizer
     tokenizer = make_tokenizer(lang)
     for token in tokenizer.tokenize_text(input_text):
-        if token["is_word"]:
-            word = token["text"]
+        if token.is_word:
+            word = token.text
         else:
-            interword_punctuation_and_spaces = token["text"]
+            interword_punctuation_and_spaces = token.text

     from g2p import get_arpabet_langs
     LANGS, LANG_NAMES = get_arpabet_langs()
@@ -29,7 +29,7 @@
 from typing import Dict, Optional, Tuple, Union

 from g2p.exceptions import InvalidLanguageCode, NoPath
-from g2p.shared_types import BaseTokenizer, BaseTransducer
+from g2p.shared_types import BaseTokenizer, BaseTransducer, Token

 if sys.version_info < (3, 7):  # pragma: no cover
     sys.exit(
@@ -47,7 +47,7 @@ def make_g2p( # noqa: C901
     *,
     tokenize: bool = True,
     custom_tokenizer: Optional[BaseTokenizer] = None,
-):
+) -> BaseTransducer:
     """Make a g2p Transducer for mapping text from in_lang to out_lang via the
     shortest path between them.

@@ -132,13 +132,13 @@ def make_g2p( # noqa: C901
     return transducer


-def tokenize_and_map(tokenizer, transducer, input: str):
+def tokenize_and_map(tokenizer: BaseTokenizer, transducer: BaseTransducer, input: str):
     result = ""
     for token in tokenizer.tokenize_text(input):
-        if token["is_word"]:
-            result += transducer(token["text"]).output_string
+        if token.is_word:
+            result += transducer(token.text).output_string
         else:
-            result += token["text"]
+            result += token.text
     return result


@@ -213,7 +213,7 @@ def get_arpabet_langs():
     return _langs_cache, _lang_names_cache


-def make_tokenizer(in_lang=None, out_lang=None, tok_path=None):
+def make_tokenizer(in_lang=None, out_lang=None, tok_path=None) -> BaseTokenizer:
     """Make the tokenizer for input in language in_lang

     Logic used when only in_lang is provided:
@@ -234,3 +234,18 @@ def make_tokenizer(in_lang=None, out_lang=None, tok_path=None):
     from g2p.mappings.tokenizer import make_tokenizer as _make_tokenizer

     return _make_tokenizer(in_lang, out_lang, tok_path)
+
+
+# Declare what's actually part of g2p's programmatic API.
+# Please don't import anything else from g2p directly.
+__all__ = [
+    "BaseTokenizer",
+    "BaseTransducer",
+    "InvalidLanguageCode",
+    "NoPath",
+    "Token",
+    "get_arpabet_langs",
+    "make_g2p",
+    "make_tokenizer",
+    "tokenize_and_map",
+]
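As a quick orientation for the API surface declared above, here is a minimal usage sketch (the language codes and printed outputs are illustrative, not taken from this PR):

```python
from g2p import make_g2p, make_tokenizer, tokenize_and_map

# Hypothetical language pair; any in_lang/out_lang pair connected in the
# g2p network works the same way.
transducer = make_g2p("dan", "dan-ipa", tokenize=False)
print(transducer("hvordan").output_string)

# tokenize_and_map feeds word tokens through the transducer and passes
# inter-word punctuation and spaces through unchanged.
tokenizer = make_tokenizer("dan")
print(tokenize_and_map(tokenizer, transducer, "Åh, hvordan har du det?"))
```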
8 changes: 4 additions & 4 deletions g2p/api_v2.py
@@ -300,7 +300,7 @@ def convert_one_writing_or_phonetic_system_to_another( # noqa: C901
             tokenizer = g2p.make_tokenizer(in_lang)
             tokens = tokenizer.tokenize_text(request.text)
         else:
-            tokens = [{"text": request.text, "is_word": True}]
+            tokens = [g2p.Token(request.text, is_word=True)]
     except NoPath:
         raise HTTPException(
             status_code=400, detail=f"No path from {in_lang} to {out_lang}"
@@ -314,16 +314,16 @@
     segments: List[Segment] = []
     for token in tokens:
         conversions: List[Conversion] = []
-        if not token["is_word"]:  # non-word, has no in_lang/out_lang
-            tg = TransductionGraph(token["text"])
+        if not token.is_word:  # non-word, has no in_lang/out_lang
+            tg = TransductionGraph(token.text)
             conv = Conversion(substring_alignments=tg.substring_alignments())
             if request.indices:
                 conv.alignments = tg.alignments()
                 conv.input_nodes = list(tg.input_string)
                 conv.output_nodes = list(tg.output_string)
             conversions.append(conv)
         else:
-            tg = transducer(token["text"])
+            tg = transducer(token.text)
             if request.compose_from:
                 composed_tiers: List[TransductionGraph] = []
                 for tr, tier in zip(transducer.transducers, tg.tiers):
73 changes: 58 additions & 15 deletions g2p/mappings/tokenizer.py
@@ -11,10 +11,10 @@

 from g2p.exceptions import MappingMissing
 from g2p.log import LOGGER
-from g2p.mappings import Mapping
+from g2p.mappings import Mapping, utils
 from g2p.mappings.langs import LANGS_NETWORK
-from g2p.mappings.utils import get_unicode_category, is_ipa, merge_if_same_label
-from g2p.shared_types import BaseTokenizer
+from g2p.mappings.utils import is_ipa
+from g2p.shared_types import BaseTokenizer, Token


 class Tokenizer(BaseTokenizer):
@@ -42,23 +42,18 @@ def is_word_character(self, c):
         if self.delim and c == self.delim:
             return True
         assert len(c) <= 1
-        if get_unicode_category(c) in ["letter", "number", "diacritic"]:
+        if utils.get_unicode_category(c) in ["letter", "number", "diacritic"]:
             return True
         return False

-    def tokenize_text(self, text):
+    def tokenize_text(self, text: str) -> List[Token]:
         matches = self.tokenize_aux(text)
-        units = [{"text": m, "is_word": self.is_word_character(m)} for m in matches]
+        units = [Token(m, self.is_word_character(m)) for m in matches]
         if self.dot_is_letter:
             for i, unit in enumerate(units):
-                if (
-                    unit["text"] == "."
-                    and i + 1 < len(units)
-                    and units[i + 1]["is_word"]
-                ):
-                    unit["is_word"] = True
-        units = merge_if_same_label(units, "text", "is_word")
-        return units
+                if unit.text == "." and i + 1 < len(units) and units[i + 1].is_word:
+                    unit.is_word = True
+        return utils.merge_same_type_tokens(units)


 class SpecializedTokenizer(Tokenizer):
@@ -98,6 +93,51 @@ def tokenize_aux(self, text):
         return self.regex.findall(text)


+class LexiconTokenizer(Tokenizer):
+    """A lexicon-based tokenizer considers any entry in the lexicon a token,
+    even if it contains punctuation characters. For text not in the lexicon,
+    it falls back to the default tokenization.
+    """
+
+    def __init__(self, mapping: Mapping):
+        super().__init__()
+        self.mapping = mapping
+        self.lang = mapping.language_name
+
+    def _recursive_helper(self, tokens: list, output_tokens: list):
+        """Emit the longest prefix found in the lexicon, if any, as a token.
+        If none is found, emit the first unit as a token.
+        Recursively process the rest of the units.
+        """
+        if not tokens:
+            return
+        if len(tokens) == 1:
+            output_tokens.append(tokens[0])
+            return
+        for i in range(len(tokens), 0, -1):
+            candidate = "".join([u.text for u in tokens[:i]])
+            if utils.find_alignment(self.mapping.alignments, candidate.lower()):
+                output_tokens.append(Token(candidate, True))
+                return self._recursive_helper(tokens[i:], output_tokens)
+        # No prefix found, emit the first unit as a token
+        output_tokens.append(tokens[0])
+        self._recursive_helper(tokens[1:], output_tokens)
+
+    def tokenize_text(self, text: str) -> List[Token]:
+        blocks = re.split(r"(\s+)", text)
+        output_tokens = []
+        for i, block in enumerate(blocks):
+            if i % 2 == 1 and block:
+                output_tokens.append(Token(block, False))
+            else:
+                default_tokens = super().tokenize_text(block)
+                # Split non-word tokens into smaller parts for lexicon lookup
+                candidate_tokens = utils.split_non_word_tokens(default_tokens)
+                self._recursive_helper(candidate_tokens, output_tokens)
+
+        return utils.merge_non_word_tokens(output_tokens)
+
+
 class MultiHopTokenizer(SpecializedTokenizer):
     def __init__(self, mappings: List[Mapping]):
         self.delim = ""
@@ -202,7 +242,10 @@ def make_tokenizer( # noqa C901
             # Build a one-hop tokenizer
             try:
                 mapping = Mapping.find_mapping(in_lang=in_lang, out_lang=out_lang)
-                self.tokenizers[tokenizer_key] = SpecializedTokenizer(mapping)
+                if mapping.type == utils.MAPPING_TYPE.lexicon:
+                    self.tokenizers[tokenizer_key] = LexiconTokenizer(mapping)
+                else:
+                    self.tokenizers[tokenizer_key] = SpecializedTokenizer(mapping)
             except MappingMissing:
                 self.tokenizers[tokenizer_key] = self.tokenizers[None]
                 LOGGER.warning(
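To make the behaviour of `LexiconTokenizer._recursive_helper` easier to picture, here is a self-contained sketch of the same greedy longest-prefix idea over a toy lexicon. It uses plain tuples instead of `Token` and a set instead of `utils.find_alignment`, so none of it is the PR's actual code:

```python
from typing import List, Tuple

# Toy stand-in: the real LexiconTokenizer looks candidates up with
# utils.find_alignment(self.mapping.alignments, candidate.lower()).
LEXICON = {"hello", "y'all"}

def greedy_lexicon_tokens(units: List[Tuple[str, bool]]) -> List[Tuple[str, bool]]:
    """Emit the longest prefix of `units` whose concatenated text is in the
    lexicon; if no prefix matches, emit the first unit unchanged."""
    output: List[Tuple[str, bool]] = []
    while units:
        if len(units) == 1:
            output.append(units[0])
            break
        for i in range(len(units), 0, -1):
            candidate = "".join(text for text, _ in units[:i])
            if candidate.lower() in LEXICON:
                output.append((candidate, True))  # lexicon entry becomes one word token
                units = units[i:]
                break
        else:
            output.append(units[0])  # no prefix matched: pass it through
            units = units[1:]
    return output

# "y'all" stays a single word token even though "'" would normally split it.
print(greedy_lexicon_tokens([("y", True), ("'", False), ("all", True)]))
# [("y'all", True)]
```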
94 changes: 74 additions & 20 deletions g2p/mappings/utils.py
@@ -10,7 +10,6 @@
 import unicodedata as ud
 from bisect import bisect_left
 from collections import defaultdict
-from copy import deepcopy
 from enum import Enum
 from pathlib import Path
 from typing import (
@@ -43,6 +42,7 @@
 from g2p import exceptions
 from g2p.log import LOGGER
 from g2p.mappings import langs
+from g2p.shared_types import Token

 GEN_DIR = os.path.join(os.path.dirname(langs.__file__), "generated")
 GEN_CONFIG = os.path.join(GEN_DIR, "config-g2p.yaml")
@@ -151,7 +151,7 @@ def normalize(inp: str, norm_form: Union[str, None]):
     if norm_form is None or norm_form == "none":
         return unicode_escape(inp)
     if norm_form not in ["NFC", "NFD", "NFKC", "NFKD"]:
-        raise exceptions.InvalidNormalization(normalize)
+        raise exceptions.InvalidNormalization(norm_form)
     # Sadly mypy doesn't do narrowing to literals properly
     norm_form = cast(Literal["NFC", "NFD", "NFKC", "NFKD"], norm_form)
     normalized = ud.normalize(norm_form, unicode_escape(inp))
@@ -178,8 +178,8 @@ def compose_indices(
     """Compose indices1 + indices2 into direct arcs from the inputs of indices1
     to the outputs of indices 2.

-    E.g., [(0,1), (1,4)] composed with [(0,0), (1,2), (1,3), (4,2)] is
-    [(0,2), (0,3), (1,2)]
+    >>> compose_indices([(0,1), (1,4)], [(0,0), (1,2), (1,3), (4,2)])
+    [(0, 2), (0, 3), (1, 2)]
     """
     # for O(1) lookup of arcs leaving indices2
     indices2_as_dict = defaultdict(dict)  # type: ignore
@@ -239,7 +239,7 @@ def normalize_with_indices(
         return normalize_to_NFD_with_indices(inp, norm_form)
     if norm_form in ("none", None):
         return inp, [(i, i) for i in range(len(inp))]
-    raise exceptions.InvalidNormalization(normalize)
+    raise exceptions.InvalidNormalization(norm_form)


 def unicode_escape(text):
return True


def merge_if_same_label(lst_of_dicts, text_key, label_key):
results = []
current_item = None
for dct in lst_of_dicts:
if label_key not in dct:
dct[label_key] = None
if not current_item:
current_item = deepcopy(dct)
elif dct[label_key] == current_item[label_key]:
current_item[text_key] += dct[text_key]
def merge_same_type_tokens(tokens: List[Token]) -> List[Token]:
"""Merge tokens that have the same type. Destroys tokens in the process.

>>> merge_same_type_tokens([Token("test", True), Token("b", True), Token(":", False), Token(",", False)])
[Token(text='testb', is_word=True), Token(text=':,', is_word=False)]
>>> merge_same_type_tokens([])
[]
"""
if not tokens:
return []
merged_tokens = [tokens[0]]
for token in tokens[1:]:
if token.is_word == merged_tokens[-1].is_word:
merged_tokens[-1].text += token.text
else:
merged_tokens.append(token)
return merged_tokens


def split_non_word_tokens(tokens: List[Token]) -> List[Token]:
"""Split non-word units into characters. Reuses the word tokens.

Generates a maximum of 5 units per non-word token: if the input token is
more than 5 non-word characters, the output will be the first two
individually, the middle as a block, and the last two individually, because
lexicon-based tokenization does not need more granularity than that.
This prevents degenerate input like a large number of consecutive punctuation
marks from taking quadratic time in lexicon-based tokenization.

>>> split_non_word_tokens([Token("test", True), Token(":,- ", False), Token("", False)])
[Token(text='test', is_word=True), Token(text=':', is_word=False), Token(text=',', is_word=False), Token(text='-', is_word=False), Token(text=' ', is_word=False)]
>>> split_non_word_tokens([])
[]
>>> split_non_word_tokens([Token(".,.,.,.", False)])
[Token(text='.', is_word=False), Token(text=',', is_word=False), Token(text='.,.', is_word=False), Token(text=',', is_word=False), Token(text='.', is_word=False)]
"""
new_tokens = []
for token in tokens:
if not token.is_word:
text = token.text
if len(text) > 5:
new_tokens.append(Token(text[0], False))
new_tokens.append(Token(text[1], False))
new_tokens.append(Token(text[2:-2], False))
new_tokens.append(Token(text[-2], False))
new_tokens.append(Token(text[-1], False))
else:
new_tokens.extend([Token(char, False) for char in text])
else:
new_tokens.append(token)
return new_tokens


def merge_non_word_tokens(tokens: List[Token]) -> List[Token]:
"""Merge consecutive non-word units into a single token. Destroys tokens in the process.

>>> merge_non_word_tokens([Token("test", True), Token(":", False), Token(",", False)])
[Token(text='test', is_word=True), Token(text=':,', is_word=False)]
>>> merge_non_word_tokens([])
[]
"""
if not tokens:
return tokens
merged_tokens = [tokens[0]]
for token in tokens[1:]:
if not token.is_word and not merged_tokens[-1].is_word:
merged_tokens[-1].text += token.text
else:
results.append(current_item)
current_item = deepcopy(dct)
if current_item:
results.append(current_item)
return results
merged_tokens.append(token)
return merged_tokens


CATEGORIES = {
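Finally, a small usage sketch of how the new split and merge helpers cooperate (module paths as in this PR; the printed reprs follow the doctest output format shown above):

```python
from g2p.shared_types import Token
from g2p.mappings.utils import merge_non_word_tokens, split_non_word_tokens

# A long punctuation run is capped at five pieces by split_non_word_tokens
# (first two chars, middle block, last two chars), so the lexicon matcher
# never has to consider a quadratic number of prefixes.
pieces = split_non_word_tokens([Token("hello", True), Token("?!?!?!?", False)])
print(pieces)

# After lookup, merge_non_word_tokens collapses the leftover pieces back
# into a single non-word token.
print(merge_non_word_tokens(pieces))
# [Token(text='hello', is_word=True), Token(text='?!?!?!?', is_word=False)]
```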