Skip to content

Commit

Permalink
Merge pull request #74 from ORNL/gpu-tele
Browse files Browse the repository at this point in the history
Native Gpu telemetry
  • Loading branch information
renan-souza authored Jul 3, 2023
2 parents a1b2fb9 + 9138732 commit 6d9414a
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 140 deletions.
57 changes: 45 additions & 12 deletions flowcept/commons/flowcept_dataclasses/telemetry.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,43 @@
from typing import List, Dict
from dataclasses import dataclass, asdict


def remove_none_values(_dict):
return {k: v for (k, v) in _dict if v is not None}


class Telemetry:
class _CPU:
times: Dict[str, float] = None # this is an average of all cpus
percent: float = None
"""
Class representing telemetry information captured in the platform where t
he experiment runs.
We are using psutils and the data it can capture depends on the platform.
So, we won't use dataclasses because we can't list all possible info
to be captured in any platform.
"""

class CPU:
times_avg: Dict[str, float] = None
percent_all: float = None

times_per_cpu: List[Dict[str, float]] = None
percent_per_cpu: List[float] = None

class _Memory:
class Memory:
virtual: Dict[str, float]
swap: Dict[str, float]

class _Network:
class Network:
netio: Dict[str, int]
netio_per_interface: Dict[str, Dict[str, int]]

class _Disk:
class Disk:
disk_usage: Dict[str, float]
io: Dict[str, float]
io_per_disk: Dict[str, Dict[str, float]]

class _Process:
class Process:
pid: int
cpu_number: int
memory: Dict[str, float]
Expand All @@ -38,11 +53,26 @@ class _Process:
executable: str
cmd_line: List[str]

cpu: _CPU = None
process: _Process = None
memory: _Memory = None
disk: _Disk = None
network: _Network = None
@dataclass(init=False)
class GPU:
@dataclass
class GPUMetrics:
total: int
free: int
used: int
usage_percent: float
temperature: float
power_usage: float

gpu_sums: GPUMetrics
per_gpu: Dict[int, GPUMetrics] = None

cpu: CPU = None
process: Process = None
memory: Memory = None
disk: Disk = None
network: Network = None
gpu: GPU = None

def to_dict(self):
ret = {}
Expand All @@ -56,4 +86,7 @@ def to_dict(self):
ret["disk"] = self.disk.__dict__
if self.network is not None:
ret["network"] = self.network.__dict__
if self.gpu is not None:
ret["gpu"] = asdict(self.gpu, dict_factory=remove_none_values)

return ret
5 changes: 0 additions & 5 deletions flowcept/flowcept_api/consumer_api.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
from typing import List, Union
from time import sleep
import random

from flowcept.commons.daos.mq_dao import MQDao
from flowcept.configs import (
REDIS_INSERTION_BUFFER_TIME,
MONGO_INSERTION_BUFFER_TIME,
)
from flowcept.flowceptor.consumers.document_inserter import DocumentInserter
from flowcept.commons.flowcept_logger import FlowceptLogger
from flowcept.flowceptor.plugins.base_interceptor import BaseInterceptor
Expand Down
5 changes: 5 additions & 0 deletions flowcept/flowceptor/plugins/base_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from flowcept.commons.flowcept_dataclasses.task_message import TaskMessage
from flowcept.flowceptor.plugins.settings_factory import get_settings

from flowcept.flowceptor.telemetry_capture import TelemetryCapture

from flowcept.version import __version__


Expand Down Expand Up @@ -63,6 +65,7 @@ def __init__(self, plugin_key):
self.logger = FlowceptLogger().get_logger()
self.settings = get_settings(plugin_key)
self._mq_dao = MQDao()
self.telemetry_capture = TelemetryCapture()

def prepare_task_msg(self, *args, **kwargs) -> TaskMessage:
raise NotImplementedError()
Expand All @@ -73,6 +76,7 @@ def start(self) -> "BaseInterceptor":
:return:
"""
self._mq_dao.start_time_based_flushing()
self.telemetry_capture.init_gpu_telemetry()
return self

def stop(self) -> bool:
Expand All @@ -81,6 +85,7 @@ def stop(self) -> bool:
:return:
"""
self._mq_dao.stop()
self.telemetry_capture.shutdown_gpu_telemetry()

def observe(self, *args, **kwargs):
"""
Expand Down
13 changes: 9 additions & 4 deletions flowcept/flowceptor/plugins/dask/dask_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
)
from flowcept.commons.utils import get_utc_now
from flowcept.configs import TELEMETRY_CAPTURE
from flowcept.flowceptor.telemetry_capture import capture_telemetry


def get_run_spec_data(task_msg: TaskMessage, run_spec):
Expand Down Expand Up @@ -148,7 +147,9 @@ def callback(self, task_id, start, finish, *args, **kwargs):

if ts.state == "executing":
if TELEMETRY_CAPTURE is not None:
task_msg.telemetry_at_start = capture_telemetry()
task_msg.telemetry_at_start = (
self.telemetry_capture.capture()
)
task_msg.status = Status.RUNNING
task_msg.address = self._worker.worker_address
if self.settings.worker_create_timestamps:
Expand All @@ -160,7 +161,9 @@ def callback(self, task_id, start, finish, *args, **kwargs):
else:
get_times_from_task_state(task_msg, ts)
if TELEMETRY_CAPTURE is not None:
task_msg.telemetry_at_end = capture_telemetry()
task_msg.telemetry_at_end = (
self.telemetry_capture.capture()
)
elif ts.state == "error":
task_msg.status = Status.ERROR
if self.settings.worker_create_timestamps:
Expand All @@ -172,7 +175,9 @@ def callback(self, task_id, start, finish, *args, **kwargs):
"traceback": ts.traceback_text,
}
if TELEMETRY_CAPTURE is not None:
task_msg.telemetry_at_end = capture_telemetry()
task_msg.telemetry_at_end = (
self.telemetry_capture.capture()
)
else:
return

Expand Down
Loading

0 comments on commit 6d9414a

Please sign in to comment.