Skip to content

Commit

Permalink
Merge pull request #623 from CoffeaTeam/more-exe-fixes
Browse files Browse the repository at this point in the history
More exe fixes
  • Loading branch information
lgray authored Dec 6, 2021
2 parents 1fc17be + 5ed0f15 commit 663739d
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 65 deletions.
79 changes: 30 additions & 49 deletions coffea/processor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1034,57 +1034,38 @@ def metadata_fetcher(
return out

def _preprocess_fileset(self, fileset: Dict) -> None:
if self.maxchunks is None:
# this is a bit of an abuse of map-reduce but ok
to_get = set(
filemeta
for filemeta in fileset
if not filemeta.populated(clusters=self.align_clusters)
# this is a bit of an abuse of map-reduce but ok
to_get = set(
filemeta
for filemeta in fileset
if not filemeta.populated(clusters=self.align_clusters)
)
if len(to_get) > 0:
out = set_accumulator()
pre_arg_override = {
"function_name": "get_metadata",
"desc": "Preprocessing",
"unit": "file",
"compression": None,
}
if isinstance(self.pre_executor, (FuturesExecutor, ParslExecutor)):
pre_arg_override.update({"tailtimeout": None})
if isinstance(self.pre_executor, (DaskExecutor)):
self.pre_executor.heavy_input = None
pre_arg_override.update({"worker_affinity": False})
pre_executor = self.pre_executor.copy(**pre_arg_override)
closure = partial(
self.automatic_retries,
self.retries,
self.skipbadfiles,
partial(self.metadata_fetcher, self.xrootdtimeout, self.align_clusters),
)
if len(to_get) > 0:
out = set_accumulator()
pre_arg_override = {
"function_name": "get_metadata",
"desc": "Preprocessing",
"unit": "file",
"compression": None,
}
if isinstance(self.pre_executor, (FuturesExecutor, ParslExecutor)):
pre_arg_override.update({"tailtimeout": None})
if isinstance(self.pre_executor, (DaskExecutor)):
self.pre_executor.heavy_input = None
pre_arg_override.update({"worker_affinity": False})
pre_executor = self.pre_executor.copy(**pre_arg_override)
closure = partial(
self.automatic_retries,
self.retries,
self.skipbadfiles,
partial(
self.metadata_fetcher, self.xrootdtimeout, self.align_clusters
),
)
out = pre_executor(to_get, closure, out)
while out:
item = out.pop()
self.metadata_cache[item] = item.metadata
for filemeta in fileset:
filemeta.maybe_populate(self.metadata_cache)
else:
out = pre_executor(to_get, closure, out)
while out:
item = out.pop()
self.metadata_cache[item] = item.metadata
for filemeta in fileset:
# not sure why need to check for bad files here... otherwise pop fails below with pytest.
# fmt: off
if self.skipbadfiles and not filemeta.populated(clusters=self.align_clusters): # noqa # fmt: skip <-- gets ignored, see: https://github.com/psf/black/issues/2421
continue
# fmt: on
if not filemeta.populated(clusters=self.align_clusters):
filemeta.metadata = (
self.metadata_fetcher(
self.xrootdtimeout, self.align_clusters, filemeta
)
.pop()
.metadata
)
self.metadata_cache[filemeta] = filemeta.metadata
filemeta.maybe_populate(self.metadata_cache)

def _filter_badfiles(self, fileset: Dict) -> List:
final_fileset = []
Expand Down
1 change: 0 additions & 1 deletion coffea/processor/test_items/NanoTestProcessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ def process(self, df):
output = self.accumulator.identity()

dataset = df.metadata["dataset"]
print(df.metadata)
if "checkusermeta" in df.metadata:
metaname, metavalue = self.expected_usermeta[dataset]
assert metavalue == df.metadata[metaname]
Expand Down
46 changes: 31 additions & 15 deletions tests/test_local_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
pytest.skip("skipping tests that only function in linux", allow_module_level=True)


@pytest.mark.parametrize("maxchunks", [10, None])
@pytest.mark.parametrize("skipbadfiles", [True, False])
@pytest.mark.parametrize("maxchunks", [1, None])
@pytest.mark.parametrize("chunksize", [100000, 5])
@pytest.mark.parametrize("schema", [None, schemas.BaseSchema])
@pytest.mark.parametrize(
"executor", [processor.IterativeExecutor, processor.FuturesExecutor]
)
def test_dataframe_analysis(executor, schema, chunksize, maxchunks):
def test_dataframe_analysis(executor, schema, chunksize, maxchunks, skipbadfiles):
from coffea.processor.test_items import NanoTestProcessor

filelist = {
Expand All @@ -24,23 +25,35 @@ def test_dataframe_analysis(executor, schema, chunksize, maxchunks):

executor = executor()
run = processor.Runner(
executor=executor, schema=schema, chunksize=chunksize, maxchunks=maxchunks
executor=executor,
schema=schema,
chunksize=chunksize,
maxchunks=maxchunks,
skipbadfiles=skipbadfiles,
)

hists = run(filelist, "Events", processor_instance=NanoTestProcessor())

assert hists["cutflow"]["ZJets_pt"] == 18
assert hists["cutflow"]["ZJets_mass"] == 6
assert hists["cutflow"]["Data_pt"] == 84
assert hists["cutflow"]["Data_mass"] == 66
if maxchunks is None:
    # maxchunks unset: the full cutflow counts are expected.
    assert hists["cutflow"]["ZJets_pt"] == 18
    assert hists["cutflow"]["ZJets_mass"] == 6
    assert hists["cutflow"]["Data_pt"] == 84
    assert hists["cutflow"]["Data_mass"] == 66
else:
    # With maxchunks == 1 the expected counts depend on chunksize.
    assert maxchunks == 1
    # The conditional must be parenthesized: `assert x == a if c else b`
    # parses as `assert ((x == a) if c else b)`, which degenerates to
    # `assert b` (always truthy for a non-zero constant) when c is false.
    assert hists["cutflow"]["ZJets_pt"] == (18 if chunksize == 100_000 else 2)
    assert hists["cutflow"]["ZJets_mass"] == (6 if chunksize == 100_000 else 1)
    assert hists["cutflow"]["Data_pt"] == (84 if chunksize == 100_000 else 13)
    assert hists["cutflow"]["Data_mass"] == (66 if chunksize == 100_000 else 12)


@pytest.mark.parametrize("skipbadfiles", [True, False])
@pytest.mark.parametrize("maxchunks", [None, 1000])
@pytest.mark.parametrize("compression", [None, 0, 2])
@pytest.mark.parametrize(
"executor", [processor.IterativeExecutor, processor.FuturesExecutor]
)
def test_nanoevents_analysis(executor, compression, maxchunks):
def test_nanoevents_analysis(executor, compression, maxchunks, skipbadfiles):
from coffea.processor.test_items import NanoEventsProcessor

filelist = {
Expand All @@ -63,14 +76,17 @@ def test_nanoevents_analysis(executor, compression, maxchunks):
executor = executor(compression=compression)
run = processor.Runner(
executor=executor,
skipbadfiles=True,
skipbadfiles=skipbadfiles,
schema=processor.NanoAODSchema,
maxchunks=maxchunks,
)

hists = run(filelist, "Events", processor_instance=NanoEventsProcessor())

assert hists["cutflow"]["ZJets_pt"] == 18
assert hists["cutflow"]["ZJets_mass"] == 6
assert hists["cutflow"]["Data_pt"] == 84
assert hists["cutflow"]["Data_mass"] == 66
if skipbadfiles:
hists = run(filelist, "Events", processor_instance=NanoEventsProcessor())
assert hists["cutflow"]["ZJets_pt"] == 18
assert hists["cutflow"]["ZJets_mass"] == 6
assert hists["cutflow"]["Data_pt"] == 84
assert hists["cutflow"]["Data_mass"] == 66
else:
with pytest.raises(FileNotFoundError):
hists = run(filelist, "Events", processor_instance=NanoEventsProcessor())

0 comments on commit 663739d

Please sign in to comment.