Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(http/kafka-logger): support to log response body #5550

Merged
merged 11 commits into from
Nov 26, 2021
23 changes: 16 additions & 7 deletions apisix/plugins/http-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ local schema = {
batch_max_size = {type = "integer", minimum = 1, default = 1000},
include_req_body = {type = "boolean", default = false},
include_resp_body = {type = "boolean", default = false},
spacewander marked this conversation as resolved.
Show resolved Hide resolved
include_resp_body_expr = {
type = "array",
minItems = 1,
items = {
type = "array",
items = {
type = "string"
}
}
},
concat_method = {type = "string", default = "json",
enum = {"json", "new_line"}}
},
Expand All @@ -70,6 +80,11 @@ local _M = {


function _M.check_schema(conf, schema_type)
local ok, err = log_util.check_log_scheme(conf)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not check log schema with the TYPE_METADATA schema_type. And it is schema, not scheme.

if not ok then
return err
end

if schema_type == core.schema.TYPE_METADATA then
return core.schema.check(metadata_schema, conf)
end
Expand Down Expand Up @@ -164,13 +179,7 @@ end


function _M.body_filter(conf, ctx)
if conf.include_resp_body then
local final_body = core.response.hold_body_chunk(ctx, true)
if not final_body then
return
end
ctx.resp_body = final_body
end
log_util.collect_body(conf, ctx)
end


Expand Down
31 changes: 15 additions & 16 deletions apisix/plugins/kafka-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ local log_util = require("apisix.utils.log-util")
local producer = require ("resty.kafka.producer")
local batch_processor = require("apisix.utils.batch-processor")
local plugin = require("apisix.plugin")
local expr = require("resty.expr.v1")

local math = math
local pairs = pairs
Expand Down Expand Up @@ -75,7 +74,6 @@ local schema = {
inactive_timeout = {type = "integer", minimum = 1, default = 5},
batch_max_size = {type = "integer", minimum = 1, default = 1000},
include_req_body = {type = "boolean", default = false},
include_resp_body = {type = "boolean", default = false},
include_req_body_expr = {
type = "array",
minItems = 1,
Expand All @@ -86,6 +84,17 @@ local schema = {
}
}
},
include_resp_body = {type = "boolean", default = false},
include_resp_body_expr = {
type = "array",
minItems = 1,
items = {
type = "array",
items = {
type = "string"
}
}
},
-- in lua-resty-kafka, cluster_name is defined as number
-- see https://github.com/doujiang24/lua-resty-kafka#new-1
cluster_name = {type = "integer", minimum = 1, default = 1},
Expand All @@ -110,13 +119,9 @@ local _M = {


function _M.check_schema(conf, schema_type)

if conf.include_req_body_expr then
local ok, err = expr.new(conf.include_req_body_expr)
if not ok then
return nil,
{error_msg = "failed to validate the 'include_req_body_expr' expression: " .. err}
end
local ok, err = log_util.check_log_scheme(conf)
if not ok then
return err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return err
return nil, err

end

if schema_type == core.schema.TYPE_METADATA then
Expand Down Expand Up @@ -193,13 +198,7 @@ end


function _M.body_filter(conf, ctx)
if conf.include_resp_body then
local final_body = core.response.hold_body_chunk(ctx, true)
if not final_body then
return
end
ctx.resp_body = final_body
end
log_util.collect_body(conf, ctx)
end


Expand Down
60 changes: 54 additions & 6 deletions apisix/utils/log-util.lua
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ local function get_full_log(ngx, conf)
if not conf.request_expr then
local request_expr, err = expr.new(conf.include_req_body_expr)
if not request_expr then
core.log.error('generate log expr err ' .. err)
core.log.error('generate request expr err ' .. err)
return log
end
conf.request_expr = request_expr
Expand All @@ -154,11 +154,8 @@ local function get_full_log(ngx, conf)
end
end

if conf.include_resp_body then
local body = ctx.resp_body
if body then
log.response.body = body
end
if not ctx.resp_body then
dmsolr marked this conversation as resolved.
Show resolved Hide resolved
log.response.body = ctx.resp_body
end

return log
Expand Down Expand Up @@ -203,4 +200,55 @@ function _M.latency_details_in_ms(ctx)
return latency, upstream_latency, apisix_latency
end


function _M.check_log_scheme(conf)
if conf.include_req_body_expr then
local ok, err = expr.new(conf.include_req_body_expr)
if not ok then
return nil,
{error_msg = "failed to validate the 'include_req_body_expr' expression: " .. err}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should return err.
And please add a test to cover this.

end
end
if conf.include_resp_body_expr then
local ok, err = expr.new(conf.include_resp_body_expr)
if not ok then
return nil,
{error_msg = "failed to validate the 'include_resp_body_expr' expression: " .. err}
end
end
return true, nil
end


function _M.collect_body(conf, ctx)
if conf.include_resp_body then
local log_response_body = true

if not conf.include_resp_body_expr then
dmsolr marked this conversation as resolved.
Show resolved Hide resolved
if not conf.response_expr then
local response_expr, err = expr.new(conf.include_resp_body_expr)
if not response_expr then
core.log.error('generate response expr err ' .. err)
return
end
conf.response_expr = response_expr
end

local result = conf.response_expr:eval(ctx.var)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to cache eval result to the ctx

if not result then
log_response_body = false
end
end

if log_response_body then
local final_body = core.response.hold_body_chunk(ctx, true)
if not final_body then
return
end
ctx.resp_body = final_body
end
end
end


return _M
4 changes: 2 additions & 2 deletions docs/en/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ For more info on Batch-Processor in Apache APISIX please refer.
| buffer_duration | integer | optional | 60 | [1,...] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed.|
| max_retry_count | integer | optional | 0 | [0,...] | Maximum number of retries before removing from the processing pipe line. |
| retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails. |
| include_req_body | boolean | optional | false | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. |
| include_req_body | boolean | optional | false | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. Note: if the request body is too big to be kept in the memory, it can't be logged due to Nginx's limitation. |
| include_req_body_expr | array | optional | | | When `include_req_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the request body when the result is true. |
| include_resp_body| boolean | optional | false | [false, true] | Whether to include the response body. The response body is included if and only if it is `true`. |
| include_req_body_expr | array | optional | | | Whether to logging request body, based on [lua-resty-expr](https://github.com/api7/lua-resty-expr), this option require to turn on `include_req_body` option. |
| cluster_name | integer | optional | 1 | [0,...] | the name of the cluster. When there are two or more kafka clusters, you can specify different names. And this only works with async producer_type.|

### examples of meta_format
Expand Down
2 changes: 1 addition & 1 deletion docs/zh/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ title: kafka-logger
| max_retry_count | integer | 可选 | 0 | [0,...] | 从处理管道中移除之前的最大重试次数。 |
| retry_delay | integer | 可选 | 1 | [0,...] | 如果执行失败,则应延迟执行流程的秒数。 |
| include_req_body | boolean | 可选 | false | [false, true] | 是否包括请求 body。false: 表示不包含请求的 body ; true: 表示包含请求的 body 。|
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

注意:如果请求 body 没办法完全放在内存中,由于 Nginx 的限制,我们没有办法把它记录下来。 This part is overridden.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry.
I think there is the same limitation in the http-logger plugin. Right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

| include_resp_body| boolean | 可选 | false | [false, true] | 是否包括响应体。包含响应体,当为`true`。 |
| include_req_body_expr | array | 可选 | | | 是否采集请求body, 基于[lua-resty-expr](https://github.com/api7/lua-resty-expr)。 该选项需要开启 `include_req_body`|
| include_resp_body| boolean | 可选 | false | [false, true] | 是否包括响应体。包含响应体,当为`true`。 |
| cluster_name | integer | 可选 | 1 | [0,...] | kafka 集群的名称。当有两个或多个 kafka 集群时,可以指定不同的名称。只适用于 producer_type 是 async 模式。|

### meta_format 参考示例
Expand Down
58 changes: 56 additions & 2 deletions t/plugin/http-logger-json.t
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,61 @@ POST /hello



=== TEST 3: json body with request_body and response_body
=== TEST 3: json body with response_body and response_body expression
--- apisix_yaml
routes:
-
uri: /hello
upstream:
nodes:
"127.0.0.1:1980": 1
type: roundrobin
plugins:
http-logger:
batch_max_size: 1
uri: http://127.0.0.1:1980/log
include_resp_body: true
include_resp_body_expr:
- - arg_bar
- ==
- foo
#END
--- request
POST /hello?bar=foo
{"sample_payload":"hello"}
--- error_log
"response":{"body":"hello world\n"



=== TEST 4: json body with response_body, expr not hit
--- apisix_yaml
routes:
-
uri: /hello
upstream:
nodes:
"127.0.0.1:1980": 1
type: roundrobin
plugins:
http-logger:
batch_max_size: 1
uri: http://127.0.0.1:1980/log
include_resp_body: true
include_resp_body_expr:
- - arg_bar
- ==
- foo
#END
--- request
POST /hello?bar=bar
{"sample_payload":"hello"}
--- no_error_log
"response":{"body":"hello world\n"



=== TEST 5: json body with request_body and response_body
--- apisix_yaml
routes:
-
Expand All @@ -112,7 +166,7 @@ qr/(.*"response":\{.*"body":"hello world\\n".*|.*\{\\\"sample_payload\\\":\\\"he



=== TEST 4: json body without request_body or response_body
=== TEST 6: json body without request_body or response_body
--- apisix_yaml
routes:
-
Expand Down
79 changes: 79 additions & 0 deletions t/plugin/kafka-logger.t
Original file line number Diff line number Diff line change
Expand Up @@ -1193,3 +1193,82 @@ hello world
--- no_error_log eval
qr/send data to kafka: \{.*"body":"abcdef"/
--- wait: 2



=== TEST 29: set route(id: 1,include_resp_body = true,include_resp_body_expr = array)
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[=[{
"plugins": {
"kafka-logger": {
"broker_list" :
{
"127.0.0.1":9092
},
"kafka_topic" : "test2",
"key" : "key1",
"timeout" : 1,
"include_resp_body": true,
"include_resp_body_expr": [
[
"arg_name",
"==",
"qwerty"
]
],
"batch_max_size": 1
}
},
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uri": "/hello"
}]=]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}

--- request
GET /t
--- response_body
passed
--- no_error_log
[error]



=== TEST 30: hit route, expr eval success
--- request
POST /hello?name=qwerty
abcdef
--- response_body
hello world
--- no_error_log
[error]
--- error_log eval
qr/send data to kafka: \{.*"body":"hello world\\n"/
--- wait: 2



=== TEST 31: hit route,expr eval fail
--- request
POST /hello?name=zcxv
abcdef
--- response_body
hello world
--- no_error_log eval
qr/send data to kafka: \{.*"body":"hello world\\n"/
--- wait: 2