
Dev #67

Merged: 48 commits, Jun 27, 2023

Commits (48)

153711b
Moving conf env vars to settings.yaml
renan-souza Mar 30, 2023
eb5a0cd
change resource/settings.yaml
renan-souza Mar 30, 2023
b3c16fa
Small fixes after running on Summit
renan-souza Apr 13, 2023
d604bf4
Checkpoint to run dask deepmd
renan-souza Apr 21, 2023
c86c52d
Checkpoint to run dask
renan-souza Apr 21, 2023
1ca9374
Merge branch 'dev' of github.com:ORNL/flowcept into dev
renan-souza Apr 26, 2023
6f207a2
Checkpoint for extra metadata
renan-souza May 4, 2023
01baced
checkpoint from summit
May 4, 2023
a176ca1
Checkpoint after merge
renan-souza May 4, 2023
ae417f6
Update for more flexibility to mlflow capture
renan-souza May 8, 2023
e14a8fe
Improving exception handling in mlflow interception
renan-souza May 8, 2023
cd0e3ae
checkpoint mlflow
renan-souza May 8, 2023
b7bc0b2
Updates
renan-souza May 10, 2023
62fb266
Merge branch 'dev' of github.com:ORNL/flowcept into dev
renan-souza May 10, 2023
4271587
Moving dask_scheduler_setup to resources
renan-souza May 10, 2023
e85f7c7
Major changes to buffer Redis insertions
renan-souza May 15, 2023
6330f5e
Adding while to doc inserter
renan-souza May 15, 2023
cd6d843
changes in db inserter to avoid statuses inconsistencies
renan-souza May 15, 2023
b0a0cb6
Improving logging
renan-souza May 15, 2023
610374e
debugging doc upsert
renan-souza May 16, 2023
b378650
debugging doc insert
renan-souza May 16, 2023
eafdeb2
Adding locks to flush operations
renan-souza May 16, 2023
4919c52
Debug msgs
renan-souza May 16, 2023
0be17af
adding sleep time
renan-souza May 16, 2023
d6deefa
adding long sleep time in stop. Worked with small case
renan-souza May 16, 2023
2beb5f0
removing debug msg
renan-souza May 16, 2023
3c0d9a5
Fixing sleep time for consumer.stopg
renan-souza May 16, 2023
8284296
https://github.com/ORNL/flowcept/issues/66
renan-souza May 16, 2023
19591ba
https://github.com/ORNL/flowcept/issues/66
renan-souza May 16, 2023
7edb41f
Adding perf logging + task_id index
renan-souza May 16, 2023
75de307
Improve logging
renan-souza May 16, 2023
accaf44
hostname to logging
renan-souza May 16, 2023
643f1b8
fix to perf logging
renan-souza May 16, 2023
9e976ad
Adding submission_time to dask interceptor and time at insertion to m…
renan-souza May 16, 2023
1879ce3
adding adaptive buffer in mongo insertion
renan-souza May 16, 2023
cd3f906
Improving adaptive buffer size
renan-souza May 17, 2023
70c03e2
adding if len check into lock() block
renan-souza May 17, 2023
844bfe1
Major changes to allow a graceful stop without too much sleep time
renan-souza May 17, 2023
0e54ab3
Minor fixes for better terminology
renan-souza Jun 26, 2023
8381233
code format
renan-souza Jun 26, 2023
6e3e8c3
Enhancing MD files
renan-souza Jun 26, 2023
9614ebc
running tests
renan-souza Jun 26, 2023
bdf38b3
Fixing pip install
renan-souza Jun 26, 2023
ec6e64d
Changing tests
renan-souza Jun 26, 2023
f358c36
Notebooks test on hold
renan-souza Jun 27, 2023
c054c95
Testing notebooks
renan-souza Jun 27, 2023
fc8104e
Fixing MLFLow notebook
renan-souza Jun 27, 2023
d4c6eb0
Merge branch 'main' into dev
renan-souza Jun 27, 2023

5 changes: 5 additions & 0 deletions .github/workflows/create-release-n-publish.yml
@@ -83,3 +83,8 @@ jobs:
with:
password: ${{ secrets.PYPI_API_TOKEN }}
verbose: true
- name: Test pip install
run: pip install flowcept
- name: Test pip install full
run: pip install flowcept[full]

10 changes: 5 additions & 5 deletions .github/workflows/run-tests.yml
@@ -1,6 +1,6 @@
name: Unit, integration, and notebook tests
#on: [push, pull_request]
on: [pull_request]
on: [push, pull_request]
#on: [pull_request]

jobs:

@@ -18,15 +18,15 @@ jobs:
- name: Install our dependencies
run: |
python -m pip install --upgrade pip
pip install -e .
pip install -e .[full]
pip install -r extra_requirements/dev-requirements.txt
- name: Run Docker Compose
run: docker compose -f deployment/compose.yml up -d
- name: Test with pytest
run: |
pytest
pytest --ignore=tests/plugins/test_tensorboard.py
- name: Test notebooks
run: |
python flowcept/flowcept_webserver/app.py &
sleep 3
pytest --nbmake "notebooks/"
pytest --nbmake "notebooks/" --ignore=notebooks/dask_from_CLI.ipynb
4 changes: 2 additions & 2 deletions .github/workflows/test-python-310.yml
@@ -20,13 +20,13 @@ jobs:
- name: Install our dependencies
run: |
python -m pip install --upgrade pip
pip install -e .
pip install -e .[full]
pip install -r extra_requirements/dev-requirements.txt
- name: Run Docker Compose
run: docker compose -f deployment/compose.yml up -d
- name: Test with pytest
run: |
pytest
pytest --ignore=tests/plugins/dont_run_test_tensorboard.py
- name: Test notebooks
run: |
python flowcept/flowcept_webserver/app.py &
4 changes: 2 additions & 2 deletions .github/workflows/test-python-39.yml
@@ -20,13 +20,13 @@ jobs:
- name: Install our dependencies
run: |
python -m pip install --upgrade pip
pip install -e .
pip install -e .[full]
pip install -r extra_requirements/dev-requirements.txt
- name: Run Docker Compose
run: docker compose -f deployment/compose.yml up -d
- name: Test with pytest
run: |
pytest
pytest --ignore=tests/plugins/dont_run_test_tensorboard.py
- name: Test notebooks
run: |
python flowcept/flowcept_webserver/app.py &
2 changes: 2 additions & 0 deletions .gitignore
@@ -11,4 +11,6 @@ tensorboard_events/
**/*.DS_Store*
**/*.log*
notebooks/.ipynb_checkpoints
**/*ipynb_checkpoints*
notebooks/tb_*
notebooks/scheduler_file.json
9 changes: 7 additions & 2 deletions CONTRIBUTING.md
@@ -41,7 +41,7 @@ $ flake8 .
All commits to the `main` branch will launch the [automated publish and release GitHub Action](.github/workflows/create-release-n-publish.yml).
This will create a [tagged release](https://github.com/ORNL/flowcept/releases) and publish the package to [pypi](https://pypi.org/project/flowcept).

# Checklist for Creating a new Flowcept Plugin
# Checklist for Creating a new FlowCept adapter

1. Create a new package directory under `flowcept/flowceptor/plugins`
2. Create a new class that inherits from `BaseInterceptor`, and consider implementing the abstract methods:
@@ -51,13 +51,18 @@ This will create a [tagged release](https://github.com/ORNL/flowcept/releases) a
- Prepare_task_msg

See the existing plugins for a reference; a minimal sketch also follows this checklist.

3. [Optional] You may need extra classes, such as
local state manager (we provide a generic [`Interceptor State Manager`](flowcept/flowceptor/plugins/interceptor_state_manager.py)),
`@dataclasses`, Data Access Objects (`DAOs`), and event handlers.
`@dataclasses`, Data Access Objects (`DAOs`), and event handlers.

4. Create a new entry in the [settings.yaml](resources/settings.yaml) file and in the [Settings factory](flowcept/flowceptor/plugins/settings_factory.py)

5. Create a new `requirements.txt` file under the directory [extra_requirements](extra_requirements) and
adjust the [setup.py](setup.py).

6. [Optional] Add a new constant to [vocabulary.py](flowcept/commons/vocabulary.py).

7. [Optional] Adjust flowcept.__init__.py.
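
Below is a minimal sketch of what such an adapter could look like. Only `BaseInterceptor`, the `prepare_task_msg` method mentioned in the checklist, and the plugins package layout come from the text above; everything else (module path, class name, settings key, the `observe`/`callback` pair) is an illustrative assumption, not existing code.

```python
# Hypothetical sketch only: module path, class name, settings key, and the
# observe/callback methods are illustrative assumptions, not existing code.
from flowcept.flowceptor.plugins.base_interceptor import BaseInterceptor


class MyToolInterceptor(BaseInterceptor):
    def __init__(self, plugin_key="my_tool"):
        # "my_tool" would need its own entry in resources/settings.yaml and
        # in the settings factory (step 4 of the checklist).
        super().__init__(plugin_key)

    def prepare_task_msg(self, raw_event: dict) -> dict:
        # Map the tool-specific event into the task message fields expected
        # by FlowCept (the field names here are placeholders).
        return {
            "task_id": raw_event.get("id"),
            "used": raw_event.get("inputs"),
            "generated": raw_event.get("outputs"),
        }

    def observe(self):
        # Subscribe to the tool's event source (log file, message queue,
        # REST hook, ...) and route each event to self.callback.
        ...

    def callback(self, raw_event: dict):
        msg = self.prepare_task_msg(raw_event)
        self.intercept(msg)  # assumed to be provided by BaseInterceptor
```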


90 changes: 52 additions & 38 deletions README.md
@@ -5,62 +5,71 @@
[![License: MIT](https://img.shields.io/github/license/ORNL/flowcept)](LICENSE)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)

# Flowcept
# FlowCept

## Development Environment
FlowCept is a system for runtime integration of data processed by multiple workflows, allowing
users (scientists, engineers) to understand, at runtime, complex, heterogeneous, large-scale data coming from various sources.

Read the [Contributing](CONTRIBUTING.md) file.
FlowCept is intended to address scenarios where multiple workflows in a science campaign or in an enterprise run and generate
important data to be analyzed in an integrated manner. Since these workflows may use different data generation tools or can be executed within
different parallel computing systems (e.g., Dask, Spark, workflow management systems), its key differentiator is the
capability to seamlessly integrate data from various sources. Using provenance data management techniques,
it builds, at runtime, an integrated view of these multi-workflow data, following the
[W3C PROV](https://www.w3.org/TR/prov-overview/) recommendations for its data schema.
Because it relies on data observability, it does not require changes to user code
or systems (i.e., no instrumentation).
All users need to do is create adapters for their systems or tools, if one is not available yet.

### Code Formatting
Currently, FlowCept provides adapters for: [Dask](https://www.dask.org/), [MLFlow](https://mlflow.org/), [TensorBoard](https://www.tensorflow.org/tensorboard), and [Zambeze](https://github.com/ORNL/zambeze).

Flowcept code uses [Black](https://github.com/psf/black), a PEP 8 compliant code formatter, and
[Flake8](https://github.com/pycqa/flake8), a code style guide enforcement tool. To install
these tools, simply run the following:
See the [Jupyter Notebooks](notebooks) for utilization examples.

```bash
$ pip install flake8 black
```
See the [Contributing](CONTRIBUTING.md) file for guidelines on contributing new adapters. Note that we may use the
term 'plugin' in the codebase as a synonym for adapter. Future releases should standardize the terminology to use adapter.

Before _every commit_, you should run the following:

```bash
$ black .
$ flake8 .
```
## Install and Setup

If errors are reported by `flake8`, please fix them before committing the code.
1. Install FlowCept:

### Running Tests
`pip install .[full]` in this directory (or `pip install flowcept[full]`).

There are a few dependencies that need to be installed to run pytest; if you installed the requirements.txt file, these should be covered as well.
```bash
$ pip install pytest
```
For convenience, this will install the dependencies for all adapters, including adapters you may not use.
For this reason, you may prefer to install only the adapters you need, e.g.,
`pip install .[adapter_key1,adapter_key2]`, such as `pip install .[dask]`.
See [extra_requirements](extra_requirements) if you want to install the dependencies individually.

2. Start MongoDB and Redis:

From the root directory using pytest we can run:
To take full advantage of FlowCept, you need to run Redis, as FlowCept's message queue system, and MongoDB, as FlowCept's main database system.
The easiest way to start both is with the [docker-compose file](deployment/compose.yml).
You only need RabbitMQ if you also want to observe Zambeze messages.

```bash
$ pytest
```
3. Define the settings (e.g., routes and ports) accordingly in the [settings.yaml](resources/settings.yaml) file.

## Redis Server for the Interception Messages
```bash
$ docker run -p 6379:6379 --name flowcept_redis -d redis
```
4. Start the observation using the Controller API, as shown in the [Jupyter Notebooks](notebooks).

## Redis Server for the local cache
```bash
$ docker run -p 60379:6379 --name local_interceptor_cache -d redis
```
5. To use FlowCept's Query API, start the Flask webserver:
`python flowcept/flowcept_webserver/app.py`. Query API utilization examples are
available in the notebooks and in the sketch below.
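
A rough end-to-end sketch of steps 4 and 5, using the classes exported in `flowcept/__init__.py` (`FlowceptConsumerAPI`, `MLFlowInterceptor`, `TaskQueryAPI`). The constructor arguments and the query filter are assumptions, so the Jupyter Notebooks remain the authoritative examples.

```python
# Illustrative sketch; constructor arguments and the query filter are
# assumptions -- see the Jupyter Notebooks for the authoritative usage.
from flowcept import FlowceptConsumerAPI, MLFlowInterceptor, TaskQueryAPI

interceptor = MLFlowInterceptor()            # adapter configured via settings.yaml
consumer = FlowceptConsumerAPI(interceptor)

consumer.start()                             # step 4: start observing
# ... run the MLflow (or Dask, TensorBoard, Zambeze) workload here ...
consumer.stop()                              # flush buffers and stop gracefully

# Step 5: with the Flask webserver running, query the integrated data.
query_api = TaskQueryAPI()
tasks = query_api.query({"status": "FINISHED"})  # hypothetical filter
print(len(tasks))
```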


## Performance Tuning for Performance Evaluation

In the settings.yaml file, the following variables might impact interception performance:

## MongoDB
```
$ docker run --name mongo -d -p 27017:27017 mongo
```yaml
main_redis:
buffer_size: 50
insertion_buffer_time_secs: 5

plugin:
enrich_messages: false
```

Other variables may also matter, depending on the plugin. For instance, in Dask, timestamp creation by workers adds interception overhead.


# Plugin-specific info

You can run `pip install flowcept[plugin_name]` to install requirements for a specific plugin, instead of installing the
@@ -77,7 +86,12 @@ If you're on mac, `pip install` may not work out of the box because of Tensorflow's dependencies.
You may need to `pip install tensorflow-macos` instead of the `tensorflow` lib available in the tensorboard-requirements.


## See also

- [Zambeze Repository](https://github.com/ORNL/zambeze)

# See also
## Acknowledgement

- [Zambeze Repository](https://github.com/ORNL/zambeze)
This research uses resources of the Oak Ridge Leadership Computing Facility
at the Oak Ridge National Laboratory, which is supported by the Office of
Science of the U.S. Department of Energy under Contract No. DE-AC05-00OR22725.
Empty file removed deployment/Dockerfile
Empty file.
12 changes: 7 additions & 5 deletions deployment/compose.yml
@@ -13,11 +13,13 @@ services:
ports:
- 27017:27017

local_interceptor_cache:
container_name: local_interceptor_cache
image: redis
ports:
- 60379:6379
# This is just for the cases where one does not want to use the same Redis instance for caching and messaging, but
# it's not required to have separate instances.
# local_interceptor_cache:
# container_name: local_interceptor_cache
# image: redis
# ports:
# - 60379:6379

zambeze_rabbitmq:
container_name: zambeze_rabbitmq
45 changes: 30 additions & 15 deletions flowcept/__init__.py
@@ -1,21 +1,36 @@
import logging

from flowcept.configs import PROJECT_NAME
from flowcept.version import __version__

from flowcept.flowcept_api.consumer_api import FlowceptConsumerAPI
from flowcept.flowcept_api.task_query_api import TaskQueryAPI

from flowcept.flowceptor.plugins.zambeze.zambeze_interceptor import (
ZambezeInterceptor,
)
from flowcept.flowceptor.plugins.tensorboard.tensorboard_interceptor import (
TensorboardInterceptor,
)
from flowcept.flowceptor.plugins.mlflow.mlflow_interceptor import (
MLFlowInterceptor,
)
from flowcept.flowceptor.plugins.dask.dask_plugins import (
FlowceptDaskSchedulerPlugin,
FlowceptDaskWorkerPlugin,
)
# TODO: Redo these try/excepts in a better way
try:
from flowcept.flowceptor.plugins.zambeze.zambeze_interceptor import (
ZambezeInterceptor,
)
except:
print("Could not import Zambeze Interceptor")
pass

try:
from flowcept.flowceptor.plugins.tensorboard.tensorboard_interceptor import (
TensorboardInterceptor,
)
except:
print("Could not import TensorBoard interceptor.")

try:
from flowcept.flowceptor.plugins.mlflow.mlflow_interceptor import (
MLFlowInterceptor,
)
except:
print("Could not import MLFlow Interceptor")

try:
from flowcept.flowceptor.plugins.dask.dask_plugins import (
FlowceptDaskSchedulerPlugin,
FlowceptDaskWorkerPlugin,
)
except:
print("Could not import Dask interceptor")
21 changes: 18 additions & 3 deletions flowcept/commons/daos/document_db_dao.py
@@ -3,15 +3,19 @@
from pymongo import MongoClient, UpdateOne

from flowcept.commons.flowcept_logger import FlowceptLogger
from flowcept.commons.flowcept_data_classes import TaskMessage
from flowcept.commons.utils import perf_log
from flowcept.configs import (
MONGO_HOST,
MONGO_PORT,
MONGO_DB,
MONGO_COLLECTION,
PERF_LOG,
)
from flowcept.flowceptor.consumers.consumer_utils import (
curate_dict_task_messages,
)
from time import time


class DocumentDBDao(object):
@@ -20,6 +24,7 @@ def __init__(self):
client = MongoClient(MONGO_HOST, MONGO_PORT)
db = client[MONGO_DB]
self._collection = db[MONGO_COLLECTION]
self._collection.create_index(TaskMessage.get_index_field())

def find(
self,
@@ -63,19 +68,29 @@ def insert_and_update_many(
self, indexing_key, doc_list: List[Dict]
) -> bool:
try:
indexed_buffer = curate_dict_task_messages(doc_list, indexing_key)
if len(doc_list) == 0:
return False
t0 = 0
if PERF_LOG:
t0 = time()
indexed_buffer = curate_dict_task_messages(
doc_list, indexing_key, t0
)
t1 = perf_log("doc_curate_dict_task_messages", t0)
if len(indexed_buffer) == 0:
return False
requests = []
for indexing_key_value in indexed_buffer:
if "finished" in indexed_buffer[indexing_key_value]:
indexed_buffer[indexing_key_value].pop("finished")
requests.append(
UpdateOne(
filter={indexing_key: indexing_key_value},
update=[{"$set": indexed_buffer[indexing_key_value]}],
upsert=True,
)
)
t2 = perf_log("indexing_buffer", t1)
self._collection.bulk_write(requests)
perf_log("bulk_write", t2)
return True
except Exception as e:
self.logger.exception(e)
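
The `insert_and_update_many` change above batches the curated task updates into a single `bulk_write` of upserts keyed by the indexing field. A stripped-down, standalone sketch of that pattern with `pymongo` follows; the host, database, collection, and field names are placeholders.

```python
# Standalone sketch of the bulk-upsert pattern used above; host, database,
# collection, and field names are placeholders.
from pymongo import MongoClient, UpdateOne

collection = MongoClient("localhost", 27017)["flowcept_db"]["tasks"]
collection.create_index("task_id")  # mirrors the index created in __init__

# A curated buffer: one merged document per task_id.
indexed_buffer = {
    "task-1": {"status": "RUNNING", "used": {"x": 1}},
    "task-2": {"status": "FINISHED", "generated": {"y": 2}},
}

requests = [
    UpdateOne(
        filter={"task_id": task_id},
        update=[{"$set": fields}],  # aggregation-pipeline update, as in the diff
        upsert=True,
    )
    for task_id, fields in indexed_buffer.items()
]
if requests:
    collection.bulk_write(requests)  # one round trip for the whole buffer
```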