Skip to content

Commit

Permalink
feat(xRPC): support dynamic upstream (apache#6901)
Browse files Browse the repository at this point in the history
Signed-off-by: spacewander <spacewanderlzx@gmail.com>
  • Loading branch information
spacewander authored and Liu-Junlin committed May 20, 2022
1 parent c64ce4a commit b448bea
Show file tree
Hide file tree
Showing 8 changed files with 434 additions and 36 deletions.
1 change: 1 addition & 0 deletions apisix/schema_def.lua
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,7 @@ local xrpc_protocol_schema = {
name = {
type = "string",
},
superior_id = id_schema,
conf = {
description = "protocol-specific configuration",
type = "object",
Expand Down
6 changes: 6 additions & 0 deletions apisix/stream/router/ip_port.lua
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ do
end

local route = item.value
if route.protocol and route.protocol.superior_id then
-- subordinate route won't be matched in the entry
-- TODO: check the subordinate relationship in the Admin API
goto CONTINUE
end

if item.value.remote_addr then
item.value.remote_addr_matcher = core_ip.create_ip_matcher({item.value.remote_addr})
end
Expand Down
13 changes: 6 additions & 7 deletions apisix/stream/xrpc/protocols/redis/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,13 @@ local function read_len(sk)
end


local function read_req(sk)
local function read_req(sk, ctx)
local narg, err = read_len(sk)
if not narg then
return nil, err
end

local ctx = {
cmd_line = core.table.new(narg, 0)
}
ctx.cmd_line = core.table.new(narg, 0)

for i = 1, narg do
local n, err = read_len(sk)
Expand Down Expand Up @@ -116,7 +114,7 @@ local function read_req(sk)
end

ctx.cmd = ctx.cmd_line[1]
return ctx
return true
end


Expand Down Expand Up @@ -195,8 +193,9 @@ end


function _M.from_downstream(session, downstream)
local ctx, err = read_req(downstream)
if not ctx then
local ctx = sdk.get_req_ctx(session, 0)
local ok, err = read_req(downstream, ctx)
if not ok then
if err ~= "timeout" and err ~= "closed" then
core.log.error("failed to read request: ", err)
end
Expand Down
70 changes: 57 additions & 13 deletions apisix/stream/xrpc/runner.lua
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ local _M = {}

local function open_session(conn_ctx)
conn_ctx.xrpc_session = {
upstream_conf = conn_ctx.matched_upstream,
_upstream_conf = conn_ctx.matched_upstream,
-- fields start with '_' should not be accessed by the protocol implementation
_route = conn_ctx.matched_route.value,
_ctxs = {},
}
return conn_ctx.xrpc_session
Expand All @@ -44,8 +46,18 @@ local function close_session(session, protocol)
protocol.disconnect_upstream(session, up, upstream_ctx.broken)
end

local upstream_ctxs = session._upstream_ctxs
if upstream_ctxs then
for _, upstream_ctx in pairs(upstream_ctxs) do
upstream_ctx.closed = true

local up = upstream_ctx.upstream
protocol.disconnect_upstream(session, up, upstream_ctx.broken)
end
end

for id in pairs(session._ctxs) do
core.log.info("RPC is not finished, id: ", id)
core.log.notice("RPC is not finished, id: ", id)
end
end

Expand All @@ -67,21 +79,43 @@ end


local function open_upstream(protocol, session, ctx)
if session._upstream_ctx then
return OK, session._upstream_ctx
local key = session._upstream_key
session._upstream_key = nil

if key then
if not session._upstream_ctxs then
session._upstream_ctxs = {}
end

local up_ctx = session._upstream_ctxs[key]
if up_ctx then
return OK, up_ctx
end
else
if session._upstream_ctx then
return OK, session._upstream_ctx
end

session.upstream_conf = session._upstream_conf
end

local state, upstream = protocol.connect_upstream(session, session)
if state ~= OK then
return state, nil
end

session._upstream_ctx = {
local up_ctx = {
upstream = upstream,
broken = false,
closed = false,
}
return OK, session._upstream_ctx
if key then
session._upstream_ctxs[key] = up_ctx
else
session._upstream_ctx = up_ctx
end

return OK, up_ctx
end


Expand Down Expand Up @@ -135,18 +169,28 @@ function _M.run(protocol, conn_ctx)
-- need to do some auth/routing jobs before reaching upstream
local status, up_ctx = open_upstream(protocol, session, ctx)
if status ~= OK then
if ctx ~= nil then
finish_req(protocol, session, ctx)
end

break
end

status = protocol.to_upstream(session, ctx, downstream, up_ctx.upstream)
if status == DECLINED then
up_ctx.broken = true
break
end
if status ~= OK then
if ctx ~= nil then
finish_req(protocol, session, ctx)
end

if status == DONE then
-- for Unary request we can directly reply here
goto continue
if status == DECLINED then
up_ctx.broken = true
break
end

if status == DONE then
-- for Unary request we can directly reply here
goto continue
end
end

if not up_ctx.coroutine then
Expand Down
68 changes: 68 additions & 0 deletions apisix/stream/xrpc/sdk.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
--
-- @module xrpc.sdk
local core = require("apisix.core")
local config_util = require("apisix.core.config_util")
local router = require("apisix.stream.router.ip_port")
local xrpc_socket = require("resty.apisix.stream.xrpc.socket")
local ngx_now = ngx.now
local tab_insert = table.insert
local error = error
local tostring = tostring


local _M = {}
Expand Down Expand Up @@ -92,6 +96,7 @@ function _M.get_req_ctx(session, id)
end

local ctx = core.tablepool.fetch("xrpc_ctxs", 4, 4)
-- fields start with '_' should not be accessed by the protocol implementation
ctx._id = id
session._ctxs[id] = ctx

Expand All @@ -100,4 +105,67 @@ function _M.get_req_ctx(session, id)
end


---
-- Returns the new router if the stream routes are changed
--
-- @function xrpc.sdk.get_router
-- @tparam table xrpc session
-- @tparam string the current router version, should come from the last call
-- @treturn boolean whether there is a change
-- @treturn table the new router under the specific protocol
-- @treturn string the new router version
function _M.get_router(session, version)
local protocol_name = session._route.protocol.name
local id = session._route.id

local items, conf_version = router.routes()
if version == conf_version then
return false
end

local proto_router = {}
for _, item in config_util.iterate_values(items) do
if item.value == nil then
goto CONTINUE
end

local route = item.value
if route.protocol.name ~= protocol_name then
goto CONTINUE
end

if tostring(route.protocol.superior_id) ~= id then
goto CONTINUE
end

tab_insert(proto_router, route)

::CONTINUE::
end

return true, proto_router, conf_version
end


---
-- Set the session's current upstream according to the route's configuration
--
-- @function xrpc.sdk.set_upstream
-- @tparam table xrpc session
-- @tparam table the route configuration
function _M.set_upstream(session, conf)
local up
if conf.upstream then
up = conf.upstream
-- TODO: support upstream_id
end

local key = tostring(conf)
core.log.info("set upstream to: ", key)

session._upstream_key = key
session.upstream_conf = up
end


return _M
56 changes: 51 additions & 5 deletions t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,27 @@ local bit = require("bit")
local lshift = bit.lshift
local ffi = require("ffi")
local ffi_str = ffi.string
local ipairs = ipairs
local math_random = math.random
local OK = ngx.OK
local DECLINED = ngx.DECLINED
local DONE = ngx.DONE
local str_byte = string.byte


local _M = {}
local router_version
local router
-- pingpong protocol is designed to use in the test of xRPC.
-- It contains two part: a fixed-length header & a body.
-- Header format:
-- "pp" (magic number) + 1 bytes req type + 2 bytes stream id + 1 reserved bytes
-- + 4 bytes body length
local _M = {}
-- + 4 bytes body length + optional 4 bytes service name
local HDR_LEN = 10
local TYPE_HEARTBEAT = 1
local TYPE_UNARY = 2
local TYPE_STREAM = 3
local TYPE_UNARY_DYN_UP = 4


function _M.init_worker()
Expand All @@ -48,7 +52,7 @@ end
function _M.init_downstream(session)
-- create the downstream
local sk = xrpc_socket.downstream.socket()
sk:settimeout(10) -- the short timeout is just for test
sk:settimeout(1000) -- the short timeout is just for test
return sk
end

Expand Down Expand Up @@ -104,15 +108,54 @@ function _M.from_downstream(session, downstream)
local body_len = to_int32(p, 6)
core.log.info("read body len: ", body_len)

if typ == TYPE_UNARY_DYN_UP then
local p = read_data(downstream, 4, false)
if p == nil then
return DECLINED
end

local len = 4
for i = 0, 3 do
if p[i] == 0 then
len = i
break
end
end
local service = ffi_str(p, len)
core.log.info("get service [", service, "]")
ctx.service = service

local changed, raw_router, version = sdk.get_router(session, router_version)
if changed then
router_version = version
router = {}

for _, r in ipairs(raw_router) do
local conf = r.protocol.conf
if conf and conf.service then
router[conf.service] = r
end
end
end

local conf = router[ctx.service]
if conf then
sdk.set_upstream(session, conf)
end
end

local p = read_data(downstream, body_len, true)
if p == nil then
return DECLINED
end

ctx.is_unary = typ == TYPE_UNARY
ctx.is_unary = typ == TYPE_UNARY or typ == TYPE_UNARY_DYN_UP
ctx.is_stream = typ == TYPE_STREAM
ctx.id = stream_id
ctx.len = HDR_LEN + body_len
if typ == TYPE_UNARY_DYN_UP then
ctx.len = ctx.len + 4
end
return OK, ctx
end

Expand All @@ -131,7 +174,10 @@ function _M.connect_upstream(session, ctx)
end
local node = nodes[math_random(#nodes)]
local sk = xrpc_socket.upstream.socket()
sk:settimeout(10) -- the short timeout is just for test
sk:settimeout(1000) -- the short timeout is just for test

core.log.info("connect to ", node.host, ":", node.port)

local ok, err = sk:connect(node.host, node.port)
if not ok then
core.log.error("failed to connect: ", err)
Expand Down
3 changes: 3 additions & 0 deletions t/xrpc/apisix/stream/xrpc/protocols/pingpong/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ local core = require("apisix.core")
local schema = {
type = "object",
properties = {
service = {
type = "string"
},
faults = {
type = "array",
minItems = 1,
Expand Down
Loading

0 comments on commit b448bea

Please sign in to comment.