Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enforce memory usage for agent #2671

Merged
merged 4 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion azurelinuxagent/common/cgroupconfigurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def __init__(self):
self._cgroups_api = None
self._agent_cpu_cgroup_path = None
self._agent_memory_cgroup_path = None
self._agent_memory_cgroup = None
self._check_cgroups_lock = threading.RLock() # Protect the check_cgroups which is called from Monitor thread and main loop.

def initialize(self):
Expand Down Expand Up @@ -194,7 +195,8 @@ def initialize(self):

if self._agent_memory_cgroup_path is not None:
_log_cgroup_info("Agent Memory cgroup: {0}", self._agent_memory_cgroup_path)
CGroupsTelemetry.track_cgroup(MemoryCgroup(AGENT_NAME_TELEMETRY, self._agent_memory_cgroup_path))
self._agent_memory_cgroup = MemoryCgroup(AGENT_NAME_TELEMETRY, self._agent_memory_cgroup_path)
CGroupsTelemetry.track_cgroup(self._agent_memory_cgroup)

_log_cgroup_info('Agent cgroups enabled: {0}', self._agent_cgroups_enabled)

Expand Down Expand Up @@ -729,6 +731,19 @@ def _check_agent_throttled_time(cgroup_metrics):
if metric.value > conf.get_agent_cpu_throttled_time_threshold():
raise CGroupsException("The agent has been throttled for {0} seconds".format(metric.value))

def check_agent_memory_usage(self):
if self.enabled() and self._agent_memory_cgroup:
metrics = self._agent_memory_cgroup.get_tracked_metrics()
current_usage = 0
for metric in metrics:
if metric.counter == MetricsCounter.TOTAL_MEM_USAGE:
current_usage += metric.value
elif metric.counter == MetricsCounter.SWAP_MEM_USAGE:
current_usage += metric.value

if current_usage > conf.get_agent_memory_quota():
raise CGroupsException("The agent memory limit {0} bytes exceeded. The current reported usage is {1} bytes.".format(conf.get_agent_memory_quota(), current_usage))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use a new exception type for this (AgentMemoryExceededException?). CGroupsException is too generic, and also in general the code handles cgroups issues in such a way that the correspoding cgroup task becomes a no-op.


@staticmethod
def _get_parent(pid):
"""
Expand Down
20 changes: 20 additions & 0 deletions azurelinuxagent/common/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ def load_conf_from_file(conf_file_path, conf=__conf__):
"Debug.CgroupLogMetrics": False,
"Debug.CgroupDisableOnProcessCheckFailure": True,
"Debug.CgroupDisableOnQuotaCheckFailure": True,
"Debug.EnableAgentMemoryUsageCheck": False,
"Debug.EnableFastTrack": True,
"Debug.EnableGAVersioning": False
}
Expand Down Expand Up @@ -186,6 +187,7 @@ def load_conf_from_file(conf_file_path, conf=__conf__):
"Debug.CgroupCheckPeriod": 300,
"Debug.AgentCpuQuota": 50,
"Debug.AgentCpuThrottledTimeThreshold": 120,
"Debug.AgentMemoryQuota": 31457280,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use the same notation as below? (30 * 1024 ** 2)

"Debug.EtpCollectionPeriod": 300,
"Debug.AutoUpdateHotfixFrequency": 14400,
"Debug.AutoUpdateNormalFrequency": 86400,
Expand Down Expand Up @@ -555,6 +557,24 @@ def get_agent_cpu_throttled_time_threshold(conf=__conf__):
return conf.get_int("Debug.AgentCpuThrottledTimeThreshold", 120)


def get_agent_memory_quota(conf=__conf__):
"""
Memory quota for the agent in bytes.

NOTE: This option is experimental and may be removed in later versions of the Agent.
"""
return conf.get_int("Debug.AgentMemoryQuota", 30 * 1024 ** 2)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a placeholder value. We will update once we have concrete value.



def get_enable_agent_memory_usage_check(conf=__conf__):
"""
If True, Agent checks it's Memory usage.

NOTE: This option is experimental and may be removed in later versions of the Agent.
"""
return conf.get_switch("Debug.EnableAgentMemoryUsageCheck", False)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

feature set to false for now



def get_cgroup_monitor_expiry_time(conf=__conf__):
"""
cgroups monitoring for pilot extensions disabled after expiry time
Expand Down
29 changes: 28 additions & 1 deletion azurelinuxagent/ga/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
from azurelinuxagent.common.cgroupconfigurator import CGroupConfigurator
from azurelinuxagent.common.event import add_event, initialize_event_logger_vminfo_common_parameters, \
WALAEventOperation, EVENTS_DIRECTORY
from azurelinuxagent.common.exception import ResourceGoneError, UpdateError, ExitException, AgentUpgradeExitException
from azurelinuxagent.common.exception import ResourceGoneError, UpdateError, ExitException, AgentUpgradeExitException, \
CGroupsException
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.osutil import get_osutil, systemd
from azurelinuxagent.common.persist_firewall_rules import PersistFirewallRulesHandler
Expand Down Expand Up @@ -137,6 +138,7 @@ def get_update_handler():

class UpdateHandler(object):
TELEMETRY_HEARTBEAT_PERIOD = timedelta(minutes=30)
CHECK_MEMORY_USAGE_PERIOD = timedelta(seconds=conf.get_cgroup_check_period())

def __init__(self):
self.osutil = get_osutil()
Expand All @@ -162,6 +164,9 @@ def __init__(self):
self._heartbeat_id = str(uuid.uuid4()).upper()
self._heartbeat_counter = 0

self._last_check_memory_usage = datetime.min
self._check_memory_usage_last_error_report = datetime.min

# VM Size is reported via the heartbeat, default it here.
self._vm_size = None

Expand Down Expand Up @@ -401,6 +406,7 @@ def run(self, debug=False):
self._check_threads_running(all_thread_handlers)
self._process_goal_state(exthandlers_handler, remote_access_handler)
self._send_heartbeat_telemetry(protocol)
self._check_agent_memory_usage()
time.sleep(self._goal_state_period)

except AgentUpgradeExitException as exitException:
Expand Down Expand Up @@ -1288,6 +1294,27 @@ def _send_heartbeat_telemetry(self, protocol):
self._heartbeat_update_goal_state_error_count = 0
self._last_telemetry_heartbeat = datetime.utcnow()

def _check_agent_memory_usage(self):
"""
This checks the agent current memory usage and safely exit the process if agent reaches the memory limit
"""
try:
if conf.get_enable_agent_memory_usage_check() and self._extensions_summary.converged:
if self._last_check_memory_usage == datetime.min or datetime.utcnow() >= (self._last_check_memory_usage + UpdateHandler.CHECK_MEMORY_USAGE_PERIOD):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would check if goal state completed and also, cgroup monitor period.

Note: I'm using same cgroup monitor period to run this check assuming that we evaluate and calculate memory limit based on the values we get from monitoring thread.

self._last_check_memory_usage = datetime.utcnow()
CGroupConfigurator.get_instance().check_agent_memory_usage()
except CGroupsException as exception:
msg = "Check on agent memory usage:\n{0}".format(ustr(exception))
logger.info(msg)
add_event(AGENT_NAME, op=WALAEventOperation.CGroupsInfo, is_success=True, message=msg)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should have a new WALAEventOperation for this event

raise ExitException("Agent {0} is reached memory limit -- exiting".format(CURRENT_AGENT))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ExitException and rest of the logic of sys.exit already handled in main thread for safe exit.

except Exception as exception:
if self._check_memory_usage_last_error_report == datetime.min or (self._check_memory_usage_last_error_report + timedelta(hours=6)) > datetime.now():
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to avoid flooding error msgs in agent log

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool. Could also add a short string such as "[Won't report the same error for 6 hours]"?

self._check_memory_usage_last_error_report = datetime.now()
msg = "Error checking the agent's memory usage: {0}".format(ustr(exception))
logger.warn(msg)
add_event(AGENT_NAME, op=WALAEventOperation.CGroupsInfo, is_success=False, message=msg)

@staticmethod
def _ensure_extension_telemetry_state_configured_properly(protocol):
etp_enabled = get_supported_feature_by_name(SupportedFeatureNames.ExtensionTelemetryPipeline).is_supported
Expand Down
12 changes: 12 additions & 0 deletions tests/common/test_cgroupconfigurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -986,3 +986,15 @@ def test_check_cgroups_should_disable_cgroups_when_a_check_fails(self):
finally:
for p in patchers:
p.stop()

def test_check_agent_memory_usage_should_raise_a_cgroups_exception_when_the_limit_is_exceeded(self):
metrics = [MetricValue(MetricsCategory.MEMORY_CATEGORY, MetricsCounter.TOTAL_MEM_USAGE, AGENT_NAME_TELEMETRY, conf.get_agent_memory_quota() + 1),
MetricValue(MetricsCategory.MEMORY_CATEGORY, MetricsCounter.SWAP_MEM_USAGE, AGENT_NAME_TELEMETRY, conf.get_agent_memory_quota() + 1)]

with self.assertRaises(CGroupsException) as context_manager:
with self._get_cgroup_configurator() as configurator:
with patch("azurelinuxagent.common.cgroup.MemoryCgroup.get_tracked_metrics") as tracked_metrics:
tracked_metrics.return_value = metrics
configurator.check_agent_memory_usage()

self.assertIn("The agent memory limit {0} bytes exceeded".format(conf.get_agent_memory_quota()), ustr(context_manager.exception), "An incorrect exception was raised")
62 changes: 52 additions & 10 deletions tests/ga/test_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@

from azurelinuxagent.common import conf
from azurelinuxagent.common.event import EVENTS_DIRECTORY, WALAEventOperation
from azurelinuxagent.common.exception import ProtocolError, UpdateError, ResourceGoneError, HttpError
from azurelinuxagent.common.exception import ProtocolError, UpdateError, ResourceGoneError, HttpError, CGroupsException, \
ExitException
from azurelinuxagent.common.future import ustr, httpclient
from azurelinuxagent.common.persist_firewall_rules import PersistFirewallRulesHandler
from azurelinuxagent.common.protocol.hostplugin import URI_FORMAT_GET_API_VERSIONS, HOST_PLUGIN_PORT, \
Expand Down Expand Up @@ -1461,7 +1462,7 @@ def _get_test_ext_handler_instance(protocol, name="OSTCExtensions.ExampleHandler
eh = Extension(name=name)
eh.version = version
return ExtHandlerInstance(eh, protocol)

def test_update_handler_recovers_from_error_with_no_certs(self):
data = DATA_FILE.copy()
data['goal_state'] = 'wire/goal_state_no_certs.xml'
Expand Down Expand Up @@ -1489,7 +1490,7 @@ def match_unexpected_errors():
for (args, _) in filter(lambda a: len(a) > 0, patched_error.call_args_list):
if unexpected_msg_fragment in args[0]:
matching_errors.append(args[0])

if len(matching_errors) > 1:
self.fail("Guest Agent did not recover, with new error(s): {}"\
.format(matching_errors[1:]))
Expand Down Expand Up @@ -2894,7 +2895,7 @@ def vm_settings_not_supported(url, *_, **__):
if HttpRequestPredicates.is_host_plugin_vm_settings_request(url):
return MockHttpResponse(404)
return None

with mock_wire_protocol(data) as protocol:

def mock_live_migration(iteration):
Expand All @@ -2904,7 +2905,7 @@ def mock_live_migration(iteration):
elif iteration == 2:
protocol.mock_wire_data.set_incarnation(3)
protocol.set_http_handlers(http_get_handler=vm_settings_not_supported)

with mock_update_handler(protocol, 3, on_new_iteration=mock_live_migration) as update_handler:
with patch("azurelinuxagent.ga.update.logger.error") as patched_error:
def check_for_errors():
Expand All @@ -2916,7 +2917,7 @@ def check_for_errors():

update_handler.run(debug=True)
check_for_errors()

timestamp = protocol.client.get_host_plugin()._fast_track_timestamp
self.assertEqual(timestamp, timeutil.create_timestamp(datetime.min),
"Expected fast track time stamp to be set to {0}, got {1}".format(datetime.min, timestamp))
Expand All @@ -2926,16 +2927,16 @@ class HeartbeatTestCase(AgentTestCase):
@patch("azurelinuxagent.common.logger.info")
@patch("azurelinuxagent.ga.update.add_event")
def test_telemetry_heartbeat_creates_event(self, patch_add_event, patch_info, *_):

with mock_wire_protocol(mockwiredata.DATA_FILE) as mock_protocol:
update_handler = get_update_handler()

update_handler.last_telemetry_heartbeat = datetime.utcnow() - timedelta(hours=1)
update_handler._send_heartbeat_telemetry(mock_protocol)
self.assertEqual(1, patch_add_event.call_count)
self.assertTrue(any(call_args[0] == "[HEARTBEAT] Agent {0} is running as the goal state agent {1}"
for call_args in patch_info.call_args), "The heartbeat was not written to the agent's log")

@patch("azurelinuxagent.ga.update.add_event")
@patch("azurelinuxagent.common.protocol.imds.ImdsClient")
def test_telemetry_heartbeat_retries_failed_vm_size_fetch(self, mock_imds_factory, patch_add_event, *_):
Expand All @@ -2953,7 +2954,7 @@ def validate_single_heartbeat_event_matches_vm_size(vm_size):
self.assertTrue(telemetry_message.endswith(vm_size),
"Expected HeartBeat message ('{0}') to end with the test vmSize value, {1}."\
.format(telemetry_message, vm_size))

with mock_wire_protocol(mockwiredata.DATA_FILE) as mock_protocol:
update_handler = get_update_handler()
update_handler.protocol_util.get_protocol = Mock(return_value=mock_protocol)
Expand All @@ -2979,6 +2980,47 @@ def validate_single_heartbeat_event_matches_vm_size(vm_size):
validate_single_heartbeat_event_matches_vm_size("TestVmSizeValue")


class AgentMemoryCheckTestCase(AgentTestCase):

@patch("azurelinuxagent.common.logger.info")
@patch("azurelinuxagent.ga.update.add_event")
def test_check_agent_memory_usage_raises_exit_exception(self, patch_add_event, patch_info, *_):
with patch("azurelinuxagent.common.cgroupconfigurator.CGroupConfigurator._Impl.check_agent_memory_usage", side_effect=CGroupsException()):
with patch('azurelinuxagent.common.conf.get_enable_agent_memory_usage_check', return_value=True):
with self.assertRaises(ExitException) as context_manager:
update_handler = get_update_handler()

update_handler._check_agent_memory_usage()
self.assertEqual(1, patch_add_event.call_count)
self.assertTrue(any("Check on agent memory usage" in call_args[0]
for call_args in patch_info.call_args),
"The memory check was not written to the agent's log")
self.assertIn("Agent {0} is reached memory limit -- exiting".format(CURRENT_AGENT),
ustr(context_manager.exception), "An incorrect exception was raised")

@patch("azurelinuxagent.common.logger.warn")
@patch("azurelinuxagent.ga.update.add_event")
def test_check_agent_memory_usage_fails(self, patch_add_event, patch_warn, *_):
with patch("azurelinuxagent.common.cgroupconfigurator.CGroupConfigurator._Impl.check_agent_memory_usage", side_effect=Exception()):
with patch('azurelinuxagent.common.conf.get_enable_agent_memory_usage_check', return_value=True):
update_handler = get_update_handler()

update_handler._check_agent_memory_usage()
self.assertTrue(any("Error checking the agent's memory usage" in call_args[0]
for call_args in patch_warn.call_args),
"The memory check was not written to the agent's log")
self.assertEqual(1, patch_add_event.call_count)
add_events = [kwargs for _, kwargs in patch_add_event.call_args_list if
kwargs["op"] == WALAEventOperation.CGroupsInfo]
self.assertTrue(
len(add_events) == 1,
"Exactly 1 event should have been emitted when memory usage check fails. Got: {0}".format(add_events))
self.assertIn(
"Error checking the agent's memory usage",
add_events[0]["message"],
"The error message is not correct when memory usage check failed")


class GoalStateIntervalTestCase(AgentTestCase):
def test_initial_goal_state_period_should_default_to_goal_state_period(self):
configuration_provider = conf.ConfigurationProvider()
Expand Down
2 changes: 2 additions & 0 deletions tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
DVD.MountPoint = /mnt/cdrom/secure
Debug.AgentCpuQuota = 50
Debug.AgentCpuThrottledTimeThreshold = 120
Debug.AgentMemoryQuota = 31457280
Debug.AutoUpdateHotfixFrequency = 14400
Debug.AutoUpdateNormalFrequency = 86400
Debug.CgroupCheckPeriod = 300
Expand All @@ -39,6 +40,7 @@
Debug.CgroupLogMetrics = False
Debug.CgroupMonitorExpiryTime = 2022-03-31
Debug.CgroupMonitorExtensionName = Microsoft.Azure.Monitor.AzureMonitorLinuxAgent
Debug.EnableAgentMemoryUsageCheck = False
Debug.EnableFastTrack = True
Debug.EnableGAVersioning = False
Debug.EtpCollectionPeriod = 300
Expand Down