Skip to content

Commit

Permalink
Send both process level and cpu level metrics (#35753)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzchen authored May 28, 2024
1 parent 47374d3 commit 38d3a32
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 32 deletions.
2 changes: 2 additions & 0 deletions sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

- Update live metrics to use typespec generated swagger
([#34840](https://github.com/Azure/azure-sdk-for-python/pull/34840))
- Send old and new process level live metrics
([#35753](https://github.com/Azure/azure-sdk-for-python/pull/35753))

## 1.0.0b25 (2024-04-19)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
# Each constant below is a pair: (OpenTelemetry metric name, Quickpulse metric name).
# Index 0 is used as the OpenTelemetry instrument name; the pairs are also passed
# to dict() to build _QUICKPULSE_METRIC_NAME_MAPPINGS (OpenTelemetry name -> Quickpulse name).
# Memory
_COMMITTED_BYTES_NAME = ("azuremonitor.memorycommittedbytes", "\\Memory\\Committed Bytes")
_PROCESS_PHYSICAL_BYTES_NAME = ("azuremonitor.processphysicalbytes", "\\Process\\Physical Bytes")
# CPU
_PROCESSOR_TIME_NAME = ("azuremonitor.processortotalprocessortime", "\\Processor(_Total)\\% Processor Time")
_PROCESS_TIME_NORMALIZED_NAME = ("azuremonitor.processtimenormalized", "\\% Process\\Processor Time Normalized")
# Request
_REQUEST_RATE_NAME = ("azuremonitor.requestssec", "\\ApplicationInsights\\Requests/Sec")
_REQUEST_FAILURE_RATE_NAME = ("azuremonitor.requestsfailedsec", "\\ApplicationInsights\\Requests Failed/Sec")
Expand All @@ -23,8 +25,9 @@
_QUICKPULSE_METRIC_NAME_MAPPINGS = dict(
[
_COMMITTED_BYTES_NAME,
_PROCESS_PHYSICAL_BYTES_NAME,
_PROCESSOR_TIME_NAME,
_PROCESSOR_TIME_NAME,
_PROCESS_TIME_NORMALIZED_NAME,
_REQUEST_RATE_NAME,
_REQUEST_FAILURE_RATE_NAME,
_REQUEST_DURATION_NAME,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# cSpell:disable

from datetime import datetime
from typing import Any, Iterable, Optional

import platform
Expand All @@ -23,6 +23,8 @@
_DEPENDENCY_FAILURE_RATE_NAME,
_DEPENDENCY_RATE_NAME,
_EXCEPTION_RATE_NAME,
_PROCESS_PHYSICAL_BYTES_NAME,
_PROCESS_TIME_NORMALIZED_NAME,
_PROCESSOR_TIME_NAME,
_REQUEST_DURATION_NAME,
_REQUEST_FAILURE_RATE_NAME,
Expand All @@ -37,7 +39,13 @@
_QuickpulseState,
_is_post_state,
_append_quickpulse_document,
_get_quickpulse_last_process_cpu,
_get_quickpulse_last_process_time,
_get_quickpulse_process_elapsed_time,
_set_global_quickpulse_state,
_set_quickpulse_last_process_cpu,
_set_quickpulse_last_process_time,
_set_quickpulse_process_elapsed_time,
)
from azure.monitor.opentelemetry.exporter._quickpulse._utils import (
_get_log_record_document,
Expand All @@ -55,6 +63,7 @@


# psutil process handle queried by the gauge callbacks in this module
# (memory_info() for memory, cpu_times() for CPU).
PROCESS = psutil.Process()
# CPU count used as the divisor when normalizing process CPU usage.
# NOTE(review): psutil documents that cpu_count() can return None when the count
# cannot be determined; code dividing by this value should account for that.
NUM_CPUS = psutil.cpu_count()

def enable_live_metrics(**kwargs: Any) -> None: # pylint: disable=C4758
"""Live metrics entry point.
Expand Down Expand Up @@ -129,13 +138,21 @@ def __init__(self, connection_string: Optional[str], resource: Optional[Resource
"exc/sec",
"live metrics exception rate per second"
)
self._process_memory_gauge = self._meter.create_observable_gauge(
self._process_memory_gauge_old = self._meter.create_observable_gauge(
_COMMITTED_BYTES_NAME[0],
[_get_process_memory],
)
self._processor_time_gauge = self._meter.create_observable_gauge(
self._process_memory_gauge = self._meter.create_observable_gauge(
_PROCESS_PHYSICAL_BYTES_NAME[0],
[_get_process_memory],
)
self._process_time_gauge_old = self._meter.create_observable_gauge(
_PROCESSOR_TIME_NAME[0],
[_get_processor_time],
[_get_process_time_normalized_old],
)
self._process_time_gauge = self._meter.create_observable_gauge(
_PROCESS_TIME_NORMALIZED_NAME[0],
[_get_process_time_normalized],
)

def _record_span(self, span: ReadableSpan) -> None:
Expand Down Expand Up @@ -178,19 +195,40 @@ def _record_log_record(self, log_data: LogData) -> None:

# pylint: disable=unused-argument
def _get_process_memory(options: CallbackOptions) -> Iterable[Observation]:
    """Observable-gauge callback that reports the memory used by this process.

    :param options: Callback options supplied by the metrics SDK; unused.
    :return: An iterable yielding a single Observation whose value is the
        process RSS as reported by psutil, or 0 when it cannot be read.
    """
    memory = 0
    try:
        # rss is non-swapped physical memory a process has used
        memory = PROCESS.memory_info().rss
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        # Best effort: report 0 instead of letting the failure escape the callback.
        pass
    # Exactly one observation per collection; the psutil call above is the only
    # place the reading is taken, so it is always guarded.
    yield Observation(memory, {})


# pylint: disable=unused-argument
def _get_process_time_normalized_old(options: CallbackOptions) -> Iterable[Observation]:
    """Observable-gauge callback computing normalized CPU usage of this process.

    The value is the CPU time (user + system, in seconds) consumed since the
    previous collection, divided by the wall-clock seconds elapsed since that
    collection, divided by the number of CPUs. It is a ratio and is not
    multiplied by 100. The result is also stored through
    ``_set_quickpulse_last_process_cpu`` so it can be read back later.

    :param options: Callback options supplied by the metrics SDK; unused.
    :return: An iterable yielding a single Observation; its value is 0.0 when
        psutil cannot read the process or no time has elapsed.
    """
    normalized_cpu_percentage = 0.0
    try:
        cpu_times = PROCESS.cpu_times()
        # total process time is user + system in s
        total_time_s = cpu_times.user + cpu_times.system
        # CPU seconds consumed since the previous collection
        process_time_s = total_time_s - _get_quickpulse_last_process_time()
        # Remember the cumulative total, not the delta: the next collection must
        # subtract the previous cumulative reading to obtain a correct delta.
        _set_quickpulse_last_process_time(total_time_s)
        # Find elapsed time in s since last collection
        current_time = datetime.now()
        elapsed_time_s = (current_time - _get_quickpulse_process_elapsed_time()).total_seconds()
        _set_quickpulse_process_elapsed_time(current_time)
        # Obtain cpu % by dividing by elapsed time
        cpu_percentage = process_time_s / elapsed_time_s
        # Normalize by dividing by amount of logical cpus; psutil.cpu_count()
        # may return None when undetermined, so fall back to a single CPU.
        normalized_cpu_percentage = cpu_percentage / (NUM_CPUS or 1)
        _set_quickpulse_last_process_cpu(normalized_cpu_percentage)
    except (psutil.NoSuchProcess, psutil.AccessDenied, ZeroDivisionError):
        # Best effort: keep the 0.0 default rather than fail the collection.
        pass
    yield Observation(normalized_cpu_percentage, {})


# pylint: disable=unused-argument
def _get_processor_time(options: CallbackOptions) -> Iterable[Observation]:
    """Observable-gauge callback reporting the non-idle CPU percentage.

    :param options: Callback options supplied by the metrics SDK; unused.
    :return: An iterable yielding a single Observation whose value is
        100 minus the idle percentage returned by psutil.cpu_times_percent().
    """
    # Processor time does not include idle time
    idle_percent = psutil.cpu_times_percent().idle
    busy_percent = 100 - idle_percent
    yield Observation(busy_percent, {})
def _get_process_time_normalized(options: CallbackOptions) -> Iterable[Observation]:
    """Observable-gauge callback that replays the last stored process CPU value.

    This callback takes no measurement of its own; it yields whatever
    ``_get_quickpulse_last_process_cpu`` currently returns.

    :param options: Callback options supplied by the metrics SDK; unused.
    :return: An iterable yielding a single Observation.
    """
    last_cpu = _get_quickpulse_last_process_cpu()
    yield Observation(last_cpu, {})

# cSpell:enable
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from datetime import datetime
from enum import Enum
from typing import List

Expand All @@ -23,8 +24,11 @@ class _QuickpulseState(Enum):

# Module-wide Quickpulse state; starts OFFLINE and is replaced through
# _set_global_quickpulse_state().
_GLOBAL_QUICKPULSE_STATE = _QuickpulseState.OFFLINE
# Shared list of DocumentIngress items.
# NOTE(review): presumably filled by _append_quickpulse_document — confirm.
_QUICKPULSE_DOCUMENTS: List[DocumentIngress] = []
# Process CPU time value remembered between collections; accessed through
# _get/_set_quickpulse_last_process_time(). Starts at 0.0.
_QUICKPULSE_LAST_PROCESS_TIME = 0.0
# Despite the name this holds a datetime (a point in time, not a duration);
# accessed through _get/_set_quickpulse_process_elapsed_time(). Initialised at import.
_QUICKPULSE_PROCESS_ELAPSED_TIME = datetime.now()
# Last process CPU value stored through _set_quickpulse_last_process_cpu(). Starts at 0.0.
_QUICKPULSE_LAST_PROCESS_CPU = 0.0

def _set_global_quickpulse_state(state: _QuickpulseState) -> None:
    """Replace the module-wide Quickpulse state.

    :param state: The new state to store in ``_GLOBAL_QUICKPULSE_STATE``.
    """
    # pylint: disable=global-statement
    global _GLOBAL_QUICKPULSE_STATE
    _GLOBAL_QUICKPULSE_STATE = state
Expand All @@ -34,6 +38,36 @@ def _get_global_quickpulse_state() -> _QuickpulseState:
return _GLOBAL_QUICKPULSE_STATE


def _set_quickpulse_last_process_time(time: float) -> None:
    """Store ``time`` in the module-level ``_QUICKPULSE_LAST_PROCESS_TIME``.

    :param time: The process time value to remember.
    """
    global _QUICKPULSE_LAST_PROCESS_TIME  # pylint: disable=global-statement
    _QUICKPULSE_LAST_PROCESS_TIME = time


def _get_quickpulse_last_process_time() -> float:
    """Return the current value of ``_QUICKPULSE_LAST_PROCESS_TIME``."""
    return _QUICKPULSE_LAST_PROCESS_TIME


def _set_quickpulse_process_elapsed_time(time: datetime) -> None:
    """Store ``time`` in the module-level ``_QUICKPULSE_PROCESS_ELAPSED_TIME``.

    :param time: The datetime to remember.
    """
    global _QUICKPULSE_PROCESS_ELAPSED_TIME  # pylint: disable=global-statement
    _QUICKPULSE_PROCESS_ELAPSED_TIME = time


def _get_quickpulse_process_elapsed_time() -> datetime:
    """Return the current value of ``_QUICKPULSE_PROCESS_ELAPSED_TIME``."""
    return _QUICKPULSE_PROCESS_ELAPSED_TIME


def _set_quickpulse_last_process_cpu(time: float) -> None:
    """Store a value in the module-level ``_QUICKPULSE_LAST_PROCESS_CPU``.

    :param time: The CPU value to remember. The parameter is named ``time``
        but is written to the "last process cpu" global.
    """
    global _QUICKPULSE_LAST_PROCESS_CPU  # pylint: disable=global-statement
    _QUICKPULSE_LAST_PROCESS_CPU = time


def _get_quickpulse_last_process_cpu() -> float:
    """Return the current value of ``_QUICKPULSE_LAST_PROCESS_CPU``."""
    return _QUICKPULSE_LAST_PROCESS_CPU


def is_quickpulse_enabled() -> bool:
    """Return True unless the global Quickpulse state is OFFLINE."""
    current_state = _get_global_quickpulse_state()
    return current_state is not _QuickpulseState.OFFLINE

Expand Down
2 changes: 1 addition & 1 deletion sdk/monitor/azure-monitor-opentelemetry-exporter/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
"msrest>=0.6.10",
"opentelemetry-api~=1.21",
"opentelemetry-sdk~=1.21",
"psutil>=5.9.8",
"psutil~=5.9",
],
entry_points={
"opentelemetry_traces_exporter": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

import collections
import platform
import psutil
import unittest
from datetime import datetime, timedelta
from unittest import mock

from opentelemetry.sdk.metrics import (
Expand All @@ -21,12 +23,12 @@

from azure.monitor.opentelemetry.exporter._generated.models import ContextTagKeys
from azure.monitor.opentelemetry.exporter._quickpulse._constants import (
_COMMITTED_BYTES_NAME,
_DEPENDENCY_DURATION_NAME,
_DEPENDENCY_FAILURE_RATE_NAME,
_DEPENDENCY_RATE_NAME,
_EXCEPTION_RATE_NAME,
_PROCESSOR_TIME_NAME,
_PROCESS_PHYSICAL_BYTES_NAME,
_PROCESS_TIME_NORMALIZED_NAME,
_REQUEST_DURATION_NAME,
_REQUEST_FAILURE_RATE_NAME,
_REQUEST_RATE_NAME,
Expand All @@ -38,7 +40,8 @@
from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import (
enable_live_metrics,
_get_process_memory,
_get_processor_time,
_get_process_time_normalized,
_get_process_time_normalized_old,
_QuickpulseManager,
)
from azure.monitor.opentelemetry.exporter._quickpulse._state import (
Expand All @@ -65,6 +68,7 @@ def test_enable_live_metrics(self, manager_mock):


class TestQuickpulseManager(unittest.TestCase):

@classmethod
def setUpClass(cls):
_set_global_quickpulse_state(_QuickpulseState.PING_SHORT)
Expand Down Expand Up @@ -131,11 +135,11 @@ def test_init(self, generator_mock):
self.assertTrue(isinstance(qpm._exception_rate_counter, Counter))
self.assertEqual(qpm._exception_rate_counter.name, _EXCEPTION_RATE_NAME[0])
self.assertTrue(isinstance(qpm._process_memory_gauge, ObservableGauge))
self.assertEqual(qpm._process_memory_gauge.name, _COMMITTED_BYTES_NAME[0])
self.assertEqual(qpm._process_memory_gauge.name, _PROCESS_PHYSICAL_BYTES_NAME[0])
self.assertEqual(qpm._process_memory_gauge._callbacks, [_get_process_memory])
self.assertTrue(isinstance(qpm._processor_time_gauge, ObservableGauge))
self.assertEqual(qpm._processor_time_gauge.name, _PROCESSOR_TIME_NAME[0])
self.assertEqual(qpm._processor_time_gauge._callbacks, [_get_processor_time])
self.assertTrue(isinstance(qpm._process_time_gauge, ObservableGauge))
self.assertEqual(qpm._process_time_gauge.name, _PROCESS_TIME_NORMALIZED_NAME[0])
self.assertEqual(qpm._process_time_gauge._callbacks, [_get_process_time_normalized])


def test_singleton(self):
Expand Down Expand Up @@ -301,13 +305,31 @@ def test_process_memory(self):
obs = next(mem)
self.assertEqual(obs.value, 40)

@mock.patch("psutil.cpu_times_percent")
def test_processor_time(self, processor_mock):
cpu = collections.namedtuple('cpu', 'idle')
cpu_times = cpu(idle=94.5)
processor_mock.return_value = cpu_times
time = _get_processor_time(None)
def test_process_memory_error(self):
    # When psutil cannot read the process, the callback still yields an
    # observation and its value is 0.
    memory_cls = collections.namedtuple('memory', 'rss')
    target = "azure.monitor.opentelemetry.exporter._quickpulse._live_metrics.PROCESS"
    with mock.patch(target) as process_mock:
        memory_info_mock = process_mock.memory_info
        memory_info_mock.return_value = memory_cls(rss=40)
        # side_effect is an exception instance, so calling memory_info() raises it.
        memory_info_mock.side_effect = psutil.NoSuchProcess(1)
        observation = next(_get_process_memory(None))
        self.assertEqual(observation.value, 0)

@mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._get_quickpulse_process_elapsed_time")
@mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._get_quickpulse_last_process_time")
@mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics.PROCESS")
def test_process_time(self, process_mock, process_time_mock, elapsed_time_mock):
    # Normalized process CPU = (user + system - last process time) / elapsed seconds / CPU count.
    current = datetime.now()
    cpu = collections.namedtuple("cpu", ['user', 'system'])
    process_mock.cpu_times.return_value = cpu(user=3.6, system=6.8)
    process_time_mock.return_value = 4.4
    elapsed_time_mock.return_value = current - timedelta(seconds=5)
    # Patch the `datetime` name as imported by the module under test. Patching
    # "datetime.datetime" would not affect the reference that module already
    # holds, so the clock would not actually be frozen.
    with mock.patch(
        "azure.monitor.opentelemetry.exporter._quickpulse._live_metrics.datetime"
    ) as datetime_mock:
        datetime_mock.now.return_value = current
        observations = _get_process_time_normalized_old(None)
        obs = next(observations)
    num_cpus = psutil.cpu_count()
    # (3.6 + 6.8 - 4.4) CPU-seconds over exactly 5 elapsed seconds is 1.2,
    # which is then divided by the CPU count. With the clock frozen the
    # result is deterministic, so no wide tolerance is needed.
    self.assertAlmostEqual(obs.value, 1.2 / num_cpus)

# cSpell:enable

0 comments on commit 38d3a32

Please sign in to comment.