Skip to content

Commit

Permalink
UTC datetime handling improvements (#716)
Browse files Browse the repository at this point in the history
Addresses some deprecation warnings about use of `datetime.utcnow()`.

Additionally, sqlite does not store timezone info even if provided, so
retrieving the data may be interpreted according to the host machine's
*local* timezone, which may or may not be UTC.

To mitigate this, this change ensures that all timestamps are 
1. first converted to UTC before storing into the DB,
2. converted (or augmented with zoneinfo) to UTC on retrieval

Additionally, we expand the tests to check for this behavior, first with
some additional conversion matrixes when telemetry or status data is
received in implicit local vs. explicit timezone data as well as
executions where the implicit local timezone has be overridden with a
`TZ` environment variable, to simulate different default timezone hosts.

Closes #718

---------

Co-authored-by: Brian Kroth <bpkroth@microsoft.com>
Co-authored-by: Brian Kroth <bpkroth@users.noreply.github.com>
  • Loading branch information
3 people authored Mar 19, 2024
1 parent 020add2 commit 647e3a2
Show file tree
Hide file tree
Showing 22 changed files with 371 additions and 131 deletions.
4 changes: 3 additions & 1 deletion mlos_bench/mlos_bench/environments/base_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Type, TYPE_CHECKING, Union
from typing_extensions import Literal

from pytz import UTC

from mlos_bench.config.schemas import ConfigSchema
from mlos_bench.dict_templater import DictTemplater
from mlos_bench.environments.status import Status
Expand Down Expand Up @@ -411,7 +413,7 @@ def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:
"""
# Make sure we create a context before invoking setup/run/status/teardown
assert self._in_context
timestamp = datetime.utcnow()
timestamp = datetime.now(UTC)
if self._is_ready:
return (Status.READY, timestamp, [])
_LOG.warning("Environment not ready: %s", self)
Expand Down
38 changes: 3 additions & 35 deletions mlos_bench/mlos_bench/environments/local/local_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Type, Union
from typing_extensions import Literal

import pytz
import pandas

from mlos_bench.environments.status import Status
Expand All @@ -28,7 +27,7 @@
from mlos_bench.services.types.local_exec_type import SupportsLocalExec
from mlos_bench.tunables.tunable import TunableValue
from mlos_bench.tunables.tunable_groups import TunableGroups
from mlos_bench.util import path_join
from mlos_bench.util import datetime_parser, path_join

_LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -216,37 +215,6 @@ def _normalize_columns(data: pandas.DataFrame) -> pandas.DataFrame:
data.rename(str.rstrip, axis='columns', inplace=True)
return data

# All timestamps in the telemetry data must be greater than this date
# (a very rough approximation for the start of this feature).
_MIN_TS = datetime(2024, 1, 1, 0, 0, 0, tzinfo=pytz.UTC)

@staticmethod
def _datetime_parser(datetime_col: pandas.Series) -> pandas.Series:
"""
Attempt to convert a column to a datetime format.
Parameters
----------
datetime_col : pandas.Series
The column to convert.
Returns
-------
pandas.Series
The converted datetime column.
Raises
------
ValueError
On parse errors.
"""
new_datetime_col = pandas.to_datetime(datetime_col, utc=True)
if new_datetime_col.isna().any():
raise ValueError(f"Invalid date format in the telemetry data: {datetime_col}")
if new_datetime_col.le(LocalEnv._MIN_TS).any():
raise ValueError(f"Invalid date range in the telemetry data: {datetime_col}")
return new_datetime_col

def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:

(status, timestamp, _) = super().status()
Expand All @@ -264,7 +232,7 @@ def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:

data = self._normalize_columns(
pandas.read_csv(fname, index_col=False))
data.iloc[:, 0] = self._datetime_parser(data.iloc[:, 0])
data.iloc[:, 0] = datetime_parser(data.iloc[:, 0], origin="local")

expected_col_names = ["timestamp", "metric", "value"]
if len(data.columns) != len(expected_col_names):
Expand All @@ -274,7 +242,7 @@ def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:
# Assume no header - this is ok for telemetry data.
data = pandas.read_csv(
fname, index_col=False, names=expected_col_names)
data.iloc[:, 0] = self._datetime_parser(data.iloc[:, 0])
data.iloc[:, 0] = datetime_parser(data.iloc[:, 0], origin="local")

except FileNotFoundError as ex:
_LOG.warning("Telemetry CSV file not found: %s :: %s", self._read_telemetry_file, ex)
Expand Down
4 changes: 3 additions & 1 deletion mlos_bench/mlos_bench/environments/remote/remote_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from datetime import datetime
from typing import Dict, Iterable, Optional, Tuple

from pytz import UTC

from mlos_bench.environments.status import Status
from mlos_bench.environments.script_env import ScriptEnv
from mlos_bench.services.base_service import Service
Expand Down Expand Up @@ -174,5 +176,5 @@ def _remote_exec(self, script: Iterable[str]) -> Tuple[Status, datetime, Optiona
(status, output) = self._remote_exec_service.get_remote_exec_results(output)
_LOG.debug("Status: %s :: %s", status, output)
# FIXME: get the timestamp from the remote environment!
timestamp = datetime.utcnow()
timestamp = datetime.now(UTC)
return (status, timestamp, output)
4 changes: 3 additions & 1 deletion mlos_bench/mlos_bench/schedulers/base_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from typing import Any, Dict, Optional, Tuple, Type
from typing_extensions import Literal

from pytz import UTC

from mlos_bench.environments.base_environment import Environment
from mlos_bench.optimizers.base_optimizer import Optimizer
from mlos_bench.storage.base_storage import Storage
Expand Down Expand Up @@ -231,7 +233,7 @@ def _run_schedule(self, running: bool = False) -> None:
Scheduler part of the loop. Check for pending trials in the queue and run them.
"""
assert self.experiment is not None
for trial in self.experiment.pending_trials(datetime.utcnow(), running=running):
for trial in self.experiment.pending_trials(datetime.now(UTC), running=running):
self.run_trial(trial)

@abstractmethod
Expand Down
4 changes: 3 additions & 1 deletion mlos_bench/mlos_bench/schedulers/sync_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import logging
from datetime import datetime

from pytz import UTC

from mlos_bench.environments.status import Status
from mlos_bench.schedulers.base_scheduler import Scheduler
from mlos_bench.storage.base_storage import Storage
Expand Down Expand Up @@ -49,7 +51,7 @@ def run_trial(self, trial: Storage.Trial) -> None:
_LOG.warning("Setup failed: %s :: %s", self.environment, trial.tunables)
# FIXME: Use the actual timestamp from the environment.
_LOG.info("QUEUE: Update trial results: %s :: %s", trial, Status.FAILED)
trial.update(Status.FAILED, datetime.utcnow())
trial.update(Status.FAILED, datetime.now(UTC))
return

(status, timestamp, results) = self.environment.run() # Block and wait for the final result.
Expand Down
10 changes: 6 additions & 4 deletions mlos_bench/mlos_bench/services/remote/azure/azure_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
A collection Service functions for managing VMs on Azure.
"""

import datetime
import logging
from base64 import b64decode
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Union

from pytz import UTC

import azure.identity as azure_id
from azure.keyvault.secrets import SecretClient

Expand Down Expand Up @@ -60,7 +62,7 @@ def __init__(self,
self._req_interval = float(self.config.get("tokenRequestInterval", self._REQ_INTERVAL))

self._access_token = "RENEW *NOW*"
self._token_expiration_ts = datetime.datetime.utcnow() # Typically, some future timestamp.
self._token_expiration_ts = datetime.now(UTC) # Typically, some future timestamp.

# Login as ourselves
self._cred: Union[azure_id.AzureCliCredential, azure_id.CertificateCredential]
Expand Down Expand Up @@ -113,12 +115,12 @@ def get_access_token(self) -> str:
if "spClientId" in self.config:
self._init_sp()

ts_diff = (self._token_expiration_ts - datetime.datetime.utcnow()).total_seconds()
ts_diff = (self._token_expiration_ts - datetime.now(UTC)).total_seconds()
_LOG.debug("Time to renew the token: %.2f sec.", ts_diff)
if ts_diff < self._req_interval:
_LOG.debug("Request new accessToken")
res = self._cred.get_token("https://management.azure.com/.default")
self._token_expiration_ts = datetime.datetime.utcfromtimestamp(res.expires_on)
self._token_expiration_ts = datetime.fromtimestamp(res.expires_on, tz=UTC)
self._access_token = res.token
_LOG.info("Got new accessToken. Expiration time: %s", self._token_expiration_ts)
return self._access_token
Expand Down
3 changes: 3 additions & 0 deletions mlos_bench/mlos_bench/storage/base_trial_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import Any, Dict, Optional, TYPE_CHECKING

import pandas
from pytz import UTC

from mlos_bench.environments.status import Status
from mlos_bench.tunables.tunable import TunableValue
Expand Down Expand Up @@ -38,6 +39,8 @@ def __init__(self, *,
self._experiment_id = experiment_id
self._trial_id = trial_id
self._tunable_config_id = tunable_config_id
assert ts_start.tzinfo == UTC, "ts_start must be in UTC"
assert ts_end is None or ts_end.tzinfo == UTC, "ts_end must be in UTC if not None"
self._ts_start = ts_start
self._ts_end = ts_end
self._status = status
Expand Down
25 changes: 20 additions & 5 deletions mlos_bench/mlos_bench/storage/sql/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from mlos_bench.storage.base_experiment_data import ExperimentData
from mlos_bench.storage.base_trial_data import TrialData
from mlos_bench.storage.sql.schema import DbSchema
from mlos_bench.util import utcify_timestamp, utcify_nullable_timestamp


def get_trials(
Expand Down Expand Up @@ -48,8 +49,8 @@ def get_trials(
experiment_id=experiment_id,
trial_id=trial.trial_id,
config_id=trial.config_id,
ts_start=trial.ts_start,
ts_end=trial.ts_end,
ts_start=utcify_timestamp(trial.ts_start, origin="utc"),
ts_end=utcify_nullable_timestamp(trial.ts_end, origin="utc"),
status=Status[trial.status],
)
for trial in trials.fetchall()
Expand Down Expand Up @@ -107,9 +108,23 @@ def get_results_df(
)
cur_trials = conn.execute(cur_trials_stmt)
trials_df = pandas.DataFrame(
[(row.trial_id, row.ts_start, row.ts_end, row.config_id, row.tunable_config_trial_group_id, row.status)
for row in cur_trials.fetchall()],
columns=['trial_id', 'ts_start', 'ts_end', 'tunable_config_id', 'tunable_config_trial_group_id', 'status'])
[(
row.trial_id,
utcify_timestamp(row.ts_start, origin="utc"),
utcify_nullable_timestamp(row.ts_end, origin="utc"),
row.config_id,
row.tunable_config_trial_group_id,
row.status,
) for row in cur_trials.fetchall()],
columns=[
'trial_id',
'ts_start',
'ts_end',
'tunable_config_id',
'tunable_config_trial_group_id',
'status',
]
)

# Get each trial's config in wide format.
configs_stmt = schema.trial.select().with_only_columns(
Expand Down
14 changes: 10 additions & 4 deletions mlos_bench/mlos_bench/storage/sql/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@
from datetime import datetime
from typing import Optional, Tuple, List, Dict, Iterator, Any

from pytz import UTC

from sqlalchemy import Engine, Connection, Table, column, func

from mlos_bench.environments.status import Status
from mlos_bench.tunables.tunable_groups import TunableGroups
from mlos_bench.storage.base_storage import Storage
from mlos_bench.storage.sql.schema import DbSchema
from mlos_bench.storage.sql.trial import Trial
from mlos_bench.util import nullable
from mlos_bench.util import nullable, utcify_timestamp

_LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -120,7 +122,9 @@ def load_telemetry(self, trial_id: int) -> List[Tuple[datetime, str, Any]]:
self._schema.trial_telemetry.c.metric_id,
)
)
return [(row.ts, row.metric_id, row.metric_value)
# Not all storage backends store the original zone info.
# We try to ensure data is entered in UTC and augment it on return again here.
return [(utcify_timestamp(row.ts, origin="utc"), row.metric_id, row.metric_value)
for row in cur_telemetry.fetchall()]

def load(self,
Expand Down Expand Up @@ -183,6 +187,7 @@ def _save_params(conn: Connection, table: Table,
])

def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Storage.Trial]:
timestamp = utcify_timestamp(timestamp, origin="local")
_LOG.info("Retrieve pending trials for: %s @ %s", self._experiment_id, timestamp)
if running:
pending_status = ['PENDING', 'READY', 'RUNNING']
Expand Down Expand Up @@ -238,15 +243,16 @@ def _get_config_id(self, conn: Connection, tunables: TunableGroups) -> int:

def new_trial(self, tunables: TunableGroups, ts_start: Optional[datetime] = None,
config: Optional[Dict[str, Any]] = None) -> Storage.Trial:
_LOG.debug("Create trial: %s:%d", self._experiment_id, self._trial_id)
ts_start = utcify_timestamp(ts_start or datetime.now(UTC), origin="local")
_LOG.debug("Create trial: %s:%d @ %s", self._experiment_id, self._trial_id, ts_start)
with self._engine.begin() as conn:
try:
config_id = self._get_config_id(conn, tunables)
conn.execute(self._schema.trial.insert().values(
exp_id=self._experiment_id,
trial_id=self._trial_id,
config_id=config_id,
ts_start=ts_start or datetime.utcnow(),
ts_start=ts_start,
status='PENDING',
))

Expand Down
4 changes: 2 additions & 2 deletions mlos_bench/mlos_bench/storage/sql/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def __init__(self, engine: Engine):
self._meta,
Column("exp_id", String(self._ID_LEN), nullable=False),
Column("trial_id", Integer, nullable=False),
Column("ts", DateTime, nullable=False, default="now"),
Column("ts", DateTime(timezone=True), nullable=False, default="now"),
Column("status", String(self._STATUS_LEN), nullable=False),

UniqueConstraint("exp_id", "trial_id", "ts"),
Expand All @@ -181,7 +181,7 @@ def __init__(self, engine: Engine):
self._meta,
Column("exp_id", String(self._ID_LEN), nullable=False),
Column("trial_id", Integer, nullable=False),
Column("ts", DateTime, nullable=False, default="now"),
Column("ts", DateTime(timezone=True), nullable=False, default="now"),
Column("metric_id", String(self._ID_LEN), nullable=False),
Column("metric_value", String(self._METRIC_VALUE_LEN)),

Expand Down
9 changes: 8 additions & 1 deletion mlos_bench/mlos_bench/storage/sql/trial.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from mlos_bench.tunables.tunable_groups import TunableGroups
from mlos_bench.storage.base_storage import Storage
from mlos_bench.storage.sql.schema import DbSchema
from mlos_bench.util import nullable
from mlos_bench.util import nullable, utcify_timestamp

_LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -47,6 +47,8 @@ def __init__(self, *,
def update(self, status: Status, timestamp: datetime,
metrics: Optional[Union[Dict[str, Any], float]] = None
) -> Optional[Dict[str, Any]]:
# Make sure to convert the timestamp to UTC before storing it in the database.
timestamp = utcify_timestamp(timestamp, origin="local")
metrics = super().update(status, timestamp, metrics)
with self._engine.begin() as conn:
self._update_status(conn, status, timestamp)
Expand Down Expand Up @@ -106,6 +108,9 @@ def update(self, status: Status, timestamp: datetime,
def update_telemetry(self, status: Status, timestamp: datetime,
metrics: List[Tuple[datetime, str, Any]]) -> None:
super().update_telemetry(status, timestamp, metrics)
# Make sure to convert the timestamp to UTC before storing it in the database.
timestamp = utcify_timestamp(timestamp, origin="local")
metrics = [(utcify_timestamp(ts, origin="local"), key, val) for (ts, key, val) in metrics]
# NOTE: Not every SQLAlchemy dialect supports `Insert.on_conflict_do_nothing()`
# and we need to keep `.update_telemetry()` idempotent; hence a loop instead of
# a bulk upsert.
Expand All @@ -130,6 +135,8 @@ def _update_status(self, conn: Connection, status: Status, timestamp: datetime)
Insert a new status record into the database.
This call is idempotent.
"""
# Make sure to convert the timestamp to UTC before storing it in the database.
timestamp = utcify_timestamp(timestamp, origin="local")
try:
conn.execute(self._schema.trial_status.insert().values(
exp_id=self._experiment_id,
Expand Down
5 changes: 4 additions & 1 deletion mlos_bench/mlos_bench/storage/sql/trial_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from mlos_bench.environments.status import Status
from mlos_bench.storage.sql.schema import DbSchema
from mlos_bench.storage.sql.tunable_config_data import TunableConfigSqlData
from mlos_bench.util import utcify_timestamp

if TYPE_CHECKING:
from mlos_bench.storage.base_tunable_config_trial_group_data import TunableConfigTrialGroupData
Expand Down Expand Up @@ -100,8 +101,10 @@ def telemetry_df(self) -> pandas.DataFrame:
self._schema.trial_telemetry.c.metric_id,
)
)
# Not all storage backends store the original zone info.
# We try to ensure data is entered in UTC and augment it on return again here.
return pandas.DataFrame(
[(row.ts, row.metric_id, row.metric_value) for row in cur_telemetry.fetchall()],
[(utcify_timestamp(row.ts, origin="utc"), row.metric_id, row.metric_value) for row in cur_telemetry.fetchall()],
columns=['ts', 'metric', 'value'])

@property
Expand Down
Loading

0 comments on commit 647e3a2

Please sign in to comment.