From 5e73cab74eb3b8946b1fc6fe4a8e65abc50d87e9 Mon Sep 17 00:00:00 2001 From: suika Date: Wed, 11 Jan 2023 18:30:21 +0800 Subject: [PATCH] feat(tracing): route/service scoping implement KAG-244 --- CHANGELOG.md | 2 + kong/pdk/tracing.lua | 118 +++++++++++++++++- kong/plugins/opentelemetry/handler.lua | 8 +- kong/plugins/opentelemetry/schema.lua | 4 +- kong/runloop/handler.lua | 19 ++- .../37-opentelemetry/04-exporter_spec.lua | 63 ++++++++-- 6 files changed, 201 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ba0c5477e0f..d2a6a528df5a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -96,6 +96,8 @@ parameter that allows the selection of the IMDS protocol version. Defaults to `v1`, can be set to `v2` to enable IMDSv2. [#9962](https://github.com/Kong/kong/pull/9962) +- **OpenTelemetry**: Support Service and Router scoping. + [#10096](https://github.com/Kong/kong/pull/10096) ### Fixes diff --git a/kong/pdk/tracing.lua b/kong/pdk/tracing.lua index 2bf97877526c..acbfba7284d9 100644 --- a/kong/pdk/tracing.lua +++ b/kong/pdk/tracing.lua @@ -425,7 +425,7 @@ local function new_tracer(name, options) -- @tparam table options TODO(mayo) -- @treturn table span function self.start_span(...) - if not base.get_request() then + if (not base.get_request()) or ngx.ctx.stoping_tracing then return noop_span end @@ -488,8 +488,124 @@ setmetatable(noop_tracer, { __newindex = NOOP, }) +local function terminate_tracers() + -- stop further tracing + ngx.ctx.stoping_tracing = true + + local spans = ngx.ctx.KONG_SPANS + if type(spans) ~= "table" then + return + end + + -- stop existing spans + for _, span in ipairs(spans) do + if type(span) == "table" then + span.should_sample = false + end + end +end + +local register_tracer_plugin, is_tracers_enabled_at +do + local GLOBAL_QUERY_OPTS = { workspace = ngx.null, show_ws_id = true } + + local tracers = {} + local tracers_n = 0 + + + local function get_cache_key(name) + return "tracers:is_enabled_at:" .. name + end + + -- we cannot decide consumer before access phase, so tracer plugins must + -- be defined no_consumer + local function get_tracer_enabled_path(name) + local enabled_tbl = {} + for plugin, err in kong.db.plugins:each(nil, GLOBAL_QUERY_OPTS) do + if err then + kong.log.crit("could not obtain list of plugins: ", err) + goto continue + end + + if plugin.name == name and plugin.enabled then + if plugin.route and not plugin.service then + enabled_tbl[plugin.route.id] = true + end + + if plugin.service and not plugin.route then + enabled_tbl[plugin.service.id] = true + end + + if plugin.route and plugin.service then + enabled_tbl[plugin.route.id .. ":" .. plugin.service.id] = true + end + + if not plugin.route and not plugin.servicer then + enabled_tbl.global = true + end + end + ::continue:: + end + + return enabled_tbl + end + + local function is_tracer_enabled_at(name, route_id, service_id) + local enabled_tbl, err = kong.cache:get(get_cache_key(name), nil, get_tracer_enabled_path, name) + + if not enabled_tbl then + return false, err + end + + -- any of them enabled, we should do the tracing + return enabled_tbl.global or enabled_tbl[route_id] or enabled_tbl[service_id] or enabled_tbl[route_id .. ":" .. service_id] + end + + local function invalidate_cache_event(name) + local worker_events = kong.worker_events + local cache_key = get_cache_key(name) + -- invalidations + if kong.configuration.database == "off" then + worker_events.register(function() + kong.cache:invalidate(cache_key) + end, "declarative", "reconfigure") + + else + worker_events.register(function(data) + if data.entity.name == name then + kong.cache:invalidate(cache_key) + end + end, "crud", "plugins") + end + end + + function register_tracer_plugin(name) + invalidate_cache_event(name) + tracers_n = tracers_n + 1 + tracers[tracers_n] = name + end + + function is_tracers_enabled_at(router_id, service_id) + for i = 1, tracers_n do + local enabled, err = is_tracer_enabled_at(tracers[i], router_id, service_id) + + if err then + kong.log.debug("failed to check if tracer is enabled: ", err) + end + + if enabled then + return true + end + end + return false + end +end + return { new = function() return global_tracer end, + register_tracer_plugin = register_tracer_plugin, + is_tracers_enabled_at = is_tracers_enabled_at, + terminate_tracers = terminate_tracers, } diff --git a/kong/plugins/opentelemetry/handler.lua b/kong/plugins/opentelemetry/handler.lua index edf533d6585d..5c8b1cefdcf9 100644 --- a/kong/plugins/opentelemetry/handler.lua +++ b/kong/plugins/opentelemetry/handler.lua @@ -3,6 +3,7 @@ local http = require "resty.http" local clone = require "table.clone" local otlp = require "kong.plugins.opentelemetry.otlp" local propagation = require "kong.tracing.propagation" +local register_tracer_plugin = require("kong.pdk.tracing").register_tracer_plugin local pairs = pairs @@ -113,7 +114,12 @@ local function process_span(span, queue) queue:add(pb_span) end -function OpenTelemetryHandler:rewrite() +function OpenTelemetryHandler:init() + register_tracer_plugin("opentelemetry") +end + +-- TODO: unify handling of propagation with other tracers like zipkin +function OpenTelemetryHandler:access() local headers = ngx_get_headers() local root_span = ngx.ctx.KONG_SPANS and ngx.ctx.KONG_SPANS[1] diff --git a/kong/plugins/opentelemetry/schema.lua b/kong/plugins/opentelemetry/schema.lua index af1ce2d9eaca..58af23690ef2 100644 --- a/kong/plugins/opentelemetry/schema.lua +++ b/kong/plugins/opentelemetry/schema.lua @@ -30,10 +30,8 @@ local resource_attributes = Schema.define { return { name = "opentelemetry", fields = { - -- global plugin only + -- tracer pluign does not scope on consumers { consumer = typedefs.no_consumer }, - { service = typedefs.no_service }, - { route = typedefs.no_route }, { protocols = typedefs.protocols_http }, -- TODO: support stream mode { config = { type = "record", diff --git a/kong/runloop/handler.lua b/kong/runloop/handler.lua index a0a61780211f..2f3566faf48b 100644 --- a/kong/runloop/handler.lua +++ b/kong/runloop/handler.lua @@ -13,6 +13,7 @@ local lrucache = require "resty.lrucache" local marshall = require "kong.cache.marshall" +local tracing = require "kong.pdk.tracing" local PluginsIterator = require "kong.runloop.plugins_iterator" local instrumentation = require "kong.tracing.instrumentation" @@ -44,6 +45,8 @@ local clear_header = ngx.req.clear_header local http_version = ngx.req.http_version local escape = require("kong.tools.uri").escape +local is_tracers_enabled_at = tracing.is_tracers_enabled_at +local terminate_tracers = tracing.terminate_tracers local is_http_module = subsystem == "http" local is_stream_module = subsystem == "stream" @@ -1116,14 +1119,28 @@ return { local match_t = router:exec(ctx) if not match_t then -- tracing + + -- we do not trace 404 requests if span then - span:set_status(2) + span.should_sample = false span:finish() + terminate_tracers() end return kong.response.exit(404, { message = "no Route matched with those values" }) end + -- is tracing enabled? + local route_id = match_t.route and match_t.route.id + local service_id = match_t.service and match_t.service.id + if not is_tracers_enabled_at(route_id, service_id) then + if span then + span.should_sample = false + end + + terminate_tracers() + end + -- ends tracing span if span then span:finish() diff --git a/spec/03-plugins/37-opentelemetry/04-exporter_spec.lua b/spec/03-plugins/37-opentelemetry/04-exporter_spec.lua index 45d334b6f6e1..ca0059e1c7f9 100644 --- a/spec/03-plugins/37-opentelemetry/04-exporter_spec.lua +++ b/spec/03-plugins/37-opentelemetry/04-exporter_spec.lua @@ -45,24 +45,29 @@ for _, strategy in helpers.each_strategy() do end) -- helpers - local function setup_instrumentations(types, config, fixtures) + local function setup_instrumentations(types, config, fixtures, router_scoped) local http_srv = assert(bp.services:insert { name = "mock-service", host = helpers.mock_upstream_host, port = helpers.mock_upstream_port, }) - bp.routes:insert({ service = http_srv, - protocols = { "http" }, - paths = { "/" }}) + local route = assert(bp.routes:insert({ service = http_srv, + protocols = { "http" }, + paths = { "/" }})) - bp.plugins:insert({ + assert(bp.routes:insert({ service = http_srv, + protocols = { "http" }, + paths = { "/no_plugin" }})) + + assert(bp.plugins:insert({ name = "opentelemetry", + route = router_scoped and route, config = table_merge({ endpoint = "http://127.0.0.1:" .. HTTP_SERVER_PORT, batch_flush_delay = 0, -- report immediately }, config) - }) + })) assert(helpers.start_kong({ proxy_listen = "0.0.0.0:" .. PROXY_PORT, @@ -140,6 +145,51 @@ for _, strategy in helpers.each_strategy() do end) end) + describe("#scoping", function () + lazy_setup(function() + bp, _ = assert(helpers.get_db_utils(strategy, { + "services", + "routes", + "plugins", + }, { "opentelemetry" })) + + setup_instrumentations("all", { + headers = { + ["X-Access-Token"] = "token", + }, + }, nil, true) + end) + + lazy_teardown(function() + helpers.stop_kong() + helpers.kill_http_server(HTTP_SERVER_PORT) + end) + + it("works", function () + local thread = helpers.http_server(HTTP_SERVER_PORT, { timeout = 10 }) + local cli = helpers.proxy_client(7000, PROXY_PORT) + local r = assert(cli:send { + method = "GET", + path = "/no_plugin", + }) + assert.res_status(200, r) + r = assert(cli:send { + method = "GET", + path = "/no_exist", + }) + assert.res_status(404, r) + + -- close client connection + cli:close() + + local ok, err = thread:join() + + -- we should have no telemetry reported + assert.is_falsy(ok) + assert.same(err, "timeout") + end) + end) + describe("overwrite resource attributes #http", function () lazy_setup(function() bp, _ = assert(helpers.get_db_utils(strategy, { @@ -284,7 +334,6 @@ for _, strategy in helpers.each_strategy() do local body = fd:read("*a") pb_set = ngx_re.split(body, "\n") - print("pb set length: ", #pb_set) local count = 0 for _, pb_data in ipairs(pb_set) do local decoded = assert(pb.decode("opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest", ngx.decode_base64(pb_data)))