Fix mmap settings used by joblib.Parallel #428

Merged 10 commits on Jul 21, 2020
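This PR changes how large inputs reach joblib workers in the diagram transformers: each worker now receives only its own slice (`_subdiagrams(X[s], ...)` rather than `_subdiagrams(X, ...)[s]`), and `HeatKernel`/`PersistenceImage` pass `mmap_mode='c'` to `joblib.Parallel`, so any input array that joblib auto-memmaps is opened copy-on-write instead of the default read-only mode, which is the safe setting when worker code writes into arrays in place. Below is a minimal sketch of that pattern, not giotto-tda code; `_process` and `process_all` are made-up names used only for illustration.

```python
import numpy as np
from joblib import Parallel, delayed, effective_n_jobs
from sklearn.utils import gen_even_slices


def _process(chunk):
    # Hypothetical worker that writes into its input buffer in place
    # before reducing it, as an illustration of why read-only memmaps hurt.
    chunk[chunk < 0] = 0.
    return chunk.sum(axis=1)


def process_all(X, n_jobs=2):
    # Arrays larger than joblib's `max_nbytes` threshold (default '1M') are
    # auto-memmapped; `mmap_mode='c'` opens those memmaps copy-on-write, so
    # the in-place writes above do not fail on a read-only memmap.
    results = Parallel(n_jobs=n_jobs, mmap_mode='c')(
        delayed(_process)(X[s])  # hand each worker only its slice
        for s in gen_even_slices(len(X), effective_n_jobs(n_jobs)))
    return np.concatenate(results)


print(process_all(np.random.randn(300000, 3)).shape)  # (300000,)
```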
1 change: 1 addition & 0 deletions CODE_AUTHORS.rst
@@ -13,3 +13,4 @@ Where component authors are known, add them here.
| Anibal Medina-Mardones, anibal.medinamardones@epfl.ch
| Wojciech Reise, reisewojciech@gmail.com
| Roman Yurchak, roman.yurchak@symerio.com
+ | Nick Sale, nicholas.j.sale@gmail.com
13 changes: 7 additions & 6 deletions gtda/diagrams/_metrics.py
@@ -75,7 +75,7 @@ def landscapes(diagrams, sampling, n_layers):


def _heat(image, sampled_diag, sigma):
- _sample_image(image, sampled_diag) # modifies `heat` inplace
+ _sample_image(image, sampled_diag)
image[:] = gaussian_filter(image, sigma, mode="reflect")


@@ -258,18 +258,19 @@ def _parallel_pairwise(X1, X2, metric, metric_params,
if X2 is None:
X2 = X1

+ n_columns = len(X2)
distance_matrices = Parallel(n_jobs=n_jobs)(
delayed(metric_func)(_subdiagrams(X1, [dim], remove_dim=True),
_subdiagrams(X2[s], [dim], remove_dim=True),
sampling=samplings[dim],
step_size=step_sizes[dim],
**effective_metric_params)
for dim in homology_dimensions
- for s in gen_even_slices(X2.shape[0], effective_n_jobs(n_jobs)))
+ for s in gen_even_slices(n_columns, effective_n_jobs(n_jobs)))

distance_matrices = np.concatenate(distance_matrices, axis=1)
distance_matrices = np.stack(
- [distance_matrices[:, i * X2.shape[0]:(i + 1) * X2.shape[0]]
+ [distance_matrices[:, i * n_columns:(i + 1) * n_columns]
for i in range(len(homology_dimensions))],
axis=2)
return distance_matrices
@@ -341,13 +342,13 @@ def _parallel_amplitude(X, metric, metric_params, homology_dimensions, n_jobs):

amplitude_arrays = Parallel(n_jobs=n_jobs)(
delayed(amplitude_func)(
- _subdiagrams(X, [dim], remove_dim=True)[s],
+ _subdiagrams(X[s], [dim], remove_dim=True),
sampling=samplings[dim], step_size=step_sizes[dim],
**effective_metric_params)
for dim in homology_dimensions
for s in gen_even_slices(_num_samples(X), effective_n_jobs(n_jobs)))

- amplitude_arrays = (np.concatenate(amplitude_arrays).reshape(
- len(homology_dimensions), X.shape[0]).T)
+ amplitude_arrays = np.concatenate(amplitude_arrays).\
+ reshape(len(homology_dimensions), len(X)).T

return amplitude_arrays
1 change: 1 addition & 0 deletions gtda/diagrams/_utils.py
@@ -29,6 +29,7 @@ def _sort(Xs):


def _sample_image(image, sampled_diag):
+ # NOTE: Modifies `image` in-place
unique, counts = np.unique(sampled_diag, axis=0, return_counts=True)
unique = tuple(tuple(row) for row in unique.astype(np.int).T)
image[unique] = counts
5 changes: 2 additions & 3 deletions gtda/diagrams/features.py
@@ -113,10 +113,9 @@ def transform(self, X, y=None):

with np.errstate(divide='ignore', invalid='ignore'):
Xt = Parallel(n_jobs=self.n_jobs)(
- delayed(self._persistence_entropy)(_subdiagrams(X, [dim])[s])
+ delayed(self._persistence_entropy)(_subdiagrams(X[s], [dim]))
for dim in self.homology_dimensions_
- for s in gen_even_slices(
- X.shape[0], effective_n_jobs(self.n_jobs))
+ for s in gen_even_slices(len(X), effective_n_jobs(self.n_jobs))
)
Xt = np.concatenate(Xt).reshape(self._n_dimensions, X.shape[0]).T
return Xt
58 changes: 27 additions & 31 deletions gtda/diagrams/representations.py
@@ -142,13 +142,12 @@ def transform(self, X, y=None):
X = check_diagrams(X)

Xt = Parallel(n_jobs=self.n_jobs)(delayed(betti_curves)(
- _subdiagrams(X, [dim], remove_dim=True)[s],
+ _subdiagrams(X[s], [dim], remove_dim=True),
self._samplings[dim])
for dim in self.homology_dimensions_
- for s in gen_even_slices(X.shape[0],
- effective_n_jobs(self.n_jobs)))
+ for s in gen_even_slices(len(X), effective_n_jobs(self.n_jobs)))
Xt = np.concatenate(Xt).\
- reshape(self._n_dimensions, X.shape[0], -1).\
+ reshape(self._n_dimensions, len(X), -1).\
transpose((1, 0, 2))
return Xt

@@ -354,14 +353,13 @@ def transform(self, X, y=None):
X = check_diagrams(X)

Xt = Parallel(n_jobs=self.n_jobs)(delayed(landscapes)(
- _subdiagrams(X, [dim], remove_dim=True)[s],
+ _subdiagrams(X[s], [dim], remove_dim=True),
self._samplings[dim],
self.n_layers)
for dim in self.homology_dimensions_
- for s in gen_even_slices(X.shape[0],
- effective_n_jobs(self.n_jobs)))
- Xt = np.concatenate(Xt).reshape(self._n_dimensions, X.shape[0],
- self.n_layers, self.n_bins).\
+ for s in gen_even_slices(len(X), effective_n_jobs(self.n_jobs)))
+ Xt = np.concatenate(Xt).\
+ reshape(self._n_dimensions, len(X), self.n_layers, self.n_bins).\
transpose((1, 0, 2, 3))
return Xt

@@ -587,14 +585,13 @@ def transform(self, X, y=None):
check_is_fitted(self)
X = check_diagrams(X, copy=True)

- Xt = Parallel(n_jobs=self.n_jobs)(delayed(
- heats)(_subdiagrams(X, [dim], remove_dim=True)[s],
+ Xt = Parallel(n_jobs=self.n_jobs, mmap_mode='c')(delayed(
+ heats)(_subdiagrams(X[s], [dim], remove_dim=True),
self._samplings[dim], self._step_size[dim], self.sigma)
for dim in self.homology_dimensions_
- for s in gen_even_slices(X.shape[0],
- effective_n_jobs(self.n_jobs)))
- Xt = np.concatenate(Xt).reshape(self._n_dimensions, X.shape[0],
- self.n_bins, self.n_bins).\
+ for s in gen_even_slices(len(X), effective_n_jobs(self.n_jobs)))
+ Xt = np.concatenate(Xt).\
+ reshape(self._n_dimensions, len(X), self.n_bins, self.n_bins).\
transpose((1, 0, 2, 3))
return Xt

@@ -792,19 +789,19 @@ def transform(self, X, y=None):
check_is_fitted(self)
X = check_diagrams(X, copy=True)

- Xt = Parallel(n_jobs=self.n_jobs)(
- delayed(persistence_images)(_subdiagrams(X, [dim],
- remove_dim=True)[s],
- self._samplings[dim],
- self._step_size[dim],
- self.weights_[dim],
- self.sigma)
+ Xt = Parallel(n_jobs=self.n_jobs, mmap_mode='c')(
+ delayed(persistence_images)(
+ _subdiagrams(X[s], [dim], remove_dim=True),
+ self._samplings[dim],
+ self._step_size[dim],
+ self.weights_[dim],
+ self.sigma
+ )
for dim in self.homology_dimensions_
- for s in gen_even_slices(X.shape[0],
- effective_n_jobs(self.n_jobs))
+ for s in gen_even_slices(len(X), effective_n_jobs(self.n_jobs))
)
- Xt = np.concatenate(Xt).reshape(self._n_dimensions, X.shape[0],
- self.n_bins, self.n_bins).\
+ Xt = np.concatenate(Xt).\
+ reshape(self._n_dimensions, len(X), self.n_bins, self.n_bins).\
transpose((1, 0, 2, 3))
return Xt

@@ -975,14 +972,13 @@ def transform(self, X, y=None):
X = check_diagrams(X)

Xt = (Parallel(n_jobs=self.n_jobs)
- (delayed(silhouettes)(_subdiagrams(X, [dim], remove_dim=True)[s],
+ (delayed(silhouettes)(_subdiagrams(X[s], [dim], remove_dim=True),
self._samplings[dim], power=self.power)
for dim in self.homology_dimensions_
- for s in gen_even_slices(X.shape[0],
- effective_n_jobs(self.n_jobs))))
+ for s in gen_even_slices(len(X), effective_n_jobs(self.n_jobs))))

- Xt = np.concatenate(Xt). \
- reshape(self._n_dimensions, X.shape[0], -1). \
+ Xt = np.concatenate(Xt).\
+ reshape(self._n_dimensions, len(X), -1).\
transpose((1, 0, 2))
return Xt

34 changes: 34 additions & 0 deletions gtda/diagrams/tests/test_features_representations.py
@@ -110,6 +110,22 @@ def test_pi_positive(pts):
assert np.all(pi.fit_transform(diagrams) >= 0.)


+ def test_large_pi_null_multithreaded():
+ """Test that pi is computed correctly when the input array
+ is at least 1MB and more than 1 process is used, triggering
+ joblib's use of memmaps"""
+ X = np.linspace(0, 100, 300000)
+ pi = PersistenceImage(sigma=1, n_bins=10, n_jobs=2)
+ diagrams = np.expand_dims(np.stack([X, X,
+ np.zeros((X.shape[0],),
+ dtype=int)]).transpose(),
+ axis=0)
+ diagrams = np.repeat(diagrams, 2, axis=0)
+ diagrams[1, :, 1] += 1
+
+ assert_almost_equal(pi.fit_transform(diagrams)[0], 0)


def test_silhouette_transform():
sht = Silhouette(n_bins=31, power=1.)
X_sht_res = np.array([0., 0.05, 0.1, 0.15, 0.2, 0.25, 0.2, 0.15, 0.1,
@@ -234,3 +250,21 @@ def test_hk_with_diag_points(pts):
for x_ in [x, x_with_diag_points]]

assert_almost_equal(x_with_diag_points_t, x_t, decimal=13)


+ def test_large_hk_shape_multithreaded():
+ """Test that HeatKernel returns something of the right shape when the input
+ array is at least 1MB and more than 1 process is used, triggering joblib's
+ use of memmaps"""
+ X = np.linspace(0, 100, 300000)
+ n_bins = 10
+ diagrams = np.expand_dims(np.stack([X, X,
+ np.zeros((X.shape[0],),
+ dtype=int)]).transpose(),
+ axis=0)
+
+ hk = HeatKernel(sigma=1, n_bins=n_bins, n_jobs=2)
+ num_dimensions = 1
+ x_t = hk.fit_transform(diagrams)
+
+ assert x_t.shape == (diagrams.shape[0], num_dimensions, n_bins, n_bins)
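As a rough back-of-the-envelope check of the "at least 1MB" claim in these test docstrings (assuming the float64 diagrams of shape (n_samples, 300000, 3) built above), each diagram is well over joblib's default auto-memmapping threshold (`max_nbytes='1M'`), so workers do receive memmapped inputs:

```python
import numpy as np

# 300000 points per diagram, 3 coordinates each, stored as float64 (8 bytes).
diagram_bytes = 300000 * 3 * np.dtype(np.float64).itemsize
print(diagram_bytes / 1e6)  # 7.2 MB per diagram, above joblib's 1 MB default
```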
4 changes: 1 addition & 3 deletions gtda/plotting/persistence_diagrams.py
@@ -20,7 +20,6 @@ def plot_diagram(diagram, homology_dimensions=None, **input_layout):
homology dimensions which appear in `diagram` will be plotted.

"""
- from ..diagrams._utils import _subdiagrams # To avoid circular imports

# TODO: increase the marker size
if homology_dimensions is None:
@@ -76,8 +75,7 @@ def plot_diagram(diagram, homology_dimensions=None, **input_layout):

for dim in homology_dimensions:
name = f'H{int(dim)}' if dim != np.inf else 'Any homology dimension'
- subdiagram = _subdiagrams(np.asarray([diagram]), [dim],
- remove_dim=True)[0]
+ subdiagram = diagram[diagram[:, 2] == dim]
diff = (subdiagram[:, 1] != subdiagram[:, 0])
subdiagram = subdiagram[diff]
fig.add_trace(gobj.Scatter(x=subdiagram[:, 0], y=subdiagram[:, 1],