Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(proxy) implement delayed response with stream module #6878

Merged
merged 1 commit into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions kong/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -264,18 +264,21 @@ local function execute_init_worker_plugins_iterator(plugins_iterator, ctx)
end


local function execute_access_plugins_iterator(plugins_iterator, ctx)
local function execute_collecting_plugins_iterator(plugins_iterator, phase, ctx)
local old_ws = ctx.workspace

ctx.delay_response = true

for plugin, configuration in plugins_iterator:iterate("access", ctx) do
for plugin, configuration in plugins_iterator:iterate(phase, ctx) do
if not ctx.delayed_response then
local span = instrumentation.plugin_access(plugin)
local span
if phase == "access" then
span = instrumentation.plugin_access(plugin)
end

setup_plugin_context(ctx, plugin)

local co = coroutine.create(plugin.handler.access)
local co = coroutine.create(plugin.handler[phase])
local cok, cerr = coroutine.resume(co, plugin.handler, configuration)
if not cok then
-- set tracing error
Expand Down Expand Up @@ -795,7 +798,17 @@ function Kong.preread()
end

local plugins_iterator = runloop.get_updated_plugins_iterator()
execute_plugins_iterator(plugins_iterator, "preread", ctx)
execute_collecting_plugins_iterator(plugins_iterator, "preread", ctx)

if ctx.delayed_response then
ctx.KONG_PREREAD_ENDED_AT = get_updated_now_ms()
ctx.KONG_PREREAD_TIME = ctx.KONG_PREREAD_ENDED_AT - ctx.KONG_PREREAD_START
ctx.KONG_RESPONSE_LATENCY = ctx.KONG_PREREAD_ENDED_AT - ctx.KONG_PROCESSING_START

return flush_delayed_response(ctx)
end

ctx.delay_response = nil

if not ctx.service then
ctx.KONG_PREREAD_ENDED_AT = get_updated_now_ms()
Expand Down Expand Up @@ -893,7 +906,7 @@ function Kong.access()

local plugins_iterator = runloop.get_plugins_iterator()

execute_access_plugins_iterator(plugins_iterator, ctx)
execute_collecting_plugins_iterator(plugins_iterator, "access", ctx)

if ctx.delayed_response then
ctx.KONG_ACCESS_ENDED_AT = get_updated_now_ms()
Expand Down
57 changes: 43 additions & 14 deletions kong/pdk/response.lua
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ local find = string.find
local lower = string.lower
local error = error
local pairs = pairs
local ipairs = ipairs
local concat = table.concat
local tonumber = tonumber
local coroutine = coroutine
local normalize_header = checks.normalize_header
local normalize_multi_header = checks.normalize_multi_header
Expand Down Expand Up @@ -812,6 +814,33 @@ local function new(self, major_version)
return send(response.status_code, response.content, response.headers)
end


local function send_stream(status, body, headers)
if body then
if status < 400 then
-- only sends body to the client for < 400 status code
local res, err = ngx.print(body)
if not res then
error("unable to send body to client: " .. err, 2)
end

else
self.log.err("unable to proxy stream connection, " ..
"status: " .. status .. ", err: ", body)
end
end

return ngx.exit(status)
end


local function flush_stream(ctx)
ctx = ctx or ngx.ctx
local response = ctx.delayed_response
return send_stream(response.status_code, response.content, response.headers)
end


if ngx and ngx.config.subsystem == 'http' then
---
-- This function interrupts the current processing and produces a response.
Expand Down Expand Up @@ -994,21 +1023,22 @@ local function new(self, major_version)
end
end

if body then
if status < 400 then
-- only sends body to the client for 200 status code
local res, err = ngx.print(body)
if not res then
error("unable to send body to client: " .. err, 2)
end
local ctx = ngx.ctx
ctx.KONG_EXITED = true

else
self.log.err("unable to proxy stream connection, " ..
"status: " .. status .. ", err: ", body)
end
end
if ctx.delay_response and not ctx.delayed_response then
ctx.delayed_response = {
status_code = status,
content = body,
headers = headers,
}

ctx.delayed_response_callback = flush_stream
coroutine.yield()

return ngx.exit(status)
else
return send_stream(status, body, headers)
end
end
end

Expand Down Expand Up @@ -1127,7 +1157,6 @@ local function new(self, major_version)
if type(message) ~= "string" then
error("message must be a nil, a string or a table", 2)
end

end

if headers ~= nil and type(headers) ~= "table" then
Expand Down
135 changes: 131 additions & 4 deletions spec/02-integration/05-proxy/28-stream_plugins_triggering_spec.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
local helpers = require "spec.helpers"
local pl_file = require "pl.file"
local cjson = require "cjson"


local TEST_CONF = helpers.test_conf
Expand Down Expand Up @@ -42,21 +43,33 @@ local phases = {
["%[logger%] log phase"] = 1,
}

local phases_2 = {
["%[logger%] init_worker phase"] = 1,
["%[logger%] preread phase"] = 0,
["%[logger%] log phase"] = 1,
}

local phases_tls = {
["%[logger%] init_worker phase"] = 1,
["%[logger%] certificate phase"] = 1,
["%[logger%] preread phase"] = 1,
["%[logger%] log phase"] = 1,
}

local phases_tls_2 = {
["%[logger%] init_worker phase"] = 1,
["%[logger%] certificate phase"] = 1,
["%[logger%] preread phase"] = 0,
["%[logger%] log phase"] = 1,
}

local function assert_phases(phrases)
for phase, count in pairs(phrases) do
assert(find_in_file(phase, count))
end
end

for _, strategy in helpers.each_strategy() do

describe("#stream Proxying [#" .. strategy .. "]", function()
local bp

Expand Down Expand Up @@ -107,9 +120,9 @@ for _, strategy in helpers.each_strategy() do
service = tls_srv,
}

assert(bp.plugins:insert {
bp.plugins:insert {
name = "logger",
})
}

assert(helpers.start_kong({
database = strategy,
Expand Down Expand Up @@ -140,7 +153,7 @@ for _, strategy in helpers.each_strategy() do
it("tls", function()
local tcp = ngx.socket.tcp()
assert(tcp:connect(helpers.get_proxy_ip(true), 19443))
assert(tcp:sslhandshake(nil, "this-is-needed.test", false))
assert(tcp:sslhandshake(nil, nil, false))
assert(tcp:send(MESSAGE))
local body = assert(tcp:receive("*a"))
assert.equal(MESSAGE, body)
Expand All @@ -149,4 +162,118 @@ for _, strategy in helpers.each_strategy() do
assert_phases(phases_tls)
end)
end)

describe("#stream Proxying [#" .. strategy .. "]", function()
local bp

before_each(function()
bp = helpers.get_db_utils(strategy, {
"routes",
"services",
"plugins",
}, {
"logger",
"short-circuit",
})

local tcp_srv = bp.services:insert({
name = "tcp",
host = helpers.mock_upstream_host,
port = helpers.mock_upstream_stream_port,
protocol = "tcp"
})

bp.routes:insert {
destinations = {
{
port = 19000,
},
},
protocols = {
"tcp",
},
service = tcp_srv,
}

local tls_srv = bp.services:insert({
name = "tls",
host = helpers.mock_upstream_host,
port = helpers.mock_upstream_stream_ssl_port,
protocol = "tls"
})

bp.routes:insert {
destinations = {
{
port = 19443,
},
},
protocols = {
"tls",
},
service = tls_srv,
}

bp.plugins:insert {
name = "logger",
}

bp.plugins:insert {
name = "short-circuit",
config = {
status = 200,
message = "plugin executed"
},
}

assert(helpers.start_kong({
database = strategy,
nginx_conf = "spec/fixtures/custom_nginx.template",
plugins = "logger,short-circuit",
proxy_listen = "off",
admin_listen = "off",
stream_listen = helpers.get_proxy_ip(false) .. ":19000," ..
helpers.get_proxy_ip(false) .. ":19443 ssl"
}))
end)

after_each(function()
helpers.stop_kong()
end)

it("tcp (short-circuited)", function()
local tcp = ngx.socket.tcp()
assert(tcp:connect(helpers.get_proxy_ip(false), 19000))
local body = assert(tcp:receive("*a"))
tcp:close()

local json = cjson.decode(body)
assert.same({
init_worker_called = true,
message = "plugin executed",
status = 200
}, json)

wait()
assert_phases(phases_2)
end)

it("tls (short-circuited)", function()
local tcp = ngx.socket.tcp()
assert(tcp:connect(helpers.get_proxy_ip(true), 19443))
assert(tcp:sslhandshake(nil, nil, false))
local body = assert(tcp:receive("*a"))
tcp:close()

local json = assert(cjson.decode(body))
assert.same({
init_worker_called = true,
message = "plugin executed",
status = 200
}, json)

wait()
assert_phases(phases_tls_2)
end)
end)
end
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ local cjson = require "cjson"


local kong = kong
local req = ngx.req
local exit = ngx.exit
local error = error
local tostring = tostring
local init_worker_called = false

Expand All @@ -23,26 +20,21 @@ end
function ShortCircuitHandler:access(conf)
return kong.response.exit(conf.status, {
status = conf.status,
message = conf.message
message = conf.message,
}, {
["Kong-Init-Worker-Called"] = tostring(init_worker_called),
})
end


function ShortCircuitHandler:preread(conf)
local tcpsock, err = req.socket(true)
if err then
error(err)
end

tcpsock:send(cjson.encode({
status = conf.status,
message = conf.message
}))

-- TODO: this should really support delayed short-circuiting!
return exit(conf.status)
local message = cjson.encode({
status = conf.status,
message = conf.message,
init_worker_called = init_worker_called,
})
return kong.response.exit(conf.status, message)
end


return ShortCircuitHandler