diff --git a/coffea/processor/executor.py b/coffea/processor/executor.py
index d05649140..57f463d49 100644
--- a/coffea/processor/executor.py
+++ b/coffea/processor/executor.py
@@ -2,12 +2,14 @@
 import concurrent.futures
 from functools import partial
 from itertools import repeat
+import os
 import time
 import pickle
 import sys
 import math
 import json
 import cloudpickle
+import toml
 import uproot
 import uuid
 import warnings
@@ -902,9 +904,17 @@ class Runner:
     cachestrategy: Optional[Union[Literal["dask-worker"], Callable[..., MutableMapping]]] = None  # fmt: skip
     processor_compression: int = 1
     use_skyhook: Optional[bool] = False
-    skyhook_options: Optional[Dict] = field(default_factory=dict)
     format: str = "root"
 
+    @staticmethod
+    def read_coffea_config():
+        config_path = os.path.join(os.environ["HOME"], ".coffea.toml")
+        if os.path.exists(config_path):
+            with open(config_path) as f:
+                return toml.loads(f.read())
+        else:
+            return dict()
+
     def __post_init__(self):
         if self.pre_executor is None:
             self.pre_executor = self.executor
@@ -1098,6 +1108,7 @@ def _filter_badfiles(self, fileset: Dict) -> List:
         return final_fileset
 
     def _chunk_generator(self, fileset: Dict, treename: str) -> Generator:
+        config = Runner.read_coffea_config()
         if self.format == "root":
             if self.maxchunks is None:
                 last_chunksize = self.chunksize
@@ -1123,6 +1134,10 @@ def _chunk_generator(self, fileset: Dict, treename: str) -> Generator:
                             break
                 yield from iter(chunks)
         else:
+            if not config.get("skyhook", None):
+                print("No skyhook config found, using defaults")
+                config["skyhook"] = dict()
+
             import pyarrow.dataset as ds
 
             dataset_filelist_map = {}
@@ -1134,10 +1149,10 @@ def _chunk_generator(self, fileset: Dict, treename: str) -> Generator:
                 for filename in filelist:
                     # If skyhook config is provided and is not empty,
                     if self.use_skyhook:
-                        ceph_config_path = self.skyhook_options.get(
+                        ceph_config_path = config["skyhook"].get(
                             "ceph_config_path", "/etc/ceph/ceph.conf"
                         )
-                        ceph_data_pool = self.skyhook_options.get(
+                        ceph_data_pool = config["skyhook"].get(
                             "ceph_data_pool", "cephfs_data"
                         )
                         filename = f"{ceph_config_path}:{ceph_data_pool}:{filename}"
diff --git a/setup.py b/setup.py
index e03dfda5b..36d9fcb42 100644
--- a/setup.py
+++ b/setup.py
@@ -70,6 +70,7 @@ def get_description():
     "tqdm>=4.27.0",
     "lz4",
     "cloudpickle>=1.2.3",
+    "toml>=0.10.2",
    "mplhep>=0.1.18",
     "packaging",
     "pandas",
diff --git a/tests/test_skyhook_job.py b/tests/test_skyhook_job.py
index 133d2e6d2..a7842ea90 100644
--- a/tests/test_skyhook_job.py
+++ b/tests/test_skyhook_job.py
@@ -1,4 +1,5 @@
 import os
+import toml
 import uproot
 import awkward as ak
 from coffea import processor
@@ -7,6 +8,15 @@
 
 
 if __name__ == "__main__":
+    config_dict = {
+        "skyhook": {
+            "ceph_config_path": "/tmp/testskyhookjob/ceph.conf",
+            "ceph_data_pool": "cephfs_data",
+        }
+    }
+    with open("/root/.coffea.toml", "w") as f:
+        toml.dump(config_dict, f)
+
     ak.to_parquet(
         uproot.lazy("tests/samples/nano_dy.root:Events"),
         "nano_dy.parquet",
@@ -45,7 +55,6 @@
     run = processor.Runner(
         executor=executor,
         use_skyhook=True,
-        skyhook_options={"ceph_config_path": "/tmp/testskyhookjob/ceph.conf"},
         format="parquet",
         schema=schemas.NanoAODSchema,
     )
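
With skyhook_options removed, Skyhook settings are picked up from ~/.coffea.toml via Runner.read_coffea_config(). A minimal usage sketch of the new workflow follows; the IterativeExecutor, the schemas import, and the Ceph paths shown are illustrative assumptions, not part of the patch (the pool name and config path mirror the defaults hard-coded in _chunk_generator):

    import os
    import toml
    from coffea import processor
    from coffea.nanoevents import schemas

    # Write the "skyhook" table that Runner.read_coffea_config() looks for in $HOME.
    with open(os.path.join(os.environ["HOME"], ".coffea.toml"), "w") as f:
        toml.dump(
            {
                "skyhook": {
                    "ceph_config_path": "/etc/ceph/ceph.conf",
                    "ceph_data_pool": "cephfs_data",
                }
            },
            f,
        )

    # Runner no longer accepts skyhook_options; it reads the file written above.
    run = processor.Runner(
        executor=processor.IterativeExecutor(),
        use_skyhook=True,
        format="parquet",
        schema=schemas.NanoAODSchema,
    )

If the file is missing, read_coffea_config() returns an empty dict and the chunk generator falls back to the /etc/ceph/ceph.conf and cephfs_data defaults, printing "No skyhook config found, using defaults".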