Skip to content

Commit

Permalink
feat(ext-plugin): support hook response body (#6968)
Browse files Browse the repository at this point in the history
  • Loading branch information
soulbird authored and spacewander committed Jun 30, 2022
1 parent 87bb431 commit b533590
Show file tree
Hide file tree
Showing 9 changed files with 842 additions and 4 deletions.
1 change: 1 addition & 0 deletions apisix/constants.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ return {
RPC_PREPARE_CONF = 1,
RPC_HTTP_REQ_CALL = 2,
RPC_EXTRA_INFO = 3,
RPC_HTTP_RESP_CALL = 4,
HTTP_ETCD_DIRECTORY = {
["/upstreams"] = true,
["/plugins"] = true,
Expand Down
172 changes: 172 additions & 0 deletions apisix/plugins/ext-plugin-post-resp.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local ext = require("apisix.plugins.ext-plugin.init")
local constants = require("apisix.constants")
local http = require("resty.http")

local ngx = ngx
local ngx_print = ngx.print
local ngx_flush = ngx.flush
local string = string
local str_sub = string.sub


local name = "ext-plugin-post-resp"
local _M = {
version = 0.1,
priority = -4000,
name = name,
schema = ext.schema,
}


local function include_req_headers(ctx)
-- TODO: handle proxy_set_header
return core.request.headers(ctx)
end


local function close(http_obj)
-- TODO: keepalive
local ok, err = http_obj:close()
if not ok then
core.log.error("close http object failed: ", err)
end
end


local function get_response(ctx, http_obj)
local ok, err = http_obj:connect({
scheme = ctx.upstream_scheme,
host = ctx.picked_server.host,
port = ctx.picked_server.port,
})

if not ok then
return nil, err
end
-- TODO: set timeout
local uri, args
if ctx.var.upstream_uri == "" then
-- use original uri instead of rewritten one
uri = ctx.var.uri
else
uri = ctx.var.upstream_uri

-- the rewritten one may contain new args
local index = core.string.find(uri, "?")
if index then
local raw_uri = uri
uri = str_sub(raw_uri, 1, index - 1)
args = str_sub(raw_uri, index + 1)
end
end
local params = {
path = uri,
query = args or ctx.var.args,
headers = include_req_headers(ctx),
method = core.request.get_method(),
}

local body, err = core.request.get_body()
if err then
return nil, err
end

if body then
params["body"] = body
end

local res, err = http_obj:request(params)
if not res then
return nil, err
end

return res, err
end


local function send_response(res, code)
ngx.status = code or res.status

local reader = res.body_reader
repeat
local chunk, ok, read_err, print_err, flush_err
-- TODO: HEAD or 304
chunk, read_err = reader()
if read_err then
return "read response failed: ".. (read_err or "")
end

if chunk then
ok, print_err = ngx_print(chunk)
if not ok then
return "output response failed: ".. (print_err or "")
end
ok, flush_err = ngx_flush(true)
if not ok then
core.log.warn("flush response failed: ", flush_err)
end
end
until not chunk

return nil
end



function _M.check_schema(conf)
return core.schema.check(_M.schema, conf)
end


function _M.before_proxy(conf, ctx)
local http_obj = http.new()
local res, err = get_response(ctx, http_obj)
if not res or err then
core.log.error("failed to request: ", err or "")
close(http_obj)
return 502
end
ctx.runner_ext_response = res

core.log.info("response info, status: ", res.status)
core.log.info("response info, headers: ", core.json.delay_encode(res.headers))

local code, body = ext.communicate(conf, ctx, name, constants.RPC_HTTP_RESP_CALL)
if body then
close(http_obj)
-- if the body is changed, the code will be set.
return code, body
end
core.log.info("ext-plugin will send response")

-- send origin response, status maybe changed.
err = send_response(res, code)
close(http_obj)

if err then
core.log.error(err)
return not ngx.headers_sent and 502 or nil
end

core.log.info("ext-plugin send response succefully")
end


return _M
108 changes: 106 additions & 2 deletions apisix/plugins/ext-plugin/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ local http_req_call_resp = require("A6.HTTPReqCall.Resp")
local http_req_call_action = require("A6.HTTPReqCall.Action")
local http_req_call_stop = require("A6.HTTPReqCall.Stop")
local http_req_call_rewrite = require("A6.HTTPReqCall.Rewrite")
local http_resp_call_req = require("A6.HTTPRespCall.Req")
local http_resp_call_resp = require("A6.HTTPRespCall.Resp")
local extra_info = require("A6.ExtraInfo.Info")
local extra_info_req = require("A6.ExtraInfo.Req")
local extra_info_var = require("A6.ExtraInfo.Var")
Expand Down Expand Up @@ -680,6 +682,107 @@ local rpc_handlers = {

return true
end,
nil, -- ignore RPC_EXTRA_INFO, already processed during RPC_HTTP_REQ_CALL interaction
function (conf, ctx, sock, entry)
local lrucache_id = core.lrucache.plugin_ctx_id(ctx, entry)
local token, err = core.lrucache.plugin_ctx(lrucache, ctx, entry, rpc_call,
constants.RPC_PREPARE_CONF, conf, ctx,
lrucache_id)
if not token then
return nil, err
end

builder:Clear()
local var = ctx.var

local res = ctx.runner_ext_response
local textEntries = {}
local hdrs = res.headers
for key, val in pairs(hdrs) do
local ty = type(val)
if ty == "table" then
for _, v in ipairs(val) do
core.table.insert(textEntries, build_headers(var, builder, key, v))
end
else
core.table.insert(textEntries, build_headers(var, builder, key, val))
end
end
local len = #textEntries
http_resp_call_req.StartHeadersVector(builder, len)
for i = len, 1, -1 do
builder:PrependUOffsetTRelative(textEntries[i])
end
local hdrs_vec = builder:EndVector(len)

local id = generate_id()
local status = res.status

http_resp_call_req.Start(builder)
http_resp_call_req.AddId(builder, id)
http_resp_call_req.AddStatus(builder, status)
http_resp_call_req.AddConfToken(builder, token)
http_resp_call_req.AddHeaders(builder, hdrs_vec)

local req = http_resp_call_req.End(builder)
builder:Finish(req)

local ok, err = send(sock, constants.RPC_HTTP_RESP_CALL, builder:Output())
if not ok then
return nil, "failed to send RPC_HTTP_RESP_CALL: " .. err
end

local ty, resp = receive(sock)
if ty == nil then
return nil, "failed to receive RPC_HTTP_RESP_CALL: " .. resp
end

if ty ~= constants.RPC_HTTP_RESP_CALL then
return nil, "failed to receive RPC_HTTP_RESP_CALL: unexpected type " .. ty
end

local buf = flatbuffers.binaryArray.New(resp)
local call_resp = http_resp_call_resp.GetRootAsResp(buf, 0)
local len = call_resp:HeadersLength()
if len > 0 then
local resp_headers = {}
for i = 1, len do
local entry = call_resp:Headers(i)
local name = str_lower(entry:Name())
if not exclude_resp_header[name] then
if resp_headers[name] == nil then
core.response.set_header(name, entry:Value())
resp_headers[name] = true
else
core.response.add_header(name, entry:Value())
end
end
end
else
-- Filter out origin headeres
for k, v in pairs(res.headers) do
if not exclude_resp_header[str_lower(k)] then
core.response.set_header(k, v)
end
end
end

local body
local len = call_resp:BodyLength()
if len > 0 then
-- TODO: support empty body
body = call_resp:BodyAsString()
end
local code = call_resp:Status()
core.log.info("recv resp, code: ", code, " body: ", body, " len: ", len)

if code == 0 then
-- runner changes body only, we should set code.
code = body and res.status or nil
end

return true, nil, code, body
end
}


Expand Down Expand Up @@ -719,12 +822,13 @@ local function recreate_lrucache()
end


function _M.communicate(conf, ctx, plugin_name)
function _M.communicate(conf, ctx, plugin_name, rpc_cmd)
local ok, err, code, body
local tries = 0
local ty = rpc_cmd and rpc_cmd or constants.RPC_HTTP_REQ_CALL
while tries < 3 do
tries = tries + 1
ok, err, code, body = rpc_call(constants.RPC_HTTP_REQ_CALL, conf, ctx, plugin_name)
ok, err, code, body = rpc_call(ty, conf, ctx, plugin_name)
if ok then
if code then
return code, body
Expand Down
1 change: 1 addition & 0 deletions conf/config-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ plugins: # plugin list (sorted by priority)
- openwhisk # priority: -1901
- serverless-post-function # priority: -2000
- ext-plugin-post-req # priority: -3000
- ext-plugin-post-resp # priority: -4000

stream_plugins: # sorted by priority
- ip-restriction # priority: 3000
Expand Down
2 changes: 1 addition & 1 deletion rockspec/apisix-master-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ dependencies = {
"luasec = 0.9-1",
"lua-resty-consul = 0.3-2",
"penlight = 1.9.2-1",
"ext-plugin-proto = 0.4.0",
"ext-plugin-proto = 0.5.0",
"casbin = 1.26.0",
"api7-snowflake = 2.0-1",
"inspect == 3.1.1",
Expand Down
1 change: 1 addition & 0 deletions t/admin/plugins.t
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ azure-functions
openwhisk
serverless-post-function
ext-plugin-post-req
ext-plugin-post-resp
Expand Down
Loading

0 comments on commit b533590

Please sign in to comment.