Skip to content

Commit

Permalink
ci: test dask uniformly with distributed.Client (#226)
Browse files Browse the repository at this point in the history
* test dask uniformly with distributed.Client

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
lgray and pre-commit-ci[bot] authored Jun 16, 2023
1 parent 003ee35 commit a390d20
Showing 1 changed file with 70 additions and 65 deletions.
135 changes: 70 additions & 65 deletions tests/test_008-dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,77 +63,82 @@ def test_multi():


def test_single():
array = ak.Array(
[
{"px": 1.2, "py": 3.2, "pz": 5.4, "E": 2.5, "ex": 0.78},
{"px": 32.2, "py": 64.21, "pz": 543.34, "E": 24.12, "ex": 0.35},
{"px": 32.45, "py": 63.21, "pz": 543.14, "E": 24.56, "ex": 0.0},
from distributed import Client

with Client() as _:
array = ak.Array(
[
{"px": 1.2, "py": 3.2, "pz": 5.4, "E": 2.5, "ex": 0.78},
{"px": 32.2, "py": 64.21, "pz": 543.34, "E": 24.12, "ex": 0.35},
{"px": 32.45, "py": 63.21, "pz": 543.14, "E": 24.56, "ex": 0.0},
]
)
darray = dak.from_awkward(array, 1)
jetdef = fastjet.JetDefinition(fastjet.antikt_algorithm, 0.6)
cluster = fastjet._pyjet.DaskAwkwardClusterSequence(darray, jetdef)
inclusive_jets_out = [
{"px": 1.2, "py": 3.2, "pz": 5.4, "E": 2.5},
{"px": 64.65, "py": 127.41999999999999, "pz": 1086.48, "E": 48.68},
]
)
darray = dak.from_awkward(array, 1)
jetdef = fastjet.JetDefinition(fastjet.antikt_algorithm, 0.6)
cluster = fastjet._pyjet.DaskAwkwardClusterSequence(darray, jetdef)
inclusive_jets_out = [
{"px": 1.2, "py": 3.2, "pz": 5.4, "E": 2.5},
{"px": 64.65, "py": 127.41999999999999, "pz": 1086.48, "E": 48.68},
]

assert inclusive_jets_out == cluster.inclusive_jets().compute().to_list()
constituent_output = [
[{"px": 1.2, "py": 3.2, "pz": 5.4, "E": 2.5}],
[
{"px": 32.2, "py": 64.21, "pz": 543.34, "E": 24.12},
{"px": 32.45, "py": 63.21, "pz": 543.14, "E": 24.56},
],
]
assert constituent_output == cluster.constituents().compute().to_list()
constituent_index_output = [[0], [1, 2]]
assert constituent_index_output == cluster.constituent_index().compute().to_list()

assert inclusive_jets_out == cluster.inclusive_jets().compute().to_list()
constituent_output = [
[{"px": 1.2, "py": 3.2, "pz": 5.4, "E": 2.5}],
[
{"px": 32.2, "py": 64.21, "pz": 543.34, "E": 24.12},
{"px": 32.45, "py": 63.21, "pz": 543.14, "E": 24.56},
],
]
assert constituent_output == cluster.constituents().compute().to_list()
constituent_index_output = [[0], [1, 2]]
assert (
constituent_index_output == cluster.constituent_index().compute().to_list()
)


def test_inclusive_from_file():
from pathlib import Path

import uproot
from dask_awkward.lib.testutils import assert_eq
from distributed import Client

with Client() as _:
data_dir = Path(__file__).parent / "samples"
testfile = data_dir / "pfnano_skim.root"

devents = uproot.dask({testfile: "Events"})

dpfcands = dak.zip(
{
"pt": devents.PFCands_pt,
"eta": devents.PFCands_eta,
"phi": devents.PFCands_phi,
"mass": devents.PFCands_mass,
},
with_name="Momentum4D",
behavior=vector.backends.awkward.behavior,
)

djetdef = fastjet.JetDefinition(fastjet.antikt_algorithm, 0.4)
dcluseq = fastjet.ClusterSequence(dpfcands, djetdef)

events = uproot.open({testfile: "Events"}).arrays(
["PFCands_pt", "PFCands_eta", "PFCands_phi", "PFCands_mass"]
)

pfcands = ak.zip(
{
"pt": events.PFCands_pt,
"eta": events.PFCands_eta,
"phi": events.PFCands_phi,
"mass": events.PFCands_mass,
},
with_name="Momentum4D",
behavior=vector.backends.awkward.behavior,
)

jetdef = fastjet.JetDefinition(fastjet.antikt_algorithm, 0.4)
cluseq = fastjet.ClusterSequence(pfcands, jetdef)

vector = pytest.importorskip("vector")

data_dir = Path(__file__).parent / "samples"
testfile = data_dir / "pfnano_skim.root"

devents = uproot.dask({testfile: "Events"})

dpfcands = dak.zip(
{
"pt": devents.PFCands_pt,
"eta": devents.PFCands_eta,
"phi": devents.PFCands_phi,
"mass": devents.PFCands_mass,
},
with_name="Momentum4D",
behavior=vector.backends.awkward.behavior,
)

djetdef = fastjet.JetDefinition(fastjet.antikt_algorithm, 0.4)
dcluseq = fastjet.ClusterSequence(dpfcands, djetdef)

events = uproot.open({testfile: "Events"}).arrays(
["PFCands_pt", "PFCands_eta", "PFCands_phi", "PFCands_mass"]
)

pfcands = ak.zip(
{
"pt": events.PFCands_pt,
"eta": events.PFCands_eta,
"phi": events.PFCands_phi,
"mass": events.PFCands_mass,
},
with_name="Momentum4D",
behavior=vector.backends.awkward.behavior,
)

jetdef = fastjet.JetDefinition(fastjet.antikt_algorithm, 0.4)
cluseq = fastjet.ClusterSequence(pfcands, jetdef)

assert_eq(dcluseq.inclusive_jets(), cluseq.inclusive_jets())
assert_eq(dcluseq.inclusive_jets(), cluseq.inclusive_jets())

0 comments on commit a390d20

Please sign in to comment.