feat: let balancer support priority
Fix #1809
Signed-off-by: spacewander <spacewanderlzx@gmail.com>
spacewander committed Mar 4, 2021
1 parent 4820ad9 commit 79f6e87
Showing 18 changed files with 880 additions and 50 deletions.
40 changes: 31 additions & 9 deletions apisix/balancer.lua
@@ -14,10 +14,11 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local require = require
local balancer = require("ngx.balancer")
local core = require("apisix.core")
local ipairs = ipairs
local require = require
local balancer = require("ngx.balancer")
local core = require("apisix.core")
local priority_balancer = require("apisix.balancer.priority")
local ipairs = ipairs
local set_more_tries = balancer.set_more_tries
local get_last_failure = balancer.get_last_failure
local set_timeouts = balancer.set_timeouts
@@ -40,12 +41,27 @@ local _M = {
}


local function transform_node(new_nodes, node)
if not new_nodes._priority_index then
new_nodes._priority_index = {}
end

if not new_nodes[node.priority] then
new_nodes[node.priority] = {}
core.table.insert(new_nodes._priority_index, node.priority)
end

new_nodes[node.priority][node.host .. ":" .. node.port] = node.weight
return new_nodes
end


local function fetch_health_nodes(upstream, checker)
local nodes = upstream.nodes
if not checker then
local new_nodes = core.table.new(0, #nodes)
for _, node in ipairs(nodes) do
new_nodes[node.host .. ":" .. node.port] = node.weight
new_nodes = transform_node(new_nodes, node)
end
return new_nodes
end
@@ -56,7 +72,7 @@ local function fetch_health_nodes(upstream, checker)
for _, node in ipairs(nodes) do
local ok, err = checker:get_target_status(node.host, port or node.port, host)
if ok then
up_nodes[node.host .. ":" .. node.port] = node.weight
up_nodes = transform_node(up_nodes, node)
elseif err then
core.log.error("failed to get health check target status, addr: ",
node.host, ":", port or node.port, ", host: ", host, ", err: ", err)
@@ -66,7 +82,7 @@
if core.table.nkeys(up_nodes) == 0 then
core.log.warn("all upstream nodes is unhealthy, use default")
for _, node in ipairs(nodes) do
up_nodes[node.host .. ":" .. node.port] = node.weight
up_nodes = transform_node(up_nodes, node)
end
end

@@ -83,9 +99,15 @@ local function create_server_picker(upstream, checker)

if picker then
local up_nodes = fetch_health_nodes(upstream, checker)
core.log.info("upstream nodes: ", core.json.delay_encode(up_nodes))

return picker.new(up_nodes, upstream)
if #up_nodes._priority_index > 1 then
core.log.info("upstream nodes: ", core.json.delay_encode(up_nodes))
return priority_balancer.new(up_nodes, upstream, picker)
end

core.log.info("upstream nodes: ",
core.json.delay_encode(up_nodes[up_nodes._priority_index[1]]))
return picker.new(up_nodes[up_nodes._priority_index[1]], upstream)
end

return nil, "invalid balancer type: " .. upstream.type, 0
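For reference, here is a minimal sketch (not part of the commit; the host, port and weight values are made up) of the table shape that `transform_node` builds and `create_server_picker` consumes:

```lua
-- Illustrative only: what fetch_health_nodes() returns after transform_node()
-- has grouped a two-priority node list.
local up_nodes = {
    _priority_index = {0, -1},             -- priorities, in insertion order
    [0]  = { ["127.0.0.1:1980"] = 2000 },  -- "host:port" => weight
    [-1] = { ["127.0.0.2:1980"] = 1 },
}

-- create_server_picker() checks #up_nodes._priority_index: with more than one
-- priority it wraps the base picker in apisix.balancer.priority, otherwise it
-- hands the single priority group straight to picker.new().
```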
10 changes: 9 additions & 1 deletion apisix/balancer/chash.lua
@@ -118,7 +118,15 @@ function _M.new(up_nodes, upstream)

ctx.balancer_tried_servers[ctx.balancer_server] = true
ctx.balancer_tried_servers_count = (ctx.balancer_tried_servers_count or 0) + 1
end
end,
before_retry_next_priority = function (ctx)
if ctx.balancer_tried_servers then
core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers)
ctx.balancer_tried_servers = nil
end

ctx.balancer_tried_servers_count = 0
end,
}
end

12 changes: 10 additions & 2 deletions apisix/balancer/ewma.lua
@@ -147,7 +147,7 @@ local function _ewma_find(ctx, up_nodes)
return nil, "all upstream servers tried"
end

peers = lrucache_trans_format(ctx.upstream_key, ctx.upstream_version, _trans_format, up_nodes)
peers = lrucache_trans_format(up_nodes, ctx.upstream_version, _trans_format, up_nodes)
if not peers then
return nil, 'up_nodes trans error'
end
@@ -228,7 +228,7 @@ function _M.new(up_nodes, upstream)
get = function(ctx)
return _ewma_find(ctx, up_nodes)
end,
after_balance = _ewma_after_balance
after_balance = _ewma_after_balance,
before_retry_next_priority = function (ctx)
if ctx.balancer_tried_servers then
core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers)
ctx.balancer_tried_servers = nil
end

ctx.balancer_tried_servers_count = 0
end,
}
end

15 changes: 10 additions & 5 deletions apisix/balancer/least_conn.lua
@@ -31,10 +31,7 @@ end

function _M.new(up_nodes, upstream)
local servers_heap = binaryHeap.minUnique(least_score)
local safe_limit = 0
for server, weight in pairs(up_nodes) do
safe_limit = safe_limit + 1

local score = 1 / weight
-- Note: the argument order of insert is different from others
servers_heap:insert({
@@ -50,8 +47,10 @@ function _M.new(up_nodes, upstream)
local server, info, err
if ctx.balancer_tried_servers then
local tried_server_list = {}
for i = 1, safe_limit do
while true do
server, info = servers_heap:peek()
-- we need to let the retry > #nodes so this branch can be hit and
-- the request will retry next priority of nodes
if server == nil then
err = "all upstream servers tried"
break
@@ -100,7 +99,13 @@
end

ctx.balancer_tried_servers[server] = true
end
end,
before_retry_next_priority = function (ctx)
if ctx.balancer_tried_servers then
core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers)
ctx.balancer_tried_servers = nil
end
end,
}
end

81 changes: 81 additions & 0 deletions apisix/balancer/priority.lua
@@ -0,0 +1,81 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

local core = require("apisix.core")
local ipairs = ipairs


local _M = {}


local function max_priority(a, b)
return a > b
end


function _M.new(up_nodes, upstream, picker_mod)
local priority_index = up_nodes._priority_index
core.table.sort(priority_index, max_priority)

local pickers = core.table.new(#priority_index, 0)
for i, priority in ipairs(priority_index) do
local picker, err = picker_mod.new(up_nodes[priority], upstream)
if not picker then
return nil, "failed to create picker with priority " .. priority .. ": " .. err
end
if not picker.before_retry_next_priority then
return nil, "picker should define 'before_retry_next_priority' to reset ctx"
end

pickers[i] = picker
end

return {
upstream = upstream,
get = function (ctx)
for i = ctx.priority_balancer_picker_idx or 1, #pickers do
local picker = pickers[i]
local server, err = picker.get(ctx)
if server then
ctx.priority_balancer_picker_idx = i
return server
end

core.log.notice("failed to get server from current priority ",
priority_index[i],
", try next one, err: ", err)

picker.before_retry_next_priority(ctx)
end

return nil, "all servers tried"
end,
after_balance = function (ctx, before_retry)
local priority_balancer_picker = pickers[ctx.priority_balancer_picker_idx]
if not priority_balancer_picker or
not priority_balancer_picker.after_balance
then
return
end

priority_balancer_picker.after_balance(ctx, before_retry)
end
}
end


return _M
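
Below is a small, self-contained sketch (not part of the commit) of how this module is driven; it assumes the APISIX runtime so the `require` calls resolve, and uses the round robin picker as the base:

```lua
-- Illustrative only: wiring the priority balancer around a base picker,
-- mirroring what create_server_picker() in apisix/balancer.lua does.
local roundrobin        = require("apisix.balancer.roundrobin")
local priority_balancer = require("apisix.balancer.priority")

local upstream = { type = "roundrobin" }
local up_nodes = {
    _priority_index = {0, -1},
    [0]  = { ["127.0.0.1:1980"] = 2000 },
    [-1] = { ["127.0.0.2:1980"] = 1 },
}

local picker = priority_balancer.new(up_nodes, upstream, roundrobin)

-- get() tries the priority-0 picker first; only when it reports
-- "all upstream servers tried" does it call before_retry_next_priority(ctx)
-- to reset the retry state and fall through to the priority -1 group.
local ctx = {}
local server, err = picker.get(ctx)
```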
10 changes: 9 additions & 1 deletion apisix/balancer/roundrobin.lua
@@ -73,7 +73,15 @@ function _M.new(up_nodes, upstream)

ctx.balancer_tried_servers[ctx.balancer_server] = true
ctx.balancer_tried_servers_count = (ctx.balancer_tried_servers_count or 0) + 1
end
end,
before_retry_next_priority = function (ctx)
if ctx.balancer_tried_servers then
core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers)
ctx.balancer_tried_servers = nil
end

ctx.balancer_tried_servers_count = 0
end,
}
end

5 changes: 5 additions & 0 deletions apisix/schema_def.lua
@@ -305,6 +305,11 @@ local nodes_schema = {
type = "integer",
minimum = 0,
},
priority = {
description = "priority of node",
type = "integer",
default = 0,
},
metadata = {
description = "metadata of node",
type = "object",
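As an aside (not part of the commit), a node entry accepted by this schema looks like the Lua table below; since the schema declares a default of 0 and no minimum for `priority`, negative values are valid and can mark backup nodes:

```lua
-- Illustrative only: a nodes entry as validated by nodes_schema above.
local node = {
    host = "127.0.0.1",
    port = 1980,
    weight = 1,
    priority = -1,  -- below the default 0, so this node serves as a backup
}
```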
16 changes: 14 additions & 2 deletions apisix/upstream.lua
@@ -162,6 +162,10 @@ do

need_filled = true
end

if not n.priority then
need_filled = true
end
end

up_conf.original_nodes = nodes
@@ -173,9 +177,17 @@

local filled_nodes = core.table.new(#nodes, 0)
for i, n in ipairs(nodes) do
if not n.port then
if not n.port or not n.priority then
filled_nodes[i] = core.table.clone(n)
filled_nodes[i].port = scheme_to_port[scheme]

if not n.port then
filled_nodes[i].port = scheme_to_port[scheme]
end

-- fix priority for non-array nodes and nodes from service discovery
if not n.priority then
filled_nodes[i].priority = 0
end
else
filled_nodes[i] = n
end
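A small sketch (not part of the commit) of what the fill step above produces for a discovery-style node that declares neither `port` nor `priority`; the `http`-to-80 mapping is assumed to come from `scheme_to_port`:

```lua
-- Illustrative only: a node before and after the filling loop above.
local node = { host = "127.0.0.1", weight = 1 }

-- After filling (assuming scheme = "http"):
--   filled_nodes[i] = { host = "127.0.0.1", port = 80, weight = 1, priority = 0 }
-- Nodes that already carry both fields are reused unchanged (filled_nodes[i] = n).
```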
2 changes: 1 addition & 1 deletion apisix/utils/upstream.lua
@@ -47,7 +47,7 @@ function _M.compare_upstream_node(up_conf, new_t)
for i = 1, #new_t do
local new_node = new_t[i]
local old_node = old_t[i]
for _, name in ipairs({"host", "port", "weight"}) do
for _, name in ipairs({"host", "port", "weight", "priority"}) do
if new_node[name] ~= old_node[name] then
return false
end
36 changes: 36 additions & 0 deletions docs/en/latest/admin-api.md
@@ -657,6 +657,42 @@ After the execution is successful, nodes will not retain the original data, and

```

Each node can be configured with a priority. A node with a lower priority is only
used when all nodes with a higher priority are unavailable or have already been tried.

Since the default priority is 0, we can configure nodes with a negative priority as backups.

For example,

```json
{
"uri": "/hello",
"upstream": {
"type": "roundrobin",
"nodes": [
{"host": "127.0.0.1", "port": 1980, "weight": 2000},
{"host": "127.0.0.2", "port": 1980, "weight": 1, "priority": -1}
],
"checks": {
"active": {
"http_path": "/status",
"healthy": {
"interval": 1,
"successes": 1
},
"unhealthy": {
"interval": 1,
"http_failures": 1
}
}
}
}
}
```

Node `127.0.0.2` will be used only after `127.0.0.1` is unavailable or has already been tried,
so it acts as the backup for `127.0.0.1`.

> Response Parameters
Currently, the response is returned from etcd.