Commit

updates based on review comments
Tieske committed Dec 14, 2016
1 parent cc8d6f4 commit 27b87d2
Showing 12 changed files with 203 additions and 1,203 deletions.
19 changes: 14 additions & 5 deletions kong/api/routes/upstreams.lua
@@ -44,25 +44,30 @@ return {
end,

POST = function(self, dao_factory, helpers)
local cleanup_factor = 10 -- when to cleanup; invalid-entries > (valid-ones * cleanup_factor)
-- when to cleanup: invalid-entries > (valid-ones * cleanup_factor)
local cleanup_factor = 10

--cleaning up history, check if it's necessary...
local target_history = dao_factory.targets:find_all(
{ upstream_id = self.params.upstream_id })

if target_history then --ignoring errors here, will be caught when posting below
-- sort the targets
for _,target in ipairs(target_history) do
target.order = target.created_at..":"..target.id
end

-- sort table in reverse order
table.sort(target_history, function(a,b) return a.order>b.order end)
-- do clean up
local cleaned = {}
local delete = {}

for _, entry in ipairs(target_history) do
if cleaned[entry.target] then
-- we got a newer entry for this target than this, so this one can go
delete[#delete+1] = entry

else
-- haven't got this one, so this is the last one for this target
cleaned[entry.target] = true
@@ -77,7 +82,9 @@ return {
-- either nothing left, or when 10x more outdated than active entries
if (#cleaned == 0 and #delete > 0) or
(#delete >= (math.max(#cleaned,1)*cleanup_factor)) then
ngx.log(ngx.WARN, "Starting cleanup of target table for upstream "..tostring(self.params.upstream_id))

ngx.log(ngx.INFO, "[admin api] Starting cleanup of target table for upstream ",
tostring(self.params.upstream_id))
local cnt = 0
for _, entry in ipairs(delete) do
-- not sending update events, one event at the end, based on the
@@ -90,11 +97,13 @@
-- in case another kong-node does the same cleanup simultaneously
cnt = cnt + 1
end
ngx.log(ngx.WARN, "Finished cleanup of target table for upstream "..
tostring(self.params.upstream_id).." removed "..tostring(cnt).." target entries")

ngx.log(ngx.INFO, "[admin api] Finished cleanup of target table",
" for upstream ", tostring(self.params.upstream_id),
" removed ", tostring(cnt), " target entries")
end
end

crud.post(self.params, dao_factory.targets)
end,
},
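As a side note on the hunk above, the cleanup threshold is easier to see in isolation. Below is a minimal standalone sketch in plain Lua; the function name plan_cleanup and the record shape are made up for illustration and are not part of this commit. It keeps only the newest entry per target and asks for a cleanup once stale entries outnumber the kept ones by cleanup_factor; the handler's extra "nothing active left" case is elided here.

-- illustrative only; `history` is assumed to be a list of
-- { target = "ip:port", order = "<created_at>:<uuid>" } records
local function plan_cleanup(history, cleanup_factor)
  cleanup_factor = cleanup_factor or 10

  -- newest entries first, mirroring the reverse sort in the handler
  table.sort(history, function(a, b) return a.order > b.order end)

  local seen, stale, kept = {}, {}, 0
  for _, entry in ipairs(history) do
    if seen[entry.target] then
      stale[#stale + 1] = entry    -- older duplicate, candidate for deletion
    else
      seen[entry.target] = true    -- newest record for this target, keep it
      kept = kept + 1
    end
  end

  -- clean up once stale entries outnumber kept ones by the factor
  return #stale >= math.max(kept, 1) * cleanup_factor, stale
end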
119 changes: 67 additions & 52 deletions kong/core/balancer.lua
@@ -6,22 +6,22 @@ local dns_client = require "resty.dns.client" -- due to startup/require order,
local ring_balancer = require "resty.dns.balancer"

local toip = dns_client.toip
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_DEBUG = ngx.DEBUG
local log = ngx.log

local empty = pl_tablex.readonly {}
local ERROR = ngx.ERR
local DEBUG = ngx.DEBUG
local EMPTY_T = pl_tablex.readonly {}

--===========================================================
-- Ring-balancer based resolution
--===========================================================
local balancers = {} -- table holding our balancer objects, indexed by upstream name

-- caching logic;
-- we retain 3 entities;
-- 1) list of upstreams, to be invalidated on any upstream change
-- 2) individual upstreams, to be invalidated on individual basis
-- 3) target history for an upstream, invalidated when;
-- we retain 3 entities:
-- 1) list of upstreams: to be invalidated on any upstream change
-- 2) individual upstreams: to be invalidated on individual basis
-- 3) target history for an upstream, invalidated when:
-- a) along with the upstream it belongs to
-- b) upon any target change for the upstream (can only add entries)
-- Distinction between 1 and 2 makes it possible to invalidate individual
@@ -30,7 +30,7 @@ local balancers = {} -- table holding our balancer objects, indexed by upstream
-- Implements a simple dictionary with all upstream-ids indexed
-- by their name.
local function load_upstreams_dict_into_memory()
ngx_log(ngx_DEBUG, "fetching all upstreams")
log(DEBUG, "fetching all upstreams")
local upstreams, err = singletons.dao.upstreams:find_all()
if err then
return nil, err
@@ -55,11 +55,10 @@ end

-- loads a single upstream entity
local function load_upstream_into_memory(upstream_id)
ngx_log(ngx_DEBUG, "fetching upstream: ",tostring(name)," ",tostring(upstream_id))
log(DEBUG, "fetching upstream: ", tostring(upstream_id))

local upstream, err = singletons.dao.upstreams:find_all {id = upstream_id}
if not upstream then
return nil, err
end

@thibaultcha (Member) commented on Dec 14, 2016:

    I actually like this form better; it gives some more space as well and is
    also better for code coverage (if ever).

@Tieske (Author, Member) replied on Dec 14, 2016:

    done

if not upstream then return nil, err end

upstream = upstream[1] -- searched by id, so only 1 row in the returned set

@@ -87,41 +86,41 @@ end
-- @return upstream table, or `false` if not found, or nil+error
local function get_upstream(upstream_name)
local upstreams_dict, err = cache.get_or_set(cache.upstreams_dict_key(),
nil, load_upstreams_dict_into_memory)
nil, load_upstreams_dict_into_memory)
if err then
return nil, err
end

local upstream_id = upstreams_dict[upstream_name]
if not upstream_id then return false end -- no upstream by this name

return cache.get_or_set(cache.upstream_key(upstream_id), nil, load_upstream_into_memory, upstream_id)
return cache.get_or_set(cache.upstream_key(upstream_id), nil,
load_upstream_into_memory, upstream_id)
end

-- loads the target history for an upstream
-- @param upstream_id Upstream uuid for which to load the target history
local function load_targets_into_memory(upstream_id)
ngx_log(ngx_DEBUG, "fetching targets for upstream: ",tostring(upstream_id))
log(DEBUG, "fetching targets for upstream: ",tostring(upstream_id))

local target_history, err = singletons.dao.targets:find_all {upstream_id = upstream_id}
if err then
return nil, err
end
if err then return nil, err end

-- some raw data updates
-- perform some raw data updates
for _, target in ipairs(target_history) do
-- split `target` field into `name` and `port`
local port
target.name, port = string.match(target.target, "^(.-):(%d+)$")
target.port = tonumber(port)
-- need exact order, so order by created time and uuid

-- need exact order, so create sort-key by created-time and uuid
target.order = target.created_at..":"..target.id
end

-- order by time
table.sort(target_history, function(a,b) return a.order<b.order end)
--for i, t in ipairs(target_history) do
-- ngx_log(ngx_DEBUG, i, t.order)
--end

table.sort(target_history, function(a,b)
return a.order<b.order
end)

return target_history
end
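
For clarity, the `target` split and the sort key built above behave as follows. This is a standalone illustration with made-up values, not code from this commit.

-- standalone illustration only
local entry = { target = "10.0.0.7:8000", created_at = 1481725512000, id = "4b1ad22e" }

local port
entry.name, port = string.match(entry.target, "^(.-):(%d+)$")
entry.port = tonumber(port)

-- sort key: creation time first, uuid appended so the order stays total
-- even when two targets share the same timestamp
entry.order = entry.created_at .. ":" .. entry.id

print(entry.name, entry.port, entry.order)
--> 10.0.0.7    8000    1481725512000:4b1ad22e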

@@ -131,22 +130,24 @@ end
-- @param start the index where to start in the `history` parameter
-- @return true
local function apply_history(rb, history, start)

for i = start, #history do
local target = history[i]

if target.weight > 0 then
assert(rb:addHost(target.name, target.port, target.weight))
else
assert(rb:removeHost(target.name, target.port))
end

rb.__targets_history[i] = {
name = target.name,
port = target.port,
weight = target.weight,
order = target.order,
}
end

return true
end

@@ -160,32 +161,37 @@ local get_balancer = function(target)
-- first go and find the upstream object, from cache or the db
local upstream, err = get_upstream(hostname)
if err then
return nil, err -- there was an error
elseif upstream == false then
return false -- no upstream by this name
return nil, err -- there was an error
end

if upstream == false then
return false -- no upstream by this name
end

-- we've got the upstream, now fetch its targets, from cache or the db
local targets_history, err = cache.get_or_set(cache.targets_key(upstream.id),
nil, load_targets_into_memory, upstream.id)

if err then
return nil, err
end

local balancer = balancers[upstream.name] -- always exists, created upon fetching upstream

-- check history state
-- NOTE: in the code below variables are similarly named, but the
-- ones with `__`-prefixed, are the ones on the `balancer` object, and the
-- regular ones are the ones we just fetched and are comparing against
local __size = #balancer.__targets_history
local size = #targets_history

if __size ~= size or
balancer.__targets_history[__size].order ~= targets_history[size].order then
-- last entries in history don't match, so we must do some updates.

-- compare balancer history with db-loaded history
local last_equal_index = 0 -- last index where history is the same
for i, entry in ipairs(balancer.__targets_history) do
if entry.order ~= (targets_history[i] or empty).order then
if entry.order ~= (targets_history[i] or EMPTY_T).order then
last_equal_index = i - 1
break
end
@@ -194,6 +200,7 @@ local get_balancer = function(target)
if last_equal_index == __size then
-- history is the same, so we only need to add new entries
apply_history(balancer, targets_history, last_equal_index + 1)

else
-- history not the same.
-- TODO: ideally we would undo the last ones until we're equal again
@@ -205,9 +212,9 @@
dns = dns_client,
})
if not balancer then return balancer, err end

balancer.__targets_history = {}
balancers[upstream.name] = balancer -- overwrite our existing one

apply_history(balancer, targets_history, 1)
end
end
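
The reconciliation step above can be summarised with a simplified standalone model (plain Lua, hypothetical names; the real code compares the `order` field of { name, port, weight, order } records and then updates a ring-balancer object): find the last index where the cached and freshly loaded histories agree, apply only the new tail if the cached history is a clean prefix, otherwise rebuild from scratch.

-- simplified model: histories are represented only by their `order` strings
local function reconcile(current, fetched)
  local last_equal_index = 0
  for i, order in ipairs(current) do
    if order ~= fetched[i] then
      last_equal_index = i - 1
      break
    end
    last_equal_index = i
  end

  if last_equal_index == #current then
    return "append", last_equal_index + 1   -- only apply the new tail entries
  end
  return "rebuild", 1                       -- histories diverged, start over
end

print(reconcile({ "1:a", "2:b" }, { "1:a", "2:b", "3:c" }))  --> append   3
print(reconcile({ "1:a", "9:z" }, { "1:a", "2:b", "3:c" }))  --> rebuild  1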
@@ -229,21 +236,23 @@ end
-- @return true on success, nil+error otherwise
local function execute(target)
local upstream = target.upstream

if target.type ~= "name" then
-- it's an ip address (v4 or v6), so nothing we can do...
target.ip = upstream.host
target.port = upstream.port or 80
target.port = upstream.port or 80 -- TODO: remove this fallback value
return true
end

-- when tries == 0 it runs before the `balancer` context (in the `access` context),
-- when tries >= 2 then it performs a retry in the `balancer` context
local dns_cache_only = target.tries ~= 0
local balancer

if dns_cache_only then
-- retry, so balancer is already set if there was one
balancer = target.balancer

else
local err
-- first try, so try and find a matching balancer/upstream object
@@ -264,30 +273,36 @@ local function execute(target)
if not ip then
if port == "No peers are available" then
-- in this case a "503 service unavailable", others will be a 500.
ngx_log(ngx_ERR, "Failure to get a peer from the ring-balancer '",upstream.host,"'; ",port)
log(ERROR, "failure to get a peer from the ring-balancer '",
upstream.host, "'; ", port)
return responses.send(503)
end

return nil, port -- some other error
end

target.ip = ip
target.port = port
target.hostname = hostname
return true
else
-- have to do a regular DNS lookup
local ip, port = toip(upstream.host, upstream.port, dns_cache_only)
if not ip then
return nil, port
end
target.ip = ip
target.port = port
return true
end

-- have to do a regular DNS lookup
local ip, port = toip(upstream.host, upstream.port, dns_cache_only)
if not ip then
return nil, port
end

target.ip = ip
target.port = port
return true
end

return {
execute = execute,
_load_upstreams_dict_into_memory = load_upstreams_dict_into_memory, -- exported for test purposes
_load_upstream_into_memory = load_upstream_into_memory, -- exported for test purposes
_load_targets_into_memory = load_targets_into_memory, -- exported for test purposes

-- ones below are exported for test purposes
_load_upstreams_dict_into_memory = load_upstreams_dict_into_memory,
_load_upstream_into_memory = load_upstream_into_memory,
_load_targets_into_memory = load_targets_into_memory,
}
5 changes: 4 additions & 1 deletion kong/core/handler.lua
@@ -56,17 +56,20 @@ return {
-- hostname = nil, -- the hostname that belongs to the ip address returned by the balancer
}
ngx.ctx.balancer_address = balancer_address

local ok, err = balancer_execute(balancer_address)
if not ok then
return responses.send_HTTP_INTERNAL_SERVER_ERROR("failed the initial "..
"dns/balancer resolve for '"..balancer_address.upstream.host..
"' with; "..tostring(err))
"' with: "..tostring(err))
end

if balancer_address.hostname and not ngx.ctx.api.preserve_host then
ngx.var.upstream_host = balancer_address.hostname
else
ngx.var.upstream_host = upstream_host
end

end,
-- Only executed if the `resolver` module found an API and allows nginx to proxy it.
after = function()
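To spell out the Host decision in the hunk above: the hostname resolved by the ring-balancer is only used when the API does not ask to preserve the client's Host header. A standalone sketch with made-up names and values (in the real handler the inputs come from ngx.ctx and ngx.var):

-- illustration only, not this commit's code
local function pick_upstream_host(resolved_hostname, preserve_host, request_host)
  if resolved_hostname and not preserve_host then
    return resolved_hostname   -- use the name the ring-balancer resolved
  end
  return request_host          -- keep whatever upstream_host already held
end

print(pick_upstream_host("backend.internal", false, "api.example.com"))  --> backend.internal
print(pick_upstream_host("backend.internal", true,  "api.example.com"))  --> api.example.com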
7 changes: 7 additions & 0 deletions kong/dao/migrations/cassandra.lua
@@ -181,6 +181,13 @@ return {
},
{
name = "2016-09-16-141423_upstreams",
-- Note on the timestamps;
-- The Cassandra timestamps are created in Lua code, and hence ALL entities
-- will now be created in millisecond precision. The existing entries will
-- remain in second precision, but new ones (for ALL entities!) will be
-- in millisecond precision.
-- This differs from the Postgres one where only the new entities (upstreams
-- and targets) will get millisecond precision.
up = [[
CREATE TABLE IF NOT EXISTS upstreams(
id uuid,
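Regarding the precision note added above: because the Cassandra timestamps are produced in Lua, every entity created after this migration carries millisecond precision. A rough illustration, assuming the value originates from something like OpenResty's ngx.now(), which returns seconds with millisecond resolution (the exact helper Kong uses is not shown in this diff):

local now = 1481725512.5                                -- stand-in for ngx.now()
local second_precision = math.floor(now) * 1000         --> 1481725512000
local millisecond_precision = math.floor(now * 1000)    --> 1481725512500
print(second_precision, millisecond_precision)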
5 changes: 2 additions & 3 deletions kong/dao/migrations/postgres.lua
@@ -159,9 +159,8 @@
{
name = "2016-09-16-141423_upstreams",
-- Note on the timestamps below; these use a precision of milliseconds
-- this differs from the other tables above, as they only use second precision
-- The Cassandra timestamps are created in Lua code, and have hence ALL at
-- the same time been updated to millisecond precision.
-- this differs from the other tables above, as they only use second precision.
-- This differs from the change to the Cassandra entities.
up = [[
CREATE TABLE IF NOT EXISTS upstreams(
id uuid PRIMARY KEY,