Skip to content

Commit

Permalink
Try to skip BlockingIOError
Browse files Browse the repository at this point in the history
  • Loading branch information
jan-janssen committed Nov 20, 2024
1 parent c0d3642 commit fe2bbb7
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 303 deletions.
61 changes: 35 additions & 26 deletions executorlib/standalone/hdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def dump(file_name: str, data_dict: dict) -> None:
"output": "output",
"queue_id": "queue_id",
}
with h5py.File(file_name, "a") as fname:
with h5py.File(name=file_name, mode="a") as fname:
for data_key, data_value in data_dict.items():
if data_key in group_dict.keys():
fname.create_dataset(
Expand All @@ -39,21 +39,24 @@ def load(file_name: str) -> dict:
Returns:
dict: dictionary containing the python function to be executed {"fn": ..., "args": (), "kwargs": {}}
"""
with h5py.File(file_name, "r") as hdf:
data_dict = {}
if "function" in hdf:
data_dict["fn"] = cloudpickle.loads(np.void(hdf["/function"]))
else:
raise TypeError("Function not found in HDF5 file.")
if "input_args" in hdf:
data_dict["args"] = cloudpickle.loads(np.void(hdf["/input_args"]))
else:
data_dict["args"] = ()
if "input_kwargs" in hdf:
data_dict["kwargs"] = cloudpickle.loads(np.void(hdf["/input_kwargs"]))
else:
data_dict["kwargs"] = {}
return data_dict
try:
with h5py.File(name=file_name, mode="r") as hdf:
data_dict = {}
if "function" in hdf:
data_dict["fn"] = cloudpickle.loads(np.void(hdf["/function"]))
else:
raise TypeError("Function not found in HDF5 file.")
if "input_args" in hdf:
data_dict["args"] = cloudpickle.loads(np.void(hdf["/input_args"]))
else:
data_dict["args"] = ()
if "input_kwargs" in hdf:
data_dict["kwargs"] = cloudpickle.loads(np.void(hdf["/input_kwargs"]))
else:
data_dict["kwargs"] = {}
return data_dict
except BlockingIOError:
return load(file_name=file_name)


def get_output(file_name: str) -> Tuple[bool, object]:
Expand All @@ -66,16 +69,22 @@ def get_output(file_name: str) -> Tuple[bool, object]:
Returns:
Tuple[bool, object]: boolean flag indicating if output is available and the output object itself
"""
with h5py.File(file_name, "r") as hdf:
if "output" in hdf:
return True, cloudpickle.loads(np.void(hdf["/output"]))
else:
return False, None
try:
with h5py.File(name=file_name, mode="r") as hdf:
if "output" in hdf:
return True, cloudpickle.loads(np.void(hdf["/output"]))
else:
return False, None
except BlockingIOError:
return get_output(file_name=file_name)


def get_queue_id(file_name: str) -> Optional[int]:
with h5py.File(file_name, "r") as hdf:
if "queue_id" in hdf:
return cloudpickle.loads(np.void(hdf["/queue_id"]))
else:
return None
try:
with h5py.File(name=file_name, mode="r") as hdf:
if "queue_id" in hdf:
return cloudpickle.loads(np.void(hdf["/queue_id"]))
else:
return None
except BlockingIOError:
return get_queue_id(file_name=file_name)
Loading

0 comments on commit fe2bbb7

Please sign in to comment.