Skip to content

Commit

Permalink
finish adding dask wrappers
Browse files Browse the repository at this point in the history
  • Loading branch information
lgray committed Apr 5, 2023
1 parent 8dc0501 commit e4652f0
Showing 1 changed file with 106 additions and 45 deletions.
151 changes: 106 additions & 45 deletions src/fastjet/_pyjet.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,12 @@ def __call__(self, array):
return getattr(seq, self.name)(**self.kwargs)


def _dak_dispatch(cluseq, method_name, **kwargs):
def _dak_dispatch(cluseq, method_name, *arrays, **kwargs):
from dask_awkward.utils import hyphenize

return cluseq._data.map_partitions(
_FnDelayedInternalRepCaller(method_name, cluseq._jetdef, **kwargs),
*arrays,
label=hyphenize(method_name),
)

Expand Down Expand Up @@ -395,93 +396,153 @@ def constituents(self, min_pt=0):
return _dak_dispatch(self, "constituents", min_pt=min_pt)

def exclusive_jets_constituent_index(self, njets=10):
return self._internalrep.exclusive_jets_constituent_index(njets)
return _dak_dispatch(self, "exclusive_jets_constituent_index", njets=njets)

def exclusive_jets_constituents(self, njets=10):
return self._internalrep.exclusive_jets_constituents(njets)
return _dak_dispatch(self, "exclusive_jets_constituents", njets=njets)

def exclusive_jets_lund_declusterings(self, njets=10):
return self._internalrep.exclusive_jets_lund_declusterings(njets)
return _dak_dispatch(self, "exclusive_jets_lund_declusterings", njets=njets)

def exclusive_dmerge(self, njets=10):
return self._internalrep.exclusive_dmerge(njets)
return _dak_dispatch(self, "exclusive_dmerge", njets=njets)

def exclusive_dmerge_max(self, njets=10):
return self._internalrep.exclusive_dmerge_max(njets)
return _dak_dispatch(self, "exclusive_dmerge_max", njets=njets)

def exclusive_ymerge_max(self, njets=10):
return self._internalrep.exclusive_ymerge_max(njets)
return _dak_dispatch(self, "exclusive_ymerge_max", njets=njets)

def exclusive_ymerge(self, njets=10):
return self._internalrep.exclusive_ymerge(njets)
return _dak_dispatch(self, "exclusive_ymerge", njets=njets)

def Q(self):
return self._internalrep.Q()
return _dak_dispatch(self, "Q")

def Q2(self):
return self._internalrep.Q2()
return _dak_dispatch(self, "Q2")

def exclusive_subjets(self, data, dcut=-1, nsub=-1):
if not isinstance(data, ak.Array):
raise TypeError("The input data is not an Awkward Array")
return self._internalrep.exclusive_subjets(data, dcut, nsub)
import dask_awkward as dak

if not isinstance(data, dak.Array):
raise TypeError("The input data is not a dask-awkward Array")
if not dak.lib.core.compatible_partitions(self.data, data):
raise ValueError(
"Input data must be partition-wise compatible with clustering data!"
)
return _dak_dispatch(self, "exclusive_subjets", data, dcut=dcut, nsub=nsub)

def exclusive_subjets_up_to(self, data, nsub=0):
if not isinstance(data, ak.Array):
raise TypeError("The input data is not an Awkward Array")
return self._internalrep.exclusive_subjets_up_to(data, nsub)
import dask_awkward as dak

if not isinstance(data, dak.Array):
raise TypeError("The input data is not a dask-awkward Array")
if not dak.lib.core.compatible_partitions(self.data, data):
raise ValueError(
"Input data must be partition-wise compatible with clustering data!"
)
return _dak_dispatch(self, "exclusive_subjets_up_to", data, nsub=nsub)

def exclusive_subdmerge(self, data, nsub=0):
if not isinstance(data, ak.Array):
raise TypeError("The input data is not an Awkward Array")
return self._internalrep.exclusive_subdmerge(data, nsub)
import dask_awkward as dak

if not isinstance(data, dak.Array):
raise TypeError("The input data is not a dask-awkward Array")
if not dak.lib.core.compatible_partitions(self.data, data):
raise ValueError(
"Input data must be partition-wise compatible with clustering data!"
)
return _dak_dispatch(self, "exclusive_subdmerge", data, nsub=nsub)

def exclusive_subdmerge_max(self, data, nsub=0):
if not isinstance(data, ak.Array):
raise TypeError("The input data is not an Awkward Array")
return self._internalrep.exclusive_subdmerge_max(data, nsub)
import dask_awkward as dak

if not isinstance(data, dak.Array):
raise TypeError("The input data is not a dask-awkward Array")
if not dak.lib.core.compatible_partitions(self.data, data):
raise ValueError(
"Input data must be partition-wise compatible with clustering data!"
)
return _dak_dispatch(self, "exclusive_subdmerge_max", data, nsub=nsub)

def n_exclusive_subjets(self, data, dcut=0):
if not isinstance(data, ak.Array):
raise TypeError("The input data is not an Awkward Array")
return self._internalrep.n_exclusive_subjets(data, dcut)
import dask_awkward as dak

if not isinstance(data, dak.Array):
raise TypeError("The input data is not a dask-awkward Array")
if not dak.lib.core.compatible_partitions(self.data, data):
raise ValueError(
"Input data must be partition-wise compatible with clustering data!"
)
return _dak_dispatch(self, "n_exclusive_subjets", data, dcut=dcut)

def has_parents(self, data):
if not isinstance(data, ak.Array):
raise TypeError("The input data is not an Awkward Array")
return self._internalrep.has_parents(data)
import dask_awkward as dak

if not isinstance(data, dak.Array):
raise TypeError("The input data is not a dask-awkward Array")
if not dak.lib.core.compatible_partitions(self.data, data):
raise ValueError(
"Input data must be partition-wise compatible with clustering data!"
)
return _dak_dispatch(self, "has_parents", data)

def has_child(self, data):
if not isinstance(data, ak.Array):
raise TypeError("The input data is not an Awkward Array")
return self._internalrep.has_child(data)
import dask_awkward as dak

if not isinstance(data, dak.Array):
raise TypeError("The input data is not a dask-awkward Array")
if not dak.lib.core.compatible_partitions(self.data, data):
raise ValueError(
"Input data must be partition-wise compatible with clustering data!"
)
return _dak_dispatch(self, "has_child", data)

def jet_scale_for_algorithm(self, data):
if not isinstance(data, ak.Array):
raise TypeError("The input data is not an Awkward Array")
return self._internalrep.jet_scale_for_algorithm(data)
import dask_awkward as dak

if not isinstance(data, dak.Array):
raise TypeError("The input data is not a dask-awkward Array")
if not dak.lib.core.compatible_partitions(self.data, data):
raise ValueError(
"Input data must be partition-wise compatible with clustering data!"
)
return _dak_dispatch(self, "jet_scale_for_algorithm", data)

def unique_history_order(self):
return self._internalrep.unique_history_order()
return _dak_dispatch(self, "unique_history_order")

def n_particles(self):
return self._internalrep.n_particles()
return _dak_dispatch(self, "n_particles")

def n_exclusive_jets(self, dcut=0):
return self._internalrep.n_exclusive_jets(dcut)
return _dak_dispatch(self, "n_exclusive_jets", dcut=dcut)

def childless_pseudojets(self):
return self._internalrep.childless_pseudojets()
return _dak_dispatch(self, "childless_pseudojets")

def jets(self):
return self._internalrep.jets()
return _dak_dispatch(self, "jets")

def get_parents(self, data):
if not isinstance(data, ak.Array):
raise TypeError("The input data is not an Awkward Array")
return self._internalrep.get_parents(data)
import dask_awkward as dak

if not isinstance(data, dak.Array):
raise TypeError("The input data is not a dask-awkward Array")
if not dak.lib.core.compatible_partitions(self.data, data):
raise ValueError(
"Input data must be partition-wise compatible with clustering data!"
)
return _dak_dispatch(self, "get_parents", data)

def get_child(self, data):
if not isinstance(data, ak.Array):
raise TypeError("The input data is not an Awkward Array")
return self._internalrep.get_child(data)
import dask_awkward as dak

if not isinstance(data, dak.Array):
raise TypeError("The input data is not a dask-awkward Array")
if not dak.lib.core.compatible_partitions(self.data, data):
raise ValueError(
"Input data must be partition-wise compatible with clustering data!"
)
return _dak_dispatch(self, "get_child", data)

0 comments on commit e4652f0

Please sign in to comment.