overhaul CLI methods and related functionality
 - fix: support input files without "events" in `rtdc_copy`
 - fix: CLI methods now store basin information in output paths (#239):
   compress, condense, and repack preserve the original input basins;
   split writes mapped basins (using export.hdf5); join does not write
   basins at all
- enh: CLI methods now return the output path
- enh: the repack CLI method now allows stripping basins (see the sketch below)
- docs: update docstrings for CLI methods
- ref: reorder CLI method arguments so that the input path comes before the output path
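
A minimal sketch of the new return-value behavior, assuming the CLI functions are exported via `dclab.cli` and that the basin-stripping option of `repack` is a keyword argument (the repack diff is not shown in this commit view; names and paths are illustrative):

    from dclab import cli

    # every CLI method now returns the output path
    path = cli.repack(path_in="input.rtdc",
                      path_out="repacked.rtdc",
                      strip_basins=True)  # assumed keyword for stripping basins
    print(path)  # e.g. pathlib.Path("repacked.rtdc")
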
paulmueller committed Apr 1, 2024
1 parent 3d743d3 commit abc9c44
Showing 15 changed files with 688 additions and 178 deletions.
19 changes: 14 additions & 5 deletions CHANGELOG
@@ -1,20 +1,29 @@
0.58.0
- feat: implement "mapped basins" that have a superset of events
- feat: allow exporting basin-only HDF5 files
- enh: introduce `RTDCBase.features_ancillary`, a list of all ancillary
features (computed on-the-fly) for a dataset
- enh: add `include_basins` keyword argument for `rtdc_copy`
- enh: `RTDCWriter.store_basin` now returns the basin hash/identifier
- fix: support input files without "events" in `rtdc_copy`
- fix: CLI methods now store basin information in output paths (#239):
compress, condense, and repack preserve the original input basins;
split writes mapped basins (using export.hdf5); join does not write
basins at all
- fix: enable nested basins for the HDF5, S3, and HTTP basins,
because we have the `_local_basins_allowed` property since 0.57.6
- fix: when joining datasets, the name of the log holding the configuration
of the source datasets should be src-#{i}_cfg instead of cfg_src-#{i}
- enh: introduce `RTDCBase.features_ancillary`, a list of all ancillary
features (computed on-the-fly) for a dataset
- enh: add `include_basins` keyword argument for `rtdc_copy`
- enh: `RTDCWriter.store_basin` now returns the basin hash/identifier
- enh: CLI methods now return the output path
- enh: the repack CLI method now allows stripping basins
- docs: add an advanced usage section on basins
- docs: update docstrings for CLI methods
- setup: pin nptdms<1.9, because tests started to fail
- ci: install lme4 from archive
- ref: modify basin `load_dataset` methods to support mapped basins
- ref: reorder CLI method arguments so that the input path comes before the output path
- tests: make sure basin features are not written automatically in
CLI methods (#246)
- ci: install lme4 from archive
0.57.7
- fix: raise a ValueError in HTTPFile when the server does not return
the status code 200
29 changes: 27 additions & 2 deletions dclab/cli/task_compress.py
@@ -1,4 +1,6 @@
"""Compress .rtdc files"""
from __future__ import annotations

import argparse
import pathlib
import warnings
@@ -13,8 +15,29 @@
from . import common


def compress(path_out=None, path_in=None, force=False, check_suffix=True):
"""Create a new dataset with all features compressed losslessly"""
def compress(
path_in: str | pathlib.Path = None,
path_out: str | pathlib.Path = None,
force: bool = False,
check_suffix: bool = True):
"""Create a new dataset with all features compressed lossless
Parameters
----------
path_in: str or pathlib.Path
file to compress
path_out: str or pathlib
output file path
force: bool
DEPRECATED
check_suffix: bool
check suffixes for input and output files
Returns
-------
path_out: pathlib.Path
output path (with possibly corrected suffix)
"""
cmp_kw = hdf5plugin.Zstd(clevel=5)
if path_out is None or path_in is None:
parser = compress_parser()
@@ -46,6 +69,7 @@ def compress(path_out=None, path_in=None, force=False, check_suffix=True):
rtdc_copy(src_h5file=h5,
dst_h5file=hc,
features="all",
include_basins=True,
include_logs=True,
include_tables=True,
meta_prefix="",
@@ -74,6 +98,7 @@ def compress(path_out=None, path_in=None, force=False, check_suffix=True):

# Finally, rename temp to out
path_temp.rename(path_out)
return path_out


def compress_parser():
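A short usage sketch of the updated `compress` method, assuming it is importable from `dclab.cli` as in previous releases (file names are placeholders):

    import pathlib

    from dclab import cli

    # basins of the input file are now preserved in the output file
    path = cli.compress(path_in="measurement.rtdc",
                        path_out="measurement_compressed.rtdc")
    assert isinstance(path, pathlib.Path)  # the output path is returned
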
122 changes: 97 additions & 25 deletions dclab/cli/task_condense.py
@@ -1,27 +1,67 @@
"""Create .rtdc files with scalar-only features"""
from __future__ import annotations

import argparse
import pathlib
from typing import List
import warnings

import h5py
import hdf5plugin

from ..rtdc_dataset import fmt_hdf5, new_dataset, rtdc_copy, RTDCWriter
from ..rtdc_dataset import (
fmt_hdf5, new_dataset, rtdc_copy, RTDCWriter, RTDCBase
)
from .. import util
from .._version import version

from . import common


def condense(path_out=None, path_in=None, ancillaries=True,
check_suffix=True):
"""Create a new dataset with all (ancillary) scalar-only features"""
def condense(
path_in: str | pathlib.Path = None,
path_out: str | pathlib.Path = None,
ancillaries: bool = None,
store_ancillary_features: bool = True,
store_basin_features: bool = True,
check_suffix: bool = True):
"""Create a new dataset with all available scalar-only features
Besides the innate scalar features, this also includes all
fast-to-compute ancillary and all basin features (`features_loaded`).
Parameters
----------
path_in: str or pathlib.Path
file to compress
path_out: str or pathlib
output file path
ancillaries: bool
DEPRECATED, use `store_ancillary_features` instead
store_ancillary_features: bool
compute and store ancillary features in the output file
store_basin_features: bool
copy basin features from the input path to the output file
check_suffix: bool
check suffixes for input and output files
Returns
-------
path_out: pathlib.Path
output path (with possibly corrected suffix)
"""
if ancillaries is not None:
warnings.warn("Please use `store_ancillary_features` instead of "
"`ancillaries`", DeprecationWarning)
store_ancillary_features = ancillaries

if path_out is None or path_in is None:
parser = condense_parser()
args = parser.parse_args()
path_in = args.input
path_out = args.output
ancillaries = not args.no_ancillaries
store_ancillary_features = not args.no_ancillaries
store_basin_features = not args.no_basins

allowed_input_suffixes = [".rtdc", ".tdms"]
if not check_suffix:
@@ -32,19 +72,38 @@ def condense(path_out=None, path_in=None, ancillaries=True,

with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
with new_dataset(path_in) as ds, \
# We use `store_basin_features` during initialization (to avoid
# conflicts with ancillary features) and in the actual function
# as well, to correctly determine which features to use.
with new_dataset(path_in, enable_basins=store_basin_features) as ds, \
h5py.File(path_temp, "w") as h5_cond:
condense_dataset(ds=ds,
h5_cond=h5_cond,
ancillaries=ancillaries,
store_ancillary_features=store_ancillary_features,
store_basin_features=store_basin_features,
warnings_list=w)

# Finally, rename temp to out
path_temp.rename(path_out)
return path_out


def condense_dataset(
ds: RTDCBase,
h5_cond: h5py.File,
ancillaries: bool = None,
store_ancillary_features: bool = True,
store_basin_features: bool = True,
warnings_list: List = None):
"""Condense a dataset using low-level HDF5 methods
For ancillary and basin features, high-level dclab methods are used.
"""
if ancillaries is not None:
warnings.warn("Please use `store_ancillary_features` instead of "
"`ancillaries`", DeprecationWarning)
store_ancillary_features = ancillaries

def condense_dataset(ds, h5_cond, ancillaries=True, warnings_list=None):
"""Condense a dataset using low-level HDF5 methods"""
cmp_kw = hdf5plugin.Zstd(clevel=5)
cmd_dict = {}

@@ -55,6 +114,7 @@ def condense_dataset(ds, h5_cond, ancillaries=True, warnings_list=None):
rtdc_copy(src_h5file=ds.h5file,
dst_h5file=h5_cond,
features="scalar",
include_basins=True,
include_logs=True,
include_tables=True,
meta_prefix="")
@@ -65,40 +125,44 @@ def condense_dataset(ds, h5_cond, ancillaries=True, warnings_list=None):
feats_sc = ds.features_scalar
# loaded (computationally cheap) scalar features
feats_sc_in = [f for f in ds.features_loaded if f in feats_sc]
# ancillary features
feats_sc_anc = list(set(feats_sc) - set(feats_sc_in))

cmd_dict["features_original_innate"] = ds.features_innate

if ancillaries:
features = feats_sc
cmd_dict["features_computed"] = feats_sc_anc
features = set(feats_sc_in)
if store_basin_features:
feats_sc_basin = [f for f in ds.features_basin if
(f in feats_sc and f not in feats_sc_in)]
cmd_dict["features_basin"] = feats_sc_basin
if feats_sc_basin:
print(f"Using basin features {feats_sc_basin}")
features |= set(feats_sc_basin)

if store_ancillary_features:
feats_sc_anc = [f for f in ds.features_ancillary if
(f in feats_sc and f not in feats_sc_in)]
cmd_dict["features_ancillary"] = feats_sc_anc
if feats_sc_anc:
print("Computing ancillary features:",
" ".join(feats_sc_anc))
else:
print("No ancillary features to compute.")
else:
features = feats_sc_in
features |= set(feats_sc_anc)
print(f"Using ancillary features {feats_sc_anc}")

# command log
logs = {"dclab-condense": common.get_command_log(
paths=[ds.path], custom_dict=cmd_dict)}

# rename old dclab-condense logs
for lkey in ["dclab-condense", "dclab-condense-warnings"]:
if lkey in h5_cond["logs"]:
for l_key in ["dclab-condense", "dclab-condense-warnings"]:
if l_key in h5_cond["logs"]:
# This is cached, so no worry calling it multiple times.
md5_cfg = util.hashobj(ds.config)
# rename
new_log_name = f"{lkey}_{md5_cfg}"
new_log_name = f"{l_key}_{md5_cfg}"
if new_log_name not in h5_cond["logs"]:
# If the user repeatedly condensed one file, then there is
# no benefit in storing the log under a different name (the
# metadata did not change). Only write the log if it does
# not already exist.
h5_cond["logs"][f"{lkey}_{md5_cfg}"] = h5_cond["logs"][lkey]
del h5_cond["logs"][lkey]
h5_cond["logs"][f"{l_key}_{md5_cfg}"] = h5_cond["logs"][l_key]
del h5_cond["logs"][l_key]

with RTDCWriter(h5_cond,
mode="append",
@@ -135,6 +199,14 @@ def condense_parser():
help='Do not compute expensive ancillary features '
'such as volume'
)
parser.set_defaults(no_ancillaries=False)
parser.add_argument('--no-basin-features',
dest='no_basins',
action='store_true',
help='Do not store basin-based feature data from the '
'input file in the output file'
)
parser.set_defaults(no_basins=False)
parser.add_argument('--version', action='version',
version=f'dclab-condense {version}')
return parser
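
A sketch of the new condense options, based on the signature and parser arguments above (file names are placeholders):

    from dclab import cli

    # condense without copying basin feature data into the output file
    path = cli.condense(path_in="measurement.rtdc",
                        path_out="measurement_condensed.rtdc",
                        store_basin_features=False)

The equivalent command-line call, assuming the `dclab-condense` entry point with positional INPUT and OUTPUT arguments, would be:

    dclab-condense measurement.rtdc measurement_condensed.rtdc --no-basin-features
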
45 changes: 38 additions & 7 deletions dclab/cli/task_join.py
@@ -1,6 +1,10 @@
"""Concatenate .rtdc files"""
from __future__ import annotations

import argparse
import pathlib
import time
from typing import Dict, List
import warnings

import hdf5plugin
Expand All @@ -17,8 +21,33 @@ class FeatureSetNotIdenticalJoinWarning(UserWarning):
pass


def join(path_out=None, paths_in=None, metadata=None):
"""Join multiple RT-DC measurements into a single .rtdc file"""
def join(
paths_in: List[str | pathlib.Path] = None,
path_out: str | pathlib.Path = None,
metadata: Dict = None):
"""Join multiple RT-DC measurements into a single .rtdc file
Parameters
----------
paths_in: list of paths
input paths to join
path_out: str or pathlib.Path
output path
metadata: dict
optional metadata dictionary (configuration dict) to store
in the output file
Returns
-------
path_out: pathlib.Path
output path (with corrected path suffix if applicable)
Notes
-----
The first input file defines the metadata written to the output
file. Only features that are present in all input files are written
to the output file.
"""
cmp_kw = hdf5plugin.Zstd(clevel=5)
if metadata is None:
metadata = {"experiment": {"run index": 1}}
@@ -49,18 +78,18 @@ def join(path_out=None, paths_in=None, metadata=None):
logs = {"dclab-join": common.get_command_log(paths=sorted_paths)}

# Determine temporal offsets
toffsets = np.zeros(len(sorted_paths), dtype=np.float64)
t_offsets = np.zeros(len(sorted_paths), dtype=np.float64)
for ii, pp in enumerate(sorted_paths):
with new_dataset(pp) as dsb:
etime = dsb.config["experiment"]["time"]
st = time.strptime(dsb.config["experiment"]["date"]
+ etime[:8],
"%Y-%m-%d%H:%M:%S")
toffsets[ii] = time.mktime(st)
t_offsets[ii] = time.mktime(st)
if len(etime) > 8:
# floating point time stored as well (HH:MM:SS.SS)
toffsets[ii] += float(etime[8:])
toffsets -= toffsets[0]
t_offsets[ii] += float(etime[8:])
t_offsets -= t_offsets[0]

# Determine features to export (based on first file)
with warnings.catch_warnings(record=True) as w:
@@ -111,6 +140,7 @@ def join(path_out=None, paths_in=None, metadata=None):
override=True,
logs=True,
tables=True,
basins=False,
meta_prefix="src-#1_",
compression_kwargs=cmp_kw)
# store configuration
@@ -124,7 +154,7 @@ def join(path_out=None, paths_in=None, metadata=None):
hw.store_log(name="src-#1_cfg", lines=cfg0)
ii = 1
# Append data from other files
for pi, ti in zip(sorted_paths[1:], toffsets[1:]):
for pi, ti in zip(sorted_paths[1:], t_offsets[1:]):
ii += 1 # we start with the second dataset
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
@@ -174,6 +204,7 @@ def join(path_out=None, paths_in=None, metadata=None):

# Finally, rename temp to out
path_temp.rename(path_out)
return path_out


def join_parser():
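A minimal sketch of joining two measurements with the reordered signature, following the docstring above (file names are placeholders):

    from dclab import cli

    # the first input file defines the output metadata; only features
    # present in all input files are written to the output file
    path = cli.join(paths_in=["run-1.rtdc", "run-2.rtdc"],
                    path_out="joined.rtdc")
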