Skip to content

Commit

Permalink
Add support of Pendulum 3
Browse files Browse the repository at this point in the history
  • Loading branch information
Taragolis committed Dec 19, 2023
1 parent 0858a88 commit 5681d12
Show file tree
Hide file tree
Showing 21 changed files with 150 additions and 90 deletions.
6 changes: 3 additions & 3 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@
if TYPE_CHECKING:
from types import ModuleType

from pendulum.tz.timezone import Timezone
from pendulum.tz.timezone import FixedTimezone, Timezone
from sqlalchemy.orm.query import Query
from sqlalchemy.orm.session import Session

Expand Down Expand Up @@ -213,7 +213,7 @@ def _get_model_data_interval(
return DataInterval(start, end)


def create_timetable(interval: ScheduleIntervalArg, timezone: Timezone) -> Timetable:
def create_timetable(interval: ScheduleIntervalArg, timezone: Timezone | FixedTimezone) -> Timetable:
"""Create a Timetable instance from a ``schedule_interval`` argument."""
if interval is NOTSET:
return DeltaDataIntervalTimetable(DEFAULT_SCHEDULE_INTERVAL)
Expand Down Expand Up @@ -529,7 +529,7 @@ def __init__(

tzinfo = None if date.tzinfo else settings.TIMEZONE
tz = pendulum.instance(date, tz=tzinfo).timezone
self.timezone: Timezone = tz or settings.TIMEZONE
self.timezone: Timezone | FixedTimezone = tz or settings.TIMEZONE

# Apply the timezone we settled on to end_date if it wasn't supplied
if "end_date" in self.default_args and self.default_args["end_date"]:
Expand Down
6 changes: 3 additions & 3 deletions airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import math
import time
import warnings
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, cast

import pendulum
import tenacity
Expand Down Expand Up @@ -148,13 +148,13 @@ def monitor_pod(self, pod: V1Pod, get_logs: bool) -> tuple[State, str | None]:
"""
if get_logs:
read_logs_since_sec = None
last_log_time = None
last_log_time: pendulum.DateTime | None = None
while True:
logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
for line in logs:
timestamp, message = self.parse_log_line(line.decode("utf-8"))
if timestamp:
last_log_time = pendulum.parse(timestamp)
last_log_time = cast(pendulum.DateTime, pendulum.parse(timestamp))
self.log.info(message)
time.sleep(1)

Expand Down
9 changes: 5 additions & 4 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
from airflow.utils.module_loading import import_string, qualname
from airflow.utils.operator_resources import Resources
from airflow.utils.task_group import MappedTaskGroup, TaskGroup
from airflow.utils.timezone import parse_timezone
from airflow.utils.types import NOTSET, ArgNotSet

if TYPE_CHECKING:
Expand Down Expand Up @@ -144,7 +145,7 @@ def decode_relativedelta(var: dict[str, Any]) -> relativedelta.relativedelta:
return relativedelta.relativedelta(**var)


def encode_timezone(var: Timezone) -> str | int:
def encode_timezone(var: Timezone | FixedTimezone) -> str | int:
"""
Encode a Pendulum Timezone for serialization.
Expand All @@ -167,9 +168,9 @@ def encode_timezone(var: Timezone) -> str | int:
)


def decode_timezone(var: str | int) -> Timezone:
def decode_timezone(var: str | int):
"""Decode a previously serialized Pendulum Timezone."""
return pendulum.tz.timezone(var)
return parse_timezone(var)


def _get_registered_timetable(importable_string: str) -> type[Timetable] | None:
Expand Down Expand Up @@ -607,7 +608,7 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
raise TypeError(f"Invalid type {type_!s} in deserialization.")

_deserialize_datetime = pendulum.from_timestamp
_deserialize_timezone = pendulum.tz.timezone
_deserialize_timezone = parse_timezone

@classmethod
def _deserialize_timedelta(cls, seconds: int) -> datetime.timedelta:
Expand Down
15 changes: 7 additions & 8 deletions airflow/serialization/serializers/datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
serialize as serialize_timezone,
)
from airflow.utils.module_loading import qualname
from airflow.utils.timezone import convert_to_utc, is_naive
from airflow.utils.timezone import convert_to_utc, is_naive, parse_timezone

if TYPE_CHECKING:
import datetime
Expand Down Expand Up @@ -65,23 +65,22 @@ def deserialize(classname: str, version: int, data: dict | str) -> datetime.date
import datetime

from pendulum import DateTime
from pendulum.tz import fixed_timezone, timezone

tz: datetime.tzinfo | None = None
if isinstance(data, dict) and TIMEZONE in data:
if version == 1:
# try to deserialize unsupported timezones
timezone_mapping = {
"EDT": fixed_timezone(-4 * 3600),
"CDT": fixed_timezone(-5 * 3600),
"MDT": fixed_timezone(-6 * 3600),
"PDT": fixed_timezone(-7 * 3600),
"CEST": timezone("CET"),
"EDT": parse_timezone(-4 * 3600),
"CDT": parse_timezone(-5 * 3600),
"MDT": parse_timezone(-6 * 3600),
"PDT": parse_timezone(-7 * 3600),
"CEST": parse_timezone("CET"),
}
if data[TIMEZONE] in timezone_mapping:
tz = timezone_mapping[data[TIMEZONE]]
else:
tz = timezone(data[TIMEZONE])
tz = parse_timezone(data[TIMEZONE])
else:
tz = deserialize_timezone(data[TIMEZONE][1], data[TIMEZONE][2], data[TIMEZONE][0])

Expand Down
7 changes: 2 additions & 5 deletions airflow/serialization/serializers/timezone.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,14 @@ def serialize(o: object) -> tuple[U, str, int, bool]:


def deserialize(classname: str, version: int, data: object) -> Any:
from pendulum.tz import fixed_timezone, timezone
from airflow.utils.timezone import parse_timezone

if not isinstance(data, (str, int)):
raise TypeError(f"{data} is not of type int or str but of {type(data)}")

if version > __version__:
raise TypeError(f"serialized {version} of {classname} > {__version__}")

if isinstance(data, int):
return fixed_timezone(data)

if "zoneinfo.ZoneInfo" in classname:
try:
from zoneinfo import ZoneInfo
Expand All @@ -93,7 +90,7 @@ def deserialize(classname: str, version: int, data: object) -> Any:

return ZoneInfo(data)

return timezone(data)
return parse_timezone(data)


# ported from pendulum.tz.timezone._get_tzinfo_name
Expand Down
11 changes: 5 additions & 6 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import warnings
from typing import TYPE_CHECKING, Any, Callable

import pendulum
import pluggy
import sqlalchemy
from sqlalchemy import create_engine, exc, text
Expand All @@ -40,6 +39,7 @@
from airflow.logging_config import configure_logging
from airflow.utils.orm_event_handlers import setup_event_handlers
from airflow.utils.state import State
from airflow.utils.timezone import local_timezone, parse_timezone, utc

if TYPE_CHECKING:
from sqlalchemy.engine import Engine
Expand All @@ -50,13 +50,12 @@
log = logging.getLogger(__name__)

try:
tz = conf.get_mandatory_value("core", "default_timezone")
if tz == "system":
TIMEZONE = pendulum.tz.local_timezone()
if (tz := conf.get_mandatory_value("core", "default_timezone")) != "system":
TIMEZONE = parse_timezone(tz)
else:
TIMEZONE = pendulum.tz.timezone(tz)
TIMEZONE = local_timezone()
except Exception:
TIMEZONE = pendulum.tz.timezone("UTC")
TIMEZONE = utc

log.info("Configured default timezone %s", TIMEZONE)

Expand Down
10 changes: 4 additions & 6 deletions airflow/timetables/_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,15 @@
import datetime
from typing import TYPE_CHECKING, Any

import pendulum
from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, FormatException, MissingFieldException
from croniter import CroniterBadCronError, CroniterBadDateError, croniter

from airflow.exceptions import AirflowTimetableInvalid
from airflow.utils.dates import cron_presets
from airflow.utils.timezone import convert_to_utc, make_aware, make_naive
from airflow.utils.timezone import convert_to_utc, make_aware, make_naive, parse_timezone

if TYPE_CHECKING:
from pendulum import DateTime
from pendulum.tz.timezone import Timezone
from pendulum import DateTime, FixedTimezone, Timezone


def _covers_every_hour(cron: croniter) -> bool:
Expand Down Expand Up @@ -63,11 +61,11 @@ def _covers_every_hour(cron: croniter) -> bool:
class CronMixin:
"""Mixin to provide interface to work with croniter."""

def __init__(self, cron: str, timezone: str | Timezone) -> None:
def __init__(self, cron: str, timezone: str | Timezone | FixedTimezone) -> None:
self._expression = cron_presets.get(cron, cron)

if isinstance(timezone, str):
timezone = pendulum.tz.timezone(timezone)
timezone = parse_timezone(timezone)
self._timezone = timezone

try:
Expand Down
18 changes: 14 additions & 4 deletions airflow/timetables/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

if TYPE_CHECKING:
from dateutil.relativedelta import relativedelta
from pendulum.tz.timezone import Timezone
from pendulum.tz.timezone import FixedTimezone, Timezone

from airflow.timetables.base import TimeRestriction

Expand All @@ -48,7 +48,7 @@ def __init__(
self,
cron: str,
*,
timezone: str | Timezone,
timezone: str | Timezone | FixedTimezone,
interval: datetime.timedelta | relativedelta = datetime.timedelta(),
) -> None:
super().__init__(cron, timezone)
Expand Down Expand Up @@ -77,7 +77,12 @@ def serialize(self) -> dict[str, Any]:
return {"expression": self._expression, "timezone": timezone, "interval": interval}

def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
return DataInterval(run_after - self._interval, run_after)
return DataInterval(
# pendulum.Datetime ± timedelta should return pendulum.Datetime
# however mypy decide that output would be datetime.datetime
run_after - self._interval, # type: ignore[arg-type]
run_after,
)

def next_dagrun_info(
self,
Expand All @@ -101,4 +106,9 @@ def next_dagrun_info(
next_start_time = max(start_time_candidates)
if restriction.latest is not None and restriction.latest < next_start_time:
return None
return DagRunInfo.interval(next_start_time - self._interval, next_start_time)
return DagRunInfo.interval(
# pendulum.Datetime ± timedelta should return pendulum.Datetime
# however mypy decide that output would be datetime.datetime
next_start_time - self._interval, # type: ignore[arg-type]
next_start_time,
)
5 changes: 1 addition & 4 deletions airflow/utils/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import logging
from typing import TYPE_CHECKING, Any, Generator, Iterable, overload

import pendulum
from dateutil import relativedelta
from sqlalchemy import TIMESTAMP, PickleType, and_, event, false, nullsfirst, or_, true, tuple_
from sqlalchemy.dialects import mssql, mysql
Expand All @@ -34,7 +33,7 @@
from airflow import settings
from airflow.configuration import conf
from airflow.serialization.enums import Encoding
from airflow.utils.timezone import make_naive
from airflow.utils.timezone import make_naive, utc

if TYPE_CHECKING:
from kubernetes.client.models.v1_pod import V1Pod
Expand All @@ -46,8 +45,6 @@

log = logging.getLogger(__name__)

utc = pendulum.tz.timezone("UTC")


class UtcDateTime(TypeDecorator):
"""
Expand Down
37 changes: 30 additions & 7 deletions airflow/utils/timezone.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import pendulum
from dateutil.relativedelta import relativedelta
from pendulum.datetime import DateTime
from pendulum.tz.timezone import FixedTimezone, Timezone

# UTC time zone as a tzinfo instance.
utc = pendulum.tz.timezone("UTC")
utc = Timezone("UTC")


def is_localized(value):
Expand Down Expand Up @@ -135,12 +136,10 @@ def make_aware(value: dt.datetime | None, timezone: dt.tzinfo | None = None) ->
# Check that we won't overwrite the timezone of an aware datetime.
if is_localized(value):
raise ValueError(f"make_aware expects a naive datetime, got {value}")
if hasattr(value, "fold"):
# In case of python 3.6 we want to do the same that pendulum does for python3.5
# i.e in case we move clock back we want to schedule the run at the time of the second
# instance of the same clock time rather than the first one.
# Fold parameter has no impact in other cases so we can safely set it to 1 here
value = value.replace(fold=1)
# In case we move clock back we want to schedule the run at the time of the second
# instance of the same clock time rather than the first one.
# Fold parameter has no impact in other cases, so we can safely set it to 1 here
value = value.replace(fold=1)
localized = getattr(timezone, "localize", None)
if localized is not None:
# This method is available for pytz time zones
Expand Down Expand Up @@ -273,3 +272,27 @@ def _format_part(key: str) -> str:
if not joined:
return "<1s"
return joined


def parse_timezone(name: str | int) -> FixedTimezone | Timezone:
"""
Parse timezone and return one of the pendulum Timezone.
Provide the same interface as ``pendulum.timezone(name)``
:param name: Either IANA timezone or offset to UTC in seconds.
:meta private:
"""
return pendulum.timezone(name)


def local_timezone() -> FixedTimezone | Timezone:
"""
Return local timezone.
Provide the same interface as ``pendulum.tz.local_timezone()``
:meta private:
"""
return pendulum.tz.local_timezone()
5 changes: 3 additions & 2 deletions kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from unittest.mock import ANY, MagicMock
from uuid import uuid4

import pendulum
import pytest
from kubernetes import client
from kubernetes.client import V1EnvVar, V1PodSecurityContext, V1SecurityContext, models as k8s
Expand All @@ -53,7 +52,9 @@

def create_context(task) -> Context:
dag = DAG(dag_id="dag")
execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=pendulum.tz.timezone("Europe/Amsterdam"))
execution_date = timezone.datetime(
2016, 1, 1, 1, 0, 0, tzinfo=timezone.parse_timezone("Europe/Amsterdam")
)
dag_run = DagRun(
dag_id=dag.dag_id,
execution_date=execution_date,
Expand Down
4 changes: 1 addition & 3 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,7 @@ install_requires =
opentelemetry-exporter-otlp
packaging>=14.0
pathspec>=0.9.0
# When (if) pendulum 3 released it would introduce changes in module/objects imports,
# since we are tightly coupled with pendulum library internally it will breaks Airflow functionality.
pendulum>=2.0,<3.0
pendulum>=3.0
pluggy>=1.0
psutil>=4.2.0
pydantic>=2.3.0
Expand Down
Loading

0 comments on commit 5681d12

Please sign in to comment.