tests: give repack and split their own test module
Parent: 4542e7b
Commit: 3d743d3
Showing 3 changed files with 163 additions and 138 deletions.
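
The two new test modules shown below both import retrieve_data from helper_methods to fetch bundled example data. That helper is not part of this commit; the sketch below is only an assumption of what such a fixture typically does (unpack a packaged example .zip into a temporary directory and return the path to the extracted .rtdc file), not the project's actual implementation.

# Hypothetical sketch of helper_methods.retrieve_data (assumed, not part
# of this commit): unpack a bundled example archive into a temporary
# directory and return the path of the extracted measurement file.
import atexit
import pathlib
import shutil
import tempfile
import zipfile


def retrieve_data(zip_name):
    data_dir = pathlib.Path(__file__).parent / "data"  # assumed data location
    tmp_dir = pathlib.Path(tempfile.mkdtemp(prefix="dclab_test_"))
    # remove the extracted data again when the interpreter exits
    atexit.register(shutil.rmtree, tmp_dir, ignore_errors=True)
    with zipfile.ZipFile(data_dir / zip_name) as arc:
        arc.extractall(tmp_dir)
    # return the first extracted .rtdc file (these tests only need one)
    return sorted(tmp_dir.rglob("*.rtdc"))[0]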
@@ -0,0 +1,90 @@
from dclab import cli, new_dataset, rtdc_dataset

import h5py
import numpy as np
import pytest

from helper_methods import retrieve_data


@pytest.mark.filterwarnings(
    "ignore::dclab.rtdc_dataset.config.WrongConfigurationTypeWarning")
def test_repack_basic():
    path_in = retrieve_data("fmt-hdf5_mask-contour_2018.zip")
    # same directory (will be cleaned up with path_in)
    path_out = path_in.with_name("repacked.rtdc")

    cli.repack(path_in=path_in, path_out=path_out)

    with new_dataset(path_out) as dsj, new_dataset(path_in) as ds0:
        assert len(dsj)
        assert len(dsj) == len(ds0)
        for feat in ds0.features_innate:
            if feat in ds0.features_scalar:
                assert np.all(dsj[feat] == ds0[feat]), feat
        for ii in range(len(ds0)):
            assert np.all(dsj["contour"][ii] == ds0["contour"][ii])
            assert np.all(dsj["image"][ii] == ds0["image"][ii])
            assert np.all(dsj["mask"][ii] == ds0["mask"][ii])


@pytest.mark.filterwarnings(
    "ignore::dclab.rtdc_dataset.config.WrongConfigurationTypeWarning")
def test_repack_remove_secrets():
    path_in = retrieve_data("fmt-hdf5_mask-contour_2018.zip")
    # same directory (will be cleaned up with path_in)
    path_out = path_in.with_name("repacked.rtdc")

    with h5py.File(path_in, "a") as h5:
        h5.attrs["experiment:sample"] = "my dirty secret"

    with h5py.File(path_in, "a") as h5:
        h5.attrs["experiment:sample"] = "sunshine"

    # test whether the dirty secret is still there
    with open(str(path_in), "rb") as fd:
        data = fd.read()
        assert str(data).count("my dirty secret")

    # now repack
    cli.repack(path_in=path_in, path_out=path_out)

    # clean?
    with open(str(path_out), "rb") as fd:
        data = fd.read()
        assert not str(data).count("my dirty secret")


@pytest.mark.filterwarnings(
    "ignore::dclab.rtdc_dataset.config.WrongConfigurationTypeWarning")
def test_repack_strip_logs():
    path_in = retrieve_data("fmt-hdf5_mask-contour_2018.zip")
    # same directory (will be cleaned up with path_in)
    path_out = path_in.with_name("repacked.rtdc")

    # write some logs
    with h5py.File(path_in, "a") as h5:
        hw = rtdc_dataset.RTDCWriter(h5)
        hw.store_log("test_log", ["peter", "hans"])

    cli.repack(path_in=path_in, path_out=path_out, strip_logs=True)

    with new_dataset(path_out) as dsj, new_dataset(path_in) as ds0:
        assert ds0.logs
        assert not dsj.logs


@pytest.mark.filterwarnings(
    "ignore::dclab.rtdc_dataset.config.WrongConfigurationTypeWarning")
def test_repack_user_metadata():
    path_in = retrieve_data("fmt-hdf5_mask-contour_2018.zip")
    with h5py.File(path_in, "a") as h5:
        h5.attrs["user:peter"] = "hans"

    # same directory (will be cleaned up with path_in)
    path_out = path_in.with_name("repacked.rtdc")

    cli.repack(path_in=path_in, path_out=path_out)

    with new_dataset(path_out) as ds:
        assert ds.config["user"]["peter"] == "hans"
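
Aside (not part of the commit): test_repack_remove_secrets relies on the fact that overwriting an HDF5 attribute does not scrub the old bytes from the file on disk; the stale value lingers in unreclaimed space until the file is rewritten, which is what cli.repack does. A minimal standalone illustration of that behavior:

import h5py

# Write an attribute, overwrite it, then search the raw file bytes for the
# old value. The replaced string usually survives in unreclaimed HDF5 space.
with h5py.File("scratch.h5", "w") as h5:
    h5.attrs["experiment:sample"] = "my dirty secret"
with h5py.File("scratch.h5", "a") as h5:
    h5.attrs["experiment:sample"] = "sunshine"
with open("scratch.h5", "rb") as fd:
    print(b"my dirty secret" in fd.read())  # typically True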
@@ -0,0 +1,73 @@
import sys
import tempfile
import pathlib
import shutil

from dclab import cli, new_dataset, rtdc_dataset

import h5py
import numpy as np
import pytest

from helper_methods import retrieve_data



@pytest.mark.filterwarnings(
    "ignore::dclab.rtdc_dataset.config.WrongConfigurationTypeWarning")
def test_compressed_split():
    """Make sure the split output data are compressed"""
    path_in = retrieve_data("fmt-hdf5_mask-contour_2018.zip")
    # same directory (will be cleaned up with path_in)
    path_out = path_in.parent

    paths = cli.split(path_in=path_in,
                      path_out=path_out,
                      split_events=3,
                      ret_out_paths=True)

    for pp in paths:
        ic = rtdc_dataset.check.IntegrityChecker(pp)
        ccue = ic.check_compression()[0]
        assert ccue.data["uncompressed"] == 0





@pytest.mark.filterwarnings(
    "ignore::dclab.rtdc_dataset.config.WrongConfigurationTypeWarning")
def test_split():
    path_in = retrieve_data("fmt-hdf5_mask-contour_2018.zip")
    paths = cli.split(path_in=path_in, split_events=3, ret_out_paths=True)
    with new_dataset(path_in) as ds:
        ecount = 0
        for pp in paths:
            with new_dataset(pp) as di:
                for feat in ds.features_scalar:
                    if feat in ["index",
                                "time",  # issue 204
                                ]:
                        continue
                    assert np.all(
                        ds[feat][ecount:ecount + len(di)] == di[feat]), feat
                ecount += len(di)




@pytest.mark.filterwarnings(
    "ignore::dclab.rtdc_dataset.config.WrongConfigurationTypeWarning")
def test_split_traces():
    path_in = retrieve_data("fmt-hdf5_fl_2018.zip")
    paths = cli.split(path_in=path_in, split_events=3, ret_out_paths=True)
    with new_dataset(path_in) as ds:
        ecount = 0
        for pp in paths:
            with new_dataset(pp) as di:
                for flkey in ds["trace"].keys():
                    trace1 = ds["trace"][flkey][ecount:ecount + len(di)]
                    trace2 = di["trace"][flkey][:]
                    assert len(trace1) == len(trace2)
                    assert np.all(trace1 == trace2), flkey
                ecount += len(di)
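
Aside (not part of the commit): the split tests assume that split_events is the maximum number of events per output file, so the paths returned by cli.split partition the input dataset. A rough sanity check under that assumption (the input path is a placeholder, not one of the test archives):

import math
import pathlib

from dclab import cli, new_dataset

path_in = pathlib.Path("example.rtdc")  # placeholder input file
paths = cli.split(path_in=path_in, split_events=3, ret_out_paths=True)
with new_dataset(path_in) as ds:
    # the chunk count and the summed event counts should match the input
    assert len(paths) == math.ceil(len(ds) / 3)
    total = 0
    for pp in paths:
        with new_dataset(pp) as di:
            total += len(di)
    assert total == len(ds)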