From 79f6e87ebbfb0428b458cb0992193bdb2b492149 Mon Sep 17 00:00:00 2001 From: spacewander Date: Mon, 22 Feb 2021 09:43:40 +0800 Subject: [PATCH] feat: let balancer support priority Fix #1809 Signed-off-by: spacewander --- apisix/balancer.lua | 40 ++- apisix/balancer/chash.lua | 10 +- apisix/balancer/ewma.lua | 12 +- apisix/balancer/least_conn.lua | 15 +- apisix/balancer/priority.lua | 81 ++++++ apisix/balancer/roundrobin.lua | 10 +- apisix/schema_def.lua | 5 + apisix/upstream.lua | 16 +- apisix/utils/upstream.lua | 2 +- docs/en/latest/admin-api.md | 36 +++ docs/en/latest/control-api.md | 9 + docs/zh/latest/admin-api.md | 35 +++ t/admin/balancer.t | 40 +-- t/control/healthcheck.t | 12 +- t/discovery/consul_kv.t | 4 +- t/node/priority-balancer/health-checker.t | 187 ++++++++++++ t/node/priority-balancer/sanity.t | 332 ++++++++++++++++++++++ t/node/upstream-discovery.t | 84 +++++- 18 files changed, 880 insertions(+), 50 deletions(-) create mode 100644 apisix/balancer/priority.lua create mode 100644 t/node/priority-balancer/health-checker.t create mode 100644 t/node/priority-balancer/sanity.t diff --git a/apisix/balancer.lua b/apisix/balancer.lua index 4be56fcc28aa..16cbd5d661b3 100644 --- a/apisix/balancer.lua +++ b/apisix/balancer.lua @@ -14,10 +14,11 @@ -- See the License for the specific language governing permissions and -- limitations under the License. -- -local require = require -local balancer = require("ngx.balancer") -local core = require("apisix.core") -local ipairs = ipairs +local require = require +local balancer = require("ngx.balancer") +local core = require("apisix.core") +local priority_balancer = require("apisix.balancer.priority") +local ipairs = ipairs local set_more_tries = balancer.set_more_tries local get_last_failure = balancer.get_last_failure local set_timeouts = balancer.set_timeouts @@ -40,12 +41,27 @@ local _M = { } +local function transform_node(new_nodes, node) + if not new_nodes._priority_index then + new_nodes._priority_index = {} + end + + if not new_nodes[node.priority] then + new_nodes[node.priority] = {} + core.table.insert(new_nodes._priority_index, node.priority) + end + + new_nodes[node.priority][node.host .. ":" .. node.port] = node.weight + return new_nodes +end + + local function fetch_health_nodes(upstream, checker) local nodes = upstream.nodes if not checker then local new_nodes = core.table.new(0, #nodes) for _, node in ipairs(nodes) do - new_nodes[node.host .. ":" .. node.port] = node.weight + new_nodes = transform_node(new_nodes, node) end return new_nodes end @@ -56,7 +72,7 @@ local function fetch_health_nodes(upstream, checker) for _, node in ipairs(nodes) do local ok, err = checker:get_target_status(node.host, port or node.port, host) if ok then - up_nodes[node.host .. ":" .. node.port] = node.weight + up_nodes = transform_node(up_nodes, node) elseif err then core.log.error("failed to get health check target status, addr: ", node.host, ":", port or node.port, ", host: ", host, ", err: ", err) @@ -66,7 +82,7 @@ local function fetch_health_nodes(upstream, checker) if core.table.nkeys(up_nodes) == 0 then core.log.warn("all upstream nodes is unhealthy, use default") for _, node in ipairs(nodes) do - up_nodes[node.host .. ":" .. node.port] = node.weight + up_nodes = transform_node(up_nodes, node) end end @@ -83,9 +99,15 @@ local function create_server_picker(upstream, checker) if picker then local up_nodes = fetch_health_nodes(upstream, checker) - core.log.info("upstream nodes: ", core.json.delay_encode(up_nodes)) - return picker.new(up_nodes, upstream) + if #up_nodes._priority_index > 1 then + core.log.info("upstream nodes: ", core.json.delay_encode(up_nodes)) + return priority_balancer.new(up_nodes, upstream, picker) + end + + core.log.info("upstream nodes: ", + core.json.delay_encode(up_nodes[up_nodes._priority_index[1]])) + return picker.new(up_nodes[up_nodes._priority_index[1]], upstream) end return nil, "invalid balancer type: " .. upstream.type, 0 diff --git a/apisix/balancer/chash.lua b/apisix/balancer/chash.lua index f9dbdbbb470b..3b260a25932f 100644 --- a/apisix/balancer/chash.lua +++ b/apisix/balancer/chash.lua @@ -118,7 +118,15 @@ function _M.new(up_nodes, upstream) ctx.balancer_tried_servers[ctx.balancer_server] = true ctx.balancer_tried_servers_count = (ctx.balancer_tried_servers_count or 0) + 1 - end + end, + before_retry_next_priority = function (ctx) + if ctx.balancer_tried_servers then + core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers) + ctx.balancer_tried_servers = nil + end + + ctx.balancer_tried_servers_count = 0 + end, } end diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua index e1b276273063..e9926dac6531 100644 --- a/apisix/balancer/ewma.lua +++ b/apisix/balancer/ewma.lua @@ -147,7 +147,7 @@ local function _ewma_find(ctx, up_nodes) return nil, "all upstream servers tried" end - peers = lrucache_trans_format(ctx.upstream_key, ctx.upstream_version, _trans_format, up_nodes) + peers = lrucache_trans_format(up_nodes, ctx.upstream_version, _trans_format, up_nodes) if not peers then return nil, 'up_nodes trans error' end @@ -228,7 +228,15 @@ function _M.new(up_nodes, upstream) get = function(ctx) return _ewma_find(ctx, up_nodes) end, - after_balance = _ewma_after_balance + after_balance = _ewma_after_balance, + before_retry_next_priority = function (ctx) + if ctx.balancer_tried_servers then + core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers) + ctx.balancer_tried_servers = nil + end + + ctx.balancer_tried_servers_count = 0 + end, } end diff --git a/apisix/balancer/least_conn.lua b/apisix/balancer/least_conn.lua index 1cfdbf9f9252..8923d1781d00 100644 --- a/apisix/balancer/least_conn.lua +++ b/apisix/balancer/least_conn.lua @@ -31,10 +31,7 @@ end function _M.new(up_nodes, upstream) local servers_heap = binaryHeap.minUnique(least_score) - local safe_limit = 0 for server, weight in pairs(up_nodes) do - safe_limit = safe_limit + 1 - local score = 1 / weight -- Note: the argument order of insert is different from others servers_heap:insert({ @@ -50,8 +47,10 @@ function _M.new(up_nodes, upstream) local server, info, err if ctx.balancer_tried_servers then local tried_server_list = {} - for i = 1, safe_limit do + while true do server, info = servers_heap:peek() + -- we need to let the retry > #nodes so this branch can be hit and + -- the request will retry next priority of nodes if server == nil then err = "all upstream servers tried" break @@ -100,7 +99,13 @@ function _M.new(up_nodes, upstream) end ctx.balancer_tried_servers[server] = true - end + end, + before_retry_next_priority = function (ctx) + if ctx.balancer_tried_servers then + core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers) + ctx.balancer_tried_servers = nil + end + end, } end diff --git a/apisix/balancer/priority.lua b/apisix/balancer/priority.lua new file mode 100644 index 000000000000..af5d60cbbeb1 --- /dev/null +++ b/apisix/balancer/priority.lua @@ -0,0 +1,81 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +local core = require("apisix.core") +local ipairs = ipairs + + +local _M = {} + + +local function max_priority(a, b) + return a > b +end + + +function _M.new(up_nodes, upstream, picker_mod) + local priority_index = up_nodes._priority_index + core.table.sort(priority_index, max_priority) + + local pickers = core.table.new(#priority_index, 0) + for i, priority in ipairs(priority_index) do + local picker, err = picker_mod.new(up_nodes[priority], upstream) + if not picker then + return nil, "failed to create picker with priority " .. priority .. ": " .. err + end + if not picker.before_retry_next_priority then + return nil, "picker should define 'before_retry_next_priority' to reset ctx" + end + + pickers[i] = picker + end + + return { + upstream = upstream, + get = function (ctx) + for i = ctx.priority_balancer_picker_idx or 1, #pickers do + local picker = pickers[i] + local server, err = picker.get(ctx) + if server then + ctx.priority_balancer_picker_idx = i + return server + end + + core.log.notice("failed to get server from current priority ", + priority_index[i], + ", try next one, err: ", err) + + picker.before_retry_next_priority(ctx) + end + + return nil, "all servers tried" + end, + after_balance = function (ctx, before_retry) + local priority_balancer_picker = pickers[ctx.priority_balancer_picker_idx] + if not priority_balancer_picker or + not priority_balancer_picker.after_balance + then + return + end + + priority_balancer_picker.after_balance(ctx, before_retry) + end + } +end + + +return _M diff --git a/apisix/balancer/roundrobin.lua b/apisix/balancer/roundrobin.lua index a9469cac50bb..7090f526117c 100644 --- a/apisix/balancer/roundrobin.lua +++ b/apisix/balancer/roundrobin.lua @@ -73,7 +73,15 @@ function _M.new(up_nodes, upstream) ctx.balancer_tried_servers[ctx.balancer_server] = true ctx.balancer_tried_servers_count = (ctx.balancer_tried_servers_count or 0) + 1 - end + end, + before_retry_next_priority = function (ctx) + if ctx.balancer_tried_servers then + core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers) + ctx.balancer_tried_servers = nil + end + + ctx.balancer_tried_servers_count = 0 + end, } end diff --git a/apisix/schema_def.lua b/apisix/schema_def.lua index 9ae7669928ba..57f99150bcb5 100644 --- a/apisix/schema_def.lua +++ b/apisix/schema_def.lua @@ -305,6 +305,11 @@ local nodes_schema = { type = "integer", minimum = 0, }, + priority = { + description = "priority of node", + type = "integer", + default = 0, + }, metadata = { description = "metadata of node", type = "object", diff --git a/apisix/upstream.lua b/apisix/upstream.lua index 6ae81c05a4a0..681ce682399d 100644 --- a/apisix/upstream.lua +++ b/apisix/upstream.lua @@ -162,6 +162,10 @@ do need_filled = true end + + if not n.priority then + need_filled = true + end end up_conf.original_nodes = nodes @@ -173,9 +177,17 @@ do local filled_nodes = core.table.new(#nodes, 0) for i, n in ipairs(nodes) do - if not n.port then + if not n.port or not n.priority then filled_nodes[i] = core.table.clone(n) - filled_nodes[i].port = scheme_to_port[scheme] + + if not n.port then + filled_nodes[i].port = scheme_to_port[scheme] + end + + -- fix priority for non-array nodes and nodes from service discovery + if not n.priority then + filled_nodes[i].priority = 0 + end else filled_nodes[i] = n end diff --git a/apisix/utils/upstream.lua b/apisix/utils/upstream.lua index 2b6077740065..4417f07c1344 100644 --- a/apisix/utils/upstream.lua +++ b/apisix/utils/upstream.lua @@ -47,7 +47,7 @@ function _M.compare_upstream_node(up_conf, new_t) for i = 1, #new_t do local new_node = new_t[i] local old_node = old_t[i] - for _, name in ipairs({"host", "port", "weight"}) do + for _, name in ipairs({"host", "port", "weight", "priority"}) do if new_node[name] ~= old_node[name] then return false end diff --git a/docs/en/latest/admin-api.md b/docs/en/latest/admin-api.md index 51054b90c98f..50158914e74d 100644 --- a/docs/en/latest/admin-api.md +++ b/docs/en/latest/admin-api.md @@ -657,6 +657,42 @@ After the execution is successful, nodes will not retain the original data, and ``` +Each node can be configured with a priority. A node with low priority will only be +used when all the nodes with higher priority are unavailable or tried. + +As the default priority is 0, we can configure nodes with negative priority as the backup. + +For example, + +```json +{ + "uri": "/hello", + "upstream": { + "type": "roundrobin", + "nodes": [ + {"host": "127.0.0.1", "port": 1980, "weight": 2000}, + {"host": "127.0.0.2", "port": 1980, "weight": 1, "priority": -1} + ], + "checks": { + "active": { + "http_path": "/status", + "healthy": { + "interval": 1, + "successes": 1 + }, + "unhealthy": { + "interval": 1, + "http_failures": 1 + } + } + } + } +} +``` + +Node `127.0.0.2` will be used only after `127.0.0.1` is unavaibled or tried. +Therefore it is the backup of `127.0.0.1`. + > Response Parameters Return response from etcd currently. diff --git a/docs/en/latest/control-api.md b/docs/en/latest/control-api.md index 09ee2c9450d0..4efd872fa3df 100644 --- a/docs/en/latest/control-api.md +++ b/docs/en/latest/control-api.md @@ -97,6 +97,7 @@ Return current [health check](health-check.md) status in the format below: { "host": "127.0.0.1", "port": 1980, + "priority": 0, "weight": 1 } ], @@ -105,11 +106,13 @@ Return current [health check](health-check.md) status in the format below: { "host": "127.0.0.1", "port": 1980, + "priority": 0, "weight": 1 }, { "host": "127.0.0.2", "port": 1988, + "priority": 0, "weight": 1 } ], @@ -121,6 +124,7 @@ Return current [health check](health-check.md) status in the format below: { "host": "127.0.0.1", "port": 1980, + "priority": 0, "weight": 1 } ], @@ -129,11 +133,13 @@ Return current [health check](health-check.md) status in the format below: { "host": "127.0.0.1", "port": 1980, + "priority": 0, "weight": 1 }, { "host": "127.0.0.1", "port": 1988, + "priority": 0, "weight": 1 } ], @@ -162,6 +168,7 @@ For example, `GET /v1/healthcheck/upstreams/1` returns: { "host": "127.0.0.1", "port": 1980, + "priority": 0, "weight": 1 } ], @@ -170,11 +177,13 @@ For example, `GET /v1/healthcheck/upstreams/1` returns: { "host": "127.0.0.1", "port": 1980, + "priority": 0, "weight": 1 }, { "host": "127.0.0.2", "port": 1988, + "priority": 0, "weight": 1 } ], diff --git a/docs/zh/latest/admin-api.md b/docs/zh/latest/admin-api.md index eb83d8196ed9..510c7eaf5154 100644 --- a/docs/zh/latest/admin-api.md +++ b/docs/zh/latest/admin-api.md @@ -665,6 +665,41 @@ HTTP/1.1 200 OK ``` +节点可以配置自己的优先级。只有在高优先级的节点不可用或者尝试过,才会访问一个低优先级的节点。 + +由于默认的优先级是 0,我们可以给一些节点配置负数的优先级来作为备份。 + +举个例子, + +```json +{ + "uri": "/hello", + "upstream": { + "type": "roundrobin", + "nodes": [ + {"host": "127.0.0.1", "port": 1980, "weight": 2000}, + {"host": "127.0.0.2", "port": 1980, "weight": 1, "priority": -1} + ], + "checks": { + "active": { + "http_path": "/status", + "healthy": { + "interval": 1, + "successes": 1 + }, + "unhealthy": { + "interval": 1, + "http_failures": 1 + } + } + } + } +} +``` + +节点 `127.0.0.2` 只有在 `127.0.0.1` 不可用或者尝试过之后才会被访问。 +所以它是 `127.0.0.1` 的备份。 + > 应答参数 目前是直接返回与 etcd 交互后的结果。 diff --git a/t/admin/balancer.t b/t/admin/balancer.t index d1b9027faad0..9bce5fc8538f 100644 --- a/t/admin/balancer.t +++ b/t/admin/balancer.t @@ -70,9 +70,9 @@ __DATA__ local up_conf = { type = "roundrobin", nodes = { - {host = "39.97.63.215", port = 80, weight = 1}, - {host = "39.97.63.216", port = 81, weight = 1}, - {host = "39.97.63.217", port = 82, weight = 1}, + {host = "39.97.63.215", port = 80, weight = 1, priority = 0}, + {host = "39.97.63.216", port = 81, weight = 1, priority = 0}, + {host = "39.97.63.217", port = 82, weight = 1, priority = 0}, } } local ctx = {conf_version = 1} @@ -101,9 +101,9 @@ host: 39.97.63.217 count: 4 local up_conf = { type = "roundrobin", nodes = { - {host = "39.97.63.215", port = 80, weight = 1}, - {host = "39.97.63.216", port = 81, weight = 2}, - {host = "39.97.63.217", port = 82, weight = 3}, + {host = "39.97.63.215", port = 80, weight = 1, priority = 0}, + {host = "39.97.63.216", port = 81, weight = 2, priority = 0}, + {host = "39.97.63.217", port = 82, weight = 3, priority = 0}, } } local ctx = {conf_version = 1} @@ -132,9 +132,9 @@ host: 39.97.63.217 count: 6 local up_conf = { type = "roundrobin", nodes = { - {host = "39.97.63.215", port = 80, weight = 1}, - {host = "39.97.63.216", port = 81, weight = 1}, - {host = "39.97.63.217", port = 82, weight = 1}, + {host = "39.97.63.215", port = 80, weight = 1, priority = 0}, + {host = "39.97.63.216", port = 81, weight = 1, priority = 0}, + {host = "39.97.63.217", port = 82, weight = 1, priority = 0}, } } local ctx = {} @@ -146,8 +146,8 @@ host: 39.97.63.217 count: 6 -- cached by version up_conf.nodes = { - {host = "39.97.63.218", port = 80, weight = 1}, - {host = "39.97.63.219", port = 80, weight = 0}, + {host = "39.97.63.218", port = 80, weight = 1, priority = 0}, + {host = "39.97.63.219", port = 80, weight = 0, priority = 0}, } test(route, ctx) @@ -179,9 +179,9 @@ host: 39.97.63.218 count: 12 type = "chash", key = "remote_addr", nodes = { - {host = "39.97.63.215", port = 80, weight = 1}, - {host = "39.97.63.216", port = 81, weight = 1}, - {host = "39.97.63.217", port = 82, weight = 1}, + {host = "39.97.63.215", port = 80, weight = 1, priority = 0}, + {host = "39.97.63.216", port = 81, weight = 1, priority = 0}, + {host = "39.97.63.217", port = 82, weight = 1, priority = 0}, } } local ctx = { @@ -195,8 +195,8 @@ host: 39.97.63.218 count: 12 -- cached by version up_conf.nodes = { - {host = "39.97.63.218", port = 80, weight = 1}, - {host = "39.97.63.219", port = 80, weight = 0}, + {host = "39.97.63.218", port = 80, weight = 1, priority = 0}, + {host = "39.97.63.219", port = 80, weight = 0, priority = 0}, } test(route, ctx) @@ -223,9 +223,9 @@ host: 39.97.63.218 count: 12 local up_conf = { type = "roundrobin", nodes = { - {host = "39.97.63.215", port = 80, weight = 1}, - {host = "39.97.63.216", port = 81, weight = 1}, - {host = "39.97.63.217", port = 82, weight = 1}, + {host = "39.97.63.215", port = 80, weight = 1, priority = 0}, + {host = "39.97.63.216", port = 81, weight = 1, priority = 0}, + {host = "39.97.63.217", port = 82, weight = 1, priority = 0}, } } local ctx = {} @@ -237,7 +237,7 @@ host: 39.97.63.218 count: 12 -- one item in nodes, return it directly up_conf.nodes = { - {host = "39.97.63.218", port = 80, weight = 1}, + {host = "39.97.63.218", port = 80, weight = 1, priority = 0}, } test(route, ctx) } diff --git a/t/control/healthcheck.t b/t/control/healthcheck.t index be22420185b3..41fe7f19cf89 100644 --- a/t/control/healthcheck.t +++ b/t/control/healthcheck.t @@ -103,8 +103,8 @@ qr/unhealthy TCP increment \(.+\) for '[^']+'/ unhealthy TCP increment (1/2) for '(127.0.0.2:1988)' unhealthy TCP increment (2/2) for '(127.0.0.2:1988)' --- response_body -[{"healthy_nodes":[{"host":"127.0.0.1","port":1980,"weight":1}],"name":"upstream#/upstreams/1","nodes":[{"host":"127.0.0.1","port":1980,"weight":1},{"host":"127.0.0.2","port":1988,"weight":1}],"src_id":"1","src_type":"upstreams"}] -{"healthy_nodes":[{"host":"127.0.0.1","port":1980,"weight":1}],"name":"upstream#/upstreams/1","nodes":[{"host":"127.0.0.1","port":1980,"weight":1},{"host":"127.0.0.2","port":1988,"weight":1}],"src_id":"1","src_type":"upstreams"} +[{"healthy_nodes":[{"host":"127.0.0.1","port":1980,"priority":0,"weight":1}],"name":"upstream#/upstreams/1","nodes":[{"host":"127.0.0.1","port":1980,"priority":0,"weight":1},{"host":"127.0.0.2","port":1988,"priority":0,"weight":1}],"src_id":"1","src_type":"upstreams"}] +{"healthy_nodes":[{"host":"127.0.0.1","port":1980,"priority":0,"weight":1}],"name":"upstream#/upstreams/1","nodes":[{"host":"127.0.0.1","port":1980,"priority":0,"weight":1},{"host":"127.0.0.2","port":1988,"priority":0,"weight":1}],"src_id":"1","src_type":"upstreams"} @@ -166,8 +166,8 @@ qr/unhealthy TCP increment \(.+\) for '[^']+'/ unhealthy TCP increment (1/2) for '127.0.0.1(127.0.0.1:1988)' unhealthy TCP increment (2/2) for '127.0.0.1(127.0.0.1:1988)' --- response_body -[{"healthy_nodes":[{"host":"127.0.0.1","port":1980,"weight":1}],"name":"upstream#/routes/1","nodes":[{"host":"127.0.0.1","port":1980,"weight":1},{"host":"127.0.0.1","port":1988,"weight":1}],"src_id":"1","src_type":"routes"}] -{"healthy_nodes":[{"host":"127.0.0.1","port":1980,"weight":1}],"name":"upstream#/routes/1","nodes":[{"host":"127.0.0.1","port":1980,"weight":1},{"host":"127.0.0.1","port":1988,"weight":1}],"src_id":"1","src_type":"routes"} +[{"healthy_nodes":[{"host":"127.0.0.1","port":1980,"priority":0,"weight":1}],"name":"upstream#/routes/1","nodes":[{"host":"127.0.0.1","port":1980,"priority":0,"weight":1},{"host":"127.0.0.1","port":1988,"priority":0,"weight":1}],"src_id":"1","src_type":"routes"}] +{"healthy_nodes":[{"host":"127.0.0.1","port":1980,"priority":0,"weight":1}],"name":"upstream#/routes/1","nodes":[{"host":"127.0.0.1","port":1980,"priority":0,"weight":1},{"host":"127.0.0.1","port":1988,"priority":0,"weight":1}],"src_id":"1","src_type":"routes"} @@ -234,8 +234,8 @@ qr/unhealthy TCP increment \(.+\) for '[^']+'/ unhealthy TCP increment (1/2) for '127.0.0.1(127.0.0.1:1988)' unhealthy TCP increment (2/2) for '127.0.0.1(127.0.0.1:1988)' --- response_body -[{"healthy_nodes":{},"name":"upstream#/services/1","nodes":[{"host":"127.0.0.1","port":1980,"weight":1},{"host":"127.0.0.1","port":1988,"weight":1}],"src_id":"1","src_type":"services"}] -{"healthy_nodes":{},"name":"upstream#/services/1","nodes":[{"host":"127.0.0.1","port":1980,"weight":1},{"host":"127.0.0.1","port":1988,"weight":1}],"src_id":"1","src_type":"services"} +[{"healthy_nodes":{},"name":"upstream#/services/1","nodes":[{"host":"127.0.0.1","port":1980,"priority":0,"weight":1},{"host":"127.0.0.1","port":1988,"priority":0,"weight":1}],"src_id":"1","src_type":"services"}] +{"healthy_nodes":{},"name":"upstream#/services/1","nodes":[{"host":"127.0.0.1","port":1980,"priority":0,"weight":1},{"host":"127.0.0.1","port":1988,"priority":0,"weight":1}],"src_id":"1","src_type":"services"} diff --git a/t/discovery/consul_kv.t b/t/discovery/consul_kv.t index 3a5e3d6d40a9..30752879c70c 100644 --- a/t/discovery/consul_kv.t +++ b/t/discovery/consul_kv.t @@ -461,8 +461,8 @@ upstreams: --- request GET /thc --- response_body -[{"healthy_nodes":[{"host":"127.0.0.1","port":30511,"weight":1}],"name":"upstream#/upstreams/1","nodes":[{"host":"127.0.0.1","port":30511,"weight":1},{"host":"127.0.0.2","port":1988,"weight":1}],"src_id":"1","src_type":"upstreams"}] -{"healthy_nodes":[{"host":"127.0.0.1","port":30511,"weight":1}],"name":"upstream#/upstreams/1","nodes":[{"host":"127.0.0.1","port":30511,"weight":1},{"host":"127.0.0.2","port":1988,"weight":1}],"src_id":"1","src_type":"upstreams"} +[{"healthy_nodes":[{"host":"127.0.0.1","port":30511,"priority":0,"weight":1}],"name":"upstream#/upstreams/1","nodes":[{"host":"127.0.0.1","port":30511,"priority":0,"weight":1},{"host":"127.0.0.2","port":1988,"priority":0,"weight":1}],"src_id":"1","src_type":"upstreams"}] +{"healthy_nodes":[{"host":"127.0.0.1","port":30511,"priority":0,"weight":1}],"name":"upstream#/upstreams/1","nodes":[{"host":"127.0.0.1","port":30511,"priority":0,"weight":1},{"host":"127.0.0.2","port":1988,"priority":0,"weight":1}],"src_id":"1","src_type":"upstreams"} diff --git a/t/node/priority-balancer/health-checker.t b/t/node/priority-balancer/health-checker.t new file mode 100644 index 000000000000..445210bd6caf --- /dev/null +++ b/t/node/priority-balancer/health-checker.t @@ -0,0 +1,187 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +repeat_each(1); +log_level('info'); +no_root_location(); +worker_connections(1024); +no_shuffle(); + +add_block_preprocessor(sub { + my ($block) = @_; + + if ($block->apisix_yaml) { + if (!$block->yaml_config) { + my $yaml_config = <<_EOC_; +apisix: + node_listen: 1984 + config_center: yaml + enable_admin: false +_EOC_ + + $block->set_value("yaml_config", $yaml_config); + } + + my $route = <<_EOC_; +routes: + - + upstream_id: 1 + uris: + - /hello +#END +_EOC_ + + $block->set_value("apisix_yaml", $block->apisix_yaml . $route); + } + + if (!$block->request) { + $block->set_value("request", "GET /hello"); + } + + if ((!defined $block->error_log) && (!defined $block->no_error_log)) { + $block->set_value("no_error_log", "[error]"); + } +}); + +run_tests(); + +__DATA__ + +=== TEST 1: all are down detected by health checker +--- apisix_yaml +upstreams: + - + id: 1 + type: least_conn + nodes: + - host: 127.0.0.1 + port: 1979 + weight: 2 + priority: 123 + - host: 127.0.0.2 + port: 1979 + weight: 3 + priority: -1 + checks: + active: + http_path: "/status" + healthy: + interval: 1 + successes: 1 + unhealthy: + interval: 1 + http_failures: 1 +--- config + location /t { + content_by_lua_block { + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port + .. "/hello" + local httpc = http.new() + httpc:request_uri(uri, {method = "GET"}) + ngx.sleep(2.5) + -- still use all nodes + httpc:request_uri(uri, {method = "GET"}) + } + } +--- request +GET /t +--- error_log +connect() failed +unhealthy TCP increment (2/2) for '(127.0.0.1:1979) +unhealthy TCP increment (2/2) for '(127.0.0.2:1979) +--- grep_error_log eval +qr/proxy request to \S+/ +--- grep_error_log_out +proxy request to 127.0.0.1:1979 +proxy request to 127.0.0.2:1979 +proxy request to 127.0.0.1:1979 +proxy request to 127.0.0.2:1979 + + + +=== TEST 2: use priority as backup (setup rule) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "uri": "/hello", + "upstream": { + "type": "roundrobin", + "nodes": [ + {"host": "127.0.0.1", "port": 1979, "weight": 2000}, + {"host": "127.0.0.1", "port": 1980, + "weight": 1, "priority": -1} + ], + "checks": { + "active": { + "http_path": "/status", + "healthy": { + "interval": 1, + "successes": 1 + }, + "unhealthy": { + "interval": 1, + "http_failures": 1 + } + } + } + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 3: use priority as backup +--- config + location /t { + content_by_lua_block { + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port + .. "/hello" + local httpc = http.new() + httpc:request_uri(uri, {method = "GET"}) + ngx.sleep(2.5) + httpc:request_uri(uri, {method = "GET"}) + } + } +--- request +GET /t +--- error_log +connect() failed +unhealthy TCP increment (2/2) for '(127.0.0.1:1979) +--- grep_error_log eval +qr/proxy request to \S+/ +--- grep_error_log_out +proxy request to 127.0.0.1:1979 +proxy request to 127.0.0.1:1980 +proxy request to 127.0.0.1:1980 diff --git a/t/node/priority-balancer/sanity.t b/t/node/priority-balancer/sanity.t new file mode 100644 index 000000000000..aeb9735343fd --- /dev/null +++ b/t/node/priority-balancer/sanity.t @@ -0,0 +1,332 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +repeat_each(2); # repeat each test to ensure after_balance is called correctly +log_level('info'); +no_root_location(); +worker_connections(1024); +no_shuffle(); + +add_block_preprocessor(sub { + my ($block) = @_; + + if ($block->apisix_yaml) { + if (!$block->yaml_config) { + my $yaml_config = <<_EOC_; +apisix: + node_listen: 1984 + config_center: yaml + enable_admin: false +_EOC_ + + $block->set_value("yaml_config", $yaml_config); + } + + my $route = <<_EOC_; +routes: + - + upstream_id: 1 + uris: + - /hello + - /mysleep +#END +_EOC_ + + $block->set_value("apisix_yaml", $block->apisix_yaml . $route); + } + + if (!$block->request) { + $block->set_value("request", "GET /hello"); + } + + if ((!defined $block->error_log) && (!defined $block->no_error_log)) { + $block->set_value("no_error_log", "[error]"); + } +}); + +run_tests(); + +__DATA__ + +=== TEST 1: sanity +--- apisix_yaml +upstreams: + - + id: 1 + type: least_conn + nodes: + - host: 127.0.0.1 + port: 1979 + weight: 2 + priority: 1 + - host: 127.0.0.2 + port: 1979 + weight: 1 + priority: 1 + - host: 127.0.0.3 + port: 1979 + weight: 2 + priority: 0 + - host: 127.0.0.4 + port: 1979 + weight: 1 + priority: 0 + - host: 127.0.0.1 + port: 1980 + weight: 2 + priority: -1 +--- response_body +hello world +--- error_log +connect() failed +failed to get server from current priority 1, try next one +failed to get server from current priority 0, try next one +--- grep_error_log eval +qr/proxy request to \S+/ +--- grep_error_log_out +proxy request to 127.0.0.1:1979 +proxy request to 127.0.0.2:1979 +proxy request to 127.0.0.3:1979 +proxy request to 127.0.0.4:1979 +proxy request to 127.0.0.1:1980 + + + +=== TEST 2: all failed +--- apisix_yaml +upstreams: + - + id: 1 + type: least_conn + nodes: + - host: 127.0.0.1 + port: 1979 + weight: 2 + priority: 1 + - host: 127.0.0.2 + port: 1979 + weight: 1 + priority: 0 + - host: 127.0.0.1 + port: 1979 + weight: 2 + priority: -1 +--- error_code: 502 +--- error_log +connect() failed +--- grep_error_log eval +qr/proxy request to \S+/ +--- grep_error_log_out +proxy request to 127.0.0.1:1979 +proxy request to 127.0.0.2:1979 +proxy request to 127.0.0.1:1979 + + + +=== TEST 3: default priority is zero +--- apisix_yaml +upstreams: + - + id: 1 + type: least_conn + nodes: + - host: 127.0.0.1 + port: 1979 + weight: 2 + priority: 1 + - host: 127.0.0.2 + port: 1979 + weight: 1 + - host: 127.0.0.1 + port: 1980 + weight: 2 + priority: -1 +--- response_body +hello world +--- error_log +connect() failed +--- grep_error_log eval +qr/proxy request to \S+/ +--- grep_error_log_out +proxy request to 127.0.0.1:1979 +proxy request to 127.0.0.2:1979 +proxy request to 127.0.0.1:1980 + + + +=== TEST 4: least_conn +--- apisix_yaml +upstreams: + - + id: 1 + type: least_conn + nodes: + - host: 127.0.0.1 + port: 1979 + weight: 2 + priority: 1 + - host: 127.0.0.1 + port: 1980 + weight: 3 + priority: -1 + - host: 0.0.0.0 + port: 1980 + weight: 2 + priority: -1 +--- config + location /t { + content_by_lua_block { + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port + .. "/mysleep?seconds=0.1" + + local t = {} + for i = 1, 3 do + local th = assert(ngx.thread.spawn(function(i) + local httpc = http.new() + local res, err = httpc:request_uri(uri..i, {method = "GET"}) + if not res then + ngx.log(ngx.ERR, err) + return + end + end, i)) + table.insert(t, th) + end + for i, th in ipairs(t) do + ngx.thread.wait(th) + end + } + } +--- request +GET /t +--- error_log +connect() failed +--- grep_error_log eval +qr/proxy request to \S+ while connecting to upstream/ +--- grep_error_log_out +proxy request to 127.0.0.1:1979 while connecting to upstream +proxy request to 127.0.0.1:1979 while connecting to upstream +proxy request to 127.0.0.1:1979 while connecting to upstream +proxy request to 127.0.0.1:1980 while connecting to upstream +proxy request to 0.0.0.0:1980 while connecting to upstream +proxy request to 127.0.0.1:1980 while connecting to upstream + + + +=== TEST 5: roundrobin +--- apisix_yaml +upstreams: + - + id: 1 + type: roundrobin + nodes: + - host: 127.0.0.1 + port: 1979 + weight: 2 + priority: 1 + - host: 127.0.0.2 + port: 1979 + weight: 1 + priority: 1 + - host: 127.0.0.3 + port: 1979 + weight: 2 + priority: -1 + - host: 127.0.0.4 + port: 1979 + weight: 1 + priority: -1 +--- error_code: 502 +--- error_log +connect() failed +--- grep_error_log eval +qr/proxy request to \S+/ +--- grep_error_log_out +proxy request to 127.0.0.1:1979 +proxy request to 127.0.0.2:1979 +proxy request to 127.0.0.3:1979 +proxy request to 127.0.0.4:1979 + + + +=== TEST 6: ewma +--- apisix_yaml +upstreams: + - + id: 1 + type: ewma + key: remote_addr + nodes: + - host: 127.0.0.1 + port: 1979 + weight: 2 + priority: 1 + - host: 127.0.0.2 + port: 1979 + weight: 1 + priority: 0 + - host: 127.0.0.3 + port: 1979 + weight: 2 + priority: -1 +--- error_code: 502 +--- error_log +connect() failed +--- grep_error_log eval +qr/proxy request to \S+/ +--- grep_error_log_out +proxy request to 127.0.0.1:1979 +proxy request to 127.0.0.2:1979 +proxy request to 127.0.0.3:1979 + + + +=== TEST 7: chash +--- apisix_yaml +upstreams: + - + id: 1 + type: chash + key: remote_addr + nodes: + - host: 127.0.0.1 + port: 1979 + weight: 2 + priority: 1 + - host: 127.0.0.2 + port: 1979 + weight: 1 + priority: 1 + - host: 127.0.0.3 + port: 1979 + weight: 2 + priority: -1 + - host: 127.0.0.4 + port: 1979 + weight: 1 + priority: -1 +--- error_code: 502 +--- error_log +connect() failed +--- grep_error_log eval +qr/proxy request to \S+/ +--- grep_error_log_out +proxy request to 127.0.0.1:1979 +proxy request to 127.0.0.2:1979 +proxy request to 127.0.0.4:1979 +proxy request to 127.0.0.3:1979 diff --git a/t/node/upstream-discovery.t b/t/node/upstream-discovery.t index eba81514248c..6b51d7c5095f 100644 --- a/t/node/upstream-discovery.t +++ b/t/node/upstream-discovery.t @@ -230,7 +230,6 @@ routes: local httpc = http.new() local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false}) ngx.say(res.status) - discovery.mock = { nodes = function() return { @@ -250,3 +249,86 @@ qr/create_obj_fun\(\): upstream nodes:/ create_obj_fun(): upstream nodes: --- error_log connect() failed + + + +=== TEST 5: create new server picker when priority change +--- apisix_yaml +routes: + - + uris: + - /hello + upstream_id: 1 +--- config + location /t { + content_by_lua_block { + local discovery = require("apisix.discovery.init").discovery + discovery.mock = { + nodes = function() + return { + {host = "127.0.0.1", port = 1980, weight = 1}, + {host = "0.0.0.0", port = 1980, weight = 1}, + } + end + } + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local httpc = http.new() + local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false}) + ngx.say(res.status) + + discovery.mock = { + nodes = function() + return { + {host = "127.0.0.1", port = 1980, weight = 1}, + {host = "0.0.0.0", port = 1980, weight = 1, priority = 1}, + } + end + } + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local httpc = http.new() + local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false}) + } + } +--- grep_error_log eval +qr/create_obj_fun\(\): upstream nodes:/ +--- grep_error_log_out +create_obj_fun(): upstream nodes: +create_obj_fun(): upstream nodes: + + + +=== TEST 6: default priority of discovered node is 0 +--- apisix_yaml +routes: + - + uris: + - /hello + upstream_id: 1 +--- config + location /t { + content_by_lua_block { + local discovery = require("apisix.discovery.init").discovery + discovery.mock = { + nodes = function() + return { + {host = "127.0.0.1", port = 1979, weight = 1, priority = 1}, + {host = "0.0.0.0", port = 1980, weight = 1}, + {host = "127.0.0.2", port = 1979, weight = 1, priority = -1}, + } + end + } + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local httpc = http.new() + local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false}) + ngx.say(res.status) + } + } +--- error_log +connect() failed +--- grep_error_log eval +qr/proxy request to \S+/ +--- grep_error_log_out +proxy request to 127.0.0.1:1979 +proxy request to 0.0.0.0:1980