Skip to content

Commit

Permalink
wip: more small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
JHolba committed Aug 12, 2024
1 parent dec9295 commit 8f47cbd
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 28 deletions.
3 changes: 2 additions & 1 deletion src/ert/ensemble_evaluator/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ async def handle_dispatch(self, websocket: WebSocketServerProtocol) -> None:
continue
try:
if event["type"] == EVTYPE_FORWARD_MODEL_CHECKSUM:
await self._messages_to_send.put(msg)
event["data"] = {event["run_path"]: event["data"]}
await self._messages_to_send.put(orjson.dumps(event))
else:
await self._events.put(event)
except BaseException as ex:
Expand Down
2 changes: 0 additions & 2 deletions src/ert/ensemble_evaluator/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,6 @@ async def _receiver(self) -> None:
self._connection = conn
self._connected.set()
async for message in self._connection:
if not message or len(message) == 0:
logger.error("oh noes!")
event = orjson.loads(message)
await self._event_queue.put(event)
except (ConnectionRefusedError, ConnectionClosed, ClientError) as exc:
Expand Down
4 changes: 2 additions & 2 deletions src/ert/ensemble_evaluator/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ def from_cloudevent(self, event: Dict) -> "PartialSnapshot":
end_time = convert_iso8601_to_datetime(timestamp)
# Make sure error msg from previous failed run is replaced
error = ""
if event.get("data") is not None:
error = event["data"].get(ids.ERROR_MSG)
if event_data := event.get("data"):
error = event_data.get(ids.ERROR_MSG, "")

fm = ForwardModel(
**_filter_nones( # type: ignore
Expand Down
29 changes: 12 additions & 17 deletions src/ert/gui/model/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,22 +204,17 @@ def _add_partial_snapshot(self, partial: PartialSnapshot, iter_: str) -> None:
real_node = iter_node.children[real_id]
job_node = real_node.children[forward_model_id]

if "start_time" in job:
job["start_time"] = convert_iso8601_to_datetime(job["start_time"])
if "end_time" in job:
job["end_time"] = convert_iso8601_to_datetime(job["end_time"])
if start_time := job.get("start_time", None):
job["start_time"] = convert_iso8601_to_datetime(start_time)
if end_time := job.get("end_time", None):
job["end_time"] = convert_iso8601_to_datetime(end_time)
# Errors may be unset as the queue restarts the job
job[ids.ERROR] = job.get(ids.ERROR, "")
job_node.data.update(job)
if (
"current_memory_usage" in job
and job["current_memory_usage"] is not None
):
cur_mem_usage = int(float(job["current_memory_usage"]))
real_node.data.current_memory_usage = cur_mem_usage
if "max_memory_usage" in job and job["max_memory_usage"] is not None:
max_mem_usage = int(float(job["max_memory_usage"]))

if cur_mem_usage := job.get("current_memory_usage", None):
real_node.data.current_memory_usage = int(float(cur_mem_usage))
if max_mem_usage := job.get("max_memory_usage", None):
max_mem_usage = int(float(max_mem_usage))
real_node.data.max_memory_usage = max(
real_node.data.max_memory_usage or 0, max_mem_usage
)
Expand Down Expand Up @@ -277,10 +272,10 @@ def _add_snapshot(self, snapshot: Snapshot, iter_: str) -> None:
"sorted_forward_model_ids", defaultdict(None)
)[real_id]:
job = snapshot.get_job(real_id, forward_model_id)
if "start_time" in job:
job["start_time"] = convert_iso8601_to_datetime(job["start_time"])
if "end_time" in job:
job["end_time"] = convert_iso8601_to_datetime(job["end_time"])
if start_time := job.get("start_time", None):
job["start_time"] = convert_iso8601_to_datetime(start_time)
if end_time := job.get("end_time", None):
job["end_time"] = convert_iso8601_to_datetime(end_time)
job_node = ForwardModelStepNode(
id_=forward_model_id, data=job, parent=real_node
)
Expand Down
6 changes: 0 additions & 6 deletions src/ert/serialization/__init__.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ async def evaluate(self, config):

event_id += 1
for real in range(0, self.test_reals):
real = str(real)
job_failed = False

await send_dispatch_event(
Expand All @@ -108,6 +109,7 @@ async def evaluate(self, config):
)
event_id += 1
for job in range(0, self.jobs):
job = str(job)
await send_dispatch_event(
client=dispatch,
type=identifiers.EVTYPE_FORWARD_MODEL_RUNNING,
Expand Down

0 comments on commit 8f47cbd

Please sign in to comment.