Skip to content
This repository has been archived by the owner on Jan 23, 2024. It is now read-only.

Commit

Permalink
Add global total messages and total bytes rate limiting for python lo…
Browse files Browse the repository at this point in the history
…gpoints

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=152401375
  • Loading branch information
xinghuadou-google authored and Xinghua Dou committed Apr 14, 2017
1 parent 51cd9e0 commit ee1b2cd
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 23 deletions.
27 changes: 25 additions & 2 deletions src/googleclouddebugger/capture_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import os
import re
import sys
import time
import types

import labels
Expand All @@ -48,6 +49,9 @@
OBJECT_HAS_NO_FIELDS = 'Object has no fields'
LOG_ACTION_NOT_SUPPORTED = 'Log action on a breakpoint not supported'
INVALID_EXPRESSION_INDEX = '<N/A>'
DYNAMIC_LOG_OUT_OF_QUOTA = (
'LOGPOINT: Logpoint is paused due to high log rate until log '
'quota is restored')


def _ListTypeFormatString(value):
Expand Down Expand Up @@ -594,6 +598,12 @@ def __init__(self, definition):
# When capturing recursively, limit on the size of sublists.
self.max_sublist_items = 5

# Time to pause after dynamic log quota has run out.
self.quota_recovery_ms = 500

# The time when we first entered the quota period
self._quota_recovery_start_time = None

# Select log function.
level = self._definition.get('logLevel')
if not level or level == 'INFO':
Expand All @@ -619,15 +629,28 @@ def Log(self, frame):
return {'isError': True,
'description': {'format': LOG_ACTION_NOT_SUPPORTED}}

if self._quota_recovery_start_time:
ms_elapsed = (time.time() - self._quota_recovery_start_time) * 1000
if ms_elapsed > self.quota_recovery_ms:
# We are out of the recovery period, clear the time and continue
self._quota_recovery_start_time = None
else:
# We are in the recovery period, exit
return

# Evaluate watched expressions.
message = _FormatMessage(
message = 'LOGPOINT: ' + _FormatMessage(
self._definition.get('logMessageFormat', ''),
self._EvaluateExpressions(frame))

cdbg_logging_location = (NormalizePath(frame.f_code.co_filename),
frame.f_lineno, _GetFrameCodeObjectName(frame))

self._log_message('LOGPOINT: ' + message)
if native.ApplyDynamicLogsQuota(len(message)):
self._log_message(message)
else:
self._quota_recovery_start_time = time.time()
self._log_message(DYNAMIC_LOG_OUT_OF_QUOTA)
del cdbg_logging_location
return None

Expand Down
48 changes: 31 additions & 17 deletions src/googleclouddebugger/native_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,6 @@ static const INTEGER_CONSTANT kIntegerConstants[] = {
// Class to set zero overhead breakpoints.
static BytecodeBreakpoint g_bytecode_breakpoint;

// Condition and dynamic logging rate limits are defined as the maximum
// amount of time in nanoseconds to spend on particular processing per
// second. These rate are enforced as following:
// 1. If a single breakpoint contributes to half the maximum rate, that
// breakpoint will be deactivated.
// 2. If all breakpoints combined hit the maximum rate, any breakpoint to
// exceed the limit gets disabled.
//
// The first rule ensures that in vast majority of scenarios expensive
// breakpoints will get deactivated. The second rule guarantees that in edge
// case scenarios the total amount of time spent in condition evaluation will
// not exceed the alotted limit.
//
// All limits ignore the number of CPUs since Python is inherently single
// threaded.
static std::unique_ptr<LeakyBucket> g_global_condition_quota_;

// Initializes C++ flags and logging.
//
// This function should be called exactly once during debugger bootstrap. It
Expand Down Expand Up @@ -376,6 +359,31 @@ static PyObject* CallImmutable(PyObject* self, PyObject* py_args) {
return PyEval_EvalCode(code, frame->f_globals, frame->f_locals);
}

// Applies the dynamic logs quota, which is limited by both total messages and
// total bytes. This should be called before doing the actual logging call.
//
// Args:
// num_bytes: number of bytes in the message to log.
// Returns:
// True if there is quota available, False otherwise.
static PyObject* ApplyDynamicLogsQuota(PyObject* self, PyObject* py_args) {
LazyInitializeRateLimit();
int num_bytes = -1;
if (!PyArg_ParseTuple(py_args, "i", &num_bytes) || num_bytes < 1) {
Py_RETURN_FALSE;
}

LeakyBucket* global_dynamic_log_limiter = GetGlobalDynamicLogQuota();
LeakyBucket* global_dynamic_log_bytes_limiter =
GetGlobalDynamicLogBytesQuota();

if (global_dynamic_log_limiter->RequestTokens(1) &&
global_dynamic_log_bytes_limiter->RequestTokens(num_bytes)) {
Py_RETURN_TRUE;
} else {
Py_RETURN_FALSE;
}
}

static PyMethodDef g_module_functions[] = {
{
Expand Down Expand Up @@ -427,6 +435,12 @@ static PyMethodDef g_module_functions[] = {
METH_VARARGS,
"Invokes a Python callable object with immutability tracer."
},
{
"ApplyDynamicLogsQuota",
ApplyDynamicLogsQuota,
METH_VARARGS,
"Applies the dynamic log quota"
},
{ nullptr, nullptr, 0, nullptr } // sentinel
};

Expand Down
6 changes: 4 additions & 2 deletions src/googleclouddebugger/python_breakpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ def __init__(self, definition, hub_client, breakpoints_manager):
self._lock = Lock()
self._completed = False

if self.definition.get('action') == 'LOG':
self._collector = capture_collector.LogCollector(self.definition)

if not self._TryActivateBreakpoint() and not self._completed:
self._DeferBreakpoint()

Expand Down Expand Up @@ -328,8 +331,7 @@ def _BreakpointEvent(self, event, frame):
if event != native.BREAKPOINT_EVENT_HIT:
error_status = _BREAKPOINT_EVENT_STATUS[event]
elif self.definition.get('action') == 'LOG':
collector = capture_collector.LogCollector(self.definition)
error_status = collector.Log(frame)
error_status = self._collector.Log(frame)
if not error_status:
return # Log action successful, no need to clear the breakpoint.

Expand Down
34 changes: 33 additions & 1 deletion src/googleclouddebugger/rate_limit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ DEFINE_int32(
5000,
"maximum number of Python lines/sec to spend on condition evaluation");

DEFINE_int32(
max_dynamic_log_rate,
50, // maximum of 50 log entries per second on average
"maximum rate of dynamic log entries in this process; short bursts are "
"allowed to exceed this limit");

DEFINE_int32(
max_dynamic_log_bytes_rate,
20480, // maximum of 20K bytes per second on average
"maximum rate of dynamic log bytes in this process; short bursts are "
"allowed to exceed this limit");

namespace devtools {
namespace cdbg {

Expand All @@ -39,33 +51,53 @@ namespace cdbg {
// burst, and will only disable the breakpoint if CPU consumption due to
// debugger is continuous for a prolonged period of time.
static const double kConditionCostCapacityFactor = 0.1;
static const double kDynamicLogCapacityFactor = 5;
static const double kDynamicLogBytesCapacityFactor = 2;

static std::unique_ptr<LeakyBucket> g_global_condition_quota;
static std::unique_ptr<LeakyBucket> g_global_dynamic_log_quota;
static std::unique_ptr<LeakyBucket> g_global_dynamic_log_bytes_quota;


static int64 GetBaseConditionQuotaCapacity() {
return FLAGS_max_condition_lines_rate * kConditionCostCapacityFactor;
}


void LazyInitializeRateLimit() {
if (g_global_condition_quota == nullptr) {
g_global_condition_quota.reset(new LeakyBucket(
GetBaseConditionQuotaCapacity(),
FLAGS_max_condition_lines_rate));

g_global_dynamic_log_quota.reset(new LeakyBucket(
FLAGS_max_dynamic_log_rate * kDynamicLogCapacityFactor,
FLAGS_max_dynamic_log_rate));

g_global_dynamic_log_bytes_quota.reset(new LeakyBucket(
FLAGS_max_dynamic_log_bytes_rate * kDynamicLogBytesCapacityFactor,
FLAGS_max_dynamic_log_bytes_rate));
}
}


void CleanupRateLimit() {
g_global_condition_quota = nullptr;
g_global_dynamic_log_quota = nullptr;
g_global_dynamic_log_bytes_quota = nullptr;
}


LeakyBucket* GetGlobalConditionQuota() {
return g_global_condition_quota.get();
}

LeakyBucket* GetGlobalDynamicLogQuota() {
return g_global_dynamic_log_quota.get();
}

LeakyBucket* GetGlobalDynamicLogBytesQuota() {
return g_global_dynamic_log_bytes_quota.get();
}

std::unique_ptr<LeakyBucket> CreatePerBreakpointConditionQuota() {
return std::unique_ptr<LeakyBucket>(new LeakyBucket(
Expand Down
3 changes: 2 additions & 1 deletion src/googleclouddebugger/rate_limit.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ void CleanupRateLimit();
// single threaded.
LeakyBucket* GetGlobalConditionQuota();
std::unique_ptr<LeakyBucket> CreatePerBreakpointConditionQuota();

LeakyBucket* GetGlobalDynamicLogQuota();
LeakyBucket* GetGlobalDynamicLogBytesQuota();
} // namespace cdbg
} // namespace devtools

Expand Down

0 comments on commit ee1b2cd

Please sign in to comment.