Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev #75

Merged
merged 7 commits into from
Jul 3, 2023
Merged

Dev #75

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 45 additions & 12 deletions flowcept/commons/flowcept_dataclasses/telemetry.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,43 @@
from typing import List, Dict
from dataclasses import dataclass, asdict


def remove_none_values(_dict):
return {k: v for (k, v) in _dict if v is not None}


class Telemetry:
class _CPU:
times: Dict[str, float] = None # this is an average of all cpus
percent: float = None
"""
Class representing telemetry information captured in the platform where t
he experiment runs.

We are using psutils and the data it can capture depends on the platform.
So, we won't use dataclasses because we can't list all possible info
to be captured in any platform.

"""

class CPU:
times_avg: Dict[str, float] = None
percent_all: float = None

times_per_cpu: List[Dict[str, float]] = None
percent_per_cpu: List[float] = None

class _Memory:
class Memory:
virtual: Dict[str, float]
swap: Dict[str, float]

class _Network:
class Network:
netio: Dict[str, int]
netio_per_interface: Dict[str, Dict[str, int]]

class _Disk:
class Disk:
disk_usage: Dict[str, float]
io: Dict[str, float]
io_per_disk: Dict[str, Dict[str, float]]

class _Process:
class Process:
pid: int
cpu_number: int
memory: Dict[str, float]
Expand All @@ -38,11 +53,26 @@ class _Process:
executable: str
cmd_line: List[str]

cpu: _CPU = None
process: _Process = None
memory: _Memory = None
disk: _Disk = None
network: _Network = None
@dataclass(init=False)
class GPU:
@dataclass
class GPUMetrics:
total: int
free: int
used: int
usage_percent: float
temperature: float
power_usage: float

gpu_sums: GPUMetrics
per_gpu: Dict[int, GPUMetrics] = None

cpu: CPU = None
process: Process = None
memory: Memory = None
disk: Disk = None
network: Network = None
gpu: GPU = None

def to_dict(self):
ret = {}
Expand All @@ -56,4 +86,7 @@ def to_dict(self):
ret["disk"] = self.disk.__dict__
if self.network is not None:
ret["network"] = self.network.__dict__
if self.gpu is not None:
ret["gpu"] = asdict(self.gpu, dict_factory=remove_none_values)

return ret
5 changes: 0 additions & 5 deletions flowcept/flowcept_api/consumer_api.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
from typing import List, Union
from time import sleep
import random

from flowcept.commons.daos.mq_dao import MQDao
from flowcept.configs import (
REDIS_INSERTION_BUFFER_TIME,
MONGO_INSERTION_BUFFER_TIME,
)
from flowcept.flowceptor.consumers.document_inserter import DocumentInserter
from flowcept.commons.flowcept_logger import FlowceptLogger
from flowcept.flowceptor.plugins.base_interceptor import BaseInterceptor
Expand Down
5 changes: 5 additions & 0 deletions flowcept/flowceptor/plugins/base_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from flowcept.commons.flowcept_dataclasses.task_message import TaskMessage
from flowcept.flowceptor.plugins.settings_factory import get_settings

from flowcept.flowceptor.telemetry_capture import TelemetryCapture

from flowcept.version import __version__


Expand Down Expand Up @@ -63,6 +65,7 @@ def __init__(self, plugin_key):
self.logger = FlowceptLogger().get_logger()
self.settings = get_settings(plugin_key)
self._mq_dao = MQDao()
self.telemetry_capture = TelemetryCapture()

def prepare_task_msg(self, *args, **kwargs) -> TaskMessage:
raise NotImplementedError()
Expand All @@ -73,6 +76,7 @@ def start(self) -> "BaseInterceptor":
:return:
"""
self._mq_dao.start_time_based_flushing()
self.telemetry_capture.init_gpu_telemetry()
return self

def stop(self) -> bool:
Expand All @@ -81,6 +85,7 @@ def stop(self) -> bool:
:return:
"""
self._mq_dao.stop()
self.telemetry_capture.shutdown_gpu_telemetry()

def observe(self, *args, **kwargs):
"""
Expand Down
13 changes: 9 additions & 4 deletions flowcept/flowceptor/plugins/dask/dask_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
)
from flowcept.commons.utils import get_utc_now
from flowcept.configs import TELEMETRY_CAPTURE
from flowcept.flowceptor.telemetry_capture import capture_telemetry


def get_run_spec_data(task_msg: TaskMessage, run_spec):
Expand Down Expand Up @@ -148,7 +147,9 @@ def callback(self, task_id, start, finish, *args, **kwargs):

if ts.state == "executing":
if TELEMETRY_CAPTURE is not None:
task_msg.telemetry_at_start = capture_telemetry()
task_msg.telemetry_at_start = (
self.telemetry_capture.capture()
)
task_msg.status = Status.RUNNING
task_msg.address = self._worker.worker_address
if self.settings.worker_create_timestamps:
Expand All @@ -160,7 +161,9 @@ def callback(self, task_id, start, finish, *args, **kwargs):
else:
get_times_from_task_state(task_msg, ts)
if TELEMETRY_CAPTURE is not None:
task_msg.telemetry_at_end = capture_telemetry()
task_msg.telemetry_at_end = (
self.telemetry_capture.capture()
)
elif ts.state == "error":
task_msg.status = Status.ERROR
if self.settings.worker_create_timestamps:
Expand All @@ -172,7 +175,9 @@ def callback(self, task_id, start, finish, *args, **kwargs):
"traceback": ts.traceback_text,
}
if TELEMETRY_CAPTURE is not None:
task_msg.telemetry_at_end = capture_telemetry()
task_msg.telemetry_at_end = (
self.telemetry_capture.capture()
)
else:
return

Expand Down
Loading