diff --git a/docs/changes/2598.feature.rst b/docs/changes/2598.feature.rst new file mode 100644 index 00000000000..16f05623c72 --- /dev/null +++ b/docs/changes/2598.feature.rst @@ -0,0 +1,2 @@ +The provenance system now records the reference metadata +of input and output files, if available. diff --git a/src/ctapipe/core/provenance.py b/src/ctapipe/core/provenance.py index 2dc6527af3a..a8f602e127f 100644 --- a/src/ctapipe/core/provenance.py +++ b/src/ctapipe/core/provenance.py @@ -11,6 +11,7 @@ import platform import sys import uuid +import warnings from collections import UserList from contextlib import contextmanager from importlib import import_module @@ -57,6 +58,10 @@ def get_module_version(name): return "not installed" +class MissingReferenceMetadata(UserWarning): + """Warning raised if reference metadata could not be read from input file.""" + + class Provenance(metaclass=Singleton): """ Manage the provenance info for a stack of *activities* @@ -91,7 +96,7 @@ def _get_current_or_start_activity(self): return self.current_activity - def add_input_file(self, filename, role=None): + def add_input_file(self, filename, role=None, add_meta=True): """register an input to the current activity Parameters @@ -102,14 +107,14 @@ def add_input_file(self, filename, role=None): role this input file satisfies (optional) """ activity = self._get_current_or_start_activity() - activity.register_input(abspath(filename), role=role) + activity.register_input(abspath(filename), role=role, add_meta=add_meta) log.debug( "added input entity '%s' to activity: '%s'", filename, activity.name, ) - def add_output_file(self, filename, role=None): + def add_output_file(self, filename, role=None, add_meta=True): """ register an output to the current activity @@ -122,7 +127,7 @@ def add_output_file(self, filename, role=None): """ activity = self._get_current_or_start_activity() - activity.register_output(abspath(filename), role=role) + activity.register_output(abspath(filename), role=role, add_meta=add_meta) log.debug( "added output entity '%s' to activity: '%s'", filename, @@ -239,7 +244,7 @@ def start(self): self._prov["start"].update(_sample_cpu_and_memory()) self._prov["system"].update(_get_system_provenance()) - def register_input(self, url, role=None): + def register_input(self, url, role=None, add_meta=True): """ Add a URL of a file to the list of inputs (can be a filename or full url, if no URL specifier is given, assume 'file://') @@ -250,22 +255,37 @@ def register_input(self, url, role=None): filename or url of input file role: str role name that this input satisfies + add_meta: bool + If true, try to load reference metadata from input file + and add to provenance. """ - self._prov["input"].append(dict(url=url, role=role)) + reference_meta = self._get_reference_meta(url=url) if add_meta else None + self._prov["input"].append( + dict(url=url, role=role, reference_meta=reference_meta) + ) - def register_output(self, url, role=None): + def register_output(self, url, role=None, add_meta=True): """ Add a URL of a file to the list of outputs (can be a filename or full url, if no URL specifier is given, assume 'file://') + Should only be called once the file is finalized, so that reference metadata + can be read. + Parameters ---------- url: str filename or url of output file role: str role name that this output satisfies + add_meta: bool + If true, try to load reference metadata from input file + and add to provenance. """ - self._prov["output"].append(dict(url=url, role=role)) + reference_meta = self._get_reference_meta(url=url) if add_meta else None + self._prov["output"].append( + dict(url=url, role=role, reference_meta=reference_meta) + ) def register_config(self, config): """add a dictionary of configuration parameters to this activity""" @@ -302,6 +322,18 @@ def sample_cpu_and_memory(self): def provenance(self): return self._prov + def _get_reference_meta(self, url): + # here to prevent circular imports / top-level cross-dependencies + from ..io.metadata import read_reference_metadata + + try: + return read_reference_metadata(url).to_dict() + except Exception: + warnings.warn( + f"Could not read reference metadata for input file: {url}", + MissingReferenceMetadata, + ) + def _get_python_packages(): def _sortkey(dist): diff --git a/src/ctapipe/core/tests/test_provenance.py b/src/ctapipe/core/tests/test_provenance.py index f34ff6d9c3a..465e11ba27e 100644 --- a/src/ctapipe/core/tests/test_provenance.py +++ b/src/ctapipe/core/tests/test_provenance.py @@ -4,6 +4,7 @@ from ctapipe.core import Provenance from ctapipe.core.provenance import _ActivityProvenance +from ctapipe.io.metadata import Reference @pytest.fixture @@ -15,19 +16,18 @@ def provenance(monkeypatch): prov = Provenance() monkeypatch.setattr(prov, "_activities", []) monkeypatch.setattr(prov, "_finished_activities", []) - - prov.start_activity("test1") - prov.add_input_file("input.txt") - prov.add_output_file("output.txt") - prov.start_activity("test2") - prov.add_input_file("input_a.txt") - prov.add_input_file("input_b.txt") - prov.finish_activity("test2") - prov.finish_activity("test1") return prov def test_provenance_activity_names(provenance): + provenance.start_activity("test1") + provenance.add_input_file("input.txt") + provenance.add_output_file("output.txt") + provenance.start_activity("test2") + provenance.add_input_file("input_a.txt") + provenance.add_input_file("input_b.txt") + provenance.finish_activity("test2") + provenance.finish_activity("test1") assert set(provenance.finished_activity_names) == {"test2", "test1"} @@ -52,6 +52,8 @@ def test_provenence_contextmanager(): def test_provenance_json(provenance: Provenance): + provenance.start_activity("test1") + provenance.finish_activity("test1") data = json.loads(provenance.as_json()) activity = data[0] @@ -60,3 +62,17 @@ def test_provenance_json(provenance: Provenance): packages = activity["system"]["python"].get("packages") assert isinstance(packages, list) assert any(p["name"] == "numpy" for p in packages) + + +def test_provenance_input_reference_meta(provenance: Provenance, dl1_file): + provenance.start_activity("test1") + provenance.add_input_file(dl1_file, "events") + provenance.finish_activity("test1") + data = json.loads(provenance.as_json()) + + inputs = data[0]["input"] + assert len(inputs) == 1 + input_meta = inputs[0] + assert "reference_meta" in input_meta + assert "CTA PRODUCT ID" in input_meta["reference_meta"] + Reference.from_dict(input_meta["reference_meta"]) diff --git a/src/ctapipe/core/tool.py b/src/ctapipe/core/tool.py index e1f7c2e2746..3579ba3c1bd 100644 --- a/src/ctapipe/core/tool.py +++ b/src/ctapipe/core/tool.py @@ -283,7 +283,7 @@ def load_config_file(self, path: str | pathlib.Path) -> None: # fall back to traitlets.config.Application's implementation super().load_config_file(str(path)) - Provenance().add_input_file(path, role="Tool Configuration") + Provenance().add_input_file(path, role="Tool Configuration", add_meta=False) def update_logging_config(self): """Update the configuration of loggers.""" diff --git a/src/ctapipe/io/datawriter.py b/src/ctapipe/io/datawriter.py index d0f93905f66..3119bc96695 100644 --- a/src/ctapipe/io/datawriter.py +++ b/src/ctapipe/io/datawriter.py @@ -388,6 +388,7 @@ def finish(self): self._write_context_metadata_headers() self._writer.close() + PROV.add_output_file(str(self.output_path), role="DL1/Event") @property def datalevels(self): @@ -432,7 +433,6 @@ def _setup_output_path(self): ", use the `overwrite` option or choose another `output_path` " ) self.log.debug("output path: %s", self.output_path) - PROV.add_output_file(str(self.output_path), role="DL1/Event") # check that options make sense writable_things = [ diff --git a/src/ctapipe/io/hdf5merger.py b/src/ctapipe/io/hdf5merger.py index 4161dba0878..95d0a8b6909 100644 --- a/src/ctapipe/io/hdf5merger.py +++ b/src/ctapipe/io/hdf5merger.py @@ -183,7 +183,6 @@ def __init__(self, output_path=None, **kwargs): mode="a" if appending else "w", filters=DEFAULT_FILTERS, ) - Provenance().add_output_file(str(self.output_path)) self.required_nodes = None self.data_model_version = None @@ -247,7 +246,7 @@ def _update_meta(self): def _read_meta(self, h5file): try: - return metadata.Reference.from_dict(metadata.read_metadata(h5file)) + return metadata._read_reference_metadata_hdf5(h5file) except Exception: raise CannotMerge( f"CTA Reference meta not found in input file: {h5file.filename}" @@ -384,6 +383,7 @@ def __exit__(self, exc_type, exc_value, traceback): def close(self): if hasattr(self, "h5file"): self.h5file.close() + Provenance().add_output_file(str(self.output_path)) def _append_subarray(self, other): # focal length choice doesn't matter here, set to equivalent so we don't get diff --git a/src/ctapipe/io/metadata.py b/src/ctapipe/io/metadata.py index b4fd670144f..2aef43d111e 100644 --- a/src/ctapipe/io/metadata.py +++ b/src/ctapipe/io/metadata.py @@ -22,6 +22,7 @@ some_astropy_table.write("output.ecsv") """ +import gzip import os import uuid import warnings @@ -29,6 +30,8 @@ from contextlib import ExitStack import tables +from astropy.io import fits +from astropy.table import Table from astropy.time import Time from tables import NaturalNameWarning from traitlets import Enum, HasTraits, Instance, List, Unicode, UseEnum, default @@ -45,7 +48,8 @@ "Activity", "Instrument", "write_to_hdf5", - "read_metadata", + "read_hdf5_metadata", + "read_reference_metadata", ] @@ -295,6 +299,12 @@ def from_dict(cls, metadata): instrument=Instrument(**kwargs["instrument"]), ) + @classmethod + def from_fits(cls, header): + # for now, just use from_dict, but we might need special handling + # of some keys + return cls.from_dict(header) + def __repr__(self): return str(self.to_dict()) @@ -320,27 +330,72 @@ def write_to_hdf5(metadata, h5file, path="/"): node._v_attrs[key] = value # pylint: disable=protected-access -def read_metadata(h5file, path="/"): +def read_hdf5_metadata(h5file, path="/"): + """ + Read hdf5 attributes into a dict + """ + with ExitStack() as stack: + if not isinstance(h5file, tables.File): + h5file = stack.enter_context(tables.open_file(h5file)) + + node = h5file.get_node(path) + return {key: node._v_attrs[key] for key in node._v_attrs._f_list()} + + +def read_reference_metadata(path): + """Read CTAO data product metadata from path + + File is first opened to determine file format, then the metadata + is read. Supported are currently FITS and HDF5. """ - Read metadata from an hdf5 file + header_bytes = 8 + with open(path, "rb") as f: + first_bytes = f.read(header_bytes) + + if first_bytes.startswith(b"\x1f\x8b"): + with gzip.open(path, "rb") as f: + first_bytes = f.read(header_bytes) + + if first_bytes.startswith(b"\x89HDF"): + return _read_reference_metadata_hdf5(path) + + if first_bytes.startswith(b"SIMPLE"): + return _read_reference_metadata_fits(path) + + if first_bytes.startswith(b"# %ECSV"): + return Reference.from_dict(Table.read(path).meta) + + raise ValueError( + f"'{path}' is not one of the supported file formats: fits, hdf5, ecsv" + ) + + +def _read_reference_metadata_hdf5(h5file, path="/"): + meta = read_hdf5_metadata(h5file, path) + return Reference.from_dict(meta) + + +def _read_reference_metadata_ecsv(path): + return Reference.from_dict(Table.read(path).meta) + + +def _read_reference_metadata_fits(fitsfile, hdu: int | str = 0): + """ + Read reference metadata from a fits file Parameters ---------- - h5filename: string, Path, or `tables.file.File` + fitsfile: string, Path, or `tables.file.File` hdf5 file - path: string - default: '/' is the path to ctapipe global metadata + hdu: int or str + HDU index or name. Returns ------- - metadata: dictionary + reference_metadata: Reference """ with ExitStack() as stack: - if not isinstance(h5file, tables.File): - h5file = stack.enter_context(tables.open_file(h5file)) - - node = h5file.get_node(path) - metadata = {key: node._v_attrs[key] for key in node._v_attrs._f_list()} - return metadata + if not isinstance(fitsfile, fits.HDUList): + fitsfile = stack.enter_context(fits.open(fitsfile)) - raise OSError("Could not read metadata") + return Reference.from_fits(fitsfile[hdu].header) diff --git a/src/ctapipe/io/simteleventsource.py b/src/ctapipe/io/simteleventsource.py index e4ff831dacb..cf89b11b461 100644 --- a/src/ctapipe/io/simteleventsource.py +++ b/src/ctapipe/io/simteleventsource.py @@ -345,8 +345,12 @@ def read_atmosphere_profile_from_simtel( if isinstance(simtelfile, str | Path): context_manager = SimTelFile(simtelfile) + # FIXME: simtel files currently do not have CTAO reference + # metadata, should be set to True once we store metadata Provenance().add_input_file( - filename=simtelfile, role="ctapipe.atmosphere.AtmosphereDensityProfile" + filename=simtelfile, + role="ctapipe.atmosphere.AtmosphereDensityProfile", + add_meta=False, ) else: diff --git a/src/ctapipe/io/tests/test_metadata.py b/src/ctapipe/io/tests/test_metadata.py index a69e9be76ab..aa242b0448f 100644 --- a/src/ctapipe/io/tests/test_metadata.py +++ b/src/ctapipe/io/tests/test_metadata.py @@ -1,16 +1,19 @@ """ Test CTA Reference metadata functionality """ +import uuid +import pytest import tables +from astropy.io import fits +from astropy.table import Table from ctapipe.core.provenance import Provenance from ctapipe.io import metadata as meta -def test_construct_and_write_metadata(tmp_path): - """basic test of making a Reference object and writing it""" - +@pytest.fixture() +def reference(): prov = Provenance() prov.start_activity("test") prov.finish_activity() @@ -43,41 +46,58 @@ def test_construct_and_write_metadata(tmp_path): id_="threshold", ), ) + return reference + +def test_to_dict(reference): + """Test for Reference.to_dict""" ref_dict = reference.to_dict() assert ref_dict["CTA PRODUCT FORMAT"] == "hdf5" assert ref_dict["CTA PRODUCT DATA LEVELS"] == "DL1_IMAGES,DL1_PARAMETERS" + assert str(uuid.UUID(ref_dict["CTA PRODUCT ID"])) == ref_dict["CTA PRODUCT ID"] - import uuid # pylint: disable=import-outside-toplevel - assert str(uuid.UUID(ref_dict["CTA PRODUCT ID"])) == ref_dict["CTA PRODUCT ID"] +def test_from_dict(reference): + as_dict = reference.to_dict() + back = meta.Reference.from_dict(as_dict) + assert back.to_dict() == as_dict + + +@pytest.mark.parametrize("format", ("fits", "fits.gz")) +def test_reference_metadata_fits(tmp_path, format, reference): + """Test for writing reference metadata""" + path = tmp_path / f"test.{format}" + + hdul = fits.HDUList(fits.PrimaryHDU()) + hdul[0].header.update(reference.to_dict(fits=True)) + hdul.writeto(path) + + back = meta.read_reference_metadata(path) + assert back.to_dict() == reference.to_dict() + + +def test_reference_metadata_h5(tmp_path, reference): + path = tmp_path / "test.h5" - # check that we can write this to the header of a typical table file in multiple - # formats: - from astropy.table import Table # pylint: disable=import-outside-toplevel + with tables.open_file(path, "w") as f: + meta.write_to_hdf5(reference.to_dict(), f) - table = Table(dict(x=[1, 2, 3], y=[15.2, 15.2, 14.5])) - for path in [tmp_path / "test.fits", tmp_path / "test.ecsv"]: - if ".fits" in path.suffixes: - reference.format = "fits" - ref_dict = reference.to_dict(fits=True) - else: - reference.format = "ecsv" - ref_dict = reference.to_dict() + back = meta.read_reference_metadata(path) + assert back.to_dict() == reference.to_dict() - table.meta = ref_dict - table.write(path) - # write to pytables file +def test_reference_metadata_ecsv(tmp_path, reference): + path = tmp_path / "test.ecsv" - import tables # pylint: disable=import-outside-toplevel + t = Table({"a": [1, 2, 3], "b": [4, 5, 6]}) + t.meta.update(reference.to_dict()) + t.write(path) - with tables.open_file(tmp_path / "test.h5", mode="w") as h5file: - h5file.create_group(where="/", name="node") - meta.write_to_hdf5(ref_dict, h5file, path="/node") + back = meta.read_reference_metadata(path) + assert back.to_dict() == reference.to_dict() -def test_read_metadata(tmp_path): +def test_read_hdf5_metadata(tmp_path): # Testing one can read both a path as well as a PyTables file object filename = tmp_path / "test.h5" metadata_in = {"SOFTWARE": "ctapipe", "FOO": "BAR"} @@ -86,49 +106,10 @@ def test_read_metadata(tmp_path): h5file.create_group(where="/node", name="subnode", createparents=True) meta.write_to_hdf5(metadata_in, h5file, path=metadata_path) - metadata_out = meta.read_metadata(filename, path=metadata_path) + metadata_out = meta.read_hdf5_metadata(filename, path=metadata_path) assert metadata_out == metadata_in with tables.open_file(filename, "r") as file: - metadata_out = meta.read_metadata(file, path=metadata_path) + metadata_out = meta.read_hdf5_metadata(file, path=metadata_path) assert metadata_out == metadata_in - - -def test_from_dict(): - prov = Provenance() - prov.start_activity("test") - prov.finish_activity() - prov_activity = prov.finished_activities[0] - - reference = meta.Reference( - contact=meta.Contact( - name="Somebody", - email="a@b.com", - organization="CTA Consortium", - ), - product=meta.Product( - description="An Amazing Product", - creation_time="2020-10-11 15:23:31", - data_category="Sim", - data_levels=["DL1_IMAGES", "DL1_PARAMETERS"], - data_association="Subarray", - data_model_name="Unofficial DL1", - data_model_version="1.0", - data_model_url="https://example.org", - format="hdf5", - ), - process=meta.Process(type_="Simulation", subtype="Prod3b", id_="423442"), - activity=meta.Activity.from_provenance(prov_activity.provenance), - instrument=meta.Instrument( - site="CTA-North", - class_="Array", - type_="Layout H1B", - version="1.0", - id_="threshold", - ), - ) - - as_dict = reference.to_dict() - back = meta.Reference.from_dict(as_dict) - assert back.to_dict() == as_dict diff --git a/src/ctapipe/reco/reconstructor.py b/src/ctapipe/reco/reconstructor.py index 24b708e9d4a..d09c7d4b362 100644 --- a/src/ctapipe/reco/reconstructor.py +++ b/src/ctapipe/reco/reconstructor.py @@ -150,7 +150,8 @@ def read(cls, path, parent=None, subarray=None, **kwargs): for attr, value in kwargs.items(): setattr(instance, attr, value) - Provenance().add_input_file(path, role="reconstructor") + # FIXME: we currently don't store metadata in the joblib / pickle files, see #2603 + Provenance().add_input_file(path, role="reconstructor", add_meta=False) return instance diff --git a/src/ctapipe/reco/sklearn.py b/src/ctapipe/reco/sklearn.py index 6b6c9cc133f..19655dc1a0c 100644 --- a/src/ctapipe/reco/sklearn.py +++ b/src/ctapipe/reco/sklearn.py @@ -670,7 +670,8 @@ def read(cls, path, **kwargs): f"{path} did not contain an instance of {cls}, got {instance}" ) - Provenance().add_input_file(path, role="ml-models") + # FIXME: we currently don't store metadata in the joblib / pickle files, see #2603 + Provenance().add_input_file(path, role="ml-models", add_meta=False) return instance @lazyproperty diff --git a/src/ctapipe/tools/dump_instrument.py b/src/ctapipe/tools/dump_instrument.py index 7f169acdefb..a3ad471696b 100644 --- a/src/ctapipe/tools/dump_instrument.py +++ b/src/ctapipe/tools/dump_instrument.py @@ -91,11 +91,15 @@ def write_camera_definitions(self): try: geom_table.write(geom_filename, **args) - readout_table.write(readout_filename, **args) Provenance().add_output_file(geom_filename, "CameraGeometry") + except OSError as err: + self.log.exception("couldn't write camera geometry because: %s", err) + + try: + readout_table.write(readout_filename, **args) Provenance().add_output_file(readout_filename, "CameraReadout") except OSError as err: - self.log.warning("couldn't write camera definition because: %s", err) + self.log.exception("couldn't write camera definition because: %s", err) def write_optics_descriptions(self): """writes out optics files for each telescope type""" @@ -109,7 +113,7 @@ def write_optics_descriptions(self): tab.write(filename, **args) Provenance().add_output_file(filename, "OpticsDescription") except OSError as err: - self.log.warning( + self.log.exception( "couldn't write optics description '%s' because: %s", filename, err ) @@ -123,7 +127,7 @@ def write_subarray_description(self): tab.write(filename, **args) Provenance().add_output_file(filename, "SubarrayDescription") except OSError as err: - self.log.warning( + self.log.exception( "couldn't write subarray description '%s' because: %s", filename, err ) diff --git a/src/ctapipe/tools/merge.py b/src/ctapipe/tools/merge.py index ba9daad1549..3665d504cd4 100644 --- a/src/ctapipe/tools/merge.py +++ b/src/ctapipe/tools/merge.py @@ -161,7 +161,7 @@ def setup(self): ) sys.exit(1) - self.merger = HDF5Merger(parent=self) + self.merger = self.enter_context(HDF5Merger(parent=self)) if self.merger.output_path in self.input_files: raise ToolConfigurationError( "Output path contained in input files. Fix your configuration / cli arguments." @@ -195,7 +195,6 @@ def finish(self): current_activity = Provenance().current_activity.provenance self.merger.meta.activity = meta.Activity.from_provenance(current_activity) meta.write_to_hdf5(self.merger.meta.to_dict(), self.merger.h5file) - self.merger.close() def main():