Skip to content

Commit

Permalink
feat(xRPC): support dynamic upstream with upstream_id (apache#6919)
Browse files Browse the repository at this point in the history
Signed-off-by: spacewander <spacewanderlzx@gmail.com>
  • Loading branch information
spacewander authored and Liu-Junlin committed May 20, 2022
1 parent 081b21f commit 338be6b
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 95 deletions.
110 changes: 21 additions & 89 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ local apisix_ssl = require("apisix.ssl")
local upstream_util = require("apisix.utils.upstream")
local xrpc = require("apisix.stream.xrpc")
local ctxdump = require("resty.ctxdump")
local ipmatcher = require("resty.ipmatcher")
local ngx_balancer = require("ngx.balancer")
local debug = require("apisix.debug")
local ngx = ngx
Expand All @@ -49,7 +48,6 @@ local ngx_exit = ngx.exit
local math = math
local error = error
local ipairs = ipairs
local tostring = tostring
local ngx_now = ngx.now
local ngx_var = ngx.var
local str_byte = string.byte
Expand Down Expand Up @@ -169,61 +167,9 @@ function _M.http_ssl_phase()
end




local function parse_domain_for_nodes(nodes)
local new_nodes = core.table.new(#nodes, 0)
for _, node in ipairs(nodes) do
local host = node.host
if not ipmatcher.parse_ipv4(host) and
not ipmatcher.parse_ipv6(host) then
local ip, err = core.resolver.parse_domain(host)
if ip then
local new_node = core.table.clone(node)
new_node.host = ip
new_node.domain = host
core.table.insert(new_nodes, new_node)
end

if err then
core.log.error("dns resolver domain: ", host, " error: ", err)
end
else
core.table.insert(new_nodes, node)
end
end
return new_nodes
end


local function parse_domain_in_up(up)
local nodes = up.value.nodes
local new_nodes, err = parse_domain_for_nodes(nodes)
if not new_nodes then
return nil, err
end

local ok = upstream_util.compare_upstream_node(up.dns_value, new_nodes)
if ok then
return up
end

if not up.orig_modifiedIndex then
up.orig_modifiedIndex = up.modifiedIndex
end
up.modifiedIndex = up.orig_modifiedIndex .. "#" .. ngx_now()

up.dns_value = core.table.clone(up.value)
up.dns_value.nodes = new_nodes
core.log.info("resolve upstream which contain domain: ",
core.json.delay_encode(up, true))
return up
end


local function parse_domain_in_route(route)
local nodes = route.value.upstream.nodes
local new_nodes, err = parse_domain_for_nodes(nodes)
local new_nodes, err = upstream_util.parse_domain_for_nodes(nodes)
if not new_nodes then
return nil, err
end
Expand Down Expand Up @@ -281,38 +227,6 @@ local function set_upstream_headers(api_ctx, picked_server)
end


local function get_upstream_by_id(up_id)
local upstreams = core.config.fetch_created_obj("/upstreams")
if upstreams then
local upstream = upstreams:get(tostring(up_id))
if not upstream then
core.log.error("failed to find upstream by id: " .. up_id)
if is_http then
return core.response.exit(502)
end

return ngx_exit(1)
end

if upstream.has_domain then
local err
upstream, err = parse_domain_in_up(upstream)
if err then
core.log.error("failed to get resolved upstream: ", err)
if is_http then
return core.response.exit(500)
end

return ngx_exit(1)
end
end

core.log.info("parsed upstream: ", core.json.delay_encode(upstream, true))
return upstream.dns_value or upstream.value
end
end


local function verify_tls_client(ctx)
local matched = router.router_ssl.match_and_set(ctx, true)
if not matched then
Expand Down Expand Up @@ -482,7 +396,15 @@ function _M.http_access_phase()
end

if up_id then
local upstream = get_upstream_by_id(up_id)
local upstream = apisix_upstream.get_by_id(up_id)
if not upstream then
if is_http then
return core.response.exit(502)
end

return ngx_exit(1)
end

api_ctx.matched_upstream = upstream

else
Expand Down Expand Up @@ -905,7 +827,17 @@ function _M.stream_preread_phase()

local up_id = matched_route.value.upstream_id
if up_id then
api_ctx.matched_upstream = get_upstream_by_id(up_id)
local upstream = apisix_upstream.get_by_id(up_id)
if not upstream then
if is_http then
return core.response.exit(502)
end

return ngx_exit(1)
end

api_ctx.matched_upstream = upstream

else
if matched_route.has_domain then
local err
Expand Down
15 changes: 12 additions & 3 deletions apisix/stream/xrpc/sdk.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
local core = require("apisix.core")
local config_util = require("apisix.core.config_util")
local router = require("apisix.stream.router.ip_port")
local apisix_upstream = require("apisix.upstream")
local xrpc_socket = require("resty.apisix.stream.xrpc.socket")
local ngx_now = ngx.now
local str_fmt = string.format
local tab_insert = table.insert
local error = error
local tostring = tostring
Expand Down Expand Up @@ -153,18 +155,25 @@ end
-- @function xrpc.sdk.set_upstream
-- @tparam table xrpc session
-- @tparam table the route configuration
-- @treturn nil|string error message if present
function _M.set_upstream(session, conf)
local up
if conf.upstream then
up = conf.upstream
-- TODO: support upstream_id
else
local id = conf.upstream_id
up = apisix_upstream.get_by_id(id)
if not up then
return str_fmt("upstream %s can't be got", id)
end
end

local key = tostring(conf)
core.log.info("set upstream to: ", key)
local key = tostring(up)
core.log.info("set upstream to: ", key, " conf: ", core.json.delay_encode(up, true))

session._upstream_key = key
session.upstream_conf = up
return nil
end


Expand Down
26 changes: 26 additions & 0 deletions apisix/upstream.lua
Original file line number Diff line number Diff line change
Expand Up @@ -544,4 +544,30 @@ function _M.init_worker()
end


function _M.get_by_id(up_id)
local upstream
local upstreams = core.config.fetch_created_obj("/upstreams")
if upstreams then
upstream = upstreams:get(tostring(up_id))
end

if not upstream then
core.log.error("failed to find upstream by id: ", up_id)
return nil
end

if upstream.has_domain then
local err
upstream, err = upstream_util.parse_domain_in_up(upstream)
if err then
core.log.error("failed to get resolved upstream: ", err)
return nil
end
end

core.log.info("parsed upstream: ", core.json.delay_encode(upstream, true))
return upstream.dns_value or upstream.value
end


return _M
56 changes: 55 additions & 1 deletion apisix/utils/upstream.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
-- limitations under the License.
--
local core = require("apisix.core")
local ipmatcher = require("resty.ipmatcher")
local ngx_now = ngx.now
local ipairs = ipairs
local type = type

Expand All @@ -27,7 +29,7 @@ local function sort_by_key_host(a, b)
end


function _M.compare_upstream_node(up_conf, new_t)
local function compare_upstream_node(up_conf, new_t)
if up_conf == nil then
return false
end
Expand Down Expand Up @@ -56,6 +58,58 @@ function _M.compare_upstream_node(up_conf, new_t)

return true
end
_M.compare_upstream_node = compare_upstream_node


local function parse_domain_for_nodes(nodes)
local new_nodes = core.table.new(#nodes, 0)
for _, node in ipairs(nodes) do
local host = node.host
if not ipmatcher.parse_ipv4(host) and
not ipmatcher.parse_ipv6(host) then
local ip, err = core.resolver.parse_domain(host)
if ip then
local new_node = core.table.clone(node)
new_node.host = ip
new_node.domain = host
core.table.insert(new_nodes, new_node)
end

if err then
core.log.error("dns resolver domain: ", host, " error: ", err)
end
else
core.table.insert(new_nodes, node)
end
end
return new_nodes
end
_M.parse_domain_for_nodes = parse_domain_for_nodes


function _M.parse_domain_in_up(up)
local nodes = up.value.nodes
local new_nodes, err = parse_domain_for_nodes(nodes)
if not new_nodes then
return nil, err
end

local ok = compare_upstream_node(up.dns_value, new_nodes)
if ok then
return up
end

if not up.orig_modifiedIndex then
up.orig_modifiedIndex = up.modifiedIndex
end
up.modifiedIndex = up.orig_modifiedIndex .. "#" .. ngx_now()

up.dns_value = core.table.clone(up.value)
up.dns_value.nodes = new_nodes
core.log.info("resolve upstream which contain domain: ",
core.json.delay_encode(up, true))
return up
end


return _M
6 changes: 5 additions & 1 deletion t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,11 @@ function _M.from_downstream(session, downstream)

local conf = router[ctx.service]
if conf then
sdk.set_upstream(session, conf)
local err = sdk.set_upstream(session, conf)
if err then
core.log.error("failed to set upstream: ", err)
return DECLINED
end
end
end

Expand Down
Loading

0 comments on commit 338be6b

Please sign in to comment.