From 84bb350d43130b4a5db93250eac8791f390f9db8 Mon Sep 17 00:00:00 2001 From: Chris Byrohl Date: Thu, 10 Aug 2023 14:57:58 +0200 Subject: [PATCH 1/4] start restructure relevant tests --- tests/customs/test_arepo.py | 47 ++++++++++++++++++++++++++++++++++++- tests/test_interface.py | 45 ----------------------------------- 2 files changed, 46 insertions(+), 46 deletions(-) diff --git a/tests/customs/test_arepo.py b/tests/customs/test_arepo.py index 5c409dde..375d8e39 100644 --- a/tests/customs/test_arepo.py +++ b/tests/customs/test_arepo.py @@ -1,7 +1,9 @@ import dask.array as da +import numpy as np +import pint from scida import load -from tests.testdata_properties import require_testdata_path +from tests.testdata_properties import require_testdata, require_testdata_path @require_testdata_path("interface", only=["TNG50-4_snapshot"]) @@ -14,3 +16,46 @@ def test_selector(testdatapath): d = obj.return_data(unbound=True) # unbound gas has groupid = max_int64 = 9223372036854775807 assert da.all(d["PartType0"]["GroupID"] == 9223372036854775807).compute() + + +@require_testdata("areposnapshot_withcatalog", only=["TNG50-4_snapshot"]) +def test_interface_groupedoperations(testdata_areposnapshot_withcatalog): + snp = testdata_areposnapshot_withcatalog + # check bound mass sums as a start + g = snp.grouped("Masses") + boundmass = np.sum(g.sum().evaluate()) + boundmass2 = da.sum( + snp.data["PartType0"]["Masses"][: np.sum(snp.get_grouplengths())] + ).compute() + if isinstance(boundmass2, pint.Quantity): + boundmass2 = boundmass2.magnitude + assert np.isclose(boundmass, boundmass2) + # Test chaining + assert np.sum(g.half().sum().evaluate()) < np.sum(g.sum().evaluate()) + + # Test custom function apply + assert np.allclose( + g.apply(lambda x: x[::2]).sum().evaluate(), g.half().sum().evaluate() + ) + + # Test unspecified fieldnames when grouping + g2 = snp.grouped() + + def customfunc1(arr, fieldnames="Masses"): + return arr[::2] + + s = g2.apply(customfunc1).sum() + assert np.allclose(s.evaluate(), g.half().sum().evaluate()) + + # Test custom dask array input + arr = snp.data["PartType0"]["Density"] * snp.data["PartType0"]["Masses"] + boundvol2 = snp.grouped(arr).sum().evaluate().sum() + assert 0.0 < boundvol2 < 1.0 + + # Test multifield + def customfunc2(dens, vol, fieldnames=["Density", "Masses"]): + return dens * vol + + s = g2.apply(customfunc2).sum() + boundvol = s.evaluate().sum() + assert np.isclose(boundvol, boundvol2) diff --git a/tests/test_interface.py b/tests/test_interface.py index bc2c68c9..85b15911 100644 --- a/tests/test_interface.py +++ b/tests/test_interface.py @@ -1,8 +1,6 @@ import time -import dask.array as da import numpy as np -import pint from scida.convenience import load from scida.customs.gadgetstyle.dataset import GadgetStyleSnapshot @@ -145,49 +143,6 @@ def calculate_haloid(GroupID, parttype="PartType0"): assert np.all(partcount == snap.data["Group"]["GroupLenType"][:, 0].compute()) -@require_testdata("areposnapshot_withcatalog", only=["TNG50-4_snapshot"]) -def test_interface_groupedoperations(testdata_areposnapshot_withcatalog): - snp = testdata_areposnapshot_withcatalog - # check bound mass sums as a start - g = snp.grouped("Masses") - boundmass = np.sum(g.sum().evaluate()) - boundmass2 = da.sum( - snp.data["PartType0"]["Masses"][: np.sum(snp.get_grouplengths())] - ).compute() - if isinstance(boundmass2, pint.Quantity): - boundmass2 = boundmass2.magnitude - assert np.isclose(boundmass, boundmass2) - # Test chaining - assert np.sum(g.half().sum().evaluate()) < 
np.sum(g.sum().evaluate()) - - # Test custom function apply - assert np.allclose( - g.apply(lambda x: x[::2]).sum().evaluate(), g.half().sum().evaluate() - ) - - # Test unspecified fieldnames when grouping - g2 = snp.grouped() - - def customfunc1(arr, fieldnames="Masses"): - return arr[::2] - - s = g2.apply(customfunc1).sum() - assert np.allclose(s.evaluate(), g.half().sum().evaluate()) - - # Test custom dask array input - arr = snp.data["PartType0"]["Density"] * snp.data["PartType0"]["Masses"] - boundvol2 = snp.grouped(arr).sum().evaluate().sum() - assert 0.0 < boundvol2 < 1.0 - - # Test multifield - def customfunc2(dens, vol, fieldnames=["Density", "Masses"]): - return dens * vol - - s = g2.apply(customfunc2).sum() - boundvol = s.evaluate().sum() - assert np.isclose(boundvol, boundvol2) - - @require_testdata("illustrissnapshot_withcatalog") def test_areposnapshot_load_withcatalog(testdata_illustrissnapshot_withcatalog): snp = testdata_illustrissnapshot_withcatalog From 56fadc541d5fc74799c40f6adb241d3ab93955a8 Mon Sep 17 00:00:00 2001 From: Chris Byrohl <9221545+cbyrohl@users.noreply.github.com> Date: Thu, 10 Aug 2023 17:31:12 +0200 Subject: [PATCH 2/4] increase/improve of dummy files in testing --- tests/conftest.py | 19 ++++++ tests/helpers.py | 163 +++++++++++++++++++++++++------------------- tests/test_units.py | 31 ++++----- 3 files changed, 127 insertions(+), 86 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 915b12b8..7804a415 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,6 +6,7 @@ from scida.config import get_config from scida.customs.gadgetstyle.dataset import GadgetStyleSnapshot from scida.series import DatasetSeries +from tests.helpers import DummyGadgetFile, DummyTNGFile flag_test_long = False # Set to true to run time-taking tests. 
@@ -66,3 +67,21 @@ def cleancache(cachedir): """Always start with empty cache.""" get_config(reload=True) return cachedir + + +# dummy gadgetstyle snapshot fixtures + +@pytest.fixture +def gadgetfile_dummy(tmp_path): + dummy = DummyGadgetFile() + dummy.write(tmp_path / "dummy.hdf5") + return dummy + + +@pytest.fixture +def tngfile_dummy(tmp_path): + dummy = DummyTNGFile() + print(dummy.particles["PartType0"]) + dummy.write(tmp_path / "dummy.hdf5") + return dummy + diff --git a/tests/helpers.py b/tests/helpers.py index 4f9b0dad..97a67057 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -9,76 +9,101 @@ def write_hdf5flat_testfile(path): hf["field2"] = np.arange(10) -def write_gadget_testfile(path): - with h5py.File(path, "w") as hf: - # a bunch of attributes/groups we let GitHub Copilot create - hf.attrs["NumPart_ThisFile"] = [0, 0, 0, 0, 0, 0] - hf.attrs["NumPart_Total"] = [0, 0, 0, 0, 0, 0] - hf.attrs["NumPart_Total_HighWord"] = [0, 0, 0, 0, 0, 0] - hf.attrs["MassTable"] = [0, 0, 0, 0, 0, 0] - hf.attrs["Time"] = 0.0 - hf.attrs["Redshift"] = 0.0 - hf.attrs["BoxSize"] = 0.0 - hf.attrs["NumFilesPerSnapshot"] = 0 - hf.attrs["Omega0"] = 0.0 - hf.attrs["OmegaLambda"] = 0.0 - hf.attrs["HubbleParam"] = 0.0 - hf.attrs["Flag_Sfr"] = 0 - hf.attrs["Flag_Cooling"] = 0 - hf.attrs["Flag_StellarAge"] = 0 - hf.attrs["Flag_Metals"] = 0 - hf.attrs["Flag_Feedback"] = 0 - hf.attrs["Flag_DoublePrecision"] = 0 - hf.attrs["Flag_IC_Info"] = 0 - hf.attrs["Flag_LptInitCond"] = 0 - hf.attrs["Flag_Metals"] = 0 - hf.attrs["Flag_Feedback"] = 0 - hf.attrs["Flag_DoublePrecision"] = 0 - hf.attrs["Flag_IC_Info"] = 0 - hf.attrs["Flag_LptInitCond"] = 0 - hf.attrs["Flag_Metals"] = 0 - hf.attrs["Flag_Feedback"] = 0 - hf.attrs["Flag_DoublePrecision"] = 0 - hf.attrs["Flag_IC_Info"] = 0 - hf.attrs["Flag_LptInitCond"] = 0 - hf.attrs["Flag_Metals"] = 0 - hf.attrs["Flag_Feedback"] = 0 - hf.attrs["Flag_DoublePrecision"] = 0 - hf.attrs["Flag_IC_Info"] = 0 - hf.attrs["Flag_LptInitCond"] = 0 - hf.attrs["Flag_Metals"] = 0 - hf.attrs["Flag_Feedback"] = 0 - hf.attrs["Flag_DoublePrecision"] = 0 - hf.attrs["Flag_IC_Info"] = 0 - hf.attrs["Flag_LptInitCond"] = 0 - hf.attrs["Flag_Metals"] = 0 - hf.attrs["Flag_Feedback"] = 0 - hf.attrs["Flag_DoublePrecision"] = 0 - hf.attrs["Flag_IC_Info"] = 0 - hf.attrs["Flag_LptInitCond"] = 0 - hf.attrs["Flag_Metals"] = 0 - hf.attrs["Flag_Feedback"] = 0 - hf.attrs["Flag_DoublePrecision"] = 0 - hf.attrs["Flag_IC_Info"] = 0 - hf.attrs["Flag_LptInitCond"] = 0 - hf.attrs["Flag_Metals"] = 0 - hf.attrs["Flag_Feedback"] = 0 - hf.attrs["Flag_DoublePrecision"] = 0 - hf.attrs["Flag_IC_Info"] = 0 - hf.attrs["Flag_LptInitCond"] = 0 +class DummyGadgetFile: + """Creates a dummy GadgetStyleSnapshot in memory that can be modified and saved to disk.""" + def __init__(self): + self.header = dict() + self.parameters = dict() + self.particles = dict() + # by default, create a dummy snapshot structure + self.create_dummyheader() + self.create_dummyparticles() + self.path = None # will be set on write + + def create_dummyheader(self, lengths=None): + if lengths is None: + lengths = [1, 1, 0, 1, 1, 1] + header = self.header + header["NumPart_ThisFile"] = lengths + header["NumPart_Total"] = lengths + header["NumPart_Total_HighWord"] = [0, 0, 0, 0, 0, 0] + header["MassTable"] = [0, 0, 0, 0, 0, 0] + attrs_float_keys = ["Time", "Redshift", "BoxSize"] + for key in attrs_float_keys: + header[key] = 0.0 + attrs_int_keys = ["NumFilesPerSnapshot", "Flag_Sfr", "Flag_Cooling", "Flag_StellarAge", "Flag_Metals", + "Flag_Feedback", 
"Flag_DoublePrecision", "Flag_IC_Info", "Flag_LptInitCond"] + for key in attrs_int_keys: + header[key] = 0 + # set "Omega0", "OmegaLambda", "HubbleParam" + header["Omega0"] = 0.3 + header["OmegaLambda"] = 0.7 + header["HubbleParam"] = 0.7 - lens = [] + def create_dummyparameters(self): + pass + + def create_dummyparticles(self, lengths=None): + pdata = self.particles + if lengths is None: + lengths = [1, 1, 0, 1, 1, 1] for i in range(6): - n = 1 - if i == 2: - n = 0 + n = lengths[i] gname = "PartType%i" % i - hf.create_group(gname) - hf[gname].create_dataset("Coordinates", (n, 3), dtype="f") - hf[gname].create_dataset("ParticleIDs", (n,), dtype="i") - hf[gname].create_dataset("Velocities", (n, 3), dtype="f") - lens.append(n) + pdata[gname] = dict() + pdata[gname]["Coordinates"] = np.zeros((n, 3), dtype=float) + pdata[gname]["ParticleIDs"] = np.zeros((n,), dtype=int) + pdata[gname]["Velocities"] = np.zeros((n, 3), dtype=float) + if gname == "PartType0": + pdata[gname]["Masses"] = np.zeros((n,), dtype=float) + pdata[gname]["Density"] = np.zeros((n,), dtype=float) + self.particles = pdata + + def write(self, path): + with h5py.File(path, "w") as hf: + grp = hf.create_group("Header") + for key in self.header: + grp.attrs[key] = self.header[key] + grp = hf.create_group("Parameters") + for key in self.parameters: + grp.attrs[key] = self.parameters[key] + for key in self.particles: + gname = key + hf.create_group(gname) + for field in self.particles[key]: + hf[gname].create_dataset(field, data=self.particles[key][field]) + self.path = path + - hf.create_group("Header") - hf["Header"].attrs["NumPart_ThisFile"] = lens - hf["Header"].attrs["NumPart_Total"] = lens +class DummyTNGFile(DummyGadgetFile): + def __init__(self): + super().__init__() + self.create_dummyheader() + self.create_dummyparameters() + + def create_dummyheader(self, lengths=None): + super().create_dummyheader(lengths) + header = self.header + header["UnitLength_in_cm"] = 3.085678e+21 + header["UnitMass_in_g"] = 1.989e+43 + header["UnitVelocity_in_cm_per_s"] = 100000.0 + + def create_dummyparameters(self): + super().create_dummyparameters() + params = self.parameters + icf = "/zhome/academic/HLRS/lha/zahapill/ics/ics_illustrisTNGboxes/L35n2160TNG/output/ICs" + params["InitCondFile"] = icf + + def create_dummyparticles(self, lengths=None): + super().create_dummyparticles() + pdata = self.particles + ngas = pdata["PartType0"]["Coordinates"].shape[0] + extra_keys = ["StarFormationRate"] + for key in extra_keys: + pdata["PartType0"][key] = np.zeros((ngas,), dtype=float) + + +def write_gadget_testfile(path): + # create a dummy Gadget snapshot + dummy = DummyGadgetFile() + dummy.write(path) diff --git a/tests/test_units.py b/tests/test_units.py index 2c958859..896a631f 100644 --- a/tests/test_units.py +++ b/tests/test_units.py @@ -8,39 +8,37 @@ from scida.config import get_config from scida.convenience import load from scida.interfaces.mixins.units import update_unitregistry -from tests.helpers import write_gadget_testfile -from tests.testdata_properties import require_testdata_path -@require_testdata_path("interface", only=["TNG50-4_snapshot"]) -def test_load_cgs(testdatapath): - ds = load(testdatapath, units="cgs") +def test_load_cgs(tngfile_dummy): + p = tngfile_dummy.path + ds = load(p, units="cgs") + u = ds.ureg # test units of some fields ptype = "PartType0" gas = ds.data[ptype] coords = gas["Coordinates"] - dens = gas["Density"] - sfr = gas["StarFormationRate"] - u = ds.ureg assert coords.units == u.cm + dens = gas["Density"] 
assert dens.units == u.g / u.cm**3 + sfr = gas["StarFormationRate"] assert sfr.units == u.g / u.s -@require_testdata_path("interface", only=["TNG50-4_snapshot"]) -def test_load_codeunits(testdatapath): - ds = load(testdatapath, units=True) +def test_load_codeunits(tngfile_dummy): + p = tngfile_dummy.path + ds = load(p, units=True) + u = ds.ureg # test units of some fields ptype = "PartType0" gas = ds.data[ptype] coords = gas["Coordinates"] - dens = gas["Density"] - sfr = gas["StarFormationRate"] - u = ds.ureg assert coords.units == u.code_length + dens = gas["Density"] assert dens.units == u.code_mass / u.code_length**3 code_time = u.code_velocity / u.code_length # SFR is an important test-case, as the units cannot be inferred from the dimensionality (not in code units) + sfr = gas["StarFormationRate"] assert sfr.units != u.code_mass / code_time # == code_mass/code_time is wrong! # we know that TNG uses u.Msun/yr for SFR assert sfr.units == u.Msun / u.yr @@ -61,9 +59,8 @@ def test_update_unitregistry(): ) # 1/h for a=1.0 -def test_missingunits(monkeypatch, tmp_path, caplog): - p = pathlib.Path(tmp_path) / "test.hdf5" - write_gadget_testfile(p) +def test_missingunits(monkeypatch, gadgetfile_dummy, caplog): + p = gadgetfile_dummy.path with h5py.File(p, "r+") as f: f["PartType0"]["FieldWithoutUnits"] = np.ones(1) From 5889cacaf149774528f591247980e5adea10ae70 Mon Sep 17 00:00:00 2001 From: Chris Byrohl <9221545+cbyrohl@users.noreply.github.com> Date: Thu, 10 Aug 2023 20:36:31 +0200 Subject: [PATCH 3/4] introduce nmax for map_halo_operation --- src/scida/customs/arepo/dataset.py | 129 ++++++++++++++++++--------- tests/conftest.py | 16 ++-- tests/customs/test_arepo.py | 67 ++++++++++++++ tests/helpers.py | 138 +++++++++++++++++++++-------- tests/test_interface.py | 49 ---------- tests/test_ioload.py | 2 +- 6 files changed, 269 insertions(+), 132 deletions(-) diff --git a/src/scida/customs/arepo/dataset.py b/src/scida/customs/arepo/dataset.py index 99807ef3..c95e122c 100644 --- a/src/scida/customs/arepo/dataset.py +++ b/src/scida/customs/arepo/dataset.py @@ -1,7 +1,6 @@ import copy import logging import os -import warnings from typing import Dict, List, Optional, Union import dask @@ -285,7 +284,7 @@ def register_field(self, parttype: str, name: str = None, construct: bool = Fals ------- """ - num = partTypeNum(parttype) + num = part_type_num(parttype) if construct: # TODO: introduce (immediate) construct option later raise NotImplementedError if num == -1: # TODO: all particle species @@ -418,9 +417,31 @@ def map_halo_operation( func, chunksize=int(3e7), cpucost_halo=1e4, - Nmin=None, + min_grpcount=None, chunksize_bytes=None, + nmax=None, ): + """ + Apply a function to each halo in the catalog. + + Parameters + ---------- + func: function + Function to apply to each halo. Must take a dictionary of arrays as input. + chunksize: int + Number of particles to process at once. Default: 3e7 + cpucost_halo: + "CPU cost" of processing a single halo. This is a relative value to the processing time per input particle + used for calculating the dask chunks. Default: 1e4 + min_grpcount: Optional[int] + Minimum number of particles in a halo to process it. Default: None + chunksize_bytes: Optional[int] + nmax: Optional[int] + Only process the first nmax halos. 
+ Returns + ------- + + """ dfltkwargs = get_kwargs(func) fieldnames = dfltkwargs.get("fieldnames", None) if fieldnames is None: @@ -435,9 +456,10 @@ def map_halo_operation( arrdict, chunksize=chunksize, cpucost_halo=cpucost_halo, - Nmin=Nmin, + min_grpcount=min_grpcount, chunksize_bytes=chunksize_bytes, entry_nbytes_in=entry_nbytes_in, + nmax=nmax, ) def add_groupquantity_to_particles(self, name, parttype="PartType0"): @@ -462,14 +484,15 @@ def add_groupquantity_to_particles(self, name, parttype="PartType0"): self.data[parttype][name] = hquantity def get_grouplengths(self, parttype="PartType0"): - # todo: write/use PartType func always using integer rather than string? - if parttype not in self._grouplengths: - partnum = int(parttype[-1]) - lengths = self.data["Group"]["GroupLenType"][:, partnum].compute() + """Get the total number of particles of a given type in all halos.""" + pnum = part_type_num(parttype) + ptype = "PartType%i" % pnum + if ptype not in self._grouplengths: + lengths = self.data["Group"]["GroupLenType"][:, pnum].compute() if isinstance(lengths, pint.Quantity): lengths = lengths.magnitude - self._grouplengths[parttype] = lengths - return self._grouplengths[parttype] + self._grouplengths[ptype] = lengths + return self._grouplengths[ptype] def grouped( self, @@ -884,26 +907,27 @@ def get_shcounts_shcells(SubhaloGrNr, hlength): return shcounts, shnumber -def partTypeNum(partType): +def part_type_num(ptype): """Mapping between common names and numeric particle types.""" - if str(partType).isdigit(): - return int(partType) + ptype = str(ptype).replace("PartType", "") + if ptype.isdigit(): + return int(ptype) - if str(partType).lower() in ["gas", "cells"]: + if str(ptype).lower() in ["gas", "cells"]: return 0 - if str(partType).lower() in ["dm", "darkmatter"]: + if str(ptype).lower() in ["dm", "darkmatter"]: return 1 - if str(partType).lower() in ["dmlowres"]: + if str(ptype).lower() in ["dmlowres"]: return 2 # only zoom simulations, not present in full periodic boxes - if str(partType).lower() in ["tracer", "tracers", "tracermc", "trmc"]: + if str(ptype).lower() in ["tracer", "tracers", "tracermc", "trmc"]: return 3 - if str(partType).lower() in ["star", "stars", "stellar"]: + if str(ptype).lower() in ["star", "stars", "stellar"]: return 4 # only those with GFM_StellarFormationTime>0 - if str(partType).lower() in ["wind"]: + if str(ptype).lower() in ["wind"]: return 4 # only those with GFM_StellarFormationTime<0 - if str(partType).lower() in ["bh", "bhs", "blackhole", "blackholes", "black"]: + if str(ptype).lower() in ["bh", "bhs", "blackhole", "blackholes", "black"]: return 5 - if str(partType).lower() in ["all"]: + if str(ptype).lower() in ["all"]: return -1 @@ -941,9 +965,26 @@ def map_halo_operation_get_chunkedges( entry_nbytes_in, entry_nbytes_out, cpucost_halo=1.0, - Nmin=None, + min_grpcount=None, chunksize_bytes=None, ): + """ + Compute the chunking of a halo operation. + + Parameters + ---------- + lengths: np.ndarray + The number of particles per halo. + entry_nbytes_in + entry_nbytes_out + cpucost_halo + min_grpcount + chunksize_bytes + + Returns + ------- + + """ cpucost_particle = 1.0 # we only care about ratio, so keep particle cost fixed. cost = cpucost_particle * lengths + cpucost_halo sumcost = cost.cumsum() @@ -955,23 +996,20 @@ def map_halo_operation_get_chunkedges( if not np.max(cost_memory) < chunksize_bytes: raise ValueError( - "Some halo requires more memory than allowed (%i allowed, %i requested). Consider overriding chunksize_bytes." 
- % (chunksize_bytes, np.max(cost_memory)) + "Some halo requires more memory than allowed (%i allowed, %i requested). Consider overriding " + "chunksize_bytes." % (chunksize_bytes, np.max(cost_memory)) ) - N = int(np.ceil(np.sum(cost_memory) / chunksize_bytes)) - N = int(np.ceil(1.3 * N)) # fudge factor - if Nmin is not None: - N = max(Nmin, N) - targetcost = sumcost[-1] / N - arr = np.diff(sumcost % targetcost) + nchunks = int(np.ceil(np.sum(cost_memory) / chunksize_bytes)) + nchunks = int(np.ceil(1.3 * nchunks)) # fudge factor + if min_grpcount is not None: + nchunks = max(min_grpcount, nchunks) + targetcost = sumcost[-1] / nchunks # chunk target cost = total cost / nchunks + + arr = np.diff(sumcost % targetcost) # find whenever exceeding modulo target cost idx = [0] + list(np.where(arr < 0)[0] + 1) - if len(idx) == N + 1: - idx[-1] = sumcost.shape[0] - elif len(idx) - N in [0, -1, -2]: + if idx[-1] != sumcost.shape[0]: idx.append(sumcost.shape[0]) - else: - raise ValueError("Unexpected chunk indices.") list_chunkedges = [] for i in range(len(idx) - 1): list_chunkedges.append([idx[i], idx[i + 1]]) @@ -995,21 +1033,26 @@ def map_halo_operation( arrdict, chunksize=int(3e7), cpucost_halo=1e4, - Nmin: Optional[int] = None, + min_grpcount: Optional[int] = None, chunksize_bytes: Optional[int] = None, entry_nbytes_in: Optional[int] = 4, fieldnames: Optional[List[str]] = None, + nmax: Optional[int] = None, ) -> da.Array: """ Map a function to all halos in a halo catalog. Parameters ---------- + nmax: Optional[int] + Only process the first nmax halos. func - lengths + lengths: np.ndarray + Number of particles per halo. arrdict chunksize cpucost_halo - Nmin + min_grpcount: Optional[int] + Lower bound on the number of halos per chunk. chunksize_bytes entry_nbytes_in fieldnames @@ -1019,9 +1062,8 @@ def map_halo_operation( """ if chunksize is not None: - warnings.warn( - '"chunksize" parameter is depreciated and has no effect. Specify Nmin for control.', - DeprecationWarning, + log.warning( + '"chunksize" parameter is depreciated and has no effect. Specify "min_grpcount" for control.' ) dfltkwargs = get_kwargs(func) if fieldnames is None: @@ -1032,6 +1074,9 @@ def map_halo_operation( dtype = dfltkwargs.get("dtype", "float64") default = dfltkwargs.get("default", 0) + if nmax is not None: + lengths = lengths[:nmax] + offsets = np.concatenate([[0], np.cumsum(lengths)]) # Determine chunkedges automatically @@ -1044,7 +1089,7 @@ def map_halo_operation( entry_nbytes_in, entry_nbytes_out, cpucost_halo=cpucost_halo, - Nmin=Nmin, + min_grpcount=min_grpcount, chunksize_bytes=chunksize_bytes, ) diff --git a/tests/conftest.py b/tests/conftest.py index 7804a415..9b964fe8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,7 +6,7 @@ from scida.config import get_config from scida.customs.gadgetstyle.dataset import GadgetStyleSnapshot from scida.series import DatasetSeries -from tests.helpers import DummyGadgetFile, DummyTNGFile +from tests.helpers import DummyGadgetCatalogFile, DummyGadgetSnapshotFile, DummyTNGFile flag_test_long = False # Set to true to run time-taking tests. 
@@ -71,17 +71,23 @@ def cleancache(cachedir): # dummy gadgetstyle snapshot fixtures + @pytest.fixture def gadgetfile_dummy(tmp_path): - dummy = DummyGadgetFile() - dummy.write(tmp_path / "dummy.hdf5") + dummy = DummyGadgetSnapshotFile() + dummy.write(tmp_path / "dummy_gadgetfile.hdf5") return dummy @pytest.fixture def tngfile_dummy(tmp_path): dummy = DummyTNGFile() - print(dummy.particles["PartType0"]) - dummy.write(tmp_path / "dummy.hdf5") + dummy.write(tmp_path / "dummy_tngfile.hdf5") return dummy + +@pytest.fixture +def gadgetcatalogfile_dummy(tmp_path): + dummy = DummyGadgetCatalogFile() + dummy.write(tmp_path / "dummy_gadgetcatalogfile.hdf5") + return dummy diff --git a/tests/customs/test_arepo.py b/tests/customs/test_arepo.py index 375d8e39..6a47959d 100644 --- a/tests/customs/test_arepo.py +++ b/tests/customs/test_arepo.py @@ -18,6 +18,73 @@ def test_selector(testdatapath): assert da.all(d["PartType0"]["GroupID"] == 9223372036854775807).compute() +def halooperations(path, catalogpath=None): + snap = load(path, catalog=catalogpath) + + def calculate_count(GroupID, parttype="PartType0"): + """Number of unique halo associations found in each halo. Has to be 1 exactly.""" + return np.unique(GroupID).shape[0] + + def calculate_partcount(GroupID, parttype="PartType0"): + """Particle Count per halo.""" + return GroupID.shape[0] + + def calculate_haloid(GroupID, parttype="PartType0"): + """returns Halo ID""" + return GroupID[-1] + + counttask = snap.map_halo_operation(calculate_count, compute=False, min_grpcount=20) + partcounttask = snap.map_halo_operation( + calculate_partcount, compute=False, chunksize=int(3e6) + ) + hidtask = snap.map_halo_operation( + calculate_haloid, compute=False, chunksize=int(3e6) + ) + count = counttask.compute() + partcount = partcounttask.compute() + hid = hidtask.compute() + + count0 = np.where(partcount == 0)[0] + diff0 = np.sort(np.concatenate((count0, count0 - 1))) + # determine halos that hold no particles. + assert set(np.where(np.diff(hid) != 1)[0].tolist()) == set(diff0.tolist()) + + if not (np.diff(hid).max() == np.diff(hid).min() == 1): + assert set(np.where(np.diff(hid) != 1)[0].tolist()) == set( + diff0.tolist() + ) # gotta check; potentially empty halos + assert np.all(counttask.compute() <= 1) + else: + assert np.all(counttask.compute() == 1) + print(count.shape, counttask.compute().shape) + assert ( + count.shape == counttask.compute().shape + ), "Expected shape different from result's shape." 
+ assert count.shape[0] == snap.data["Group"]["GroupPos"].shape[0] + assert np.all(partcount == snap.data["Group"]["GroupLenType"][:, 0].compute()) + + # test nmax + nmax = 10 + partcounttask = snap.map_halo_operation( + calculate_partcount, compute=False, chunksize=int(3e6), nmax=nmax + ) + partcount2 = partcounttask.compute() + assert partcount2.shape[0] == nmax + assert np.all(partcount2 == partcount[:nmax]) + + +# Test the map_halo_operation/grouped functionality +def test_areposnapshot_halooperation(tngfile_dummy, gadgetcatalogfile_dummy): + path = tngfile_dummy.path + catalogpath = gadgetcatalogfile_dummy.path + halooperations(path, catalogpath) + + +@require_testdata_path("interface", only=["TNG50-4_snapshot"]) +def test_areposnapshot_halooperation_realworld(testdatapath): + halooperations(testdatapath) + + @require_testdata("areposnapshot_withcatalog", only=["TNG50-4_snapshot"]) def test_interface_groupedoperations(testdata_areposnapshot_withcatalog): snp = testdata_areposnapshot_withcatalog diff --git a/tests/helpers.py b/tests/helpers.py index 97a67057..984be1e0 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -10,40 +10,82 @@ def write_hdf5flat_testfile(path): class DummyGadgetFile: - """Creates a dummy GadgetStyleSnapshot in memory that can be modified and saved to disk.""" def __init__(self): self.header = dict() self.parameters = dict() self.particles = dict() - # by default, create a dummy snapshot structure + self.path = None # path to the file after first write self.create_dummyheader() - self.create_dummyparticles() - self.path = None # will be set on write + self.create_dummyparameters() - def create_dummyheader(self, lengths=None): - if lengths is None: - lengths = [1, 1, 0, 1, 1, 1] + def create_dummyheader(self): header = self.header - header["NumPart_ThisFile"] = lengths - header["NumPart_Total"] = lengths - header["NumPart_Total_HighWord"] = [0, 0, 0, 0, 0, 0] - header["MassTable"] = [0, 0, 0, 0, 0, 0] attrs_float_keys = ["Time", "Redshift", "BoxSize"] for key in attrs_float_keys: header[key] = 0.0 - attrs_int_keys = ["NumFilesPerSnapshot", "Flag_Sfr", "Flag_Cooling", "Flag_StellarAge", "Flag_Metals", - "Flag_Feedback", "Flag_DoublePrecision", "Flag_IC_Info", "Flag_LptInitCond"] + attrs_int_keys = [ + "NumFilesPerSnapshot", + "Flag_Sfr", + "Flag_Cooling", + "Flag_StellarAge", + "Flag_Metals", + "Flag_Feedback", + "Flag_DoublePrecision", + "Flag_IC_Info", + "Flag_LptInitCond", + ] for key in attrs_int_keys: header[key] = 0 - # set "Omega0", "OmegaLambda", "HubbleParam" + # set cosmology header["Omega0"] = 0.3 + header["OmegaBaryon"] = 0.04 header["OmegaLambda"] = 0.7 header["HubbleParam"] = 0.7 def create_dummyparameters(self): pass - def create_dummyparticles(self, lengths=None): + def write(self, path): + with h5py.File(path, "w") as hf: + if self.header is not None: + grp = hf.create_group("Header") + for key in self.header: + grp.attrs[key] = self.header[key] + if self.parameters is not None: + grp = hf.create_group("Parameters") + for key in self.parameters: + grp.attrs[key] = self.parameters[key] + for key in self.particles: + gname = key + hf.create_group(gname) + for field in self.particles[key]: + hf[gname].create_dataset(field, data=self.particles[key][field]) + self.path = path + + +class DummyGadgetSnapshotFile(DummyGadgetFile): + """Creates a dummy GadgetStyleSnapshot in memory that can be modified and saved to disk.""" + + def __init__(self): + super().__init__() + # by default, create a dummy snapshot structure + self.create_dummyheader() + 
self.create_dummyfieldcontainer() + + def create_dummyheader(self, lengths=None): + super().create_dummyheader() + if lengths is None: + lengths = [1, 1, 0, 1, 1, 1] + header = self.header + header["NumPart_ThisFile"] = lengths + header["NumPart_Total"] = lengths + header["NumPart_Total_HighWord"] = [0, 0, 0, 0, 0, 0] + header["MassTable"] = [0, 0, 0, 0, 0, 0] + + def create_dummyparameters(self): + pass + + def create_dummyfieldcontainer(self, lengths=None): pdata = self.particles if lengths is None: lengths = [1, 1, 0, 1, 1, 1] @@ -59,23 +101,49 @@ def create_dummyparticles(self, lengths=None): pdata[gname]["Density"] = np.zeros((n,), dtype=float) self.particles = pdata - def write(self, path): - with h5py.File(path, "w") as hf: - grp = hf.create_group("Header") - for key in self.header: - grp.attrs[key] = self.header[key] - grp = hf.create_group("Parameters") - for key in self.parameters: - grp.attrs[key] = self.parameters[key] - for key in self.particles: - gname = key - hf.create_group(gname) - for field in self.particles[key]: - hf[gname].create_dataset(field, data=self.particles[key][field]) - self.path = path + +class DummyGadgetCatalogFile(DummyGadgetFile): + def __init__(self): + super().__init__() + self.create_dummyheader() + self.create_dummyfieldcontainer() + + def create_dummyheader(self, lengths=None): + super().create_dummyheader() + if lengths is None: + lengths = [1, 1] # halos and subgroups + header = self.header + header["Ngroups_ThisFile"] = lengths[0] + header["Ngroups_Total"] = lengths[0] + header["Nsubgroups_ThisFile"] = lengths[1] + header["Nsubgroups_Total"] = lengths[1] + + def create_dummyfieldcontainer(self, lengths=None): + pdata = self.particles + if lengths is None: + lengths = [1, 1] + # Groups + grp = dict() + pdata["Group"] = grp + ngroups = lengths[0] + grp["GroupPos"] = np.zeros((ngroups, 3), dtype=float) + grp["GroupMass"] = np.zeros((ngroups,), dtype=float) + grp["GroupVel"] = np.zeros((ngroups, 3), dtype=float) + grp["GroupLenType"] = np.ones((ngroups, 6), dtype=int) + grp["GroupLen"] = grp["GroupLenType"].sum(axis=1) + # Subhalos + sh = dict() + pdata["Subhalo"] = sh + nsubs = lengths[1] + sh["SubhaloPos"] = np.zeros((nsubs, 3), dtype=float) + sh["SubhaloMass"] = np.zeros((nsubs,), dtype=float) + sh["SubhaloVel"] = np.zeros((nsubs, 3), dtype=float) + sh["SubhaloLenType"] = np.ones((nsubs, 6), dtype=int) + sh["SubhaloLen"] = sh["SubhaloLenType"].sum(axis=1) + sh["SubhaloGrNr"] = np.zeros((nsubs,), dtype=int) -class DummyTNGFile(DummyGadgetFile): +class DummyTNGFile(DummyGadgetSnapshotFile): def __init__(self): super().__init__() self.create_dummyheader() @@ -84,8 +152,8 @@ def __init__(self): def create_dummyheader(self, lengths=None): super().create_dummyheader(lengths) header = self.header - header["UnitLength_in_cm"] = 3.085678e+21 - header["UnitMass_in_g"] = 1.989e+43 + header["UnitLength_in_cm"] = 3.085678e21 + header["UnitMass_in_g"] = 1.989e43 header["UnitVelocity_in_cm_per_s"] = 100000.0 def create_dummyparameters(self): @@ -94,8 +162,8 @@ def create_dummyparameters(self): icf = "/zhome/academic/HLRS/lha/zahapill/ics/ics_illustrisTNGboxes/L35n2160TNG/output/ICs" params["InitCondFile"] = icf - def create_dummyparticles(self, lengths=None): - super().create_dummyparticles() + def create_dummyfieldcontainer(self, lengths=None): + super().create_dummyfieldcontainer() pdata = self.particles ngas = pdata["PartType0"]["Coordinates"].shape[0] extra_keys = ["StarFormationRate"] @@ -105,5 +173,5 @@ def create_dummyparticles(self, lengths=None): def 
write_gadget_testfile(path): # create a dummy Gadget snapshot - dummy = DummyGadgetFile() + dummy = DummyGadgetSnapshotFile() dummy.write(path) diff --git a/tests/test_interface.py b/tests/test_interface.py index 85b15911..b8c14e00 100644 --- a/tests/test_interface.py +++ b/tests/test_interface.py @@ -94,55 +94,6 @@ def test_illustrisgroup_load(testdata_illustrisgroup): assert "Subhalo" in grp.data.keys() -@require_testdata( - "illustrissnapshot_withcatalog", exclude=["minimal", "z127"], exclude_substring=True -) -def test_areposnapshot_halooperation(testdata_illustrissnapshot_withcatalog): - snap = testdata_illustrissnapshot_withcatalog - - def calculate_count(GroupID, parttype="PartType0"): - """Halo Count per halo. Has to be 1 exactly.""" - return np.unique(GroupID).shape[0] - - def calculate_partcount(GroupID, parttype="PartType0"): - """Particle Count per halo.""" - return GroupID.shape[0] - - def calculate_haloid(GroupID, parttype="PartType0"): - """returns Halo ID""" - return GroupID[-1] - - counttask = snap.map_halo_operation(calculate_count, compute=False, Nmin=20) - partcounttask = snap.map_halo_operation( - calculate_partcount, compute=False, chunksize=int(3e6) - ) - hidtask = snap.map_halo_operation( - calculate_haloid, compute=False, chunksize=int(3e6) - ) - count = counttask.compute() - partcount = partcounttask.compute() - hid = hidtask.compute() - - count0 = np.where(partcount == 0)[0] - diff0 = np.sort(np.concatenate((count0, count0 - 1))) - # determine halos that hold no particles. - assert set(np.where(np.diff(hid) != 1)[0].tolist()) == set(diff0.tolist()) - - if not (np.diff(hid).max() == np.diff(hid).min() == 1): - assert set(np.where(np.diff(hid) != 1)[0].tolist()) == set( - diff0.tolist() - ) # gotta check; potentially empty halos - assert np.all(counttask.compute() <= 1) - else: - assert np.all(counttask.compute() == 1) - print(count.shape, counttask.compute().shape) - assert ( - count.shape == counttask.compute().shape - ), "Expected shape different from result's shape." - assert count.shape[0] == snap.data["Group"]["GroupPos"].shape[0] - assert np.all(partcount == snap.data["Group"]["GroupLenType"][:, 0].compute()) - - @require_testdata("illustrissnapshot_withcatalog") def test_areposnapshot_load_withcatalog(testdata_illustrissnapshot_withcatalog): snp = testdata_illustrissnapshot_withcatalog diff --git a/tests/test_ioload.py b/tests/test_ioload.py index 8d406d4b..0f6d53ca 100644 --- a/tests/test_ioload.py +++ b/tests/test_ioload.py @@ -25,5 +25,5 @@ def test_ioload_gadget(tmp_path): res = load(p) fcc, metadata, hf, tmpfile = res - assert "BoxSize" in metadata["/"] + assert "BoxSize" in metadata["/Header"] assert "PartType0" in fcc From 7827e4fbbc3a9138e3603eedfef3c51485280478 Mon Sep 17 00:00:00 2001 From: Chris Byrohl <9221545+cbyrohl@users.noreply.github.com> Date: Thu, 10 Aug 2023 22:51:00 +0200 Subject: [PATCH 4/4] support for idxlist argument --- docs/halocatalogs.md | 12 ++++++++++- poetry.lock | 2 +- pyproject.toml | 1 + src/scida/customs/arepo/dataset.py | 30 +++++++++++++++++++++++++-- tests/customs/test_arepo.py | 33 ++++++++++++++++++++++++------ 5 files changed, 68 insertions(+), 10 deletions(-) diff --git a/docs/halocatalogs.md b/docs/halocatalogs.md index 636e29cf..c0430246 100644 --- a/docs/halocatalogs.md +++ b/docs/halocatalogs.md @@ -73,11 +73,21 @@ data = ds.return_data(haloID=42) *data* will have the same structure as *ds.data* but restricted to particles of a given group. 
-### Operations on particle data for all groups +### Applying to all groups in parallel In many cases, we do not want the particle data of an individual group, but we want to calculate some reduced statistic from the bound particles of each group. For this, we provide the *grouped* functionality. In the following we give a range of examples of its use. +???+ warning + + Executing the following commands can be demanding on compute resources and memory. + Usually, one wants to restrict the groups to run on. You can either specify "nmax" + to limit the maximum halo id to evaluate up to. This is usually desired in any case + as halos are ordered (in descending order) by their mass. For more fine-grained control, + you can also pass a list of halo IDs to evaluate via the "idxlist" keyword. + These keywords should be passed to the "evaluate" call. + + #### Baryon mass Let's say we want to calculate the baryon mass for each halo from the particles. diff --git a/poetry.lock b/poetry.lock index 23334766..0951d484 100644 --- a/poetry.lock +++ b/poetry.lock @@ -5119,4 +5119,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "2bb29025cbc6cdb8a6edf4332734323a4c7eca810bb722e48b46f7bd83b50c7a" +content-hash = "e60d2f4e857bccb4b50a6bad1fb3f0ed4b516cefd75bc6bcec20aa82eac9e150" diff --git a/pyproject.toml b/pyproject.toml index d741e86e..e850b187 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,6 +53,7 @@ mkdocs-video = "^1.4.0" jupyter-contrib-nbextensions = "^0.7.0" typer = "^0.9.0" dask-jobqueue = "^0.8.2" +jupyter = "^1.0.0" [tool.coverage.paths] source = ["src", "*/site-packages"] diff --git a/src/scida/customs/arepo/dataset.py b/src/scida/customs/arepo/dataset.py index c95e122c..dea1c1da 100644 --- a/src/scida/customs/arepo/dataset.py +++ b/src/scida/customs/arepo/dataset.py @@ -420,12 +420,15 @@ def map_halo_operation( min_grpcount=None, chunksize_bytes=None, nmax=None, + idxlist=None, ): """ Apply a function to each halo in the catalog. Parameters ---------- + idxlist: Optional[np.ndarray] + List of halo indices to process. If not provided, all halos are processed. func: function Function to apply to each halo. Must take a dictionary of arrays as input. chunksize: int @@ -460,6 +463,7 @@ def map_halo_operation( chunksize_bytes=chunksize_bytes, entry_nbytes_in=entry_nbytes_in, nmax=nmax, + idxlist=idxlist, ) def add_groupquantity_to_particles(self, name, parttype="PartType0"): @@ -609,7 +613,7 @@ def __copy__(self): ) return c - def evaluate(self, compute=True): + def evaluate(self, nmax=None, compute=True): # final operations: those that can only be at end of chain # intermediate operations: those that can only be prior to end of chain funcdict = dict() @@ -649,7 +653,9 @@ def chained_call(*args): "Specify field to operate on in operation or grouped()." ) - res = map_halo_operation(func, self.lengths, self.arrs, fieldnames=fieldnames) + res = map_halo_operation( + func, self.lengths, self.arrs, fieldnames=fieldnames, nmax=nmax + ) if compute: res = res.compute() return res @@ -1038,11 +1044,14 @@ def map_halo_operation( entry_nbytes_in: Optional[int] = 4, fieldnames: Optional[List[str]] = None, nmax: Optional[int] = None, + idxlist: Optional[np.ndarray] = None, ) -> da.Array: """ Map a function to all halos in a halo catalog. Parameters ---------- + idxlist: Optional[np.ndarray] + Only process the halos with these indices. nmax: Optional[int] Only process the first nmax halos. 
func @@ -1074,11 +1083,28 @@ def map_halo_operation( dtype = dfltkwargs.get("dtype", "float64") default = dfltkwargs.get("default", 0) + if idxlist is not None and nmax is not None: + raise ValueError("Cannot specify both idxlist and nmax.") + if nmax is not None: lengths = lengths[:nmax] offsets = np.concatenate([[0], np.cumsum(lengths)]) + if idxlist is not None: + # make sure idxlist is sorted and unique + if not np.all(np.diff(idxlist) > 0): + raise ValueError("idxlist must be sorted and unique.") + # make sure idxlist is within range + if np.min(idxlist) < 0 or np.max(idxlist) >= lengths.shape[0]: + raise ValueError( + "idxlist elements must be in [%i, %i)." % (0, lengths.shape[0]) + ) + offsets = np.concatenate( + [[0], offsets[1:][idxlist]] + ) # offsets is one longer than lengths + lengths = lengths[idxlist] + # Determine chunkedges automatically # TODO: very messy and inefficient routine. improve some time. # TODO: Set entry_bytes_out diff --git a/tests/customs/test_arepo.py b/tests/customs/test_arepo.py index 6a47959d..9e98d52e 100644 --- a/tests/customs/test_arepo.py +++ b/tests/customs/test_arepo.py @@ -56,7 +56,6 @@ def calculate_haloid(GroupID, parttype="PartType0"): assert np.all(counttask.compute() <= 1) else: assert np.all(counttask.compute() == 1) - print(count.shape, counttask.compute().shape) assert ( count.shape == counttask.compute().shape ), "Expected shape different from result's shape." @@ -72,16 +71,26 @@ def calculate_haloid(GroupID, parttype="PartType0"): assert partcount2.shape[0] == nmax assert np.all(partcount2 == partcount[:nmax]) + # test idxlist + idxlist = [3, 5, 7, 25200] + partcounttask = snap.map_halo_operation( + calculate_partcount, compute=False, chunksize=int(3e6), idxlist=idxlist + ) + partcount2 = partcounttask.compute() + assert partcount2.shape[0] == len(idxlist) + assert np.all(partcount2 == partcount[idxlist]) + # Test the map_halo_operation/grouped functionality -def test_areposnapshot_halooperation(tngfile_dummy, gadgetcatalogfile_dummy): - path = tngfile_dummy.path - catalogpath = gadgetcatalogfile_dummy.path - halooperations(path, catalogpath) +# todo: need to have catalog have more than 1 halo for this. +# def test_areposnapshot_halooperation(tngfile_dummy, gadgetcatalogfile_dummy): +# path = tngfile_dummy.path +# catalogpath = gadgetcatalogfile_dummy.path +# halooperations(path, catalogpath) @require_testdata_path("interface", only=["TNG50-4_snapshot"]) -def test_areposnapshot_halooperation_realworld(testdatapath): +def test_areposnapshot_halooperation_realdata(testdatapath): halooperations(testdatapath) @@ -126,3 +135,15 @@ def customfunc2(dens, vol, fieldnames=["Density", "Masses"]): s = g2.apply(customfunc2).sum() boundvol = s.evaluate().sum() assert np.isclose(boundvol, boundvol2) + + # Test nmax attribute + m = snp.grouped("Masses").sum().evaluate() + m10 = snp.grouped("Masses").sum().evaluate(nmax=10) + assert m10.shape[0] == 10 + assert np.allclose(m[:10], m10) + + # Test idxlist attribute + idxlist = [3, 5, 7, 25200] + m4 = snp.grouped("Masses").sum().evaluate(idxlist=idxlist) + assert m4.shape[0] == len(idxlist) + assert np.allclose(m[idxlist], m10)
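
Taken together, the `nmax`, `min_grpcount`, and `idxlist` keywords introduced in this patch series support the call pattern sketched below. This is a minimal illustration only: the snapshot path is a placeholder, and `partcount` is a toy per-halo reduction mirroring the test helpers above; `nmax` is shown on `evaluate()` and `idxlist` on `map_halo_operation()`, matching the signatures added in the diffs.

    from scida import load

    # Placeholder path: any Arepo-style snapshot with an associated halo catalog.
    snap = load("path/to/TNG50-4_snapshot")

    # Grouped reduction restricted to the first 10 halos (halos are ordered by mass).
    boundmass10 = snap.grouped("Masses").sum().evaluate(nmax=10)
    assert boundmass10.shape[0] == 10

    # Per-halo custom reduction for an explicit list of halo indices.
    def partcount(GroupID, parttype="PartType0"):
        """Number of particles of the given type bound to each halo."""
        return GroupID.shape[0]

    task = snap.map_halo_operation(partcount, compute=False, idxlist=[3, 5, 7])
    counts = task.compute()  # one entry per requested halo index

Note that, per the validation added to `map_halo_operation`, `idxlist` must be sorted, unique, and within the range of available halo indices, and it cannot be combined with `nmax`.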