forked from kubernetes/ingress-nginx
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request kubernetes#53 from Shopify/adding-monitoring-modules
Emit response StatsD metrics about upstreams when dynamic configuration is enabled
- Loading branch information
Showing
9 changed files
with
407 additions
and
36 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
local statsd = require('statsd') | ||
local defer = require("util.defer") | ||
local split = require("util.split") | ||
|
||
local _M = {} | ||
|
||
local function send_response_data(upstream_state, client_state) | ||
local status_class | ||
if upstream_state.status then | ||
for i, status in ipairs(upstream_state.status) do | ||
-- TODO: link this with the zones when we use openresty-upstream | ||
if status == '-' then | ||
status = 'ngx_error' | ||
status_class = 'ngx_error' | ||
else | ||
status_class = string.sub(status, 0, 1) .. "xx" | ||
end | ||
|
||
ngx.log(ngx.INFO, upstream_state.addr[i] ) | ||
statsd.increment('ingress.nginx.upstream.response', 1, { | ||
status=status, | ||
status_class=status_class, | ||
upstream_name=client_state.upstream_name | ||
}) | ||
|
||
statsd.histogram('ingress.nginx.upstream.response_time', | ||
upstream_state.response_time[i], { | ||
upstream_name=client_state.upstream_name | ||
}) | ||
end | ||
end | ||
|
||
status_class = string.sub(client_state.status, 0, 1) .. "xx" | ||
statsd.increment('ingress.nginx.client.response', 1, { | ||
status=client_state.status, | ||
status_class=status_class, | ||
upstream_name=client_state.upstream_name | ||
}) | ||
|
||
statsd.histogram('ingress.nginx.client.request_time', client_state.request_time, { | ||
upstream_name=client_state.upstream_name | ||
}) | ||
end | ||
|
||
function _M.call() | ||
local status, status_err = split.split_upstream_var(ngx.var.upstream_status) | ||
if status_err then | ||
return nil, status_err | ||
end | ||
|
||
local addrs, addrs_err = split.split_upstream_addr(ngx.var.upstream_addr) | ||
if addrs_err then | ||
return nil, addrs_err | ||
end | ||
|
||
local response_time, rt_err = split.split_upstream_var(ngx.var.upstream_response_time) | ||
if rt_err then | ||
return nil, rt_err | ||
end | ||
|
||
local ok, err = defer.to_timer_phase(send_response_data, { | ||
status=status, | ||
addr=addrs, | ||
response_time=response_time | ||
}, { | ||
status=ngx.var.status, | ||
request_time=ngx.var.request_time, | ||
upstream_name=ngx.var.proxy_upstream_name | ||
}) | ||
|
||
if not ok then | ||
local msg = "failed to send response data: " .. tostring(err) | ||
ngx.log(ngx.ERR, msg) | ||
return nil, msg | ||
end | ||
end | ||
|
||
return _M |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
local pairs = pairs | ||
local string_format = string.format | ||
local string_len = string.len | ||
local udp = ngx.socket.udp | ||
|
||
local util = require("util") | ||
local defer = require("util.defer") | ||
|
||
local util_tablelength = util.tablelength | ||
|
||
local _M = {} | ||
local default_tag_string = "|#" | ||
|
||
local METRIC_COUNTER = "c" | ||
local METRIC_GAUGE = "g" | ||
local METRIC_HISTOGRAM = "h" | ||
local METRIC_SET = "s" | ||
local MICROSECONDS = 1000000 | ||
|
||
local function create_udp_socket(host, port) | ||
local sock, sock_err = udp() | ||
if not sock then | ||
return nil, sock_err | ||
end | ||
|
||
local ok, peer_err = sock:setpeername(host, port) | ||
if not ok then | ||
return nil, peer_err | ||
end | ||
|
||
return sock, nil | ||
end | ||
|
||
local function get_udp_socket(host, port) | ||
local id = string_format("%s:%d", host, port) | ||
local ctx = ngx.ctx.statsd_sockets | ||
local err | ||
|
||
if not ctx then | ||
ctx = {} | ||
ngx.ctx.statsd_sockets = ctx | ||
end | ||
|
||
local sock = ctx[id] | ||
if not sock then | ||
sock, err = create_udp_socket(host, port) | ||
if not sock then | ||
return nil, err | ||
end | ||
|
||
ctx[id] = sock | ||
end | ||
|
||
return sock, nil | ||
end | ||
|
||
local function generate_tag_string(tags) | ||
if not tags or util_tablelength(tags) == 0 then | ||
return "" | ||
end | ||
|
||
local tag_str = default_tag_string | ||
for k,v in pairs(tags) do | ||
if string_len(tag_str) > 2 then | ||
tag_str = tag_str .. "," | ||
end | ||
tag_str = tag_str .. k .. ":" .. v | ||
end | ||
|
||
return tag_str | ||
end | ||
|
||
local function generate_packet(metric, key, value, tags, sampling_rate) | ||
if sampling_rate == 1 then | ||
sampling_rate = "" | ||
else | ||
sampling_rate = string_format("|@%g", sampling_rate) | ||
end | ||
|
||
return string_format("%s:%s|%s%s%s", key, tostring(value), metric, sampling_rate, generate_tag_string(tags)) | ||
end | ||
|
||
local function metric(metric_type, key, value, tags, sample_rate) | ||
if not value then | ||
return nil, "no value passed" | ||
end | ||
if value == '-' then | ||
return nil, nil -- don't pass an error to avoid logging to error log | ||
end | ||
|
||
if not _M.config then | ||
return true, nil | ||
end | ||
|
||
local sampling_rate = sample_rate or _M.config.sampling_rate | ||
|
||
if sampling_rate ~= 1 and math.random() > sampling_rate then | ||
return nil, nil -- don't pass an error to avoid logging to error log | ||
end | ||
|
||
local packet = generate_packet(metric_type, key, value, tags, sampling_rate) | ||
|
||
local sock, err = get_udp_socket(_M.config.host, _M.config.port) | ||
if not sock then | ||
return nil, err | ||
end | ||
|
||
return sock:send(packet) | ||
end | ||
|
||
local function send_metrics(...) | ||
local ok, err = metric(...) | ||
if not ok and err then | ||
ngx.log(ngx.WARN, "failed logging to statsd: " .. tostring(err)) | ||
end | ||
return ok, err | ||
end | ||
|
||
-- to avoid logging everywhere in #metric | ||
local function log_metric(...) | ||
local ok, err = defer.to_timer_phase(send_metrics, ...) | ||
if not ok then | ||
local msg = "failed to log metric: " .. tostring(err) | ||
ngx.log(ngx.ERR, msg) | ||
return nil, msg | ||
end | ||
return true | ||
end | ||
|
||
-- Statsd module level convenince functions | ||
|
||
function _M.increment(key, value, tags, ...) | ||
return log_metric(METRIC_COUNTER, key, value or 1, tags, ...) | ||
end | ||
|
||
function _M.gauge(key, value, tags, ...) | ||
return log_metric(METRIC_GAUGE, key, value, tags, ...) | ||
end | ||
|
||
function _M.histogram(key, value, tags, ...) | ||
return log_metric(METRIC_HISTOGRAM, key, value, tags, ...) | ||
end | ||
|
||
function _M.set(key, value, tags, ...) | ||
return log_metric(METRIC_SET, key, value, tags, ...) | ||
end | ||
|
||
function _M.time(f) | ||
local start_time = ngx.now() | ||
local ret = { f() } | ||
return ret, (ngx.now() - start_time) * MICROSECONDS | ||
end | ||
|
||
function _M.measure(key, f, tags) | ||
local ret, time = _M.time(f) | ||
_M.histogram(key, time, tags or {}) | ||
return unpack(ret) | ||
end | ||
|
||
_M.config = { | ||
host = os.getenv("STATSD_HOST"), | ||
port = os.getenv("STATSD_PORT"), | ||
sampling_rate = 1.0, | ||
tags = { | ||
pod_id = os.getenv("POD_NAME"), | ||
namespace = os.getenv("POD_NAMESPACE") | ||
} | ||
} | ||
|
||
if not _M.config.host or not _M.config.port then | ||
error("STATSD_HOST and STATSD_PORT env variables must be set") | ||
end | ||
|
||
if _M.config.tags then | ||
default_tag_string = generate_tag_string(_M.config.tags) | ||
end | ||
|
||
return _M |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.