Skip to content

Commit

Permalink
[patch:lib] Fix random transitions for left-right HMM topology (#151)
Browse files Browse the repository at this point in the history
* Fix left-right HMM topology random transitions

* Update documentation
  • Loading branch information
eonu authored Jan 23, 2021
1 parent be79b65 commit a3b0fbb
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 112 deletions.
4 changes: 2 additions & 2 deletions lib/sequentia/classifiers/hmm/gmmhmm.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ def set_uniform_transitions(self):
def set_random_transitions(self):
"""Sets a random transition matrix according to the topology, so that given the HMM is in state :math:`i`,
all out-going transition probabilities :math:`\\mathbf{p}_i=(p_{i1},p_{i2},\\ldots,p_{iM})` from state :math:`i`
are generated by sampling :math:`\\mathbf{p}_i\\sim\\mathrm{Dir}(\\mathbf{1}_M)` and redistributed so that only the
transitions permitted by the topology are non-zero."""
are generated by sampling :math:`\\mathbf{p}_i\\sim\\mathrm{Dir}(\\mathbf{1})` with a vector of ones of appropriate
size used as concentration parameters, so that only transitions permitted by the topology are non-zero."""
self._transitions = self._topology.random_transitions()

def fit(self, X):
Expand Down
10 changes: 4 additions & 6 deletions lib/sequentia/classifiers/hmm/topologies/left_right.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,10 @@ def random_transitions(self) -> np.ndarray:
transitions: :class:`numpy:numpy.ndarray` (float)
The random transition matrix of shape `(n_states, n_states)`.
"""
transitions = self._random_state.dirichlet(np.ones(self._n_states), size=self._n_states)
lower_sums = np.sum(np.tril(transitions, k=-1), axis=1) # Amount to be redistributed per row
quantities = np.arange(self._n_states, 0, -1) # Number of elements per row to redistribute evenly to
upper_ones = np.triu(np.ones((self._n_states, self._n_states)))
redist = (lower_sums / quantities).reshape(-1, 1) * upper_ones
return np.triu(transitions) + redist
transitions = np.zeros((self._n_states, self._n_states))
for i, row in enumerate(transitions):
row[i:] = self._random_state.dirichlet(np.ones(self._n_states - i))
return transitions

def validate_transitions(self, transitions: np.ndarray) -> None:
"""Validates a transition matrix according to the topology's restrictions.
Expand Down
12 changes: 6 additions & 6 deletions lib/sequentia/classifiers/hmm/topologies/linear.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ def uniform_transitions(self) -> np.ndarray:
The uniform transition matrix of shape `(n_states, n_states)`.
"""
transitions = np.zeros((self._n_states, self._n_states))
for i, row in enumerate(transitions[:-1]):
row[i:(i+2)] = np.ones(2) / 2
transitions[self._n_states - 1][self._n_states - 1] = 1
for i, row in enumerate(transitions):
size = min(2, self._n_states - i)
row[i:(i + size)] = np.ones(size) / size
return transitions

def random_transitions(self) -> np.ndarray:
Expand All @@ -39,9 +39,9 @@ def random_transitions(self) -> np.ndarray:
The random transition matrix of shape `(n_states, n_states)`.
"""
transitions = np.zeros((self._n_states, self._n_states))
for i, row in enumerate(transitions[:-1]):
row[i:(i+2)] = self._random_state.dirichlet(np.ones(2))
transitions[self._n_states - 1][self._n_states - 1] = 1
for i, row in enumerate(transitions):
size = min(2, self._n_states - i)
row[i:(i + size)] = self._random_state.dirichlet(np.ones(size))
return transitions

def validate_transitions(self, transitions: np.ndarray) -> None:
Expand Down
12 changes: 6 additions & 6 deletions lib/test/lib/classifiers/hmm/test_gmmhmm.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ def test_left_right_random_transitions():
hmm.set_random_transitions()
assert_equal(hmm.transitions, np.array([
[0.35029635, 0.13344569, 0.02784745, 0.33782453, 0.15058597],
[0. , 0.20580715, 0.5280311 , 0.06521685, 0.20094491],
[0. , 0. , 0.33761567, 0.2333124 , 0.42907193],
[0. , 0. , 0. , 0.36824778, 0.63175222],
[0. , 0.22725263, 0.18611702, 0.56646299, 0.02016736],
[0. , 0. , 0.18542075, 0.44084593, 0.37373332],
[0. , 0. , 0. , 0.65696153, 0.34303847],
[0. , 0. , 0. , 0. , 1. ]
]))

Expand Down Expand Up @@ -237,9 +237,9 @@ def test_left_right_fit_updates_random_transitions():
before = hmm.transitions
assert_equal(hmm.transitions, np.array([
[0.19252534, 0.15767581, 0.47989976, 0.01708551, 0.15281357],
[0. , 0.21269278, 0.26671807, 0.16241481, 0.35817434],
[0. , 0. , 0.33753566, 0.19947995, 0.46298439],
[0. , 0. , 0. , 0.39158159, 0.60841841],
[0. , 0.28069128, 0.23795997, 0.31622761, 0.16512114],
[0. , 0. , 0.29431489, 0.66404724, 0.04163787],
[0. , 0. , 0. , 0.8372241 , 0.1627759 ],
[0. , 0. , 0. , 0. , 1. ]
]))
hmm.fit(X)
Expand Down
81 changes: 33 additions & 48 deletions lib/test/lib/classifiers/hmm/test_hmm_classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,81 +60,81 @@ def test_fit_list_invalid():
def test_predict_single_frequency_prior():
"""Predict a single observation sequence with a frequency prior"""
prediction = hmm_clf.predict(x, prior='frequency', return_scores=False, original_labels=False)
assert prediction == 4
assert prediction == 0

def test_predict_single_uniform_prior():
"""Predict a single observation sequence with a uniform prior"""
prediction = hmm_clf.predict(x, prior='uniform', return_scores=False, original_labels=False)
assert prediction == 4
assert prediction == 0

def test_predict_single_custom_prior():
"""Predict a single observation sequence with a custom prior"""
prediction = hmm_clf.predict(x, prior=([1-4e-10]+[1e-10]*4), return_scores=False, original_labels=False)
assert prediction == 0
prediction = hmm_clf.predict(x, prior=([1e-50]*4+[1-4e-50]), return_scores=False, original_labels=False)
assert prediction == 4

def test_predict_single_return_scores():
"""Predict a single observation sequence and return the transformed label, with the un-normalized posterior scores"""
prediction = hmm_clf.predict(x, prior='frequency', return_scores=True, original_labels=False)
assert isinstance(prediction, tuple)
assert prediction[0] == 4
assert prediction[0] == 0
assert_equal(prediction[1], np.array([
-1224.7910008 , -1302.74962802, -1269.35306368, -1230.59148179, -1222.21963107
-1225.88304108, -1266.85875999, -1266.96016441, -1226.97939403, -1274.89102844
]))

def test_predict_single_original_labels():
"""Predict a single observation sequence and return the original label, without the un-normalized posterior scores"""
prediction = hmm_clf.predict(x, prior='uniform', return_scores=False, original_labels=True)
assert prediction == 'c4'
assert prediction == 'c0'

def test_predict_single_return_scores_original_labels():
"""Predict a single observation sequence and return the original label, with the un-normalized posterior scores"""
prediction = hmm_clf.predict(x, prior='frequency', return_scores=True, original_labels=True)
assert isinstance(prediction, tuple)
assert prediction[0] == 'c4'
assert prediction[0] == 'c0'
assert_equal(prediction[1], np.array([
-1224.7910008 , -1302.74962802, -1269.35306368, -1230.59148179, -1222.21963107
-1225.88304108, -1266.85875999, -1266.96016441, -1226.97939403, -1274.89102844
]))

def test_predict_multiple_frequency_prior():
"""Predict multiple observation sequences with a frequency prior"""
predictions = hmm_clf.predict(X, prior='frequency', return_scores=False, original_labels=False)
assert_equal(predictions, np.array([4, 4, 0]))
assert_equal(predictions, np.array([0, 0, 0]))

def test_predict_multiple_uniform_prior():
"""Predict multiple observation sequences with a uniform prior"""
predictions = hmm_clf.predict(X, prior='uniform', return_scores=False, original_labels=False)
assert_equal(predictions, np.array([4, 4, 0]))
assert_equal(predictions, np.array([0, 0, 0]))

def test_predict_multiple_custom_prior():
"""Predict multiple observation sequences with a custom prior"""
predictions = hmm_clf.predict(X, prior=([1-4e-10]+[1e-10]*4), return_scores=False, original_labels=False)
predictions = hmm_clf.predict(X, prior=([1-4e-50]+[1e-50]*4), return_scores=False, original_labels=False)
assert_equal(predictions, np.array([0, 0, 0]))

def test_predict_multiple_return_scores():
"""Predict multiple observation sequences and return the transformed labels, with the un-normalized posterior scores"""
predictions = hmm_clf.predict(X, prior='frequency', return_scores=True, original_labels=False)
assert isinstance(predictions, tuple)
assert_equal(predictions[0], np.array([4, 4, 0]))
assert_equal(predictions[0], np.array([0, 0, 0]))
assert_equal(predictions[1], np.array([
[-1224.7910008 , -1302.74962802, -1269.35306368, -1230.59148179, -1222.21963107],
[-1253.13158379, -1331.2088869 , -1303.04688636, -1259.4763248 , -1251.90302358],
[-1281.50581789, -1359.72306473, -1335.76859787, -1288.46683118, -1281.66788896]
[-1225.88304108, -1266.85875999, -1266.96016441, -1226.97939403, -1274.89102844],
[-1254.2158035 , -1299.37586652, -1299.75108935, -1255.8359274 , -1308.71071239],
[-1282.57116414, -1330.90436081, -1331.63379359, -1284.79130597, -1342.45717804]
]))

def test_predict_multiple_original_labels():
"""Predict multiple observation sequences and return the original labels, without the un-normalized posterior scores"""
predictions = hmm_clf.predict(X, prior='frequency', return_scores=False, original_labels=True)
assert all(np.equal(predictions.astype(object), np.array(['c4', 'c4', 'c0'], dtype=object)))
assert all(np.equal(predictions.astype(object), np.array(['c0', 'c0', 'c0'], dtype=object)))

def test_predict_multiple_return_scores_original_labels():
"""Predict multiple observation sequences and return the original labels, with the un-normalized posterior scores"""
predictions = hmm_clf.predict(X, prior='frequency', return_scores=True, original_labels=True)
assert isinstance(predictions, tuple)
assert all(np.equal(predictions[0].astype(object), np.array(['c4', 'c4', 'c0'], dtype=object)))
assert all(np.equal(predictions[0].astype(object), np.array(['c0', 'c0', 'c0'], dtype=object)))
assert_equal(predictions[1], np.array([
[-1224.7910008 , -1302.74962802, -1269.35306368, -1230.59148179, -1222.21963107],
[-1253.13158379, -1331.2088869 , -1303.04688636, -1259.4763248 , -1251.90302358],
[-1281.50581789, -1359.72306473, -1335.76859787, -1288.46683118, -1281.66788896]
[-1225.88304108, -1266.85875999, -1266.96016441, -1226.97939403, -1274.89102844],
[-1254.2158035 , -1299.37586652, -1299.75108935, -1255.8359274 , -1308.71071239],
[-1282.57116414, -1330.90436081, -1331.63379359, -1284.79130597, -1342.45717804]
]))

# ======================== #
Expand All @@ -144,39 +144,16 @@ def test_predict_multiple_return_scores_original_labels():
def test_evaluate():
"""Evaluate performance on some observation sequences and labels"""
acc, cm = hmm_clf.evaluate(X, Y, prior='frequency')
assert acc == 0.0
assert acc == 1 / 3
print(repr(cm))
assert_equal(cm, np.array([
[0, 0, 0, 0, 1],
[1, 0, 0, 0, 1],
[1, 0, 0, 0, 0],
[2, 0, 0, 0, 0],
[0, 0, 0, 0, 0],
[0, 0, 0, 0, 0],
[0, 0, 0, 0, 0]
]))

# ===================================== #
# HMMClassifier (serialize/deserialize) #
# ===================================== #

def test_serialization():
"""Serialize and deserialize the classifier"""
model_file = 'test.pkl'
try:
with open(model_file, 'wb') as file:
pickle.dump(hmm_clf, file)
with open(model_file, 'rb') as file:
clf = pickle.load(file)
predictions = clf.predict(X, prior='frequency', return_scores=True, original_labels=True)
assert isinstance(predictions, tuple)
assert all(np.equal(predictions[0].astype(object), np.array(['c4', 'c4', 'c0'], dtype=object)))
assert_equal(predictions[1], np.array([
[-1224.7910008 , -1302.74962802, -1269.35306368, -1230.59148179, -1222.21963107],
[-1253.13158379, -1331.2088869 , -1303.04688636, -1259.4763248 , -1251.90302358],
[-1281.50581789, -1359.72306473, -1335.76859787, -1288.46683118, -1281.66788896]
]))
finally:
if os.path.exists(model_file):
os.remove(model_file)

# ==================== #
# HMMClassifier.save() #
# ==================== #
Expand Down Expand Up @@ -223,5 +200,13 @@ def test_load_valid():
assert all(isinstance(model, GMMHMM) for model in clf._models)
assert [model.label for model in clf._models] == labels
assert list(clf._encoder.classes_) == labels
predictions = clf.predict(X, prior='frequency', return_scores=True, original_labels=True)
assert isinstance(predictions, tuple)
assert all(np.equal(predictions[0].astype(object), np.array(['c0', 'c0', 'c0'], dtype=object)))
assert_equal(predictions[1], np.array([
[-1225.88304108, -1266.85875999, -1266.96016441, -1226.97939403, -1274.89102844],
[-1254.2158035 , -1299.37586652, -1299.75108935, -1255.8359274 , -1308.71071239],
[-1282.57116414, -1330.90436081, -1331.63379359, -1284.79130597, -1342.45717804]
]))
finally:
os.remove('test.pkl')
58 changes: 29 additions & 29 deletions lib/test/lib/classifiers/hmm/test_topologies.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ def test_left_right_random_transitions_many():
transitions = topology.random_transitions()
assert_distribution(transitions)
assert_equal(transitions, np.array([
[0.56841967, 0.01612013, 0.01994328, 0.0044685 , 0.39104841],
[0. , 0.25134034, 0.43904868, 0.20609306, 0.10351793],
[0. , 0. , 0.27462001, 0.12291279, 0.60246721],
[0. , 0. , 0. , 0.61951739, 0.38048261],
[0.23169814, 0.71716356, 0.02033845, 0.02516204, 0.00563782],
[0. , 0.19474072, 0.16405008, 0.22228532, 0.41892388],
[0. , 0. , 0.42912755, 0.16545797, 0.40541448],
[0. , 0. , 0. , 0.109713 , 0.890287 ],
[0. , 0. , 0. , 0. , 1. ]
]))

Expand Down Expand Up @@ -222,8 +222,8 @@ def test_ergodic_random_transitions_small():
transitions = topology.random_transitions()
assert_distribution(transitions)
assert_equal(transitions, np.array([
[0.87353002, 0.12646998],
[0.88622334, 0.11377666]
[0.9474011 , 0.0525989 ],
[0.85567599, 0.14432401]
]))

def test_ergodic_random_transitions_many():
Expand All @@ -232,11 +232,11 @@ def test_ergodic_random_transitions_many():
transitions = topology.random_transitions()
assert_distribution(transitions)
assert_equal(transitions, np.array([
[0.46537016, 0.12619365, 0.07474032, 0.32619324, 0.00750262],
[0.38836848, 0.00103519, 0.24911885, 0.06922191, 0.29225557],
[0.5312161 , 0.04639154, 0.13922816, 0.14542372, 0.13774047],
[0.0361995 , 0.43772711, 0.08498809, 0.26867251, 0.17241279],
[0.06373359, 0.30347054, 0.09117514, 0.38445582, 0.1571649 ]
[0.58715548, 0.14491542, 0.20980762, 0.00623944, 0.05188205],
[0.0840705 , 0.23055049, 0.08297536, 0.25124688, 0.35115677],
[0.02117615, 0.37664662, 0.26705912, 0.09851123, 0.23660688],
[0.01938041, 0.16853843, 0.52046123, 0.07535256, 0.21626737],
[0.04996846, 0.44545843, 0.12079423, 0.07154241, 0.31223646]
]))

# --------------------------------------- #
Expand All @@ -256,13 +256,13 @@ def test_ergodic_validate_transitions_valid():
transitions = topology.random_transitions()
topology.validate_transitions(transitions)

# ======================== #
# =============== #
# _LinearTopology #
# ======================== #
# =============== #

# ---------------------------------------------- #
# ------------------------------------- #
# _LinearTopology.uniform_transitions() #
# ---------------------------------------------- #
# ------------------------------------- #

def test_linear_uniform_transitions_min():
"""Generate a uniform linear transition matrix with minimal states"""
Expand All @@ -289,16 +289,16 @@ def test_linear_uniform_transitions_many():
transitions = topology.uniform_transitions()
assert_distribution(transitions)
assert_equal(transitions, np.array([
[0.5, 0.5 , 0. , 0. , 0. ],
[0. , 0.5 , 0.5 , 0. , 0. ],
[0. , 0. , 0.5 , 0.5 , 0. ],
[0. , 0. , 0. , 0.5 , 0.5 ],
[0. , 0. , 0. , 0. , 1. ]
[0.5, 0.5, 0. , 0. , 0. ],
[0. , 0.5, 0.5, 0. , 0. ],
[0. , 0. , 0.5, 0.5, 0. ],
[0. , 0. , 0. , 0.5, 0.5],
[0. , 0. , 0. , 0. , 1. ]
]))

# --------------------------------------------- #
# ------------------------------------ #
# _LinearTopology.random_transitions() #
# --------------------------------------------- #
# ------------------------------------ #

def test_linear_random_transitions_min():
"""Generate a random linear transition matrix with minimal states"""
Expand All @@ -315,7 +315,7 @@ def test_linear_random_transitions_small():
transitions = topology.random_transitions()
assert_distribution(transitions)
assert_equal(transitions, np.array([
[0.87426829, 0.12573171],
[0.65157396, 0.34842604],
[0. , 1. ]
]))

Expand All @@ -325,16 +325,16 @@ def test_linear_random_transitions_many():
transitions = topology.random_transitions()
assert_distribution(transitions)
assert_equal(transitions, np.array([
[0.9294571 , 0.0705429 , 0. , 0. , 0. ],
[0. , 0.92269318, 0.07730682, 0. , 0. ],
[0. , 0. , 0.86161736, 0.13838264, 0. ],
[0. , 0. , 0. , 0.13863688, 0.86136312],
[0.44455421, 0.55544579, 0. , 0. , 0. ],
[0. , 0.57553614, 0.42446386, 0. , 0. ],
[0. , 0. , 0.92014965, 0.07985035, 0. ],
[0. , 0. , 0. , 0.66790982, 0.33209018],
[0. , 0. , 0. , 0. , 1. ]
]))

# ----------------------------------------------- #
# -------------------------------------- #
# _LinearTopology.validate_transitions() #
# ----------------------------------------------- #
# -------------------------------------- #

def test_linear_validate_transitions_invalid():
"""Validate an invalid linear transition matrix"""
Expand Down
Loading

0 comments on commit a3b0fbb

Please sign in to comment.