diff --git a/apisix/plugins/ai.lua b/apisix/plugins/ai.lua index eeb78ca80420..699b140b477f 100644 --- a/apisix/plugins/ai.lua +++ b/apisix/plugins/ai.lua @@ -19,10 +19,11 @@ local apisix = require("apisix") local core = require("apisix.core") local router = require("apisix.router") local event = require("apisix.core.event") -local load_balancer = require("apisix.balancer") local balancer = require("ngx.balancer") +local ngx = ngx local is_http = ngx.config.subsystem == "http" local enable_keepalive = balancer.enable_keepalive and is_http +local is_apisix_or, response = pcall(require, "resty.apisix.response") local ipairs = ipairs local pcall = pcall local loadstring = loadstring @@ -65,7 +66,7 @@ local _M = { local orig_router_match local orig_handle_upstream = apisix.handle_upstream -local orig_balancer_run = load_balancer.run +local orig_http_balancer_phase = apisix.http_balancer_phase local default_keepalive_pool = {} @@ -116,7 +117,21 @@ end local pool_opt -local function ai_balancer_run(route) +local function ai_http_balancer_phase() + local api_ctx = ngx.ctx.api_ctx + if not api_ctx then + core.log.error("invalid api_ctx") + return core.response.exit(500) + end + + if is_apisix_or then + local ok, err = response.skip_body_filter_by_lua() + if not ok then + core.log.error("failed to skip body filter by lua: ", err) + end + end + + local route = api_ctx.matched_route local server = route.value.upstream.nodes[1] if enable_keepalive then local ok, err = balancer.set_current_peer(server.host, server.port or 80, pool_opt) @@ -132,6 +147,7 @@ local function ai_balancer_run(route) end end + local function routes_analyze(routes) local route_flags = core.table.new(0, 16) local route_up_flags = core.table.new(0, 12) @@ -161,6 +177,8 @@ local function routes_analyze(routes) route_flags["service_id"] = true elseif key == "plugin_config_id" then route_flags["plugin_config_id"] = true + elseif key == "script" then + route_flags["script"] = true end -- collect upstream flags @@ -198,10 +216,14 @@ local function routes_analyze(routes) end end + local global_rules_flag = router.global_rules and router.global_rules.values + and #router.global_rules.values ~= 0 + if route_flags["vars"] or route_flags["filter_fun"] or route_flags["remote_addr"] or route_flags["service_id"] - or route_flags["plugin_config_id"] then + or route_flags["plugin_config_id"] + or global_rules_flag then router.router_http.match = orig_router_match else core.log.info("use ai plane to match route") @@ -215,10 +237,12 @@ local function routes_analyze(routes) end if route_flags["service"] + or route_flags["script"] or route_flags["service_id"] or route_flags["upstream_id"] or route_flags["enable_websocket"] or route_flags["plugins"] + or route_flags["plugin_config_id"] or route_up_flags["has_domain"] or route_up_flags["pass_host"] or route_up_flags["scheme"] @@ -228,13 +252,14 @@ local function routes_analyze(routes) or route_up_flags["tls"] or route_up_flags["keepalive"] or route_up_flags["service_name"] - or route_up_flags["more_nodes"] then + or route_up_flags["more_nodes"] + or global_rules_flag then apisix.handle_upstream = orig_handle_upstream - load_balancer.run = orig_balancer_run + apisix.http_balancer_phase = orig_http_balancer_phase else - -- replace the upstream module + -- replace the upstream and balancer module apisix.handle_upstream = ai_upstream - load_balancer.run = ai_balancer_run + apisix.http_balancer_phase = ai_http_balancer_phase end end diff --git a/t/debug/dynamic-hook.t b/t/debug/dynamic-hook.t index 87d4450d569c..9832d5dc74a8 100644 --- a/t/debug/dynamic-hook.t +++ b/t/debug/dynamic-hook.t @@ -30,6 +30,11 @@ run_tests(); __DATA__ === TEST 1: dynamic enable +# ai module would conflict with the debug module +--- extra_yaml_config +plugins: + #- ai + - example-plugin --- debug_config eval: $::debug_config --- config location /t { @@ -86,6 +91,11 @@ call require("apisix").http_access_phase() args:{} === TEST 2: dynamic enable by per request and disable after handle request +# ai module would conflict with the debug module +--- extra_yaml_config +plugins: + #- ai + - example-plugin --- debug_config eval: $::debug_config --- config location /t { diff --git a/t/plugin/ai2.t b/t/plugin/ai2.t new file mode 100644 index 000000000000..2a54ae97f08a --- /dev/null +++ b/t/plugin/ai2.t @@ -0,0 +1,377 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX; + +my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx'; +my $version = eval { `$nginx_binary -V 2>&1` }; + +if ($version !~ m/\/apisix-nginx-module/) { + plan(skip_all => "apisix-nginx-module not installed"); +} else { + plan('no_plan'); +} + +repeat_each(1); +log_level('info'); +worker_connections(256); +no_root_location(); +no_shuffle(); + +add_block_preprocessor(sub { + my ($block) = @_; + + if ((!defined $block->error_log) && (!defined $block->no_error_log)) { + $block->set_value("no_error_log", "[error]"); + } + + if (!defined $block->request) { + $block->set_value("request", "GET /t"); + } + + if (!defined $block->extra_init_by_lua) { + my $extra_init_by_lua = <<_EOC_; + local apisix = require("apisix") + apisix.http_header_filter_phase = function () + ngx.header.content_length = 14 + end + + apisix.http_body_filter_phase = function () + ngx.arg[1] = "do body filter" + end +_EOC_ + + $block->set_value("extra_init_by_lua", $extra_init_by_lua); + } +}); + +run_tests(); + +__DATA__ + +=== TEST 1: enable skip body filter +--- extra_init_by_lua + local apisix = require("apisix") + apisix.http_header_filter_phase = function () + ngx.header.content_length = nil + end + + apisix.http_body_filter_phase = function () + ngx.arg[1] = "do body filter" + end +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "methods": ["GET"], + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + ngx.sleep(0.5) + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local httpc = http.new() + local res, err = httpc:request_uri(uri) + assert(res.status == 200) + if not res then + ngx.log(ngx.ERR, err) + return + end + ngx.print(res.body) + } + } +--- response_body +hello world + + + +=== TEST 2: route with plugin_config_id, disable skip body filter +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/plugin_configs/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "serverless-pre-function": { + "phase": "before_proxy", + "functions" : ["return function(conf, ctx) ngx.log(ngx.WARN, \"run before_proxy phase balancer_ip : \", ctx.balancer_ip) end"] + } + } + }]] + ) + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + ngx.sleep(0.5) + + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugin_config_id": "1", + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + ngx.sleep(0.5) + + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local httpc = http.new() + local res, err = httpc:request_uri(uri) + assert(res.status == 200) + if not res then + ngx.log(ngx.ERR, err) + return + end + ngx.say(res.body) + } + } +--- response_body +do body filter +--- error_log +run before_proxy phase balancer_ip : 127.0.0.1 +--- no_error_log +enable sample upstream + + + +=== TEST 3: route with plugins, disable skip body filter +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/consumers', + ngx.HTTP_PUT, + [[{ + "username": "new_consumer", + "plugins": { + "key-auth": { + "key": "auth-jack" + }, + "serverless-pre-function": { + "phase": "before_proxy", + "functions" : ["return function(conf, ctx) ngx.log(ngx.WARN, \"run before_proxy phase balancer_ip : \", ctx.balancer_ip) end"] + } + } + }]] + ) + ngx.sleep(0.5) + + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "key-auth": {} + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + ngx.sleep(0.5) + + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local httpc = http.new() + local headers = { + ["apikey"] = "auth-jack" + } + local res, err = httpc:request_uri(uri, {headers = headers}) + assert(res.status == 200) + if not res then + ngx.log(ngx.ERR, err) + return + end + ngx.say(res.body) + } + } +--- response_body +do body filter +--- error_log +run before_proxy phase balancer_ip : 127.0.0.1 +--- no_error_log +enable sample upstream + + + +=== TEST 4: one of route has plugins, disable skip body filter +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "serverless-pre-function": { + "phase": "before_proxy", + "functions" : ["return function(conf, ctx) ngx.log(ngx.WARN, \"run before_proxy phase balancer_ip : \", ctx.balancer_ip) end"] + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + ngx.sleep(0.5) + + local code, body = t('/apisix/admin/routes/2', + ngx.HTTP_PUT, + [[{ + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello1" + }]] + ) + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + ngx.sleep(0.5) + + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello1" + local httpc = http.new() + local headers = { + ["apikey"] = "auth-jack" + } + local res, err = httpc:request_uri(uri, {headers = headers}) + assert(res.status == 200) + if not res then + ngx.log(ngx.ERR, err) + return + end + ngx.say(res.body) + } + } +--- response_body +do body filter +--- no_error_log +enable sample upstream + + + +=== TEST 5: exist global_rules, disable skip body filter +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/global_rules/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "serverless-pre-function": { + "phase": "before_proxy", + "functions" : ["return function(conf, ctx) ngx.log(ngx.WARN, \"run before_proxy phase balancer_ip : \", ctx.balancer_ip) end"] + } + } + }]] + ) + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + ngx.sleep(0.5) + + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "methods": ["GET"], + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + ngx.sleep(0.5) + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local httpc = http.new() + local res, err = httpc:request_uri(uri) + assert(res.status == 200) + if not res then + ngx.log(ngx.ERR, err) + return + end + ngx.say(res.body) + } + } +--- response_body +do body filter +--- error_log +run before_proxy phase balancer_ip : 127.0.0.1 +--- no_error_log +enable sample upstream