From 82af65aaf3636c152fb5251625c6573a7159a0bb Mon Sep 17 00:00:00 2001 From: Thibault Charbonnier Date: Wed, 3 Jun 2015 19:18:50 +0200 Subject: [PATCH] analytics: rename plugin and bump serializer version --- Makefile | 2 +- kong-0.3.0-1.rockspec | 4 +- kong/plugins/analytics/handler.lua | 163 ++++++++++++++++++ .../{apianalytics => analytics}/schema.lua | 0 kong/plugins/apianalytics/handler.lua | 127 -------------- kong/plugins/log_serializers/alf.lua | 82 +++++---- spec/integration/proxy/resolver_spec.lua | 59 +------ .../alf_serializer_spec.lua | 11 +- .../fixtures/requests.lua | 2 +- 9 files changed, 224 insertions(+), 226 deletions(-) create mode 100644 kong/plugins/analytics/handler.lua rename kong/plugins/{apianalytics => analytics}/schema.lua (100%) delete mode 100644 kong/plugins/apianalytics/handler.lua rename spec/plugins/{apianalytics => analytics}/alf_serializer_spec.lua (91%) rename spec/plugins/{apianalytics => analytics}/fixtures/requests.lua (99%) diff --git a/Makefile b/Makefile index 6d8cb772d82..41f93550ac2 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ TESTING_CONF = kong_TEST.yml DEVELOPMENT_CONF = kong_DEVELOPMENT.yml -DEV_ROCKS=busted luacov luacov-coveralls luacheck +DEV_ROCKS=busted luacov luacov-coveralls luacheck inspect .PHONY: install dev clean start seed drop lint test coverage test-all diff --git a/kong-0.3.0-1.rockspec b/kong-0.3.0-1.rockspec index 0c58cdf3283..b0f88a53589 100644 --- a/kong-0.3.0-1.rockspec +++ b/kong-0.3.0-1.rockspec @@ -117,8 +117,8 @@ build = { ["kong.plugins.filelog.schema"] = "kong/plugins/filelog/schema.lua", ["kong.plugins.filelog.fd_util"] = "kong/plugins/filelog/fd_util.lua", - ["kong.plugins.apianalytics.handler"] = "kong/plugins/apianalytics/handler.lua", - ["kong.plugins.apianalytics.schema"] = "kong/plugins/apianalytics/schema.lua", + ["kong.plugins.analytics.handler"] = "kong/plugins/analytics/handler.lua", + ["kong.plugins.analytics.schema"] = "kong/plugins/analytics/schema.lua", 
["kong.plugins.ratelimiting.handler"] = "kong/plugins/ratelimiting/handler.lua", ["kong.plugins.ratelimiting.access"] = "kong/plugins/ratelimiting/access.lua", diff --git a/kong/plugins/analytics/handler.lua b/kong/plugins/analytics/handler.lua new file mode 100644 index 00000000000..4237867926d --- /dev/null +++ b/kong/plugins/analytics/handler.lua @@ -0,0 +1,163 @@ +-- Analytics plugin handler. +-- +-- How it works: +-- Keep track of calls made to configured APIs on a per-worker basis, using the ALF format +-- (alf_serializer.lua). `:access()` and `:body_filter()` are implemented to record some properties +-- required for the ALF entry. +-- +-- When the buffer is full (it reaches the `batch_size` configuration value), send the batch to the server. +-- If the server doesn't accept it, don't flush the data and it'll try again at the next call. +-- If the server accepted the batch, flush the buffer. +-- +-- In order to keep Analytics as real-time as possible, we also start a 'delayed timer' running in background. +-- If no requests are made during a certain period of time (the `delay` configuration value), the +-- delayed timer will fire and send the batch + flush the data, not waiting for the buffer to be full. + +local http = require "resty_http" +local BasePlugin = require "kong.plugins.base_plugin" +local ALFSerializer = require "kong.plugins.log_serializers.alf" + +local ALF_BUFFER = {} +local DELAYED_LOCK = false +local LATEST_CALL + +local ANALYTICS_SOCKET = { + host = "localhost", -- socket.analytics.mashape.com + port = 58000, + path = "/alf_1.0.0" +} + +local function send_batch(premature, conf, alf) + -- Abort the sending if the entries are empty, maybe it was triggered from the delayed + -- timer, but already sent because we reached the limit in a request later. 
+ if table.getn(alf.har.log.entries) < 1 then + return + end + + local message = alf:to_json_string(conf.service_token) + + local ok, err + local client = http:new() + client:set_timeout(50000) -- 50 sec + + ok, err = client:connect(ANALYTICS_SOCKET.host, ANALYTICS_SOCKET.port) + if not ok then + ngx.log(ngx.ERR, "[analytics] failed to connect to the socket: "..err) + return + end + + local res, err = client:request({ path = ANALYTICS_SOCKET.path, body = message }) + if not res then + ngx.log(ngx.ERR, "[analytics] failed to send batch: "..err) + end + + -- close connection, or put it into the connection pool + if res.headers["connection"] == "close" then + ok, err = client:close() + if not ok then + ngx.log(ngx.ERR, "[analytics] failed to close: "..err) + end + else + client:set_keepalive() + end + + if res.status == 200 then + alf:flush_entries() + ngx.log(ngx.DEBUG, "[analytics] successfully saved the batch") + else + ngx.log(ngx.ERR, "[analytics] socket refused the batch: "..res.body) + end +end + +-- A handler for delayed batch sending. When no calls have been made for X seconds +-- (X being conf.delay), we send the batch to keep analytics as close to real-time +-- as possible. +local delayed_send_handler +delayed_send_handler = function(premature, conf, alf) + -- If the latest call was received during the wait delay, abort the delayed send and + -- postpone it for X more seconds. 
+ if ngx.now() - LATEST_CALL < conf.delay then + local ok, err = ngx.timer.at(conf.delay, delayed_send_handler, conf, alf) + if not ok then + ngx.log(ngx.ERR, "[analytics] failed to create delayed batch sending timer: ", err) + end + else + DELAYED_LOCK = false -- re-enable creation of a delayed-timer + send_batch(premature, conf, alf) + end +end + +-- +-- +-- + +local AnalyticsHandler = BasePlugin:extend() + +function AnalyticsHandler:new() + AnalyticsHandler.super.new(self, "analytics") +end + +function AnalyticsHandler:access(conf) + AnalyticsHandler.super.access(self) + + -- Retrieve and keep in memory the bodies for this request + ngx.ctx.analytics = { + req_body = "", + res_body = "" + } + + if conf.log_body then + ngx.req.read_body() + ngx.ctx.analytics.req_body = ngx.req.get_body_data() + end +end + +function AnalyticsHandler:body_filter(conf) + AnalyticsHandler.super.body_filter(self) + + local chunk, eof = ngx.arg[1], ngx.arg[2] + -- concatenate response chunks for ALF's `response.content.text` + if conf.log_body then + ngx.ctx.analytics.res_body = ngx.ctx.analytics.res_body..chunk + end + + if eof then -- latest chunk + ngx.ctx.analytics.response_received = ngx.now() + end +end + +function AnalyticsHandler:log(conf) + AnalyticsHandler.super.log(self) + + local api_id = ngx.ctx.api.id + + -- Create the ALF if not existing for this API + if not ALF_BUFFER[api_id] then + ALF_BUFFER[api_id] = ALFSerializer:new_alf() + end + + -- Simply adding the entry to the ALF + local n_entries = ALF_BUFFER[api_id]:add_entry(ngx) + + -- Keep track of the latest call for the delayed timer + LATEST_CALL = ngx.now() + + if n_entries >= conf.batch_size then + -- Batch size reached, let's send the data + local ok, err = ngx.timer.at(0, send_batch, conf, ALF_BUFFER[api_id]) + if not ok then + ngx.log(ngx.ERR, "[analytics] failed to create batch sending timer: ", err) + end + elseif not DELAYED_LOCK then + DELAYED_LOCK = true -- Make sure only one delayed timer is ever pending + 
-- Batch size not yet reached. + -- Set a timer sending the data only in case nothing happens for awhile or if the batch_size is taking + -- too much time to reach the limit and trigger the flush. + local ok, err = ngx.timer.at(conf.delay, delayed_send_handler, conf, ALF_BUFFER[api_id]) + if not ok then + ngx.log(ngx.ERR, "[analytics] failed to create delayed batch sending timer: ", err) + end + end +end + +return AnalyticsHandler diff --git a/kong/plugins/apianalytics/schema.lua b/kong/plugins/analytics/schema.lua similarity index 100% rename from kong/plugins/apianalytics/schema.lua rename to kong/plugins/analytics/schema.lua diff --git a/kong/plugins/apianalytics/handler.lua b/kong/plugins/apianalytics/handler.lua deleted file mode 100644 index e59b90dc93e..00000000000 --- a/kong/plugins/apianalytics/handler.lua +++ /dev/null @@ -1,127 +0,0 @@ -local http = require "resty_http" -local BasePlugin = require "kong.plugins.base_plugin" -local ALFSerializer = require "kong.plugins.log_serializers.alf" - -local APIANALYTICS_SOCKET = { - host = "localhost", -- socket.apianalytics.mashape.com - port = 58000, - path = "/alf_1.0.0" -} - -local function send_batch(premature, conf, alf) - -- Abort the sending if the entries are empty, maybe it was triggered from the delayed - -- timer, but already sent because we reached the limit in a request later. 
- if table.getn(alf.har.log.entries) < 1 then - return - end - - local message = alf:to_json_string(conf.service_token) - - local ok, err - local client = http:new() - client:set_timeout(50000) -- 5 sec - - ok, err = client:connect(APIANALYTICS_SOCKET.host, APIANALYTICS_SOCKET.port) - if not ok then - ngx.log(ngx.ERR, "[apianalytics] failed to connect to the socket: "..err) - return - end - - local res, err = client:request({ path = APIANALYTICS_SOCKET.path, body = message }) - if not res then - ngx.log(ngx.ERR, "[apianalytics] failed to send batch: "..err) - end - - -- close connection, or put it into the connection pool - if res.headers["connection"] == "close" then - ok, err = client:close() - if not ok then - ngx.log(ngx.ERR, "[apianalytics] failed to close: "..err) - end - else - client:set_keepalive() - end - - if res.status == 200 then - alf:flush_entries() - ngx.log(ngx.DEBUG, "[apianalytics] successfully saved the batch") - else - ngx.log(ngx.ERR, "[apianalytics] socket refused the batch: "..res.body) - end -end - --- --- --- - -local APIAnalyticsHandler = BasePlugin:extend() - -function APIAnalyticsHandler:new() - APIAnalyticsHandler.super.new(self, "apianalytics") -end - -function APIAnalyticsHandler:access(conf) - APIAnalyticsHandler.super.access(self) - - -- Retrieve and keep in memory the bodies for this request - ngx.ctx.apianalytics = { - req_body = "", - res_body = "" - } - - if conf.log_body then - ngx.req.read_body() - ngx.ctx.apianalytics.req_body = ngx.req.get_body_data() - end -end - -function APIAnalyticsHandler:body_filter(conf) - APIAnalyticsHandler.super.body_filter(self) - - local chunk, eof = ngx.arg[1], ngx.arg[2] - -- concatenate response chunks for ALF's `response.content.text` - if conf.log_body then - ngx.ctx.apianalytics.res_body = ngx.ctx.apianalytics.res_body..chunk - end - - if eof then -- latest chunk - ngx.ctx.apianalytics.response_received = ngx.now() - end -end - -function APIAnalyticsHandler:log(conf) - 
APIAnalyticsHandler.super.log(self) - - local api_id = ngx.ctx.api.id - - -- Shared memory zone for apianalytics ALFs - if not ngx.shared.apianalytics then - ngx.shared.apianalytics = {} - end - - -- Create the ALF if not existing for this API - if not ngx.shared.apianalytics[api_id] then - ngx.shared.apianalytics[api_id] = ALFSerializer:new_alf() - end - - -- Simply adding the entry to the ALF - local n_entries = ngx.shared.apianalytics[api_id]:add_entry(ngx) - - if n_entries >= conf.batch_size then - -- Batch size reached, let's send the data - local ok, err = ngx.timer.at(0, send_batch, conf, ngx.shared.apianalytics[api_id]) - if not ok then - ngx.log(ngx.ERR, "[apianalytics] failed to create batch sending timer: ", err) - end - else - -- Batch size not yet reached - -- Set a timer sending the data only in case nothing happens for awhile or if the batch_size is taking - -- too much time to reach the limit and trigger the flush. - local ok, err = ngx.timer.at(conf.delay, send_batch, conf, ngx.shared.apianalytics[api_id]) - if not ok then - ngx.log(ngx.ERR, "[apianalytics] failed to create delayed batch sending timer: ", err) - end - end -end - -return APIAnalyticsHandler diff --git a/kong/plugins/log_serializers/alf.lua b/kong/plugins/log_serializers/alf.lua index 610a4180bcd..2fe194d11d3 100644 --- a/kong/plugins/log_serializers/alf.lua +++ b/kong/plugins/log_serializers/alf.lua @@ -1,5 +1,5 @@ -- ALF serializer module. --- ALF is the format supported by API Analytics (http://apianalytics.com) +-- ALF is the format supported by Mashape Analytics (http://apianalytics.com) -- -- This module represents _one_ ALF entry, which can have multiple requests entries. 
-- # Usage: @@ -32,8 +32,8 @@ function alf_mt:new_alf() log = { version = "1.2", creator = { - name = "kong-api-analytics-plugin", - version = "0.1" + name = "kong-mashape-analytics-plugin", + version = "1.0.0" }, entries = {} } @@ -47,8 +47,8 @@ end -- Since Lua won't recognize {} as an empty array but an empty object, we need to force it -- to be an array, hence we will do "[__empty_array_placeholder__]". -- Then once the ALF will be stringified, we will remove the placeholder so the only left element will be "[]". --- @param `hash` key/value dictionary to serialize. --- @param `fn` Some function to execute at each key iteration, with the key and value as parameters. +-- @param `hash` key/value dictionary to serialize. +-- @param `fn` Some function to execute at each key iteration, with the key and value as parameters. -- @return `array` an array, or nil local function dic_to_array(hash, fn) if not fn then fn = function() end end @@ -83,70 +83,84 @@ end -- ngx_http_core_module when possible. -- Public for unit testing. function alf_mt:serialize_entry(ngx) - -- Extracted data - local req_headers = ngx.req.get_headers() - local res_headers = ngx.resp.get_headers() + -- ALF properties computation. Properties prefixed with 'alf_' will belong to the ALF entry. + -- other properties are used to compute the ALF properties. 
- local apianalytics_data = ngx.ctx.apianalytics - local req_body = apianalytics_data.req_body - local res_body = apianalytics_data.res_body + -- bodies + local analytics_data = ngx.ctx.analytics - local started_at = ngx.ctx.started_at + local alf_req_body = analytics_data.req_body + local alf_res_body = analytics_data.res_body - -- ALF properties -- timers - local send_time = round(ngx.ctx.proxy_started_at - started_at) - local wait_time = ngx.ctx.proxy_ended_at - ngx.ctx.proxy_started_at - local receive_time = apianalytics_data.response_received - ngx.ctx.proxy_ended_at + local proxy_started_at, proxy_ended_at = ngx.ctx.proxy_started_at, ngx.ctx.proxy_ended_at + + local alf_started_at = ngx.ctx.started_at + local alf_send_time = proxy_started_at - alf_started_at + local alf_wait_time = proxy_ended_at ~= nil and proxy_ended_at - proxy_started_at or -1 + local alf_receive_time = analytics_data.response_received and analytics_data.response_received - proxy_ended_at or -1 + -- Compute the total time. If some properties were unavailable + -- (because the proxying was aborted), then don't add the value. 
+ local alf_time = 0 + for _, timer in ipairs({alf_send_time, alf_wait_time, alf_receive_time}) do + if timer > 0 then + alf_time = alf_time + timer + end + end + -- headers and headers size local req_headers_str, res_headers_str = "", "" - local req_headers_arr = dic_to_array(req_headers, function(k, v) req_headers_str = req_headers_str..k..v end) - local res_headers_arr = dic_to_array(res_headers, function(k, v) res_headers_str = res_headers_str..k..v end) - local req_headers_size = string.len(req_headers_str) - local res_headers_size = string.len(res_headers_str) - -- values extracted from headers + local req_headers = ngx.req.get_headers() + local res_headers = ngx.resp.get_headers() + + local alf_req_headers_arr = dic_to_array(req_headers, function(k, v) req_headers_str = req_headers_str..k..v end) + local alf_res_headers_arr = dic_to_array(res_headers, function(k, v) res_headers_str = res_headers_str..k..v end) + local alf_req_headers_size = string.len(req_headers_str) + local alf_res_headers_size = string.len(res_headers_str) + + -- mimeType, defaulting to "application/octet-stream" local alf_req_mimeType = req_headers["Content-Type"] and req_headers["Content-Type"] or "application/octet-stream" local alf_res_mimeType = res_headers["Content-Type"] and res_headers["Content-Type"] or "application/octet-stream" return { - startedDateTime = os.date("!%Y-%m-%dT%TZ", started_at), + startedDateTime = os.date("!%Y-%m-%dT%TZ", alf_started_at), clientIPAddress = ngx.var.remote_addr, - time = round(send_time + wait_time + receive_time), + time = round(alf_time), request = { method = ngx.req.get_method(), url = ngx.var.scheme.."://"..ngx.var.host..ngx.var.uri, httpVersion = "HTTP/"..ngx.req.http_version(), queryString = dic_to_array(ngx.req.get_uri_args()), - headers = req_headers_arr, - headersSize = req_headers_size, + headers = alf_req_headers_arr, + headersSize = alf_req_headers_size, cookies = {EMPTY_ARRAY_PLACEHOLDER}, - bodySize = string.len(req_body), + bodySize 
= string.len(alf_req_body), postData = { mimeType = alf_req_mimeType, params = dic_to_array(ngx.req.get_post_args()), - text = req_body and req_body or "" + text = alf_req_body and alf_req_body or "" } }, response = { status = ngx.status, statusText = "", -- can't find a way to retrieve that httpVersion = "", -- can't find a way to retrieve that either - headers = res_headers_arr, - headersSize = res_headers_size, + headers = alf_res_headers_arr, + headersSize = alf_res_headers_size, cookies = {EMPTY_ARRAY_PLACEHOLDER}, bodySize = tonumber(ngx.var.body_bytes_sent), redirectURL = "", content = { size = tonumber(ngx.var.body_bytes_sent), mimeType = alf_res_mimeType, - text = res_body and res_body or "" + text = alf_res_body and alf_res_body or "" } }, cache = {}, timings = { - send = round(send_time), - wait = round(wait_time), - receive = round(receive_time), + send = round(alf_send_time), + wait = round(alf_wait_time), + receive = round(alf_receive_time), blocked = -1, connect = -1, dns = -1, @@ -162,7 +176,7 @@ end function alf_mt:to_json_string(token) if not token then - error("API Analytics serviceToken required", 2) + error("Mashape Analytics serviceToken required", 2) end -- inject token diff --git a/spec/integration/proxy/resolver_spec.lua b/spec/integration/proxy/resolver_spec.lua index a8a54f438ed..ddbcc0d5320 100644 --- a/spec/integration/proxy/resolver_spec.lua +++ b/spec/integration/proxy/resolver_spec.lua @@ -116,7 +116,6 @@ describe("Resolver", function() assert.are.equal(200, status) end) -<<<<<<< HEAD end) describe("By Path", function() @@ -131,8 +130,8 @@ describe("Resolver", function() it("should proxy and strip the path if `strip_path` is true", function() local response, status = http_client.get(spec_helper.PROXY_URL.."/mockbin/request") - local body = cjson.decode(response) assert.are.equal(200, status) + local body = cjson.decode(response) assert.are.equal("http://mockbin.com/request", body.url) end) @@ -144,16 +143,6 @@ describe("Resolver", 
function() assert.are.equal(404, status) end) -======= - it("should proxy when the API is in Kong", function() - local _, status = http_client.get(STUB_GET_URL, nil, { host = "mockbin.com"}) - assert.are.equal(200, status) - end) - - it("should proxy when the Host header is not trimmed", function() - local _, status = http_client.get(STUB_GET_URL, nil, { host = " mockbin.com "}) - assert.are.equal(200, status) ->>>>>>> feat(APIAnalytics) config options + polishing end) it("should return the correct Server and Via headers when the request was proxied", function() @@ -170,51 +159,5 @@ describe("Resolver", function() assert.falsy(headers.via) end) -<<<<<<< HEAD -======= - it("should proxy when the API is in Kong and one Host header is being sent via plain TCP", function() - local parsed_url = url.parse(STUB_GET_URL) - local host = parsed_url.host - local port = parsed_url.port - - local tcp = socket.tcp() - tcp:connect(host, port) - tcp:send("GET "..parsed_url.path.." HTTP/1.0\r\nHost: mockbin.com\r\n\r\n"); - local response = "" - while true do - local s, status, partial = tcp:receive() - response = response..(s or partial) - if status == "closed" then break end - end - tcp:close() - - assert.truthy(stringy.startswith(response, "HTTP/1.1 200 OK")) - end) - - it("should proxy when the API is in Kong and multiple Host headers are being sent via plain TCP", function() - local parsed_url = url.parse(STUB_GET_URL) - local host = parsed_url.host - local port = parsed_url.port - - local tcp = socket.tcp() - tcp:connect(host, port) - tcp:send("GET "..parsed_url.path.." 
HTTP/1.0\r\nHost: fake.com\r\nHost: mockbin.com\r\n\r\n"); - local response = "" - while true do - local s, status, partial = tcp:receive() - response = response..(s or partial) - if status == "closed" then break end - end - tcp:close() - - assert.truthy(stringy.startswith(response, "HTTP/1.1 200 OK")) - end) - - it("should proxy when the request has no Host header but the X-Host-Override header", function() - local _, status = http_client.get(STUB_GET_URL, nil, { ["X-Host-Override"] = "mockbin.com"}) - assert.are.equal(200, status) - end) - ->>>>>>> feat(APIAnalytics) config options + polishing end) end) diff --git a/spec/plugins/apianalytics/alf_serializer_spec.lua b/spec/plugins/analytics/alf_serializer_spec.lua similarity index 91% rename from spec/plugins/apianalytics/alf_serializer_spec.lua rename to spec/plugins/analytics/alf_serializer_spec.lua index 3b7c48ab4bc..a907ea3a927 100644 --- a/spec/plugins/apianalytics/alf_serializer_spec.lua +++ b/spec/plugins/analytics/alf_serializer_spec.lua @@ -1,4 +1,4 @@ -local fixtures = require "spec.plugins.apianalytics.fixtures.requests" +local fixtures = require "spec.plugins.analytics.fixtures.requests" local ALFSerializer = require "kong.plugins.log_serializers.alf" -- @see http://lua-users.org/wiki/CopyTable @@ -32,7 +32,12 @@ local function sameEntry(state, arguments) fixture_entry.time = nil fixture_entry.timings = nil + local inspect = require "inspect" + print(inspect(fixture_entry)) + print(inspect(entry)) + assert.are.same(fixture_entry, entry) + return true end @@ -55,7 +60,7 @@ describe("ALF serializer", function() har = { log = { version = "1.2", - creator = { name = "kong-api-analytics-plugin", version = "0.1" + creator = { name = "kong-mashape-analytics-plugin", version = "1.0.0" }, entries = {} } @@ -97,7 +102,7 @@ describe("ALF serializer", function() it("should throw an error if no token was given", function() assert.has_error(function() alf:to_json_string() end, - "API Analytics serviceToken 
required") + "Mashape Analytics serviceToken required") end) it("should return a JSON string", function() diff --git a/spec/plugins/apianalytics/fixtures/requests.lua b/spec/plugins/analytics/fixtures/requests.lua similarity index 99% rename from spec/plugins/apianalytics/fixtures/requests.lua rename to spec/plugins/analytics/fixtures/requests.lua index df512c8ef93..b62d90a3192 100644 --- a/spec/plugins/apianalytics/fixtures/requests.lua +++ b/spec/plugins/analytics/fixtures/requests.lua @@ -26,7 +26,7 @@ return { started_at = 1432844571.623, proxy_started_at = 1432844571.719, proxy_ended_at = 1432844572.11, - apianalytics = { + analytics = { req_body = "hello=world&hello=earth", res_body = "{\"message\":\"response body\"}", response_received = 1432844572.11