diff --git a/apisix/core/ctx.lua b/apisix/core/ctx.lua index df6e430ce396..5bf3daa5743e 100644 --- a/apisix/core/ctx.lua +++ b/apisix/core/ctx.lua @@ -278,11 +278,12 @@ do else local getter = apisix_var_names[key] if getter then + local ctx = t._ctx if getter == true then - val = ngx.ctx.api_ctx and ngx.ctx.api_ctx[key] + val = ctx and ctx[key] else -- the getter is registered by ctx.register_var - val = getter(ngx.ctx.api_ctx) + val = getter(ctx) end else @@ -341,6 +342,7 @@ function _M.set_vars_meta(ctx) end var._request = get_request() + var._ctx = ctx setmetatable(var, mt) ctx.var = var end diff --git a/apisix/stream/xrpc/runner.lua b/apisix/stream/xrpc/runner.lua index 51dbd271e338..5f1b97d1a213 100644 --- a/apisix/stream/xrpc/runner.lua +++ b/apisix/stream/xrpc/runner.lua @@ -27,6 +27,11 @@ local pcall = pcall local ipairs = ipairs local tostring = tostring + +core.ctx.register_var("rpc_time", function(ctx) + return ctx._rpc_end_time - ctx._rpc_start_time +end) + local logger_expr_cache = core.lrucache.new({ ttl = 300, count = 1024 }) @@ -72,7 +77,7 @@ local function filter_logger(ctx, logger) core.log.error("failed to validate the 'filter' expression: ", err) return false end - return filter_expr:eval(ctx) + return filter_expr:eval(ctx.var) end @@ -93,7 +98,6 @@ end local function finialize_req(protocol, session, ctx) ctx._rpc_end_time = ngx_now() - local loggers = session.route.protocol.logger if loggers and #loggers > 0 then for _, logger in ipairs(loggers) do diff --git a/docs/en/latest/apisix-variable.md b/docs/en/latest/apisix-variable.md index d6254b82048b..c9281731bee6 100644 --- a/docs/en/latest/apisix-variable.md +++ b/docs/en/latest/apisix-variable.md @@ -39,5 +39,6 @@ List in alphabetical order: | route_name | core | name of `route` | | | service_id | core | id of `service` | | | service_name | core | name of `service` | | +| rpc_time | xRPC | time spent at the rpc request level | | You can also [register your own variable](./plugin-develop.md#register-custom-variable). diff --git a/docs/en/latest/xrpc.md b/docs/en/latest/xrpc.md index 1fb2a515933a..9ad486e83063 100644 --- a/docs/en/latest/xrpc.md +++ b/docs/en/latest/xrpc.md @@ -131,6 +131,47 @@ One specifies the `superior_id`, whose corresponding value is the ID of another For example, for the Dubbo RPC protocol, the subordinate route is matched based on the service_name and other parameters configured in the route and the actual service_name brought in the request. If the match is successful, the configuration above the subordinate route is used, otherwise the configuration of the superior route is still used. In the above example, if the match for route 2 is successful, it will be forwarded to upstream 2; otherwise, it will still be forwarded to upstream 1. +### Log Reporting + +xRPC supports logging-related functions. You can use this feature to filter requests that require attention, such as high latency, excessive transfer content, etc. + +Each logger item configuration parameter will contain + +- name: the Logger plugin name, +- filter: the prerequisites for the execution of the logger plugin(e.g., request processing time exceeding a given value), +- conf: the configuration of the logger plugin itself. + + The following configuration is an example: + +```json +{ + ... + "protocol": { + "name": "redis", + "logger": { + { + "name": "syslog", + "filter": [ + ["rpc_time", ">=", 0.01] + ], + "conf": { + "host": "127.0.0.1", + "port": 8125, + } + } + } + } +} +``` + +This configuration means that when the `rpc_time` is greater than 0.01 seconds, xPRC reports the request log to the log server via the `syslog` plugin. `conf` is the configuration of the logging server required by the `syslog` plugin. + +Unlike standard TCP proxies, which only execute a logger when the connection is closed, xRPC's executed logger at the end of each 'request'. + +The protocol itself defines the granularity of the specific request, and the xRPC extension code implements the request's granularity. + +For example, in the Redis protocol, the execution of a command is considered a request. + ## How to write your own protocol Assuming that your protocol is named `my_proto`, you need to create a directory that can be introduced by `require "apisix.stream.xrpc.protocols.my_proto"`. diff --git a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua index 013725832480..3ea0c7eaf3a1 100644 --- a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua +++ b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua @@ -29,6 +29,10 @@ local DONE = ngx.DONE local str_byte = string.byte +core.ctx.register_var("rpc_len", function(ctx) + return ctx.len +end) + local _M = {} local router_version local router diff --git a/t/xrpc/pingpong2.t b/t/xrpc/pingpong2.t index 93c57bd27576..cdcf367c2f09 100644 --- a/t/xrpc/pingpong2.t +++ b/t/xrpc/pingpong2.t @@ -220,7 +220,7 @@ failed to validate the 'filter' expression: rule too short { name = "syslog", filter = { - {"len", ">", 10} + {"rpc_len", ">", 10} }, conf = {} } @@ -272,7 +272,7 @@ log filter: syslog filter result: true { name = "syslog", filter = { - {"len", "<", 10} + {"rpc_len", "<", 10} }, conf = {} } @@ -324,8 +324,8 @@ log filter: syslog filter result: false { name = "syslog", filter = { - {"len", ">", 12}, - {"len", "<", 14} + {"rpc_len", ">", 12}, + {"rpc_len", "<", 14} }, conf = {} } @@ -377,8 +377,8 @@ log filter: syslog filter result: true { name = "syslog", filter = { - {"len", "<", 10}, - {"len", ">", 12} + {"rpc_len", "<", 10}, + {"rpc_len", ">", 12} }, conf = {} } @@ -516,7 +516,7 @@ qr/message received:.*\"client_ip\\"\:\\"127.0.0.1\\"/ { name = "syslog", filter = { - {"len", ">", 10} + {"rpc_len", ">", 10} }, conf = { host = "127.0.0.1", @@ -576,7 +576,7 @@ qr/message received:.*\"client_ip\\"\:\\"127.0.0.1\\"/ { name = "syslog", filter = { - {"len", ">", 10} + {"rpc_len", ">", 10} }, conf = { host = "127.0.0.1", @@ -650,7 +650,7 @@ unlock with key xrpc-pingpong-logger#table { name = "syslog", filter = { - {"len", ">", 10} + {"rpc_len", ">", 10} }, conf = { host = "127.0.0.1", @@ -698,7 +698,7 @@ unlock with key xrpc-pingpong-logger#table { name = "syslog", filter = { - {"len", ">", 10} + {"rpc_len", ">", 10} }, conf = { host = "127.0.0.1", diff --git a/t/xrpc/pingpong3.t b/t/xrpc/pingpong3.t new file mode 100644 index 000000000000..c6d98810d656 --- /dev/null +++ b/t/xrpc/pingpong3.t @@ -0,0 +1,193 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX; + +my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx'; +my $version = eval { `$nginx_binary -V 2>&1` }; + +if ($version !~ m/\/apisix-nginx-module/) { + plan(skip_all => "apisix-nginx-module not installed"); +} else { + plan('no_plan'); +} + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!$block->extra_yaml_config) { + my $extra_yaml_config = <<_EOC_; +xrpc: + protocols: + - name: pingpong +_EOC_ + $block->set_value("extra_yaml_config", $extra_yaml_config); + } + + my $config = $block->config // <<_EOC_; + location /t { + content_by_lua_block { + ngx.req.read_body() + local sock = ngx.socket.tcp() + sock:settimeout(1000) + local ok, err = sock:connect("127.0.0.1", 1985) + if not ok then + ngx.log(ngx.ERR, "failed to connect: ", err) + return ngx.exit(503) + end + + local bytes, err = sock:send(ngx.req.get_body_data()) + if not bytes then + ngx.log(ngx.ERR, "send stream request error: ", err) + return ngx.exit(503) + end + while true do + local data, err = sock:receiveany(4096) + if not data then + sock:close() + break + end + ngx.print(data) + end + } + } +_EOC_ + + $block->set_value("config", $config); + + my $stream_upstream_code = $block->stream_upstream_code // <<_EOC_; + local sock = ngx.req.socket(true) + sock:settimeout(10) + while true do + local data = sock:receiveany(4096) + if not data then + return + end + sock:send(data) + end +_EOC_ + + $block->set_value("stream_upstream_code", $stream_upstream_code); + + if ((!defined $block->error_log) && (!defined $block->no_error_log)) { + $block->set_value("no_error_log", "[error]\nRPC is not finished"); + } + + if (!defined $block->extra_stream_config) { + my $stream_config = <<_EOC_; + server { + listen 8125 udp; + content_by_lua_block { + require("lib.mock_layer4").dogstatsd() + } + } +_EOC_ + $block->set_value("extra_stream_config", $stream_config); + } + + $block; +}); + +run_tests; + +__DATA__ + +=== TEST 1: set custom log format +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/plugin_metadata/syslog', + ngx.HTTP_PUT, + [[{ + "log_format": { + "rpc_time": "$rpc_time" + } + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 2: use vae rpc_time +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + { + protocol = { + name = "pingpong", + logger = { + { + name = "syslog", + filter = { + {"rpc_time", ">=", 0} + }, + conf = { + host = "127.0.0.1", + port = 8125, + sock_type = "udp", + batch_max_size = 1, + flush_limit = 1 + } + } + } + }, + upstream = { + nodes = { + ["127.0.0.1:1995"] = 1 + }, + type = "roundrobin" + } + } + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 3: verify the data received by the log server +--- request eval +"POST /t +" . +"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" +--- stream_conf_enable +--- wait: 0.5 +--- error_log eval +qr/message received:.*\"rpc_time\\"\:(0.\d+|0)\}\"/