Skip to content

Commit

Permalink
CPU device and CPU support in ZeusMonitor (#90)
Browse files Browse the repository at this point in the history
Co-authored-by: Jae-Won Chung <jwnchung@umich.edu>
  • Loading branch information
wbjin and jaywonchung authored Jun 8, 2024
1 parent 7d41581 commit c64ccbc
Show file tree
Hide file tree
Showing 12 changed files with 654 additions and 29 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ dist/
env/
.pytest_cache/
/envs
.nvim
gpg
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ pydocstyle.convention = "google"
"zeus/optimizer/batch_size/server/router.py" = ["B008"]
"zeus/optimizer/batch_size/common.py" = ["N805"]
"zeus/device/gpu/*.py" = ["N802", "N803"]
"zeus/device/cpu/*.py" = ["N802"]

[tool.pytest.ini_options]
addopts = "--numprocesses auto"
2 changes: 1 addition & 1 deletion tests/optimizer/batch_size/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def mock_monitor(mocker: MockerFixture):
zeus_monitor_mock_instance.gpu_indices = [0, 1, 2, 3]
zeus_monitor_mock_instance.end_window.return_value = Measurement(
time=37.24807469360,
energy={
gpu_energy={
0: 4264.87199999392,
1: 4367.186999991536,
2: 4342.869000002742,
Expand Down
12 changes: 7 additions & 5 deletions tests/test_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ def tick():

def assert_window_begin(name: str, begin_time: int):
"""Assert monitor measurement states right after a window begins."""
assert monitor.measurement_states[name][0] == begin_time
assert monitor.measurement_states[name][1] == {
assert monitor.measurement_states[name].time == begin_time
assert monitor.measurement_states[name].gpu_energy == {
# `4` is the time origin of `time_counter`.
i: pytest.approx((1000 + 3 * (begin_time - 4)) / 1000.0)
for i in torch_gpu_indices
Expand Down Expand Up @@ -259,13 +259,15 @@ def assert_measurement(
assert_calls: Whether to assert calls to mock functions. (Default: `True`)
"""
assert name not in monitor.measurement_states
assert num_gpus == len(measurement.energy)
assert num_gpus == len(measurement.gpu_energy)
assert elapsed_time == measurement.time
assert set(measurement.energy.keys()) == set(torch_gpu_indices)
assert set(measurement.gpu_energy.keys()) == set(torch_gpu_indices)
for i in torch_gpu_indices:
if not is_old_torch[i]:
# The energy counter increments with step size 3.
assert measurement.energy[i] == pytest.approx(elapsed_time * 3 / 1000.0)
assert measurement.gpu_energy[i] == pytest.approx(
elapsed_time * 3 / 1000.0
)

if not assert_calls:
return
Expand Down
1 change: 1 addition & 0 deletions zeus/device/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Abstraction layer over devices like GPUs."""

from zeus.device.gpu import get_gpus
from zeus.device.cpu import get_cpus
33 changes: 33 additions & 0 deletions zeus/device/cpu/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Abstraction layer for CPU devices.
The main function of this module is [`get_cpus`][zeus.device.cpu.get_cpus],
which returns a CPU Manager object specific to the platform.
"""

from __future__ import annotations

from zeus.device.cpu.common import CPUs, ZeusCPUInitError
from zeus.device.cpu.rapl import rapl_is_available, RAPLCPUs

# Module-level cache for the CPU manager; `get_cpus()` fills it on the first
# successful call and returns the same object on every later call.
_cpus: CPUs | None = None


def get_cpus() -> CPUs:
    """Return the process-wide CPU manager, creating it on first use.

    The manager abstracts the underlying CPU monitoring library. RAPL
    (Intel CPUs) is the only backend attempted here. Once a manager has been
    created it is cached in the module-level `_cpus` and handed back
    unchanged on every later call.

    Returns:
        The cached or newly created CPU manager object.

    Raises:
        ZeusCPUInitError: If RAPL is reported as unavailable.
    """
    global _cpus

    # Fast path: an earlier call already built the manager.
    if _cpus is not None:
        return _cpus

    # No supported backend: fail loudly instead of returning a dummy object.
    if not rapl_is_available():
        raise ZeusCPUInitError(
            "RAPL unvailable Failed to initialize CPU management library."
        )

    _cpus = RAPLCPUs()
    return _cpus
175 changes: 175 additions & 0 deletions zeus/device/cpu/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
"""Error wrappers and classes common to all CPU vendors."""

from __future__ import annotations

import abc
from typing import Sequence
from dataclasses import dataclass

from zeus.device.exception import ZeusBaseCPUError


@dataclass
class CpuDramMeasurement:
    """A paired reading of CPU and DRAM energy consumption.

    Attributes:
        cpu_mj: CPU energy consumption in millijoules.
        dram_mj: DRAM energy consumption in millijoules, or `None` when no
            DRAM reading is present. Defaults to `None`.
    """

    cpu_mj: float
    dram_mj: float | None = None

    def __sub__(self, other: CpuDramMeasurement) -> CpuDramMeasurement:
        """Return the field-wise difference `self - other`.

        The CPU fields are always subtracted. For the DRAM field, a missing
        (`None`) reading on one side is treated as if it contributed nothing:
        the result is then the other side's value (negated when it comes from
        `other`). When both sides are `None` the result stays `None`.

        Args:
            other: The measurement to subtract from this one.

        Returns:
            A new `CpuDramMeasurement` holding the differences.
        """
        mine, theirs = self.dram_mj, other.dram_mj

        if mine is None and theirs is None:
            dram_delta = None
        elif theirs is None:
            # Only this side has a DRAM reading; carry it over as-is.
            dram_delta = mine
        elif mine is None:
            # Only the other side has a DRAM reading; the difference is its negation.
            dram_delta = -theirs
        else:
            dram_delta = mine - theirs

        return CpuDramMeasurement(self.cpu_mj - other.cpu_mj, dram_delta)

    def __truediv__(self, other: int | float) -> CpuDramMeasurement:
        """Return this measurement with every present field divided by `other`.

        Args:
            other: The number to divide by.

        Returns:
            A new `CpuDramMeasurement` with scaled fields (`dram_mj` stays
            `None` if it was `None`), or `NotImplemented` when `other` is not
            an `int` or `float`.

        Raises:
            ZeroDivisionError: If `other` equals zero.
        """
        # Unsupported operand types are signalled to Python, not raised here.
        if not isinstance(other, (int, float)):
            return NotImplemented

        if other == 0:
            raise ZeroDivisionError("Division by zero is not allowed")

        scaled_dram = None if self.dram_mj is None else self.dram_mj / other
        return CpuDramMeasurement(self.cpu_mj / other, scaled_dram)


class ZeusCPUInitError(ZeusBaseCPUError):
    """Raised on an import error or a CPU library initialization failure."""

    def __init__(self, message: str) -> None:
        """Create the exception.

        Args:
            message: Human-readable description of the failure, forwarded to
                the base exception.
        """
        super().__init__(message)


class ZeusCPUNoPermissionError(ZeusBaseCPUError):
    """Raised when there is no permission to perform a CPU operation."""

    def __init__(self, message: str) -> None:
        """Create the exception.

        Args:
            message: Human-readable description of the failure, forwarded to
                the base exception.
        """
        super().__init__(message)


class ZeusCPUNotFoundError(ZeusBaseCPUError):
    """Raised when a CPU is not found."""

    def __init__(self, message: str) -> None:
        """Create the exception.

        Args:
            message: Human-readable description of the failure, forwarded to
                the base exception.
        """
        super().__init__(message)


class CPU(abc.ABC):
    """Interface for a single CPU being managed.

    Concrete subclasses implement the abstract methods below on top of a
    specific CPU library.
    """

    def __init__(self, cpu_index: int) -> None:
        """Remember which CPU this object stands for.

        Args:
            cpu_index: Index of the CPU, stored as `self.cpu_index`.
        """
        self.cpu_index = cpu_index

    @abc.abstractmethod
    def getTotalEnergyConsumption(self) -> CpuDramMeasurement:
        """Returns the total energy consumption of the specified powerzone. Units: mJ."""

    @abc.abstractmethod
    def supportsGetDramEnergyConsumption(self) -> bool:
        """Returns True if the specified CPU powerzone supports retrieving the subpackage energy consumption."""


class CPUs(abc.ABC):
    """Abstract manager that owns one `CPU` object per tracked CPU.

    Subclasses provide construction, teardown and the `cpus` sequence. The
    concrete methods here forward each call to the `CPU` object at the
    requested index.
    """

    @abc.abstractmethod
    def __init__(self) -> None:
        """Initializes the CPU management library to communicate with the CPU driver and sets up tracking for specified CPUs."""

    @abc.abstractmethod
    def __del__(self) -> None:
        """Shuts down the CPU monitoring library to release resources and clean up."""

    @property
    @abc.abstractmethod
    def cpus(self) -> Sequence[CPU]:
        """Returns a list of CPU objects being tracked."""

    def getTotalEnergyConsumption(self, index: int) -> CpuDramMeasurement:
        """Returns the total energy consumption of the specified powerzone. Units: mJ.

        Args:
            index: Position of the target CPU within `self.cpus`.
        """
        target = self.cpus[index]
        return target.getTotalEnergyConsumption()

    def supportsGetDramEnergyConsumption(self, index: int) -> bool:
        """Returns True if the specified CPU powerzone supports retrieving the subpackage energy consumption.

        Args:
            index: Position of the target CPU within `self.cpus`.
        """
        target = self.cpus[index]
        return target.supportsGetDramEnergyConsumption()

    def __len__(self) -> int:
        """Returns the number of CPUs being tracked."""
        tracked = self.cpus
        return len(tracked)


class EmptyCPUs(CPUs):
    """Placeholder CPU manager for when no real CPU manager is available.

    It tracks no CPUs: its length is 0, `cpus` is an empty list, and the
    per-CPU query methods raise `ValueError`.
    """

    def __init__(self) -> None:
        """Instantiates empty CPUs object; there is nothing to set up."""
        return None

    def __del__(self) -> None:
        """Does nothing; this object holds no resources to release."""
        return None

    @property
    def cpus(self) -> Sequence[CPU]:
        """Returns an empty list, since no CPU objects are being tracked."""
        return []

    def getTotalEnergyConsumption(self, index: int) -> CpuDramMeasurement:
        """Always fails, because there is no CPU to query.

        Args:
            index: Ignored.

        Raises:
            ValueError: Always.
        """
        raise ValueError("No CPUs available.")

    def supportsGetDramEnergyConsumption(self, index: int) -> bool:
        """Always fails, because there is no CPU to query.

        Args:
            index: Ignored.

        Raises:
            ValueError: Always.
        """
        raise ValueError("No CPUs available.")

    def __len__(self) -> int:
        """Returns 0 since the object is empty."""
        return 0
Loading

0 comments on commit c64ccbc

Please sign in to comment.