Skip to content

Commit

Permalink
Fix mmap settings used by joblib.Parallel (#428)
Browse files Browse the repository at this point in the history
* Fix mmap settings used by joblib.Parallel in HeatKernel and PersistenceImage

* Add tests

* Slice X first inside parallel calls to _subdiagrams

* Minor simplifications in plot_diagram

* Add @NickSale to CODE_AUTHORS.rst

Co-authored-by: Umberto Lupo <umberto.lupo@gmail.com>
  • Loading branch information
NickSale and ulupo authored Jul 21, 2020
1 parent aad1eba commit 1c24ce3
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 43 deletions.
1 change: 1 addition & 0 deletions CODE_AUTHORS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ Where component authors are known, add them here.
| Anibal Medina-Mardones, anibal.medinamardones@epfl.ch
| Wojciech Reise, reisewojciech@gmail.com
| Roman Yurchak, roman.yurchak@symerio.com
| Nick Sale, nicholas.j.sale@gmail.com
13 changes: 7 additions & 6 deletions gtda/diagrams/_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def landscapes(diagrams, sampling, n_layers):


def _heat(image, sampled_diag, sigma):
_sample_image(image, sampled_diag) # modifies `heat` inplace
_sample_image(image, sampled_diag)
image[:] = gaussian_filter(image, sigma, mode="reflect")


Expand Down Expand Up @@ -258,18 +258,19 @@ def _parallel_pairwise(X1, X2, metric, metric_params,
if X2 is None:
X2 = X1

n_columns = len(X2)
distance_matrices = Parallel(n_jobs=n_jobs)(
delayed(metric_func)(_subdiagrams(X1, [dim], remove_dim=True),
_subdiagrams(X2[s], [dim], remove_dim=True),
sampling=samplings[dim],
step_size=step_sizes[dim],
**effective_metric_params)
for dim in homology_dimensions
for s in gen_even_slices(X2.shape[0], effective_n_jobs(n_jobs)))
for s in gen_even_slices(n_columns, effective_n_jobs(n_jobs)))

distance_matrices = np.concatenate(distance_matrices, axis=1)
distance_matrices = np.stack(
[distance_matrices[:, i * X2.shape[0]:(i + 1) * X2.shape[0]]
[distance_matrices[:, i * n_columns:(i + 1) * n_columns]
for i in range(len(homology_dimensions))],
axis=2)
return distance_matrices
Expand Down Expand Up @@ -341,13 +342,13 @@ def _parallel_amplitude(X, metric, metric_params, homology_dimensions, n_jobs):

amplitude_arrays = Parallel(n_jobs=n_jobs)(
delayed(amplitude_func)(
_subdiagrams(X, [dim], remove_dim=True)[s],
_subdiagrams(X[s], [dim], remove_dim=True),
sampling=samplings[dim], step_size=step_sizes[dim],
**effective_metric_params)
for dim in homology_dimensions
for s in gen_even_slices(_num_samples(X), effective_n_jobs(n_jobs)))

amplitude_arrays = (np.concatenate(amplitude_arrays).reshape(
len(homology_dimensions), X.shape[0]).T)
amplitude_arrays = np.concatenate(amplitude_arrays).\
reshape(len(homology_dimensions), len(X)).T

return amplitude_arrays
1 change: 1 addition & 0 deletions gtda/diagrams/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def _sort(Xs):


def _sample_image(image, sampled_diag):
# NOTE: Modifies `image` in-place
unique, counts = np.unique(sampled_diag, axis=0, return_counts=True)
unique = tuple(tuple(row) for row in unique.astype(np.int).T)
image[unique] = counts
Expand Down
5 changes: 2 additions & 3 deletions gtda/diagrams/features.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,9 @@ def transform(self, X, y=None):

with np.errstate(divide='ignore', invalid='ignore'):
Xt = Parallel(n_jobs=self.n_jobs)(
delayed(self._persistence_entropy)(_subdiagrams(X, [dim])[s])
delayed(self._persistence_entropy)(_subdiagrams(X[s], [dim]))
for dim in self.homology_dimensions_
for s in gen_even_slices(
X.shape[0], effective_n_jobs(self.n_jobs))
for s in gen_even_slices(len(X), effective_n_jobs(self.n_jobs))
)
Xt = np.concatenate(Xt).reshape(self._n_dimensions, X.shape[0]).T
return Xt
Expand Down
58 changes: 27 additions & 31 deletions gtda/diagrams/representations.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,12 @@ def transform(self, X, y=None):
X = check_diagrams(X)

Xt = Parallel(n_jobs=self.n_jobs)(delayed(betti_curves)(
_subdiagrams(X, [dim], remove_dim=True)[s],
_subdiagrams(X[s], [dim], remove_dim=True),
self._samplings[dim])
for dim in self.homology_dimensions_
for s in gen_even_slices(X.shape[0],
effective_n_jobs(self.n_jobs)))
for s in gen_even_slices(len(X), effective_n_jobs(self.n_jobs)))
Xt = np.concatenate(Xt).\
reshape(self._n_dimensions, X.shape[0], -1).\
reshape(self._n_dimensions, len(X), -1).\
transpose((1, 0, 2))
return Xt

Expand Down Expand Up @@ -354,14 +353,13 @@ def transform(self, X, y=None):
X = check_diagrams(X)

Xt = Parallel(n_jobs=self.n_jobs)(delayed(landscapes)(
_subdiagrams(X, [dim], remove_dim=True)[s],
_subdiagrams(X[s], [dim], remove_dim=True),
self._samplings[dim],
self.n_layers)
for dim in self.homology_dimensions_
for s in gen_even_slices(X.shape[0],
effective_n_jobs(self.n_jobs)))
Xt = np.concatenate(Xt).reshape(self._n_dimensions, X.shape[0],
self.n_layers, self.n_bins).\
for s in gen_even_slices(len(X), effective_n_jobs(self.n_jobs)))
Xt = np.concatenate(Xt).\
reshape(self._n_dimensions, len(X), self.n_layers, self.n_bins).\
transpose((1, 0, 2, 3))
return Xt

Expand Down Expand Up @@ -587,14 +585,13 @@ def transform(self, X, y=None):
check_is_fitted(self)
X = check_diagrams(X, copy=True)

Xt = Parallel(n_jobs=self.n_jobs)(delayed(
heats)(_subdiagrams(X, [dim], remove_dim=True)[s],
Xt = Parallel(n_jobs=self.n_jobs, mmap_mode='c')(delayed(
heats)(_subdiagrams(X[s], [dim], remove_dim=True),
self._samplings[dim], self._step_size[dim], self.sigma)
for dim in self.homology_dimensions_
for s in gen_even_slices(X.shape[0],
effective_n_jobs(self.n_jobs)))
Xt = np.concatenate(Xt).reshape(self._n_dimensions, X.shape[0],
self.n_bins, self.n_bins).\
for s in gen_even_slices(len(X), effective_n_jobs(self.n_jobs)))
Xt = np.concatenate(Xt).\
reshape(self._n_dimensions, len(X), self.n_bins, self.n_bins).\
transpose((1, 0, 2, 3))
return Xt

Expand Down Expand Up @@ -792,19 +789,19 @@ def transform(self, X, y=None):
check_is_fitted(self)
X = check_diagrams(X, copy=True)

Xt = Parallel(n_jobs=self.n_jobs)(
delayed(persistence_images)(_subdiagrams(X, [dim],
remove_dim=True)[s],
self._samplings[dim],
self._step_size[dim],
self.weights_[dim],
self.sigma)
Xt = Parallel(n_jobs=self.n_jobs, mmap_mode='c')(
delayed(persistence_images)(
_subdiagrams(X[s], [dim], remove_dim=True),
self._samplings[dim],
self._step_size[dim],
self.weights_[dim],
self.sigma
)
for dim in self.homology_dimensions_
for s in gen_even_slices(X.shape[0],
effective_n_jobs(self.n_jobs))
for s in gen_even_slices(len(X), effective_n_jobs(self.n_jobs))
)
Xt = np.concatenate(Xt).reshape(self._n_dimensions, X.shape[0],
self.n_bins, self.n_bins).\
Xt = np.concatenate(Xt).\
reshape(self._n_dimensions, len(X), self.n_bins, self.n_bins).\
transpose((1, 0, 2, 3))
return Xt

Expand Down Expand Up @@ -975,14 +972,13 @@ def transform(self, X, y=None):
X = check_diagrams(X)

Xt = (Parallel(n_jobs=self.n_jobs)
(delayed(silhouettes)(_subdiagrams(X, [dim], remove_dim=True)[s],
(delayed(silhouettes)(_subdiagrams(X[s], [dim], remove_dim=True),
self._samplings[dim], power=self.power)
for dim in self.homology_dimensions_
for s in gen_even_slices(X.shape[0],
effective_n_jobs(self.n_jobs))))
for s in gen_even_slices(len(X), effective_n_jobs(self.n_jobs))))

Xt = np.concatenate(Xt). \
reshape(self._n_dimensions, X.shape[0], -1). \
Xt = np.concatenate(Xt).\
reshape(self._n_dimensions, len(X), -1).\
transpose((1, 0, 2))
return Xt

Expand Down
34 changes: 34 additions & 0 deletions gtda/diagrams/tests/test_features_representations.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,22 @@ def test_pi_positive(pts):
assert np.all(pi.fit_transform(diagrams) >= 0.)


def test_large_pi_null_multithreaded():
"""Test that pi is computed correctly when the input array
is at least 1MB and more than 1 process is used, triggering
joblib's use of memmaps"""
X = np.linspace(0, 100, 300000)
pi = PersistenceImage(sigma=1, n_bins=10, n_jobs=2)
diagrams = np.expand_dims(np.stack([X, X,
np.zeros((X.shape[0],),
dtype=int)]).transpose(),
axis=0)
diagrams = np.repeat(diagrams, 2, axis=0)
diagrams[1, :, 1] += 1

assert_almost_equal(pi.fit_transform(diagrams)[0], 0)


def test_silhouette_transform():
sht = Silhouette(n_bins=31, power=1.)
X_sht_res = np.array([0., 0.05, 0.1, 0.15, 0.2, 0.25, 0.2, 0.15, 0.1,
Expand Down Expand Up @@ -234,3 +250,21 @@ def test_hk_with_diag_points(pts):
for x_ in [x, x_with_diag_points]]

assert_almost_equal(x_with_diag_points_t, x_t, decimal=13)


def test_large_hk_shape_multithreaded():
"""Test that HeatKernel returns something of the right shape when the input
array is at least 1MB and more than 1 process is used, triggering joblib's
use of memmaps"""
X = np.linspace(0, 100, 300000)
n_bins = 10
diagrams = np.expand_dims(np.stack([X, X,
np.zeros((X.shape[0],),
dtype=int)]).transpose(),
axis=0)

hk = HeatKernel(sigma=1, n_bins=n_bins, n_jobs=2)
num_dimensions = 1
x_t = hk.fit_transform(diagrams)

assert x_t.shape == (diagrams.shape[0], num_dimensions, n_bins, n_bins)
4 changes: 1 addition & 3 deletions gtda/plotting/persistence_diagrams.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ def plot_diagram(diagram, homology_dimensions=None, **input_layout):
homology dimensions which appear in `diagram` will be plotted.
"""
from ..diagrams._utils import _subdiagrams # To avoid circular imports

# TODO: increase the marker size
if homology_dimensions is None:
Expand Down Expand Up @@ -76,8 +75,7 @@ def plot_diagram(diagram, homology_dimensions=None, **input_layout):

for dim in homology_dimensions:
name = f'H{int(dim)}' if dim != np.inf else 'Any homology dimension'
subdiagram = _subdiagrams(np.asarray([diagram]), [dim],
remove_dim=True)[0]
subdiagram = diagram[diagram[:, 2] == dim]
diff = (subdiagram[:, 1] != subdiagram[:, 0])
subdiagram = subdiagram[diff]
fig.add_trace(gobj.Scatter(x=subdiagram[:, 0], y=subdiagram[:, 1],
Expand Down

0 comments on commit 1c24ce3

Please sign in to comment.