From e4652f0e13f5ba389fb6a1cdb357065192537a60 Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Wed, 5 Apr 2023 12:13:05 -0500 Subject: [PATCH] finish adding dask wrappers --- src/fastjet/_pyjet.py | 151 +++++++++++++++++++++++++++++------------- 1 file changed, 106 insertions(+), 45 deletions(-) diff --git a/src/fastjet/_pyjet.py b/src/fastjet/_pyjet.py index 258c8af6..23183255 100644 --- a/src/fastjet/_pyjet.py +++ b/src/fastjet/_pyjet.py @@ -248,11 +248,12 @@ def __call__(self, array): return getattr(seq, self.name)(**self.kwargs) -def _dak_dispatch(cluseq, method_name, **kwargs): +def _dak_dispatch(cluseq, method_name, *arrays, **kwargs): from dask_awkward.utils import hyphenize return cluseq._data.map_partitions( _FnDelayedInternalRepCaller(method_name, cluseq._jetdef, **kwargs), + *arrays, label=hyphenize(method_name), ) @@ -395,93 +396,153 @@ def constituents(self, min_pt=0): return _dak_dispatch(self, "constituents", min_pt=min_pt) def exclusive_jets_constituent_index(self, njets=10): - return self._internalrep.exclusive_jets_constituent_index(njets) + return _dak_dispatch(self, "exclusive_jets_constituent_index", njets=njets) def exclusive_jets_constituents(self, njets=10): - return self._internalrep.exclusive_jets_constituents(njets) + return _dak_dispatch(self, "exclusive_jets_constituents", njets=njets) def exclusive_jets_lund_declusterings(self, njets=10): - return self._internalrep.exclusive_jets_lund_declusterings(njets) + return _dak_dispatch(self, "exclusive_jets_lund_declusterings", njets=njets) def exclusive_dmerge(self, njets=10): - return self._internalrep.exclusive_dmerge(njets) + return _dak_dispatch(self, "exclusive_dmerge", njets=njets) def exclusive_dmerge_max(self, njets=10): - return self._internalrep.exclusive_dmerge_max(njets) + return _dak_dispatch(self, "exclusive_dmerge_max", njets=njets) def exclusive_ymerge_max(self, njets=10): - return self._internalrep.exclusive_ymerge_max(njets) + return _dak_dispatch(self, "exclusive_ymerge_max", njets=njets) def exclusive_ymerge(self, njets=10): - return self._internalrep.exclusive_ymerge(njets) + return _dak_dispatch(self, "exclusive_ymerge", njets=njets) def Q(self): - return self._internalrep.Q() + return _dak_dispatch(self, "Q") def Q2(self): - return self._internalrep.Q2() + return _dak_dispatch(self, "Q2") def exclusive_subjets(self, data, dcut=-1, nsub=-1): - if not isinstance(data, ak.Array): - raise TypeError("The input data is not an Awkward Array") - return self._internalrep.exclusive_subjets(data, dcut, nsub) + import dask_awkward as dak + + if not isinstance(data, dak.Array): + raise TypeError("The input data is not a dask-awkward Array") + if not dak.lib.core.compatible_partitions(self.data, data): + raise ValueError( + "Input data must be partition-wise compatible with clustering data!" + ) + return _dak_dispatch(self, "exclusive_subjets", data, dcut=dcut, nsub=nsub) def exclusive_subjets_up_to(self, data, nsub=0): - if not isinstance(data, ak.Array): - raise TypeError("The input data is not an Awkward Array") - return self._internalrep.exclusive_subjets_up_to(data, nsub) + import dask_awkward as dak + + if not isinstance(data, dak.Array): + raise TypeError("The input data is not a dask-awkward Array") + if not dak.lib.core.compatible_partitions(self.data, data): + raise ValueError( + "Input data must be partition-wise compatible with clustering data!" + ) + return _dak_dispatch(self, "exclusive_subjets_up_to", data, nsub=nsub) def exclusive_subdmerge(self, data, nsub=0): - if not isinstance(data, ak.Array): - raise TypeError("The input data is not an Awkward Array") - return self._internalrep.exclusive_subdmerge(data, nsub) + import dask_awkward as dak + + if not isinstance(data, dak.Array): + raise TypeError("The input data is not a dask-awkward Array") + if not dak.lib.core.compatible_partitions(self.data, data): + raise ValueError( + "Input data must be partition-wise compatible with clustering data!" + ) + return _dak_dispatch(self, "exclusive_subdmerge", data, nsub=nsub) def exclusive_subdmerge_max(self, data, nsub=0): - if not isinstance(data, ak.Array): - raise TypeError("The input data is not an Awkward Array") - return self._internalrep.exclusive_subdmerge_max(data, nsub) + import dask_awkward as dak + + if not isinstance(data, dak.Array): + raise TypeError("The input data is not a dask-awkward Array") + if not dak.lib.core.compatible_partitions(self.data, data): + raise ValueError( + "Input data must be partition-wise compatible with clustering data!" + ) + return _dak_dispatch(self, "exclusive_subdmerge_max", data, nsub=nsub) def n_exclusive_subjets(self, data, dcut=0): - if not isinstance(data, ak.Array): - raise TypeError("The input data is not an Awkward Array") - return self._internalrep.n_exclusive_subjets(data, dcut) + import dask_awkward as dak + + if not isinstance(data, dak.Array): + raise TypeError("The input data is not a dask-awkward Array") + if not dak.lib.core.compatible_partitions(self.data, data): + raise ValueError( + "Input data must be partition-wise compatible with clustering data!" + ) + return _dak_dispatch(self, "n_exclusive_subjets", data, dcut=dcut) def has_parents(self, data): - if not isinstance(data, ak.Array): - raise TypeError("The input data is not an Awkward Array") - return self._internalrep.has_parents(data) + import dask_awkward as dak + + if not isinstance(data, dak.Array): + raise TypeError("The input data is not a dask-awkward Array") + if not dak.lib.core.compatible_partitions(self.data, data): + raise ValueError( + "Input data must be partition-wise compatible with clustering data!" + ) + return _dak_dispatch(self, "has_parents", data) def has_child(self, data): - if not isinstance(data, ak.Array): - raise TypeError("The input data is not an Awkward Array") - return self._internalrep.has_child(data) + import dask_awkward as dak + + if not isinstance(data, dak.Array): + raise TypeError("The input data is not a dask-awkward Array") + if not dak.lib.core.compatible_partitions(self.data, data): + raise ValueError( + "Input data must be partition-wise compatible with clustering data!" + ) + return _dak_dispatch(self, "has_child", data) def jet_scale_for_algorithm(self, data): - if not isinstance(data, ak.Array): - raise TypeError("The input data is not an Awkward Array") - return self._internalrep.jet_scale_for_algorithm(data) + import dask_awkward as dak + + if not isinstance(data, dak.Array): + raise TypeError("The input data is not a dask-awkward Array") + if not dak.lib.core.compatible_partitions(self.data, data): + raise ValueError( + "Input data must be partition-wise compatible with clustering data!" + ) + return _dak_dispatch(self, "jet_scale_for_algorithm", data) def unique_history_order(self): - return self._internalrep.unique_history_order() + return _dak_dispatch(self, "unique_history_order") def n_particles(self): - return self._internalrep.n_particles() + return _dak_dispatch(self, "n_particles") def n_exclusive_jets(self, dcut=0): - return self._internalrep.n_exclusive_jets(dcut) + return _dak_dispatch(self, "n_exclusive_jets", dcut=dcut) def childless_pseudojets(self): - return self._internalrep.childless_pseudojets() + return _dak_dispatch(self, "childless_pseudojets") def jets(self): - return self._internalrep.jets() + return _dak_dispatch(self, "jets") def get_parents(self, data): - if not isinstance(data, ak.Array): - raise TypeError("The input data is not an Awkward Array") - return self._internalrep.get_parents(data) + import dask_awkward as dak + + if not isinstance(data, dak.Array): + raise TypeError("The input data is not a dask-awkward Array") + if not dak.lib.core.compatible_partitions(self.data, data): + raise ValueError( + "Input data must be partition-wise compatible with clustering data!" + ) + return _dak_dispatch(self, "get_parents", data) def get_child(self, data): - if not isinstance(data, ak.Array): - raise TypeError("The input data is not an Awkward Array") - return self._internalrep.get_child(data) + import dask_awkward as dak + + if not isinstance(data, dak.Array): + raise TypeError("The input data is not a dask-awkward Array") + if not dak.lib.core.compatible_partitions(self.data, data): + raise ValueError( + "Input data must be partition-wise compatible with clustering data!" + ) + return _dak_dispatch(self, "get_child", data)