Skip to content

Commit

Permalink
Merge pull request #160 from ORNL/dev
Browse files Browse the repository at this point in the history
Main < Dev
  • Loading branch information
renan-souza authored Oct 29, 2024
2 parents 50ec83e + c9369cc commit 0f40e32
Show file tree
Hide file tree
Showing 18 changed files with 228 additions and 104 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/run-checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ jobs:
python-version: "3.10"
cache: "pip"

- name: Install package and dependencies
- name: Install Ruff
run: |
python -m pip install --upgrade pip
python -m pip install .[all]
python -m pip install ruff
- name: Run ruff linter checks
run: ruff check src

- name: Run ruff formatter checks
run: ruff format --check src
run: ruff format --check src
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ Currently, the optional dependencies available are:
```
pip install flowcept[mlflow] # To install mlflow's adapter.
pip install flowcept[dask] # To install dask's adapter.
pip install flowcept[tensorboard] # To install tensorboard's adapter
pip install flowcept[kafka] # To utilize Kafka as the MQ, instead of Redis
pip install flowcept[tensorboard] # To install tensorboard's adapter.
pip install flowcept[kafka] # To utilize Kafka as the MQ, instead of Redis.
pip install flowcept[nvidia] # To capture NVIDIA GPU runtime information.
pip install flowcept[analytics] # For extra analytics features.
pip install flowcept[dev] # To install dev dependencies
pip install flowcept[dev] # To install dev dependencies.
```

You do not need to install any optional dependency to run Flowcept without any adapter, e.g., if you want to use simple instrumentation (see below).
Expand Down
14 changes: 2 additions & 12 deletions src/flowcept/commons/__init__.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,8 @@
"""Commons subpackage."""

from flowcept.commons.flowcept_logger import FlowceptLogger
from flowcept.commons.utils import get_adapter_exception_msg

logger = FlowceptLogger()


def singleton(cls):
    """Create a singleton version of ``cls``.

    The decorated class is replaced by a subclass whose ``__new__`` always
    hands back one cached instance. Python still runs ``__init__`` on that
    instance for every instantiation, so later constructor calls re-run the
    initializer on the shared object.

    Args:
        cls: The class to wrap.

    Returns:
        A subclass of ``cls`` that creates at most one instance and carries
        the name, qualified name, module and docstring of ``cls``.
    """
    instances = {}

    class SingletonWrapper(cls):
        def __new__(cls, *args, **kwargs):
            if cls not in instances:
                instances[cls] = super().__new__(cls)
            return instances[cls]

    # Keep the identity of the decorated class; otherwise every singleton is
    # reported as "SingletonWrapper" in reprs, logs and pickles.
    SingletonWrapper.__name__ = cls.__name__
    SingletonWrapper.__qualname__ = cls.__qualname__
    SingletonWrapper.__module__ = cls.__module__
    SingletonWrapper.__doc__ = cls.__doc__

    return SingletonWrapper
__all__ = ["get_adapter_exception_msg"]
79 changes: 59 additions & 20 deletions src/flowcept/commons/daos/document_db_dao.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Document module."""
"""Document DB interaction module."""

from typing import List, Dict, Tuple, Any
import io
Expand All @@ -18,7 +18,6 @@
from flowcept.commons.flowcept_logger import FlowceptLogger
from flowcept.commons.flowcept_dataclasses.task_object import TaskObject
from flowcept.commons.utils import perf_log, get_utc_now_str
from flowcept.commons import singleton
from flowcept.configs import (
MONGO_HOST,
MONGO_PORT,
Expand All @@ -35,25 +34,35 @@
from time import time


@singleton
class DocumentDBDao(object):
"""Document class."""

_instance: "DocumentDBDao" = None

def __new__(cls, *args, **kwargs) -> "DocumentDBDao":
    """Return the shared DocumentDBDao instance, allocating it on first use.

    Positional and keyword arguments are accepted but ignored here.
    """
    shared = cls._instance
    if shared is not None:
        return shared
    # First instantiation: allocate the object and remember it on the class.
    cls._instance = super(DocumentDBDao, cls).__new__(cls)
    return cls._instance

def __init__(self, create_index=MONGO_CREATE_INDEX):
self.logger = FlowceptLogger()
if not hasattr(self, "_initialized"):
self._initialized = True

if MONGO_URI is not None:
client = MongoClient(MONGO_URI)
else:
client = MongoClient(MONGO_HOST, MONGO_PORT)
self._db = client[MONGO_DB]
self.logger = FlowceptLogger()

if MONGO_URI is not None:
self._client = MongoClient(MONGO_URI)
else:
self._client = MongoClient(MONGO_HOST, MONGO_PORT)
self._db = self._client[MONGO_DB]

self._tasks_collection = self._db[MONGO_TASK_COLLECTION]
self._wfs_collection = self._db[MONGO_WORKFLOWS_COLLECTION]
self._obj_collection = self._db["objects"]
self._tasks_collection = self._db[MONGO_TASK_COLLECTION]
self._wfs_collection = self._db[MONGO_WORKFLOWS_COLLECTION]
self._obj_collection = self._db["objects"]

if create_index:
self._create_indices()
if create_index:
self._create_indices()

def _create_indices(self):
# Creating task collection indices:
Expand Down Expand Up @@ -454,17 +463,28 @@ def save_object(
workflow_id=None,
type=None,
custom_metadata=None,
save_data_in_collection=False,
pickle_=False,
):
"""Save an object."""
if object_id is None:
object_id = str(uuid4())
obj_doc = {"object_id": object_id}
blob = object
if pickle_:
blob = pickle.dumps(object)
obj_doc["pickle"] = True
obj_doc["data"] = blob

if save_data_in_collection:
blob = object
if pickle_:
blob = pickle.dumps(object)
obj_doc["pickle"] = True
obj_doc["data"] = blob

else:
from gridfs import GridFS

fs = GridFS(self._db)
grid_fs_file_id = fs.put(object)
obj_doc["grid_fs_file_id"] = grid_fs_file_id

if task_id is not None:
obj_doc["task_id"] = task_id
if workflow_id is not None:
Expand All @@ -478,7 +498,26 @@ def save_object(

return object_id

def get_file_data(self, file_id):
    """Read the content of a file stored in GridFS.

    Args:
        file_id: Identifier of the GridFS file to fetch.

    Returns:
        Whatever ``read()`` yields for the stored file, or ``None`` when the
        file does not exist or any other error occurs (the error is logged).
    """
    from gridfs import GridFS, NoFile

    grid_fs = GridFS(self._db)
    try:
        content = grid_fs.get(file_id).read()
    except NoFile:
        self.logger.error(f"File with ID {file_id} not found.")
        return None
    except Exception as e:
        # Best effort: log the traceback and report the failure as None.
        self.logger.exception(f"An error occurred: {e}")
        return None
    return content

def get_objects(self, filter):
"""Get some objects."""
"""Get objects."""
documents = self._obj_collection.find(filter)
return list(documents)

def close_client(self):
    """Close the Mongo client held by this DAO."""
    mongo_client = self._client
    mongo_client.close()
36 changes: 23 additions & 13 deletions src/flowcept/commons/daos/keyvalue_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,39 @@
from redis import Redis

from flowcept.commons.flowcept_logger import FlowceptLogger
from flowcept.commons import singleton
from flowcept.configs import (
KVDB_HOST,
KVDB_PORT,
KVDB_PASSWORD,
)


@singleton
class KeyValueDAO:
"""Key value class."""
"""Key value DAO class."""

_instance: "KeyValueDAO" = None

def __new__(cls, *args, **kwargs) -> "KeyValueDAO":
"""Singleton creator for KeyValueDAO."""
# Check if an instance already exists
if cls._instance is None:
# Create a new instance if not
cls._instance = super(KeyValueDAO, cls).__new__(cls)
return cls._instance

def __init__(self, connection=None):
self.logger = FlowceptLogger()
if connection is None:
self._redis = Redis(
host=KVDB_HOST,
port=KVDB_PORT,
db=0,
password=KVDB_PASSWORD,
)
else:
self._redis = connection
if not hasattr(self, "_initialized"):
self._initialized = True
self.logger = FlowceptLogger()
if connection is None:
self._redis = Redis(
host=KVDB_HOST,
port=KVDB_PORT,
db=0,
password=KVDB_PASSWORD,
)
else:
self._redis = connection

def delete_set(self, set_name: str):
"""Delete it."""
Expand Down
53 changes: 38 additions & 15 deletions src/flowcept/flowcept_api/db_api.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
"""DB module."""
"""DB API module."""

import uuid
from typing import List

from flowcept.commons import singleton
from flowcept.commons.flowcept_dataclasses.workflow_object import (
WorkflowObject,
)
Expand All @@ -13,20 +12,31 @@
from flowcept.commons.flowcept_logger import FlowceptLogger


@singleton
class DBAPI(object):
"""DB class."""
"""DB API class."""

_instance: "DBAPI" = None

def __new__(cls, *args, **kwargs) -> "DBAPI":
    """Return the shared DBAPI instance, allocating it on first use.

    Positional and keyword arguments are accepted but ignored here.
    """
    shared = cls._instance
    if shared is not None:
        # An instance was already created; reuse it.
        return shared
    cls._instance = super(DBAPI, cls).__new__(cls)
    return cls._instance

def __init__(
self,
with_webserver=False,
):
self.logger = FlowceptLogger()
self.with_webserver = with_webserver
if self.with_webserver:
raise NotImplementedError("We did not implement webserver API for this yet.")
if not hasattr(self, "_initialized"):
self._initialized = True
self.logger = FlowceptLogger()
self.with_webserver = with_webserver
if self.with_webserver:
raise NotImplementedError("We did not implement webserver API for this yet.")

self._dao = DocumentDBDao()
self._dao = DocumentDBDao(create_index=False)

def insert_or_update_task(self, task: TaskObject):
"""Insert or update task."""
Expand Down Expand Up @@ -103,6 +113,7 @@ def save_object(
workflow_id=None,
type=None,
custom_metadata=None,
save_data_in_collection=False,
pickle=False,
):
"""Save the object."""
Expand All @@ -113,6 +124,7 @@ def save_object(
workflow_id,
type,
custom_metadata,
save_data_in_collection=save_data_in_collection,
pickle_=pickle,
)

Expand Down Expand Up @@ -152,7 +164,7 @@ def save_torch_model(
model,
task_id=None,
workflow_id=None,
custom_metadata: dict = None,
custom_metadata: dict = {},
) -> str:
"""Save model.
Expand Down Expand Up @@ -188,16 +200,27 @@ def save_torch_model(

return obj_id

def load_torch_model(self, torch_model, object_id: str):
"""Load it."""
def load_torch_model(self, model, object_id: str):
"""Load a torch model stored in the database.
Args:
model (torch.nn.Module): An empty PyTorch model to be loaded. Its class
must be the same as that of the model that was saved.
object_id (str): Id of the object stored in the objects collection.
"""
import torch
import io

doc = self.query({"object_id": object_id}, type="object")[0]
binary_data = doc["data"]

if "data" in doc:
binary_data = doc["data"]
else:
file_id = doc["grid_fs_file_id"]
binary_data = self._dao.get_file_data(file_id)

buffer = io.BytesIO(binary_data)
state_dict = torch.load(buffer, weights_only=True)
torch_model.load_state_dict(state_dict)
model.load_state_dict(state_dict)

return torch_model
return model
14 changes: 11 additions & 3 deletions src/flowcept/flowcept_api/flowcept_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,19 @@


class Flowcept(object):
"""Flowcept class."""
"""Flowcept Controller class."""

db = DBAPI()
_db: DBAPI = None
current_workflow_id = None

class _ClassProperty:
    """Read-only property that is evaluated against the class.

    Stacking ``@classmethod`` on ``@property`` is deprecated since Python 3.11
    and no longer works in 3.13, so the same access pattern is provided by
    this small descriptor instead.
    """

    def __init__(self, fget):
        self._fget = fget
        self.__doc__ = fget.__doc__

    def __get__(self, instance, owner=None):
        # Works for both ``Flowcept.db`` and ``Flowcept().db``.
        if owner is None:
            owner = type(instance)
        return self._fget(owner)

@_ClassProperty
def db(cls) -> DBAPI:
    """Expose the DBAPI, creating it lazily on first access.

    The DBAPI object is built the first time this attribute is read and is
    cached on the class in ``_db``; later reads return the cached object.
    """
    if cls._db is None:
        cls._db = DBAPI()
    return cls._db

def __init__(
self,
interceptors: Union[BaseInterceptor, List[BaseInterceptor], str] = None,
Expand Down Expand Up @@ -182,7 +190,7 @@ def services_alive() -> bool:
if not MQDao.build().liveness_test():
logger.error("MQ Not Ready!")
return False
if not DocumentDBDao().liveness_test():
if not DocumentDBDao(create_index=False).liveness_test():
logger.error("DocDB Not Ready!")
return False
logger.info("MQ and DocDB are alive!")
Expand Down
Loading

0 comments on commit 0f40e32

Please sign in to comment.