Skip to content

Commit

Permalink
fixed some bugs in Trigger and added func to display all schedules
Browse files Browse the repository at this point in the history
  • Loading branch information
Volker Lorrmann committed Nov 5, 2024
1 parent 812ac32 commit 4a7bbfe
Show file tree
Hide file tree
Showing 14 changed files with 400 additions and 147 deletions.
9 changes: 6 additions & 3 deletions examples/hello-world/conf/pipelines/hello_world.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,29 @@ run:
schedule:
run:
coalesce: latest
conflict_poilcy: do_nothing
conflict_policy: do_nothing
executor: null
id: null
id_: null
max_jitter: null
max_running_jobs: null
misfire_grace_time: null
paused: false
trigger:
crontab: null
day: null
day_of_week: null
days: 0
end_time: null
hour: null
hours: 0
minute: null
minutes: 0
month: null
second: null
seconds: 0
start_time: null
timezone: null
type: null
type_: null
week: null
weeks: 0
year: null
Expand Down
37 changes: 0 additions & 37 deletions examples/hello-world/conf/pipelines/test123.yml

This file was deleted.

37 changes: 0 additions & 37 deletions examples/hello-world/conf/pipelines/test1234.yml

This file was deleted.

4 changes: 3 additions & 1 deletion examples/hello-world/conf/project.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
scheduler:
cleanup_interval: 900
data_store: {}
data_store:
type: sqlalchemy
uri: sqlite+aiosqlite:///scheduler.db
event_broker: {}
max_concurrent_jobs: 100
tracker:
Expand Down
11 changes: 0 additions & 11 deletions examples/hello-world/pipelines/test1234.py

This file was deleted.

5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,8 @@ exclude = [
"docker",
".gitignore",
]

[tool.uv]
dev-dependencies = [
"prettytable>=3.12.0",
]
Binary file added scheduler.db
Binary file not shown.
19 changes: 16 additions & 3 deletions src/flowerpower/cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,17 @@ def __post_init__(self):

@dataclass
class PipelineScheduleTriggerConfig(BaseConfig):
type: str | None = None
type_: str | None = None
crontab: str | None = None
year: str | int | None = None
month: str | int | None = None
weeks: int | float = 0
week: str | int | None = None
days: int | float = 0
day: str | int | None = None
day_of_week: str | int | None = None
hours: int | float = 0
hour: str | int | None = None
minutes: int | float = 0
minute: str | int | None = None
seconds: int | float = 0
Expand All @@ -59,14 +62,14 @@ class PipelineScheduleTriggerConfig(BaseConfig):

@dataclass
class PipelineScheduleRunConfig(BaseConfig):
id: str | None = None
id_: str | None = None
executor: str | None = None
paused: bool = False
coalesce: str = "latest" # other options are "all" and "earliest"
misfire_grace_time: int | float | dt.timedelta | None = None
max_jitter: int | float | dt.timedelta | None = None
max_running_jobs: int | None = None
conflict_poilcy: str | None = (
conflict_policy: str | None = (
"do_nothing" # other options are "replace" and "exception"
)

Expand All @@ -78,6 +81,16 @@ class PipelineScheduleConfig(BaseConfig):
default_factory=PipelineScheduleTriggerConfig
)

def __post_init__(self):
self.run = PipelineScheduleRunConfig(
**self.run if isinstance(self.run, dict | Munch) else self.run.to_dict()
)
self.trigger = PipelineScheduleTriggerConfig(
**self.trigger
if isinstance(self.trigger, dict | Munch)
else self.trigger.to_dict()
)


@dataclass
class PipelineTrackerConfig(BaseConfig):
Expand Down
37 changes: 35 additions & 2 deletions src/flowerpower/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ def schedule(


@app.command()
def new(
def new_pipeline(
name: str,
base_dir: str = "",
overwrite: bool = False,
Expand Down Expand Up @@ -296,6 +296,39 @@ def new(
)


@app.command()
def add_pipeline(
name: str,
base_dir: str = "",
overwrite: bool = False,
pipeline_params: str = "",
run_params: str = "",
schedule_params: str = "",
tracker_params: str = "",
):
"""
Create a new pipeline with the given parameters.
Args:
name (str): The name of the pipeline.
base_dir (str, optional): The base path for the pipeline. Defaults to "".
overwrite (bool, optional): Whether to overwrite an existing pipeline with the same name. Defaults to False.
pipeline_params (str, optional): Additional parameters for the pipeline. Defaults to "".
run_params (str, optional): Additional parameters for the run. Defaults to "".
schedule_params (str, optional): Additional parameters for the schedule. Defaults to "".
tracker_params (str, optional): Additional parameters for the tracker. Defaults to "".
"""
new_pipeline(
name=name,
base_dir=base_dir,
overwrite=overwrite,
pipeline_params=pipeline_params,
run_params=run_params,
schedule_params=schedule_params,
tracker_params=tracker_params,
)


@app.command()
def delete(name: str, base_dir: str = "", module: bool = False):
"""
Expand Down Expand Up @@ -336,7 +369,7 @@ def start_worker(name: str, base_dir: str = ""):


@app.command()
def show(name: str, base_dir: str = ""):
def show_pipeline(name: str, base_dir: str = ""):
"""
Show the pipeline.
Expand Down
Loading

0 comments on commit 4a7bbfe

Please sign in to comment.