diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 9150adaf6ff4..fb26c0a3b688 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -138,7 +138,7 @@
 if TYPE_CHECKING:
     from types import ModuleType
 
-    from pendulum.tz.timezone import Timezone
+    from pendulum.tz.timezone import FixedTimezone, Timezone
     from sqlalchemy.orm.query import Query
     from sqlalchemy.orm.session import Session
 
@@ -213,7 +213,7 @@ def _get_model_data_interval(
     return DataInterval(start, end)
 
 
-def create_timetable(interval: ScheduleIntervalArg, timezone: Timezone) -> Timetable:
+def create_timetable(interval: ScheduleIntervalArg, timezone: Timezone | FixedTimezone) -> Timetable:
    """Create a Timetable instance from a ``schedule_interval`` argument."""
     if interval is NOTSET:
         return DeltaDataIntervalTimetable(DEFAULT_SCHEDULE_INTERVAL)
@@ -529,7 +529,7 @@ def __init__(
             tzinfo = None if date.tzinfo else settings.TIMEZONE
             tz = pendulum.instance(date, tz=tzinfo).timezone
 
-        self.timezone: Timezone = tz or settings.TIMEZONE
+        self.timezone: Timezone | FixedTimezone = tz or settings.TIMEZONE
 
         # Apply the timezone we settled on to end_date if it wasn't supplied
         if "end_date" in self.default_args and self.default_args["end_date"]:
diff --git a/airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py b/airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py
index 18799ed920e7..6c5f038b0abe 100644
--- a/airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py
+++ b/airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py
@@ -21,7 +21,7 @@
 import math
 import time
 import warnings
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, cast
 
 import pendulum
 import tenacity
@@ -148,13 +148,13 @@ def monitor_pod(self, pod: V1Pod, get_logs: bool) -> tuple[State, str | None]:
         """
         if get_logs:
             read_logs_since_sec = None
-            last_log_time = None
+            last_log_time: pendulum.DateTime | None = None
             while True:
                 logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
                 for line in logs:
                     timestamp, message = self.parse_log_line(line.decode("utf-8"))
                     if timestamp:
-                        last_log_time = pendulum.parse(timestamp)
+                        last_log_time = cast(pendulum.DateTime, pendulum.parse(timestamp))
                     self.log.info(message)
                 time.sleep(1)
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 48aa59593346..ccd4022953b0 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -65,6 +65,7 @@
 from airflow.utils.module_loading import import_string, qualname
 from airflow.utils.operator_resources import Resources
 from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+from airflow.utils.timezone import parse_timezone
 from airflow.utils.types import NOTSET, ArgNotSet
 
 if TYPE_CHECKING:
@@ -144,7 +145,7 @@ def decode_relativedelta(var: dict[str, Any]) -> relativedelta.relativedelta:
     return relativedelta.relativedelta(**var)
 
 
-def encode_timezone(var: Timezone) -> str | int:
+def encode_timezone(var: Timezone | FixedTimezone) -> str | int:
     """
     Encode a Pendulum Timezone for serialization.
 
@@ -167,9 +168,9 @@ def encode_timezone(var: Timezone) -> str | int:
     )
 
 
-def decode_timezone(var: str | int) -> Timezone:
+def decode_timezone(var: str | int):
     """Decode a previously serialized Pendulum Timezone."""
-    return pendulum.tz.timezone(var)
+    return parse_timezone(var)
 
 
 def _get_registered_timetable(importable_string: str) -> type[Timetable] | None:
@@ -607,7 +608,7 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
             raise TypeError(f"Invalid type {type_!s} in deserialization.")
 
     _deserialize_datetime = pendulum.from_timestamp
-    _deserialize_timezone = pendulum.tz.timezone
+    _deserialize_timezone = parse_timezone
 
     @classmethod
     def _deserialize_timedelta(cls, seconds: int) -> datetime.timedelta:
diff --git a/airflow/serialization/serializers/datetime.py b/airflow/serialization/serializers/datetime.py
index 49f0899a5918..ea030a8afcba 100644
--- a/airflow/serialization/serializers/datetime.py
+++ b/airflow/serialization/serializers/datetime.py
@@ -24,7 +24,7 @@
     serialize as serialize_timezone,
 )
 from airflow.utils.module_loading import qualname
-from airflow.utils.timezone import convert_to_utc, is_naive
+from airflow.utils.timezone import convert_to_utc, is_naive, parse_timezone
 
 if TYPE_CHECKING:
     import datetime
@@ -65,23 +65,22 @@ def deserialize(classname: str, version: int, data: dict | str) -> datetime.date
     import datetime
 
     from pendulum import DateTime
-    from pendulum.tz import fixed_timezone, timezone
 
     tz: datetime.tzinfo | None = None
     if isinstance(data, dict) and TIMEZONE in data:
         if version == 1:
             # try to deserialize unsupported timezones
             timezone_mapping = {
-                "EDT": fixed_timezone(-4 * 3600),
-                "CDT": fixed_timezone(-5 * 3600),
-                "MDT": fixed_timezone(-6 * 3600),
-                "PDT": fixed_timezone(-7 * 3600),
-                "CEST": timezone("CET"),
+                "EDT": parse_timezone(-4 * 3600),
+                "CDT": parse_timezone(-5 * 3600),
+                "MDT": parse_timezone(-6 * 3600),
+                "PDT": parse_timezone(-7 * 3600),
+                "CEST": parse_timezone("CET"),
             }
             if data[TIMEZONE] in timezone_mapping:
                 tz = timezone_mapping[data[TIMEZONE]]
             else:
-                tz = timezone(data[TIMEZONE])
+                tz = parse_timezone(data[TIMEZONE])
         else:
             tz = deserialize_timezone(data[TIMEZONE][1], data[TIMEZONE][2], data[TIMEZONE][0])
diff --git a/airflow/serialization/serializers/timezone.py b/airflow/serialization/serializers/timezone.py
index 23901b9d444e..0f580adef83f 100644
--- a/airflow/serialization/serializers/timezone.py
+++ b/airflow/serialization/serializers/timezone.py
@@ -74,7 +74,7 @@ def serialize(o: object) -> tuple[U, str, int, bool]:
 
 
 def deserialize(classname: str, version: int, data: object) -> Any:
-    from pendulum.tz import fixed_timezone, timezone
+    from airflow.utils.timezone import parse_timezone
 
     if not isinstance(data, (str, int)):
         raise TypeError(f"{data} is not of type int or str but of {type(data)}")
@@ -82,9 +82,6 @@ def deserialize(classname: str, version: int, data: object) -> Any:
     if version > __version__:
         raise TypeError(f"serialized {version} of {classname} > {__version__}")
 
-    if isinstance(data, int):
-        return fixed_timezone(data)
-
     if "zoneinfo.ZoneInfo" in classname:
         try:
             from zoneinfo import ZoneInfo
@@ -93,7 +90,7 @@ def deserialize(classname: str, version: int, data: object) -> Any:
 
         return ZoneInfo(data)
 
-    return timezone(data)
+    return parse_timezone(data)
 
 
 # ported from pendulum.tz.timezone._get_tzinfo_name
diff --git a/airflow/settings.py b/airflow/settings.py
index 1a38a59ed301..53c5cc6aa4b6 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -26,7 +26,6 @@
 import warnings
 from typing import TYPE_CHECKING, Any, Callable
 
-import pendulum
 import pluggy
 import sqlalchemy
 from sqlalchemy import create_engine, exc, text
@@ -40,6 +39,7 @@
 from airflow.logging_config import configure_logging
 from airflow.utils.orm_event_handlers import setup_event_handlers
 from airflow.utils.state import State
+from airflow.utils.timezone import local_timezone, parse_timezone, utc
 
 if TYPE_CHECKING:
     from sqlalchemy.engine import Engine
@@ -50,13 +50,12 @@
 log = logging.getLogger(__name__)
 
 try:
-    tz = conf.get_mandatory_value("core", "default_timezone")
-    if tz == "system":
-        TIMEZONE = pendulum.tz.local_timezone()
+    if (tz := conf.get_mandatory_value("core", "default_timezone")) != "system":
+        TIMEZONE = parse_timezone(tz)
     else:
-        TIMEZONE = pendulum.tz.timezone(tz)
+        TIMEZONE = local_timezone()
 except Exception:
-    TIMEZONE = pendulum.tz.timezone("UTC")
+    TIMEZONE = utc
 
 log.info("Configured default timezone %s", TIMEZONE)
diff --git a/airflow/timetables/_cron.py b/airflow/timetables/_cron.py
index 45bfe3640fed..15e4f820ea8f 100644
--- a/airflow/timetables/_cron.py
+++ b/airflow/timetables/_cron.py
@@ -19,17 +19,15 @@
 import datetime
 from typing import TYPE_CHECKING, Any
 
-import pendulum
 from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, FormatException, MissingFieldException
 from croniter import CroniterBadCronError, CroniterBadDateError, croniter
 
 from airflow.exceptions import AirflowTimetableInvalid
 from airflow.utils.dates import cron_presets
-from airflow.utils.timezone import convert_to_utc, make_aware, make_naive
+from airflow.utils.timezone import convert_to_utc, make_aware, make_naive, parse_timezone
 
 if TYPE_CHECKING:
-    from pendulum import DateTime
-    from pendulum.tz.timezone import Timezone
+    from pendulum import DateTime, FixedTimezone, Timezone
 
 
 def _covers_every_hour(cron: croniter) -> bool:
@@ -63,11 +61,11 @@ def _covers_every_hour(cron: croniter) -> bool:
 class CronMixin:
     """Mixin to provide interface to work with croniter."""
 
-    def __init__(self, cron: str, timezone: str | Timezone) -> None:
+    def __init__(self, cron: str, timezone: str | Timezone | FixedTimezone) -> None:
         self._expression = cron_presets.get(cron, cron)
 
         if isinstance(timezone, str):
-            timezone = pendulum.tz.timezone(timezone)
+            timezone = parse_timezone(timezone)
         self._timezone = timezone
 
         try:
diff --git a/airflow/timetables/trigger.py b/airflow/timetables/trigger.py
index 95d29238037c..2a0df645daca 100644
--- a/airflow/timetables/trigger.py
+++ b/airflow/timetables/trigger.py
@@ -26,7 +26,7 @@
 if TYPE_CHECKING:
     from dateutil.relativedelta import relativedelta
-    from pendulum.tz.timezone import Timezone
+    from pendulum.tz.timezone import FixedTimezone, Timezone
 
     from airflow.timetables.base import TimeRestriction
@@ -48,7 +48,7 @@ def __init__(
         self,
         cron: str,
         *,
-        timezone: str | Timezone,
+        timezone: str | Timezone | FixedTimezone,
         interval: datetime.timedelta | relativedelta = datetime.timedelta(),
     ) -> None:
         super().__init__(cron, timezone)
@@ -77,7 +77,12 @@ def serialize(self) -> dict[str, Any]:
         return {"expression": self._expression, "timezone": timezone, "interval": interval}
 
     def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
-        return DataInterval(run_after - self._interval, run_after)
+        return DataInterval(
+            # pendulum.Datetime ± timedelta should return pendulum.Datetime
+            # however mypy decide that output would be datetime.datetime
+            run_after - self._interval,  # type: ignore[arg-type]
+            run_after,
+        )
 
     def next_dagrun_info(
         self,
@@ -101,4 +106,9 @@ def next_dagrun_info(
         next_start_time = max(start_time_candidates)
         if restriction.latest is not None and restriction.latest < next_start_time:
             return None
-        return DagRunInfo.interval(next_start_time - self._interval, next_start_time)
+        return DagRunInfo.interval(
+            # pendulum.Datetime ± timedelta should return pendulum.Datetime
+            # however mypy decide that output would be datetime.datetime
+            next_start_time - self._interval,  # type: ignore[arg-type]
+            next_start_time,
+        )
diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py
index a042d4e9024d..fb241f482f51 100644
--- a/airflow/utils/sqlalchemy.py
+++ b/airflow/utils/sqlalchemy.py
@@ -24,7 +24,6 @@
 import logging
 from typing import TYPE_CHECKING, Any, Generator, Iterable, overload
 
-import pendulum
 from dateutil import relativedelta
 from sqlalchemy import TIMESTAMP, PickleType, and_, event, false, nullsfirst, or_, true, tuple_
 from sqlalchemy.dialects import mssql, mysql
@@ -34,7 +33,7 @@
 from airflow import settings
 from airflow.configuration import conf
 from airflow.serialization.enums import Encoding
-from airflow.utils.timezone import make_naive
+from airflow.utils.timezone import make_naive, utc
 
 if TYPE_CHECKING:
     from kubernetes.client.models.v1_pod import V1Pod
@@ -46,8 +45,6 @@
 
 log = logging.getLogger(__name__)
 
-utc = pendulum.tz.timezone("UTC")
-
 
 class UtcDateTime(TypeDecorator):
     """
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index 12c75bef5976..fb32c093f84c 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -23,9 +23,10 @@
 import pendulum
 from dateutil.relativedelta import relativedelta
 from pendulum.datetime import DateTime
+from pendulum.tz.timezone import FixedTimezone, Timezone
 
 # UTC time zone as a tzinfo instance.
-utc = pendulum.tz.timezone("UTC")
+utc = Timezone("UTC")
 
 
@@ -135,12 +136,10 @@ def make_aware(value: dt.datetime | None, timezone: dt.tzinfo | None = None) ->
     # Check that we won't overwrite the timezone of an aware datetime.
     if is_localized(value):
         raise ValueError(f"make_aware expects a naive datetime, got {value}")
-    if hasattr(value, "fold"):
-        # In case of python 3.6 we want to do the same that pendulum does for python3.5
-        # i.e in case we move clock back we want to schedule the run at the time of the second
-        # instance of the same clock time rather than the first one.
-        # Fold parameter has no impact in other cases so we can safely set it to 1 here
-        value = value.replace(fold=1)
+    # In case we move clock back we want to schedule the run at the time of the second
+    # instance of the same clock time rather than the first one.
+    # Fold parameter has no impact in other cases, so we can safely set it to 1 here
+    value = value.replace(fold=1)
     localized = getattr(timezone, "localize", None)
     if localized is not None:
         # This method is available for pytz time zones
@@ -273,3 +272,27 @@ def _format_part(key: str) -> str:
     if not joined:
         return "<1s"
     return joined
+
+
+def parse_timezone(name: str | int) -> FixedTimezone | Timezone:
+    """
+    Parse timezone and return one of the pendulum Timezone.
+
+    Provide the same interface as ``pendulum.timezone(name)``
+
+    :param name: Either IANA timezone or offset to UTC in seconds.
+
+    :meta private:
+    """
+    return pendulum.timezone(name)
+
+
+def local_timezone() -> FixedTimezone | Timezone:
+    """
+    Return local timezone.
+
+    Provide the same interface as ``pendulum.tz.local_timezone()``
+
+    :meta private:
+    """
+    return pendulum.tz.local_timezone()
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 249cf667e10f..8d7dad9d12f0 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -26,7 +26,6 @@
 from unittest.mock import ANY, MagicMock
 from uuid import uuid4
 
-import pendulum
 import pytest
 from kubernetes import client
 from kubernetes.client import V1EnvVar, V1PodSecurityContext, V1SecurityContext, models as k8s
@@ -53,7 +52,9 @@ def create_context(task) -> Context:
     dag = DAG(dag_id="dag")
-    execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=pendulum.tz.timezone("Europe/Amsterdam"))
+    execution_date = timezone.datetime(
+        2016, 1, 1, 1, 0, 0, tzinfo=timezone.parse_timezone("Europe/Amsterdam")
+    )
     dag_run = DagRun(
         dag_id=dag.dag_id,
         execution_date=execution_date,
diff --git a/setup.cfg b/setup.cfg
index 9077a02915d7..3c8208af015f 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -129,9 +129,7 @@ install_requires =
     opentelemetry-exporter-otlp
     packaging>=14.0
    pathspec>=0.9.0
-    # When (if) pendulum 3 released it would introduce changes in module/objects imports,
-    # since we are tightly coupled with pendulum library internally it will breaks Airflow functionality.
-    pendulum>=2.0,<3.0
+    pendulum>=3.0
    pluggy>=1.0
    psutil>=4.2.0
    pydantic>=2.3.0
diff --git a/tests/api_connexion/endpoints/test_dag_endpoint.py b/tests/api_connexion/endpoints/test_dag_endpoint.py
index c02e8b0ff3fc..19504b889467 100644
--- a/tests/api_connexion/endpoints/test_dag_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_endpoint.py
@@ -316,7 +316,7 @@ def test_should_respond_200(self, url_safe_serializer):
             "tags": [],
             "template_searchpath": None,
             "timetable_description": None,
-            "timezone": "Timezone('UTC')",
+            "timezone": "UTC",
         }
         assert response.json == expected
 
@@ -367,7 +367,7 @@ def test_should_response_200_with_doc_md_none(self, url_safe_serializer):
             "tags": [],
             "template_searchpath": None,
             "timetable_description": None,
-            "timezone": "Timezone('UTC')",
+            "timezone": "UTC",
         }
         assert response.json == expected
 
@@ -418,7 +418,7 @@ def test_should_response_200_for_null_start_date(self, url_safe_serializer):
             "tags": [],
             "template_searchpath": None,
             "timetable_description": None,
-            "timezone": "Timezone('UTC')",
+            "timezone": "UTC",
         }
         assert response.json == expected
 
@@ -478,7 +478,7 @@ def test_should_respond_200_serialized(self, url_safe_serializer):
             "tags": [],
             "template_searchpath": None,
             "timetable_description": None,
-            "timezone": "Timezone('UTC')",
+            "timezone": "UTC",
         }
         response = self.client.get(
             f"/api/v1/dags/{self.dag_id}/details", environ_overrides={"REMOTE_USER": "test"}
         )
@@ -539,7 +539,7 @@ def test_should_respond_200_serialized(self, url_safe_serializer):
             "tags": [],
             "template_searchpath": None,
             "timetable_description": None,
-            "timezone": "Timezone('UTC')",
+            "timezone": "UTC",
         }
         expected.update({"last_parsed": response.json["last_parsed"]})
         assert response.json == expected
diff --git a/tests/api_connexion/schemas/test_dag_schema.py b/tests/api_connexion/schemas/test_dag_schema.py
index f3e54c0a9611..4d3f5abfd1b2 100644
--- a/tests/api_connexion/schemas/test_dag_schema.py
+++ b/tests/api_connexion/schemas/test_dag_schema.py
@@ -184,7 +184,7 @@ def test_serialize_test_dag_detail_schema(url_safe_serializer):
         "start_date": "2020-06-19T00:00:00+00:00",
         "tags": [{"name": "example1"}, {"name": "example2"}],
         "template_searchpath": None,
-        "timezone": "Timezone('UTC')",
+        "timezone": "UTC",
         "max_active_runs": 16,
         "pickle_id": None,
         "end_date": None,
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index 30b5c475ea4a..a1c0b1affd76 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -161,7 +161,7 @@ def test_backfill(self, mock_run):
         )
         output = stdout.getvalue()
 
-        assert f"Dry run of DAG example_bash_operator on {DEFAULT_DATE.isoformat()}\n" in output
+        assert f"Dry run of DAG example_bash_operator on {DEFAULT_DATE.isoformat(sep=' ')}\n" in output
         assert "Task runme_0 located in DAG example_bash_operator\n" in output
 
         mock_run.assert_not_called()  # Dry run shouldn't run the backfill
@@ -236,10 +236,10 @@ def test_backfill(self, mock_run):
         assert (
             f"Dry run of DAG example_branch_python_operator_decorator on "
-            f"{DEFAULT_DATE.isoformat()}\n" in output
+            f"{DEFAULT_DATE.isoformat(sep=' ')}\n" in output
         )
         assert "Task run_this_first located in DAG example_branch_python_operator_decorator\n" in output
 
-        assert f"Dry run of DAG example_branch_operator on {DEFAULT_DATE.isoformat()}\n" in output
+        assert f"Dry run of DAG example_branch_operator on {DEFAULT_DATE.isoformat(sep=' ')}\n" in output
         assert "Task run_this_first located in DAG example_branch_operator\n" in output
 
     @mock.patch("airflow.cli.commands.dag_command.get_dag")
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index f7bf1ad6d0c9..de7fa87d5add 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -37,6 +37,7 @@
 import pytest
 import time_machine
 from dateutil.relativedelta import relativedelta
+from pendulum.tz.timezone import Timezone
 from sqlalchemy import inspect
 
 from airflow import settings
@@ -676,8 +677,8 @@ def test_following_previous_schedule(self):
         """
         Make sure DST transitions are properly observed
         """
-        local_tz = pendulum.timezone("Europe/Zurich")
-        start = local_tz.convert(datetime.datetime(2018, 10, 28, 2, 55), dst_rule=pendulum.PRE_TRANSITION)
+        local_tz = Timezone("Europe/Zurich")
+        start = local_tz.convert(datetime.datetime(2018, 10, 28, 2, 55, fold=0))
         assert start.isoformat() == "2018-10-28T02:55:00+02:00", "Pre-condition: start date is in DST"
 
         utc = timezone.convert_to_utc(start)
@@ -706,7 +707,7 @@ def test_following_previous_schedule_daily_dag_cest_to_cet(self):
         Make sure DST transitions are properly observed
         """
         local_tz = pendulum.timezone("Europe/Zurich")
-        start = local_tz.convert(datetime.datetime(2018, 10, 27, 3), dst_rule=pendulum.PRE_TRANSITION)
+        start = local_tz.convert(datetime.datetime(2018, 10, 27, 3, fold=0))
 
         utc = timezone.convert_to_utc(start)
 
@@ -735,7 +736,7 @@ def test_following_previous_schedule_daily_dag_cet_to_cest(self):
         Make sure DST transitions are properly observed
         """
         local_tz = pendulum.timezone("Europe/Zurich")
-        start = local_tz.convert(datetime.datetime(2018, 3, 25, 2), dst_rule=pendulum.PRE_TRANSITION)
+        start = local_tz.convert(datetime.datetime(2018, 3, 25, 2, fold=0))
 
         utc = timezone.convert_to_utc(start)
 
diff --git a/tests/providers/openlineage/plugins/test_utils.py b/tests/providers/openlineage/plugins/test_utils.py
index 54710bcd9e47..b7ced7a37cb4 100644
--- a/tests/providers/openlineage/plugins/test_utils.py
+++ b/tests/providers/openlineage/plugins/test_utils.py
@@ -23,7 +23,6 @@
 from json import JSONEncoder
 from typing import Any
 
-import pendulum
 import pytest
 from attrs import define
 from openlineage.client.utils import RedactMixin
@@ -39,6 +38,7 @@
     to_json_encodable,
     url_to_https,
 )
+from airflow.utils import timezone
 from airflow.utils.log.secrets_masker import _secrets_masker
 from airflow.utils.state import State
 
@@ -86,8 +86,8 @@ def test_get_dagrun_start_end():
         state=State.NONE, run_id=run_id, data_interval=dag.get_next_data_interval(dag_model)
     )
     assert dagrun.data_interval_start is not None
-    start_date_tz = datetime.datetime(2022, 1, 1, tzinfo=pendulum.tz.timezone("UTC"))
-    end_date_tz = datetime.datetime(2022, 1, 1, hour=2, tzinfo=pendulum.tz.timezone("UTC"))
+    start_date_tz = datetime.datetime(2022, 1, 1, tzinfo=timezone.utc)
+    end_date_tz = datetime.datetime(2022, 1, 1, hour=2, tzinfo=timezone.utc)
     assert dagrun.data_interval_start, dagrun.data_interval_end == (start_date_tz, end_date_tz)
diff --git a/tests/sensors/test_time_sensor.py b/tests/sensors/test_time_sensor.py
index 935d1cb12817..54a0212a247a 100644
--- a/tests/sensors/test_time_sensor.py
+++ b/tests/sensors/test_time_sensor.py
@@ -18,12 +18,10 @@
 from __future__ import annotations
 
 from datetime import datetime, time
-from unittest.mock import patch
 
 import pendulum
 import pytest
 import time_machine
-from pendulum.tz.timezone import UTC
 
 from airflow.exceptions import TaskDeferred
 from airflow.models.dag import DAG
@@ -33,7 +31,7 @@
 DEFAULT_TIMEZONE = "Asia/Singapore"  # UTC+08:00
 DEFAULT_DATE_WO_TZ = datetime(2015, 1, 1)
-DEFAULT_DATE_WITH_TZ = datetime(2015, 1, 1, tzinfo=pendulum.tz.timezone(DEFAULT_TIMEZONE))
+DEFAULT_DATE_WITH_TZ = datetime(2015, 1, 1, tzinfo=timezone.parse_timezone(DEFAULT_TIMEZONE))
 
 
 class TestTimeSensor:
@@ -46,11 +44,11 @@ class TestTimeSensor:
         ],
     )
     @time_machine.travel(timezone.datetime(2020, 1, 1, 23, 0).replace(tzinfo=timezone.utc))
-    def test_timezone(self, default_timezone, start_date, expected):
-        with patch("airflow.settings.TIMEZONE", pendulum.timezone(default_timezone)):
-            dag = DAG("test", default_args={"start_date": start_date})
-            op = TimeSensor(task_id="test", target_time=time(10, 0), dag=dag)
-            assert op.poke(None) == expected
+    def test_timezone(self, default_timezone, start_date, expected, monkeypatch):
+        monkeypatch.setattr("airflow.settings.TIMEZONE", timezone.parse_timezone(default_timezone))
+        dag = DAG("test", default_args={"start_date": start_date})
+        op = TimeSensor(task_id="test", target_time=time(10, 0), dag=dag)
+        assert op.poke(None) == expected
 
 
 class TestTimeSensorAsync:
@@ -72,8 +70,7 @@ def test_target_time_aware(self):
         with DAG("test_target_time_aware", start_date=timezone.datetime(2020, 1, 1, 23, 0)):
             aware_time = time(0, 1).replace(tzinfo=pendulum.local_timezone())
             op = TimeSensorAsync(task_id="test", target_time=aware_time)
-            assert hasattr(op.target_datetime.tzinfo, "offset")
-            assert op.target_datetime.tzinfo.offset == 0
+            assert op.target_datetime.tzinfo == timezone.utc
 
     def test_target_time_naive_dag_timezone(self):
         """
@@ -85,4 +82,4 @@ def test_target_time_naive_dag_timezone(self):
         ):
             op = TimeSensorAsync(task_id="test", target_time=pendulum.time(9, 0))
             assert op.target_datetime.time() == pendulum.time(1, 0)
-            assert op.target_datetime.tzinfo == UTC
+            assert op.target_datetime.tzinfo == timezone.utc
diff --git a/tests/serialization/test_serialized_objects.py b/tests/serialization/test_serialized_objects.py
index c059a8d236e7..96b196357991 100644
--- a/tests/serialization/test_serialized_objects.py
+++ b/tests/serialization/test_serialized_objects.py
@@ -142,7 +142,7 @@ def equal_time(a: datetime, b: datetime) -> bool:
         (1, None, equals),
         (datetime.utcnow(), DAT.DATETIME, equal_time),
         (timedelta(minutes=2), DAT.TIMEDELTA, equals),
-        (pendulum.tz.timezone("UTC"), DAT.TIMEZONE, lambda a, b: a.name == b.name),
+        (pendulum.tz.Timezone("UTC"), DAT.TIMEZONE, lambda a, b: a.name == b.name),
         (relativedelta.relativedelta(hours=+1), DAT.RELATIVEDELTA, lambda a, b: a.hours == b.hours),
         ({"test": "dict", "test-1": 1}, None, equals),
         (["array_item", 2], None, equals),
diff --git a/tests/triggers/test_temporal.py b/tests/triggers/test_temporal.py
index 655910394fb8..52cc2c64f673 100644
--- a/tests/triggers/test_temporal.py
+++ b/tests/triggers/test_temporal.py
@@ -64,9 +64,9 @@ def test_timedelta_trigger_serialization():
 @pytest.mark.parametrize(
     "tz",
     [
-        pendulum.tz.timezone("UTC"),
-        pendulum.tz.timezone("Europe/Paris"),
-        pendulum.tz.timezone("America/Toronto"),
+        timezone.parse_timezone("UTC"),
+        timezone.parse_timezone("Europe/Paris"),
+        timezone.parse_timezone("America/Toronto"),
     ],
 )
 @pytest.mark.asyncio
diff --git a/tests/utils/test_timezone.py b/tests/utils/test_timezone.py
index ff5ad26f5a31..df8af0460423 100644
--- a/tests/utils/test_timezone.py
+++ b/tests/utils/test_timezone.py
@@ -21,13 +21,14 @@
 import pendulum
 import pytest
+from pendulum.tz.timezone import Timezone
 
 from airflow.utils import timezone
-from airflow.utils.timezone import coerce_datetime
+from airflow.utils.timezone import coerce_datetime, parse_timezone
 
-CET = pendulum.tz.timezone("Europe/Paris")
-EAT = pendulum.tz.timezone("Africa/Nairobi")  # Africa/Nairobi
-ICT = pendulum.tz.timezone("Asia/Bangkok")  # Asia/Bangkok
+CET = Timezone("Europe/Paris")
+EAT = Timezone("Africa/Nairobi")  # Africa/Nairobi
+ICT = Timezone("Asia/Bangkok")  # Asia/Bangkok
 UTC = timezone.utc
 
@@ -117,3 +118,41 @@ def test_td_format(self):
 )
 def test_coerce_datetime(input_datetime, output_datetime):
     assert output_datetime == coerce_datetime(input_datetime)
+
+
+@pytest.mark.parametrize(
+    "tz_name",
+    [
+        pytest.param("Europe/Paris", id="CET"),
+        pytest.param("Africa/Nairobi", id="EAT"),
+        pytest.param("Asia/Bangkok", id="ICT"),
+    ],
+)
+def test_parse_timezone_iana(tz_name: str):
+    tz = parse_timezone(tz_name)
+    assert tz.name == tz_name
+    assert parse_timezone(tz_name) is tz
+
+
+@pytest.mark.parametrize("tz_name", ["utc", "UTC", "uTc"])
+def test_parse_timezone_utc(tz_name):
+    tz = parse_timezone(tz_name)
+    assert tz.name == "UTC"
+    assert parse_timezone(tz_name) is tz
+    assert tz is timezone.utc, "Expected that UTC timezone is same object as `airflow.utils.timezone.utc`"
+
+
+@pytest.mark.parametrize(
+    "tz_offset, expected_offset, expected_name",
+    [
+        pytest.param(0, 0, "+00:00", id="zero-offset"),
+        pytest.param(-3600, -3600, "-01:00", id="1-hour-behind"),
+        pytest.param(19800, 19800, "+05:30", id="5.5-hours-ahead"),
+    ],
+)
+def test_parse_timezone_offset(tz_offset: int, expected_offset, expected_name):
+    tz = parse_timezone(tz_offset)
+    assert hasattr(tz, "offset")
+    assert tz.offset == expected_offset
+    assert tz.name == expected_name
+    assert parse_timezone(tz_offset) is tz
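
For reviewers who want to see the net effect of the timezone-helper changes without applying the patch, the following sketch summarizes the behaviour exercised by the new tests (`tests/utils/test_timezone.py` and `tests/models/test_dag.py`). It is illustrative only, assumes pendulum>=3.0 plus this change installed, and is not part of the diff itself.

```python
# Sketch only: mirrors assertions from the tests added above, under the
# assumption that pendulum>=3.0 and this patch are installed.
import datetime

from pendulum.tz.timezone import Timezone

from airflow.utils import timezone
from airflow.utils.timezone import parse_timezone

# IANA names resolve to pendulum Timezone objects; repeated lookups return
# the same cached instance, as the new tests assert.
paris = parse_timezone("Europe/Paris")
assert paris.name == "Europe/Paris"
assert parse_timezone("Europe/Paris") is paris

# Any capitalization of "UTC" maps to the airflow.utils.timezone.utc singleton.
assert parse_timezone("uTc") is timezone.utc

# Integer offsets (seconds relative to UTC) replace pendulum 2's
# fixed_timezone() and produce fixed-offset timezones.
ist = parse_timezone(19800)
assert ist.offset == 19800 and ist.name == "+05:30"

# With pendulum 3 the dst_rule=PRE_TRANSITION argument is gone; the stdlib
# `fold` attribute selects the side of a DST transition instead (fold=0
# keeps the pre-transition offset, here CEST).
zurich = Timezone("Europe/Zurich")
start = zurich.convert(datetime.datetime(2018, 10, 28, 2, 55, fold=0))
assert start.isoformat() == "2018-10-28T02:55:00+02:00"
```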