Skip to content

Commit

Permalink
Merge pull request #648 from uccross/ux-revamp
Browse files Browse the repository at this point in the history
Read Coffea config from TOML file
  • Loading branch information
lgray authored Mar 16, 2022
2 parents 6e71e3a + 5376aeb commit a3ba69e
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 4 deletions.
21 changes: 18 additions & 3 deletions coffea/processor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
import concurrent.futures
from functools import partial
from itertools import repeat
import os
import time
import pickle
import sys
import math
import json
import cloudpickle
import toml
import uproot
import uuid
import warnings
Expand Down Expand Up @@ -902,9 +904,17 @@ class Runner:
cachestrategy: Optional[Union[Literal["dask-worker"], Callable[..., MutableMapping]]] = None # fmt: skip
processor_compression: int = 1
use_skyhook: Optional[bool] = False
skyhook_options: Optional[Dict] = field(default_factory=dict)
format: str = "root"

@staticmethod
def read_coffea_config():
config_path = os.path.join(os.environ["HOME"], ".coffea.toml")
if os.path.exists(config_path):
with open(config_path) as f:
return toml.loads(f.read())
else:
return dict()

def __post_init__(self):
if self.pre_executor is None:
self.pre_executor = self.executor
Expand Down Expand Up @@ -1098,6 +1108,7 @@ def _filter_badfiles(self, fileset: Dict) -> List:
return final_fileset

def _chunk_generator(self, fileset: Dict, treename: str) -> Generator:
config = Runner.read_coffea_config()
if self.format == "root":
if self.maxchunks is None:
last_chunksize = self.chunksize
Expand All @@ -1123,6 +1134,10 @@ def _chunk_generator(self, fileset: Dict, treename: str) -> Generator:
break
yield from iter(chunks)
else:
if not config.get("skyhook", None):
print("No skyhook config found, using defaults")
config["skyhook"] = dict()

import pyarrow.dataset as ds

dataset_filelist_map = {}
Expand All @@ -1134,10 +1149,10 @@ def _chunk_generator(self, fileset: Dict, treename: str) -> Generator:
for filename in filelist:
# If skyhook config is provided and is not empty,
if self.use_skyhook:
ceph_config_path = self.skyhook_options.get(
ceph_config_path = config["skyhook"].get(
"ceph_config_path", "/etc/ceph/ceph.conf"
)
ceph_data_pool = self.skyhook_options.get(
ceph_data_pool = config["skyhook"].get(
"ceph_data_pool", "cephfs_data"
)
filename = f"{ceph_config_path}:{ceph_data_pool}:{filename}"
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def get_description():
"tqdm>=4.27.0",
"lz4",
"cloudpickle>=1.2.3",
"toml>=0.10.2",
"mplhep>=0.1.18",
"packaging",
"pandas",
Expand Down
11 changes: 10 additions & 1 deletion tests/test_skyhook_job.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import toml
import uproot
import awkward as ak
from coffea import processor
Expand All @@ -7,6 +8,15 @@


if __name__ == "__main__":
config_dict = {
"skyhook": {
"ceph_config_path": "/tmp/testskyhookjob/ceph.conf",
"ceph_data_pool": "cephfs_data",
}
}
with open("/root/.coffea.toml", "w") as f:
toml.dump(config_dict, f)

ak.to_parquet(
uproot.lazy("tests/samples/nano_dy.root:Events"),
"nano_dy.parquet",
Expand Down Expand Up @@ -45,7 +55,6 @@
run = processor.Runner(
executor=executor,
use_skyhook=True,
skyhook_options={"ceph_config_path": "/tmp/testskyhookjob/ceph.conf"},
format="parquet",
schema=schemas.NanoAODSchema,
)
Expand Down

0 comments on commit a3ba69e

Please sign in to comment.