From e23706c2b6a96d0c0222bcaeb93622572888e01d Mon Sep 17 00:00:00 2001
From: Thibault Charbonnier
Date: Fri, 29 May 2015 16:14:37 +0200
Subject: [PATCH] feat(APIAnalytics) Implement timer to flush data

If an API has low traffic, the configured batch size may never be
reached, so the collected data would never be sent and API Analytics
would no longer be close to real-time. This timer flushes the pending
data after a configurable delay.
---
 kong/plugins/apianalytics/handler.lua        | 23 +++++++++++++++++++----
 kong/plugins/apianalytics/schema.lua         |  3 ++-
 kong/plugins/filelog/handler.lua             |  7 ++-----
 .../apianalytics/alf_serializer_spec.lua     | 12 ++++++------
 4 files changed, 29 insertions(+), 16 deletions(-)

diff --git a/kong/plugins/apianalytics/handler.lua b/kong/plugins/apianalytics/handler.lua
index 36f79f0e1cc..e59b90dc93e 100644
--- a/kong/plugins/apianalytics/handler.lua
+++ b/kong/plugins/apianalytics/handler.lua
@@ -9,12 +9,19 @@ local APIANALYTICS_SOCKET = {
 }
 
 local function send_batch(premature, conf, alf)
+  -- Abort the send if there are no entries: the delayed timer may fire
+  -- after the batch was already flushed by a request that hit the limit.
+  if table.getn(alf.har.log.entries) < 1 then
+    return
+  end
+
   local message = alf:to_json_string(conf.service_token)
+  local ok, err
 
   local client = http:new()
   client:set_timeout(50000) -- 50 sec
 
-  local ok, err = client:connect(APIANALYTICS_SOCKET.host, APIANALYTICS_SOCKET.port)
+  ok, err = client:connect(APIANALYTICS_SOCKET.host, APIANALYTICS_SOCKET.port)
   if not ok then
     ngx.log(ngx.ERR, "[apianalytics] failed to connect to the socket: "..err)
     return
@@ -27,7 +34,7 @@ local function send_batch(premature, conf, alf)
 
   -- close connection, or put it into the connection pool
   if res.headers["connection"] == "close" then
-    local ok, err = client:close()
+    ok, err = client:close()
     if not ok then
       ngx.log(ngx.ERR, "[apianalytics] failed to close: "..err)
     end
@@ -100,11 +107,19 @@ function APIAnalyticsHandler:log(conf)
   -- Simply adding the entry to the ALF
   local n_entries = ngx.shared.apianalytics[api_id]:add_entry(ngx)
 
-  -- Batch size reached, let's send the data
   if n_entries >= conf.batch_size then
+    -- Batch size reached, let's send the data
     local ok, err = ngx.timer.at(0, send_batch, conf, ngx.shared.apianalytics[api_id])
     if not ok then
-      ngx.log(ngx.ERR, "[apianalytics] failed to create timer: ", err)
+      ngx.log(ngx.ERR, "[apianalytics] failed to create batch sending timer: ", err)
+    end
+  else
+    -- Batch size not reached yet.
+    -- Schedule a delayed flush so the data is still sent even when traffic
+    -- is too low for the batch to reach batch_size and trigger the flush.
+    local ok, err = ngx.timer.at(conf.delay, send_batch, conf, ngx.shared.apianalytics[api_id])
+    if not ok then
+      ngx.log(ngx.ERR, "[apianalytics] failed to create delayed batch sending timer: ", err)
     end
   end
 end
diff --git a/kong/plugins/apianalytics/schema.lua b/kong/plugins/apianalytics/schema.lua
index c0d1b7c0bf2..444c12cc6d9 100644
--- a/kong/plugins/apianalytics/schema.lua
+++ b/kong/plugins/apianalytics/schema.lua
@@ -1,5 +1,6 @@
 return {
   service_token = { type = "string", required = true },
   batch_size = { type = "number", default = 100 },
-  log_body = { type = "boolean", default = false }
+  log_body = { type = "boolean", default = false },
+  delay = { type = "number", default = 10 }
 }
diff --git a/kong/plugins/filelog/handler.lua b/kong/plugins/filelog/handler.lua
index a1ab4dda6cc..4a950e5ef41 100644
--- a/kong/plugins/filelog/handler.lua
+++ b/kong/plugins/filelog/handler.lua
@@ -1,6 +1,5 @@
-local json = require "cjson"
+local log = require "kong.plugins.filelog.log"
 local BasePlugin = require "kong.plugins.base_plugin"
-local basic_serializer = require "kong.plugins.log_serializers.basic"
 
 local FileLogHandler = BasePlugin:extend()
 
@@ -10,9 +9,7 @@ end
 
 function FileLogHandler:log(conf)
   FileLogHandler.super.log(self)
-
-  local message = basic_serializer.serialize(ngx)
-  ngx.log(ngx.INFO, json.encode(message))
+  log.execute(conf)
 end
 
 return FileLogHandler
diff --git a/spec/plugins/apianalytics/alf_serializer_spec.lua b/spec/plugins/apianalytics/alf_serializer_spec.lua
index 035c994dcf6..3b7c48ab4bc 100644
--- a/spec/plugins/apianalytics/alf_serializer_spec.lua
+++ b/spec/plugins/apianalytics/alf_serializer_spec.lua
@@ -32,7 +32,7 @@ local function sameEntry(state, arguments)
   fixture_entry.time = nil
   fixture_entry.timings = nil
 
-  assert.same(fixture_entry, entry)
+  assert.are.same(fixture_entry, entry)
 
   return true
 end
@@ -69,7 +69,7 @@ describe("ALF serializer", function()
 
     it("should serialize an ngx GET request/response", function()
       local entry = alf:serialize_entry(fixtures.GET.NGX_STUB)
-      assert.are.same(fixtures.GET.ENTRY, entry)
+      assert.are.sameEntry(fixtures.GET.ENTRY, entry)
     end)
 
   end)
@@ -78,11 +78,11 @@ describe("ALF serializer", function()
 
     it("should add the entry to the serializer entries property", function()
       alf:add_entry(fixtures.GET.NGX_STUB)
-      assert.are.same(1, table.getn(alf.har.log.entries))
+      assert.equal(1, table.getn(alf.har.log.entries))
       assert.are.sameEntry(fixtures.GET.ENTRY, alf.har.log.entries[1])
 
       alf:add_entry(fixtures.GET.NGX_STUB)
-      assert.are.same(2, table.getn(alf.har.log.entries))
+      assert.equal(2, table.getn(alf.har.log.entries))
       assert.are.sameEntry(fixtures.GET.ENTRY, alf.har.log.entries[2])
     end)
 
@@ -102,7 +102,7 @@
 
     it("should return a JSON string", function()
       local json_str = alf:to_json_string("stub_service_token")
-      assert.are.same("string", type(json_str))
+      assert.equal("string", type(json_str))
     end)
 
   end)
@@ -111,7 +111,7 @@
 
     it("should remove any existing entry", function()
       alf:flush_entries()
-      assert.are.same(0, table.getn(alf.har.log.entries))
+      assert.equal(0, table.getn(alf.har.log.entries))
     end)
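
Reviewer's note: the flush logic this patch adds condenses to the standalone sketch below. It is illustrative only: `buffer`, `flush` and `on_log` are hypothetical stand-ins for the shared ALF serializer, `send_batch` and the plugin's log handler, while `ngx.timer.at`, `ngx.log` and the `conf.batch_size`/`conf.delay` fields are the actual APIs and settings the patch relies on.

-- Minimal sketch of the "flush on batch size OR after a delay" pattern.
-- Hypothetical simplification: a plain Lua table replaces the ALF serializer.
local buffer = {}

local function flush(premature, conf)
  -- A delayed timer can fire after a size-triggered flush already ran;
  -- an empty buffer means everything was sent, so bail out (this mirrors
  -- the `table.getn(alf.har.log.entries) < 1` guard in send_batch).
  if #buffer == 0 then
    return
  end
  -- ... send the buffered entries over the wire here ...
  buffer = {}
end

local function on_log(conf, entry)
  buffer[#buffer + 1] = entry

  local delay
  if #buffer >= conf.batch_size then
    delay = 0          -- batch is full: flush as soon as possible
  else
    delay = conf.delay -- batch not full: flush anyway after `delay` seconds
  end

  local ok, err = ngx.timer.at(delay, flush, conf)
  if not ok then
    ngx.log(ngx.ERR, "failed to create flush timer: ", err)
  end
end

One trade-off visible in both the sketch and the patch: every request below the batch size schedules a fresh delayed timer, so a low-traffic API accumulates timers that mostly wake up to find an empty buffer. The early-return guard keeps this correct, but the number of pending timers is bounded only by nginx's lua_max_pending_timers setting.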