Skip to content

Commit

Permalink
feat(tracing): route/service scoping
Browse files Browse the repository at this point in the history
implement KAG-244
  • Loading branch information
StarlightIbuki committed Jan 11, 2023
1 parent 6482493 commit 91f4f19
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 12 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@
parameter that allows the selection of the IMDS protocol version.
Defaults to `v1`, can be set to `v2` to enable IMDSv2.
[#9962](https://github.com/Kong/kong/pull/9962)
- **OpenTelemetry**: Support Service and Router scoping.
[#10096](https://github.com/Kong/kong/pull/10096)

### Fixes

Expand Down
118 changes: 117 additions & 1 deletion kong/pdk/tracing.lua
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ local function new_tracer(name, options)
-- @tparam table options TODO(mayo)
-- @treturn table span
function self.start_span(...)
if not base.get_request() then
if (not base.get_request()) or ngx.ctx.stoping_tracing then
return noop_span
end

Expand Down Expand Up @@ -488,8 +488,124 @@ setmetatable(noop_tracer, {
__newindex = NOOP,
})

local function terminate_tracers()
-- stop further tracing
ngx.ctx.stoping_tracing = true

local spans = ngx.ctx.KONG_SPANS
if type(spans) ~= "table" then
return
end

-- stop existing spans
for _, span in ipairs(spans) do
if type(span) == "table" then
span.should_sample = false
end
end
end

local register_tracer_plugin, is_tracers_enabled_at
do
local GLOBAL_QUERY_OPTS = { workspace = ngx.null, show_ws_id = true }

local tracers = {}
local tracers_n = 0


local function get_cache_key(name)
return "tracers:is_enabled_at:" .. name
end

-- we cannot decide consumer before access phase, so tracer plugins must
-- be defined no_consumer
local function get_tracer_enabled_path(name)
local enabled_tbl = {}
for plugin, err in kong.db.plugins:each(nil, GLOBAL_QUERY_OPTS) do
if err then
kong.log.crit("could not obtain list of plugins: ", err)
goto continue
end

if plugin.name == name and plugin.enabled then
if plugin.route and not plugin.service then
enabled_tbl[plugin.route.id] = true
end

if plugin.service and not plugin.route then
enabled_tbl[plugin.service.id] = true
end

if plugin.route and plugin.service then
enabled_tbl[plugin.route.id .. ":" .. plugin.service.id] = true
end

if not plugin.route and not plugin.servicer then
enabled_tbl.global = true
end
end
::continue::
end

return enabled_tbl
end

local function is_tracer_enabled_at(name, route_id, service_id)
local enabled_tbl, err = kong.cache:get(get_cache_key(name), nil, get_tracer_enabled_path, name)

if not enabled_tbl then
return false, err
end

-- any of them enabled, we should do the tracing
return enabled_tbl.global or enabled_tbl[route_id] or enabled_tbl[service_id] or enabled_tbl[route_id .. ":" .. service_id]
end

local function invalidate_cache_event(name)
local worker_events = kong.worker_events
local cache_key = get_cache_key(name)
-- invalidations
if kong.configuration.database == "off" then
worker_events.register(function()
kong.cache:invalidate(cache_key)
end, "declarative", "reconfigure")

else
worker_events.register(function(data)
if data.entity.name == name then
kong.cache:invalidate(cache_key)
end
end, "crud", "plugins")
end
end

function register_tracer_plugin(name)
invalidate_cache_event(name)
tracers_n = tracers_n + 1
tracers[tracers_n] = name
end

function is_tracers_enabled_at(router_id, service_id)
for i = 1, tracers_n do
local enabled, err = is_tracer_enabled_at(tracers[i], router_id, service_id)

if err then
kong.log.debug("failed to check if tracer is enabled: ", err)
end

if enabled then
return true
end
end
return false
end
end

return {
new = function()
return global_tracer
end,
register_tracer_plugin = register_tracer_plugin,
is_tracers_enabled_at = is_tracers_enabled_at,
terminate_tracers = terminate_tracers,
}
8 changes: 7 additions & 1 deletion kong/plugins/opentelemetry/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ local http = require "resty.http"
local clone = require "table.clone"
local otlp = require "kong.plugins.opentelemetry.otlp"
local propagation = require "kong.tracing.propagation"
local register_tracer_plugin = require("kong.pdk.tracing").register_tracer_plugin

local pairs = pairs

Expand Down Expand Up @@ -113,7 +114,12 @@ local function process_span(span, queue)
queue:add(pb_span)
end

function OpenTelemetryHandler:rewrite()
function OpenTelemetryHandler:init()
register_tracer_plugin("opentelemetry")
end

-- TODO: unify handling of propagation with other tracers like zipkin
function OpenTelemetryHandler:access()
local headers = ngx_get_headers()
local root_span = ngx.ctx.KONG_SPANS and ngx.ctx.KONG_SPANS[1]

Expand Down
4 changes: 1 addition & 3 deletions kong/plugins/opentelemetry/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,8 @@ local resource_attributes = Schema.define {
return {
name = "opentelemetry",
fields = {
-- global plugin only
-- tracer pluign does not scope on consumers
{ consumer = typedefs.no_consumer },
{ service = typedefs.no_service },
{ route = typedefs.no_route },
{ protocols = typedefs.protocols_http }, -- TODO: support stream mode
{ config = {
type = "record",
Expand Down
19 changes: 18 additions & 1 deletion kong/runloop/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ local lrucache = require "resty.lrucache"
local marshall = require "kong.cache.marshall"


local tracing = require "kong.pdk.tracing"
local PluginsIterator = require "kong.runloop.plugins_iterator"
local instrumentation = require "kong.tracing.instrumentation"

Expand Down Expand Up @@ -44,6 +45,8 @@ local clear_header = ngx.req.clear_header
local http_version = ngx.req.http_version
local escape = require("kong.tools.uri").escape

local is_tracers_enabled_at = tracing.is_tracers_enabled_at
local terminate_tracers = tracing.terminate_tracers

local is_http_module = subsystem == "http"
local is_stream_module = subsystem == "stream"
Expand Down Expand Up @@ -1116,14 +1119,28 @@ return {
local match_t = router:exec(ctx)
if not match_t then
-- tracing

-- we do not trace 404 requests
if span then
span:set_status(2)
span.should_sample = false
span:finish()
terminate_tracers()
end

return kong.response.exit(404, { message = "no Route matched with those values" })
end

-- is tracing enabled?
local route_id = match_t.route and match_t.route.id
local service_id = match_t.service and match_t.service.id
if not is_tracers_enabled_at(route_id, service_id) then
if span then
span.should_sample = false
end

terminate_tracers()
end

-- ends tracing span
if span then
span:finish()
Expand Down
62 changes: 56 additions & 6 deletions spec/03-plugins/37-opentelemetry/04-exporter_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,22 @@ for _, strategy in helpers.each_strategy() do
port = helpers.mock_upstream_port,
})

bp.routes:insert({ service = http_srv,
protocols = { "http" },
paths = { "/" }})
local route = assert(bp.routes:insert({ service = http_srv,
protocols = { "http" },
paths = { "/" }}))

bp.plugins:insert({
assert(bp.routes:insert({ service = http_srv,
protocols = { "http" },
paths = { "/no_plugin" }}))

assert(bp.plugins:insert({
name = "opentelemetry",
route = route,
config = table_merge({
endpoint = "http://127.0.0.1:" .. HTTP_SERVER_PORT,
batch_flush_delay = 0, -- report immediately
}, config)
})
}))

assert(helpers.start_kong({
proxy_listen = "0.0.0.0:" .. PROXY_PORT,
Expand Down Expand Up @@ -140,6 +145,52 @@ for _, strategy in helpers.each_strategy() do
end)
end)

describe("#scoping", function ()
lazy_setup(function()
bp, _ = assert(helpers.get_db_utils(strategy, {
"services",
"routes",
"plugins",
}, { "opentelemetry" }))

setup_instrumentations("all", {
headers = {
["X-Access-Token"] = "token",
},
})
end)

lazy_teardown(function()
helpers.stop_kong()
helpers.kill_http_server(HTTP_SERVER_PORT)
end)

it("works", function ()
local thread = helpers.http_server(HTTP_SERVER_PORT, { timeout = 10 })
local cli = helpers.proxy_client(7000, PROXY_PORT)
local r = assert(cli:send {
method = "GET",
path = "/no_plugin",
})
assert.res_status(200, r)
r = assert(cli:send {
method = "GET",
path = "/no_exist",
})
assert.res_status(404, r)

-- close client connection
cli:close()

local ok, err = thread:join()

ngx.sleep(1000)
-- we should have no telemetry reported
assert.is_falsy(ok)
assert.same(err, "timeout")
end)
end)

describe("overwrite resource attributes #http", function ()
lazy_setup(function()
bp, _ = assert(helpers.get_db_utils(strategy, {
Expand Down Expand Up @@ -284,7 +335,6 @@ for _, strategy in helpers.each_strategy() do
local body = fd:read("*a")
pb_set = ngx_re.split(body, "\n")

print("pb set length: ", #pb_set)
local count = 0
for _, pb_data in ipairs(pb_set) do
local decoded = assert(pb.decode("opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest", ngx.decode_base64(pb_data)))
Expand Down

0 comments on commit 91f4f19

Please sign in to comment.