diff --git a/.github/workflows/create-release-n-publish.yml b/.github/workflows/create-release-n-publish.yml
index 07a06762..af14f440 100644
--- a/.github/workflows/create-release-n-publish.yml
+++ b/.github/workflows/create-release-n-publish.yml
@@ -97,30 +97,22 @@ jobs:
         run: pip install flowcept[dask]
       - name: Test pip install multiple adapters
         run: pip install flowcept[mlflow,tensorboard]
-      - name: Test pip install full
-        run: pip install flowcept[full]
-      - name: Install dev dependencies
-        run: |
-          pip install -r extra_requirements/dev-requirements.txt
+      - name: Install our dependencies
+        run: pip install -e .[fulldev] # This will install all dependencies, for all adapters and dev deps.
+      - name: Pip list
+        run: pip list
       - name: Run Docker Compose
-        run: docker compose -f deployment/compose.yml up -d
-      - name: Copy settings
+        run: docker compose -f deployment/compose-full.yml up -d
+      - name: Test with pytest
         run: |
           mkdir -p ~/.flowcept
           cp resources/sample_settings.yaml ~/.flowcept/settings.yaml
           export FLOWCEPT_SETTINGS_PATH=~/.flowcept/settings.yaml
-      - name: Test with pytest
-        run: |
-          export FLOWCEPT_SETTINGS_PATH=~/.flowcept/settings.yaml
           cat $FLOWCEPT_SETTINGS_PATH
           pytest --ignore=tests/decorator_tests/ml_tests/llm_tests
       - name: Test notebooks
         run: |
-          pip install flowcept[full]
-          pip install -r extra_requirements/dev-requirements.txt
-          pip list | grep flowcept
           export FLOWCEPT_SETTINGS_PATH=~/.flowcept/settings.yaml
-          cat $FLOWCEPT_SETTINGS_PATH
           python flowcept/flowcept_webserver/app.py &
           sleep 3
           pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb
diff --git a/.github/workflows/run-tests-kafka.yml b/.github/workflows/run-tests-kafka.yml
new file mode 100644
index 00000000..608eee77
--- /dev/null
+++ b/.github/workflows/run-tests-kafka.yml
@@ -0,0 +1,54 @@
+name: All tests on Kafka MQ
+on:
+  pull_request:
+    branches: [ "dev", "main" ]
+    types: [opened, synchronize, reopened]
+# branches: [ "disabled" ]
+
+jobs:
+
+  build:
+    runs-on: ubuntu-latest
+    timeout-minutes: 60
+    if: "!contains(github.event.head_commit.message, 'CI Bot')"
+    steps:
+      - uses: actions/checkout@v3
+      - name: Set up Python 3.9
+        uses: actions/setup-python@v3
+        with:
+          python-version: "3.9"
+      - name: Check python version
+        run: python --version
+      - name: Install our dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install -e .[fulldev]
+      - name: Pip list
+        run: pip list
+      - name: Run Docker Compose
+        run: docker compose -f deployment/compose-kafka.yml up -d
+      - name: Wait 1 min
+        run: sleep 60
+      - name: Check liveness
+        run: |
+          export MQ_TYPE=kafka
+          export MQ_PORT=9092
+          python -c 'from flowcept.configs import MQ_TYPE, MQ_PORT; print(f"MQ_TYPE={MQ_TYPE}"); print(f"MQ_PORT={MQ_PORT}")'
+          python -c 'from flowcept import Flowcept; assert Flowcept.services_alive()'
+      - name: Run Tests with Kafka
+        run: |
+          export MQ_TYPE=kafka
+          export MQ_PORT=9092
+          pytest --ignore=tests/decorator_tests/ml_tests/llm_tests
+      - name: Test notebooks
+        run: |
+          pip install -e .[full]
+          export MQ_TYPE=kafka
+          export MQ_PORT=9092
+          python -c 'from flowcept.configs import MQ_TYPE, MQ_PORT; print(f"MQ_TYPE={MQ_TYPE}"); print(f"MQ_PORT={MQ_PORT}")'
+          python -c 'from flowcept import Flowcept; assert Flowcept.services_alive()'
+
+          python flowcept/flowcept_webserver/app.py &
+          sleep 3
+          export FLOWCEPT_SETTINGS_PATH=~/.flowcept/settings.yaml
+          pytest --ignore=notebooks/zambeze.ipynb --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb
diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml
index 1b4121ae..82cc8d63 100644
--- a/.github/workflows/run-tests.yml
+++ b/.github/workflows/run-tests.yml
@@ -19,17 +19,36 @@ jobs:
       - name: Install our dependencies
        run: |
          python -m pip install --upgrade pip
-         pip install -e .[full]
-         pip install -r extra_requirements/dev-requirements.txt
-      - name: Run Docker Compose
-        run: docker compose -f deployment/compose.yml up -d
-      - name: Test with pytest
+         pip install -e .[fulldev] # This will install all dependencies, for all adapters and dev deps.
+      - name: Pip list
+        run: pip list
+      - name: Start Docker Compose with Redis
+        run: docker compose -f deployment/compose-full.yml up -d
+      - name: Test with pytest with Redis
        run: |
          pytest --ignore=tests/decorator_tests/ml_tests/llm_tests
-      - name: Test notebooks
+      - name: Test notebooks with Redis
        run: |
          pip install -e .
          python flowcept/flowcept_webserver/app.py &
          sleep 3
          export FLOWCEPT_SETTINGS_PATH=~/.flowcept/settings.yaml
          pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb
+      - name: Shut down compose
+        run: docker compose -f deployment/compose-full.yml down
+      - name: Start Docker Compose with Kafka
+        run: docker compose -f deployment/compose-kafka.yml up -d
+      - name: Wait 1 min
+        run: sleep 60
+      - name: Check liveness
+        run: |
+          export MQ_TYPE=kafka
+          export MQ_PORT=9092
+          python -c 'from flowcept.configs import MQ_TYPE, MQ_PORT; print(f"MQ_TYPE={MQ_TYPE}"); print(f"MQ_PORT={MQ_PORT}")'
+          python -c 'from flowcept import Flowcept; assert Flowcept.services_alive()'
+      - name: Run Tests with Kafka
+        run: |
+          export MQ_TYPE=kafka
+          export MQ_PORT=9092
+          # Ignoring heavy tests. They are executed with Kafka in another GH Action.
+          pytest --ignore=tests/decorator_tests/ml_tests --ignore=tests/adapters/test_tensorboard.py
diff --git a/.github/workflows/test-python-310-macos.yml b/.github/workflows/test-python-310-macos.yml
index 340c198a..6b5880e4 100644
--- a/.github/workflows/test-python-310-macos.yml
+++ b/.github/workflows/test-python-310-macos.yml
@@ -35,7 +35,7 @@ jobs:
       - name: Run Docker Compose
        run: |
          docker compose version
-         docker compose -f deployment/compose.yml up -d
+         docker compose -f deployment/compose-full.yml up -d
       - name: Test with pytest
        run: |
          pytest --ignore=tests/decorator_tests/ml_tests/llm_tests/
diff --git a/.github/workflows/test-python-310.yml b/.github/workflows/test-python-310.yml
index 4b989c88..6d49709e 100644
--- a/.github/workflows/test-python-310.yml
+++ b/.github/workflows/test-python-310.yml
@@ -25,7 +25,7 @@ jobs:
          pip install -e .[full]
          pip install -r extra_requirements/dev-requirements.txt
       - name: Run Docker Compose
-        run: docker compose -f deployment/compose.yml up -d
+        run: docker compose -f deployment/compose-full.yml up -d
       - name: Test with pytest
        run: |
          pytest --ignore=tests/decorator_tests/ml_tests/llm_tests
diff --git a/.github/workflows/test-python-311.yml b/.github/workflows/test-python-311.yml
index a83415ee..5962c3d2 100644
--- a/.github/workflows/test-python-311.yml
+++ b/.github/workflows/test-python-311.yml
@@ -28,7 +28,7 @@ jobs:
          pip install -e .[full]
          pip install -r extra_requirements/dev-requirements.txt
       - name: Run Docker Compose
-        run: docker compose -f deployment/compose.yml up -d
+        run: docker compose -f deployment/compose-full.yml up -d
       - name: Test with pytest
        run: |
          pytest --ignore=tests/decorator_tests/ml_tests/llm_tests/
diff --git a/README.md b/README.md
index dfaa7a77..01439189 100644
--- a/README.md
+++ b/README.md
@@ -52,6 +52,34 @@
You may need to set the environment variable `FLOWCEPT_SETTINGS_PATH` with the a 5. To use FlowCept's Query API, see utilization examples in the notebooks. +### Simple Example with Decorators Instrumentation + +In addition to existing adapters to Dask, MLFlow, and others (it's extensible for any system that generates data), FlowCept also offers instrumentation via @decorators. + +```python +from flowcept import Flowcept, flowcept_task + +@flowcept_task +def sum_one(n): + return n + 1 + + +@flowcept_task +def mult_two(n): + return n * 2 + + +with Flowcept(workflow_name='test_workflow'): + n = 3 + o1 = sum_one(n) + o2 = mult_two(o1) + print(o2) + +print(Flowcept.db.query(filter={"workflow_id": Flowcept.current_workflow_id})) +``` + + + ## Performance Tuning for Performance Evaluation In the settings.yaml file, the following variables might impact interception performance: diff --git a/deployment/compose-full.yml b/deployment/compose-full.yml new file mode 100644 index 00000000..abd7dbdc --- /dev/null +++ b/deployment/compose-full.yml @@ -0,0 +1,32 @@ +version: '3.8' +name: flowcept +services: + flowcept_redis: + container_name: flowcept_redis + image: redis + ports: + - 6379:6379 + + flowcept_mongo: + container_name: flowcept_mongo + image: mongo:latest + # volumes: + # - /Users/rsr/Downloads/mongo_data/db:/data/db + ports: + - 27017:27017 + + +# # This is just for the cases where one does not want to use the same Redis instance for caching and messaging, but +# # it's not required to have separate instances. +# # local_interceptor_cache: +# # container_name: local_interceptor_cache +# # image: redis +# # ports: +# # - 60379:6379 + + zambeze_rabbitmq: + container_name: zambeze_rabbitmq + image: rabbitmq:3.11-management + ports: + - 5672:5672 + - 15672:15672 diff --git a/deployment/compose-kafka.yml b/deployment/compose-kafka.yml new file mode 100644 index 00000000..e1792313 --- /dev/null +++ b/deployment/compose-kafka.yml @@ -0,0 +1,57 @@ +version: '3.8' +name: flowcept +services: + flowcept_redis: + container_name: flowcept_redis + image: redis + ports: + - 6379:6379 + + flowcept_mongo: + container_name: flowcept_mongo + image: mongo:latest + # volumes: + # - /Users/rsr/Downloads/mongo_data/db:/data/db + ports: + - 27017:27017 + + zookeeper: + image: confluentinc/cp-zookeeper:6.1.1 + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + + # reachable on 9092 from the host and on 29092 from inside docker compose + kafka: + image: confluentinc/cp-kafka:6.1.1 + depends_on: + - zookeeper + ports: + - '9092:9092' + expose: + - '29092' + environment: + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1' + KAFKA_MIN_INSYNC_REPLICAS: '1' + + init-kafka: + image: confluentinc/cp-kafka:6.1.1 + depends_on: + - kafka + entrypoint: [ '/bin/sh', '-c' ] + command: | + " + # blocks until kafka is reachable + kafka-topics --bootstrap-server kafka:29092 --list + + echo -e 'Creating kafka topics' + kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic interception --replication-factor 1 --partitions 1 + + echo -e 'Successfully created the following topics:' + kafka-topics --bootstrap-server kafka:29092 --list + " diff --git a/deployment/compose.yml b/deployment/compose.yml index abd7dbdc..d6ebe6c4 100644 --- 
a/deployment/compose.yml +++ b/deployment/compose.yml @@ -16,6 +16,7 @@ services: - 27017:27017 + # # This is just for the cases where one does not want to use the same Redis instance for caching and messaging, but # # it's not required to have separate instances. # # local_interceptor_cache: @@ -24,9 +25,3 @@ services: # # ports: # # - 60379:6379 - zambeze_rabbitmq: - container_name: zambeze_rabbitmq - image: rabbitmq:3.11-management - ports: - - 5672:5672 - - 15672:15672 diff --git a/extra_requirements/data_augmentation-requirements.txt b/extra_requirements/data_augmentation-requirements.txt deleted file mode 100644 index e8d25e80..00000000 --- a/extra_requirements/data_augmentation-requirements.txt +++ /dev/null @@ -1 +0,0 @@ -h2o==3.44.0.3 diff --git a/extra_requirements/dev-requirements.txt b/extra_requirements/dev-requirements.txt index 686230fa..7b044ead 100644 --- a/extra_requirements/dev-requirements.txt +++ b/extra_requirements/dev-requirements.txt @@ -1,11 +1,10 @@ pytest==6.2.4 flake8==5.0.4 black==23.1.0 -numpy==1.23.4 +numpy<2.0.0 bokeh==2.4.2 -jupyterlab==3.6.1 -nbmake==1.4 -cluster_experiment_utils +jupyterlab +nbmake # Pytorch models stuff: torch torchvision diff --git a/extra_requirements/kafka-requirements.txt b/extra_requirements/kafka-requirements.txt new file mode 100644 index 00000000..8f27adb3 --- /dev/null +++ b/extra_requirements/kafka-requirements.txt @@ -0,0 +1 @@ +confluent-kafka==2.5.3 diff --git a/extra_requirements/mlflow-requirements.txt b/extra_requirements/mlflow-requirements.txt index ca5a9ecd..f21f410c 100644 --- a/extra_requirements/mlflow-requirements.txt +++ b/extra_requirements/mlflow-requirements.txt @@ -1,4 +1,4 @@ -mlflow-skinny==2.1.1 +mlflow-skinny>2.1.1,<=2.16.2 SQLAlchemy==1.4.42 alembic==1.8.1 watchdog==2.2.1 diff --git a/extra_requirements/tensorboard-requirements.txt b/extra_requirements/tensorboard-requirements.txt index 64f3e44f..6e337fab 100644 --- a/extra_requirements/tensorboard-requirements.txt +++ b/extra_requirements/tensorboard-requirements.txt @@ -1,3 +1,3 @@ -tensorboard==2.13.0 -tensorflow==2.13.0 +tensorboard +tensorflow tbparse==0.0.7 diff --git a/flowcept/__init__.py b/flowcept/__init__.py index 94368532..4438052a 100644 --- a/flowcept/__init__.py +++ b/flowcept/__init__.py @@ -1,12 +1,15 @@ import flowcept + from flowcept.configs import SETTINGS_PATH + from flowcept.version import __version__ from flowcept.commons.vocabulary import Vocabulary -from flowcept.flowcept_api.consumer_api import FlowceptConsumerAPI + +from flowcept.flowcept_api.flowcept_controller import Flowcept from flowcept.flowcept_api.task_query_api import TaskQueryAPI -from flowcept.flowcept_api.db_api import DBAPI +from flowcept.instrumentation.decorators.flowcept_task import flowcept_task from flowcept.commons.flowcept_dataclasses.workflow_object import ( WorkflowObject, diff --git a/flowcept/analytics/data_augmentation.py b/flowcept/analytics/data_augmentation.py index ce8082f5..88282d91 100644 --- a/flowcept/analytics/data_augmentation.py +++ b/flowcept/analytics/data_augmentation.py @@ -4,10 +4,12 @@ import pandas as pd from h2o.automl import H2OAutoML +from typing_extensions import deprecated h2o.init() +@deprecated def train_model( df, x_cols: List[str], @@ -26,6 +28,7 @@ def train_model( return aml +@deprecated def augment_df_linearly(df, N, cols_to_augment, seed=1234): np.random.seed(seed) new_df = df.copy() @@ -48,6 +51,7 @@ def augment_df_linearly(df, N, cols_to_augment, seed=1234): return appended_df +@deprecated def augment_data(df, N, 
augmentation_model: H2OAutoML, x_cols, y_col): new_df = augment_df_linearly(df, N, x_cols) h2odf = h2o.H2OFrame(new_df.loc[new_df["original"] == 0][x_cols]) diff --git a/flowcept/commons/daos/autoflush_buffer.py b/flowcept/commons/daos/autoflush_buffer.py index 7f7774b2..bac16ba7 100644 --- a/flowcept/commons/daos/autoflush_buffer.py +++ b/flowcept/commons/daos/autoflush_buffer.py @@ -1,29 +1,7 @@ -from queue import Queue -from typing import Union, List, Dict, Callable +from typing import Callable -import msgpack -from redis import Redis -from redis.client import PubSub -from threading import Thread, Lock, Event -from time import time, sleep - -import flowcept.commons -from flowcept.commons.daos.keyvalue_dao import KeyValueDAO -from flowcept.commons.utils import perf_log +from threading import Thread, Event from flowcept.commons.flowcept_logger import FlowceptLogger -from flowcept.configs import ( - REDIS_HOST, - REDIS_PORT, - REDIS_CHANNEL, - REDIS_PASSWORD, - JSON_SERIALIZER, - REDIS_BUFFER_SIZE, - REDIS_INSERTION_BUFFER_TIME, - PERF_LOG, - REDIS_URI, -) - -from flowcept.commons.utils import GenericJSONEncoder class AutoflushBuffer: diff --git a/flowcept/commons/daos/keyvalue_dao.py b/flowcept/commons/daos/keyvalue_dao.py index 0871b65f..6dc64051 100644 --- a/flowcept/commons/daos/keyvalue_dao.py +++ b/flowcept/commons/daos/keyvalue_dao.py @@ -3,9 +3,9 @@ from flowcept.commons.flowcept_logger import FlowceptLogger from flowcept.commons import singleton from flowcept.configs import ( - REDIS_HOST, - REDIS_PORT, - REDIS_PASSWORD, + KVDB_HOST, + KVDB_PORT, + KVDB_PASSWORD, ) @@ -15,10 +15,10 @@ def __init__(self, connection=None): self.logger = FlowceptLogger() if connection is None: self._redis = Redis( - host=REDIS_HOST, - port=REDIS_PORT, + host=KVDB_HOST, + port=KVDB_PORT, db=0, - password=REDIS_PASSWORD, + password=KVDB_PASSWORD, ) else: self._redis = connection @@ -30,9 +30,9 @@ def add_key_into_set(self, set_name: str, key): self._redis.sadd(set_name, key) def remove_key_from_set(self, set_name: str, key): - self.logger.info(f"Removing key {key} from set: {set_name}") + self.logger.debug(f"Removing key {key} from set: {set_name}") self._redis.srem(set_name, key) - self.logger.info(f"Removed key {key} from set: {set_name}") + self.logger.debug(f"Removed key {key} from set: {set_name}") def set_has_key(self, set_name: str, key) -> bool: return self._redis.sismember(set_name, key) diff --git a/flowcept/commons/daos/mq_dao/__init__.py b/flowcept/commons/daos/mq_dao/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/flowcept/commons/daos/mq_dao.py b/flowcept/commons/daos/mq_dao/mq_dao_base.py similarity index 60% rename from flowcept/commons/daos/mq_dao.py rename to flowcept/commons/daos/mq_dao/mq_dao_base.py index 8b9c25fb..aa5d8902 100644 --- a/flowcept/commons/daos/mq_dao.py +++ b/flowcept/commons/daos/mq_dao/mq_dao_base.py @@ -1,44 +1,49 @@ -import concurrent -import concurrent.futures -from functools import partial -from multiprocessing import Pool, cpu_count -from queue import Queue -from typing import Union, List, Dict, Callable +from abc import ABC, abstractmethod +from typing import Union, List, Callable import msgpack from redis import Redis -from redis.client import PubSub -from time import time import flowcept.commons from flowcept.commons.daos.autoflush_buffer import AutoflushBuffer from flowcept.commons.daos.keyvalue_dao import KeyValueDAO -from flowcept.commons.utils import perf_log, chunked + +from flowcept.commons.utils import chunked from 
flowcept.commons.flowcept_logger import FlowceptLogger from flowcept.configs import ( - REDIS_HOST, - REDIS_PORT, - REDIS_CHANNEL, - REDIS_PASSWORD, + MQ_CHANNEL, JSON_SERIALIZER, - REDIS_BUFFER_SIZE, - REDIS_INSERTION_BUFFER_TIME, - REDIS_CHUNK_SIZE, - PERF_LOG, - REDIS_URI, - ENRICH_MESSAGES, - DB_FLUSH_MODE, + MQ_BUFFER_SIZE, + MQ_INSERTION_BUFFER_TIME, + MQ_CHUNK_SIZE, + MQ_URI, + MQ_TYPE, + KVDB_HOST, + KVDB_PORT, + KVDB_PASSWORD, ) from flowcept.commons.utils import GenericJSONEncoder -class MQDao: - MESSAGE_TYPES_IGNORE = {"psubscribe"} +class MQDao(ABC): ENCODER = GenericJSONEncoder if JSON_SERIALIZER == "complex" else None # TODO we don't have a unit test to cover complex dict! + @staticmethod + def build(*args, **kwargs) -> "MQDao": + if MQ_TYPE == "redis": + from flowcept.commons.daos.mq_dao.mq_dao_redis import MQDaoRedis + + return MQDaoRedis(*args, **kwargs) + elif MQ_TYPE == "kafka": + from flowcept.commons.daos.mq_dao.mq_dao_kafka import MQDaoKafka + + return MQDaoKafka(*args, **kwargs) + else: + raise NotImplementedError + @staticmethod def _get_set_name(exec_bundle_id=None): """ @@ -50,65 +55,40 @@ def _get_set_name(exec_bundle_id=None): set_id += "_" + str(exec_bundle_id) return set_id - @staticmethod - def pipe_publish( - buffer, redis_connection, logger=flowcept.commons.logger - ): - pipe = redis_connection.pipeline() - logger.info(f"Going to flush {len(buffer)} to MQ...") - for message in buffer: - try: - logger.debug( - f"Going to send Message:" - f"\n\t[BEGIN_MSG]{message}\n[END_MSG]\t" - ) - pipe.publish(REDIS_CHANNEL, msgpack.dumps(message)) - except Exception as e: - logger.exception(e) - logger.error( - "Some messages couldn't be flushed! Check the messages' contents!" - ) - logger.error(f"Message that caused error: {message}") - t0 = 0 - if PERF_LOG: - t0 = time() - try: - pipe.execute() - logger.info(f"Flushed {len(buffer)} msgs to MQ!") - except Exception as e: - logger.exception(e) - perf_log("mq_pipe_execute", t0) - - @staticmethod - def bulk_publish( - buffer, redis_connection, logger=flowcept.commons.logger - ): - if REDIS_CHUNK_SIZE > 1: - for chunk in chunked(buffer, REDIS_CHUNK_SIZE): - MQDao.pipe_publish(chunk, redis_connection, logger) - else: - MQDao.pipe_publish(buffer, redis_connection, logger) - - def __init__(self, mq_host=None, mq_port=None, adapter_settings=None): + def __init__(self, kv_host=None, kv_port=None, adapter_settings=None): self.logger = FlowceptLogger() - if REDIS_URI is not None: + if MQ_URI is not None: # If a URI is provided, use it for connection - self._redis = Redis.from_url(REDIS_URI) + self._kv_conn = Redis.from_url(MQ_URI) else: # Otherwise, use the host, port, and password settings - self._redis = Redis( - host=REDIS_HOST if mq_host is None else mq_host, - port=REDIS_PORT if mq_port is None else mq_port, + self._kv_conn = Redis( + host=KVDB_HOST if kv_host is None else kv_host, + port=KVDB_PORT if kv_port is None else kv_port, db=0, - password=REDIS_PASSWORD if REDIS_PASSWORD else None, + password=KVDB_PASSWORD if KVDB_PASSWORD else None, ) - self._adapter_settings = adapter_settings - self._keyvalue_dao = KeyValueDAO(connection=self._redis) + self._adapter_settings = adapter_settings + self._keyvalue_dao = KeyValueDAO(connection=self._kv_conn) self._time_based_flushing_started = False self.buffer: Union[AutoflushBuffer, List] = None + @abstractmethod + def _bulk_publish( + self, buffer, channel=MQ_CHANNEL, serializer=msgpack.dumps + ): + raise NotImplementedError() + + def bulk_publish(self, buffer): + 
self.logger.info(f"Going to flush {len(buffer)} to MQ...") + if MQ_CHUNK_SIZE > 1: + for chunk in chunked(buffer, MQ_CHUNK_SIZE): + self._bulk_publish(chunk) + else: + self._bulk_publish(buffer) + def register_time_based_thread_init( self, interceptor_instance_id: str, exec_bundle_id=None ): @@ -136,21 +116,15 @@ def all_time_based_threads_ended(self, exec_bundle_id=None): set_name = MQDao._get_set_name(exec_bundle_id) return self._keyvalue_dao.set_is_empty(set_name) - # def delete_all_time_based_threads_sets(self): - # return self._keyvalue_dao.delete_all_matching_sets( - # MQFlusher._get_set_name() + "*" - # ) - def init_buffer(self, interceptor_instance_id: str, exec_bundle_id=None): if flowcept.configs.DB_FLUSH_MODE == "online": self.logger.info( f"Starting MQ time-based flushing! bundle: {exec_bundle_id}; interceptor id: {interceptor_instance_id}" ) self.buffer = AutoflushBuffer( - max_size=REDIS_BUFFER_SIZE, - flush_interval=REDIS_INSERTION_BUFFER_TIME, - flush_function=MQDao.bulk_publish, - redis_connection=self._redis, + max_size=MQ_BUFFER_SIZE, + flush_interval=MQ_INSERTION_BUFFER_TIME, + flush_function=self.bulk_publish, ) # self.register_time_based_thread_init( @@ -168,14 +142,9 @@ def _close_buffer(self): else: self.logger.error("MQ time-based flushing is not started") else: - MQDao.bulk_publish(self.buffer, self._redis) + self.bulk_publish(self.buffer) self.buffer = list() - def subscribe(self) -> PubSub: - pubsub = self._redis.pubsub() - pubsub.psubscribe(REDIS_CHANNEL) - return pubsub - def stop(self, interceptor_instance_id: str, bundle_exec_id: int = None): self.logger.info( f"MQ publisher received stop signal! bundle: {bundle_exec_id}; interceptor id: {interceptor_instance_id}" @@ -184,11 +153,11 @@ def stop(self, interceptor_instance_id: str, bundle_exec_id: int = None): self.logger.info( f"Flushed MQ for the last time! Now going to send stop msg. 
bundle: {bundle_exec_id}; interceptor id: {interceptor_instance_id}" ) - self.send_mq_dao_time_thread_stop( + self._send_mq_dao_time_thread_stop( interceptor_instance_id, bundle_exec_id ) - def send_mq_dao_time_thread_stop( + def _send_mq_dao_time_thread_stop( self, interceptor_instance_id, exec_bundle_id=None ): # These control_messages are handled by the document inserter @@ -200,16 +169,27 @@ def send_mq_dao_time_thread_stop( "exec_bundle_id": exec_bundle_id, } self.logger.info("Control msg sent: " + str(msg)) - self._redis.publish(REDIS_CHANNEL, msgpack.dumps(msg)) + self.send_message(msg) def send_document_inserter_stop(self): # These control_messages are handled by the document inserter msg = {"type": "flowcept_control", "info": "stop_document_inserter"} - self._redis.publish(REDIS_CHANNEL, msgpack.dumps(msg)) + self.send_message(msg) + + @abstractmethod + def send_message( + self, message: dict, channel=MQ_CHANNEL, serializer=msgpack.dumps + ): + raise NotImplementedError() + + @abstractmethod + def message_listener(self, message_handler: Callable): + raise NotImplementedError() + @abstractmethod def liveness_test(self): try: - response = self._redis.ping() + response = self._kv_conn.ping() if response: return True else: diff --git a/flowcept/commons/daos/mq_dao/mq_dao_kafka.py b/flowcept/commons/daos/mq_dao/mq_dao_kafka.py new file mode 100644 index 00000000..5deac449 --- /dev/null +++ b/flowcept/commons/daos/mq_dao/mq_dao_kafka.py @@ -0,0 +1,103 @@ +from typing import Callable + +import msgpack +from time import time + +from confluent_kafka import Producer, Consumer, KafkaError +from confluent_kafka.admin import AdminClient + +from flowcept.commons.daos.mq_dao.mq_dao_base import MQDao +from flowcept.commons.utils import perf_log +from flowcept.configs import ( + MQ_CHANNEL, + PERF_LOG, + MQ_HOST, + MQ_PORT, +) + + +class MQDaoKafka(MQDao): + def __init__(self, kv_host=None, kv_port=None, adapter_settings=None): + super().__init__(kv_host, kv_port, adapter_settings) + + self._kafka_conf = { + "bootstrap.servers": f"{MQ_HOST}:{MQ_PORT}", + } + self._producer = Producer(self._kafka_conf) + + def message_listener(self, message_handler: Callable): + self._kafka_conf.update( + { + "group.id": "my_group", + "auto.offset.reset": "earliest", + "enable.auto.commit": True, + } + ) + consumer = Consumer(self._kafka_conf) + consumer.subscribe([MQ_CHANNEL]) + + try: + while True: + msg = consumer.poll(1.0) + if msg is None: + continue + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + continue + else: + self.logger.error(f"Consumer error: {msg.error()}") + break + message = msgpack.loads(msg.value(), raw=False) + self.logger.debug(f"Received message: {message}") + if not message_handler(message): + break + except Exception as e: + self.logger.exception(e) + finally: + consumer.close() + + def send_message( + self, message: dict, channel=MQ_CHANNEL, serializer=msgpack.dumps + ): + self._producer.produce( + channel, key=channel, value=serializer(message) + ) + self._producer.flush() + + def _bulk_publish( + self, buffer, channel=MQ_CHANNEL, serializer=msgpack.dumps + ): + for message in buffer: + try: + self.logger.debug( + f"Going to send Message:" + f"\n\t[BEGIN_MSG]{message}\n[END_MSG]\t" + ) + self._producer.produce( + channel, key=channel, value=serializer(message) + ) + except Exception as e: + self.logger.exception(e) + self.logger.error( + "Some messages couldn't be flushed! Check the messages' contents!" 
+ ) + self.logger.error(f"Message that caused error: {message}") + t0 = 0 + if PERF_LOG: + t0 = time() + try: + self._producer.flush() + self.logger.info(f"Flushed {len(buffer)} msgs to MQ!") + except Exception as e: + self.logger.exception(e) + perf_log("mq_pipe_flush", t0) + + def liveness_test(self): + try: + super().liveness_test() + admin_client = AdminClient(self._kafka_conf) + kafka_metadata = admin_client.list_topics(timeout=5) + return MQ_CHANNEL in kafka_metadata.topics + except Exception as e: + self.logger.exception(e) + return False diff --git a/flowcept/commons/daos/mq_dao/mq_dao_redis.py b/flowcept/commons/daos/mq_dao/mq_dao_redis.py new file mode 100644 index 00000000..790f3ab3 --- /dev/null +++ b/flowcept/commons/daos/mq_dao/mq_dao_redis.py @@ -0,0 +1,85 @@ +from typing import Callable + +import msgpack +from time import time + +from flowcept.commons.daos.mq_dao.mq_dao_base import MQDao +from flowcept.commons.utils import perf_log +from flowcept.configs import ( + MQ_HOST, + MQ_PORT, + MQ_CHANNEL, + MQ_PASSWORD, + JSON_SERIALIZER, + MQ_BUFFER_SIZE, + MQ_INSERTION_BUFFER_TIME, + MQ_CHUNK_SIZE, + PERF_LOG, + MQ_URI, + ENRICH_MESSAGES, + DB_FLUSH_MODE, + MQ_TYPE, +) + + +class MQDaoRedis(MQDao): + MESSAGE_TYPES_IGNORE = {"psubscribe"} + + def __init__(self, kv_host=None, kv_port=None, adapter_settings=None): + super().__init__(kv_host, kv_port, adapter_settings) + self._producer = ( + self._kv_conn + ) # if MQ is redis, we use the same KV for the MQ + + def message_listener(self, message_handler: Callable): + pubsub = self._kv_conn.pubsub() + pubsub.psubscribe(MQ_CHANNEL) + for message in pubsub.listen(): + self.logger.debug("Received a message!") + if message["type"] in MQDaoRedis.MESSAGE_TYPES_IGNORE: + continue + msg_obj = msgpack.loads( + message["data"] # , cls=DocumentInserter.DECODER + ) + if not message_handler(msg_obj): + break + + def send_message( + self, message: dict, channel=MQ_CHANNEL, serializer=msgpack.dumps + ): + self._producer.publish(channel, serializer(message)) + + def _bulk_publish( + self, buffer, channel=MQ_CHANNEL, serializer=msgpack.dumps + ): + pipe = self._producer.pipeline() + for message in buffer: + try: + self.logger.debug( + f"Going to send Message:" + f"\n\t[BEGIN_MSG]{message}\n[END_MSG]\t" + ) + pipe.publish(MQ_CHANNEL, serializer(message)) + except Exception as e: + self.logger.exception(e) + self.logger.error( + "Some messages couldn't be flushed! Check the messages' contents!" 
+ ) + self.logger.error(f"Message that caused error: {message}") + t0 = 0 + if PERF_LOG: + t0 = time() + try: + pipe.execute() + self.logger.info(f"Flushed {len(buffer)} msgs to MQ!") + except Exception as e: + self.logger.exception(e) + perf_log("mq_pipe_execute", t0) + + def liveness_test(self): + try: + super().liveness_test() + return True + except Exception as e: + self.logger.exception(e) + return False diff --git a/flowcept/commons/utils.py b/flowcept/commons/utils.py index e4e9f999..2270e89e 100644 --- a/flowcept/commons/utils.py +++ b/flowcept/commons/utils.py @@ -2,6 +2,9 @@ import json from time import time, sleep from typing import Callable +import os +import platform +import subprocess import numpy as np @@ -180,6 +183,54 @@ def replace_non_serializable(obj): return f"{obj.__class__.__name__}_instance_id_{id(obj)}" +def get_gpu_vendor(): + system = platform.system() + + # Linux + if system == "Linux": + # Check for NVIDIA GPU + if os.path.exists("/proc/driver/nvidia/version"): + return "NVIDIA" + + # Check for AMD GPU using lspci + try: + lspci_output = subprocess.check_output( + "lspci", shell=True + ).decode() + if "AMD" in lspci_output: + return "AMD" + except subprocess.CalledProcessError: + pass + + # Windows + elif system == "Windows": + try: + wmic_output = subprocess.check_output( + "wmic path win32_videocontroller get name", shell=True + ).decode() + if "NVIDIA" in wmic_output: + return "NVIDIA" + elif "AMD" in wmic_output: + return "AMD" + except subprocess.CalledProcessError: + pass + + # macOS + elif system == "Darwin": # macOS is "Darwin" in platform.system() + try: + sp_output = subprocess.check_output( + "system_profiler SPDisplaysDataType", shell=True + ).decode() + if "NVIDIA" in sp_output: + return "NVIDIA" + elif "AMD" in sp_output: + return "AMD" + except subprocess.CalledProcessError: + pass + + return None + + class GenericJSONDecoder(json.JSONDecoder): def __init__(self, *args, **kwargs): json.JSONDecoder.__init__( diff --git a/flowcept/configs.py b/flowcept/configs.py index 97438ad7..90b995fe 100644 --- a/flowcept/configs.py +++ b/flowcept/configs.py @@ -48,29 +48,35 @@ CAMPAIGN_ID = settings["experiment"].get("campaign_id", "super_campaign") ###################### -# Redis Settings # +# MQ Settings # ###################### -REDIS_URI = settings["main_redis"].get("uri", None) -REDIS_INSTANCES = settings["main_redis"].get("instances", None) - -REDIS_CHANNEL = settings["main_redis"].get("channel", "interception") -REDIS_PASSWORD = settings["main_redis"].get("password", None) -REDIS_HOST = os.getenv( - "REDIS_HOST", settings["main_redis"].get("host", "localhost") +MQ_URI = settings["mq"].get("uri", None) +MQ_INSTANCES = settings["mq"].get("instances", None) + +MQ_TYPE = os.getenv("MQ_TYPE", settings["mq"].get("type", "redis")) +MQ_CHANNEL = settings["mq"].get("channel", "interception") +MQ_PASSWORD = settings["mq"].get("password", None) +MQ_HOST = os.getenv("MQ_HOST", settings["mq"].get("host", "localhost")) +MQ_PORT = int(os.getenv("MQ_PORT", settings["mq"].get("port", "6379"))) + +MQ_BUFFER_SIZE = int(settings["mq"].get("buffer_size", 50)) +MQ_INSERTION_BUFFER_TIME = int( + settings["mq"].get("insertion_buffer_time_secs", 5) ) -REDIS_PORT = int( - os.getenv("REDIS_PORT", settings["main_redis"].get("port", "6379")) +MQ_INSERTION_BUFFER_TIME = random.randint( + int(MQ_INSERTION_BUFFER_TIME * 0.9), + int(MQ_INSERTION_BUFFER_TIME * 1.4), ) +MQ_CHUNK_SIZE = int(settings["mq"].get("chunk_size", -1)) + +##################### +# KV SETTINGS # 
+##################### + +KVDB_PASSWORD = settings["kv_db"].get("password", None) +KVDB_HOST = os.getenv("KVDB_HOST", settings["kv_db"].get("host", "localhost")) +KVDB_PORT = int(os.getenv("KVDB_PORT", settings["kv_db"].get("port", "6379"))) -REDIS_BUFFER_SIZE = int(settings["main_redis"].get("buffer_size", 50)) -REDIS_INSERTION_BUFFER_TIME = int( - settings["main_redis"].get("insertion_buffer_time_secs", 5) -) -REDIS_INSERTION_BUFFER_TIME = random.randint( - int(REDIS_INSERTION_BUFFER_TIME * 0.9), - int(REDIS_INSERTION_BUFFER_TIME * 1.4), -) -REDIS_CHUNK_SIZE = int(settings["main_redis"].get("chunk_size", -1)) ###################### # MongoDB Settings # @@ -110,7 +116,6 @@ ###################### DB_FLUSH_MODE = settings["project"].get("db_flush_mode", "online") -MQ_TYPE = settings["project"].get("mq_type", "redis") # DEBUG_MODE = settings["project"].get("debug", False) PERF_LOG = settings["project"].get("performance_logging", False) JSON_SERIALIZER = settings["project"].get("json_serializer", "default") @@ -235,8 +240,8 @@ EXTRA_METADATA = settings.get("extra_metadata", {}) -EXTRA_METADATA.update({"mq_host": REDIS_HOST}) -EXTRA_METADATA.update({"mq_port": REDIS_PORT}) +EXTRA_METADATA.update({"mq_host": MQ_HOST}) +EXTRA_METADATA.update({"mq_port": MQ_PORT}) ###################### # Web Server # diff --git a/flowcept/flowcept_api/consumer_api.py b/flowcept/flowcept_api/flowcept_controller.py similarity index 67% rename from flowcept/flowcept_api/consumer_api.py rename to flowcept/flowcept_api/flowcept_controller.py index f06451b1..82139024 100644 --- a/flowcept/flowcept_api/consumer_api.py +++ b/flowcept/flowcept_api/flowcept_controller.py @@ -1,19 +1,25 @@ from typing import List, Union from time import sleep +from flowcept.commons.flowcept_dataclasses.workflow_object import ( + WorkflowObject, +) + import flowcept.instrumentation.decorators from flowcept.commons import logger from flowcept.commons.daos.document_db_dao import DocumentDBDao -from flowcept.commons.daos.mq_dao import MQDao -from flowcept.configs import REDIS_INSTANCES +from flowcept.commons.daos.mq_dao.mq_dao_base import MQDao +from flowcept.configs import MQ_INSTANCES +from flowcept.flowcept_api.db_api import DBAPI from flowcept.flowceptor.consumers.document_inserter import DocumentInserter from flowcept.commons.flowcept_logger import FlowceptLogger from flowcept.flowceptor.adapters.base_interceptor import BaseInterceptor -# TODO: :code-reorg: This may not be considered an API anymore as it's doing critical things for the good functioning of the system. -class FlowceptConsumerAPI(object): - INSTRUMENTATION = "instrumentation" +class Flowcept(object): + db = DBAPI() + + current_workflow_id = None def __init__( self, @@ -22,7 +28,23 @@ def __init__( ] = None, bundle_exec_id=None, start_doc_inserter=True, + workflow_id: str = None, + workflow_name: str = None, + workflow_args: str = None, ): + """ + Flowcept controller. + + This class controls the interceptors, including instrumentation. + If using for instrumentation, we assume one instance of this class + per workflow is being utilized. + + Parameters + ---------- + interceptors - list of Flowcept interceptors. If none, instrumentation will be used. If a string is passed, no interceptor will be started. # TODO: improve clarity for the documentation. + bundle_exec_id - A way to group interceptors. + start_doc_inserter - Whether you want to start consuming MQ messages to inject in the DB. 
+ """ self.logger = FlowceptLogger() self._document_inserters: List[DocumentInserter] = [] @@ -31,13 +53,26 @@ def __init__( self._bundle_exec_id = id(self) else: self._bundle_exec_id = bundle_exec_id - if interceptors == FlowceptConsumerAPI.INSTRUMENTATION: - interceptors = ( - flowcept.instrumentation.decorators.instrumentation_interceptor + if isinstance(interceptors, str): + self._interceptors = None + else: + if interceptors is None: + interceptors = [ + flowcept.instrumentation.decorators.instrumentation_interceptor + ] + elif not isinstance(interceptors, list): + interceptors = [interceptors] + self._interceptors: List[BaseInterceptor] = interceptors + + if workflow_id or workflow_args or workflow_name: + wf_obj = WorkflowObject( + workflow_id, workflow_name, used=workflow_args ) - if interceptors is not None and type(interceptors) != list: - interceptors = [interceptors] - self._interceptors: List[BaseInterceptor] = interceptors + Flowcept.db.insert_or_update_workflow(wf_obj) + Flowcept.current_workflow_id = wf_obj.workflow_id + else: + Flowcept.current_workflow_id = None + self.is_started = False def start(self): @@ -59,8 +94,8 @@ def start(self): if self._start_doc_inserter: self.logger.debug("Flowcept Consumer starting...") - if REDIS_INSTANCES is not None and len(REDIS_INSTANCES): - for mq_host_port in REDIS_INSTANCES: + if MQ_INSTANCES is not None and len(MQ_INSTANCES): + for mq_host_port in MQ_INSTANCES: split = mq_host_port.split(":") mq_host = split[0] mq_port = int(split[1]) @@ -105,7 +140,7 @@ def stop(self): if self._start_doc_inserter: self.logger.info("Stopping Doc Inserters...") for doc_inserter in self._document_inserters: - doc_inserter.stop(bundle_exec_id=id(self)) + doc_inserter.stop(bundle_exec_id=self._bundle_exec_id) self.is_started = False self.logger.debug("All stopped!") @@ -124,7 +159,7 @@ def start_instrumentation_interceptor(): @staticmethod def services_alive() -> bool: - if not MQDao().liveness_test(): + if not MQDao.build().liveness_test(): logger.error("MQ Not Ready!") return False if not DocumentDBDao().liveness_test(): diff --git a/flowcept/flowceptor/adapters/base_interceptor.py b/flowcept/flowceptor/adapters/base_interceptor.py index fa44e9c7..a0c2f157 100644 --- a/flowcept/flowceptor/adapters/base_interceptor.py +++ b/flowcept/flowceptor/adapters/base_interceptor.py @@ -7,7 +7,7 @@ ENRICH_MESSAGES, ) from flowcept.commons.flowcept_logger import FlowceptLogger -from flowcept.commons.daos.mq_dao import MQDao +from flowcept.commons.daos.mq_dao.mq_dao_base import MQDao from flowcept.commons.flowcept_dataclasses.task_object import TaskObject from flowcept.commons.settings_factory import get_settings @@ -32,8 +32,7 @@ def __init__(self, plugin_key=None): self.settings = get_settings(plugin_key) else: self.settings = None - self._mq_dao = MQDao(adapter_settings=self.settings) - # self._db_api = DBAPI() + self._mq_dao = MQDao.build(adapter_settings=self.settings) self._bundle_exec_id = None self._interceptor_instance_id = str(id(self)) self.telemetry_capture = TelemetryCapture() diff --git a/flowcept/flowceptor/adapters/mlflow/interception_event_handler.py b/flowcept/flowceptor/adapters/mlflow/interception_event_handler.py index a568d2a3..89582917 100644 --- a/flowcept/flowceptor/adapters/mlflow/interception_event_handler.py +++ b/flowcept/flowceptor/adapters/mlflow/interception_event_handler.py @@ -1,7 +1,7 @@ -from watchdog.events import LoggingEventHandler +from watchdog.events import LoggingEventHandler, FileSystemEventHandler -class 
InterceptionEventHandler(LoggingEventHandler): +class InterceptionEventHandler(FileSystemEventHandler): def __init__(self, interceptor_instance, callback_function): super().__init__() self.callback_function = callback_function diff --git a/flowcept/flowceptor/adapters/mlflow/mlflow_dao.py b/flowcept/flowceptor/adapters/mlflow/mlflow_dao.py index 6f45a654..a4359638 100644 --- a/flowcept/flowceptor/adapters/mlflow/mlflow_dao.py +++ b/flowcept/flowceptor/adapters/mlflow/mlflow_dao.py @@ -1,5 +1,6 @@ from typing import List from sqlalchemy.engine import Row, create_engine +from sqlalchemy import text from textwrap import dedent from flowcept.commons import singleton @@ -31,8 +32,9 @@ def _get_db_engine(sqlite_file): raise Exception(f"Could not create DB engine with uri: {db_uri}") def get_finished_run_uuids(self) -> List[Row]: - sql = dedent( - f""" + sql = text( + dedent( + f""" SELECT run_uuid FROM runs @@ -41,6 +43,7 @@ def get_finished_run_uuids(self) -> List[Row]: ORDER BY end_time DESC LIMIT {MLFlowDAO._LIMIT} """ + ) ) try: conn = self._engine.connect() @@ -55,8 +58,9 @@ def get_finished_run_uuids(self) -> List[Row]: def get_run_data(self, run_uuid: str) -> RunData: # TODO: consider outer joins to get the run data even if there's # no metric or param or if the task hasn't finished yet - sql = dedent( - f""" + sql = text( + dedent( + f""" SELECT r.run_uuid, r.start_time, r.end_time, r.status, m.key as 'metric_key', m.value as 'metric_value', p.key as 'parameter_key', p.value as 'parameter_value' @@ -75,12 +79,13 @@ def get_run_data(self, run_uuid: str) -> RunData: parameter_key, parameter_value LIMIT 30 """ + ) ) try: conn = self._engine.connect() result_set = conn.execute(sql).fetchall() except Exception as e: - self.logger.warning(e) + self.logger.exception(e) return None finally: conn.close() diff --git a/flowcept/flowceptor/adapters/mlflow/mlflow_interceptor.py b/flowcept/flowceptor/adapters/mlflow/mlflow_interceptor.py index 89489356..741d30aa 100644 --- a/flowcept/flowceptor/adapters/mlflow/mlflow_interceptor.py +++ b/flowcept/flowceptor/adapters/mlflow/mlflow_interceptor.py @@ -1,5 +1,6 @@ import os import time +from threading import Thread from watchdog.observers import Observer from watchdog.observers.polling import PollingObserver @@ -24,6 +25,7 @@ class MLFlowInterceptor(BaseInterceptor): def __init__(self, plugin_key="mlflow"): super().__init__(plugin_key) self._observer: PollingObserver = None + self._observer_thread: Thread = None self.state_manager = InterceptorStateManager(self.settings) self.dao = MLFlowDAO(self.settings) @@ -61,17 +63,20 @@ def callback(self): def start(self, bundle_exec_id) -> "MLFlowInterceptor": super().start(bundle_exec_id) - self.observe() + self._observer_thread = Thread(target=self.observe, daemon=True) + self._observer_thread.start() return self def stop(self) -> bool: super().stop() self.logger.debug("Interceptor stopping...") self._observer.stop() + self._observer_thread.join() self.logger.debug("Interceptor stopped.") return True def observe(self): + self.logger.debug("Observing") event_handler = InterceptionEventHandler( self, self.__class__.callback ) diff --git a/flowcept/flowceptor/consumers/document_inserter.py b/flowcept/flowceptor/consumers/document_inserter.py index 6f752998..5aced530 100644 --- a/flowcept/flowceptor/consumers/document_inserter.py +++ b/flowcept/flowceptor/consumers/document_inserter.py @@ -1,4 +1,3 @@ -import msgpack from time import time, sleep from threading import Thread, Event, Lock from typing import Dict @@ 
-20,7 +19,7 @@ MONGO_REMOVE_EMPTY_FIELDS, ) from flowcept.commons.flowcept_logger import FlowceptLogger -from flowcept.commons.daos.mq_dao import MQDao +from flowcept.commons.daos.mq_dao.mq_dao_base import MQDao from flowcept.commons.daos.document_db_dao import DocumentDBDao from flowcept.flowceptor.consumers.consumer_utils import ( remove_empty_fields_from_dict, @@ -49,7 +48,7 @@ def __init__( bundle_exec_id=None, ): self._task_dicts_buffer = list() - self._mq_dao = MQDao(mq_host, mq_port) + self._mq_dao = MQDao.build(mq_host, mq_port) self._doc_dao = DocumentDBDao() self._previous_time = time() self.logger = FlowceptLogger() @@ -107,30 +106,7 @@ def flush_function(buffer, doc_dao, logger=flowcept.commons.logger): else: logger.info(f"Flushed {len(buffer)} msgs to DocDB!") - # - # def _flush(self): - # self._set_buffer_size() - # with self._lock: - # if len(self._task_dicts_buffer): - # self.logger.info( - # f"Current Doc buffer size: {len(self._task_dicts_buffer)}, " - # f"Gonna flush {len(self._task_dicts_buffer)} msgs to DocDB!" - # ) - # inserted = self._doc_dao.insert_and_update_many( - # TaskObject.task_id_field(), self._task_dicts_buffer - # ) - # if not inserted: - # self.logger.warning( - # f"Could not insert the buffer correctly. " - # f"Buffer content={self._task_dicts_buffer}" - # ) - # else: - # self.logger.info( - # f"Flushed {len(self._task_dicts_buffer)} msgs to DocDB!" - # ) - # self._task_dicts_buffer = list() - - def handle_task_message(self, message: Dict): + def _handle_task_message(self, message: Dict): # if "utc_timestamp" in message: # dt = datetime.fromtimestamp(message["utc_timestamp"]) # message["timestamp"] = dt.utcnow() @@ -167,7 +143,7 @@ def handle_task_message(self, message: Dict): # self.logger.debug("Docs buffer exceeded, flushing...") # self._flush() - def handle_workflow_message(self, message: Dict): + def _handle_workflow_message(self, message: Dict): message.pop("type") self.logger.debug( f"Received following msg in DocInserter:" @@ -179,7 +155,7 @@ def handle_workflow_message(self, message: Dict): inserted = self._doc_dao.workflow_insert_or_update(wf_obj) return inserted - def handle_control_message(self, message): + def _handle_control_message(self, message): self.logger.info( f"I'm doc inserter {id(self)}. I received this control msg received: {message}" ) @@ -207,22 +183,6 @@ def handle_control_message(self, message): self.logger.info("Document Inserter is stopping...") return "stop" - # def time_based_flushing(self, event: Event): - # while not event.is_set(): - # with self._lock: - # if len(self._task_dicts_buffer): - # now = time() - # timediff = now - self._previous_time - # if timediff >= MONGO_INSERTION_BUFFER_TIME: - # self.logger.debug("Time to flush to doc db!") - # self._previous_time = now - # self._flush() - # self.logger.debug( - # f"Time-based DocDB inserter going to wait for {MONGO_INSERTION_BUFFER_TIME} s." 
- # ) - # event.wait(MONGO_INSERTION_BUFFER_TIME) - # self.logger.debug("Broke the time_based_flushing in Doc Inserter!") - def start(self) -> "DocumentInserter": self._main_thread = Thread(target=self._start) self._main_thread.start() @@ -230,49 +190,38 @@ def start(self) -> "DocumentInserter": def _start(self): stop_event = Event() - # time_thread = Thread( - # target=self.time_based_flushing, args=(stop_event,) - # ) - # time_thread.start() - pubsub = self._mq_dao.subscribe() - should_continue = True - while should_continue: + while True: try: - for message in pubsub.listen(): - self.logger.debug("Doc inserter Received a message!") - if message["type"] in MQDao.MESSAGE_TYPES_IGNORE: - continue - _dict_obj = msgpack.loads( - message["data"] # , cls=DocumentInserter.DECODER - ) - msg_type = _dict_obj.get("type") - if msg_type == "flowcept_control": - r = self.handle_control_message(_dict_obj) - if r == "stop": - stop_event.set() - self.buffer.stop() - should_continue = False - break - elif msg_type == "task": - self.handle_task_message(_dict_obj) - elif msg_type == "workflow": - self.handle_workflow_message(_dict_obj) - elif msg_type is None: - raise Exception("Please inform the message type.") - else: - self.logger.error("Unexpected message type") - self.logger.debug( - "Processed all MQ msgs in doc_inserter we got so far. " - "Now waiting (hopefully not forever!) on the " - "pubsub.listen() loop for new messages." - ) + self._mq_dao.message_listener(self._message_handler) + stop_event.set() + self.buffer.stop() + break except Exception as e: self.logger.exception(e) sleep(2) self.logger.debug("Still in the doc insert. message listen loop") self.logger.info("Ok, we broke the doc inserter message listen loop!") - # time_thread.join() - # self.logger.info("Joined time thread in doc inserter.") + + def _message_handler(self, msg_obj: dict): + msg_type = msg_obj.get("type") + if msg_type == "flowcept_control": + r = self._handle_control_message(msg_obj) + if r == "stop": + return False + return True + elif msg_type == "task": + self._handle_task_message(msg_obj) + return True + elif msg_type == "workflow": + self._handle_workflow_message(msg_obj) + return True + elif msg_type is None: + self.logger.warning(f"Message without type???\n {msg_obj}") + return True + # raise Exception("Please inform the message type.") + else: + self.logger.error("Unexpected message type") + return True def stop(self, bundle_exec_id=None): if self.check_safe_stops: diff --git a/flowcept/instrumentation/decorators/flowcept_task.py b/flowcept/instrumentation/decorators/flowcept_task.py index 92841931..92554509 100644 --- a/flowcept/instrumentation/decorators/flowcept_task.py +++ b/flowcept/instrumentation/decorators/flowcept_task.py @@ -1,6 +1,7 @@ from time import time from functools import wraps import flowcept.commons +from flowcept import Flowcept from flowcept.commons.flowcept_dataclasses.task_object import ( TaskObject, Status, @@ -25,6 +26,9 @@ def default_args_handler(task_message: TaskObject, *args, **kwargs): "workflow_id", None ) args_handled.update(kwargs) + task_message.workflow_id = ( + task_message.workflow_id or Flowcept.current_workflow_id + ) if REPLACE_NON_JSON_SERIALIZABLE: args_handled = replace_non_serializable(args_handled) return args_handled diff --git a/flowcept/instrumentation/decorators/responsible_ai.py b/flowcept/instrumentation/decorators/responsible_ai.py index 5bda3519..18d19f97 100644 --- a/flowcept/instrumentation/decorators/responsible_ai.py +++ 
b/flowcept/instrumentation/decorators/responsible_ai.py @@ -1,7 +1,7 @@ from functools import wraps import numpy as np from torch import nn -from flowcept import DBAPI +from flowcept import Flowcept from flowcept.commons.utils import replace_non_serializable from flowcept.configs import REPLACE_NON_JSON_SERIALIZABLE, INSTRUMENTATION @@ -110,7 +110,7 @@ def wrapper(*args, **kwargs): if INSTRUMENTATION.get("torch", False) and INSTRUMENTATION[ "torch" ].get("save_models", False): - obj_id = DBAPI().save_torch_model( + obj_id = Flowcept.db.save_torch_model( model, custom_metadata=ret["responsible_ai_metadata"] ) ret["object_id"] = obj_id diff --git a/flowcept/main.py b/flowcept/main.py index 7e1d69d1..f6870e9c 100644 --- a/flowcept/main.py +++ b/flowcept/main.py @@ -1,7 +1,7 @@ import sys from flowcept import ( - FlowceptConsumerAPI, + Flowcept, ZambezeInterceptor, MLFlowInterceptor, TensorboardInterceptor, @@ -36,7 +36,7 @@ def main(): ) interceptors.append(interceptor) - consumer = FlowceptConsumerAPI(interceptors) + consumer = Flowcept(interceptors) consumer.start() diff --git a/flowcept/version.py b/flowcept/version.py index 15548d30..41cdf622 100644 --- a/flowcept/version.py +++ b/flowcept/version.py @@ -2,4 +2,4 @@ # This file is supposed to be automatically modified by the CI Bot. # The expected format is: .. # See .github/workflows/version_bumper.py -__version__ = "0.3.11" +__version__ = "0.5.0" diff --git a/notebooks/analytics.ipynb b/notebooks/analytics.ipynb index 48a288fa..0bc2ea70 100644 --- a/notebooks/analytics.ipynb +++ b/notebooks/analytics.ipynb @@ -32,8 +32,7 @@ " \"\"\"\n", " import json\n", " from uuid import uuid4\n", - " from flowcept import DBAPI\n", - " db_api = DBAPI()\n", + " from flowcept import Flowcept\n", " test_data_path = '../tests/api/sample_data_with_telemetry_and_rai.json' # This sample data contains a workflow composed of 9 tasks.\n", " with open(test_data_path) as f:\n", " base_data = json.loads(f.read())\n", @@ -47,7 +46,7 @@ " new_doc[\"workflow_id\"] = wf_id\n", " docs.append(new_doc)\n", " \n", - " inserted_ids = db_api._dao.insert_many(docs)\n", + " inserted_ids = Flowcept.db._dao.insert_many(docs)\n", " assert len(inserted_ids) == len(base_data)\n", " return wf_id" ] diff --git a/notebooks/dask.ipynb b/notebooks/dask.ipynb index 071de54e..ff81a146 100644 --- a/notebooks/dask.ipynb +++ b/notebooks/dask.ipynb @@ -14,7 +14,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "e7f738dd-4e78-4707-a0b4-b7ddc729a635", "metadata": { "tags": [] @@ -30,7 +30,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "0d399983-63f2-4f0d-acdc-6e3ff4abbb4d", "metadata": { "tags": [] @@ -44,7 +44,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "50721f98-6f40-4bd9-83f1-56e83e75aa8b", "metadata": { "tags": [] @@ -80,7 +80,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "47a4cf2c-4b5c-4fe5-9973-cda2734b0623", "metadata": { "tags": [] @@ -114,27 +114,27 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "b1e311c3-ca2a-4cf5-9a38-6742c91a0035", "metadata": { "tags": [] }, "outputs": [], "source": [ - "from flowcept import FlowceptConsumerAPI\n", - "consumer = FlowceptConsumerAPI()" + "from flowcept import Flowcept\n", + "flowcept = Flowcept('dask')" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": 
"fe609b49-28cf-4f2c-9027-ee7bc51fb86a", "metadata": { "tags": [] }, "outputs": [], "source": [ - "consumer.start()" + "flowcept.start()" ] }, { @@ -149,7 +149,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "702f3c58-2a52-4763-87d9-fd7062192e48", "metadata": { "tags": [] @@ -172,7 +172,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "e843f2c8-4566-46f2-95de-34d17bd4c061", "metadata": { "tags": [] @@ -194,7 +194,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "84307c0a-6ef5-428d-bf01-fd921e148c86", "metadata": { "tags": [] @@ -212,7 +212,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "e211229d-ac01-48c6-81f1-efba8e72d58c", "metadata": { "tags": [] @@ -234,7 +234,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "34fbe181-c55d-4ac4-84bf-0684fb3f54ca", "metadata": { "tags": [] @@ -247,7 +247,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "72a80432-f4fd-459e-a3f2-900beeea434d", "metadata": { "tags": [] @@ -267,7 +267,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "82d8a1cc-86c8-48a6-b91e-d822c0417c1b", "metadata": { "tags": [] @@ -281,7 +281,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "2a1afa5d-3934-4188-8a35-4d221bd58550", "metadata": { "tags": [] @@ -293,7 +293,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "f0a5d746-1157-4591-af37-76360e7a7b1c", "metadata": { "tags": [] @@ -315,19 +315,19 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "45291c13-4fcf-47b4-9b9f-de0050b1b076", "metadata": { "tags": [] }, "outputs": [], "source": [ - "consumer.stop()" + "flowcept.stop()" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "7a3d4d67-315a-46dd-a41e-b15174dc9784", "metadata": { "tags": [] @@ -339,7 +339,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": "null", "id": "fb51cbaf-2127-4fe3-8fb6-e1be9d009f7e", "metadata": { "tags": [] @@ -366,7 +366,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.16" + "version": "3.9.19" } }, "nbformat": 4, diff --git a/notebooks/dask_from_CLI.ipynb b/notebooks/dask_from_CLI.ipynb index dc8bfe9e..2aed4697 100644 --- a/notebooks/dask_from_CLI.ipynb +++ b/notebooks/dask_from_CLI.ipynb @@ -51,9 +51,9 @@ }, "outputs": [], "source": [ - "from flowcept import FlowceptConsumerAPI\n", - "consumer = FlowceptConsumerAPI()\n", - "consumer.start()" + "from flowcept import Flowcept\n", + "flowcept = Flowcept()\n", + "flowcept.start()" ] }, { @@ -161,7 +161,7 @@ "metadata": {}, "outputs": [], "source": [ - "consumer.stop()" + "flowcept.stop()" ] } ], @@ -181,7 +181,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.16" + "version": "3.9.19" } }, "nbformat": 4, diff --git a/notebooks/mlflow.ipynb b/notebooks/mlflow.ipynb index 7f5a84fa..e8b9d5ab 100644 --- a/notebooks/mlflow.ipynb +++ b/notebooks/mlflow.ipynb @@ -148,9 +148,9 @@ "metadata": {}, "outputs": [], "source": [ - "from flowcept import FlowceptConsumerAPI\n", - "consumer = FlowceptConsumerAPI(interceptor)\n", - "consumer.start()" + "from flowcept import Flowcept\n", + "flowcept = Flowcept(interceptor)\n", + 
"flowcept.start()" ] }, { @@ -402,7 +402,7 @@ }, "outputs": [], "source": [ - "consumer.stop()" + "flowcept.stop()" ] } ], @@ -422,7 +422,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.16" + "version": "3.9.19" } }, "nbformat": 4, diff --git a/notebooks/sample_data.csv b/notebooks/sample_data.csv new file mode 100644 index 00000000..86cfa453 --- /dev/null +++ b/notebooks/sample_data.csv @@ -0,0 +1,10 @@ +,used.max_epochs,generated.loss,generated.accuracy,generated.responsible_ai_metadata.shap_sum,generated.responsible_ai_metadata.flops,generated.responsible_ai_metadata.params,generated.responsible_ai_metadata.max_width,generated.responsible_ai_metadata.depth,generated.responsible_ai_metadata.n_fc_layers,generated.responsible_ai_metadata.n_cv_layers,telemetry_diff.cpu.times_avg.user,telemetry_diff.cpu.times_avg.system,telemetry_diff.cpu.times_avg.idle,telemetry_diff.process.memory.rss,telemetry_diff.process.memory.vms,telemetry_diff.process.memory.pfaults,telemetry_diff.process.memory.pageins,telemetry_diff.process.cpu_times.user,telemetry_diff.process.cpu_times.system,telemetry_diff.memory.virtual.available,telemetry_diff.memory.virtual.used,telemetry_diff.memory.virtual.free,telemetry_diff.memory.virtual.active,telemetry_diff.memory.virtual.inactive,telemetry_diff.memory.virtual.wired,telemetry_diff.memory.swap.total,telemetry_diff.memory.swap.used,telemetry_diff.memory.swap.free,telemetry_diff.memory.swap.sin,telemetry_diff.memory.swap.sout,telemetry_diff.disk.disk_usage.free,used.conv_in_outs_sum,used.conv_kernel_sizes_sum,used.conv_pool_sizes_sum,used.fc_in_outs_sum,used.softmax_dims_sum,telemetry_diff.network.activity,telemetry_diff.disk.activity,telemetry_diff.process.activity +0,1.0,0.014728536784648895,40.75,0.0,21880192.0,162990.0,100.0,12.0,5.0,7.0,397.4499999999971,155.3100000000013,16.699999999953434,2720251904.0,4253073408.0,7128964.0,39.0,314.642169344,60.419478912,-779124736.0,-900841472.0,119111680.0,-967688192.0,-746061824.0,66846720.0,1073741824.0,577437696.0,496304128.0,371359744.0,5390336.0,-1067229184.0,41.0,29.0,2.0,220.0,1.0,156472.4375,171472968.0,1997778.0 +1,1.0,0.040325844478607174,11.35,0.0,47275136.0,359840.0,400.0,16.0,9.0,7.0,411.41999999999825,159.40999999999985,17.17000000004191,2675638272.0,4349198336.0,7289754.0,39.0,326.14157747200005,62.117328288,-930316288.0,-887881728.0,12861440.0,-937263104.0,-792707072.0,49381376.0,1073741824.0,577437696.0,496304128.0,373030912.0,5390336.0,-1067397120.0,41.0,29.0,2.0,1620.0,3.0,162635.8125,172406277.0,2044184.0 +2,1.0,0.05815730080604553,11.35,0.0,5405073024.0,42184840.0,4000.0,24.0,17.0,7.0,1179.7599999999948,401.4300000000003,50.68000000005122,2231304192.0,3327492096.0,17076525.0,39.0,942.530430464,153.566138432,-159088640.0,-1295351808.0,751190016.0,-1227440128.0,-747388928.0,-67911680.0,1073741824.0,560660480.0,513081344.0,738394112.0,14172160.0,-1065861120.0,41.0,29.0,2.0,32020.0,7.0,369630.8125,251856855.0,4531840.0 +3,1.0,0.018241909003257752,10.28,0.0,324195712.0,1890690.0,100.0,16.0,5.0,11.0,1862.6100000000006,579.5999999999985,114.22000000003027,2189475840.0,3818635264.0,23538438.0,39.0,1486.561319424,216.581898688,-693305344.0,-804503552.0,55394304.0,-755220480.0,-590577664.0,-49283072.0,1073741824.0,552271872.0,521469952.0,803258368.0,21512192.0,-1069760512.0,181.0,30.0,3.0,260.0,1.0,650028.625,272619547.0,6284750.0 
+4,1.0,0.040312224340438844,11.35,0.0,349846656.0,2089540.0,400.0,20.0,9.0,11.0,1875.5800000000017,582.4599999999991,115.87000000005355,2155741184.0,3380838400.0,23604991.0,37.0,1496.636215552,217.575447296,-489193472.0,-862502912.0,309952512.0,-788512768.0,-642940928.0,-73990144.0,1073741824.0,552271872.0,521469952.0,819249152.0,22134784.0,-1070206976.0,181.0,30.0,3.0,1660.0,3.0,652738.75,275295002.8333333,6311004.0 +5,1.0,0.05813799858093262,11.35,0.0,5709692544.0,43930540.0,4000.0,28.0,17.0,11.0,2384.9100000000035,701.8199999999997,252.69000000000233,2042101760.0,3384459264.0,27004118.0,37.0,1888.629006592,255.444219456,-567836672.0,-731906048.0,224264192.0,-695762944.0,-632913920.0,-36143104.0,1073741824.0,552271872.0,521469952.0,884097024.0,29392896.0,-1060265984.0,181.0,30.0,3.0,32060.0,7.0,799624.5,295061285.8333333,7313349.0 +6,1.0,0.018207813382148743,10.09,0.0,1810792832.0,8485880.0,120.0,20.0,5.0,15.0,4169.770000000004,1086.2099999999991,1084.350000000035,968032256.0,2068332544.0,35022644.0,37.0,3220.7134123520004,354.79103712,-17186816.0,-623886336.0,534364160.0,-553648128.0,-393068544.0,-70238208.0,1073741824.0,543883264.0,529858560.0,1058029568.0,49823744.0,-1066872832.0,481.0,31.0,4.0,320.0,1.0,1362017.5625,347619554.0,10075678.0 +7,1.0,0.04012699522972107,11.35,0.0,1836827776.0,8687730.0,400.0,24.0,9.0,15.0,4172.880000000005,1086.9700000000012,1087.1500000000233,1305690112.0,2392899584.0,35045100.0,37.0,3222.663728896,354.923501504,-17170432.0,-623886336.0,534364160.0,-553648128.0,-393052160.0,-70238208.0,1073741824.0,543883264.0,529858560.0,1058029568.0,49823744.0,-1067188224.0,481.0,31.0,4.0,1720.0,3.0,1363559.3125,345680791.6666667,10079114.0 +8,1.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,481.0,31.0,4.0,32120.0,7.0,,,0.0 diff --git a/notebooks/tensorboard.ipynb b/notebooks/tensorboard.ipynb index f8f9fc10..b69f84f7 100644 --- a/notebooks/tensorboard.ipynb +++ b/notebooks/tensorboard.ipynb @@ -302,9 +302,9 @@ }, "outputs": [], "source": [ - "from flowcept import FlowceptConsumerAPI\n", - "consumer = FlowceptConsumerAPI(interceptor)\n", - "consumer.start()" + "from flowcept import Flowcept\n", + "flowcept = Flowcept(interceptor)\n", + "flowcept.start()" ] }, { @@ -339,7 +339,7 @@ "outputs": [], "source": [ "sleep(10)\n", - "consumer.stop()" + "flowcept.stop()" ] }, { @@ -385,7 +385,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.16" + "version": "3.9.19" } }, "nbformat": 4, diff --git a/notebooks/zambeze.ipynb b/notebooks/zambeze.ipynb index 77b031ff..979e87a7 100644 --- a/notebooks/zambeze.ipynb +++ b/notebooks/zambeze.ipynb @@ -114,9 +114,9 @@ "metadata": {}, "outputs": [], "source": [ - "from flowcept import FlowceptConsumerAPI\n", - "consumer = FlowceptConsumerAPI(interceptor)\n", - "consumer.start()" + "from flowcept import Flowcept\n", + "flowcept = Flowcept(interceptor)\n", + "flowcept.start()" ] }, { @@ -205,7 +205,7 @@ "metadata": {}, "outputs": [], "source": [ - "consumer.stop()" + "flowcept.stop()" ] } ], @@ -225,7 +225,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.16" + "version": "3.9.19" } }, "nbformat": 4, diff --git a/requirements.txt b/requirements.txt index 69fdd2c6..951e1f44 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,9 +2,9 @@ redis==4.4.2 psutil==5.9.5 py-cpuinfo==9.0.0 pymongo==4.3.3 -Werkzeug==2.2.2 -flask==2.2.2 -requests==2.31.0 -flask_restful==0.3.9 pandas==2.0.3 omegaconf +flask +requests +flask_restful +Werkzeug diff --git 
a/resources/sample_settings.yaml b/resources/sample_settings.yaml index e35d2f2f..5352a4e0 100644 --- a/resources/sample_settings.yaml +++ b/resources/sample_settings.yaml @@ -31,15 +31,20 @@ experiment: user: root campaign_id: super_campaign -main_redis: +mq: + type: redis # or kafka; if kafka, please adjust the port host: localhost - #instances: ["localhost:6379"] # We can have multiple redis instances being accessed by the consumers but each interceptor will currently access one single redis. + # instances: ["localhost:6379"] # We can have multiple redis instances being accessed by the consumers but each interceptor will currently access one single redis. port: 6379 channel: interception buffer_size: 50 insertion_buffer_time_secs: 5 chunk_size: 10 # you can use 0 or -1 to disable this. Or simply omit this from the config file. +kv_db: + host: localhost + port: 6379 + mongodb: host: localhost port: 27017 diff --git a/setup.py b/setup.py index 2d251f0f..6fe78529 100644 --- a/setup.py +++ b/setup.py @@ -51,13 +51,13 @@ def create_settings_file(): extras_requirement_keys = [ "zambeze", "mlflow", - "tensorboard", "dask", "nvidia", "amd", "analytics", "responsible_ai", - "data_augmentation", + "kafka", + "tensorboard", ] skip_full = {"amd", "nvidia"} @@ -73,6 +73,11 @@ def create_settings_file(): extras_require["full"] = full_requirements +fulldev = full_requirements.copy() +fulldev.extend(get_requirements(f"extra_requirements/dev-requirements.txt")) + +extras_require["fulldev"] = fulldev + keywords = [ "ai", "ml", diff --git a/tests/adapters/dask_test_utils.py b/tests/adapters/dask_test_utils.py index 93dd2681..3d1a422a 100644 --- a/tests/adapters/dask_test_utils.py +++ b/tests/adapters/dask_test_utils.py @@ -1,13 +1,13 @@ from dask.distributed import Client, LocalCluster from distributed import Status -from flowcept import FlowceptConsumerAPI +from flowcept import Flowcept def close_dask(client, cluster): """ - We must close dask so that the Dask plugins at the workers and scheduler will send the stop signal, which is required for flowcept to stop gracefully (otherwise it will run forever waiting for this stop signal. - The trick part was to find the correct order of closures for dask, that's why I created this [very simple] method, which might be reused in other tests. + We must close dask so that the Dask plugins at the workers and scheduler will send the stop signal, which is required for flowcept to stop gracefully (otherwise it will run forever waiting for this stop signal). + The tricky part was to find the correct order of closures for dask, that's why I created this [very simple] method, which might be reused in other tests. From all alternatives, after several rounds of trial and error, what worked best without exceptions being thrown is here in this method. client.shutdown causes the workers to die unexpectedly.
:param client: @@ -29,7 +29,9 @@ def setup_local_dask_cluster(consumer=None, n_workers=1, exec_bundle=None): ) if consumer is None or not consumer.is_started: - consumer = FlowceptConsumerAPI(bundle_exec_id=exec_bundle).start() + consumer = Flowcept( + interceptors="dask", bundle_exec_id=exec_bundle + ).start() cluster = LocalCluster(n_workers=n_workers) scheduler = cluster.scheduler diff --git a/tests/adapters/test_dask.py b/tests/adapters/test_dask.py index ddc04bc4..ee0c2b78 100644 --- a/tests/adapters/test_dask.py +++ b/tests/adapters/test_dask.py @@ -5,7 +5,7 @@ from dask.distributed import Client, LocalCluster -from flowcept import FlowceptConsumerAPI, TaskQueryAPI, DBAPI +from flowcept import Flowcept, TaskQueryAPI from flowcept.commons.flowcept_logger import FlowceptLogger from flowcept.commons.utils import ( assert_by_querying_tasks_until, @@ -51,12 +51,10 @@ def forced_error_func(x): class TestDask(unittest.TestCase): client: Client = None cluster: LocalCluster = None - consumer: FlowceptConsumerAPI = None + consumer: Flowcept = None def __init__(self, *args, **kwargs): super(TestDask, self).__init__(*args, **kwargs) - self.query_api = TaskQueryAPI() - self.db_api = DBAPI() self.logger = FlowceptLogger() @classmethod @@ -171,12 +169,13 @@ def test_observer_and_consumption(self): print("Done workflow!") assert assert_by_querying_tasks_until( {"task_id": o2_task_id}, - condition_to_evaluate=lambda docs: "telemetry_at_end" in docs[0] + condition_to_evaluate=lambda docs: "ended_at" in docs[0] and "y" in docs[0]["used"] and len(docs[0]["generated"]) > 0, ) assert evaluate_until( - lambda: self.db_api.get_workflow(workflow_id=wf_id) is not None, + lambda: TestDask.consumer.db.get_workflow(workflow_id=wf_id) + is not None, msg="Checking if workflow object was saved in db", ) print("All conditions met!") diff --git a/tests/adapters/test_dask_with_context_mgmt.py b/tests/adapters/test_dask_with_context_mgmt.py index 364c543c..13d34418 100644 --- a/tests/adapters/test_dask_with_context_mgmt.py +++ b/tests/adapters/test_dask_with_context_mgmt.py @@ -2,12 +2,15 @@ import numpy as np from dask.distributed import Client +from distributed import LocalCluster -from flowcept import FlowceptConsumerAPI +from flowcept import Flowcept from flowcept.commons.flowcept_logger import FlowceptLogger from flowcept.commons.utils import assert_by_querying_tasks_until from flowcept.flowceptor.adapters.dask.dask_plugins import ( register_dask_workflow, + FlowceptDaskSchedulerAdapter, + FlowceptDaskWorkerAdapter, ) from tests.adapters.dask_test_utils import ( setup_local_dask_cluster, @@ -23,45 +26,29 @@ def dummy_func1(x): class TestDaskContextMgmt(unittest.TestCase): - client: Client = None - cluster = None - consumer = None - def __init__(self, *args, **kwargs): super(TestDaskContextMgmt, self).__init__(*args, **kwargs) self.logger = FlowceptLogger() - @classmethod - def setUpClass(cls): - ( - TestDaskContextMgmt.client, - TestDaskContextMgmt.cluster, - TestDaskContextMgmt.consumer, - ) = setup_local_dask_cluster(TestDaskContextMgmt.consumer, 2) - def test_workflow(self): - i1 = np.random.random() - register_dask_workflow(self.client) - with FlowceptConsumerAPI(): - o1 = self.client.submit(dummy_func1, i1) + cluster = LocalCluster(n_workers=2) + scheduler = cluster.scheduler + client = Client(scheduler.address) + scheduler.add_plugin(FlowceptDaskSchedulerAdapter(scheduler)) + client.register_plugin(FlowceptDaskWorkerAdapter()) + register_dask_workflow(client) + + with Flowcept("dask"): + i1 = 
np.random.random() + o1 = client.submit(dummy_func1, i1) self.logger.debug(o1.result()) self.logger.debug(o1.key) - assert assert_by_querying_tasks_until( - {"task_id": o1.key}, - condition_to_evaluate=lambda docs: "ended_at" in docs[0], - ) - - @classmethod - def tearDownClass(cls): - print("Ending tests!") - try: - close_dask( - TestDaskContextMgmt.client, TestDaskContextMgmt.cluster - ) - except Exception as e: - print(e) - pass + close_dask(client, cluster) + # stop signal sent to doc inserter must be sent after + # all other interceptors stopped - if TestDaskContextMgmt.consumer: - TestDaskContextMgmt.consumer.stop() + assert assert_by_querying_tasks_until( + {"task_id": o1.key}, + condition_to_evaluate=lambda docs: "ended_at" in docs[0], + ) diff --git a/tests/adapters/test_mlflow.py b/tests/adapters/test_mlflow.py index 76b9a5ee..5993d501 100644 --- a/tests/adapters/test_mlflow.py +++ b/tests/adapters/test_mlflow.py @@ -2,7 +2,7 @@ from time import sleep from flowcept.commons.flowcept_logger import FlowceptLogger -from flowcept import MLFlowInterceptor, FlowceptConsumerAPI, TaskQueryAPI +from flowcept import MLFlowInterceptor, Flowcept from flowcept.commons.utils import ( assert_by_querying_tasks_until, evaluate_until, @@ -19,8 +19,6 @@ def test_pure_run_mlflow(self): import uuid import mlflow - # from mlflow.tracking import MlflowClient - # client = MlflowClient() mlflow.set_tracking_uri( f"sqlite:///" f"{self.interceptor.settings.file_path}" ) @@ -35,27 +33,6 @@ def test_pure_run_mlflow(self): self.logger.debug("\nTrained model") mlflow.log_metric("loss", 0.04) - return run.info.run_uuid - - def test_pure_run_mlflow_no_ctx_mgr(self): - import uuid - import mlflow - - # from mlflow.tracking import MlflowClient - # client = MlflowClient() - mlflow.set_tracking_uri( - f"sqlite:///" f"{self.interceptor.settings.file_path}" - ) - experiment_name = "LinearRegression" - experiment_id = mlflow.create_experiment( - experiment_name + str(uuid.uuid4()) - ) - run = mlflow.start_run(experiment_id=experiment_id) - mlflow.log_params({"number_epochs": 10}) - mlflow.log_params({"batch_size": 64}) - self.logger.debug("\nTrained model") - mlflow.log_metric("loss", 0.04) - mlflow.end_run() return run.info.run_uuid def test_get_runs(self): @@ -84,13 +61,20 @@ def test_check_state_manager(self): self.interceptor.state_manager.add_element_id(run_uuid) def test_observer_and_consumption(self): - with FlowceptConsumerAPI(self.interceptor): + # if os.path.exists(self.interceptor.settings.file_path): + # os.remove(self.interceptor.settings.file_path) + # + # with open(self.interceptor.settings.file_path, 'w+') as f: + # f.write("") + + with Flowcept(self.interceptor): run_uuid = self.test_pure_run_mlflow() - sleep(5) + # sleep(3) assert evaluate_until( - lambda: self.interceptor.state_manager.has_element_id(run_uuid) + lambda: self.interceptor.state_manager.has_element_id(run_uuid), ) + assert assert_by_querying_tasks_until( {"task_id": run_uuid}, ) diff --git a/tests/adapters/test_tensorboard.py b/tests/adapters/test_tensorboard.py index 9dc18f87..347efe72 100644 --- a/tests/adapters/test_tensorboard.py +++ b/tests/adapters/test_tensorboard.py @@ -5,7 +5,7 @@ from flowcept.configs import MONGO_INSERTION_BUFFER_TIME from flowcept.commons.flowcept_logger import FlowceptLogger -from flowcept import TensorboardInterceptor, FlowceptConsumerAPI, TaskQueryAPI +from flowcept import TensorboardInterceptor, Flowcept, TaskQueryAPI from flowcept.commons.utils import ( assert_by_querying_tasks_until, evaluate_until, @@ 
-36,7 +36,7 @@ def reset_log_dir(self): # Making sure we'll wait until next watch cycle sleep(watch_interval_sec * 3) - def test_run_tensorboard_hparam_tuning(self): + def run_tensorboard_hparam_tuning(self): """ Code based on https://www.tensorflow.org/tensorboard/hyperparameter_tuning_with_hparams @@ -142,8 +142,8 @@ def run(run_dir, hparams): def test_observer_and_consumption(self): self.reset_log_dir() - with FlowceptConsumerAPI(self.interceptor): - wf_id = self.test_run_tensorboard_hparam_tuning() + with Flowcept(self.interceptor): + wf_id = self.run_tensorboard_hparam_tuning() self.logger.debug("Done training. Sleeping some time...") watch_interval_sec = MONGO_INSERTION_BUFFER_TIME # Making sure we'll wait until next watch cycle @@ -155,15 +155,12 @@ def test_observer_and_consumption(self): ) assert assert_by_querying_tasks_until({"workflow_id": wf_id}) - # TODO: Sometimes this fails. It's been hard to debug and tensorboard - # is not a priority. Need to investigate later - # May be related: https://github.com/ORNL/flowcept/issues/49 - # docs = TaskQueryAPI().query({"workflow_id": wf_id}) - # assert len(docs) == 16 - + @unittest.skip( + "This test is useful only for developing. No need to run " "in CI" + ) def test_read_tensorboard_hparam_tuning(self): self.reset_log_dir() - self.test_run_tensorboard_hparam_tuning() + self.run_tensorboard_hparam_tuning() from tbparse import SummaryReader logdir = self.interceptor.settings.file_path diff --git a/tests/adapters/test_zambeze.py b/tests/adapters/test_zambeze.py index c4eb4197..ad43efcf 100644 --- a/tests/adapters/test_zambeze.py +++ b/tests/adapters/test_zambeze.py @@ -4,8 +4,10 @@ import pika from uuid import uuid4 +from pika.exceptions import AMQPConnectionError + from flowcept.commons.flowcept_logger import FlowceptLogger -from flowcept import ZambezeInterceptor, FlowceptConsumerAPI, TaskQueryAPI +from flowcept import ZambezeInterceptor, Flowcept, TaskQueryAPI from flowcept.flowceptor.adapters.zambeze.zambeze_dataclasses import ( ZambezeMessage, ) @@ -17,20 +19,35 @@ def __init__(self, *args, **kwargs): super(TestZambeze, self).__init__(*args, **kwargs) self.logger = FlowceptLogger() interceptor = ZambezeInterceptor() - self.consumer = FlowceptConsumerAPI(interceptor) - self._connection = pika.BlockingConnection( - pika.ConnectionParameters( - interceptor.settings.host, - interceptor.settings.port, + try: + self._connected = False + self._connection = pika.BlockingConnection( + pika.ConnectionParameters( + interceptor.settings.host, + interceptor.settings.port, + ) ) - ) + self._connected = self._connection.is_open + except AMQPConnectionError: + print("Failed to connect to RabbitMQ. Is it running?") + return + except Exception as e: + print(f"An error occurred: {e}") + return + + self.consumer = Flowcept(interceptor) self._channel = self._connection.channel() self._queue_names = interceptor.settings.queue_names - self._channel.queue_declare(queue=self._queue_names[0]) self.consumer.start() def test_send_message(self): + if not self._connected: + self.logger.warning( + "RabbitMQ was not found. Skipping this " "Zambeze test." 
+ ) + assert True + return another_act_id = str(uuid4()) act_id = str(uuid4()) msg = ZambezeMessage( diff --git a/tests/api/dbapi_test.py b/tests/api/dbapi_test.py index 588e93ff..93d4a366 100644 --- a/tests/api/dbapi_test.py +++ b/tests/api/dbapi_test.py @@ -2,10 +2,7 @@ from uuid import uuid4 from flowcept.commons.flowcept_dataclasses.task_object import TaskObject -from flowcept.commons.flowcept_dataclasses.workflow_object import ( - WorkflowObject, -) -from flowcept.flowcept_api.db_api import DBAPI +from flowcept import Flowcept, WorkflowObject from flowcept.flowceptor.telemetry_capture import TelemetryCapture @@ -20,17 +17,16 @@ def __str__(self): class WorkflowDBTest(unittest.TestCase): def test_wf_dao(self): - dbapi = DBAPI() workflow1_id = str(uuid4()) wf1 = WorkflowObject() wf1.workflow_id = workflow1_id - assert dbapi.insert_or_update_workflow(wf1) + assert Flowcept.db.insert_or_update_workflow(wf1) wf1.custom_metadata = {"test": "abc"} - assert dbapi.insert_or_update_workflow(wf1) + assert Flowcept.db.insert_or_update_workflow(wf1) - wf_obj = dbapi.get_workflow(workflow_id=workflow1_id) + wf_obj = Flowcept.db.get_workflow(workflow_id=workflow1_id) assert wf_obj is not None print(wf_obj) @@ -41,56 +37,56 @@ def test_wf_dao(self): wf2.workflow_id = wf2_id tel = TelemetryCapture() - assert dbapi.insert_or_update_workflow(wf2) + assert Flowcept.db.insert_or_update_workflow(wf2) wf2.interceptor_ids = ["123"] - assert dbapi.insert_or_update_workflow(wf2) + assert Flowcept.db.insert_or_update_workflow(wf2) wf2.interceptor_ids = ["1234"] - assert dbapi.insert_or_update_workflow(wf2) - wf_obj = dbapi.get_workflow(wf2_id) + assert Flowcept.db.insert_or_update_workflow(wf2) + wf_obj = Flowcept.db.get_workflow(wf2_id) assert len(wf_obj.interceptor_ids) == 2 wf2.machine_info = {"123": tel.capture_machine_info()} - assert dbapi.insert_or_update_workflow(wf2) - wf_obj = dbapi.get_workflow(wf2_id) + assert Flowcept.db.insert_or_update_workflow(wf2) + wf_obj = Flowcept.db.get_workflow(wf2_id) assert wf_obj wf2.machine_info = {"1234": tel.capture_machine_info()} - assert dbapi.insert_or_update_workflow(wf2) - wf_obj = dbapi.get_workflow(wf2_id) + assert Flowcept.db.insert_or_update_workflow(wf2) + wf_obj = Flowcept.db.get_workflow(wf2_id) assert len(wf_obj.machine_info) == 2 def test_save_blob(self): - dbapi = DBAPI() import pickle obj = pickle.dumps(OurObject()) - obj_id = dbapi.save_object(object=obj) + obj_id = Flowcept.db.save_object(object=obj) print(obj_id) - obj_docs = dbapi.query(filter={"object_id": obj_id}, type="object") + obj_docs = Flowcept.db.query( + filter={"object_id": obj_id}, type="object" + ) loaded_obj = pickle.loads(obj_docs[0]["data"]) assert type(loaded_obj) == OurObject def test_dump(self): - dbapi = DBAPI() wf_id = str(uuid4()) - c0 = dbapi._dao.count() + c0 = Flowcept.db._dao.count() for i in range(10): t = TaskObject() t.workflow_id = wf_id t.task_id = str(uuid4()) - dbapi.insert_or_update_task(t) + Flowcept.db.insert_or_update_task(t) _filter = {"workflow_id": wf_id} - assert dbapi.dump_to_file( + assert Flowcept.db.dump_to_file( filter=_filter, ) - assert dbapi.dump_to_file(filter=_filter, should_zip=True) - assert dbapi.dump_to_file( + assert Flowcept.db.dump_to_file(filter=_filter, should_zip=True) + assert Flowcept.db.dump_to_file( filter=_filter, output_file="dump_test.json" ) - dbapi._dao.delete_with_filter(_filter) - c1 = dbapi._dao.count() + Flowcept.db._dao.delete_with_filter(_filter) + c1 = Flowcept.db._dao.count() assert c0 == c1 diff --git 
a/tests/api/flowcept_api_test.py b/tests/api/flowcept_api_test.py index c8d9b375..fde9a203 100644 --- a/tests/api/flowcept_api_test.py +++ b/tests/api/flowcept_api_test.py @@ -1,49 +1,53 @@ import unittest -from time import sleep -from uuid import uuid4 -from flowcept import FlowceptConsumerAPI -from flowcept.commons.flowcept_dataclasses.workflow_object import ( - WorkflowObject, +from flowcept import ( + Flowcept, + flowcept_task, ) from flowcept.commons.utils import assert_by_querying_tasks_until -from flowcept.flowcept_api.db_api import DBAPI -from flowcept.instrumentation.decorators.flowcept_task import flowcept_task @flowcept_task -def sum_one(n, workflow_id=None): - sleep(0.1) +def sum_one(n): return n + 1 @flowcept_task -def mult_two(n, workflow_id=None): +def mult_two(n): return n * 2 class FlowceptAPITest(unittest.TestCase): def test_simple_workflow(self): - db = DBAPI() - assert FlowceptConsumerAPI.services_alive() + assert Flowcept.services_alive() - wf_id = str(uuid4()) - with FlowceptConsumerAPI(FlowceptConsumerAPI.INSTRUMENTATION): - # The next line is optional - db.insert_or_update_workflow(WorkflowObject(workflow_id=wf_id)) + with Flowcept(workflow_name="test_workflow"): n = 3 - o1 = sum_one(n, workflow_id=wf_id) - o2 = mult_two(o1, workflow_id=wf_id) + o1 = sum_one(n) + o2 = mult_two(o1) print(o2) assert assert_by_querying_tasks_until( - {"workflow_id": wf_id}, + {"workflow_id": Flowcept.current_workflow_id}, condition_to_evaluate=lambda docs: len(docs) == 2, ) - print("workflow_id", wf_id) + print("workflow_id", Flowcept.current_workflow_id) - assert len(db.query(filter={"workflow_id": wf_id})) == 2 assert ( - len(db.query(type="workflow", filter={"workflow_id": wf_id})) == 1 + len( + Flowcept.db.query( + filter={"workflow_id": Flowcept.current_workflow_id} + ) + ) + == 2 + ) + assert ( + len( + Flowcept.db.query( + type="workflow", + filter={"workflow_id": Flowcept.current_workflow_id}, + ) + ) + == 1 ) diff --git a/tests/decorator_tests/flowcept_task_decorator_test.py b/tests/decorator_tests/flowcept_task_decorator_test.py index 56c98dd1..9e8d0022 100644 --- a/tests/decorator_tests/flowcept_task_decorator_test.py +++ b/tests/decorator_tests/flowcept_task_decorator_test.py @@ -11,7 +11,7 @@ import flowcept.commons import flowcept.instrumentation.decorators -from flowcept import FlowceptConsumerAPI +from flowcept import Flowcept import unittest @@ -147,7 +147,7 @@ def decorated_function_with_self(self, x, workflow_id=None): def test_decorated_function(self): workflow_id = str(uuid.uuid4()) # TODO :refactor-base-interceptor: - with FlowceptConsumerAPI(FlowceptConsumerAPI.INSTRUMENTATION): + with Flowcept(): self.decorated_function_with_self(x=0.1, workflow_id=workflow_id) decorated_static_function( df=pd.DataFrame(), workflow_id=workflow_id @@ -169,10 +169,7 @@ def test_decorated_function_simple( workflow_id = str(uuid.uuid4()) print(workflow_id) # TODO :refactor-base-interceptor: - consumer = FlowceptConsumerAPI( - interceptors=FlowceptConsumerAPI.INSTRUMENTATION, - start_doc_inserter=start_doc_inserter, - ) + consumer = Flowcept(start_doc_inserter=start_doc_inserter) consumer.start() t0 = time() for i in range(max_tasks): diff --git a/tests/decorator_tests/ml_tests/dl_trainer.py b/tests/decorator_tests/ml_tests/dl_trainer.py index ade6bee6..050fe9e3 100644 --- a/tests/decorator_tests/ml_tests/dl_trainer.py +++ b/tests/decorator_tests/ml_tests/dl_trainer.py @@ -7,7 +7,7 @@ from flowcept import ( - FlowceptConsumerAPI, + Flowcept, ) from 
flowcept.instrumentation.decorators.flowcept_torch import ( register_modules, @@ -186,8 +186,7 @@ def model_fit( # We are calling the consumer api here (sometimes for the second time) # because we are capturing at two levels: at the model.fit and at # every layer. Can we do it better? - with FlowceptConsumerAPI( - FlowceptConsumerAPI.INSTRUMENTATION, + with Flowcept( bundle_exec_id=workflow_id, start_doc_inserter=False, ): diff --git a/tests/decorator_tests/ml_tests/llm_tests/decorator_dask_llm_test.py b/tests/decorator_tests/ml_tests/llm_tests/decorator_dask_llm_test.py index b077b824..db4b6f63 100644 --- a/tests/decorator_tests/ml_tests/llm_tests/decorator_dask_llm_test.py +++ b/tests/decorator_tests/ml_tests/llm_tests/decorator_dask_llm_test.py @@ -1,12 +1,8 @@ import unittest - +import itertools import uuid -from dask.distributed import Client - -from cluster_experiment_utils.utils import generate_configs - -from flowcept import FlowceptConsumerAPI, WorkflowObject, DBAPI +from flowcept import WorkflowObject, Flowcept from flowcept.commons.flowcept_logger import FlowceptLogger from flowcept.flowceptor.adapters.dask.dask_plugins import ( @@ -17,13 +13,65 @@ close_dask, ) -from tests.adapters.test_dask import TestDask from tests.decorator_tests.ml_tests.llm_tests.llm_trainer import ( get_wiki_text, model_train, ) +def _interpolate_values(start, end, step): + return [start + i * step for i in range((end - start) // step + 1)] + + +def generate_configs(params): + param_names = list(params.keys()) + param_values = [] + + for param_name in param_names: + param_data = params[param_name] + + if isinstance(param_data, dict): + init_value = param_data["init"] + end_value = param_data["end"] + step_value = param_data.get("step", 1) + + if isinstance(init_value, (int, float)): + param_values.append( + [ + round(val / 10, 1) + for val in range( + int(init_value * 10), + int((end_value + step_value) * 10), + int(step_value * 10), + ) + ] + ) + elif isinstance(init_value, list) and all( + isinstance(v, (int, float)) for v in init_value + ): + interpolated_values = _interpolate_values( + init_value[0], end_value[0], step_value + ) + param_values.append( + [ + (val, val + init_value[1] - init_value[0]) + for val in interpolated_values + ] + ) + + elif isinstance(param_data, list): + param_values.append(param_data) + + configs = list(itertools.product(*param_values)) + + result = [] + for config_values in configs: + config = dict(zip(param_names, config_values)) + result.append(config) + + return result + + class DecoratorDaskLLMTests(unittest.TestCase): def __init__(self, *args, **kwargs): super(DecoratorDaskLLMTests, self).__init__(*args, **kwargs) @@ -32,7 +80,6 @@ def __init__(self, *args, **kwargs): def test_llm(self): # Manually registering the DataPrep workflow (manual instrumentation) tokenizer = "toktok" # basic_english, moses, toktok - db_api = DBAPI() dataset_prep_wf = WorkflowObject() dataset_prep_wf.workflow_id = f"prep_wikitext_tokenizer_{tokenizer}" dataset_prep_wf.used = {"tokenizer": tokenizer} @@ -46,7 +93,7 @@ def test_llm(self): "test_data": id(test_data), } print(dataset_prep_wf) - db_api.insert_or_update_workflow(dataset_prep_wf) + Flowcept.db.insert_or_update_workflow(dataset_prep_wf) # Automatically registering the Dask workflow train_wf_id = str(uuid.uuid4()) diff --git a/tests/decorator_tests/ml_tests/llm_tests/llm_trainer.py b/tests/decorator_tests/ml_tests/llm_tests/llm_trainer.py index 7af1fceb..80b8433a 100644 --- a/tests/decorator_tests/ml_tests/llm_tests/llm_trainer.py +++ 
b/tests/decorator_tests/ml_tests/llm_tests/llm_trainer.py @@ -11,7 +11,7 @@ from datasets import load_dataset import flowcept -from flowcept import FlowceptConsumerAPI +from flowcept import Flowcept from flowcept.configs import N_GPUS from flowcept.instrumentation.decorators.flowcept_torch import ( @@ -280,8 +280,7 @@ def model_train( # TODO :ml-refactor: save device type and random seed: https://pytorch.org/docs/stable/notes/randomness.html # TODO :base-interceptor-refactor: Can we do it better? - with FlowceptConsumerAPI( - FlowceptConsumerAPI.INSTRUMENTATION, + with Flowcept( bundle_exec_id=workflow_id, start_doc_inserter=False, ): diff --git a/tests/decorator_tests/ml_tests/ml_decorator_test.py b/tests/decorator_tests/ml_tests/ml_decorator_test.py index fb0c3869..e3155643 100644 --- a/tests/decorator_tests/ml_tests/ml_decorator_test.py +++ b/tests/decorator_tests/ml_tests/ml_decorator_test.py @@ -2,7 +2,7 @@ import unittest -from flowcept import DBAPI +from flowcept import Flowcept from tests.decorator_tests.ml_tests.dl_trainer import ModelTrainer, TestNet @@ -32,7 +32,7 @@ def test_cnn_model_trainer(): c.pop("workflow_id") loaded_model = TestNet(**c) - loaded_model = DBAPI().load_torch_model( + loaded_model = Flowcept.db.load_torch_model( loaded_model, result["object_id"] ) assert len(loaded_model(result["test_data"]))
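Reviewer note: throughout these test changes, FlowceptConsumerAPI and DBAPI are replaced by the single Flowcept entry point. The following is a minimal sketch of the new usage, assembled only from the updated tests (tests/api/flowcept_api_test.py); the workflow name and the __main__ guard are illustrative assumptions, and running it requires the services that Flowcept.services_alive() checks for.

from flowcept import Flowcept, flowcept_task


@flowcept_task
def sum_one(n):
    return n + 1


@flowcept_task
def mult_two(n):
    return n * 2


if __name__ == "__main__":
    # Sanity-check that the MQ and document DB used by the test suite are up.
    assert Flowcept.services_alive()

    # Capture is scoped by the context manager; the workflow id is generated
    # automatically and exposed as Flowcept.current_workflow_id.
    with Flowcept(workflow_name="example_workflow"):
        result = mult_two(sum_one(3))
        print(result)

    # Captured task documents are queried through the class-level DB handle
    # that replaces the old DBAPI instance.
    docs = Flowcept.db.query(
        filter={"workflow_id": Flowcept.current_workflow_id}
    )
    print(len(docs))

Similarly, a minimal sketch of the class-level DB handle itself, based on tests/api/dbapi_test.py; the variable names are illustrative and not part of the diff.

from uuid import uuid4

from flowcept import Flowcept, WorkflowObject

# Insert or update a workflow record and read it back, as the updated
# dbapi_test does with Flowcept.db instead of a DBAPI() instance.
wf = WorkflowObject()
wf.workflow_id = str(uuid4())
assert Flowcept.db.insert_or_update_workflow(wf)
print(Flowcept.db.get_workflow(workflow_id=wf.workflow_id))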