
Commit 981accb

Merge pull request #166 from ORNL/examples2
Adding dask, mlflow, tensorboard examples
renan-souza authored Oct 31, 2024
2 parents 0eb7de0 + 9e7ff47 commit 981accb
Showing 10 changed files with 274 additions and 50 deletions.
25 changes: 9 additions & 16 deletions .github/workflows/run-tests-kafka.yml
@@ -20,12 +20,6 @@ jobs:
python-version: "3.10"
cache: "pip"

# - name: Copy settings file
# run: |
# mkdir ~/.flowcept
# cp resources/sample_settings.yaml ~/.flowcept
# mv ~/.flowcept/sample_settings.yaml ~/.flowcept/settings.yaml

- name: Install package and dependencies
run: |
python -m pip install --upgrade pip
@@ -51,14 +45,13 @@ jobs:
pytest --ignore=tests/decorator_tests/ml_tests/llm_tests
- name: Test notebooks
run: |
pip install -e .[all]
export MQ_TYPE=kafka
export MQ_PORT=9092
python -c 'from flowcept.configs import MQ_TYPE, MQ_PORT; print(f"MQ_TYPE={MQ_TYPE}"); print(f"MQ_PORT={MQ_PORT}")'
python -c 'from flowcept import Flowcept; assert Flowcept.services_alive()'
run: pytest --ignore=notebooks/zambeze.ipynb --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb

python src/flowcept/flowcept_webserver/app.py &
sleep 3
# export FLOWCEPT_SETTINGS_PATH=~/.flowcept/settings.yaml
pytest --ignore=notebooks/zambeze.ipynb --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb
# - name: Test notebooks
# run: |
# pip install -e .[all] # Installing stuff again may not be needed
# export MQ_TYPE=kafka
# export MQ_PORT=9092
# python -c 'from flowcept.configs import MQ_TYPE, MQ_PORT; print(f"MQ_TYPE={MQ_TYPE}"); print(f"MQ_PORT={MQ_PORT}")'
# python -c 'from flowcept import Flowcept; assert Flowcept.services_alive()'
# pytest --ignore=notebooks/zambeze.ipynb --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb
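The notebook step in this workflow now assumes MQ_TYPE and MQ_PORT are already exported in the job environment before pytest runs. A minimal pre-flight sketch of the checks the old inline step performed (both values are read from the environment by flowcept.configs):

    # Sketch: verify MQ configuration and service liveness before running the
    # notebook suite; mirrors the python -c one-liners shown above.
    from flowcept import Flowcept
    from flowcept.configs import MQ_TYPE, MQ_PORT

    print(f"MQ_TYPE={MQ_TYPE}")
    print(f"MQ_PORT={MQ_PORT}")
    assert Flowcept.services_alive()  # fails fast if the MQ or DB services are unreachable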
30 changes: 19 additions & 11 deletions .github/workflows/run-tests.yml
@@ -20,12 +20,6 @@ jobs:
- name: Show OS Info
run: '[[ "$OSTYPE" == "linux-gnu"* ]] && { echo "OS Type: Linux"; (command -v lsb_release &> /dev/null && lsb_release -a) || cat /etc/os-release; uname -r; } || [[ "$OSTYPE" == "darwin"* ]] && { echo "OS Type: macOS"; sw_vers; uname -r; } || echo "Unsupported OS type: $OSTYPE"'

# - name: Copy settings file
# run: |
# mkdir ~/.flowcept
# cp resources/sample_settings.yaml ~/.flowcept
# mv ~/.flowcept/sample_settings.yaml ~/.flowcept/settings.yaml

- name: Start docker compose with redis
run: docker compose -f deployment/compose.yml up -d

@@ -37,6 +31,24 @@ jobs:
pip install .
python examples/instrumentation/simple_script.py
- name: Install Dask dependencies alone and run a simple Dask test
run: |
pip uninstall flowcept -y
pip install .[dask]
python examples/dask_example.py
- name: Install MLFlow dependencies alone and run a simple MLFlow test
run: |
pip uninstall flowcept -y
pip install .[mlflow]
python examples/mlflow_example.py
- name: Install Tensorboard dependencies alone and run a simple Tensorboard test
run: |
pip uninstall flowcept -y
pip install .[tensorboard]
python examples/tensorboard_example.py
- name: Install all dependencies
run: |
python -m pip install --upgrade pip
@@ -50,11 +62,7 @@ jobs:
pytest --ignore=tests/decorator_tests/ml_tests/llm_tests
- name: Test notebooks with pytest and redis
run: |
python src/flowcept/flowcept_webserver/app.py &
sleep 3
# export FLOWCEPT_SETTINGS_PATH=~/.flowcept/settings.yaml
pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb
run: pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb

- name: Shut down docker compose
run: docker compose -f deployment/compose.yml down
61 changes: 61 additions & 0 deletions examples/dask_example.py
@@ -0,0 +1,61 @@
# Make sure you run `pip install flowcept[dask]` first.
from dask.distributed import Client, LocalCluster

from flowcept import Flowcept, FlowceptDaskSchedulerAdapter, FlowceptDaskWorkerAdapter
from flowcept.flowceptor.adapters.dask.dask_plugins import register_dask_workflow


def add(x, y):
return x + y


def multiply(x, y):
return x * y


def sum_list(values):
return sum(values)


if __name__ == "__main__":
# Starting a local Dask cluster
cluster = LocalCluster(n_workers=1)
scheduler = cluster.scheduler
client = Client(scheduler.address)

# Registering Flowcept's worker and scheduler adapters
scheduler.add_plugin(FlowceptDaskSchedulerAdapter(scheduler))
client.register_plugin(FlowceptDaskWorkerAdapter())

# Registering a Dask workflow in Flowcept's database
wf_id = register_dask_workflow(client)
print(f"workflow_id={wf_id}")

# Start Flowcept's Dask observer
flowcept = Flowcept("dask").start()
t1 = client.submit(add, 1, 2)
t2 = client.submit(multiply, 3, 4)
t3 = client.submit(add, t1.result(), t2.result())
t4 = client.submit(sum_list, [t1, t2, t3])
result = t4.result()
print("Result:", result)

# Closing Dask and Flowcept
client.close()
cluster.close()
flowcept.stop()

# Querying Flowcept's database about this run
print(f"t1_key={t1.key}")
print("Getting first task only:")
task1 = Flowcept.db.query(filter={"task_id": t1.key})[0]
assert task1["workflow_id"] == wf_id
print(task1)
print("Getting all tasks from this workflow:")
all_tasks = Flowcept.db.query(filter={"workflow_id": wf_id})
assert len(all_tasks) == 4
print(all_tasks)
print("Getting workflow info:")
wf_info = Flowcept.db.query(filter={"workflow_id": wf_id}, type="workflow")[0]
assert wf_info["workflow_id"] == wf_id
print(wf_info)
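Because task messages are buffered and flushed asynchronously, a Flowcept.db.query issued right after flowcept.stop() could in principle race the persistence layer. A hedged alternative sketch using the polling helper from src/flowcept/commons/utils.py (its signature appears further down in this diff; the exact semantics of condition_to_evaluate are assumed here):

    from flowcept.commons.utils import assert_by_querying_tasks_until

    # Poll until all 4 Dask tasks from this workflow are persisted; the lambda
    # is assumed to receive the list of matched task documents.
    assert_by_querying_tasks_until(
        filter={"workflow_id": wf_id},
        condition_to_evaluate=lambda docs: len(docs) == 4,
    )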
5 changes: 2 additions & 3 deletions examples/instrumentation/simple_script.py
@@ -1,5 +1,6 @@
from flowcept import Flowcept, flowcept_task


@flowcept_task
def sum_one(n):
return n + 1
@@ -10,11 +11,9 @@ def mult_two(n):
return n * 2


with Flowcept(workflow_name='test_workflow'):
with Flowcept(workflow_name="test_workflow"):
n = 3
o1 = sum_one(n)
o2 = mult_two(o1)
print(o2)

print(Flowcept.db.query(filter={"workflow_id": Flowcept.current_workflow_id}))

38 changes: 38 additions & 0 deletions examples/mlflow_example.py
@@ -0,0 +1,38 @@
# Make sure you run `pip install flowcept[mlflow]` first.
import os
import uuid
from time import sleep
import mlflow

from flowcept import MLFlowInterceptor, Flowcept


if __name__ == "__main__":
# Starting the interceptor
interceptor = MLFlowInterceptor()
print(f"SQLITE DB path: {interceptor.settings.file_path}")

# Clean up previous runs if they exist
if os.path.exists(interceptor.settings.file_path):
os.remove(interceptor.settings.file_path)
with open(interceptor.settings.file_path, "w") as f:
f.write("")
sleep(1)
mlflow.set_tracking_uri(f"sqlite:///{interceptor.settings.file_path}")
mlflow.delete_experiment(mlflow.create_experiment("starter"))
sleep(1)

# Starting the workflow
with Flowcept(interceptor):
experiment_name = "experiment_test"
experiment_id = mlflow.create_experiment(experiment_name + str(uuid.uuid4()))
with mlflow.start_run(experiment_id=experiment_id) as run:
mlflow.log_params({"param1": 1})
mlflow.log_params({"param2": 2})
mlflow.log_metric("metric1", 10)
run_id = run.info.run_uuid

run_data = interceptor.dao.get_run_data(run_id)
task = Flowcept.db.query(filter={"task_id": run.info.run_uuid})[0]
assert task["status"] == "FINISHED"
print(task)
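As a cross-check, the same run can be read back through MLflow's own tracking API; a sketch, assuming the SQLite tracking URI set earlier in the script is still active in the process:

    from mlflow.tracking import MlflowClient

    # Fetch MLflow's record of the run the interceptor captured.
    mlflow_run = MlflowClient().get_run(run_id)
    assert mlflow_run.data.params["param1"] == "1"  # MLflow stores params as strings
    assert mlflow_run.data.metrics["metric1"] == 10.0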
141 changes: 141 additions & 0 deletions examples/tensorboard_example.py
@@ -0,0 +1,141 @@
# Make sure you run `pip install flowcept[tensorboard]` first.
import uuid
from time import sleep

from flowcept import TensorboardInterceptor, Flowcept


def run_tensorboard_hparam_tuning(logdir):
"""
Code based on
https://www.tensorflow.org/tensorboard/hyperparameter_tuning_with_hparams
    :return: the workflow_id used to tag this hyperparameter sweep
"""
wf_id = str(uuid.uuid4())
import tensorflow as tf
from tensorboard.plugins.hparams import api as hp

fashion_mnist = tf.keras.datasets.fashion_mnist

(x_train, y_train), (x_test, y_test) = fashion_mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

HP_NUM_UNITS = hp.HParam("num_units", hp.Discrete([16, 32]))
HP_DROPOUT = hp.HParam("dropout", hp.RealInterval(0.1, 0.2))
HP_OPTIMIZER = hp.HParam("optimizer", hp.Discrete(["adam", "sgd"]))
HP_BATCHSIZES = hp.HParam("batch_size", hp.Discrete([32, 64]))

HP_MODEL_CONFIG = hp.HParam("model_config")
HP_OPTIMIZER_CONFIG = hp.HParam("optimizer_config")

METRIC_ACCURACY = "accuracy"

with tf.summary.create_file_writer(logdir).as_default():
hp.hparams_config(
hparams=[
HP_NUM_UNITS,
HP_DROPOUT,
HP_OPTIMIZER,
HP_BATCHSIZES,
HP_MODEL_CONFIG,
HP_OPTIMIZER_CONFIG,
],
metrics=[hp.Metric(METRIC_ACCURACY, display_name="Accuracy")],
)

def train_test_model(hparams, logdir):
model = tf.keras.models.Sequential(
[
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(hparams[HP_NUM_UNITS], activation=tf.nn.relu),
tf.keras.layers.Dropout(hparams[HP_DROPOUT]),
tf.keras.layers.Dense(10, activation=tf.nn.softmax),
]
)
model.compile(
optimizer=hparams[HP_OPTIMIZER],
loss="sparse_categorical_crossentropy",
metrics=["accuracy"],
)

model.fit(
x_train,
y_train,
epochs=1,
callbacks=[
                tf.keras.callbacks.TensorBoard(logdir),  # log metrics
                hp.KerasCallback(logdir, hparams),  # log hparams
],
batch_size=hparams[HP_BATCHSIZES],
) # Run with 1 epoch to speed things up for tests
_, accuracy = model.evaluate(x_test, y_test)
return accuracy

def run(run_dir, hparams):
with tf.summary.create_file_writer(run_dir).as_default():
hp.hparams(hparams) # record the values used in this trial
accuracy = train_test_model(hparams, logdir)
tf.summary.scalar(METRIC_ACCURACY, accuracy, step=1)

session_num = 0

for num_units in HP_NUM_UNITS.domain.values:
for dropout_rate in (
HP_DROPOUT.domain.min_value,
HP_DROPOUT.domain.max_value,
):
for optimizer in HP_OPTIMIZER.domain.values:
for batch_size in HP_BATCHSIZES.domain.values:
# These two added ids below are optional and useful
# just to contextualize this run.
hparams = {
"workflow_id": wf_id,
"activity_id": "hyperparam_evaluation",
HP_NUM_UNITS: num_units,
HP_DROPOUT: dropout_rate,
HP_OPTIMIZER: optimizer,
HP_BATCHSIZES: batch_size,
}
run_name = f"wf_id_{wf_id}_{session_num}"
print("--- Starting trial: %s" % run_name)
print(f"{hparams}")
run(f"{logdir}/" + run_name, hparams)
session_num += 1

return wf_id


def reset_tensorboard_dir(logdir, watch_interval_sec):
import os
import shutil

if os.path.exists(logdir):
print("Path exists, going to delete")
shutil.rmtree(logdir)
sleep(1)
os.mkdir(logdir)
print("Tensorboard directory exists? " + str(os.path.exists(logdir)))
print(f"Waiting {watch_interval_sec} seconds after directory reset.")
sleep(watch_interval_sec)


if __name__ == "__main__":
# Starting the interceptor
interceptor = TensorboardInterceptor()
logdir = interceptor.settings.file_path
print(f"Tensorboard dir: {logdir}")

reset_tensorboard_dir(logdir, 10)

with Flowcept(interceptor):
wf_id = run_tensorboard_hparam_tuning(logdir)
wait_time = 10
print(f"Done training. Waiting {wait_time} seconds.")
sleep(wait_time)

tasks = Flowcept.db.query(filter={"workflow_id": wf_id})
assert len(tasks) == 16
print(tasks)
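The expected count of 16 tasks follows directly from the full factorial sweep over four two-valued hyperparameters; a quick arithmetic check:

    # 2 num_units x 2 dropout endpoints x 2 optimizers x 2 batch sizes = 16 trials,
    # matching the assert len(tasks) == 16 above.
    num_units = [16, 32]          # HP_NUM_UNITS
    dropouts = [0.1, 0.2]         # HP_DROPOUT interval endpoints
    optimizers = ["adam", "sgd"]  # HP_OPTIMIZER
    batch_sizes = [32, 64]        # HP_BATCHSIZES
    assert len(num_units) * len(dropouts) * len(optimizers) * len(batch_sizes) == 16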

3 changes: 0 additions & 3 deletions src/flowcept/commons/__init__.py
@@ -1,8 +1,5 @@
"""Commons subpackage."""

from flowcept.commons.flowcept_logger import FlowceptLogger
from flowcept.commons.utils import get_adapter_exception_msg

logger = FlowceptLogger()

__all__ = ["get_adapter_exception_msg"]
13 changes: 0 additions & 13 deletions src/flowcept/commons/utils.py
@@ -14,7 +14,6 @@
from flowcept import configs
from flowcept.configs import (
PERF_LOG,
SETTINGS_PATH,
)
from flowcept.commons.flowcept_dataclasses.task_object import Status

@@ -63,18 +62,6 @@ def get_status_from_str(status_str: str) -> Status:
return Status.UNKNOWN


def get_adapter_exception_msg(adapter_kind):
"""Get the adapter."""
return (
f"You have an adapter for {adapter_kind} in"
f" {SETTINGS_PATH} but we couldn't import its interceptor."
f" Consider fixing the following exception (e.g., try installing the"
f" adapter requirements -- see the README file remove that adapter"
f" from the settings."
f" Exception:"
)


def assert_by_querying_tasks_until(
filter,
condition_to_evaluate: Callable = None,
6 changes: 3 additions & 3 deletions src/flowcept/version.py
@@ -1,7 +1,7 @@
"""Version module."""

# WARNING: CHANGE THIS FILE MANUALLY ONLY TO RESOLVE CONFLICTS!
# This file is supposed to be automatically modified by the CI Bot.
# WARNING: CHANGE THIS FILE MANUALLY ONLY TO RESOLVE CONFLICTS OR TO UPDATE Major or Minor versions.
# The expected format is: <Major>.<Minor>.<Patch>
# This file is supposed to be automatically modified by the CI Bot.
# See .github/workflows/version_bumper.py
__version__ = "0.6.8"
__version__ = "0.6.9"
