diff --git a/flowcept/commons/flowcept_dataclasses/telemetry.py b/flowcept/commons/flowcept_dataclasses/telemetry.py index 9e2891f4..0575d466 100644 --- a/flowcept/commons/flowcept_dataclasses/telemetry.py +++ b/flowcept/commons/flowcept_dataclasses/telemetry.py @@ -1,28 +1,43 @@ from typing import List, Dict +from dataclasses import dataclass, asdict + + +def remove_none_values(_dict): + return {k: v for (k, v) in _dict if v is not None} class Telemetry: - class _CPU: - times: Dict[str, float] = None # this is an average of all cpus - percent: float = None + """ + Class representing telemetry information captured in the platform where t + he experiment runs. + + We are using psutils and the data it can capture depends on the platform. + So, we won't use dataclasses because we can't list all possible info + to be captured in any platform. + + """ + + class CPU: + times_avg: Dict[str, float] = None + percent_all: float = None times_per_cpu: List[Dict[str, float]] = None percent_per_cpu: List[float] = None - class _Memory: + class Memory: virtual: Dict[str, float] swap: Dict[str, float] - class _Network: + class Network: netio: Dict[str, int] netio_per_interface: Dict[str, Dict[str, int]] - class _Disk: + class Disk: disk_usage: Dict[str, float] io: Dict[str, float] io_per_disk: Dict[str, Dict[str, float]] - class _Process: + class Process: pid: int cpu_number: int memory: Dict[str, float] @@ -38,11 +53,26 @@ class _Process: executable: str cmd_line: List[str] - cpu: _CPU = None - process: _Process = None - memory: _Memory = None - disk: _Disk = None - network: _Network = None + @dataclass(init=False) + class GPU: + @dataclass + class GPUMetrics: + total: int + free: int + used: int + usage_percent: float + temperature: float + power_usage: float + + gpu_sums: GPUMetrics + per_gpu: Dict[int, GPUMetrics] = None + + cpu: CPU = None + process: Process = None + memory: Memory = None + disk: Disk = None + network: Network = None + gpu: GPU = None def to_dict(self): ret = {} @@ -56,4 +86,7 @@ def to_dict(self): ret["disk"] = self.disk.__dict__ if self.network is not None: ret["network"] = self.network.__dict__ + if self.gpu is not None: + ret["gpu"] = asdict(self.gpu, dict_factory=remove_none_values) + return ret diff --git a/flowcept/flowcept_api/consumer_api.py b/flowcept/flowcept_api/consumer_api.py index c1e49474..7caea82f 100644 --- a/flowcept/flowcept_api/consumer_api.py +++ b/flowcept/flowcept_api/consumer_api.py @@ -1,12 +1,7 @@ from typing import List, Union from time import sleep -import random from flowcept.commons.daos.mq_dao import MQDao -from flowcept.configs import ( - REDIS_INSERTION_BUFFER_TIME, - MONGO_INSERTION_BUFFER_TIME, -) from flowcept.flowceptor.consumers.document_inserter import DocumentInserter from flowcept.commons.flowcept_logger import FlowceptLogger from flowcept.flowceptor.plugins.base_interceptor import BaseInterceptor diff --git a/flowcept/flowceptor/plugins/base_interceptor.py b/flowcept/flowceptor/plugins/base_interceptor.py index accc4a9e..ecf65bde 100644 --- a/flowcept/flowceptor/plugins/base_interceptor.py +++ b/flowcept/flowceptor/plugins/base_interceptor.py @@ -17,6 +17,8 @@ from flowcept.commons.flowcept_dataclasses.task_message import TaskMessage from flowcept.flowceptor.plugins.settings_factory import get_settings +from flowcept.flowceptor.telemetry_capture import TelemetryCapture + from flowcept.version import __version__ @@ -63,6 +65,7 @@ def __init__(self, plugin_key): self.logger = FlowceptLogger().get_logger() self.settings = get_settings(plugin_key) self._mq_dao = MQDao() + self.telemetry_capture = TelemetryCapture() def prepare_task_msg(self, *args, **kwargs) -> TaskMessage: raise NotImplementedError() @@ -73,6 +76,7 @@ def start(self) -> "BaseInterceptor": :return: """ self._mq_dao.start_time_based_flushing() + self.telemetry_capture.init_gpu_telemetry() return self def stop(self) -> bool: @@ -81,6 +85,7 @@ def stop(self) -> bool: :return: """ self._mq_dao.stop() + self.telemetry_capture.shutdown_gpu_telemetry() def observe(self, *args, **kwargs): """ diff --git a/flowcept/flowceptor/plugins/dask/dask_interceptor.py b/flowcept/flowceptor/plugins/dask/dask_interceptor.py index 9c12d529..d65c964e 100644 --- a/flowcept/flowceptor/plugins/dask/dask_interceptor.py +++ b/flowcept/flowceptor/plugins/dask/dask_interceptor.py @@ -9,7 +9,6 @@ ) from flowcept.commons.utils import get_utc_now from flowcept.configs import TELEMETRY_CAPTURE -from flowcept.flowceptor.telemetry_capture import capture_telemetry def get_run_spec_data(task_msg: TaskMessage, run_spec): @@ -148,7 +147,9 @@ def callback(self, task_id, start, finish, *args, **kwargs): if ts.state == "executing": if TELEMETRY_CAPTURE is not None: - task_msg.telemetry_at_start = capture_telemetry() + task_msg.telemetry_at_start = ( + self.telemetry_capture.capture() + ) task_msg.status = Status.RUNNING task_msg.address = self._worker.worker_address if self.settings.worker_create_timestamps: @@ -160,7 +161,9 @@ def callback(self, task_id, start, finish, *args, **kwargs): else: get_times_from_task_state(task_msg, ts) if TELEMETRY_CAPTURE is not None: - task_msg.telemetry_at_end = capture_telemetry() + task_msg.telemetry_at_end = ( + self.telemetry_capture.capture() + ) elif ts.state == "error": task_msg.status = Status.ERROR if self.settings.worker_create_timestamps: @@ -172,7 +175,9 @@ def callback(self, task_id, start, finish, *args, **kwargs): "traceback": ts.traceback_text, } if TELEMETRY_CAPTURE is not None: - task_msg.telemetry_at_end = capture_telemetry() + task_msg.telemetry_at_end = ( + self.telemetry_capture.capture() + ) else: return diff --git a/flowcept/flowceptor/telemetry_capture.py b/flowcept/flowceptor/telemetry_capture.py index 4c307d58..1e0b8ff5 100644 --- a/flowcept/flowceptor/telemetry_capture.py +++ b/flowcept/flowceptor/telemetry_capture.py @@ -1,124 +1,210 @@ -from typing import Dict import psutil +import pynvml +from pynvml import ( + nvmlDeviceGetCount, + nvmlDeviceGetHandleByIndex, + nvmlDeviceGetMemoryInfo, + nvmlInit, + nvmlShutdown, + nvmlDeviceGetTemperature, +) +from flowcept.commons.flowcept_logger import FlowceptLogger from flowcept.configs import TELEMETRY_CAPTURE from flowcept.commons.flowcept_dataclasses.telemetry import Telemetry -from flowcept.commons.flowcept_logger import FlowceptLogger -def capture_telemetry() -> Telemetry: - conf = TELEMETRY_CAPTURE - if conf is None: - return None - - tel = Telemetry() - tel.cpu = _capture_cpu(conf) - tel.process = _capture_process_info(conf) - tel.memory = _capture_memory(conf) - tel.network = _capture_network(conf) - tel.disk = _capture_disk(conf) - - return tel - - -def _capture_disk(conf): - capt = conf.get("disk", False) - if not capt: - return None - try: - disk = Telemetry._Disk() - disk.disk_usage = psutil.disk_usage("/")._asdict() - disk.io = psutil.disk_io_counters(perdisk=False)._asdict() - io_perdisk = psutil.disk_io_counters(perdisk=True) - if len(io_perdisk) > 1: - disk.io_per_disk = {} - for d in io_perdisk: - disk.io_per_disk[d] = io_perdisk[d]._asdict() - - return disk - except Exception as e: - FlowceptLogger.get_logger().exception(e) - - -def _capture_network(conf): - capt = conf.get("network", False) - if not capt: - return None - try: - net = Telemetry._Network() - net.netio = psutil.net_io_counters(pernic=False)._asdict() - pernic = psutil.net_io_counters(pernic=True) - net.netio_per_interface = {} - for ic in pernic: - if pernic[ic].bytes_sent and pernic[ic].bytes_recv: - net.netio_per_interface[ic] = pernic[ic] - return net - except Exception as e: - FlowceptLogger.get_logger().exception(e) - - -def _capture_memory(conf): - capt = conf.get("mem", False) - if not capt: - return None - try: - mem = Telemetry._Memory() - mem.virtual = psutil.virtual_memory()._asdict() - mem.swap = psutil.swap_memory()._asdict() - return mem - except Exception as e: - FlowceptLogger.get_logger().exception(e) - - -def _capture_process_info(conf): - capt = conf.get("process_info", False) - if not capt: - return None - try: - p = Telemetry._Process() - psutil_p = psutil.Process() - with psutil_p.oneshot(): - p.pid = psutil_p.pid +class TelemetryCapture: + def __init__(self, conf=TELEMETRY_CAPTURE): + self.conf = conf + self.logger = FlowceptLogger().get_logger() + + def capture(self) -> Telemetry: + if self.conf is None: + return None + + tel = Telemetry() + tel.process = self._capture_process_info() + tel.cpu = self._capture_cpu() + tel.memory = self._capture_memory() + tel.network = self._capture_network() + tel.disk = self._capture_disk() + tel.gpu = self._capture_gpu() + + return tel + + def _capture_disk(self): + capt = self.conf.get("disk", False) + if not capt: + return None + try: + disk = Telemetry.Disk() + disk.disk_usage = psutil.disk_usage("/")._asdict() + disk.io_sum = psutil.disk_io_counters(perdisk=False)._asdict() + io_perdisk = psutil.disk_io_counters(perdisk=True) + if len(io_perdisk) > 1: + disk.io_per_disk = {} + for d in io_perdisk: + disk.io_per_disk[d] = io_perdisk[d]._asdict() + + return disk + except Exception as e: + self.logger.exception(e) + + def _capture_network(self): + capt = self.conf.get("network", False) + if not capt: + return None + try: + net = Telemetry.Network() + net.netio_sum = psutil.net_io_counters(pernic=False)._asdict() + pernic = psutil.net_io_counters(pernic=True) + net.netio_per_interface = {} + for ic in pernic: + if pernic[ic].bytes_sent and pernic[ic].bytes_recv: + net.netio_per_interface[ic] = pernic[ic]._asdict() + return net + except Exception as e: + self.logger.exception(e) + + def _capture_memory(self): + capt = self.conf.get("mem", False) + if not capt: + return None + try: + mem = Telemetry.Memory() + mem.virtual = psutil.virtual_memory()._asdict() + mem.swap = psutil.swap_memory()._asdict() + return mem + except Exception as e: + self.logger.exception(e) + + def _capture_process_info(self): + capt = self.conf.get("process_info", False) + if not capt: + return None + try: + p = Telemetry.Process() + psutil_p = psutil.Process() + with psutil_p.oneshot(): + p.pid = psutil_p.pid + try: + p.cpu_number = psutil_p.cpu_num() + except: + pass + p.memory = psutil_p.memory_full_info() + p.memory_percent = psutil_p.memory_percent() + p.cpu_times = psutil_p.cpu_times()._asdict() + p.cpu_percent = psutil_p.cpu_percent() + p.executable = psutil_p.exe() + p.cmd_line = psutil_p.cmdline() + p.num_open_file_descriptors = psutil_p.num_fds() + p.num_connections = len(psutil_p.connections()) + try: + p.io_counters = psutil_p.io_counters()._asdict() + except: + pass + p.num_open_files = len(psutil_p.open_files()) + p.num_threads = psutil_p.num_threads() + p.num_ctx_switches = psutil_p.num_ctx_switches()._asdict() + return p + except Exception as e: + self.logger.exception(e) + + def _capture_cpu(self): + capt_cpu = self.conf.get("cpu", False) + capt_per_cpu = self.conf.get("per_cpu", False) + if not (capt_cpu or capt_per_cpu): + return None + try: + cpu = Telemetry.CPU() + if capt_cpu: + cpu.times_avg = psutil.cpu_times(percpu=False)._asdict() + cpu.percent_all = psutil.cpu_percent() + if capt_per_cpu: + cpu.times_per_cpu = [ + c._asdict() for c in psutil.cpu_times(percpu=True) + ] + cpu.percent_per_cpu = psutil.cpu_percent(percpu=True) + return cpu + except Exception as e: + self.logger.exception(e) + return None + + def _capture_gpu(self): + capt = self.conf.get("gpu", False) + if not capt: + return None + + try: + deviceCount = nvmlDeviceGetCount() + handle = nvmlDeviceGetHandleByIndex(0) + info = nvmlDeviceGetMemoryInfo(handle) + _this_gpu = { + "total": info.total, + "free": info.free, + "used": info.used, + "usage_percent": info.used / info.total * 100, + "temperature": nvmlDeviceGetTemperature( + handle, pynvml.NVML_TEMPERATURE_GPU + ), + "power_usage": pynvml.nvmlDeviceGetPowerUsage(handle), + } + gpu = Telemetry.GPU() + if deviceCount == 1: + gpu.gpu_sums = gpu.GPUMetrics(**_this_gpu) + else: + gpu.per_gpu = {0: gpu.GPUMetrics(**_this_gpu)} + sums = _this_gpu.copy() + for i in range(1, deviceCount): + handle = nvmlDeviceGetHandleByIndex(i) + info = nvmlDeviceGetMemoryInfo(handle) + _temp = nvmlDeviceGetTemperature( + handle, pynvml.NVML_TEMPERATURE_GPU + ) + _pow = pynvml.nvmlDeviceGetPowerUsage(handle) + + sums["total"] += info.total + sums["free"] += info.free + sums["used"] += info.used + sums["temperature"] += _temp + sums["power_usage"] += _pow + + gpu.per_gpu[i] = gpu.GPUMetrics( + total=info.total, + free=info.free, + used=info.used, + usage_percent=info.used / info.total * 100, + temperature=_temp, + power_usage=_pow, + ) + + sums["usage_percent"] = sums["used"] / sums["total"] * 100 + gpu.gpu_sums = gpu.GPUMetrics(**sums) + + return gpu + except Exception as e: + self.logger.exception(e) + return None + + def init_gpu_telemetry(self): + if self.conf is None: + return None + + if self.conf.get("gpu", False): try: - p.cpu_number = psutil_p.cpu_num() - except: - pass - p.memory = psutil_p.memory_full_info() - p.memory_percent = psutil_p.memory_percent() - p.cpu_times = psutil_p.cpu_times()._asdict() - p.cpu_percent = psutil_p.cpu_percent() - p.executable = psutil_p.exe() - p.cmd_line = psutil_p.cmdline() - p.num_open_file_descriptors = psutil_p.num_fds() - p.num_connections = len(psutil_p.connections()) + nvmlInit() + except Exception as e: + self.logger.error("NVIDIA GPU NOT FOUND!") + self.logger.exception(e) + + def shutdown_gpu_telemetry(self): + if self.conf is None: + return None + + if self.conf.get("gpu", False): try: - p.io_counters = psutil_p.io_counters()._asdict() - except: - pass - p.num_open_files = len(psutil_p.open_files()) - p.num_threads = psutil_p.num_threads() - p.num_ctx_switches = psutil_p.num_ctx_switches()._asdict() - return p - except Exception as e: - FlowceptLogger.get_logger().exception(e) - - -def _capture_cpu(conf: Dict): - capt_cpu = conf.get("cpu", False) - capt_per_cpu = conf.get("per_cpu", False) - if not (capt_cpu or capt_per_cpu): - return None - try: - cpu = Telemetry._CPU() - if conf.get("cpu", False): - cpu.times = psutil.cpu_times(percpu=False)._asdict() - cpu.percent = psutil.cpu_percent() - if conf.get("per_cpu", False): - cpu.times_per_cpu = [ - c._asdict() for c in psutil.cpu_times(percpu=True) - ] - cpu.percent_per_cpu = psutil.cpu_percent(percpu=True) - return cpu - except Exception as e: - FlowceptLogger.get_logger().exception(e) - return None + nvmlShutdown() + except Exception as e: + self.logger.error("NVIDIA GPU NOT FOUND!") + self.logger.exception(e) diff --git a/requirements.txt b/requirements.txt index b890e129..af4a753f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ PyYAML==6.0 redis==4.4.2 psutil==5.9.5 +nvidia-ml-py==11.525.131 diff --git a/setup.py b/setup.py index 2d11108d..d329309c 100644 --- a/setup.py +++ b/setup.py @@ -95,7 +95,7 @@ def get_requirements(file_path): setup( - name="flowcept", + name=PROJECT_NAME, version=version, license="MIT", author="Oak Ridge National Laboratory", diff --git a/tests/telemetry_test.py b/tests/telemetry_test.py index 57640dea..e0433ef5 100644 --- a/tests/telemetry_test.py +++ b/tests/telemetry_test.py @@ -1,8 +1,14 @@ import unittest -from flowcept.flowceptor.telemetry_capture import capture_telemetry +import json + +from flowcept.flowceptor.telemetry_capture import TelemetryCapture class TestTelemetry(unittest.TestCase): def test_telemetry(self): - telemetry = capture_telemetry() + tele_capture = TelemetryCapture() + tele_capture.init_gpu_telemetry() + telemetry = tele_capture.capture() assert telemetry.to_dict() + print(json.dumps(telemetry.to_dict(), indent=True)) + tele_capture.shutdown_gpu_telemetry()