diff --git a/kong-0.9.2-0.rockspec b/kong-0.9.2-0.rockspec index 8f8401eabe8..3a1a274547c 100644 --- a/kong-0.9.2-0.rockspec +++ b/kong-0.9.2-0.rockspec @@ -27,7 +27,8 @@ dependencies = { "lua-resty-iputils == 0.2.1", "luacrypto == 0.3.2", "luasyslog == 1.0.0", - "lua_pack == 1.0.4" + "lua_pack == 1.0.4", + "lua-resty-worker-events == 0.2.0" } build = { type = "builtin", diff --git a/kong/api/routes/cache.lua b/kong/api/routes/cache.lua index 13da89dcab6..131e8d4670a 100644 --- a/kong/api/routes/cache.lua +++ b/kong/api/routes/cache.lua @@ -30,4 +30,4 @@ return { return responses.send_HTTP_NOT_FOUND() end } -} +} \ No newline at end of file diff --git a/kong/api/routes/cluster.lua b/kong/api/routes/cluster.lua index 10489f6a1ee..e4ba7e41af6 100644 --- a/kong/api/routes/cluster.lua +++ b/kong/api/routes/cluster.lua @@ -1,5 +1,7 @@ local singletons = require "kong.singletons" local responses = require "kong.tools.responses" +local constants = require "kong.constants" +local ev = require "resty.worker.events" local pairs = pairs local table_insert = table.insert @@ -60,7 +62,9 @@ return { local message_t = self.params -- The type is always upper case - if message_t.type then + if not message_t or not message_t.type then + return responses.send_HTTP_BAD_REQUEST() + else message_t.type = string_upper(message_t.type) end @@ -88,9 +92,9 @@ return { end -- Trigger event in the node - singletons.events:publish(message_t.type, message_t) + ev.post(constants.CACHE.CLUSTER, message_t.type, message_t) return responses.send_HTTP_OK() end } -} +} \ No newline at end of file diff --git a/kong/constants.lua b/kong/constants.lua index 063fd01797f..f580f3d1c98 100644 --- a/kong/constants.lua +++ b/kong/constants.lua @@ -41,5 +41,8 @@ return { SYSLOG = { ADDRESS = "kong-hf.mashape.com", PORT = 61828 + }, + CACHE = { + CLUSTER = "cluster" } -} +} \ No newline at end of file diff --git a/kong/core/cluster.lua b/kong/core/cluster.lua index 51c73fbc61e..0b5f5cd49b2 100644 --- a/kong/core/cluster.lua +++ b/kong/core/cluster.lua @@ -55,11 +55,11 @@ local function async_autojoin(premature) end -- Create retries counter key if it doesn't exist - if not cache.get(cache.autojoin_retries_key()) then - cache.rawset(cache.autojoin_retries_key(), 0) + if not cache.sh_get(cache.autojoin_retries_key()) then + cache.sh_set(cache.autojoin_retries_key(), 0) end - local autojoin_retries = cache.incr(cache.autojoin_retries_key(), 1) -- Increment retries counter + local autojoin_retries = cache.sh_incr(cache.autojoin_retries_key(), 1) -- Increment retries counter if (autojoin_retries < ASYNC_AUTOJOIN_RETRIES) then create_timer(ASYNC_AUTOJOIN_INTERVAL, async_autojoin) end diff --git a/kong/core/reports.lua b/kong/core/reports.lua index 1f9cba47b83..08529c01eb0 100644 --- a/kong/core/reports.lua +++ b/kong/core/reports.lua @@ -121,11 +121,11 @@ ping_handler = function(premature) elseif elapsed == 0 then send { signal = "ping", - requests = cache.get(cache.requests_key()) or 0, + requests = cache.sh_get(cache.requests_key()) or 0, unique_id = unique_str, database = singletons.configuration.database } - cache.rawset(cache.requests_key(), 0) + cache.sh_set(cache.requests_key(), 0) end create_ping_timer() @@ -137,12 +137,12 @@ return { ----------------- init_worker = function() if not enabled then return end - cache.rawset(cache.requests_key(), 0) + cache.sh_set(cache.requests_key(), 0) create_ping_timer() end, log = function() if not enabled then return end - cache.incr(cache.requests_key(), 1) + cache.sh_incr(cache.requests_key(), 1) end, ----------------- -- custom methods @@ -153,4 +153,4 @@ return { get_system_infos = get_system_infos, send = send, api_signal = "api" -} +} \ No newline at end of file diff --git a/kong/kong.lua b/kong/kong.lua index 1b65431ee08..da1ba4297e8 100644 --- a/kong/kong.lua +++ b/kong/kong.lua @@ -30,6 +30,7 @@ local core = require "kong.core.handler" local Serf = require "kong.serf" local utils = require "kong.tools.utils" local Events = require "kong.core.events" +local constants = require "kong.constants" local singletons = require "kong.singletons" local DAOFactory = require "kong.dao.factory" local plugins_iterator = require "kong.core.plugins_iterator" @@ -147,6 +148,28 @@ function Kong.init_worker() for _, plugin in ipairs(singletons.loaded_plugins) do plugin.handler:init_worker() end + + local ev = require "resty.worker.events" + local handler = function(data, event, source, pid) + if source and source == constants.CACHE.CLUSTER then + singletons.events:publish(event, data) + end + end + + ev.register(handler) + + local ok, err = ev.configure { + shm = "process_events", -- defined by "lua_shared_dict" + timeout = 2, -- life time of event data in shm + interval = 1, -- poll interval (seconds) + + wait_interval = 0.010, -- wait before retry fetching event data + wait_max = 0.5, -- max wait time before discarding event + } + if not ok then + ngx.log(ngx.ERR, "failed to start event system: ", err) + return + end end function Kong.ssl_certificate() @@ -193,4 +216,4 @@ function Kong.log() core.log.after() end -return Kong +return Kong \ No newline at end of file diff --git a/kong/meta.lua b/kong/meta.lua index 6acc80e4705..5f1c5fb225b 100644 --- a/kong/meta.lua +++ b/kong/meta.lua @@ -19,7 +19,7 @@ return { -- to lua-version's `set()` in the form {from, to} _DEPENDENCIES = { nginx = {"1.9.15.1", "1.11.2.1"}, - serf = {"0.7.0", "0.7.0"}, + serf = {"0.7.0", "0.8.0"}, --resty = {}, -- not version dependent for now --dnsmasq = {} -- not version dependent for now } diff --git a/kong/plugins/rate-limiting/policies.lua b/kong/plugins/rate-limiting/policies.lua index 22b8f56530a..f25e8c110e8 100644 --- a/kong/plugins/rate-limiting/policies.lua +++ b/kong/plugins/rate-limiting/policies.lua @@ -26,11 +26,11 @@ return { local periods = timestamp.get_timestamps(current_timestamp) for period, period_date in pairs(periods) do local cache_key = get_local_key(api_id, identifier, period_date, period) - if not cache.rawget(cache_key) then - cache.rawset(cache_key, 0, EXPIRATIONS[period]) + if not cache.sh_get(cache_key) then + cache.sh_set(cache_key, 0, EXPIRATIONS[period]) end - local _, err = cache.incr(cache_key, value) + local _, err = cache.sh_incr(cache_key, value) if err then ngx_log("[rate-limiting] could not increment counter for period '"..period.."': "..tostring(err)) end @@ -39,7 +39,7 @@ return { usage = function(conf, api_id, identifier, current_timestamp, name) local periods = timestamp.get_timestamps(current_timestamp) local cache_key = get_local_key(api_id, identifier, periods[name], name) - local current_metric, err = cache.rawget(cache_key) + local current_metric, err = cache.sh_get(cache_key) if err then return nil, err end @@ -138,4 +138,4 @@ return { return current_metric and current_metric or 0 end } -} +} \ No newline at end of file diff --git a/kong/plugins/response-ratelimiting/policies.lua b/kong/plugins/response-ratelimiting/policies.lua index a726d86631f..3afc0ad4af7 100644 --- a/kong/plugins/response-ratelimiting/policies.lua +++ b/kong/plugins/response-ratelimiting/policies.lua @@ -26,11 +26,11 @@ return { local periods = timestamp.get_timestamps(current_timestamp) for period, period_date in pairs(periods) do local cache_key = get_local_key(api_id, identifier, period_date, name, period) - if not cache.rawget(cache_key) then - cache.rawset(cache_key, 0, EXPIRATIONS[period]) + if not cache.sh_get(cache_key) then + cache.sh_set(cache_key, 0, EXPIRATIONS[period]) end - local _, err = cache.incr(cache_key, value) + local _, err = cache.sh_incr(cache_key, value) if err then ngx_log("[rate-limiting] could not increment counter for period '"..period.."': "..tostring(err)) end @@ -39,7 +39,7 @@ return { usage = function(conf, api_id, identifier, current_timestamp, period, name) local periods = timestamp.get_timestamps(current_timestamp) local cache_key = get_local_key(api_id, identifier, periods[period], name, period) - local current_metric, err = cache.rawget(cache_key) + local current_metric, err = cache.sh_get(cache_key) if err then return nil, err end diff --git a/kong/templates/nginx_kong.lua b/kong/templates/nginx_kong.lua index cea3da82161..4c1eba6e2f1 100644 --- a/kong/templates/nginx_kong.lua +++ b/kong/templates/nginx_kong.lua @@ -39,6 +39,7 @@ lua_shared_dict reports_locks 100k; lua_shared_dict cluster_locks 100k; lua_shared_dict cluster_autojoin_locks 100k; lua_shared_dict cache_locks 100k; +lua_shared_dict process_events 1m; lua_shared_dict cassandra 1m; lua_shared_dict cassandra_prepared 5m; lua_socket_log_errors off; diff --git a/kong/tools/database_cache.lua b/kong/tools/database_cache.lua index 79ce573dad1..df1d8d6f15d 100644 --- a/kong/tools/database_cache.lua +++ b/kong/tools/database_cache.lua @@ -25,43 +25,111 @@ local CACHE_KEYS = { local _M = {} -function _M.rawset(key, value, exptime) +-- Shared Dictionary + +function _M.sh_set(key, value, exptime) return cache:set(key, value, exptime or 0) end -function _M.set(key, value) - if value then - value = cjson.encode(value) - end +function _M.sh_incr(key, value) + return cache:incr(key, value) +end + +function _M.sh_get(key, value) + return cache:get(key, value) +end - return _M.rawset(key, value) +function _M.sh_delete(key) + cache:delete(key) end -function _M.rawget(key) - return cache:get(key) +function _M.sh_delete_all() + cache:flush_all() -- This does not free up the memory, only marks the items as expired + cache:flush_expired() -- This does actually remove the elements from the memory +end + +-- Local Memory + +local DATA = {} + +function _M.set(key, value) + DATA[key] = value + + -- Save into Shared Dictionary + local _, err = _M.sh_set(key, cjson.encode(value)) + if err then return nil, err end + + return true end function _M.get(key) - local value, flags = _M.rawget(key) - if value then - value = cjson.decode(value) + local value = DATA[key] + + -- Get from Shared Dictionary + if value == nil then + value = _M.sh_get(key) + if value ~= nil then + value = cjson.decode(value) + end end - return value, flags -end -function _M.incr(key, value) - return cache:incr(key, value) + return value end function _M.delete(key) - cache:delete(key) + DATA[key] = nil + _M.sh_delete(key) end function _M.delete_all() - cache:flush_all() -- This does not free up the memory, only marks the items as expired - cache:flush_expired() -- This does actually remove the elements from the memory + DATA = {} + _M.sh_delete_all() end +function _M.get_or_set(key, cb) + -- Try to get the value from the cache + local value = _M.get(key) + if value then return value end + + local lock, err = resty_lock:new("cache_locks", { + exptime = 10, + timeout = 5 + }) + if not lock then + ngx_log(ngx.ERR, "could not create lock: ", err) + return + end + + -- The value is missing, acquire a lock + local elapsed, err = lock:lock(key) + if not elapsed then + ngx_log(ngx.ERR, "failed to acquire cache lock: ", err) + end + + -- Lock acquired. Since in the meantime another worker may have + -- populated the value we have to check again + value = _M.get(key) + if not value then + -- Get from closure + value = cb() + if value then + local ok, err = _M.set(key, value) + if not ok then + ngx_log(ngx.ERR, err) + end + end + end + + local ok, err = lock:unlock() + if not ok and err then + ngx_log(ngx.ERR, "failed to unlock: ", err) + end + + return value +end + +-- Utility Functions + function _M.requests_key() return CACHE_KEYS.REQUESTS end @@ -126,46 +194,4 @@ function _M.all_apis_by_dict_key() return CACHE_KEYS.ALL_APIS_BY_DIC end -function _M.get_or_set(key, cb) - -- Try to get the value from the cache - local value = _M.get(key) - if value then return value end - - local lock, err = resty_lock:new("cache_locks", { - exptime = 10, - timeout = 5 - }) - if not lock then - ngx_log(ngx.ERR, "could not create lock: ", err) - return - end - - -- The value is missing, acquire a lock - local elapsed, err = lock:lock(key) - if not elapsed then - ngx_log(ngx.ERR, "failed to acquire cache lock: ", err) - end - - -- Lock acquired. Since in the meantime another worker may have - -- populated the value we have to check again - value = _M.get(key) - if not value then - -- Get from closure - value = cb() - if value then - local ok, err = _M.set(key, value) - if not ok then - ngx_log(ngx.ERR, err) - end - end - end - - local ok, err = lock:unlock() - if not ok and err then - ngx_log(ngx.ERR, "failed to unlock: ", err) - end - - return value -end - -return _M +return _M \ No newline at end of file diff --git a/spec/02-integration/03-admin_api/06-cluster_routes_spec.lua b/spec/02-integration/03-admin_api/06-cluster_routes_spec.lua index de9bccbb34f..d211b1e84e2 100644 --- a/spec/02-integration/03-admin_api/06-cluster_routes_spec.lua +++ b/spec/02-integration/03-admin_api/06-cluster_routes_spec.lua @@ -5,12 +5,16 @@ describe("Admin API", function() local client setup(function() assert(helpers.start_kong()) - client = helpers.admin_client() end) teardown(function() - if client then client:close() end helpers.stop_kong() end) + before_each(function() + client = helpers.admin_client() + end) + after_each(function() + if client then client:close() end + end) describe("/cluster", function() describe("GET", function() @@ -116,16 +120,24 @@ describe("Admin API", function() describe("/cluster/events", function() describe("POST", function() - it("posts a new event", function() - -- old test simply converted + it("fails with an invalid event", function() local res = assert(client:send { method = "POST", path = "/cluster/events", body = {}, headers = {["Content-Type"] = "application/json"} }) + assert.res_status(400, res) + end) + it("posts a new event", function() + local res = assert(client:send { + method = "POST", + path = "/cluster/events", + body = { type = "hello" }, + headers = {["Content-Type"] = "application/json"} + }) assert.res_status(200, res) end) end) end) -end) +end) \ No newline at end of file diff --git a/spec/fixtures/kong/plugins/database-cache/handler.lua b/spec/fixtures/kong/plugins/database-cache/handler.lua index cb393d9cd18..b21fea1c901 100644 --- a/spec/fixtures/kong/plugins/database-cache/handler.lua +++ b/spec/fixtures/kong/plugins/database-cache/handler.lua @@ -15,21 +15,21 @@ end function DatabaseCacheHandler:init_worker() DatabaseCacheHandler.super.init_worker(self) - cache.rawset(INVOCATIONS, 0) - cache.rawset(LOOKUPS, 0) + cache.sh_set(INVOCATIONS, 0) + cache.sh_set(LOOKUPS, 0) end function DatabaseCacheHandler:access(conf) DatabaseCacheHandler.super.access(self) cache.get_or_set("pile_effect", function() - cache.incr(LOOKUPS, 1) + cache.sh_incr(LOOKUPS, 1) -- Adds some delay ngx.sleep(tonumber(ngx.req.get_uri_args().sleep)) return true end) - cache.incr(INVOCATIONS, 1) + cache.sh_incr(INVOCATIONS, 1) end -return DatabaseCacheHandler +return DatabaseCacheHandler \ No newline at end of file