Skip to content

Commit

Permalink
feat(proxy) implement delayed response with stream module
Browse files Browse the repository at this point in the history
### Summary

Implements delayed responses with stream module that is used to
short-circuit streams.
  • Loading branch information
bungle committed Mar 1, 2021
1 parent f4852f3 commit b37928c
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 39 deletions.
24 changes: 20 additions & 4 deletions kong/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -245,16 +245,20 @@ local function execute_plugins_iterator(plugins_iterator, phase, ctx)

if ctx then
old_ws = ctx.workspace
delay_response = phase == "access" or nil
delay_response = phase == "access" or phase == "preread" or nil
ctx.delay_response = delay_response
end

for plugin, configuration in plugins_iterator:iterate(phase, ctx) do
if ctx then
if plugin.handler._go then
if not ctx.ran_go_plugin and plugin.handler._go then
ctx.ran_go_plugin = true
end

if delay_response and ctx.delayed_response then
goto continue
end

kong_global.set_named_ctx(kong, "plugin", plugin.handler)
end

Expand All @@ -263,8 +267,8 @@ local function execute_plugins_iterator(plugins_iterator, phase, ctx)
if not delay_response then
plugin.handler[phase](plugin.handler, configuration)

elseif not ctx.delayed_response then
local co = coroutine.create(plugin.handler.access)
else
local co = coroutine.create(plugin.handler[phase])
local cok, cerr = coroutine.resume(co, plugin.handler, configuration)
if not cok then
kong.log.err(cerr)
Expand All @@ -280,6 +284,8 @@ local function execute_plugins_iterator(plugins_iterator, phase, ctx)
if old_ws then
ctx.workspace = old_ws
end

::continue::
end
end

Expand Down Expand Up @@ -671,6 +677,16 @@ function Kong.preread()
local plugins_iterator = runloop.get_updated_plugins_iterator()
execute_plugins_iterator(plugins_iterator, "preread", ctx)

if ctx.delayed_response then
ctx.KONG_PREREAD_ENDED_AT = get_now_ms()
ctx.KONG_PREREAD_TIME = ctx.KONG_PREREAD_ENDED_AT - ctx.KONG_PREREAD_START
ctx.KONG_RESPONSE_LATENCY = ctx.KONG_PREREAD_ENDED_AT - ctx.KONG_PROCESSING_START

return flush_delayed_response(ctx)
end

ctx.delay_response = nil

if not ctx.service then
ctx.KONG_PREREAD_ENDED_AT = get_now_ms()
ctx.KONG_PREREAD_TIME = ctx.KONG_PREREAD_ENDED_AT - ctx.KONG_PREREAD_START
Expand Down
59 changes: 45 additions & 14 deletions kong/pdk/response.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ local find = string.find
local lower = string.lower
local error = error
local pairs = pairs
local ipairs = ipairs
local tonumber = tonumber
local coroutine = coroutine
local normalize_header = checks.normalize_header
local normalize_multi_header = checks.normalize_multi_header
Expand Down Expand Up @@ -680,6 +682,33 @@ local function new(self, major_version)
return send(response.status_code, response.content, response.headers)
end


local function send_stream(status, body, headers)
if body then
if status < 400 then
-- only sends body to the client for 200 status code
local res, err = ngx.print(body)
if not res then
error("unable to send body to client: " .. err, 2)
end

else
self.log.err("unable to proxy stream connection, " ..
"status: " .. status .. ", err: ", body)
end
end

return ngx.exit(status)
end


local function flush_stream(ctx)
ctx = ctx or ngx.ctx
local response = ctx.delayed_response
return send_stream(response.status_code, response.content, response.headers)
end


if ngx and ngx.config.subsystem == 'http' then
---
-- This function interrupts the current processing and produces a response.
Expand Down Expand Up @@ -849,21 +878,23 @@ local function new(self, major_version)
error("body must be a nil or a string", 2)
end

if body then
if status < 400 then
-- only sends body to the client for 200 status code
local res, err = ngx.print(body)
if not res then
error("unable to send body to client: " .. err, 2)
end
local ctx = ngx.ctx
ctx.KONG_EXITED = true

else
self.log.err("unable to proxy stream connection, " ..
"status: " .. status .. ", err: ", body)
end
end
if ctx.delay_response and not ctx.delayed_response then
ngx.log(ngx.ERR, "STREAM DELAYED")
ctx.delayed_response = {
status_code = status,
content = body,
headers = headers,
}

return ngx.exit(status)
ctx.delayed_response_callback = flush_stream
coroutine.yield()

else
return send_stream(status, body, headers)
end
end
end

Expand Down Expand Up @@ -970,7 +1001,7 @@ local function new(self, major_version)
end

if message ~= nil and type(message) ~= "string" then
error("message must be a nil or a string", 2)
error("message must be a nil or a string", 2)
end

if headers ~= nil and type(headers) ~= "table" then
Expand Down
135 changes: 131 additions & 4 deletions spec/02-integration/05-proxy/28-stream_plugins_triggering_spec.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
local helpers = require "spec.helpers"
local pl_file = require "pl.file"
local cjson = require "cjson"


local TEST_CONF = helpers.test_conf
Expand Down Expand Up @@ -42,21 +43,33 @@ local phases = {
["%[logger%] log phase"] = 1,
}

local phases_2 = {
["%[logger%] init_worker phase"] = 1,
["%[logger%] preread phase"] = 0,
["%[logger%] log phase"] = 1,
}

local phases_tls = {
["%[logger%] init_worker phase"] = 1,
["%[logger%] certificate phase"] = 1,
["%[logger%] preread phase"] = 1,
["%[logger%] log phase"] = 1,
}

local phases_tls_2 = {
["%[logger%] init_worker phase"] = 1,
["%[logger%] certificate phase"] = 1,
["%[logger%] preread phase"] = 0,
["%[logger%] log phase"] = 1,
}

local function assert_phases(phrases)
for phase, count in pairs(phrases) do
assert(find_in_file(phase, count))
end
end

for _, strategy in helpers.each_strategy() do

describe("#stream Proxying [#" .. strategy .. "]", function()
local bp

Expand Down Expand Up @@ -107,9 +120,9 @@ for _, strategy in helpers.each_strategy() do
service = tls_srv,
}

assert(bp.plugins:insert {
bp.plugins:insert {
name = "logger",
})
}

assert(helpers.start_kong({
database = strategy,
Expand Down Expand Up @@ -140,7 +153,7 @@ for _, strategy in helpers.each_strategy() do
it("tls", function()
local tcp = ngx.socket.tcp()
assert(tcp:connect(helpers.get_proxy_ip(true), 19443))
assert(tcp:sslhandshake(nil, "this-is-needed.test", false))
assert(tcp:sslhandshake(nil, nil, false))
assert(tcp:send(MESSAGE))
local body = assert(tcp:receive("*a"))
assert.equal(MESSAGE, body)
Expand All @@ -149,4 +162,118 @@ for _, strategy in helpers.each_strategy() do
assert_phases(phases_tls)
end)
end)

describe("#stream Proxying [#" .. strategy .. "]", function()
local bp

before_each(function()
bp = helpers.get_db_utils(strategy, {
"routes",
"services",
"plugins",
}, {
"logger",
"short-circuit",
})

local tcp_srv = bp.services:insert({
name = "tcp",
host = helpers.mock_upstream_host,
port = helpers.mock_upstream_stream_port,
protocol = "tcp"
})

bp.routes:insert {
destinations = {
{
port = 19000,
},
},
protocols = {
"tcp",
},
service = tcp_srv,
}

local tls_srv = bp.services:insert({
name = "tls",
host = helpers.mock_upstream_host,
port = helpers.mock_upstream_stream_ssl_port,
protocol = "tls"
})

bp.routes:insert {
destinations = {
{
port = 19443,
},
},
protocols = {
"tls",
},
service = tls_srv,
}

bp.plugins:insert {
name = "logger",
}

bp.plugins:insert {
name = "short-circuit",
config = {
status = 200,
message = "plugin executed"
},
}

assert(helpers.start_kong({
database = strategy,
nginx_conf = "spec/fixtures/custom_nginx.template",
plugins = "logger,short-circuit",
proxy_listen = "off",
admin_listen = "off",
stream_listen = helpers.get_proxy_ip(false) .. ":19000," ..
helpers.get_proxy_ip(false) .. ":19443 ssl"
}))
end)

after_each(function()
helpers.stop_kong()
end)

it("tcp (short-circuited)", function()
local tcp = ngx.socket.tcp()
assert(tcp:connect(helpers.get_proxy_ip(false), 19000))
local body = assert(tcp:receive("*a"))
tcp:close()

local json = cjson.decode(body)
assert.same({
init_worker_called = true,
message = "plugin executed",
status = 200
}, json)

wait()
assert_phases(phases_2)
end)

it("tls (short-circuited)", function()
local tcp = ngx.socket.tcp()
assert(tcp:connect(helpers.get_proxy_ip(true), 19443))
assert(tcp:sslhandshake(nil, nil, false))
local body = assert(tcp:receive("*a"))
tcp:close()

local json = assert(cjson.decode(body))
assert.same({
init_worker_called = true,
message = "plugin executed",
status = 200
}, json)

wait()
assert_phases(phases_tls_2)
end)
end)
end
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ local cjson = require "cjson"


local kong = kong
local req = ngx.req
local exit = ngx.exit
local error = error
local tostring = tostring
local init_worker_called = false

Expand All @@ -30,7 +27,7 @@ function ShortCircuitHandler:access(conf)
ShortCircuitHandler.super.access(self)
return kong.response.exit(conf.status, {
status = conf.status,
message = conf.message
message = conf.message,
}, {
["Kong-Init-Worker-Called"] = tostring(init_worker_called),
})
Expand All @@ -39,19 +36,12 @@ end

function ShortCircuitHandler:preread(conf)
ShortCircuitHandler.super.preread(self)

local tcpsock, err = req.socket(true)
if err then
error(err)
end

tcpsock:send(cjson.encode({
status = conf.status,
message = conf.message
}))

-- TODO: this should really support delayed short-circuiting!
return exit(conf.status)
local message = cjson.encode({
status = conf.status,
message = conf.message,
init_worker_called = init_worker_called,
})
return kong.response.exit(conf.status, message)
end


Expand Down

0 comments on commit b37928c

Please sign in to comment.