Skip to content

Commit

Permalink
Issue #656: make sure CsvJobDatabase uses expected dtypes
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Nov 13, 2024
1 parent 0d4caba commit 4989d4b
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `MultiBackendJobManager`: Fix issue with duplicate job starting across multiple backends ([#654](https://github.com/Open-EO/openeo-python-client/pull/654))
- `MultiBackendJobManager`: Fix encoding issue of job metadata in `on_job_done` ([#657](https://github.com/Open-EO/openeo-python-client/issues/657))
- `MultiBackendJobManager`: Avoid `SettingWithCopyWarning` ([#641](https://github.com/Open-EO/openeo-python-client/issues/641))
- `MultiBackendJobManager`: avoid dtype loading mistakes in `CsvJobDatabase` on empty columns ([#656](https://github.com/Open-EO/openeo-python-client/issues/656))


## [0.34.0] - 2024-10-31
Expand Down
55 changes: 35 additions & 20 deletions openeo/extra/job_management.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import abc
import collections
import contextlib
import dataclasses
import datetime
import json
import logging
Expand All @@ -9,7 +10,7 @@
import warnings
from pathlib import Path
from threading import Thread
from typing import Callable, Dict, List, NamedTuple, Optional, Union
from typing import Any, Callable, Dict, List, Mapping, NamedTuple, Optional, Union

import numpy
import pandas as pd
Expand Down Expand Up @@ -104,6 +105,14 @@ def _start_job_default(row: pd.Series, connection: Connection, *args, **kwargs):
raise NotImplementedError("No 'start_job' callable provided")


@dataclasses.dataclass(frozen=True)
class _ColumnProperties:
"""Expected/required properties of a column in the job manager related dataframes"""

dtype: str = "object"
default: Any = None


class MultiBackendJobManager:
"""
Tracker for multiple jobs on multiple backends.
Expand Down Expand Up @@ -171,6 +180,23 @@ def start_job(
Added ``cancel_running_job_after`` parameter.
"""

# Expected columns in the job DB dataframes.
# TODO: make this part of public API when settled?
_COLUMN_REQUIREMENTS: Mapping[str, _ColumnProperties] = {
"id": _ColumnProperties(dtype="str"),
"backend_name": _ColumnProperties(dtype="str"),
"status": _ColumnProperties(dtype="str", default="not_started"),
# TODO: use proper date/time dtype instead of legacy str for start times?
"start_time": _ColumnProperties(dtype="str"),
"running_start_time": _ColumnProperties(dtype="str"),
# TODO: these columns "cpu", "memory", "duration" are not referenced explicitly from MultiBackendJobManager,
# but are indirectly coupled through handling of VITO-specific "usage" metadata in `_track_statuses`.
# Since bfd99e34 they are not really required to be present anymore, can we make that more explicit?
"cpu": _ColumnProperties(dtype="str"),
"memory": _ColumnProperties(dtype="str"),
"duration": _ColumnProperties(dtype="str"),
}

def __init__(
self,
poll_sleep: int = 60,
Expand Down Expand Up @@ -267,31 +293,16 @@ def _make_resilient(connection):
connection.session.mount("https://", HTTPAdapter(max_retries=retries))
connection.session.mount("http://", HTTPAdapter(max_retries=retries))

@staticmethod
def _normalize_df(df: pd.DataFrame) -> pd.DataFrame:
@classmethod
def _normalize_df(cls, df: pd.DataFrame) -> pd.DataFrame:
"""
Normalize given pandas dataframe (creating a new one):
ensure we have the required columns.
:param df: The dataframe to normalize.
:return: a new dataframe that is normalized.
"""
# check for some required columns.
required_with_default = [
("status", "not_started"),
("id", None),
("start_time", None),
("running_start_time", None),
# TODO: columns "cpu", "memory", "duration" are not referenced directly
# within MultiBackendJobManager making it confusing to claim they are required.
# However, they are through assumptions about job "usage" metadata in `_track_statuses`.
# => proposed solution: allow to configure usage columns when adding a backend
("cpu", None),
("memory", None),
("duration", None),
("backend_name", None),
]
new_columns = {col: val for (col, val) in required_with_default if col not in df.columns}
new_columns = {col: req.default for (col, req) in cls._COLUMN_REQUIREMENTS.items() if col not in df.columns}
df = df.assign(**new_columns)

return df
Expand Down Expand Up @@ -832,7 +843,11 @@ def _is_valid_wkt(self, wkt: str) -> bool:
return False

def read(self) -> pd.DataFrame:
df = pd.read_csv(self.path)
df = pd.read_csv(
self.path,
# TODO: possible to avoid hidden coupling with MultiBackendJobManager here?
dtype={c: r.dtype for (c, r) in MultiBackendJobManager._COLUMN_REQUIREMENTS.items()},
)
if (
"geometry" in df.columns
and df["geometry"].dtype.name != "geometry"
Expand Down
34 changes: 33 additions & 1 deletion tests/extra/test_job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ def start_job(row, connection, **kwargs):
assert metadata_path.exists()

def _create_basic_mocked_manager(self, requests_mock, tmp_path):
# TODO: separate aspects of job manager and dummy backends
# TODO: separate aspects of job manager and dummy backends (e.g. reuse DummyBackend here)
requests_mock.get("http://foo.test/", json={"api_version": "1.1.0"})
requests_mock.get("http://bar.test/", json={"api_version": "1.1.0"})

Expand Down Expand Up @@ -702,6 +702,38 @@ def test_automatic_cancel_of_too_long_running_jobs(

assert fake_backend.cancel_job_mock.called == (expected_status == "canceled")

def test_empty_csv_handling(self, tmp_path, requests_mock, sleep_mock, recwarn):
"""
Check how starting from an empty CSV is handled:
will empty columns accepts string values without warning/error?
"""
manager = self._create_basic_mocked_manager(requests_mock, tmp_path)

df = pd.DataFrame({"year": [2021, 2022]})

job_db_path = tmp_path / "jobs.csv"
# Initialize job db and trigger writing it to CSV file
_ = CsvJobDatabase(job_db_path).initialize_from_df(df)

assert job_db_path.exists()
# Simple check for empty columns in the CSV file
assert ",,,,," in job_db_path.read_text()

# Start over with existing file
job_db = CsvJobDatabase(job_db_path)

def start_job(row, connection, **kwargs):
year = int(row["year"])
return BatchJob(job_id=f"job-{year}", connection=connection)

run_stats = manager.run_jobs(job_db=job_db, start_job=start_job)
assert run_stats == dirty_equals.IsPartialDict({"start_job call": 2, "job finished": 2})

result = pd.read_csv(job_db_path)
assert [(r.id, r.status) for r in result.itertuples()] == [("job-2021", "finished"), ("job-2022", "finished")]

assert [(w.category, w.message, str(w)) for w in recwarn.list] == []




Expand Down

0 comments on commit 4989d4b

Please sign in to comment.