diff --git a/src/googleclouddebugger/capture_collector.py b/src/googleclouddebugger/capture_collector.py index 096eca1..41ac174 100644 --- a/src/googleclouddebugger/capture_collector.py +++ b/src/googleclouddebugger/capture_collector.py @@ -23,6 +23,7 @@ import os import re import sys +import time import types import labels @@ -48,6 +49,9 @@ OBJECT_HAS_NO_FIELDS = 'Object has no fields' LOG_ACTION_NOT_SUPPORTED = 'Log action on a breakpoint not supported' INVALID_EXPRESSION_INDEX = '' +DYNAMIC_LOG_OUT_OF_QUOTA = ( + 'LOGPOINT: Logpoint is paused due to high log rate until log ' + 'quota is restored') def _ListTypeFormatString(value): @@ -594,6 +598,12 @@ def __init__(self, definition): # When capturing recursively, limit on the size of sublists. self.max_sublist_items = 5 + # Time to pause after dynamic log quota has run out. + self.quota_recovery_ms = 500 + + # The time when we first entered the quota period + self._quota_recovery_start_time = None + # Select log function. level = self._definition.get('logLevel') if not level or level == 'INFO': @@ -619,15 +629,28 @@ def Log(self, frame): return {'isError': True, 'description': {'format': LOG_ACTION_NOT_SUPPORTED}} + if self._quota_recovery_start_time: + ms_elapsed = (time.time() - self._quota_recovery_start_time) * 1000 + if ms_elapsed > self.quota_recovery_ms: + # We are out of the recovery period, clear the time and continue + self._quota_recovery_start_time = None + else: + # We are in the recovery period, exit + return + # Evaluate watched expressions. - message = _FormatMessage( + message = 'LOGPOINT: ' + _FormatMessage( self._definition.get('logMessageFormat', ''), self._EvaluateExpressions(frame)) cdbg_logging_location = (NormalizePath(frame.f_code.co_filename), frame.f_lineno, _GetFrameCodeObjectName(frame)) - self._log_message('LOGPOINT: ' + message) + if native.ApplyDynamicLogsQuota(len(message)): + self._log_message(message) + else: + self._quota_recovery_start_time = time.time() + self._log_message(DYNAMIC_LOG_OUT_OF_QUOTA) del cdbg_logging_location return None diff --git a/src/googleclouddebugger/native_module.cc b/src/googleclouddebugger/native_module.cc index 63995c7..37649cb 100644 --- a/src/googleclouddebugger/native_module.cc +++ b/src/googleclouddebugger/native_module.cc @@ -66,23 +66,6 @@ static const INTEGER_CONSTANT kIntegerConstants[] = { // Class to set zero overhead breakpoints. static BytecodeBreakpoint g_bytecode_breakpoint; -// Condition and dynamic logging rate limits are defined as the maximum -// amount of time in nanoseconds to spend on particular processing per -// second. These rate are enforced as following: -// 1. If a single breakpoint contributes to half the maximum rate, that -// breakpoint will be deactivated. -// 2. If all breakpoints combined hit the maximum rate, any breakpoint to -// exceed the limit gets disabled. -// -// The first rule ensures that in vast majority of scenarios expensive -// breakpoints will get deactivated. The second rule guarantees that in edge -// case scenarios the total amount of time spent in condition evaluation will -// not exceed the alotted limit. -// -// All limits ignore the number of CPUs since Python is inherently single -// threaded. -static std::unique_ptr g_global_condition_quota_; - // Initializes C++ flags and logging. // // This function should be called exactly once during debugger bootstrap. It @@ -376,6 +359,31 @@ static PyObject* CallImmutable(PyObject* self, PyObject* py_args) { return PyEval_EvalCode(code, frame->f_globals, frame->f_locals); } +// Applies the dynamic logs quota, which is limited by both total messages and +// total bytes. This should be called before doing the actual logging call. +// +// Args: +// num_bytes: number of bytes in the message to log. +// Returns: +// True if there is quota available, False otherwise. +static PyObject* ApplyDynamicLogsQuota(PyObject* self, PyObject* py_args) { + LazyInitializeRateLimit(); + int num_bytes = -1; + if (!PyArg_ParseTuple(py_args, "i", &num_bytes) || num_bytes < 1) { + Py_RETURN_FALSE; + } + + LeakyBucket* global_dynamic_log_limiter = GetGlobalDynamicLogQuota(); + LeakyBucket* global_dynamic_log_bytes_limiter = + GetGlobalDynamicLogBytesQuota(); + + if (global_dynamic_log_limiter->RequestTokens(1) && + global_dynamic_log_bytes_limiter->RequestTokens(num_bytes)) { + Py_RETURN_TRUE; + } else { + Py_RETURN_FALSE; + } +} static PyMethodDef g_module_functions[] = { { @@ -427,6 +435,12 @@ static PyMethodDef g_module_functions[] = { METH_VARARGS, "Invokes a Python callable object with immutability tracer." }, + { + "ApplyDynamicLogsQuota", + ApplyDynamicLogsQuota, + METH_VARARGS, + "Applies the dynamic log quota" + }, { nullptr, nullptr, 0, nullptr } // sentinel }; diff --git a/src/googleclouddebugger/python_breakpoint.py b/src/googleclouddebugger/python_breakpoint.py index e369efc..0d84adf 100644 --- a/src/googleclouddebugger/python_breakpoint.py +++ b/src/googleclouddebugger/python_breakpoint.py @@ -109,6 +109,9 @@ def __init__(self, definition, hub_client, breakpoints_manager): self._lock = Lock() self._completed = False + if self.definition.get('action') == 'LOG': + self._collector = capture_collector.LogCollector(self.definition) + if not self._TryActivateBreakpoint() and not self._completed: self._DeferBreakpoint() @@ -328,8 +331,7 @@ def _BreakpointEvent(self, event, frame): if event != native.BREAKPOINT_EVENT_HIT: error_status = _BREAKPOINT_EVENT_STATUS[event] elif self.definition.get('action') == 'LOG': - collector = capture_collector.LogCollector(self.definition) - error_status = collector.Log(frame) + error_status = self._collector.Log(frame) if not error_status: return # Log action successful, no need to clear the breakpoint. diff --git a/src/googleclouddebugger/rate_limit.cc b/src/googleclouddebugger/rate_limit.cc index 84fca7b..20d7bb1 100644 --- a/src/googleclouddebugger/rate_limit.cc +++ b/src/googleclouddebugger/rate_limit.cc @@ -24,6 +24,18 @@ DEFINE_int32( 5000, "maximum number of Python lines/sec to spend on condition evaluation"); +DEFINE_int32( + max_dynamic_log_rate, + 50, // maximum of 50 log entries per second on average + "maximum rate of dynamic log entries in this process; short bursts are " + "allowed to exceed this limit"); + +DEFINE_int32( + max_dynamic_log_bytes_rate, + 20480, // maximum of 20K bytes per second on average + "maximum rate of dynamic log bytes in this process; short bursts are " + "allowed to exceed this limit"); + namespace devtools { namespace cdbg { @@ -39,26 +51,39 @@ namespace cdbg { // burst, and will only disable the breakpoint if CPU consumption due to // debugger is continuous for a prolonged period of time. static const double kConditionCostCapacityFactor = 0.1; +static const double kDynamicLogCapacityFactor = 5; +static const double kDynamicLogBytesCapacityFactor = 2; static std::unique_ptr g_global_condition_quota; +static std::unique_ptr g_global_dynamic_log_quota; +static std::unique_ptr g_global_dynamic_log_bytes_quota; static int64 GetBaseConditionQuotaCapacity() { return FLAGS_max_condition_lines_rate * kConditionCostCapacityFactor; } - void LazyInitializeRateLimit() { if (g_global_condition_quota == nullptr) { g_global_condition_quota.reset(new LeakyBucket( GetBaseConditionQuotaCapacity(), FLAGS_max_condition_lines_rate)); + + g_global_dynamic_log_quota.reset(new LeakyBucket( + FLAGS_max_dynamic_log_rate * kDynamicLogCapacityFactor, + FLAGS_max_dynamic_log_rate)); + + g_global_dynamic_log_bytes_quota.reset(new LeakyBucket( + FLAGS_max_dynamic_log_bytes_rate * kDynamicLogBytesCapacityFactor, + FLAGS_max_dynamic_log_bytes_rate)); } } void CleanupRateLimit() { g_global_condition_quota = nullptr; + g_global_dynamic_log_quota = nullptr; + g_global_dynamic_log_bytes_quota = nullptr; } @@ -66,6 +91,13 @@ LeakyBucket* GetGlobalConditionQuota() { return g_global_condition_quota.get(); } +LeakyBucket* GetGlobalDynamicLogQuota() { + return g_global_dynamic_log_quota.get(); +} + +LeakyBucket* GetGlobalDynamicLogBytesQuota() { + return g_global_dynamic_log_bytes_quota.get(); +} std::unique_ptr CreatePerBreakpointConditionQuota() { return std::unique_ptr(new LeakyBucket( diff --git a/src/googleclouddebugger/rate_limit.h b/src/googleclouddebugger/rate_limit.h index 30038fa..c7db0c0 100644 --- a/src/googleclouddebugger/rate_limit.h +++ b/src/googleclouddebugger/rate_limit.h @@ -48,7 +48,8 @@ void CleanupRateLimit(); // single threaded. LeakyBucket* GetGlobalConditionQuota(); std::unique_ptr CreatePerBreakpointConditionQuota(); - +LeakyBucket* GetGlobalDynamicLogQuota(); +LeakyBucket* GetGlobalDynamicLogBytesQuota(); } // namespace cdbg } // namespace devtools