Issue #656: make sure CsvJobDatabase uses expected dtypes
soxofaan committed Nov 13, 2024
1 parent 803fa2b commit 89f6fe1
Showing 3 changed files with 67 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html)
- `MultiBackendJobManager`: Fix issue with duplicate job starting across multiple backends ([#654](https://github.com/Open-EO/openeo-python-client/pull/654))
- `MultiBackendJobManager`: Fix encoding issue of job metadata in `on_job_done` ([#657](https://github.com/Open-EO/openeo-python-client/issues/657))
- `MultiBackendJobManager`: Avoid `SettingWithCopyWarning` ([#641](https://github.com/Open-EO/openeo-python-client/issues/641))
- `MultiBackendJobManager`: avoid dtype loading mistakes in `CsvJobDatabase` on empty columns ([#656](https://github.com/Open-EO/openeo-python-client/issues/656))


## [0.34.0] - 2024-10-31
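For context on this changelog entry, a minimal standalone sketch of the underlying pandas behavior (plain pandas, no openeo APIs involved): when a CSV column contains only empty values, `pd.read_csv` infers `float64` for it.

```python
import io
import pandas as pd

# A CSV shaped like what CsvJobDatabase writes before any job has started:
# the "id" and "status" columns exist but are entirely empty.
csv = "year,id,status\n2021,,\n2022,,\n"

df = pd.read_csv(io.StringIO(csv))
print(df.dtypes)
# year       int64
# id       float64   <- all-NaN columns are inferred as float64,
# status   float64   <- so later string assignments trigger pandas warnings/errors
```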
55 changes: 35 additions & 20 deletions openeo/extra/job_management.py
@@ -1,6 +1,7 @@
import abc
import collections
import contextlib
import dataclasses
import datetime
import json
import logging
@@ -9,7 +10,7 @@
import warnings
from pathlib import Path
from threading import Thread
from typing import Callable, Dict, List, NamedTuple, Optional, Union
from typing import Any, Callable, Dict, List, Mapping, NamedTuple, Optional, Union

import numpy
import pandas as pd
@@ -104,6 +105,14 @@ def _start_job_default(row: pd.Series, connection: Connection, *args, **kwargs):
raise NotImplementedError("No 'start_job' callable provided")


@dataclasses.dataclass(frozen=True)
class _ColumnProperties:
"""Expected/required properties of a column in the job manager related dataframes"""

dtype: str = "object"
default: Any = None


class MultiBackendJobManager:
"""
Tracker for multiple jobs on multiple backends.
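The `_ColumnProperties` dataclass introduced above just pairs a pandas dtype string with a per-column default value; `frozen=True` makes instances immutable. A quick illustrative sketch of how an entry behaves:

```python
import dataclasses
from typing import Any

@dataclasses.dataclass(frozen=True)
class _ColumnProperties:
    dtype: str = "object"
    default: Any = None

status_col = _ColumnProperties(dtype="str", default="not_started")
assert (status_col.dtype, status_col.default) == ("str", "not_started")
# status_col.default = "queued"  # frozen=True: would raise dataclasses.FrozenInstanceError
```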
@@ -171,6 +180,24 @@ def start_job(
Added ``cancel_running_job_after`` parameter.
"""

# Expected columns in the job DB dataframes.
# Mapping of column name to (dtype, default value)
# TODO: make this part of public API when settled?
_COLUMN_REQUIREMENTS: Mapping[str, _ColumnProperties] = {
"id": _ColumnProperties(dtype="str"),
"backend_name": _ColumnProperties(dtype="str"),
"status": _ColumnProperties(dtype="str", default="not_started"),
# TODO: use proper date/time dtype instead of legacy str for start times?
"start_time": _ColumnProperties(dtype="str"),
"running_start_time": _ColumnProperties(dtype="str"),
# TODO: these columns "cpu", "memory", "duration" are not referenced explicitly from MultiBackendJobManager,
# but are indirectly coupled through handling of VITO-specific "usage" metadata in `_track_statuses`.
# Since bfd99e34 they are not really required to be present anymore, can we make that more explicit?
"cpu": _ColumnProperties(dtype="str"),
"memory": _ColumnProperties(dtype="str"),
"duration": _ColumnProperties(dtype="str"),
}

def __init__(
self,
poll_sleep: int = 60,
@@ -267,31 +294,16 @@ def _make_resilient(connection):
connection.session.mount("https://", HTTPAdapter(max_retries=retries))
connection.session.mount("http://", HTTPAdapter(max_retries=retries))

- @staticmethod
- def _normalize_df(df: pd.DataFrame) -> pd.DataFrame:
+ @classmethod
+ def _normalize_df(cls, df: pd.DataFrame) -> pd.DataFrame:
"""
Normalize given pandas dataframe (creating a new one):
ensure we have the required columns.
:param df: The dataframe to normalize.
:return: a new dataframe that is normalized.
"""
- # check for some required columns.
- required_with_default = [
-     ("status", "not_started"),
-     ("id", None),
-     ("start_time", None),
-     ("running_start_time", None),
-     # TODO: columns "cpu", "memory", "duration" are not referenced directly
-     # within MultiBackendJobManager making it confusing to claim they are required.
-     # However, they are through assumptions about job "usage" metadata in `_track_statuses`.
-     # => proposed solution: allow to configure usage columns when adding a backend
-     ("cpu", None),
-     ("memory", None),
-     ("duration", None),
-     ("backend_name", None),
- ]
- new_columns = {col: val for (col, val) in required_with_default if col not in df.columns}
+ new_columns = {col: req.default for (col, req) in cls._COLUMN_REQUIREMENTS.items() if col not in df.columns}
df = df.assign(**new_columns)

return df
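The effect of the rewritten normalization, inlined as a plain-pandas sketch (defaults taken from `_COLUMN_REQUIREMENTS` above):

```python
import pandas as pd

df = pd.DataFrame({"year": [2021, 2022]})  # user frame without any job-tracking columns

# What _normalize_df does: add each missing required column, filled with its default.
defaults = {
    "id": None, "backend_name": None, "status": "not_started",
    "start_time": None, "running_start_time": None,
    "cpu": None, "memory": None, "duration": None,
}
df = df.assign(**{col: val for col, val in defaults.items() if col not in df.columns})

assert list(df["status"]) == ["not_started", "not_started"]
assert df["id"].isna().all()
```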
@@ -832,7 +844,10 @@ def _is_valid_wkt(self, wkt: str) -> bool:
return False

def read(self) -> pd.DataFrame:
- df = pd.read_csv(self.path)
+ df = pd.read_csv(
+     self.path,
+     dtype={c: r.dtype for (c, r) in MultiBackendJobManager._COLUMN_REQUIREMENTS.items()},
+ )
if (
"geometry" in df.columns
and df["geometry"].dtype.name != "geometry"
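And the effect of the `dtype` argument added to `pd.read_csv` in `read()` above, again as a standalone sketch:

```python
import io
import pandas as pd

csv = "year,id,status\n2021,,\n2022,,\n"

plain = pd.read_csv(io.StringIO(csv))
print(plain["id"].dtype)  # float64 -- the inference this commit avoids

typed = pd.read_csv(io.StringIO(csv), dtype={"id": "str", "status": "str"})
print(typed["id"].dtype)  # object -- empty cells stay NaN, but strings can be assigned without warnings
```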
31 changes: 31 additions & 0 deletions tests/extra/test_job_management.py
@@ -702,6 +702,37 @@ def test_automatic_cancel_of_too_long_running_jobs(

assert fake_backend.cancel_job_mock.called == (expected_status == "canceled")

def test_empty_csv_handling(self, tmp_path, requests_mock, sleep_mock, recwarn):
"""
Check how starting from an empty CSV is handled: will columns accept string values without warning/error?
"""
manager = self._create_basic_mocked_manager(requests_mock, tmp_path)

df = pd.DataFrame({"year": [2021, 2022]})

job_db_path = tmp_path / "jobs.csv"
# Initialize job db and trigger writing it to CSV file
_ = CsvJobDatabase(job_db_path).initialize_from_df(df)

assert job_db_path.exists()
# Simple check for empty columns in the CSV file
assert ",,,,," in job_db_path.read_text()

# Start over with existing file
job_db = CsvJobDatabase(job_db_path)

def start_job(row, connection, **kwargs):
year = int(row["year"])
return BatchJob(job_id=f"job-{year}", connection=connection)

run_stats = manager.run_jobs(job_db=job_db, start_job=start_job)
assert run_stats == dirty_equals.IsPartialDict({"start_job call": 2, "job finished": 2})

result = pd.read_csv(job_db_path)
assert [(r.id, r.status) for r in result.itertuples()] == [("job-2021", "finished"), ("job-2022", "finished")]

assert [(w.category, w.message, str(w)) for w in recwarn.list] == []
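The final `recwarn` assertion is the actual regression guard: before this fix, writing string statuses into the float64-inferred columns made pandas warn. A minimal reproduction of that warning with plain pandas (independent of this test):

```python
import warnings
import pandas as pd

df = pd.DataFrame({"status": [float("nan")] * 2})  # float64, as inferred from an empty CSV column
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    df.loc[0, "status"] = "finished"

print([w.category.__name__ for w in caught])  # ['FutureWarning'] on pandas >= 2.1 (incompatible dtype)
```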




