Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(xRPC): support dynamic upstream #6901

Merged
merged 1 commit into from
Apr 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apisix/schema_def.lua
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,7 @@ local xrpc_protocol_schema = {
name = {
type = "string",
},
superior_id = id_schema,
conf = {
description = "protocol-specific configuration",
type = "object",
Expand Down
6 changes: 6 additions & 0 deletions apisix/stream/router/ip_port.lua
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ do
end

local route = item.value
if route.protocol and route.protocol.superior_id then
-- subordinate route won't be matched in the entry
-- TODO: check the subordinate relationship in the Admin API
goto CONTINUE
end

if item.value.remote_addr then
item.value.remote_addr_matcher = core_ip.create_ip_matcher({item.value.remote_addr})
end
Expand Down
13 changes: 6 additions & 7 deletions apisix/stream/xrpc/protocols/redis/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,13 @@ local function read_len(sk)
end


local function read_req(sk)
local function read_req(sk, ctx)
local narg, err = read_len(sk)
if not narg then
return nil, err
end

local ctx = {
cmd_line = core.table.new(narg, 0)
}
ctx.cmd_line = core.table.new(narg, 0)

for i = 1, narg do
local n, err = read_len(sk)
Expand Down Expand Up @@ -116,7 +114,7 @@ local function read_req(sk)
end

ctx.cmd = ctx.cmd_line[1]
return ctx
return true
end


Expand Down Expand Up @@ -195,8 +193,9 @@ end


function _M.from_downstream(session, downstream)
local ctx, err = read_req(downstream)
if not ctx then
local ctx = sdk.get_req_ctx(session, 0)
local ok, err = read_req(downstream, ctx)
if not ok then
if err ~= "timeout" and err ~= "closed" then
core.log.error("failed to read request: ", err)
end
Expand Down
70 changes: 57 additions & 13 deletions apisix/stream/xrpc/runner.lua
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ local _M = {}

local function open_session(conn_ctx)
conn_ctx.xrpc_session = {
upstream_conf = conn_ctx.matched_upstream,
_upstream_conf = conn_ctx.matched_upstream,
-- fields start with '_' should not be accessed by the protocol implementation
_route = conn_ctx.matched_route.value,
_ctxs = {},
}
return conn_ctx.xrpc_session
Expand All @@ -44,8 +46,18 @@ local function close_session(session, protocol)
protocol.disconnect_upstream(session, up, upstream_ctx.broken)
end

local upstream_ctxs = session._upstream_ctxs
if upstream_ctxs then
for _, upstream_ctx in pairs(upstream_ctxs) do
upstream_ctx.closed = true

local up = upstream_ctx.upstream
protocol.disconnect_upstream(session, up, upstream_ctx.broken)
end
end

for id in pairs(session._ctxs) do
core.log.info("RPC is not finished, id: ", id)
core.log.notice("RPC is not finished, id: ", id)
end
end

Expand All @@ -67,21 +79,43 @@ end


local function open_upstream(protocol, session, ctx)
if session._upstream_ctx then
return OK, session._upstream_ctx
local key = session._upstream_key
session._upstream_key = nil

if key then
if not session._upstream_ctxs then
session._upstream_ctxs = {}
end

local up_ctx = session._upstream_ctxs[key]
if up_ctx then
return OK, up_ctx
end
else
if session._upstream_ctx then
return OK, session._upstream_ctx
end

session.upstream_conf = session._upstream_conf
end

local state, upstream = protocol.connect_upstream(session, session)
if state ~= OK then
return state, nil
end

session._upstream_ctx = {
local up_ctx = {
upstream = upstream,
broken = false,
closed = false,
}
return OK, session._upstream_ctx
if key then
session._upstream_ctxs[key] = up_ctx
else
session._upstream_ctx = up_ctx
end

return OK, up_ctx
end


Expand Down Expand Up @@ -135,18 +169,28 @@ function _M.run(protocol, conn_ctx)
-- need to do some auth/routing jobs before reaching upstream
local status, up_ctx = open_upstream(protocol, session, ctx)
if status ~= OK then
if ctx ~= nil then
finish_req(protocol, session, ctx)
end

break
end

status = protocol.to_upstream(session, ctx, downstream, up_ctx.upstream)
if status == DECLINED then
up_ctx.broken = true
break
end
if status ~= OK then
if ctx ~= nil then
finish_req(protocol, session, ctx)
end

if status == DONE then
-- for Unary request we can directly reply here
goto continue
if status == DECLINED then
up_ctx.broken = true
break
end

if status == DONE then
-- for Unary request we can directly reply here
goto continue
end
end

if not up_ctx.coroutine then
Expand Down
68 changes: 68 additions & 0 deletions apisix/stream/xrpc/sdk.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
--
-- @module xrpc.sdk
local core = require("apisix.core")
local config_util = require("apisix.core.config_util")
local router = require("apisix.stream.router.ip_port")
local xrpc_socket = require("resty.apisix.stream.xrpc.socket")
local ngx_now = ngx.now
local tab_insert = table.insert
local error = error
local tostring = tostring


local _M = {}
Expand Down Expand Up @@ -92,6 +96,7 @@ function _M.get_req_ctx(session, id)
end

local ctx = core.tablepool.fetch("xrpc_ctxs", 4, 4)
-- fields start with '_' should not be accessed by the protocol implementation
ctx._id = id
session._ctxs[id] = ctx

Expand All @@ -100,4 +105,67 @@ function _M.get_req_ctx(session, id)
end


---
-- Returns the new router if the stream routes are changed
--
-- @function xrpc.sdk.get_router
-- @tparam table xrpc session
-- @tparam string the current router version, should come from the last call
-- @treturn boolean whether there is a change
-- @treturn table the new router under the specific protocol
-- @treturn string the new router version
function _M.get_router(session, version)
local protocol_name = session._route.protocol.name
local id = session._route.id

local items, conf_version = router.routes()
if version == conf_version then
return false
end

local proto_router = {}
for _, item in config_util.iterate_values(items) do
if item.value == nil then
goto CONTINUE
end

local route = item.value
if route.protocol.name ~= protocol_name then
goto CONTINUE
end

if tostring(route.protocol.superior_id) ~= id then
goto CONTINUE
end

tab_insert(proto_router, route)

::CONTINUE::
end

return true, proto_router, conf_version
end


---
-- Set the session's current upstream according to the route's configuration
--
-- @function xrpc.sdk.set_upstream
-- @tparam table xrpc session
-- @tparam table the route configuration
function _M.set_upstream(session, conf)
local up
if conf.upstream then
up = conf.upstream
-- TODO: support upstream_id
end

local key = tostring(conf)
core.log.info("set upstream to: ", key)

session._upstream_key = key
session.upstream_conf = up
end


return _M
56 changes: 51 additions & 5 deletions t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,27 @@ local bit = require("bit")
local lshift = bit.lshift
local ffi = require("ffi")
local ffi_str = ffi.string
local ipairs = ipairs
local math_random = math.random
local OK = ngx.OK
local DECLINED = ngx.DECLINED
local DONE = ngx.DONE
local str_byte = string.byte


local _M = {}
local router_version
local router
-- pingpong protocol is designed to use in the test of xRPC.
-- It contains two part: a fixed-length header & a body.
-- Header format:
-- "pp" (magic number) + 1 bytes req type + 2 bytes stream id + 1 reserved bytes
-- + 4 bytes body length
local _M = {}
-- + 4 bytes body length + optional 4 bytes service name
local HDR_LEN = 10
local TYPE_HEARTBEAT = 1
local TYPE_UNARY = 2
local TYPE_STREAM = 3
local TYPE_UNARY_DYN_UP = 4


function _M.init_worker()
Expand All @@ -48,7 +52,7 @@ end
function _M.init_downstream(session)
-- create the downstream
local sk = xrpc_socket.downstream.socket()
sk:settimeout(10) -- the short timeout is just for test
sk:settimeout(1000) -- the short timeout is just for test
return sk
end

Expand Down Expand Up @@ -104,15 +108,54 @@ function _M.from_downstream(session, downstream)
local body_len = to_int32(p, 6)
core.log.info("read body len: ", body_len)

if typ == TYPE_UNARY_DYN_UP then
local p = read_data(downstream, 4, false)
if p == nil then
return DECLINED
end

local len = 4
for i = 0, 3 do
if p[i] == 0 then
len = i
break
end
end
local service = ffi_str(p, len)
core.log.info("get service [", service, "]")
ctx.service = service

local changed, raw_router, version = sdk.get_router(session, router_version)
if changed then
router_version = version
router = {}

for _, r in ipairs(raw_router) do
local conf = r.protocol.conf
if conf and conf.service then
router[conf.service] = r
end
end
end

local conf = router[ctx.service]
if conf then
sdk.set_upstream(session, conf)
end
end

local p = read_data(downstream, body_len, true)
if p == nil then
return DECLINED
end

ctx.is_unary = typ == TYPE_UNARY
ctx.is_unary = typ == TYPE_UNARY or typ == TYPE_UNARY_DYN_UP
ctx.is_stream = typ == TYPE_STREAM
ctx.id = stream_id
ctx.len = HDR_LEN + body_len
if typ == TYPE_UNARY_DYN_UP then
ctx.len = ctx.len + 4
end
return OK, ctx
end

Expand All @@ -131,7 +174,10 @@ function _M.connect_upstream(session, ctx)
end
local node = nodes[math_random(#nodes)]
local sk = xrpc_socket.upstream.socket()
sk:settimeout(10) -- the short timeout is just for test
sk:settimeout(1000) -- the short timeout is just for test

core.log.info("connect to ", node.host, ":", node.port)

local ok, err = sk:connect(node.host, node.port)
if not ok then
core.log.error("failed to connect: ", err)
Expand Down
3 changes: 3 additions & 0 deletions t/xrpc/apisix/stream/xrpc/protocols/pingpong/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ local core = require("apisix.core")
local schema = {
type = "object",
properties = {
service = {
type = "string"
},
faults = {
type = "array",
minItems = 1,
Expand Down
Loading