From 2d5e5402c57dc12c2a61a8ea2c81b1e41249b593 Mon Sep 17 00:00:00 2001
From: Renan Souza
Date: Mon, 25 Nov 2024 17:10:07 -0500
Subject: [PATCH 1/9] Adding py11 tests

---
 .github/workflows/run-tests-py11.yml | 91 ++++++++++++++++++++++++++++
 .github/workflows/run-tests.yml      |  5 +-
 2 files changed, 95 insertions(+), 1 deletion(-)
 create mode 100644 .github/workflows/run-tests-py11.yml

diff --git a/.github/workflows/run-tests-py11.yml b/.github/workflows/run-tests-py11.yml
new file mode 100644
index 0000000..6d1e726
--- /dev/null
+++ b/.github/workflows/run-tests-py11.yml
@@ -0,0 +1,91 @@
+name: Tests on py11
+on: [pull_request]
+
+jobs:
+
+  build:
+    runs-on: ubuntu-latest
+    timeout-minutes: 40
+    if: "!contains(github.event.head_commit.message, 'CI Bot')"
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up Python 3.11
+        uses: actions/setup-python@v5
+        with:
+          python-version: "3.11"
+          cache: "pip"
+
+      - name: Show OS Info
+        run: '[[ "$OSTYPE" == "linux-gnu"* ]] && { echo "OS Type: Linux"; (command -v lsb_release &> /dev/null && lsb_release -a) || cat /etc/os-release; uname -r; } || [[ "$OSTYPE" == "darwin"* ]] && { echo "OS Type: macOS"; sw_vers; uname -r; } || echo "Unsupported OS type: $OSTYPE"'
+
+      - name: Start docker compose with redis
+        run: docker compose -f deployment/compose.yml up -d
+
+      - name: Upgrade pip
+        run: python -m pip install --upgrade pip
+
+      - name: Show Python version
+        run: python --version && pip --version
+
+      - name: Install default dependencies and run simple test
+        run: |
+          pip install .
+          python examples/instrumentation/simple_script.py
+
+      - name: Install Dask dependencies alone and run a simple Dask test
+        run: |
+          pip uninstall flowcept -y
+          pip install .[dask]
+          python examples/dask_example.py
+
+      - name: Install MLFlow dependencies alone and run a simple MLFlow test
+        run: |
+          pip uninstall flowcept -y
+          pip install .[mlflow]
+          python examples/mlflow_example.py
+
+      - name: Install Tensorboard dependencies alone and run a simple Tensorboard test
+        run: |
+          pip uninstall flowcept -y
+          pip install .[tensorboard]
+          python examples/tensorboard_example.py
+
+      - name: Install all dependencies
+        run: |
+          python -m pip install --upgrade pip
+          python -m pip install .[all]
+
+      - name: List installed packages
+        run: pip list
+
+      - name: Test with pytest and redis
+        run: |
+          pytest --ignore=tests/decorator_tests/ml_tests/llm_tests
+
+      - name: Test notebooks with pytest and redis
+        run: pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb
+
+      - name: Shut down docker compose
+        run: docker compose -f deployment/compose.yml down
+
+      - name: Start docker compose with kafka
+        run: docker compose -f deployment/compose-kafka.yml up -d
+
+      - name: Wait for one minute
+        run: sleep 60
+
+      - name: Check liveness
+        run: |
+          export MQ_TYPE=kafka
+          export MQ_PORT=9092
+          python -c 'from flowcept.configs import MQ_TYPE, MQ_PORT; print(f"MQ_TYPE={MQ_TYPE}"); print(f"MQ_PORT={MQ_PORT}")'
+          python -c 'from flowcept import Flowcept; assert Flowcept.services_alive()'
+
+      - name: Run tests with kafka
+        run: |
+          export MQ_TYPE=kafka
+          export MQ_PORT=9092
+          # Ignoring heavy tests. They are executed with Kafka in another GH Action.
+          pytest --ignore=tests/decorator_tests/ml_tests --ignore=tests/adapters/test_tensorboard.py
diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml
index b3f9e1a..8e64e7b 100644
--- a/.github/workflows/run-tests.yml
+++ b/.github/workflows/run-tests.yml
@@ -1,5 +1,8 @@
 name: Unit, integration, and notebook tests
-on: [push]
+on:
+  push:
+  schedule:
+    - cron: '0 12 * * *' # Runs every day at 12 PM UTC (7 AM EST)
 
 jobs:

From b8793a698ca7edc862d50cd077da390866925f3b Mon Sep 17 00:00:00 2001
From: Renan Souza
Date: Mon, 25 Nov 2024 17:10:23 -0500
Subject: [PATCH 2/9] Adding more asserts to dask tests

---
 examples/dask_example.py | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/examples/dask_example.py b/examples/dask_example.py
index d1126fd..1acaad9 100644
--- a/examples/dask_example.py
+++ b/examples/dask_example.py
@@ -23,6 +23,8 @@ def sum_list(values):
     scheduler = cluster.scheduler
     client = Client(scheduler.address)
 
+    client.forward_logging()
+
     # Registering Flowcept's worker and scheduler adapters
     scheduler.add_plugin(FlowceptDaskSchedulerAdapter(scheduler))
     client.register_plugin(FlowceptDaskWorkerAdapter())
@@ -39,22 +41,25 @@ def sum_list(values):
     t4 = client.submit(sum_list, [t1, t2, t3])
     result = t4.result()
     print("Result:", result)
-
+    assert result == 30
     # Closing Dask and Flowcept
-    client.close()
-    cluster.close()
+    client.close()  # This is to avoid generating errors
+    cluster.close()  # This calls the needed closeouts to inform Flowcept that the workflow is done.
     flowcept.stop()
 
-    # Querying Flowcept's database about this run
     print(f"t1_key={t1.key}")
     print("Getting first task only:")
     task1 = Flowcept.db.query(filter={"task_id": t1.key})[0]
     assert task1["workflow_id"] == wf_id
     print(task1)
+    print("\n\n")
     print("Getting all tasks from this workflow:")
     all_tasks = Flowcept.db.query(filter={"workflow_id": wf_id})
     assert len(all_tasks) == 4
+    assert all(t.get("finished") is True for t in all_tasks)
+    assert all_tasks[-1]["generated"]["arg0"] == 30, "Checking if the last result was saved."
     print(all_tasks)
+    print("\n\n")
     print("Getting workflow info:")
     wf_info = Flowcept.db.query(filter={"workflow_id": wf_id}, type="workflow")[0]
     assert wf_info["workflow_id"] == wf_id

From 372a9a1f0283362172131ee331eefefd839c3a27 Mon Sep 17 00:00:00 2001
From: Renan Souza
Date: Mon, 25 Nov 2024 17:10:56 -0500
Subject: [PATCH 3/9] Removing base interceptor from __init__.py

---
 .../flowcept_api/flowcept_controller.py       |  2 +-
 .../instrumentation/decorators/__init__.py    | 26 +++++++++----------
 2 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/src/flowcept/flowcept_api/flowcept_controller.py b/src/flowcept/flowcept_api/flowcept_controller.py
index f9fc047..2438d28 100644
--- a/src/flowcept/flowcept_api/flowcept_controller.py
+++ b/src/flowcept/flowcept_api/flowcept_controller.py
@@ -77,7 +77,7 @@ def __init__(
             if not INSTRUMENTATION_ENABLED:
                 self.enabled = False
                 return
-            interceptors = [flowcept.instrumentation.decorators.instrumentation_interceptor]
+            interceptors = [BaseInterceptor(kind="instrumentation")]
         elif not isinstance(interceptors, list):
             interceptors = [interceptors]
         self._interceptors: List[BaseInterceptor] = interceptors
diff --git a/src/flowcept/instrumentation/decorators/__init__.py b/src/flowcept/instrumentation/decorators/__init__.py
index 1c599e1..ebeb058 100644
--- a/src/flowcept/instrumentation/decorators/__init__.py
+++ b/src/flowcept/instrumentation/decorators/__init__.py
@@ -1,15 +1,15 @@
 """Decorators subpackage."""
 
-from flowcept.flowceptor.adapters.base_interceptor import BaseInterceptor
-
-# TODO :base-interceptor-refactor: :ml-refactor: :code-reorg: :usability:
-# Consider creating a new concept for instrumentation-based 'interception'.
-# These adaptors were made for data observability.
-# Perhaps we should have a BaseAdaptor that would work for both and
-# observability and instrumentation adapters. This would be a major refactor
-# in the code. https://github.com/ORNL/flowcept/issues/109
-instrumentation_interceptor = BaseInterceptor(kind="instrumentation")
-# TODO This above is bad because I am reusing the same BaseInterceptor both
-# for adapter-based observability + traditional instrumentation via @decorator
-# I'm just setting _registered_workflow to avoid the auto wf register that
-# exists in the BaseInterceptor
+# from flowcept.flowceptor.adapters.base_interceptor import BaseInterceptor
+#
+# # TODO :base-interceptor-refactor: :ml-refactor: :code-reorg: :usability:
+# # Consider creating a new concept for instrumentation-based 'interception'.
+# # These adaptors were made for data observability.
+# # Perhaps we should have a BaseAdaptor that would work for both and
+# # observability and instrumentation adapters. This would be a major refactor
+# # in the code. https://github.com/ORNL/flowcept/issues/109
+# instrumentation_interceptor = BaseInterceptor(kind="instrumentation")
+# # TODO This above is bad because I am reusing the same BaseInterceptor both
+# # for adapter-based observability + traditional instrumentation via @decorator
+# # I'm just setting _registered_workflow to avoid the auto wf register that
+# # exists in the BaseInterceptor

From 5acee00f84e5f7986e5554d7318874c01419f897 Mon Sep 17 00:00:00 2001
From: Renan Souza
Date: Tue, 26 Nov 2024 16:24:44 -0500
Subject: [PATCH 4/9] Changes to use InstrumentationInterceptor singleton

- Instantiating objects in a static way in __init__.py files was creating
  other bad issues and making it harder to make Flowcept optional once you
  import it in one's code. This commit is mostly to address this (see the
  sketch after this list).
- Creating new commands in the Makefile and using them in the CI tests.
- Adding more asserts in the dask example
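A minimal sketch of the resulting access pattern (illustration only, not part
of the diff; it mirrors the new instrumentation_interceptor.py and the test
added in this patch):

    from flowcept.flowceptor.adapters.instrumentation_interceptor import (
        InstrumentationInterceptor,
    )

    # Direct construction is blocked: InstrumentationInterceptor() raises.
    # All decorators and the controller share one lazily created instance,
    # which wraps BaseInterceptor(kind="instrumentation"):
    a = InstrumentationInterceptor.get_instance()
    b = InstrumentationInterceptor.get_instance()
    assert a is b and id(a) == id(b)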
---
 .github/workflows/run-tests-py11.yml          |  8 ++---
 .github/workflows/run-tests.yml               | 11 +++----
 Makefile                                      | 10 ++++++
 examples/dask_example.py                      | 29 ++++++++++-------
 .../flowcept_api/flowcept_controller.py       |  9 ++----
 .../flowceptor/adapters/base_interceptor.py   |  3 ++
 .../adapters/instrumentation_interceptor.py   | 23 ++++++++++++++
 .../decorators/flowcept_task.py               | 19 +++++++-----
 .../decorators/flowcept_torch.py              | 31 +++++++++----------
 tests/adapters/test_mlflow.py                 |  1 +
 tests/api/flowcept_api_test.py                | 15 +++++++++
 11 files changed, 105 insertions(+), 54 deletions(-)
 create mode 100644 src/flowcept/flowceptor/adapters/instrumentation_interceptor.py

diff --git a/.github/workflows/run-tests-py11.yml b/.github/workflows/run-tests-py11.yml
index 6d1e726..ab5033d 100644
--- a/.github/workflows/run-tests-py11.yml
+++ b/.github/workflows/run-tests-py11.yml
@@ -21,7 +21,7 @@ jobs:
         run: '[[ "$OSTYPE" == "linux-gnu"* ]] && { echo "OS Type: Linux"; (command -v lsb_release &> /dev/null && lsb_release -a) || cat /etc/os-release; uname -r; } || [[ "$OSTYPE" == "darwin"* ]] && { echo "OS Type: macOS"; sw_vers; uname -r; } || echo "Unsupported OS type: $OSTYPE"'
 
       - name: Start docker compose with redis
-        run: docker compose -f deployment/compose.yml up -d
+        run: make services
 
       - name: Upgrade pip
         run: python -m pip install --upgrade pip
@@ -62,13 +62,13 @@ jobs:
 
       - name: Test with pytest and redis
         run: |
-          pytest --ignore=tests/decorator_tests/ml_tests/llm_tests
+          make tests
 
       - name: Test notebooks with pytest and redis
-        run: pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb
+        run: make tests-notebooks
 
       - name: Shut down docker compose
-        run: docker compose -f deployment/compose.yml down
+        run: make services-stop
 
       - name: Start docker compose with kafka
         run: docker compose -f deployment/compose-kafka.yml up -d
diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml
index 8e64e7b..b0ca89b 100644
--- a/.github/workflows/run-tests.yml
+++ b/.github/workflows/run-tests.yml
@@ -24,7 +24,7 @@ jobs:
         run: '[[ "$OSTYPE" == "linux-gnu"* ]] && { echo "OS Type: Linux"; (command -v lsb_release &> /dev/null && lsb_release -a) || cat /etc/os-release; uname -r; } || [[ "$OSTYPE" == "darwin"* ]] && { echo "OS Type: macOS"; sw_vers; uname -r; } || echo "Unsupported OS type: $OSTYPE"'
 
       - name: Start docker compose with redis
-        run: docker compose -f deployment/compose.yml up -d
+        run: make services
 
       - name: Upgrade pip
         run: python -m pip install --upgrade pip
@@ -62,13 +62,13 @@ jobs:
 
      - name: Test with pytest and redis
         run: |
-          pytest --ignore=tests/decorator_tests/ml_tests/llm_tests
+          make tests
 
       - name: Test notebooks with pytest and redis
-        run: pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb
+        run: make tests-notebooks
 
       - name: Shut down docker compose
-        run: docker compose -f deployment/compose.yml down
+        run: make services-stop
 
       - name: Start docker compose with kafka
         run: docker compose -f deployment/compose-kafka.yml up -d
@@ -87,5 +87,4 @@ jobs:
         run: |
           export MQ_TYPE=kafka
           export MQ_PORT=9092
-          # Ignoring heavy tests. They are executed with Kafka in another GH Action.
-          pytest --ignore=tests/decorator_tests/ml_tests --ignore=tests/adapters/test_tensorboard.py
+          make tests
diff --git a/Makefile b/Makefile
index c94b141..19ec7fe 100644
--- a/Makefile
+++ b/Makefile
@@ -7,6 +7,8 @@ help:
 	@printf "\033[32mservices\033[0m run services using Docker\n"
 	@printf "\033[32mservices-stop\033[0m stop the running Docker services\n"
 	@printf "\033[32mtests\033[0m run unit tests with pytest\n"
+	@printf "\033[32mtests-all\033[0m run all unit tests with pytest, including very long-running ones\n"
+	@printf "\033[32mtests-notebooks\033[0m Tests the notebooks, using pytest\n"
 
 
 # Run linter and formatter checks using ruff
@@ -44,3 +46,11 @@ services-stop:
 .PHONY: tests
 tests:
 	pytest --ignore=tests/decorator_tests/ml_tests/llm_tests
+
+.PHONY: tests-notebooks
+tests-notebooks:
+	pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb
+
+.PHONY: tests-all
+tests-all:
+	pytest
diff --git a/examples/dask_example.py b/examples/dask_example.py
index 1acaad9..38192f6 100644
--- a/examples/dask_example.py
+++ b/examples/dask_example.py
@@ -34,18 +34,23 @@ def sum_list(values):
     print(f"workflow_id={wf_id}")
 
     # Start Flowcept's Dask observer
-    flowcept = Flowcept("dask").start()
-    t1 = client.submit(add, 1, 2)
-    t2 = client.submit(multiply, 3, 4)
-    t3 = client.submit(add, t1.result(), t2.result())
-    t4 = client.submit(sum_list, [t1, t2, t3])
-    result = t4.result()
-    print("Result:", result)
-    assert result == 30
-    # Closing Dask and Flowcept
-    client.close()  # This is to avoid generating errors
-    cluster.close()  # This calls the needed closeouts to inform Flowcept that the workflow is done.
-    flowcept.stop()
+
+    with Flowcept("dask"):  # Optionally: Flowcept("dask").start()
+
+        t1 = client.submit(add, 1, 2)
+        t2 = client.submit(multiply, 3, 4)
+        t3 = client.submit(add, t1.result(), t2.result())
+        t4 = client.submit(sum_list, [t1, t2, t3])
+        result = t4.result()
+        print("Result:", result)
+        assert result == 30
+
+        # Closing Dask and Flowcept
+        client.close()  # This is to avoid generating errors
+        cluster.close()  # This call performs the needed closeouts to inform Flowcept that the workflow is done.
+
+        # Optionally: flowcept.stop()
+    # Querying Flowcept's database about this run
     print(f"t1_key={t1.key}")
     print("Getting first task only:")
diff --git a/src/flowcept/flowcept_api/flowcept_controller.py b/src/flowcept/flowcept_api/flowcept_controller.py
index 2438d28..5b62d25 100644
--- a/src/flowcept/flowcept_api/flowcept_controller.py
+++ b/src/flowcept/flowcept_api/flowcept_controller.py
@@ -7,7 +7,6 @@
     WorkflowObject,
 )
 
-import flowcept.instrumentation.decorators
 from flowcept.commons import logger
 from flowcept.commons.daos.document_db_dao import DocumentDBDao
 from flowcept.commons.daos.mq_dao.mq_dao_base import MQDao
@@ -16,6 +15,7 @@
     INSTRUMENTATION_ENABLED,
 )
 from flowcept.flowcept_api.db_api import DBAPI
+from flowcept.flowceptor.adapters.instrumentation_interceptor import InstrumentationInterceptor
 from flowcept.flowceptor.consumers.document_inserter import DocumentInserter
 from flowcept.commons.flowcept_logger import FlowceptLogger
 from flowcept.flowceptor.adapters.base_interceptor import BaseInterceptor
@@ -77,7 +77,7 @@ def __init__(
             if not INSTRUMENTATION_ENABLED:
                 self.enabled = False
                 return
-            interceptors = [BaseInterceptor(kind="instrumentation")]
+            interceptors = [InstrumentationInterceptor.get_instance()]
         elif not isinstance(interceptors, list):
             interceptors = [interceptors]
         self._interceptors: List[BaseInterceptor] = interceptors
@@ -179,11 +179,6 @@ def __exit__(self, exc_type, exc_val, exc_tb):
         """Run the stop function."""
         self.stop()
 
-    @staticmethod
-    def start_instrumentation_interceptor():
-        """Start it."""
-        flowcept.instrumentation.decorators.instrumentation_interceptor.start(None)
-
     @staticmethod
     def services_alive() -> bool:
         """Get alive services."""
diff --git a/src/flowcept/flowceptor/adapters/base_interceptor.py b/src/flowcept/flowceptor/adapters/base_interceptor.py
index 8d9ab51..e2199da 100644
--- a/src/flowcept/flowceptor/adapters/base_interceptor.py
+++ b/src/flowcept/flowceptor/adapters/base_interceptor.py
@@ -1,6 +1,7 @@
 """Base module."""
 
 from abc import abstractmethod
+from time import time
 from uuid import uuid4
 
 from flowcept.commons.flowcept_dataclasses.workflow_object import (
@@ -29,6 +30,8 @@ class BaseInterceptor(object):
 
     def __init__(self, plugin_key=None, kind=None):
         self.logger = FlowceptLogger()
+        self.logger.debug(f"Starting Interceptor{id(self)} at {time()}")
+
         if plugin_key is not None:  # TODO :base-interceptor-refactor: :code-reorg: :usability:
             self.settings = get_settings(plugin_key)
         else:
diff --git a/src/flowcept/flowceptor/adapters/instrumentation_interceptor.py b/src/flowcept/flowceptor/adapters/instrumentation_interceptor.py
new file mode 100644
index 0000000..4465050
--- /dev/null
+++ b/src/flowcept/flowceptor/adapters/instrumentation_interceptor.py
@@ -0,0 +1,23 @@
+"""Instrumentation Interceptor."""
+
+from flowcept.flowceptor.adapters.base_interceptor import (
+    BaseInterceptor,
+)
+
+
+# TODO: :base-interceptor-refactor: :ml-refactor: :code-reorg:
+class InstrumentationInterceptor:
+    """Interceptor class."""
+
+    _instance: "BaseInterceptor" = None
+
+    def __new__(cls, *args, **kwargs):
+        """Constructor, which should not be used.
+        Use get_instance instead."""
+        raise Exception("Please utilize the InstrumentationInterceptor.get_instance method.")
+
+    @classmethod
+    def get_instance(cls):
+        """Get instance method for this singleton."""
+        if not cls._instance:
+            cls._instance = BaseInterceptor(kind="instrumentation")
+        return cls._instance
diff --git a/src/flowcept/instrumentation/decorators/flowcept_task.py b/src/flowcept/instrumentation/decorators/flowcept_task.py
index 0b8cf35..8be0661 100644
--- a/src/flowcept/instrumentation/decorators/flowcept_task.py
+++ b/src/flowcept/instrumentation/decorators/flowcept_task.py
@@ -9,12 +9,12 @@
     Status,
 )
 
-from flowcept.instrumentation.decorators import instrumentation_interceptor
 from flowcept.commons.utils import replace_non_serializable
 from flowcept.configs import (
     REPLACE_NON_JSON_SERIALIZABLE,
     INSTRUMENTATION_ENABLED,
 )
+from flowcept.flowceptor.adapters.instrumentation_interceptor import InstrumentationInterceptor
 
 
 # TODO: :code-reorg: consider moving it to utils and reusing it in dask interceptor
@@ -35,6 +35,7 @@ def default_args_handler(task_message: TaskObject, *args, **kwargs):
 
 def telemetry_flowcept_task(func=None):
     """Get telemetry task."""
+    interceptor = InstrumentationInterceptor.get_instance()
 
     def decorator(func):
         @wraps(func)
@@ -46,7 +47,7 @@ def wrapper(*args, **kwargs):
             task_obj["task_id"] = str(id(task_obj))
             task_obj["workflow_id"] = kwargs.pop("workflow_id")
             task_obj["used"] = kwargs
-            tel = instrumentation_interceptor.telemetry_capture.capture()
+            tel = interceptor.telemetry_capture.capture()
             if tel is not None:
                 task_obj["telemetry_at_start"] = tel.to_dict()
             try:
@@ -57,11 +58,11 @@ def wrapper(*args, **kwargs):
                 result = None
                 task_obj["stderr"] = str(e)
             # task_obj["ended_at"] = time()
-            tel = instrumentation_interceptor.telemetry_capture.capture()
+            tel = interceptor.telemetry_capture.capture()
             if tel is not None:
                 task_obj["telemetry_at_end"] = tel.to_dict()
             task_obj["generated"] = result
-            instrumentation_interceptor.intercept(task_obj)
+            interceptor.intercept(task_obj)
             return result
 
         return wrapper
@@ -74,6 +75,7 @@ def wrapper(*args, **kwargs):
 
 def lightweight_flowcept_task(func=None):
     """Get lightweight task."""
+    interceptor = InstrumentationInterceptor.get_instance()
 
     def decorator(func):
         @wraps(func)
@@ -108,7 +110,7 @@ def wrapper(*args, **kwargs):
                 used=kwargs,
                 generated=result,
             )
-            instrumentation_interceptor.intercept(task_dict)
+            interceptor.intercept(task_dict)
             return result
 
         return wrapper
@@ -121,6 +123,7 @@ def wrapper(*args, **kwargs):
 
 def flowcept_task(func=None, **decorator_kwargs):
     """Get flowcept task."""
+    interceptor = InstrumentationInterceptor.get_instance()
 
     def decorator(func):
         @wraps(func)
@@ -135,7 +138,7 @@ def wrapper(*args, **kwargs):
             task_obj.used = args_handler(task_obj, *args, **kwargs)
             task_obj.started_at = time()
             task_obj.task_id = str(task_obj.started_at)
-            task_obj.telemetry_at_start = instrumentation_interceptor.telemetry_capture.capture()
+            task_obj.telemetry_at_start = interceptor.telemetry_capture.capture()
             try:
                 result = func(*args, **kwargs)
                 task_obj.status = Status.FINISHED
@@ -144,7 +147,7 @@ def wrapper(*args, **kwargs):
                 result = None
                 task_obj.stderr = str(e)
             task_obj.ended_at = time()
-            task_obj.telemetry_at_end = instrumentation_interceptor.telemetry_capture.capture()
+            task_obj.telemetry_at_end = interceptor.telemetry_capture.capture()
             try:
                 if isinstance(result, dict):
                     task_obj.generated = args_handler(task_obj, **result)
             except Exception as e:
                 flowcept.commons.logger.exception(e)
-            instrumentation_interceptor.intercept(task_obj.to_dict())
+            interceptor.intercept(task_obj.to_dict())
             return result
 
         return wrapper
diff --git a/src/flowcept/instrumentation/decorators/flowcept_torch.py b/src/flowcept/instrumentation/decorators/flowcept_torch.py
index f8224c9..c306a70 100644
--- a/src/flowcept/instrumentation/decorators/flowcept_torch.py
+++ b/src/flowcept/instrumentation/decorators/flowcept_torch.py
@@ -2,11 +2,9 @@
 from time import time
 from functools import wraps
 
-import flowcept.commons
 from flowcept.commons.flowcept_dataclasses.task_object import (
     Status,
 )
-from flowcept.instrumentation.decorators import instrumentation_interceptor
 from typing import List, Dict
 import uuid
 
@@ -21,6 +19,7 @@
     INSTRUMENTATION,
     TELEMETRY_CAPTURE,
 )
+from flowcept.flowceptor.adapters.instrumentation_interceptor import InstrumentationInterceptor
 
 
 def _inspect_torch_tensor(tensor: torch.Tensor):
@@ -54,6 +53,7 @@ def _inspect_torch_tensor(tensor: torch.Tensor):
 
 def full_torch_task(func=None):
     """Generate pytorch task."""
+    interceptor = InstrumentationInterceptor.get_instance()
 
     def decorator(func):
         @wraps(func)
@@ -71,9 +71,7 @@ def wrapper(*args, **kwargs):
                 "tensor": _inspect_torch_tensor(args[1]),
                 **{k: v for k, v in vars(args[0]).items() if not k.startswith("_")},
             }
-            task_obj["telemetry_at_start"] = (
-                instrumentation_interceptor.telemetry_capture.capture().to_dict()
-            )
+            task_obj["telemetry_at_start"] = interceptor.telemetry_capture.capture().to_dict()
             try:
                 result = func(*args, **kwargs)
                 task_obj["status"] = Status.FINISHED.value
@@ -82,14 +80,12 @@ def wrapper(*args, **kwargs):
                 result = None
                 task_obj["stderr"] = str(e)
             task_obj["ended_at"] = time()
-            task_obj["telemetry_at_end"] = (
-                instrumentation_interceptor.telemetry_capture.capture().to_dict()
-            )
+            task_obj["telemetry_at_end"] = interceptor.telemetry_capture.capture().to_dict()
             task_obj["generated"] = {
                 "tensor": _inspect_torch_tensor(args[1]),
                 # add other module metadata
             }
-            instrumentation_interceptor.intercept(task_obj)
+            interceptor.intercept(task_obj)
             return result
 
         return wrapper
@@ -114,6 +110,7 @@ def wrapper(*args, **kwargs):
 
 def lightweight_tensor_inspection_torch_task(func=None):
     """Get lightweight pytorch task."""
+    interceptor = InstrumentationInterceptor.get_instance()
 
     def decorator(func):
         @wraps(func)
@@ -136,7 +133,7 @@ def wrapper(*args, **kwargs):
                 used=used,
                 generated={"tensor": _inspect_torch_tensor(result)},
             )
-            instrumentation_interceptor.intercept(task_dict)
+            interceptor.intercept(task_dict)
             return result
 
         return wrapper
@@ -149,6 +146,7 @@ def wrapper(*args, **kwargs):
 
 def lightweight_telemetry_tensor_inspection_torch_task(func=None):
     """Get lightweight tensor inspect task."""
+    interceptor = InstrumentationInterceptor.get_instance()
 
     def decorator(func):
         @wraps(func)
@@ -170,9 +168,9 @@ def wrapper(*args, **kwargs):
                 activity_id=args[0].__class__.__name__,
                 used=used,
                 generated={"tensor": _inspect_torch_tensor(result)},
-                telemetry_at_start=instrumentation_interceptor.telemetry_capture.capture().to_dict(),
+                telemetry_at_start=interceptor.telemetry_capture.capture().to_dict(),
             )
-            instrumentation_interceptor.intercept(task_dict)
+            interceptor.intercept(task_dict)
             return result
 
         return wrapper
@@ -185,6 +183,7 @@ def wrapper(*args, **kwargs):
 
 def lightweight_telemetry_torch_task(func=None):
     """Get lightweight telemetry torch task."""
+    interceptor = InstrumentationInterceptor.get_instance()
 
     def decorator(func):
         @wraps(func)
@@ -196,9 +195,9 @@ def wrapper(*args, **kwargs):
                 type="task",
                 workflow_id=args[0].workflow_id,
                 activity_id=func.__qualname__,
-                telemetry_at_start=instrumentation_interceptor.telemetry_capture.capture().to_dict(),
+                telemetry_at_start=interceptor.telemetry_capture.capture().to_dict(),
             )
-            instrumentation_interceptor.intercept(task_dict)
+            interceptor.intercept(task_dict)
             return result
 
         return wrapper
@@ -295,7 +294,5 @@ def register_module_as_workflow(
     #     workflow_obj.parent_task_id = parent_task_id
 
     if REGISTER_WORKFLOW:
-        flowcept.instrumentation.decorators.instrumentation_interceptor.send_workflow_message(
-            workflow_obj
-        )
+        InstrumentationInterceptor.get_instance().send_workflow_message(workflow_obj)
     return workflow_obj.workflow_id
diff --git a/tests/adapters/test_mlflow.py b/tests/adapters/test_mlflow.py
index 73abd3e..fe36773 100644
--- a/tests/adapters/test_mlflow.py
+++ b/tests/adapters/test_mlflow.py
@@ -76,6 +76,7 @@ def test_observer_and_consumption(self):
         assert TestMLFlow.interceptor is not None
         with Flowcept(TestMLFlow.interceptor):
             run_uuid = self.test_pure_run_mlflow()
+            sleep(5)
         print(run_uuid)
         assert evaluate_until(
             lambda: self.interceptor.state_manager.has_element_id(run_uuid),
diff --git a/tests/api/flowcept_api_test.py b/tests/api/flowcept_api_test.py
index 2127418..33764e7 100644
--- a/tests/api/flowcept_api_test.py
+++ b/tests/api/flowcept_api_test.py
@@ -4,7 +4,9 @@
     Flowcept,
     flowcept_task,
 )
+from flowcept.commons import FlowceptLogger
 from flowcept.commons.utils import assert_by_querying_tasks_until, get_current_config_values
+from flowcept.flowceptor.adapters.instrumentation_interceptor import InstrumentationInterceptor
 
 
 @flowcept_task
@@ -74,6 +76,19 @@ def test_simple_workflow(self):
             == 1
         )
 
+    def test_instrumentation_interceptor(self):
+        logger = FlowceptLogger()
+        try:
+            InstrumentationInterceptor()
+        except Exception as e:
+            logger.debug(f"This exception is expected: {e}")
+
+        a = InstrumentationInterceptor.get_instance()
+        b = InstrumentationInterceptor.get_instance()
+
+        assert a == b
+        assert id(a) == id(b)
+
     @unittest.skip("Test only for dev.")
     def test_continuous_run(self):
         import numpy as np

From 13b1dd8fdcdee8cc7cedfa6d5538e4faa535a66a Mon Sep 17 00:00:00 2001
From: Renan Souza
Date: Tue, 26 Nov 2024 16:28:10 -0500
Subject: [PATCH 5/9] Removing commented code

---
 .../instrumentation/decorators/__init__.py    | 14 --------------
 1 file changed, 14 deletions(-)

diff --git a/src/flowcept/instrumentation/decorators/__init__.py b/src/flowcept/instrumentation/decorators/__init__.py
index ebeb058..57aafba 100644
--- a/src/flowcept/instrumentation/decorators/__init__.py
+++ b/src/flowcept/instrumentation/decorators/__init__.py
@@ -1,15 +1 @@
 """Decorators subpackage."""
-
-# from flowcept.flowceptor.adapters.base_interceptor import BaseInterceptor
-#
-# # TODO :base-interceptor-refactor: :ml-refactor: :code-reorg: :usability:
-# # Consider creating a new concept for instrumentation-based 'interception'.
-# # These adaptors were made for data observability.
-# # Perhaps we should have a BaseAdaptor that would work for both and
-# # observability and instrumentation adapters. This would be a major refactor
-# # in the code. https://github.com/ORNL/flowcept/issues/109
-# instrumentation_interceptor = BaseInterceptor(kind="instrumentation")
-# # TODO This above is bad because I am reusing the same BaseInterceptor both
-# # for adapter-based observability + traditional instrumentation via @decorator
-# # I'm just setting _registered_workflow to avoid the auto wf register that
-# # exists in the BaseInterceptor

From 491045cb92c33b98a37aacc671d56de0d356d485 Mon Sep 17 00:00:00 2001
From: Renan Souza
Date: Tue, 26 Nov 2024 16:49:58 -0500
Subject: [PATCH 6/9] Partially address https://github.com/ORNL/flowcept/issues/172

---
 .github/workflows/run-tests.yml               |  2 +-
 Makefile                                      |  7 +++++-
 ...cript.py => simple_instrumented_script.py} |  0
 src/flowcept/commons/__init__.py              |  4 ----
 src/flowcept/commons/utils.py                 | 22 +++++++++----------
 .../flowcept_api/flowcept_controller.py       |  2 +-
 .../flowceptor/consumers/document_inserter.py |  5 +++--
 .../decorators/flowcept_task.py               |  5 +++--
 tests/log_tests/log_test.py                   |  2 +-
 9 files changed, 25 insertions(+), 24 deletions(-)
 rename examples/{instrumentation/simple_script.py => simple_instrumented_script.py} (100%)

diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml
index b0ca89b..3a0faef 100644
--- a/.github/workflows/run-tests.yml
+++ b/.github/workflows/run-tests.yml
@@ -32,7 +32,7 @@ jobs:
       - name: Install default dependencies and run simple test
         run: |
           pip install .
-          python examples/instrumentation/simple_script.py
+          python examples/simple_instrumented_script.py
 
       - name: Install Dask dependencies alone and run a simple Dask test
         run: |
diff --git a/Makefile b/Makefile
index 19ec7fe..20f86ed 100644
--- a/Makefile
+++ b/Makefile
@@ -2,13 +2,14 @@
 help:
 	@printf "\nCommands:\n"
 	@printf "\033[32mchecks\033[0m run ruff linter and formatter checks\n"
+	@printf "\033[32mreformat\033[0m run ruff linter and formatter\n"
 	@printf "\033[32mclean\033[0m remove cache directories and Sphinx build output\n"
 	@printf "\033[32mdocs\033[0m build HTML documentation using Sphinx\n"
 	@printf "\033[32mservices\033[0m run services using Docker\n"
 	@printf "\033[32mservices-stop\033[0m stop the running Docker services\n"
 	@printf "\033[32mtests\033[0m run unit tests with pytest\n"
 	@printf "\033[32mtests-all\033[0m run all unit tests with pytest, including very long-running ones\n"
-	@printf "\033[32mtests-notebooks\033[0m Tests the notebooks, using pytest\n"
+	@printf "\033[32mtests-notebooks\033[0m tests the notebooks, using pytest\n"
 
 
 # Run linter and formatter checks using ruff
@@ -16,6 +17,10 @@ checks:
 	ruff check src
 	ruff format --check src
 
+reformat:
+	ruff check src
+	ruff format src
+
 # Remove cache directories and Sphinx build output
 clean:
 	rm -rf .ruff_cache
diff --git a/examples/instrumentation/simple_script.py b/examples/simple_instrumented_script.py
similarity index 100%
rename from examples/instrumentation/simple_script.py
rename to examples/simple_instrumented_script.py
diff --git a/src/flowcept/commons/__init__.py b/src/flowcept/commons/__init__.py
index d18c294..5d6e9ff 100644
--- a/src/flowcept/commons/__init__.py
+++ b/src/flowcept/commons/__init__.py
@@ -1,5 +1 @@
 """Commons subpackage."""
-
-from flowcept.commons.flowcept_logger import FlowceptLogger
-
-logger = FlowceptLogger()
diff --git a/src/flowcept/commons/utils.py b/src/flowcept/commons/utils.py
index 278662c..83515c9 100644
--- a/src/flowcept/commons/utils.py
+++ b/src/flowcept/commons/utils.py
@@ -10,8 +10,8 @@
 import types
 
 import numpy as np
 
-import flowcept.commons
 from flowcept import configs
+from flowcept.commons import FlowceptLogger
 from flowcept.configs import (
     PERF_LOG,
 )
@@ -42,11 +42,11 @@ def get_utc_minutes_ago(minutes_ago=1):
     return rounded.timestamp()
 
 
-def perf_log(func_name, t0: float):
+def perf_log(func_name, t0: float, logger=FlowceptLogger()):
     """Configure the performance log."""
     if PERF_LOG:
         t1 = time()
-        flowcept.commons.logger.debug(f"[PERFEVAL][{func_name}]={t1 - t0}")
+        logger.debug(f"[PERFEVAL][{func_name}]={t1 - t0}")
         return t1
     return None
 
@@ -71,6 +71,7 @@ def assert_by_querying_tasks_until(
     """Assert by query."""
    from flowcept.flowcept_api.task_query_api import TaskQueryAPI
 
+    logger = FlowceptLogger()
     query_api = TaskQueryAPI()
     start_time = time()
     trials = 0
@@ -79,24 +80,20 @@ def assert_by_querying_tasks_until(
         docs = query_api.query(filter)
         if condition_to_evaluate is None:
             if docs is not None and len(docs):
-                flowcept.commons.logger.debug("Query conditions have been met! :D")
+                logger.debug("Query conditions have been met! :D")
                 return True
         else:
             try:
                 if condition_to_evaluate(docs):
-                    flowcept.commons.logger.debug("Query conditions have been met! :D")
+                    logger.debug("Query conditions have been met! :D")
                     return True
             except Exception:
                 pass
 
         trials += 1
-        flowcept.commons.logger.debug(
-            f"Task Query condition not yet met. Trials={trials}/{max_trials}."
-        )
+        logger.debug(f"Task Query condition not yet met. Trials={trials}/{max_trials}.")
         sleep(1)
 
-    flowcept.commons.logger.debug(
-        "We couldn't meet the query conditions after all trials or timeout! :("
-    )
+    logger.debug("We couldn't meet the query conditions after all trials or timeout! :(")
     return False
 
@@ -109,6 +106,7 @@ def chunked(iterable, size):
 # TODO: consider reusing this function in the function assert_by_querying_task_collections_until
 def evaluate_until(evaluation_condition: Callable, max_trials=30, max_time=60, msg=""):
     """Evaluate something."""
+    logger = FlowceptLogger()
     start_time = time()
     trials = 0
 
@@ -117,7 +115,7 @@ def evaluate_until(evaluation_condition: Callable, max_trials=30, max_time=60, m
             return True  # Condition met
 
         trials += 1
-        flowcept.commons.logger.debug(f"Condition not yet met. Trials={trials}/{max_trials}. {msg}")
+        logger.debug(f"Condition not yet met. Trials={trials}/{max_trials}. {msg}")
{msg}") sleep(1) return False # Condition not met within max_trials or max_time diff --git a/src/flowcept/flowcept_api/flowcept_controller.py b/src/flowcept/flowcept_api/flowcept_controller.py index 5b62d25..ef2260b 100644 --- a/src/flowcept/flowcept_api/flowcept_controller.py +++ b/src/flowcept/flowcept_api/flowcept_controller.py @@ -7,7 +7,6 @@ WorkflowObject, ) -from flowcept.commons import logger from flowcept.commons.daos.document_db_dao import DocumentDBDao from flowcept.commons.daos.mq_dao.mq_dao_base import MQDao from flowcept.configs import ( @@ -182,6 +181,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): @staticmethod def services_alive() -> bool: """Get alive services.""" + logger = FlowceptLogger() if not MQDao.build().liveness_test(): logger.error("MQ Not Ready!") return False diff --git a/src/flowcept/flowceptor/consumers/document_inserter.py b/src/flowcept/flowceptor/consumers/document_inserter.py index fe7688e..d86a51f 100644 --- a/src/flowcept/flowceptor/consumers/document_inserter.py +++ b/src/flowcept/flowceptor/consumers/document_inserter.py @@ -8,7 +8,6 @@ import pytz -import flowcept.commons from flowcept.commons.daos.autoflush_buffer import AutoflushBuffer from flowcept.commons.flowcept_dataclasses.workflow_object import ( WorkflowObject, @@ -99,8 +98,10 @@ def _set_buffer_size(self): ) @staticmethod - def flush_function(buffer, doc_dao, logger=flowcept.commons.logger): + def flush_function(buffer, doc_dao, logger=None): """Flush it.""" + if logger is None: + logger = FlowceptLogger() logger.info( f"Current Doc buffer size: {len(buffer)}, " f"Gonna flush {len(buffer)} msgs to DocDB!" ) diff --git a/src/flowcept/instrumentation/decorators/flowcept_task.py b/src/flowcept/instrumentation/decorators/flowcept_task.py index 8be0661..a1e547f 100644 --- a/src/flowcept/instrumentation/decorators/flowcept_task.py +++ b/src/flowcept/instrumentation/decorators/flowcept_task.py @@ -2,12 +2,12 @@ from time import time from functools import wraps -import flowcept.commons from flowcept import Flowcept from flowcept.commons.flowcept_dataclasses.task_object import ( TaskObject, Status, ) +from flowcept.commons.flowcept_logger import FlowceptLogger from flowcept.commons.utils import replace_non_serializable from flowcept.configs import ( @@ -124,6 +124,7 @@ def wrapper(*args, **kwargs): def flowcept_task(func=None, **decorator_kwargs): """Get flowcept task.""" interceptor = InstrumentationInterceptor.get_instance() + logger = FlowceptLogger() def decorator(func): @wraps(func) @@ -154,7 +155,7 @@ def wrapper(*args, **kwargs): else: task_obj.generated = args_handler(task_obj, result) except Exception as e: - flowcept.commons.logger.exception(e) + logger.exception(e) interceptor.intercept(task_obj.to_dict()) return result diff --git a/tests/log_tests/log_test.py b/tests/log_tests/log_test.py index 5a5f20c..ec457ba 100644 --- a/tests/log_tests/log_test.py +++ b/tests/log_tests/log_test.py @@ -19,7 +19,7 @@ def test_log(self): _logger.exception(e) _logger.info("It's ok") - _logger2 = flowcept.commons.logger + _logger2 = FlowceptLogger() # Testing singleton assert ( From 4c928fd445a6f49b11c6d1847a2978dcb4fa4ab4 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Tue, 26 Nov 2024 17:11:37 -0500 Subject: [PATCH 7/9] Fixing imports --- src/flowcept/commons/utils.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/flowcept/commons/utils.py b/src/flowcept/commons/utils.py index 83515c9..7864823 100644 --- a/src/flowcept/commons/utils.py +++ 
@@ -11,10 +11,8 @@
 import numpy as np
 
 from flowcept import configs
-from flowcept.commons import FlowceptLogger
-from flowcept.configs import (
-    PERF_LOG,
-)
+from flowcept.commons.flowcept_logger import FlowceptLogger
+from flowcept.configs import PERF_LOG
 
 from flowcept.commons.flowcept_dataclasses.task_object import Status

From 85a213d02113f6504fd8e090dbd32ca953f52662 Mon Sep 17 00:00:00 2001
From: Renan Souza
Date: Tue, 26 Nov 2024 22:00:54 -0500
Subject: [PATCH 8/9] Fixing imports

---
 tests/api/flowcept_api_test.py                        | 2 +-
 tests/decorator_tests/flowcept_task_decorator_test.py | 5 +----
 2 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/tests/api/flowcept_api_test.py b/tests/api/flowcept_api_test.py
index 33764e7..147eb04 100644
--- a/tests/api/flowcept_api_test.py
+++ b/tests/api/flowcept_api_test.py
@@ -4,7 +4,7 @@
     Flowcept,
     flowcept_task,
 )
-from flowcept.commons import FlowceptLogger
+from flowcept.commons.flowcept_logger import FlowceptLogger
 from flowcept.commons.utils import assert_by_querying_tasks_until, get_current_config_values
 from flowcept.flowceptor.adapters.instrumentation_interceptor import InstrumentationInterceptor
 
diff --git a/tests/decorator_tests/flowcept_task_decorator_test.py b/tests/decorator_tests/flowcept_task_decorator_test.py
index 9e8d002..c5e5705 100644
--- a/tests/decorator_tests/flowcept_task_decorator_test.py
+++ b/tests/decorator_tests/flowcept_task_decorator_test.py
@@ -3,18 +3,15 @@
 import uuid
 import random
 
-from time import sleep
 import pandas as pd
 from time import time, sleep
 
-from flowcept.commons import FlowceptLogger
-
-import flowcept.commons
 import flowcept.instrumentation.decorators
 from flowcept import Flowcept
 import unittest
 
+from flowcept.commons.flowcept_logger import FlowceptLogger
 from flowcept.commons.utils import assert_by_querying_tasks_until
 from flowcept.instrumentation.decorators.flowcept_task import (
     flowcept_task,

From ca15be2e2c83a86748b1dc147a2a2b11238af05f Mon Sep 17 00:00:00 2001
From: Renan Souza
Date: Tue, 26 Nov 2024 22:08:31 -0500
Subject: [PATCH 9/9] Fix simple example path in CI

---
 .github/workflows/run-tests-py11.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/run-tests-py11.yml b/.github/workflows/run-tests-py11.yml
index ab5033d..d1321b3 100644
--- a/.github/workflows/run-tests-py11.yml
+++ b/.github/workflows/run-tests-py11.yml
@@ -32,7 +32,7 @@ jobs:
       - name: Install default dependencies and run simple test
         run: |
           pip install .
-          python examples/instrumentation/simple_script.py
+          python examples/simple_instrumented_script.py
 
       - name: Install Dask dependencies alone and run a simple Dask test
         run: |