Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] Analytics plugin #272

Merged
merged 5 commits into from
Jun 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ before_install:
- bash .travis/setup_cassandra.sh

install:
- sudo make install
- sudo make dev
- sudo sed -i.bak s@/usr/local/bin/luajit@/usr/bin/lua@g /usr/local/bin/busted

Expand Down
7 changes: 6 additions & 1 deletion kong-0.3.0-1.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ build = {
["kong.plugins.keyauth.schema"] = "kong/plugins/keyauth/schema.lua",
["kong.plugins.keyauth.api"] = "kong/plugins/keyauth/api.lua",

["kong.plugins.log_serializers.basic"] = "kong/plugins/log_serializers/basic.lua",
["kong.plugins.log_serializers.alf"] = "kong/plugins/log_serializers/alf.lua",

["kong.plugins.tcplog.handler"] = "kong/plugins/tcplog/handler.lua",
["kong.plugins.tcplog.log"] = "kong/plugins/tcplog/log.lua",
["kong.plugins.tcplog.schema"] = "kong/plugins/tcplog/schema.lua",
Expand All @@ -111,10 +114,12 @@ build = {
["kong.plugins.httplog.schema"] = "kong/plugins/httplog/schema.lua",

["kong.plugins.filelog.handler"] = "kong/plugins/filelog/handler.lua",
["kong.plugins.filelog.log"] = "kong/plugins/filelog/log.lua",
["kong.plugins.filelog.schema"] = "kong/plugins/filelog/schema.lua",
["kong.plugins.filelog.fd_util"] = "kong/plugins/filelog/fd_util.lua",

["kong.plugins.analytics.handler"] = "kong/plugins/analytics/handler.lua",
["kong.plugins.analytics.schema"] = "kong/plugins/analytics/schema.lua",

["kong.plugins.ratelimiting.handler"] = "kong/plugins/ratelimiting/handler.lua",
["kong.plugins.ratelimiting.access"] = "kong/plugins/ratelimiting/access.lua",
["kong.plugins.ratelimiting.schema"] = "kong/plugins/ratelimiting/schema.lua",
Expand Down
1 change: 1 addition & 0 deletions kong.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ plugins_available:
- request_transformer
- response_transformer
- requestsizelimiting
- analytics

## The Kong working directory
## (Make sure you have read and write permissions)
Expand Down
56 changes: 16 additions & 40 deletions kong/kong.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ local cache = require "kong.tools.database_cache"
local stringy = require "stringy"
local constants = require "kong.constants"
local responses = require "kong.tools.responses"
local timestamp = require "kong.tools.timestamp"

-- Define the plugins to load here, in the appropriate order
local plugins = {}
Expand Down Expand Up @@ -167,10 +166,10 @@ function _M.exec_plugins_certificate()
return
end

-- Calls plugins_access() on every loaded plugin
-- Calls plugins' access() on every loaded plugin
function _M.exec_plugins_access()
-- Setting a property that will be available for every plugin
ngx.ctx.started_at = timestamp.get_utc()
ngx.ctx.started_at = ngx.req.start_time()
ngx.ctx.plugin_conf = {}

-- Iterate over all the plugins
Expand All @@ -186,9 +185,9 @@ function _M.exec_plugins_access()
end
end

local conf = ngx.ctx.plugin_conf[plugin.name]
if not ngx.ctx.stop_phases and (plugin.resolver or conf) then
plugin.handler:access(conf and conf.value or nil)
local plugin_conf = ngx.ctx.plugin_conf[plugin.name]
if not ngx.ctx.stop_phases and (plugin.resolver or plugin_conf) then
plugin.handler:access(plugin_conf and plugin_conf.value or nil)
end
end

Expand All @@ -200,20 +199,20 @@ function _M.exec_plugins_access()
end
ngx.var.backend_url = final_url

ngx.ctx.proxy_started_at = timestamp.get_utc() -- Setting a property that will be available for every plugin
ngx.ctx.proxy_started_at = ngx.now() -- Setting a property that will be available for every plugin
end

-- Calls header_filter() on every loaded plugin
function _M.exec_plugins_header_filter()
ngx.ctx.proxy_ended_at = timestamp.get_utc() -- Setting a property that will be available for every plugin
ngx.ctx.proxy_ended_at = ngx.now() -- Setting a property that will be available for every plugin

if not ngx.ctx.stop_phases then
ngx.header["Via"] = constants.NAME.."/"..constants.VERSION

for _, plugin in ipairs(plugins) do
local conf = ngx.ctx.plugin_conf[plugin.name]
if conf then
plugin.handler:header_filter(conf.value)
local plugin_conf = ngx.ctx.plugin_conf[plugin.name]
if plugin_conf then
plugin.handler:header_filter(plugin_conf.value)
end
end
end
Expand All @@ -223,9 +222,9 @@ end
function _M.exec_plugins_body_filter()
if not ngx.ctx.stop_phases then
for _, plugin in ipairs(plugins) do
local conf = ngx.ctx.plugin_conf[plugin.name]
if conf then
plugin.handler:body_filter(conf.value)
local plugin_conf = ngx.ctx.plugin_conf[plugin.name]
if plugin_conf then
plugin.handler:body_filter(plugin_conf.value)
end
end
end
Expand All @@ -234,33 +233,10 @@ end
-- Calls log() on every loaded plugin
function _M.exec_plugins_log()
if not ngx.ctx.stop_phases then
-- Creating the log variable that will be serialized
local message = {
request = {
uri = ngx.var.request_uri,
request_uri = ngx.var.scheme.."://"..ngx.var.host..":"..ngx.var.server_port..ngx.var.request_uri,
querystring = ngx.req.get_uri_args(), -- parameters, as a table
method = ngx.req.get_method(), -- http method
headers = ngx.req.get_headers(),
size = ngx.var.request_length
},
response = {
status = ngx.status,
headers = ngx.resp.get_headers(),
size = ngx.var.bytes_sent
},
authenticated_entity = ngx.ctx.authenticated_entity,
api = ngx.ctx.api,
client_ip = ngx.var.remote_addr,
started_at = ngx.req.start_time() * 1000
}

ngx.ctx.log_message = message

for _, plugin in ipairs(plugins) do
local conf = ngx.ctx.plugin_conf[plugin.name]
if conf then
plugin.handler:log(conf.value)
local plugin_conf = ngx.ctx.plugin_conf[plugin.name]
if plugin_conf then
plugin.handler:log(plugin_conf.value)
end
end
end
Expand Down
163 changes: 163 additions & 0 deletions kong/plugins/analytics/handler.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
-- Analytics plugin handler.
--
-- How it works:
-- Keep track of calls made to configured APIs on a per-worker basis, using the ALF format
-- (alf_serializer.lua). `:access()` and `:body_filter()` are implemented to record some properties
-- required for the ALF entry.
--
-- When the buffer is full (it reaches the `batch_size` configuration value), send the batch to the server.
-- If the server doesn't accept it, don't flush the data and it'll try again at the next call.
-- If the server accepted the batch, flush the buffer.
--
-- In order to keep Analytics as real-time as possible, we also start a 'delayed timer' running in background.
-- If no requests are made during a certain period of time (the `delay` configuration value), the
-- delayed timer will fire and send the batch + flush the data, not waiting for the buffer to be full.

local http = require "resty_http"
local BasePlugin = require "kong.plugins.base_plugin"
local ALFSerializer = require "kong.plugins.log_serializers.alf"

local ALF_BUFFER = {}
local DELAYED_LOCK = false
local LATEST_CALL

local ANALYTICS_SOCKET = {
host = "localhost", -- socket.analytics.mashape.com
port = 58000,
path = "/alf_1.0.0"
}

local function send_batch(premature, conf, alf)
-- Abort the sending if the entries are empty, maybe it was triggered from the delayed
-- timer, but already sent because we reached the limit in a request later.
if table.getn(alf.har.log.entries) < 1 then
return
end

local message = alf:to_json_string(conf.service_token)

local ok, err
local client = http:new()
client:set_timeout(50000) -- 5 sec

ok, err = client:connect(ANALYTICS_SOCKET.host, ANALYTICS_SOCKET.port)
if not ok then
ngx.log(ngx.ERR, "[analytics] failed to connect to the socket: "..err)
return
end

local res, err = client:request({ path = ANALYTICS_SOCKET.path, body = message })
if not res then
ngx.log(ngx.ERR, "[analytics] failed to send batch: "..err)
end

-- close connection, or put it into the connection pool
if res.headers["connection"] == "close" then
ok, err = client:close()
if not ok then
ngx.log(ngx.ERR, "[analytics] failed to close: "..err)
end
else
client:set_keepalive()
end

if res.status == 200 then
alf:flush_entries()
ngx.log(ngx.DEBUG, "[analytics] successfully saved the batch")
else
ngx.log(ngx.ERR, "[analytics] socket refused the batch: "..res.body)
end
end

-- A handler for delayed batch sending. When no call have been made for X seconds
-- (X being conf.delay), we send the batch to keep analytics as close to real-time
-- as possible.
local delayed_send_handler
delayed_send_handler = function(premature, conf, alf)
-- If the latest call was received during the wait delay, abort the delayed send and
-- report it for X more seconds.
if ngx.now() - LATEST_CALL < conf.delay then
local ok, err = ngx.timer.at(conf.delay, delayed_send_handler, conf, alf)
if not ok then
ngx.log(ngx.ERR, "[analytics] failed to create delayed batch sending timer: ", err)
end
else
DELAYED_LOCK = false -- re-enable creation of a delayed-timer
send_batch(premature, conf, alf)
end
end

--
--
--

local AnalyticsHandler = BasePlugin:extend()

function AnalyticsHandler:new()
AnalyticsHandler.super.new(self, "analytics")
end

function AnalyticsHandler:access(conf)
AnalyticsHandler.super.access(self)

-- Retrieve and keep in memory the bodies for this request
ngx.ctx.analytics = {
req_body = "",
res_body = ""
}

if conf.log_body then
ngx.req.read_body()
ngx.ctx.analytics.req_body = ngx.req.get_body_data()
end
end

function AnalyticsHandler:body_filter(conf)
AnalyticsHandler.super.body_filter(self)

local chunk, eof = ngx.arg[1], ngx.arg[2]
-- concatenate response chunks for ALF's `response.content.text`
if conf.log_body then
ngx.ctx.analytics.res_body = ngx.ctx.analytics.res_body..chunk
end

if eof then -- latest chunk
ngx.ctx.analytics.response_received = ngx.now()
end
end

function AnalyticsHandler:log(conf)
AnalyticsHandler.super.log(self)

local api_id = ngx.ctx.api.id

-- Create the ALF if not existing for this API
if not ALF_BUFFER[api_id] then
ALF_BUFFER[api_id] = ALFSerializer:new_alf()
end

-- Simply adding the entry to the ALF
local n_entries = ALF_BUFFER[api_id]:add_entry(ngx)

-- Keep track of the latest call for the delayed timer
LATEST_CALL = ngx.now()

if n_entries >= conf.batch_size then
-- Batch size reached, let's send the data
local ok, err = ngx.timer.at(0, send_batch, conf, ALF_BUFFER[api_id])
if not ok then
ngx.log(ngx.ERR, "[analytics] failed to create batch sending timer: ", err)
end
elseif not DELAYED_LOCK then
DELAYED_LOCK = true -- Make sure only one delayed timer is ever pending
-- Batch size not yet reached.
-- Set a timer sending the data only in case nothing happens for awhile or if the batch_size is taking
-- too much time to reach the limit and trigger the flush.
local ok, err = ngx.timer.at(conf.delay, delayed_send_handler, conf, ALF_BUFFER[api_id])
if not ok then
ngx.log(ngx.ERR, "[analytics] failed to create delayed batch sending timer: ", err)
end
end
end

return AnalyticsHandler
6 changes: 6 additions & 0 deletions kong/plugins/analytics/schema.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
return {
service_token = { type = "string", required = true },
batch_size = { type = "number", default = 100 },
log_body = { type = "boolean", default = false },
delay = { type = "number", default = 10 }
}
4 changes: 1 addition & 3 deletions kong/plugins/filelog/handler.lua
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
-- Copyright (C) Mashape, Inc.

local BasePlugin = require "kong.plugins.base_plugin"
local log = require "kong.plugins.filelog.log"
local BasePlugin = require "kong.plugins.base_plugin"

local FileLogHandler = BasePlugin:extend()

Expand Down
10 changes: 7 additions & 3 deletions kong/plugins/filelog/log.lua
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
-- Copyright (C) Mashape, Inc.
local cjson = require "cjson"

local ffi = require "ffi"
local cjson = require "cjson"
local fd_util = require "kong.plugins.filelog.fd_util"
local basic_serializer = require "kong.plugins.log_serializers.basic"

ffi.cdef[[
typedef struct {
Expand All @@ -24,7 +26,7 @@ int fprintf(FILE *stream, const char *format, ...);
-- @param `conf` Configuration table, holds http endpoint details
-- @param `message` Message to be logged
local function log(premature, conf, message)
local message = cjson.encode(message).."\n"
message = cjson.encode(message).."\n"

local f = fd_util.get_fd()
if not f then
Expand All @@ -39,7 +41,9 @@ end
local _M = {}

function _M.execute(conf)
local ok, err = ngx.timer.at(0, log, conf, ngx.ctx.log_message)
local message = basic_serializer.serialize(ngx)

local ok, err = ngx.timer.at(0, log, conf, message)
if not ok then
ngx.log(ngx.ERR, "[filelog] failed to create timer: ", err)
end
Expand Down
5 changes: 4 additions & 1 deletion kong/plugins/httplog/handler.lua
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
local basic_serializer = require "kong.plugins.log_serializers.basic"
local BasePlugin = require "kong.plugins.base_plugin"
local log = require "kong.plugins.httplog.log"

Expand All @@ -9,7 +10,9 @@ end

function HttpLogHandler:log(conf)
HttpLogHandler.super.log(self)
log.execute(conf)

local message = basic_serializer.serialize(ngx)
log.execute(conf, message)
end

return HttpLogHandler
Loading