Skip to content

Commit

Permalink
Created new sequence similarity submodule which implements Levenshtei…
Browse files Browse the repository at this point in the history
…n distance-related functionality (among other things)
  • Loading branch information
timbernat committed Jun 17, 2024
1 parent 3e6cc8e commit 541b944
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 0 deletions.
1 change: 1 addition & 0 deletions polymerist/genutils/sequences/similarity/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
'''Methods for evaluating the similarity of and inspecting the edits between sequences'''
32 changes: 32 additions & 0 deletions polymerist/genutils/sequences/similarity/distances.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
'''Implementations of calculation methods for sequence distance ("inverse similarity") metrics'''

# NOTE: much of this could be supplanted in the future by the well-implemented textdistance library (https://github.com/life4/textdistance)
from typing import Sequence, TypeVar
T = TypeVar('T')

from .edits import levenshtein_distance

# SEQUENCE METRICS
def hamming_distance(seq1 : Sequence[T], seq2 : Sequence[T]) -> int:
'''Compute the Hamming distance between a pair of sequences with elements of compatible type (sequences must have the same length)
Denotes the number of elements at the same positions in each sequence which are different'''
if len(seq1) != len(seq2):
raise ValueError('Cannot compute Hamming distance between sequences of different lengths')

return sum(
int(elem1 != elem2) # NOTE: type conversion not strictly necessary here, but done for self-documentation
for elem1, elem2 in zip(seq1, seq2, strict=True)
)

def jaccard_distance(seq1 : Sequence[T], seq2 : Sequence[T]) -> float:
'''Compute the Jaccard distance between a pair of sequences with elements of compatible type
Denotes the complement of the ratio of shared elements (intersection) to total elements (union)'''
set1, set2 = set(seq1), set(seq2)
size_intersect = len(set.intersection(set1, set2))
size_union = len(set.union(set1, set2))
jaccard_coeff = size_intersect / size_union

return 1 - jaccard_coeff
tanimoto_distance = jaccard_distance # TOSELF: debatable whether this alias is really accurate (literature suggests it may be context/field-dependent)

levenshtein_dist = edit_distance = edit_dist = levenshtein_distance # aliases for convenience
130 changes: 130 additions & 0 deletions polymerist/genutils/sequences/similarity/edits.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
'''For calculating the edit distance between sequences and inspecting the edits needed to go between them'''

from typing import Any, Generator, Sequence, Type, TypeVar, TypeAlias
from dataclasses import dataclass, field, replace
T = TypeVar('T')

from enum import Enum
import numpy as np

from ...typetools.numpytypes import N, M, Shape
from ...bits import int_to_bits


# REPRESENTATION CLASSES
class EditOperation(Enum): # TODO: reimplement as bitwise flags
'''For annotating distinct kinds of sequence edits and their associated index offsets'''
NULL = 0 # NOTE: order of fields here intentional and CANNOT be modified!
INSERTION = 1 # ... this is because the bits of each index are the row and column
DELETION = 2 # ... offsets corresponding to that edit operation in a Wagner-Fischer matrix
SUBSTITUTION = 3

@property
def bits(self) -> tuple[int, int]:
'''Convert the integer value of the Enum field into its binary bits'''
return tuple(int_to_bits(self.value, num_bits=2, as_list=True))
offsets = bits # alias to make dependent code more readable

@dataclass
class EditInfo:
'''for bundling together information about a sequence edit step'''
edit_op : EditOperation
indices : tuple[int, int]
distance : int


# WAGNER-FISCHER MATRIX OPERATION
def compute_wf_matrix(seq1 : Sequence[T], seq2 : Sequence[T], int_type : Type=int) -> np.ndarray[Shape[N, M], int]:
'''Compute (N+1)x(M+1) matrix of Levenshtein distances between all partial prefices of a pair of sequences
where N and M are the lengths of the first and second sequence, respectively. Implements the Wagner-Fischer algorithm'''
n, m = len(seq1), len(seq2)
n_aug, m_aug = n + 1, m + 1

# initialize matrix with all zeros apart from first row and column,
wf_matrix = np.zeros((n_aug, m_aug), dtype=int_type)
wf_matrix[:, 0] = np.arange(n_aug, dtype=int_type) # index along first column is same as number of deletions to get to empty sequence - NOTE: element [0, 0] overlaps here
wf_matrix[0, :] = np.arange(m_aug, dtype=int_type) # index along first row is same as number of insertions to get from empty sequence - NOTE: element [0, 0] overlaps here

# populate matrix by iterating on distances between sub-sequence problems
for i, elem1 in enumerate(seq1, start=1):
for j, elem2 in enumerate(seq2, start=1):
wf_matrix[i, j] = 1 + np.min([
wf_matrix[i-1, j], # deletion at end of second sequence (after augmentation to end of first sequence)
wf_matrix[i, j-1], # insertion at end of second sequence
wf_matrix[i-1, j-1] - int(elem1 == elem2) # substition of last elements between sequences (if elements are equal, then substitution costs nothing)
])
# TODO: implement support for transposition weighting (i.e. Damerau-Levenshtein distance)

return wf_matrix

def traverse_wf_matrix(wf_matrix : np.ndarray[Shape[N, M], int], begin_idxs : tuple[int, int]=(0, 0), end_idxs : tuple[int, int]=(-1, -1)) -> Generator[list[EditInfo], None, None]:
'''Takes a Wagner-Fischer Levenshtein distance matrix and returns the indices of the minimal path through the matrix
from the origin (i.e. empty sequences) to the '''
assert(wf_matrix.ndim == 2)
n_aug, m_aug = wf_matrix.shape

if end_idxs == begin_idxs:
yield [ # need to wrap in list for consistent typing in recursive calls
EditInfo( # base case
edit_op=EditOperation.NULL,
indices=begin_idxs,
distance=0
)
]
else:
i, j = end_idxs
i %= n_aug # ensure values are positive
j %= m_aug # ensure values are positive
curr_dist = wf_matrix[i, j]

prev_edits = []
for edit_op in EditOperation:
if edit_op == EditOperation.NULL:
continue # need to skip over to avoid RecursionError - TOSELF: would be nice to reimplement EditOperations as bitwise flags to streamline this

di, dj = edit_op.offsets # unpack index offsets
i_prev, j_prev = i - di, j - dj
prev_edits.append(
EditInfo(
edit_op=edit_op,
indices=(i_prev, j_prev),
distance=wf_matrix[i_prev, j_prev]
)
)

min_prev_dist = min(ei.distance for ei in prev_edits)
for edit_info in prev_edits:
ret_edit_info = replace(edit_info, indices=(i, j)) # create a copy of the current edit info which reports the current
if (edit_info.edit_op == EditOperation.SUBSTITUTION) and (curr_dist == min_prev_dist):
ret_edit_info.edit_op = EditOperation.NULL

if edit_info.distance == min_prev_dist:
for edit_steps in traverse_wf_matrix(wf_matrix, begin_idxs=begin_idxs, end_idxs=edit_info.indices): # recursive tail call through all possible predecessors
yield edit_steps + [ret_edit_info]

def describe_edits(seq1 : Sequence[T], seq2 : Sequence[T], int_type : Type=int, indicator : str=' -> ', delimiter : str='\n') -> Generator[str, None, None]:
'''Describes step-by-step the insertion, deletion, or substitution operations needed to transform one sequence into another'''
seqs = (seq1, seq2) # need to bundle for zipping later
wf_matrix = compute_wf_matrix(seq1, seq2, int_type=int_type)

for edits in traverse_wf_matrix(wf_matrix):
edit_descs : list[str] = []
for edit_info in edits:
edit_op = edit_info.edit_op
if edit_op == EditOperation.NULL:
continue

elem_edit_str = indicator.join( # TODO : add printout for how the word being modified looks after each change (notnjust which symbols are changing)
str(seq[idx - 1] if to_show else None) # need to subtract 1 to get index of step previous to current edit
for idx, to_show, seq in zip(edit_info.indices, edit_op.bits, seqs)
)

edit_descs.append(f'{edit_info.edit_op.name}: {elem_edit_str}')
yield delimiter.join(edit_descs)


# EDIT DISTANCES
def levenshtein_distance(seq1 : Sequence[T], seq2 : Sequence[T], int_type : Type=int) -> int:
'''Compute the Levenshtein (edit) distance between a pair of sequences with elements of compatible type
Denotes the minimal number of insertion, deletion, or substitution operations needed to transform either sequence into the other'''
return compute_wf_matrix(seq1, seq2, int_type=int_type)[-1, -1]

0 comments on commit 541b944

Please sign in to comment.