diff --git a/.github/workflows/run-tests-py11.yml b/.github/workflows/run-tests-py11.yml new file mode 100644 index 00000000..d1321b30 --- /dev/null +++ b/.github/workflows/run-tests-py11.yml @@ -0,0 +1,91 @@ +name: Tests on py11 +on: [pull_request] + +jobs: + + build: + runs-on: ubuntu-latest + timeout-minutes: 40 + if: "!contains(github.event.head_commit.message, 'CI Bot')" + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python 3.11 + uses: actions/setup-python@v5 + with: + python-version: "3.11" + cache: "pip" + + - name: Show OS Info + run: '[[ "$OSTYPE" == "linux-gnu"* ]] && { echo "OS Type: Linux"; (command -v lsb_release &> /dev/null && lsb_release -a) || cat /etc/os-release; uname -r; } || [[ "$OSTYPE" == "darwin"* ]] && { echo "OS Type: macOS"; sw_vers; uname -r; } || echo "Unsupported OS type: $OSTYPE"' + + - name: Start docker compose with redis + run: make services + + - name: Upgrade pip + run: python -m pip install --upgrade pip + + - name: Show Python version + run: python --version && pip --version + + - name: Install default dependencies and run simple test + run: | + pip install . + python examples/simple_instrumented_script.py + + - name: Install Dask dependencies alone and run a simple Dask test + run: | + pip uninstall flowcept -y + pip install .[dask] + python examples/dask_example.py + + - name: Install MLFlow dependencies alone and run a simple MLFlow test + run: | + pip uninstall flowcept -y + pip install .[mlflow] + python examples/mlflow_example.py + + - name: Install Tensorboard dependencies alone and run a simple Tensorboard test + run: | + pip uninstall flowcept -y + pip install .[tensorboard] + python examples/tensorboard_example.py + + - name: Install all dependencies + run: | + python -m pip install --upgrade pip + python -m pip install .[all] + + - name: List installed packages + run: pip list + + - name: Test with pytest and redis + run: | + make tests + + - name: Test notebooks with pytest and redis + run: make tests-notebooks + + - name: Shut down docker compose + run: make services-stop + + - name: Start docker compose with kafka + run: docker compose -f deployment/compose-kafka.yml up -d + + - name: Wait for one minute + run: sleep 60 + + - name: Check liveness + run: | + export MQ_TYPE=kafka + export MQ_PORT=9092 + python -c 'from flowcept.configs import MQ_TYPE, MQ_PORT; print(f"MQ_TYPE={MQ_TYPE}"); print(f"MQ_PORT={MQ_PORT}")' + python -c 'from flowcept import Flowcept; assert Flowcept.services_alive()' + + - name: Run tests with kafka + run: | + export MQ_TYPE=kafka + export MQ_PORT=9092 + # Ignoring heavy tests. They are executed with Kafka in another GH Action. 
+ pytest --ignore=tests/decorator_tests/ml_tests --ignore=tests/adapters/test_tensorboard.py diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 93a45927..3a0faeff 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -1,5 +1,8 @@ name: Unit, integration, and notebook tests -on: [push] +on: + push: + schedule: + - cron: '0 12 * * *' # Runs every day at 12 PM UTC (7 AM EST) jobs: @@ -21,30 +24,30 @@ jobs: run: '[[ "$OSTYPE" == "linux-gnu"* ]] && { echo "OS Type: Linux"; (command -v lsb_release &> /dev/null && lsb_release -a) || cat /etc/os-release; uname -r; } || [[ "$OSTYPE" == "darwin"* ]] && { echo "OS Type: macOS"; sw_vers; uname -r; } || echo "Unsupported OS type: $OSTYPE"' - name: Start docker compose with redis - run: docker compose -f deployment/compose.yml up -d + run: make services - name: Upgrade pip run: python -m pip install --upgrade pip - name: Install default dependencies and run simple test - run: | + run: | pip install . - python examples/instrumentation/simple_script.py + python examples/simple_instrumented_script.py - name: Install Dask dependencies alone and run a simple Dask test - run: | + run: | pip uninstall flowcept -y pip install .[dask] python examples/dask_example.py - name: Install MLFlow dependencies alone and run a simple MLFlow test - run: | + run: | pip uninstall flowcept -y pip install .[mlflow] python examples/mlflow_example.py - name: Install Tensorboard dependencies alone and run a simple Tensorboard test - run: | + run: | pip uninstall flowcept -y pip install .[tensorboard] python examples/tensorboard_example.py @@ -59,13 +62,13 @@ jobs: - name: Test with pytest and redis run: | - pytest --ignore=tests/decorator_tests/ml_tests/llm_tests + make tests - name: Test notebooks with pytest and redis - run: pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb + run: make tests-notebooks - name: Shut down docker compose - run: docker compose -f deployment/compose.yml down + run: make services-stop - name: Start docker compose with kafka run: docker compose -f deployment/compose-kafka.yml up -d @@ -84,5 +87,4 @@ jobs: run: | export MQ_TYPE=kafka export MQ_PORT=9092 - # Ignoring heavy tests. They are executed with Kafka in another GH Action. 
- pytest --ignore=tests/decorator_tests/ml_tests --ignore=tests/adapters/test_tensorboard.py + make tests diff --git a/Makefile b/Makefile index bb68820e..20f86ed4 100644 --- a/Makefile +++ b/Makefile @@ -2,11 +2,14 @@ help: @printf "\nCommands:\n" @printf "\033[32mchecks\033[0m run ruff linter and formatter checks\n" + @printf "\033[32mreformat\033[0m run ruff linter and formatter\n" @printf "\033[32mclean\033[0m remove cache directories and Sphinx build output\n" @printf "\033[32mdocs\033[0m build HTML documentation using Sphinx\n" @printf "\033[32mservices\033[0m run services using Docker\n" @printf "\033[32mservices-stop\033[0m stop the running Docker services\n" @printf "\033[32mtests\033[0m run unit tests with pytest\n" + @printf "\033[32mtests-all\033[0m run all unit tests with pytest, including very long-running ones\n" + @printf "\033[32mtests-notebooks\033[0m run notebook tests with pytest\n" # Run linter and formatter checks using ruff @@ -14,10 +17,21 @@ checks: ruff check src ruff format --check src +reformat: + ruff check src + ruff format src + # Remove cache directories and Sphinx build output clean: rm -rf .ruff_cache rm -rf .pytest_cache + rm -rf mlruns + rm -rf mnist_data + rm -rf tensorboard_events + rm -f docs_dump_tasks_* + rm -f dump_test.json + rm -f flowcept.log + rm -f mlflow.db sphinx-build -M clean docs docs/_build # Build the HTML documentation using Sphinx @@ -34,5 +48,14 @@ services-stop: docker compose --file deployment/compose.yml down --volumes # Run unit tests using pytest +.PHONY: tests tests: pytest --ignore=tests/decorator_tests/ml_tests/llm_tests + +.PHONY: tests-notebooks +tests-notebooks: + pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb + +.PHONY: tests-all +tests-all: + pytest diff --git a/deployment/compose-full.yml b/deployment/compose-full.yml index abd7dbdc..69ab44ba 100644 --- a/deployment/compose-full.yml +++ b/deployment/compose-full.yml @@ -1,4 +1,3 @@ -version: '3.8' name: flowcept services: flowcept_redis: diff --git a/deployment/compose-kafka.yml b/deployment/compose-kafka.yml index e1792313..6bfa4783 100644 --- a/deployment/compose-kafka.yml +++ b/deployment/compose-kafka.yml @@ -1,4 +1,3 @@ -version: '3.8' name: flowcept services: flowcept_redis: diff --git a/deployment/compose.yml b/deployment/compose.yml index d6ebe6c4..f3347c4f 100644 --- a/deployment/compose.yml +++ b/deployment/compose.yml @@ -1,4 +1,3 @@ -version: '3.8' name: flowcept services: flowcept_redis: diff --git a/examples/dask_example.py b/examples/dask_example.py index d1126fd6..38192f67 100644 --- a/examples/dask_example.py +++ b/examples/dask_example.py @@ -23,6 +23,8 @@ def sum_list(values): scheduler = cluster.scheduler client = Client(scheduler.address) + client.forward_logging() + # Registering Flowcept's worker and scheduler adapters scheduler.add_plugin(FlowceptDaskSchedulerAdapter(scheduler)) client.register_plugin(FlowceptDaskWorkerAdapter()) @@ -32,18 +34,22 @@ def sum_list(values): print(f"workflow_id={wf_id}") # Start Flowcept's Dask observer - flowcept = Flowcept("dask").start() - t1 = client.submit(add, 1, 2) - t2 = client.submit(multiply, 3, 4) - t3 = client.submit(add, t1.result(), t2.result()) - t4 = client.submit(sum_list, [t1, t2, t3]) - result = t4.result() - print("Result:", result) - - # Closing Dask and Flowcept - client.close() - cluster.close() - flowcept.stop() + + with Flowcept("dask"): # Optionally: Flowcept("dask").start() + + t1 = client.submit(add, 1, 2) + t2 = client.submit(multiply, 3, 4) + 
t3 = client.submit(add, t1.result(), t2.result()) + t4 = client.submit(sum_list, [t1, t2, t3]) + result = t4.result() + print("Result:", result) + assert result == 30 + + # Closing Dask and Flowcept + client.close() # Closing the client explicitly avoids errors at shutdown + cluster.close() # These close calls are needed to signal the workflow's conclusion. + + # Optionally: flowcept.stop() # when not using the context manager # Querying Flowcept's database about this run print(f"t1_key={t1.key}") @@ -51,10 +57,14 @@ def sum_list(values): task1 = Flowcept.db.query(filter={"task_id": t1.key})[0] assert task1["workflow_id"] == wf_id print(task1) + print("\n\n") print("Getting all tasks from this workflow:") all_tasks = Flowcept.db.query(filter={"workflow_id": wf_id}) assert len(all_tasks) == 4 + assert all(t.get("finished") is True for t in all_tasks) + assert all_tasks[-1]["generated"]["arg0"] == 30, "Checking if the last result was saved." print(all_tasks) + print("\n\n") print("Getting workflow info:") wf_info = Flowcept.db.query(filter={"workflow_id": wf_id}, type="workflow")[0] assert wf_info["workflow_id"] == wf_id diff --git a/examples/instrumentation/simple_script.py b/examples/simple_instrumented_script.py similarity index 100% rename from examples/instrumentation/simple_script.py rename to examples/simple_instrumented_script.py diff --git a/src/flowcept/commons/__init__.py b/src/flowcept/commons/__init__.py index d18c2945..5d6e9ff5 100644 --- a/src/flowcept/commons/__init__.py +++ b/src/flowcept/commons/__init__.py @@ -1,5 +1 @@ """Commons subpackage.""" - -from flowcept.commons.flowcept_logger import FlowceptLogger - -logger = FlowceptLogger() diff --git a/src/flowcept/commons/utils.py b/src/flowcept/commons/utils.py index 278662c5..7864823c 100644 --- a/src/flowcept/commons/utils.py +++ b/src/flowcept/commons/utils.py @@ -10,11 +10,9 @@ import types import numpy as np -import flowcept.commons from flowcept import configs -from flowcept.configs import ( - PERF_LOG, -) +from flowcept.commons.flowcept_logger import FlowceptLogger +from flowcept.configs import PERF_LOG from flowcept.commons.flowcept_dataclasses.task_object import Status @@ -42,11 +40,11 @@ def get_utc_minutes_ago(minutes_ago=1): return rounded.timestamp() -def perf_log(func_name, t0: float): +def perf_log(func_name, t0: float, logger=FlowceptLogger()): """Configure the performance log.""" if PERF_LOG: t1 = time() - flowcept.commons.logger.debug(f"[PERFEVAL][{func_name}]={t1 - t0}") + logger.debug(f"[PERFEVAL][{func_name}]={t1 - t0}") return t1 return None @@ -71,6 +69,7 @@ def assert_by_querying_tasks_until( """Assert by query.""" from flowcept.flowcept_api.task_query_api import TaskQueryAPI + logger = FlowceptLogger() query_api = TaskQueryAPI() start_time = time() trials = 0 @@ -79,24 +78,20 @@ docs = query_api.query(filter) if condition_to_evaluate is None: if docs is not None and len(docs): - flowcept.commons.logger.debug("Query conditions have been met! :D") + logger.debug("Query conditions have been met! :D") return True else: try: if condition_to_evaluate(docs): - flowcept.commons.logger.debug("Query conditions have been met! :D") + logger.debug("Query conditions have been met! :D") return True except Exception: pass trials += 1 - flowcept.commons.logger.debug( - f"Task Query condition not yet met. Trials={trials}/{max_trials}." - ) + logger.debug(f"Task Query condition not yet met. Trials={trials}/{max_trials}.") sleep(1) - flowcept.commons.logger.debug( - "We couldn't meet the query conditions after all trials or timeout! 
:(" - ) + logger.debug("We couldn't meet the query conditions after all trials or timeout! :(") return False @@ -109,6 +104,7 @@ def chunked(iterable, size): # TODO: consider reusing this function in the function assert_by_querying_task_collections_until def evaluate_until(evaluation_condition: Callable, max_trials=30, max_time=60, msg=""): """Evaluate something.""" + logger = FlowceptLogger() start_time = time() trials = 0 @@ -117,7 +113,7 @@ def evaluate_until(evaluation_condition: Callable, max_trials=30, max_time=60, m return True # Condition met trials += 1 - flowcept.commons.logger.debug(f"Condition not yet met. Trials={trials}/{max_trials}. {msg}") + logger.debug(f"Condition not yet met. Trials={trials}/{max_trials}. {msg}") sleep(1) return False # Condition not met within max_trials or max_time diff --git a/src/flowcept/flowcept_api/flowcept_controller.py b/src/flowcept/flowcept_api/flowcept_controller.py index f9fc0471..ef2260b4 100644 --- a/src/flowcept/flowcept_api/flowcept_controller.py +++ b/src/flowcept/flowcept_api/flowcept_controller.py @@ -7,8 +7,6 @@ WorkflowObject, ) -import flowcept.instrumentation.decorators -from flowcept.commons import logger from flowcept.commons.daos.document_db_dao import DocumentDBDao from flowcept.commons.daos.mq_dao.mq_dao_base import MQDao from flowcept.configs import ( @@ -16,6 +14,7 @@ INSTRUMENTATION_ENABLED, ) from flowcept.flowcept_api.db_api import DBAPI +from flowcept.flowceptor.adapters.instrumentation_interceptor import InstrumentationInterceptor from flowcept.flowceptor.consumers.document_inserter import DocumentInserter from flowcept.commons.flowcept_logger import FlowceptLogger from flowcept.flowceptor.adapters.base_interceptor import BaseInterceptor @@ -77,7 +76,7 @@ def __init__( if not INSTRUMENTATION_ENABLED: self.enabled = False return - interceptors = [flowcept.instrumentation.decorators.instrumentation_interceptor] + interceptors = [InstrumentationInterceptor.get_instance()] elif not isinstance(interceptors, list): interceptors = [interceptors] self._interceptors: List[BaseInterceptor] = interceptors @@ -179,14 +178,10 @@ def __exit__(self, exc_type, exc_val, exc_tb): """Run the stop function.""" self.stop() - @staticmethod - def start_instrumentation_interceptor(): - """Start it.""" - flowcept.instrumentation.decorators.instrumentation_interceptor.start(None) - @staticmethod def services_alive() -> bool: """Get alive services.""" + logger = FlowceptLogger() if not MQDao.build().liveness_test(): logger.error("MQ Not Ready!") return False diff --git a/src/flowcept/flowceptor/adapters/base_interceptor.py b/src/flowcept/flowceptor/adapters/base_interceptor.py index 8d9ab517..e2199dac 100644 --- a/src/flowcept/flowceptor/adapters/base_interceptor.py +++ b/src/flowcept/flowceptor/adapters/base_interceptor.py @@ -1,6 +1,7 @@ """Base module.""" from abc import abstractmethod +from time import time from uuid import uuid4 from flowcept.commons.flowcept_dataclasses.workflow_object import ( @@ -29,6 +30,8 @@ class BaseInterceptor(object): def __init__(self, plugin_key=None, kind=None): self.logger = FlowceptLogger() + self.logger.debug(f"Starting Interceptor{id(self)} at {time()}") + if plugin_key is not None: # TODO :base-interceptor-refactor: :code-reorg: :usability: self.settings = get_settings(plugin_key) else: diff --git a/src/flowcept/flowceptor/adapters/instrumentation_interceptor.py b/src/flowcept/flowceptor/adapters/instrumentation_interceptor.py new file mode 100644 index 00000000..44650501 --- /dev/null +++ 
b/src/flowcept/flowceptor/adapters/instrumentation_interceptor.py @@ -0,0 +1,23 @@ +"""Instrumentation Interceptor.""" + +from flowcept.flowceptor.adapters.base_interceptor import ( + BaseInterceptor, +) + + +# TODO: :base-interceptor-refactor: :ml-refactor: :code-reorg: +class InstrumentationInterceptor: + """Singleton holder for the instrumentation interceptor.""" + + _instance: "BaseInterceptor" = None + + def __new__(cls, *args, **kwargs): + """Constructor, which should not be used. Use get_instance instead.""" + raise Exception("Please use the InstrumentationInterceptor.get_instance method.") + + @classmethod + def get_instance(cls): + """Get the singleton instance, creating it on first use.""" + if not cls._instance: + cls._instance = BaseInterceptor(kind="instrumentation") + return cls._instance diff --git a/src/flowcept/flowceptor/consumers/document_inserter.py b/src/flowcept/flowceptor/consumers/document_inserter.py index fe7688e7..d86a51f1 100644 --- a/src/flowcept/flowceptor/consumers/document_inserter.py +++ b/src/flowcept/flowceptor/consumers/document_inserter.py @@ -8,7 +8,6 @@ import pytz -import flowcept.commons from flowcept.commons.daos.autoflush_buffer import AutoflushBuffer from flowcept.commons.flowcept_dataclasses.workflow_object import ( WorkflowObject, ) @@ -99,8 +98,10 @@ def _set_buffer_size(self): ) @staticmethod - def flush_function(buffer, doc_dao, logger=flowcept.commons.logger): + def flush_function(buffer, doc_dao, logger=None): """Flush it.""" + if logger is None: + logger = FlowceptLogger() logger.info( f"Current Doc buffer size: {len(buffer)}, " f"Gonna flush {len(buffer)} msgs to DocDB!" ) diff --git a/src/flowcept/instrumentation/decorators/__init__.py b/src/flowcept/instrumentation/decorators/__init__.py index 1c599e16..57aafba8 100644 --- a/src/flowcept/instrumentation/decorators/__init__.py +++ b/src/flowcept/instrumentation/decorators/__init__.py @@ -1,15 +1 @@ """Decorators subpackage.""" - -from flowcept.flowceptor.adapters.base_interceptor import BaseInterceptor - -# TODO :base-interceptor-refactor: :ml-refactor: :code-reorg: :usability: -# Consider creating a new concept for instrumentation-based 'interception'. -# These adaptors were made for data observability. -# Perhaps we should have a BaseAdaptor that would work for both and -# observability and instrumentation adapters. This would be a major refactor -# in the code. 
https://github.com/ORNL/flowcept/issues/109 -instrumentation_interceptor = BaseInterceptor(kind="instrumentation") -# TODO This above is bad because I am reusing the same BaseInterceptor both -# for adapter-based observability + traditional instrumentation via @decorator -# I'm just setting _registered_workflow to avoid the auto wf register that -# exists in the BaseInterceptor diff --git a/src/flowcept/instrumentation/decorators/flowcept_task.py b/src/flowcept/instrumentation/decorators/flowcept_task.py index 0b8cf358..a1e547f6 100644 --- a/src/flowcept/instrumentation/decorators/flowcept_task.py +++ b/src/flowcept/instrumentation/decorators/flowcept_task.py @@ -2,19 +2,19 @@ from time import time from functools import wraps -import flowcept.commons from flowcept import Flowcept from flowcept.commons.flowcept_dataclasses.task_object import ( TaskObject, Status, ) +from flowcept.commons.flowcept_logger import FlowceptLogger -from flowcept.instrumentation.decorators import instrumentation_interceptor from flowcept.commons.utils import replace_non_serializable from flowcept.configs import ( REPLACE_NON_JSON_SERIALIZABLE, INSTRUMENTATION_ENABLED, ) +from flowcept.flowceptor.adapters.instrumentation_interceptor import InstrumentationInterceptor # TODO: :code-reorg: consider moving it to utils and reusing it in dask interceptor @@ -35,6 +35,7 @@ def default_args_handler(task_message: TaskObject, *args, **kwargs): def telemetry_flowcept_task(func=None): """Get telemetry task.""" + interceptor = InstrumentationInterceptor.get_instance() def decorator(func): @wraps(func) @@ -46,7 +47,7 @@ def wrapper(*args, **kwargs): task_obj["task_id"] = str(id(task_obj)) task_obj["workflow_id"] = kwargs.pop("workflow_id") task_obj["used"] = kwargs - tel = instrumentation_interceptor.telemetry_capture.capture() + tel = interceptor.telemetry_capture.capture() if tel is not None: task_obj["telemetry_at_start"] = tel.to_dict() try: @@ -57,11 +58,11 @@ def wrapper(*args, **kwargs): result = None task_obj["stderr"] = str(e) # task_obj["ended_at"] = time() - tel = instrumentation_interceptor.telemetry_capture.capture() + tel = interceptor.telemetry_capture.capture() if tel is not None: task_obj["telemetry_at_end"] = tel.to_dict() task_obj["generated"] = result - instrumentation_interceptor.intercept(task_obj) + interceptor.intercept(task_obj) return result return wrapper @@ -74,6 +75,7 @@ def wrapper(*args, **kwargs): def lightweight_flowcept_task(func=None): """Get lightweight task.""" + interceptor = InstrumentationInterceptor.get_instance() def decorator(func): @wraps(func) @@ -108,7 +110,7 @@ def wrapper(*args, **kwargs): used=kwargs, generated=result, ) - instrumentation_interceptor.intercept(task_dict) + interceptor.intercept(task_dict) return result return wrapper @@ -121,6 +123,8 @@ def wrapper(*args, **kwargs): def flowcept_task(func=None, **decorator_kwargs): """Get flowcept task.""" + interceptor = InstrumentationInterceptor.get_instance() + logger = FlowceptLogger() def decorator(func): @wraps(func) @@ -135,7 +139,7 @@ def wrapper(*args, **kwargs): task_obj.used = args_handler(task_obj, *args, **kwargs) task_obj.started_at = time() task_obj.task_id = str(task_obj.started_at) - task_obj.telemetry_at_start = instrumentation_interceptor.telemetry_capture.capture() + task_obj.telemetry_at_start = interceptor.telemetry_capture.capture() try: result = func(*args, **kwargs) task_obj.status = Status.FINISHED @@ -144,16 +148,16 @@ def wrapper(*args, **kwargs): result = None task_obj.stderr = str(e) 
task_obj.ended_at = time() - task_obj.telemetry_at_end = instrumentation_interceptor.telemetry_capture.capture() + task_obj.telemetry_at_end = interceptor.telemetry_capture.capture() try: if isinstance(result, dict): task_obj.generated = args_handler(task_obj, **result) else: task_obj.generated = args_handler(task_obj, result) except Exception as e: - flowcept.commons.logger.exception(e) + logger.exception(e) - instrumentation_interceptor.intercept(task_obj.to_dict()) + interceptor.intercept(task_obj.to_dict()) return result return wrapper diff --git a/src/flowcept/instrumentation/decorators/flowcept_torch.py b/src/flowcept/instrumentation/decorators/flowcept_torch.py index f8224c91..c306a703 100644 --- a/src/flowcept/instrumentation/decorators/flowcept_torch.py +++ b/src/flowcept/instrumentation/decorators/flowcept_torch.py @@ -2,11 +2,9 @@ from time import time from functools import wraps -import flowcept.commons from flowcept.commons.flowcept_dataclasses.task_object import ( Status, ) -from flowcept.instrumentation.decorators import instrumentation_interceptor from typing import List, Dict import uuid @@ -21,6 +19,7 @@ INSTRUMENTATION, TELEMETRY_CAPTURE, ) +from flowcept.flowceptor.adapters.instrumentation_interceptor import InstrumentationInterceptor def _inspect_torch_tensor(tensor: torch.Tensor): @@ -54,6 +53,7 @@ def _inspect_torch_tensor(tensor: torch.Tensor): def full_torch_task(func=None): """Generate pytorch task.""" + interceptor = InstrumentationInterceptor.get_instance() def decorator(func): @wraps(func) @@ -71,9 +71,7 @@ def wrapper(*args, **kwargs): "tensor": _inspect_torch_tensor(args[1]), **{k: v for k, v in vars(args[0]).items() if not k.startswith("_")}, } - task_obj["telemetry_at_start"] = ( - instrumentation_interceptor.telemetry_capture.capture().to_dict() - ) + task_obj["telemetry_at_start"] = interceptor.telemetry_capture.capture().to_dict() try: result = func(*args, **kwargs) task_obj["status"] = Status.FINISHED.value @@ -82,14 +80,12 @@ def wrapper(*args, **kwargs): result = None task_obj["stderr"] = str(e) task_obj["ended_at"] = time() - task_obj["telemetry_at_end"] = ( - instrumentation_interceptor.telemetry_capture.capture().to_dict() - ) + task_obj["telemetry_at_end"] = interceptor.telemetry_capture.capture().to_dict() task_obj["generated"] = { "tensor": _inspect_torch_tensor(args[1]), # add other module metadata } - instrumentation_interceptor.intercept(task_obj) + interceptor.intercept(task_obj) return result return wrapper @@ -114,6 +110,7 @@ def wrapper(*args, **kwargs): def lightweight_tensor_inspection_torch_task(func=None): """Get lightweight pytorch task.""" + interceptor = InstrumentationInterceptor.get_instance() def decorator(func): @wraps(func) @@ -136,7 +133,7 @@ def wrapper(*args, **kwargs): used=used, generated={"tensor": _inspect_torch_tensor(result)}, ) - instrumentation_interceptor.intercept(task_dict) + interceptor.intercept(task_dict) return result return wrapper @@ -149,6 +146,7 @@ def wrapper(*args, **kwargs): def lightweight_telemetry_tensor_inspection_torch_task(func=None): """Get lightweight tensor inspect task.""" + interceptor = InstrumentationInterceptor.get_instance() def decorator(func): @wraps(func) @@ -170,9 +168,9 @@ def wrapper(*args, **kwargs): activity_id=args[0].__class__.__name__, used=used, generated={"tensor": _inspect_torch_tensor(result)}, - telemetry_at_start=instrumentation_interceptor.telemetry_capture.capture().to_dict(), + telemetry_at_start=interceptor.telemetry_capture.capture().to_dict(), ) - 
instrumentation_interceptor.intercept(task_dict) + interceptor.intercept(task_dict) return result return wrapper @@ -185,6 +183,7 @@ def wrapper(*args, **kwargs): def lightweight_telemetry_torch_task(func=None): """Get lightweight telemetry torch task.""" + interceptor = InstrumentationInterceptor.get_instance() def decorator(func): @wraps(func) @@ -196,9 +195,9 @@ def wrapper(*args, **kwargs): type="task", workflow_id=args[0].workflow_id, activity_id=func.__qualname__, - telemetry_at_start=instrumentation_interceptor.telemetry_capture.capture().to_dict(), + telemetry_at_start=interceptor.telemetry_capture.capture().to_dict(), ) - instrumentation_interceptor.intercept(task_dict) + interceptor.intercept(task_dict) return result return wrapper @@ -295,7 +294,5 @@ def register_module_as_workflow( # workflow_obj.parent_task_id = parent_task_id if REGISTER_WORKFLOW: - flowcept.instrumentation.decorators.instrumentation_interceptor.send_workflow_message( - workflow_obj - ) + InstrumentationInterceptor.get_instance().send_workflow_message(workflow_obj) return workflow_obj.workflow_id diff --git a/tests/adapters/test_mlflow.py b/tests/adapters/test_mlflow.py index 73abd3e3..fe367731 100644 --- a/tests/adapters/test_mlflow.py +++ b/tests/adapters/test_mlflow.py @@ -76,6 +76,7 @@ def test_observer_and_consumption(self): assert TestMLFlow.interceptor is not None with Flowcept(TestMLFlow.interceptor): run_uuid = self.test_pure_run_mlflow() + sleep(5) print(run_uuid) assert evaluate_until( lambda: self.interceptor.state_manager.has_element_id(run_uuid), diff --git a/tests/api/flowcept_api_test.py b/tests/api/flowcept_api_test.py index 2127418a..147eb046 100644 --- a/tests/api/flowcept_api_test.py +++ b/tests/api/flowcept_api_test.py @@ -4,7 +4,9 @@ Flowcept, flowcept_task, ) +from flowcept.commons.flowcept_logger import FlowceptLogger from flowcept.commons.utils import assert_by_querying_tasks_until, get_current_config_values +from flowcept.flowceptor.adapters.instrumentation_interceptor import InstrumentationInterceptor @flowcept_task @@ -74,6 +76,19 @@ def test_simple_workflow(self): == 1 ) + def test_instrumentation_interceptor(self): + logger = FlowceptLogger() + try: + InstrumentationInterceptor() + except Exception as e: + logger.debug(f"This exception is expected: {e}") + + a = InstrumentationInterceptor.get_instance() + b = InstrumentationInterceptor.get_instance() + + assert a == b + assert id(a) == id(b) + @unittest.skip("Test only for dev.") def test_continuous_run(self): import numpy as np diff --git a/tests/decorator_tests/flowcept_task_decorator_test.py b/tests/decorator_tests/flowcept_task_decorator_test.py index 9e8d0022..c5e57055 100644 --- a/tests/decorator_tests/flowcept_task_decorator_test.py +++ b/tests/decorator_tests/flowcept_task_decorator_test.py @@ -3,18 +3,15 @@ import uuid import random -from time import sleep import pandas as pd from time import time, sleep -from flowcept.commons import FlowceptLogger - -import flowcept.commons import flowcept.instrumentation.decorators from flowcept import Flowcept import unittest +from flowcept.commons.flowcept_logger import FlowceptLogger from flowcept.commons.utils import assert_by_querying_tasks_until from flowcept.instrumentation.decorators.flowcept_task import ( flowcept_task, diff --git a/tests/log_tests/log_test.py b/tests/log_tests/log_test.py index 5a5f20ce..ec457ba9 100644 --- a/tests/log_tests/log_test.py +++ b/tests/log_tests/log_test.py @@ -19,7 +19,7 @@ def test_log(self): _logger.exception(e) _logger.info("It's ok") - 
_logger2 = flowcept.commons.logger + _logger2 = FlowceptLogger() # Testing singleton assert (