Skip to content

Commit

Permalink
feat(proxy) implement delayed response with stream module
Browse files Browse the repository at this point in the history
### Summary

Implements delayed responses with stream module that is used to
short-circuit streams.
  • Loading branch information
bungle committed Jul 6, 2022
1 parent 5695255 commit c0b2401
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 40 deletions.
25 changes: 19 additions & 6 deletions kong/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -264,18 +264,21 @@ local function execute_init_worker_plugins_iterator(plugins_iterator, ctx)
end


local function execute_access_plugins_iterator(plugins_iterator, ctx)
local function execute_collecting_plugins_iterator(plugins_iterator, phase, ctx)
local old_ws = ctx.workspace

ctx.delay_response = true

for plugin, configuration in plugins_iterator:iterate("access", ctx) do
for plugin, configuration in plugins_iterator:iterate(phase, ctx) do
if not ctx.delayed_response then
local span = instrumentation.plugin_access(plugin)
local span
if phase == "access" then
span = instrumentation.plugin_access(plugin)
end

setup_plugin_context(ctx, plugin)

local co = coroutine.create(plugin.handler.access)
local co = coroutine.create(plugin.handler[phase])
local cok, cerr = coroutine.resume(co, plugin.handler, configuration)
if not cok then
-- set tracing error
Expand Down Expand Up @@ -795,7 +798,17 @@ function Kong.preread()
end

local plugins_iterator = runloop.get_updated_plugins_iterator()
execute_plugins_iterator(plugins_iterator, "preread", ctx)
execute_collecting_plugins_iterator(plugins_iterator, "preread", ctx)

if ctx.delayed_response then
ctx.KONG_PREREAD_ENDED_AT = get_updated_now_ms()
ctx.KONG_PREREAD_TIME = ctx.KONG_PREREAD_ENDED_AT - ctx.KONG_PREREAD_START
ctx.KONG_RESPONSE_LATENCY = ctx.KONG_PREREAD_ENDED_AT - ctx.KONG_PROCESSING_START

return flush_delayed_response(ctx)
end

ctx.delay_response = nil

if not ctx.service then
ctx.KONG_PREREAD_ENDED_AT = get_updated_now_ms()
Expand Down Expand Up @@ -893,7 +906,7 @@ function Kong.access()

local plugins_iterator = runloop.get_plugins_iterator()

execute_access_plugins_iterator(plugins_iterator, ctx)
execute_collecting_plugins_iterator(plugins_iterator, "access", ctx)

if ctx.delayed_response then
ctx.KONG_ACCESS_ENDED_AT = get_updated_now_ms()
Expand Down
57 changes: 43 additions & 14 deletions kong/pdk/response.lua
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ local find = string.find
local lower = string.lower
local error = error
local pairs = pairs
local ipairs = ipairs
local concat = table.concat
local tonumber = tonumber
local coroutine = coroutine
local normalize_header = checks.normalize_header
local normalize_multi_header = checks.normalize_multi_header
Expand Down Expand Up @@ -812,6 +814,33 @@ local function new(self, major_version)
return send(response.status_code, response.content, response.headers)
end


local function send_stream(status, body, headers)
if body then
if status < 400 then
-- only sends body to the client for < 400 status code
local res, err = ngx.print(body)
if not res then
error("unable to send body to client: " .. err, 2)
end

else
self.log.err("unable to proxy stream connection, " ..
"status: " .. status .. ", err: ", body)
end
end

return ngx.exit(status)
end


local function flush_stream(ctx)
ctx = ctx or ngx.ctx
local response = ctx.delayed_response
return send_stream(response.status_code, response.content, response.headers)
end


if ngx and ngx.config.subsystem == 'http' then
---
-- This function interrupts the current processing and produces a response.
Expand Down Expand Up @@ -994,21 +1023,22 @@ local function new(self, major_version)
end
end

if body then
if status < 400 then
-- only sends body to the client for 200 status code
local res, err = ngx.print(body)
if not res then
error("unable to send body to client: " .. err, 2)
end
local ctx = ngx.ctx
ctx.KONG_EXITED = true

else
self.log.err("unable to proxy stream connection, " ..
"status: " .. status .. ", err: ", body)
end
end
if ctx.delay_response and not ctx.delayed_response then
ctx.delayed_response = {
status_code = status,
content = body,
headers = headers,
}

ctx.delayed_response_callback = flush_stream
coroutine.yield()

return ngx.exit(status)
else
return send_stream(status, body, headers)
end
end
end

Expand Down Expand Up @@ -1127,7 +1157,6 @@ local function new(self, major_version)
if type(message) ~= "string" then
error("message must be a nil, a string or a table", 2)
end

end

if headers ~= nil and type(headers) ~= "table" then
Expand Down
135 changes: 131 additions & 4 deletions spec/02-integration/05-proxy/28-stream_plugins_triggering_spec.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
local helpers = require "spec.helpers"
local pl_file = require "pl.file"
local cjson = require "cjson"


local TEST_CONF = helpers.test_conf
Expand Down Expand Up @@ -42,21 +43,33 @@ local phases = {
["%[logger%] log phase"] = 1,
}

local phases_2 = {
["%[logger%] init_worker phase"] = 1,
["%[logger%] preread phase"] = 0,
["%[logger%] log phase"] = 1,
}

local phases_tls = {
["%[logger%] init_worker phase"] = 1,
["%[logger%] certificate phase"] = 1,
["%[logger%] preread phase"] = 1,
["%[logger%] log phase"] = 1,
}

local phases_tls_2 = {
["%[logger%] init_worker phase"] = 1,
["%[logger%] certificate phase"] = 1,
["%[logger%] preread phase"] = 0,
["%[logger%] log phase"] = 1,
}

local function assert_phases(phrases)
for phase, count in pairs(phrases) do
assert(find_in_file(phase, count))
end
end

for _, strategy in helpers.each_strategy() do

describe("#stream Proxying [#" .. strategy .. "]", function()
local bp

Expand Down Expand Up @@ -107,9 +120,9 @@ for _, strategy in helpers.each_strategy() do
service = tls_srv,
}

assert(bp.plugins:insert {
bp.plugins:insert {
name = "logger",
})
}

assert(helpers.start_kong({
database = strategy,
Expand Down Expand Up @@ -140,7 +153,7 @@ for _, strategy in helpers.each_strategy() do
it("tls", function()
local tcp = ngx.socket.tcp()
assert(tcp:connect(helpers.get_proxy_ip(true), 19443))
assert(tcp:sslhandshake(nil, "this-is-needed.test", false))
assert(tcp:sslhandshake(nil, nil, false))
assert(tcp:send(MESSAGE))
local body = assert(tcp:receive("*a"))
assert.equal(MESSAGE, body)
Expand All @@ -149,4 +162,118 @@ for _, strategy in helpers.each_strategy() do
assert_phases(phases_tls)
end)
end)

describe("#stream Proxying [#" .. strategy .. "]", function()
local bp

before_each(function()
bp = helpers.get_db_utils(strategy, {
"routes",
"services",
"plugins",
}, {
"logger",
"short-circuit",
})

local tcp_srv = bp.services:insert({
name = "tcp",
host = helpers.mock_upstream_host,
port = helpers.mock_upstream_stream_port,
protocol = "tcp"
})

bp.routes:insert {
destinations = {
{
port = 19000,
},
},
protocols = {
"tcp",
},
service = tcp_srv,
}

local tls_srv = bp.services:insert({
name = "tls",
host = helpers.mock_upstream_host,
port = helpers.mock_upstream_stream_ssl_port,
protocol = "tls"
})

bp.routes:insert {
destinations = {
{
port = 19443,
},
},
protocols = {
"tls",
},
service = tls_srv,
}

bp.plugins:insert {
name = "logger",
}

bp.plugins:insert {
name = "short-circuit",
config = {
status = 200,
message = "plugin executed"
},
}

assert(helpers.start_kong({
database = strategy,
nginx_conf = "spec/fixtures/custom_nginx.template",
plugins = "logger,short-circuit",
proxy_listen = "off",
admin_listen = "off",
stream_listen = helpers.get_proxy_ip(false) .. ":19000," ..
helpers.get_proxy_ip(false) .. ":19443 ssl"
}))
end)

after_each(function()
helpers.stop_kong()
end)

it("tcp (short-circuited)", function()
local tcp = ngx.socket.tcp()
assert(tcp:connect(helpers.get_proxy_ip(false), 19000))
local body = assert(tcp:receive("*a"))
tcp:close()

local json = cjson.decode(body)
assert.same({
init_worker_called = true,
message = "plugin executed",
status = 200
}, json)

wait()
assert_phases(phases_2)
end)

it("tls (short-circuited)", function()
local tcp = ngx.socket.tcp()
assert(tcp:connect(helpers.get_proxy_ip(true), 19443))
assert(tcp:sslhandshake(nil, nil, false))
local body = assert(tcp:receive("*a"))
tcp:close()

local json = assert(cjson.decode(body))
assert.same({
init_worker_called = true,
message = "plugin executed",
status = 200
}, json)

wait()
assert_phases(phases_tls_2)
end)
end)
end
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ local cjson = require "cjson"


local kong = kong
local req = ngx.req
local exit = ngx.exit
local error = error
local tostring = tostring
local init_worker_called = false

Expand All @@ -23,26 +20,21 @@ end
function ShortCircuitHandler:access(conf)
return kong.response.exit(conf.status, {
status = conf.status,
message = conf.message
message = conf.message,
}, {
["Kong-Init-Worker-Called"] = tostring(init_worker_called),
})
end


function ShortCircuitHandler:preread(conf)
local tcpsock, err = req.socket(true)
if err then
error(err)
end

tcpsock:send(cjson.encode({
status = conf.status,
message = conf.message
}))

-- TODO: this should really support delayed short-circuiting!
return exit(conf.status)
local message = cjson.encode({
status = conf.status,
message = conf.message,
init_worker_called = init_worker_called,
})
return kong.response.exit(conf.status, message)
end


return ShortCircuitHandler

0 comments on commit c0b2401

Please sign in to comment.