Skip to content

Commit

Permalink
Fix core static checks
Browse files Browse the repository at this point in the history
  • Loading branch information
Taragolis committed Dec 18, 2023
1 parent 1097e39 commit 91763bd
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 12 deletions.
5 changes: 3 additions & 2 deletions airflow/example_dags/plugins/workday.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ def next_dagrun_info(
tzinfo=UTC
)
else: # This is the first ever run on the regular schedule.
next_start = restriction.earliest
if next_start is None: # No start_date. Don't schedule.
if restriction.earliest is None: # No start_date. Don't schedule.
return None
next_start = restriction.earliest

if not restriction.catchup:
# If the DAG has catchup=False, today is the earliest to consider.
next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
Expand Down
6 changes: 3 additions & 3 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@
if TYPE_CHECKING:
from types import ModuleType

from pendulum.tz.timezone import Timezone
from pendulum.tz.timezone import FixedTimezone, Timezone
from sqlalchemy.orm.query import Query
from sqlalchemy.orm.session import Session

Expand Down Expand Up @@ -213,7 +213,7 @@ def _get_model_data_interval(
return DataInterval(start, end)


def create_timetable(interval: ScheduleIntervalArg, timezone: Timezone) -> Timetable:
def create_timetable(interval: ScheduleIntervalArg, timezone: Timezone | FixedTimezone) -> Timetable:
"""Create a Timetable instance from a ``schedule_interval`` argument."""
if interval is NOTSET:
return DeltaDataIntervalTimetable(DEFAULT_SCHEDULE_INTERVAL)
Expand Down Expand Up @@ -529,7 +529,7 @@ def __init__(

tzinfo = None if date.tzinfo else settings.TIMEZONE
tz = pendulum.instance(date, tz=tzinfo).timezone
self.timezone: Timezone = tz or settings.TIMEZONE
self.timezone: Timezone | FixedTimezone = tz or settings.TIMEZONE

# Apply the timezone we settled on to end_date if it wasn't supplied
if "end_date" in self.default_args and self.default_args["end_date"]:
Expand Down
2 changes: 1 addition & 1 deletion airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def decode_relativedelta(var: dict[str, Any]) -> relativedelta.relativedelta:
return relativedelta.relativedelta(**var)


def encode_timezone(var: Timezone) -> str | int:
def encode_timezone(var: Timezone | FixedTimezone) -> str | int:
"""
Encode a Pendulum Timezone for serialization.
Expand Down
18 changes: 14 additions & 4 deletions airflow/timetables/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

if TYPE_CHECKING:
from dateutil.relativedelta import relativedelta
from pendulum.tz.timezone import Timezone
from pendulum.tz.timezone import FixedTimezone, Timezone

from airflow.timetables.base import TimeRestriction

Expand All @@ -48,7 +48,7 @@ def __init__(
self,
cron: str,
*,
timezone: str | Timezone,
timezone: str | Timezone | FixedTimezone,
interval: datetime.timedelta | relativedelta = datetime.timedelta(),
) -> None:
super().__init__(cron, timezone)
Expand Down Expand Up @@ -77,7 +77,12 @@ def serialize(self) -> dict[str, Any]:
return {"expression": self._expression, "timezone": timezone, "interval": interval}

def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
return DataInterval(run_after - self._interval, run_after)
return DataInterval(
# pendulum.Datetime ± timedelta should return pendulum.Datetime
# however mypy decide that output would be datetime.datetime
run_after - self._interval, # type: ignore[arg-type]
run_after,
)

def next_dagrun_info(
self,
Expand All @@ -101,4 +106,9 @@ def next_dagrun_info(
next_start_time = max(start_time_candidates)
if restriction.latest is not None and restriction.latest < next_start_time:
return None
return DagRunInfo.interval(next_start_time - self._interval, next_start_time)
return DagRunInfo.interval(
# pendulum.Datetime ± timedelta should return pendulum.Datetime
# however mypy decide that output would be datetime.datetime
next_start_time - self._interval, # type: ignore[arg-type]
next_start_time,
)
5 changes: 3 additions & 2 deletions tests/plugins/workday.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ def next_dagrun_info(
tzinfo=UTC
)
else: # This is the first ever run on the regular schedule.
next_start = restriction.earliest
if next_start is None: # No start_date. Don't schedule.
if restriction.earliest is None: # No start_date. Don't schedule.
return None

next_start = restriction.earliest
if not restriction.catchup:
# If the DAG has catchup=False, today is the earliest to consider.
next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
Expand Down

0 comments on commit 91763bd

Please sign in to comment.