diff --git a/.gitignore b/.gitignore index 5c725e5..de1e654 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,5 @@ wheels/ .vscode docker/config/.bash_history docker/assets +dev +dev/simple_with_class \ No newline at end of file diff --git a/dev/scheduler.py b/dev/scheduler.py deleted file mode 100644 index c17db91..0000000 --- a/dev/scheduler.py +++ /dev/null @@ -1,10 +0,0 @@ -from test_job import TestJob - - -def main(): - test_job = TestJob("job1") - test_job.add_job(1, 2, 3, a="A") - - -if __name__ == "__main__": - main() diff --git a/dev/test_job.py b/dev/test_job.py deleted file mode 100644 index 461dd02..0000000 --- a/dev/test_job.py +++ /dev/null @@ -1,40 +0,0 @@ -from apscheduler import Scheduler -from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore -from apscheduler.eventbrokers.redis import RedisEventBroker -from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker - - -class SM(Scheduler): - def __init__(self, name, ds_uri, eb_uri): - self.name = name - self._data_store = SQLAlchemyDataStore(engine_or_url=ds_uri) - if "redis" in eb_uri: - self._event_broker = RedisEventBroker(eb_uri) - else: - self._event_broker = AsyncpgEventBroker.from_async_sqla_engine( - self._data_store._engine - ) - - super().__init__( - identity=self.name, - data_store=self._data_store, - event_broker=self._event_broker, - # job_executors=self._job_executors, - ) - - -class TestJob: - def __init__(self, name): - self.name = name - - def run(self, *args, **kwargs): - print(self.name, args, kwargs) - - def add_job(self, *args, **kwargs): - with SM( - name="test", - # ds_uri="sqlite+aiosqlite:///test.db", - ds_uri="postgresql+asyncpg://edge:edge@localhost:5432/flowerpower", - eb_uri="redis://localhost:6379", - ) as sched: - sched.add_job(self.run, args=args, kwargs=kwargs) diff --git a/dev/worker.py b/dev/worker.py deleted file mode 100644 index 69ceebe..0000000 --- a/dev/worker.py +++ /dev/null @@ -1,17 +0,0 @@ -from test_job import TestJob -from apscheduler import Scheduler -from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore -from apscheduler.eventbrokers.redis import RedisEventBroker - - -def main(): - data_store = SQLAlchemyDataStore( - engine_or_url="postgresql+asyncpg://edge:edge@localhost:5432/flowerpower" - ) - event_broker = RedisEventBroker("redis://localhost:6379") - with Scheduler(data_store, event_broker) as sched: - sched.run_until_stopped() - - -if __name__ == "__main__": - main() diff --git a/pyproject.toml b/pyproject.toml index ed62f07..ed00c8d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,11 +36,11 @@ mqtt = ["paho-mqtt>=2.1.0"] ray = ["ray>=2.34.0"] redis = ["redis>=5.0.4"] scheduler = [ - "aiosqlite>=0.20.0", - "greenlet>=3.0.3", - "asyncpg>=0.29.0", - "sqlalchemy>=2.0.30", - "apscheduler>=4.0.0a5", + "aiosqlite>=0.20.0", + "greenlet>=3.0.3", + "asyncpg>=0.29.0", + "sqlalchemy>=2.0.30", + "apscheduler @ git+https://github.com/agronholm/apscheduler.git", ] ui = ["sf-hamilton-ui>=0.0.11"] @@ -57,6 +57,7 @@ dev-dependencies = [ "dill>=0.3.8", "cbor2>=5.6.4", "lxml>=5.3.0", + "msgspec>=0.18.6", ] #[tool.rye.scripts] #flowerpower = {cmd = ["python", "-m", "flowerpower.cli"]} diff --git a/requirements-dev.lock b/requirements-dev.lock index d39efb8..7201827 100644 --- a/requirements-dev.lock +++ b/requirements-dev.lock @@ -12,9 +12,9 @@ -e file:. aiofiles==24.1.0 # via sf-hamilton-ui -aiohappyeyeballs==2.3.7 +aiohappyeyeballs==2.4.0 # via aiohttp -aiohttp==3.10.3 +aiohttp==3.10.5 # via sf-hamilton-sdk # via sf-hamilton-ui aiosignal==1.3.1 @@ -26,7 +26,7 @@ annotated-types==0.7.0 # via pydantic anyio==4.4.0 # via apscheduler -apscheduler==4.0.0a5 +apscheduler @ git+https://github.com/agronholm/apscheduler.git@8ed2b1babf6c7c173f2c8ed11b9417c891aa0dac # via flowerpower asgiref==3.8.1 # via django @@ -86,7 +86,7 @@ graphviz==0.20.3 # via sf-hamilton greenlet==3.0.3 # via flowerpower -idna==3.7 +idna==3.8 # via anyio # via requests # via yarl @@ -116,6 +116,7 @@ monotonic==1.6 # via posthog msgpack==1.0.8 # via ray +msgspec==0.18.6 multidict==6.0.5 # via aiohttp # via yarl @@ -125,7 +126,7 @@ mypy-extensions==1.0.0 # via typing-inspect networkx==3.3 # via sf-hamilton -numpy==2.0.1 +numpy==2.1.0 # via pandas # via sf-hamilton packaging==24.1 @@ -141,11 +142,11 @@ passlib==1.7.4 pexpect==4.9.0 # via ipython pillow==10.4.0 -posthog==3.5.0 +posthog==3.6.0 # via sf-hamilton-sdk prompt-toolkit==3.0.47 # via ipython -protobuf==5.27.3 +protobuf==5.28.0 # via ray psycopg2-binary==2.9.9 # via sf-hamilton-ui @@ -172,7 +173,7 @@ pytz==2024.1 pyyaml==6.0.2 # via flowerpower # via ray -ray==2.34.0 +ray==2.35.0 # via flowerpower redis==5.0.8 # via flowerpower @@ -184,16 +185,16 @@ requests==2.32.3 # via ray # via sf-hamilton-sdk # via sf-hamilton-ui -rich==13.7.1 +rich==13.8.0 # via flowerpower # via typer rpds-py==0.20.0 # via jsonschema # via referencing -sf-hamilton==1.73.2 +sf-hamilton==1.75.0 # via flowerpower # via sf-hamilton-sdk -sf-hamilton-sdk==0.6.0 +sf-hamilton-sdk==0.7.0 # via flowerpower sf-hamilton-ui==0.0.14 # via flowerpower @@ -209,7 +210,7 @@ sniffio==1.3.1 # via anyio sqlalchemy==2.0.32 # via flowerpower -sqlglot==25.12.0 +sqlglot==25.18.0 # via sf-hamilton-sdk sqlparse==0.5.1 # via django @@ -220,7 +221,7 @@ tenacity==8.5.0 traitlets==5.14.3 # via ipython # via matplotlib-inline -typer==0.12.4 +typer==0.12.5 # via flowerpower # via sf-hamilton-ui typing-extensions==4.12.2 diff --git a/requirements.lock b/requirements.lock index 8df7f44..9a005ca 100644 --- a/requirements.lock +++ b/requirements.lock @@ -12,9 +12,9 @@ -e file:. aiofiles==24.1.0 # via sf-hamilton-ui -aiohappyeyeballs==2.3.7 +aiohappyeyeballs==2.4.0 # via aiohttp -aiohttp==3.10.3 +aiohttp==3.10.5 # via sf-hamilton-sdk # via sf-hamilton-ui aiosignal==1.3.1 @@ -26,7 +26,7 @@ annotated-types==0.7.0 # via pydantic anyio==4.4.0 # via apscheduler -apscheduler==4.0.0a5 +apscheduler @ git+https://github.com/agronholm/apscheduler.git@8ed2b1babf6c7c173f2c8ed11b9417c891aa0dac # via flowerpower asgiref==3.8.1 # via django @@ -78,7 +78,7 @@ graphviz==0.20.3 # via sf-hamilton greenlet==3.0.3 # via flowerpower -idna==3.7 +idna==3.8 # via anyio # via requests # via yarl @@ -110,7 +110,7 @@ mypy-extensions==1.0.0 # via typing-inspect networkx==3.3 # via sf-hamilton -numpy==2.0.1 +numpy==2.1.0 # via pandas # via sf-hamilton packaging==24.1 @@ -121,9 +121,9 @@ pandas==2.2.2 # via sf-hamilton passlib==1.7.4 # via sf-hamilton-ui -posthog==3.5.0 +posthog==3.6.0 # via sf-hamilton-sdk -protobuf==5.27.3 +protobuf==5.28.0 # via ray psycopg2-binary==2.9.9 # via sf-hamilton-ui @@ -145,7 +145,7 @@ pytz==2024.1 pyyaml==6.0.2 # via flowerpower # via ray -ray==2.34.0 +ray==2.35.0 # via flowerpower redis==5.0.8 # via flowerpower @@ -157,16 +157,16 @@ requests==2.32.3 # via ray # via sf-hamilton-sdk # via sf-hamilton-ui -rich==13.7.1 +rich==13.8.0 # via flowerpower # via typer rpds-py==0.20.0 # via jsonschema # via referencing -sf-hamilton==1.73.2 +sf-hamilton==1.75.0 # via flowerpower # via sf-hamilton-sdk -sf-hamilton-sdk==0.6.0 +sf-hamilton-sdk==0.7.0 # via flowerpower sf-hamilton-ui==0.0.14 # via flowerpower @@ -181,13 +181,13 @@ sniffio==1.3.1 # via anyio sqlalchemy==2.0.32 # via flowerpower -sqlglot==25.12.0 +sqlglot==25.18.0 # via sf-hamilton-sdk sqlparse==0.5.1 # via django tenacity==8.5.0 # via apscheduler -typer==0.12.4 +typer==0.12.5 # via flowerpower # via sf-hamilton-ui typing-extensions==4.12.2 diff --git a/src/flowerpower/cfg.py b/src/flowerpower/cfg.py index 78c8038..bad759e 100644 --- a/src/flowerpower/cfg.py +++ b/src/flowerpower/cfg.py @@ -7,8 +7,8 @@ from .helpers.templates import ( PIPELINE_TEMPLATE, # noqa: F401 - SCHEDULER_TEMPLATE, - TRACKER_TEMPLATE, + SCHEDULER_TEMPLATE, # noqa: F401 + TRACKER_TEMPLATE, # noqa: F401 ) diff --git a/src/flowerpower/pipeline.py b/src/flowerpower/pipeline.py index 0840715..9a30735 100644 --- a/src/flowerpower/pipeline.py +++ b/src/flowerpower/pipeline.py @@ -1,6 +1,6 @@ import datetime as dt -import importlib import importlib.util +import importlib import os import sys from typing import Any, Callable @@ -10,9 +10,9 @@ from hamilton_sdk import adapters from loguru import logger from munch import unmunchify - -from .cfg import Config from .helpers.templates import PIPELINE_PY_TEMPLATE +from .cfg import Config + if importlib.util.find_spec("apscheduler"): from .scheduler import SchedulerManager @@ -21,7 +21,7 @@ from .helpers.executor import get_executor -from .helpers.trigger import ALL_TRIGGER_KWARGS, get_trigger +from .helpers.trigger import get_trigger, ALL_TRIGGER_KWARGS class PipelineManager: @@ -41,10 +41,9 @@ def __init__(self, base_dir: str | None = None): sys.path.append(self._pipeline_dir) - # self._load_module() self._load_config() - def _load_module(self, name: str): + def load_module(self, name: str): """ Load a module dynamically. @@ -59,7 +58,7 @@ def _load_module(self, name: str): else: self._module = importlib.reload(self._module) - def _load_config(self): + def load_config(self): """ Load the configuration file. @@ -74,27 +73,8 @@ def _load_config(self): """ self.cfg = Config(base_dir=self._base_dir) - def reload_module(self, name: str): - """ - Reloads the specified module. - Args: - name (str): The name of the module to reload. - """ - self._load_module(name) - def reload_config(self): - """ - Reloads the configuration by creating a new instance of the Config class with the base path - set to the specified configuration path. - - Parameters: - - self: The current instance of the Pipeline class. - - Returns: - - None - """ - self.cfg = Config(base_dir=self._base_dir) def _get_driver( self, @@ -133,7 +113,7 @@ def _get_driver( executor or "local", max_tasks=max_tasks, num_cpus=num_cpus ) if reload or not hasattr(self, "_module"): - self._load_module(name) + self.load_module(name) if with_tracker: tracker_cfg = self.cfg.tracker.pipeline.get(name, {}) @@ -603,7 +583,7 @@ class Pipeline(PipelineManager): def __init__(self, name: str, base_dir: str | None = None): super().__init__(base_dir) self.name = name - self._load_module(name) + self.load_module(name) def run( self, @@ -713,8 +693,8 @@ def show(self, format: str = "png", view: bool = False, reload: bool = False): def delete(self, cfg: bool = True, module: bool = False): return super().delete(self.name, cfg, module) - def reload_module(self): - return super().reload_module(self.name) + def load_module(self): + super().load_module(self.name) def add(