Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(xRPC): support dynamic upstream with upstream_id #6919

Merged
merged 1 commit into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 21 additions & 89 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ local set_upstream = apisix_upstream.set_by_route
local upstream_util = require("apisix.utils.upstream")
local xrpc = require("apisix.stream.xrpc")
local ctxdump = require("resty.ctxdump")
local ipmatcher = require("resty.ipmatcher")
local ngx_balancer = require("ngx.balancer")
local debug = require("apisix.debug")
local ngx = ngx
Expand All @@ -48,7 +47,6 @@ local ngx_exit = ngx.exit
local math = math
local error = error
local ipairs = ipairs
local tostring = tostring
local ngx_now = ngx.now
local ngx_var = ngx.var
local str_byte = string.byte
Expand Down Expand Up @@ -168,61 +166,9 @@ function _M.http_ssl_phase()
end




local function parse_domain_for_nodes(nodes)
local new_nodes = core.table.new(#nodes, 0)
for _, node in ipairs(nodes) do
local host = node.host
if not ipmatcher.parse_ipv4(host) and
not ipmatcher.parse_ipv6(host) then
local ip, err = core.resolver.parse_domain(host)
if ip then
local new_node = core.table.clone(node)
new_node.host = ip
new_node.domain = host
core.table.insert(new_nodes, new_node)
end

if err then
core.log.error("dns resolver domain: ", host, " error: ", err)
end
else
core.table.insert(new_nodes, node)
end
end
return new_nodes
end


local function parse_domain_in_up(up)
local nodes = up.value.nodes
local new_nodes, err = parse_domain_for_nodes(nodes)
if not new_nodes then
return nil, err
end

local ok = upstream_util.compare_upstream_node(up.dns_value, new_nodes)
if ok then
return up
end

if not up.orig_modifiedIndex then
up.orig_modifiedIndex = up.modifiedIndex
end
up.modifiedIndex = up.orig_modifiedIndex .. "#" .. ngx_now()

up.dns_value = core.table.clone(up.value)
up.dns_value.nodes = new_nodes
core.log.info("resolve upstream which contain domain: ",
core.json.delay_encode(up, true))
return up
end


local function parse_domain_in_route(route)
local nodes = route.value.upstream.nodes
local new_nodes, err = parse_domain_for_nodes(nodes)
local new_nodes, err = upstream_util.parse_domain_for_nodes(nodes)
if not new_nodes then
return nil, err
end
Expand Down Expand Up @@ -280,38 +226,6 @@ local function set_upstream_headers(api_ctx, picked_server)
end


local function get_upstream_by_id(up_id)
local upstreams = core.config.fetch_created_obj("/upstreams")
if upstreams then
local upstream = upstreams:get(tostring(up_id))
if not upstream then
core.log.error("failed to find upstream by id: " .. up_id)
if is_http then
return core.response.exit(502)
end

return ngx_exit(1)
end

if upstream.has_domain then
local err
upstream, err = parse_domain_in_up(upstream)
if err then
core.log.error("failed to get resolved upstream: ", err)
if is_http then
return core.response.exit(500)
end

return ngx_exit(1)
end
end

core.log.info("parsed upstream: ", core.json.delay_encode(upstream, true))
return upstream.dns_value or upstream.value
end
end


local function verify_tls_client(ctx)
if ctx and ctx.ssl_client_verified then
local res = ngx_var.ssl_client_verify
Expand Down Expand Up @@ -475,7 +389,15 @@ function _M.http_access_phase()
end

if up_id then
local upstream = get_upstream_by_id(up_id)
local upstream = apisix_upstream.get_by_id(up_id)
if not upstream then
if is_http then
return core.response.exit(502)
end

return ngx_exit(1)
end

api_ctx.matched_upstream = upstream

else
Expand Down Expand Up @@ -898,7 +820,17 @@ function _M.stream_preread_phase()

local up_id = matched_route.value.upstream_id
if up_id then
api_ctx.matched_upstream = get_upstream_by_id(up_id)
local upstream = apisix_upstream.get_by_id(up_id)
if not upstream then
if is_http then
return core.response.exit(502)
end

return ngx_exit(1)
end

api_ctx.matched_upstream = upstream

else
if matched_route.has_domain then
local err
Expand Down
15 changes: 12 additions & 3 deletions apisix/stream/xrpc/sdk.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
local core = require("apisix.core")
local config_util = require("apisix.core.config_util")
local router = require("apisix.stream.router.ip_port")
local apisix_upstream = require("apisix.upstream")
local xrpc_socket = require("resty.apisix.stream.xrpc.socket")
local ngx_now = ngx.now
local str_fmt = string.format
local tab_insert = table.insert
local error = error
local tostring = tostring
Expand Down Expand Up @@ -153,18 +155,25 @@ end
-- @function xrpc.sdk.set_upstream
-- @tparam table xrpc session
-- @tparam table the route configuration
-- @treturn nil|string error message if present
function _M.set_upstream(session, conf)
local up
if conf.upstream then
up = conf.upstream
-- TODO: support upstream_id
else
local id = conf.upstream_id
up = apisix_upstream.get_by_id(id)
if not up then
return str_fmt("upstream %s can't be got", id)
end
end

local key = tostring(conf)
core.log.info("set upstream to: ", key)
local key = tostring(up)
core.log.info("set upstream to: ", key, " conf: ", core.json.delay_encode(up, true))

session._upstream_key = key
session.upstream_conf = up
return nil
end


Expand Down
26 changes: 26 additions & 0 deletions apisix/upstream.lua
Original file line number Diff line number Diff line change
Expand Up @@ -542,4 +542,30 @@ function _M.init_worker()
end


function _M.get_by_id(up_id)
local upstream
local upstreams = core.config.fetch_created_obj("/upstreams")
if upstreams then
upstream = upstreams:get(tostring(up_id))
end

if not upstream then
core.log.error("failed to find upstream by id: ", up_id)
return nil
end

if upstream.has_domain then
local err
upstream, err = upstream_util.parse_domain_in_up(upstream)
if err then
core.log.error("failed to get resolved upstream: ", err)
return nil
end
end

core.log.info("parsed upstream: ", core.json.delay_encode(upstream, true))
return upstream.dns_value or upstream.value
end


return _M
56 changes: 55 additions & 1 deletion apisix/utils/upstream.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
-- limitations under the License.
--
local core = require("apisix.core")
local ipmatcher = require("resty.ipmatcher")
local ngx_now = ngx.now
local ipairs = ipairs
local type = type

Expand All @@ -27,7 +29,7 @@ local function sort_by_key_host(a, b)
end


function _M.compare_upstream_node(up_conf, new_t)
local function compare_upstream_node(up_conf, new_t)
if up_conf == nil then
return false
end
Expand Down Expand Up @@ -56,6 +58,58 @@ function _M.compare_upstream_node(up_conf, new_t)

return true
end
_M.compare_upstream_node = compare_upstream_node


local function parse_domain_for_nodes(nodes)
local new_nodes = core.table.new(#nodes, 0)
for _, node in ipairs(nodes) do
local host = node.host
if not ipmatcher.parse_ipv4(host) and
not ipmatcher.parse_ipv6(host) then
local ip, err = core.resolver.parse_domain(host)
if ip then
local new_node = core.table.clone(node)
new_node.host = ip
new_node.domain = host
core.table.insert(new_nodes, new_node)
end

if err then
core.log.error("dns resolver domain: ", host, " error: ", err)
end
else
core.table.insert(new_nodes, node)
end
end
return new_nodes
end
_M.parse_domain_for_nodes = parse_domain_for_nodes


function _M.parse_domain_in_up(up)
local nodes = up.value.nodes
local new_nodes, err = parse_domain_for_nodes(nodes)
if not new_nodes then
return nil, err
end

local ok = compare_upstream_node(up.dns_value, new_nodes)
if ok then
return up
end

if not up.orig_modifiedIndex then
up.orig_modifiedIndex = up.modifiedIndex
end
up.modifiedIndex = up.orig_modifiedIndex .. "#" .. ngx_now()

up.dns_value = core.table.clone(up.value)
up.dns_value.nodes = new_nodes
core.log.info("resolve upstream which contain domain: ",
core.json.delay_encode(up, true))
return up
end


return _M
6 changes: 5 additions & 1 deletion t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,11 @@ function _M.from_downstream(session, downstream)

local conf = router[ctx.service]
if conf then
sdk.set_upstream(session, conf)
local err = sdk.set_upstream(session, conf)
if err then
core.log.error("failed to set upstream: ", err)
return DECLINED
end
end
end

Expand Down
Loading