Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add end_offset arg to @_partitioned_config decorators #4741

Merged
merged 1 commit into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,18 @@ class TimeWindowPartitionsDefinition(
("start", datetime),
("timezone", str),
("fmt", str),
("end_offset", int),
],
),
):
def __new__(
    cls,
    schedule_type: ScheduleType,
    start: datetime,
    timezone: str,
    fmt: str,
    end_offset: int,
):
    """Create the partitions definition after validating ``end_offset``.

    Args:
        schedule_type: Stored unchanged in the ``schedule_type`` field.
        start: Stored unchanged in the ``start`` field.
        timezone: Stored unchanged in the ``timezone`` field.
        fmt: Stored unchanged in the ``fmt`` field.
        end_offset: Stored in the ``end_offset`` field; checked to be
            non-negative via ``check.param_invariant`` before the instance
            is built.
    """
    # Validation runs first so an invalid offset never reaches the tuple constructor.
    offset_is_valid = end_offset >= 0
    check.param_invariant(offset_is_valid, "end_offset", "end_offset must be non-negative")

    return super().__new__(
        cls,
        schedule_type=schedule_type,
        start=start,
        timezone=timezone,
        fmt=fmt,
        end_offset=end_offset,
    )

def get_partitions(
self, current_time: Optional[datetime] = None
) -> List[Partition[TimeWindow]]:
Expand All @@ -53,19 +62,29 @@ def get_partitions(

partitions: List[Partition[TimeWindow]] = []
prev_time = next(iterator)
while prev_time.timestamp() <= current_timestamp:
next_time = next(iterator)
while prev_time.timestamp() < start_timestamp:
prev_time = next(iterator)

end_offset = self.end_offset
partitions_past_current_time = 0
while True:
next_time = next(iterator)
if (
prev_time.timestamp() >= start_timestamp
and next_time.timestamp() <= current_timestamp
next_time.timestamp() <= current_timestamp
or partitions_past_current_time < end_offset
):
partitions.append(
Partition(
value=TimeWindow(prev_time, next_time), name=prev_time.strftime(self.fmt)
value=TimeWindow(prev_time, next_time),
name=prev_time.strftime(self.fmt),
)
)

if next_time.timestamp() > current_timestamp:
partitions_past_current_time += 1
else:
break

prev_time = next_time

return partitions
Expand All @@ -75,21 +94,28 @@ def daily_partitioned_config(
start_date: Union[datetime, str],
timezone: Optional[str] = None,
fmt: Optional[str] = None,
end_offset: int = 0,
) -> Callable[[Callable[[datetime, datetime], Dict[str, Any]]], PartitionedConfig]:
"""Defines run config over a set of daily partitions.

The decorated function should accept a start datetime and end datetime, which represent the date
partition the config should delineate.
The decorated function should accept a start datetime and end datetime, which represent the bounds
of the date partition the config should delineate.
Comment on lines +101 to +102
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That helps make things so much clearer.


The decorated function should return a run config dictionary.

The resulting object created by this decorator can be provided to the config argument of a Job.
The first partition in the set will start at the start_date. The last partition in the set will
end before the current time, unless the end_offset argument is set to a positive number.

Args:
start_date (Union[datetime.datetime, str]): The date from which to run the schedule. Can
provide in either a datetime or string format.
timezone (Optional[str]): The timezone in which each date should exist.
fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`.
end_offset (int): Extends the partition set by a number of partitions equal to the value
passed. Must be non-negative. If end_offset is 0 (the default), the last partition ends
at or before the current time. If end_offset is 1, the second-to-last partition ends at
or before the current time, and so on.
"""

experimental_fn_warning("daily_partitioned_config")
Expand All @@ -114,6 +140,7 @@ def inner(fn: Callable[[datetime, datetime], Dict[str, Any]]) -> PartitionedConf
start=_start_date,
timezone=_timezone,
fmt=_fmt,
end_offset=end_offset,
),
)

Expand All @@ -124,6 +151,7 @@ def hourly_partitioned_config(
start_date: Union[datetime, str],
timezone: Optional[str] = None,
fmt: Optional[str] = None,
end_offset: int = 0,
) -> Callable[[Callable[[datetime, datetime], Dict[str, Any]]], PartitionedConfig]:
"""Defines run config over a set of hourly partitions.

Expand All @@ -133,6 +161,8 @@ def hourly_partitioned_config(
The decorated function should return a run config dictionary.

The resulting object created by this decorator can be provided to the config argument of a Job.
The first partition in the set will start at the start_date. The last partition in the set will
end before the current time, unless the end_offset argument is set to a positive number.

Args:
start_date (Union[datetime.datetime, str]): The date from which to run the schedule. Can
Expand Down Expand Up @@ -163,6 +193,7 @@ def inner(fn: Callable[[datetime, datetime], Dict[str, Any]]) -> PartitionedConf
start=_start_date,
timezone=_timezone,
fmt=_fmt,
end_offset=end_offset,
),
)

Expand All @@ -173,6 +204,7 @@ def monthly_partitioned_config(
start_date: Union[datetime, str],
timezone: Optional[str] = None,
fmt: Optional[str] = None,
end_offset: int = 0,
) -> Callable[[Callable[[datetime, datetime], Dict[str, Any]]], PartitionedConfig]:
"""Defines run config over a set of monthly partitions.

Expand All @@ -182,12 +214,18 @@ def monthly_partitioned_config(
The decorated function should return a run config dictionary.

The resulting object created by this decorator can be provided to the config argument of a Job.
The first partition in the set will start at the start_date. The last partition in the set will
end before the current time, unless the end_offset argument is set to a positive number.

Args:
start_date (Union[datetime.datetime, str]): The date from which to run the schedule. Can
provide in either a datetime or string format.
timezone (Optional[str]): The timezone in which each date should exist.
fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`.
end_offset (int): Extends the partition set by a number of partitions equal to the value
passed. Must be non-negative. If end_offset is 0 (the default), the last partition ends
at or before the current time. If end_offset is 1, the second-to-last partition ends at
or before the current time, and so on.
"""
experimental_fn_warning("monthly_partitioned_config")

Expand All @@ -211,6 +249,7 @@ def inner(fn: Callable[[datetime, datetime], Dict[str, Any]]) -> PartitionedConf
start=_start_date,
timezone=_timezone,
fmt=_fmt,
end_offset=end_offset,
),
)

Expand All @@ -221,6 +260,7 @@ def weekly_partitioned_config(
start_date: Union[datetime, str],
timezone: Optional[str] = None,
fmt: Optional[str] = None,
end_offset: int = 0,
) -> Callable[[Callable[[datetime, datetime], Dict[str, Any]]], PartitionedConfig]:
"""Defines run config over a set of weekly partitions.

Expand All @@ -230,13 +270,18 @@ def weekly_partitioned_config(
The decorated function should return a run config dictionary.

The resulting object created by this decorator can be provided to the config argument of a Job.

The first partition in the set will start at the start_date. The last partition in the set will
end before the current time, unless the end_offset argument is set to a positive number.

Args:
start_date (Union[datetime.datetime, str]): The date from which to run the schedule. Can
provide in either a datetime or string format.
timezone (Optional[str]): The timezone in which each date should exist.
fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`.
end_offset (int): Extends the partition set by a number of partitions equal to the value
passed. Must be non-negative. If end_offset is 0 (the default), the last partition ends
at or before the current time. If end_offset is 1, the second-to-last partition ends at
or before the current time, and so on.
"""

experimental_fn_warning("weekly_partitioned_config")
Expand All @@ -261,6 +306,7 @@ def inner(fn: Callable[[datetime, datetime], Dict[str, Any]]) -> PartitionedConf
start=_start_date,
timezone=_timezone,
fmt=_fmt,
end_offset=end_offset,
),
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from datetime import datetime

import pendulum
from dagster.core.definitions.time_window_partitions import TimeWindow, daily_partitioned_config
from dagster.core.definitions.time_window_partitions import (
TimeWindow,
daily_partitioned_config,
monthly_partitioned_config,
)

DATE_FORMAT = "%Y-%m-%d"

Expand All @@ -24,3 +28,55 @@ def my_partitioned_config(_start, _end):
time_window("2021-05-05", "2021-05-06"),
time_window("2021-05-06", "2021-05-07"),
]


def test_daily_partitions_with_end_offset():
    """Daily partitions with ``end_offset=2`` include two windows past the current time."""

    @daily_partitioned_config(start_date="2021-05-05", end_offset=2)
    def my_partitioned_config(_start, _end):
        return {}

    # Evaluate the partition set as of midnight on 2021-05-07.
    as_of = datetime.strptime("2021-05-07", DATE_FORMAT)
    partitions = my_partitioned_config.partitions_def.get_partitions(as_of)
    actual_windows = [partition.value for partition in partitions]

    expected_windows = [
        time_window("2021-05-05", "2021-05-06"),
        time_window("2021-05-06", "2021-05-07"),
        # The two windows below end after `as_of`; they come from end_offset=2.
        time_window("2021-05-07", "2021-05-08"),
        time_window("2021-05-08", "2021-05-09"),
    ]
    assert actual_windows == expected_windows


def test_monthly_partitions():
    """Monthly partitions without an offset stop at the last fully elapsed month."""

    @monthly_partitioned_config(start_date="2021-05-01")
    def my_partitioned_config(_start, _end):
        return {}

    # 2021-07-03 falls inside July, so the July window is not expected.
    as_of = datetime.strptime("2021-07-03", DATE_FORMAT)
    partitions = my_partitioned_config.partitions_def.get_partitions(as_of)
    actual_windows = [partition.value for partition in partitions]

    expected_windows = [
        time_window("2021-05-01", "2021-06-01"),
        time_window("2021-06-01", "2021-07-01"),
    ]
    assert actual_windows == expected_windows


def test_monthly_partitions_with_end_offset():
    """Monthly partitions with ``end_offset=2`` include two windows past the current time."""

    @monthly_partitioned_config(start_date="2021-05-01", end_offset=2)
    def my_partitioned_config(_start, _end):
        return {}

    # Evaluate the partition set as of 2021-07-03, part-way through July.
    as_of = datetime.strptime("2021-07-03", DATE_FORMAT)
    partitions = my_partitioned_config.partitions_def.get_partitions(as_of)
    actual_windows = [partition.value for partition in partitions]

    expected_windows = [
        time_window("2021-05-01", "2021-06-01"),
        time_window("2021-06-01", "2021-07-01"),
        # The two windows below end after `as_of`; they come from end_offset=2.
        time_window("2021-07-01", "2021-08-01"),
        time_window("2021-08-01", "2021-09-01"),
    ]
    assert actual_windows == expected_windows