-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add elasticsearch-logger #7643
Changes from 9 commits
6e1f2d4
446b174
6dfc58c
54cc283
9916235
0e96d5c
00fe216
d7e95c4
f03e804
ecbd983
5765063
2b8801c
1f7c530
219c4fa
acf8cac
35a5304
54ea755
354e89b
1b1474b
cf163f7
471dcad
15f3dd7
1b5fda7
2f2e017
c392327
454ff00
78087d7
55c2ff1
2dc72a5
b98d4bd
edcfa11
9573140
a596e28
c2000cb
b7a306f
bc4bd75
2ce415d
bd2853b
9dfd2d5
214e9a7
31e27f0
b9059ab
b859b90
e7639ca
45ca13a
cb6afd3
4b37d1b
cd96bc4
4c4859a
1a56fd9
902aa05
1693a2a
9914004
05e93e7
5d9028a
5062c8d
a5972f2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,154 @@ | ||||||||||||||||||||||||||||||||||||
-- | ||||||||||||||||||||||||||||||||||||
-- Licensed to the Apache Software Foundation (ASF) under one or more | ||||||||||||||||||||||||||||||||||||
-- contributor license agreements. See the NOTICE file distributed with | ||||||||||||||||||||||||||||||||||||
-- this work for additional information regarding copyright ownership. | ||||||||||||||||||||||||||||||||||||
-- The ASF licenses this file to You under the Apache License, Version 2.0 | ||||||||||||||||||||||||||||||||||||
-- (the "License"); you may not use this file except in compliance with | ||||||||||||||||||||||||||||||||||||
-- the License. You may obtain a copy of the License at | ||||||||||||||||||||||||||||||||||||
-- | ||||||||||||||||||||||||||||||||||||
-- http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||||||||||||||||||
-- | ||||||||||||||||||||||||||||||||||||
-- Unless required by applicable law or agreed to in writing, software | ||||||||||||||||||||||||||||||||||||
-- distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||||||||||||||||||||||||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||||||||||||||||||||||||||
-- See the License for the specific language governing permissions and | ||||||||||||||||||||||||||||||||||||
-- limitations under the License. | ||||||||||||||||||||||||||||||||||||
-- | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
local ngx = ngx | ||||||||||||||||||||||||||||||||||||
local core = require("apisix.core") | ||||||||||||||||||||||||||||||||||||
local ngx_now = ngx.now | ||||||||||||||||||||||||||||||||||||
local http = require("resty.http") | ||||||||||||||||||||||||||||||||||||
local log_util = require("apisix.utils.log-util") | ||||||||||||||||||||||||||||||||||||
local bp_manager_mod = require("apisix.utils.batch-processor-manager") | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
local DEFAULT_ELASTICSEARCH_SOURCE = "apache-apisix-elasticsearch-logging" | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
local plugin_name = "elasticsearch-logging" | ||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should call it elasticsearch-logger like the kafka-logger plugin? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
OK, do I need to change all |
||||||||||||||||||||||||||||||||||||
local batch_processor_manager = bp_manager_mod.new(plugin_name) | ||||||||||||||||||||||||||||||||||||
local str_format = core.string.format | ||||||||||||||||||||||||||||||||||||
local str_sub = string.sub | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
local schema = { | ||||||||||||||||||||||||||||||||||||
type = "object", | ||||||||||||||||||||||||||||||||||||
properties = { | ||||||||||||||||||||||||||||||||||||
endpoint = { | ||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we wrap all the fields in an extra endpoint field? |
||||||||||||||||||||||||||||||||||||
type = "object", | ||||||||||||||||||||||||||||||||||||
properties = { | ||||||||||||||||||||||||||||||||||||
uri = core.schema.uri_def, | ||||||||||||||||||||||||||||||||||||
index = { type = "string"}, | ||||||||||||||||||||||||||||||||||||
type = { type = "string"}, | ||||||||||||||||||||||||||||||||||||
username = { type = "string"}, | ||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can store username & password in an additional field like apisix/apisix/plugins/kafka-proxy.lua Line 25 in a8d03ac
So that we can require them easily. |
||||||||||||||||||||||||||||||||||||
password = { type = "string"}, | ||||||||||||||||||||||||||||||||||||
timeout = { | ||||||||||||||||||||||||||||||||||||
type = "integer", | ||||||||||||||||||||||||||||||||||||
minimum = 1, | ||||||||||||||||||||||||||||||||||||
default = 10 | ||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||
ssl_verify = { | ||||||||||||||||||||||||||||||||||||
type = "boolean", | ||||||||||||||||||||||||||||||||||||
default = true | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||
required = { "uri", "index" } | ||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||
required = { "endpoint" }, | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
local _M = { | ||||||||||||||||||||||||||||||||||||
version = 0.1, | ||||||||||||||||||||||||||||||||||||
priority = 413, | ||||||||||||||||||||||||||||||||||||
name = plugin_name, | ||||||||||||||||||||||||||||||||||||
schema = batch_processor_manager:wrap_schema(schema), | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
function _M.check_schema(conf) | ||||||||||||||||||||||||||||||||||||
return core.schema.check(schema, conf) | ||||||||||||||||||||||||||||||||||||
end | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
local function get_logger_entry(conf) | ||||||||||||||||||||||||||||||||||||
local entry = log_util.get_full_log(ngx, conf) | ||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please also support the custom log format. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
How about the reference apisix/apisix/plugins/kafka-logger.lua Line 178 in 689e4f1
local entry
if conf.meta_format == "origin" then
entry = log_util.get_req_original(ctx, conf)
-- core.log.info("origin entry: ", entry)
else
local metadata = plugin.plugin_metadata(plugin_name)
core.log.info("metadata: ", core.json.delay_encode(metadata))
if metadata and metadata.value.log_format
and core.table.nkeys(metadata.value.log_format) > 0
then
entry = log_util.get_custom_format_log(ctx, metadata.value.log_format)
core.log.info("custom log format entry: ", core.json.delay_encode(entry))
else
entry = log_util.get_full_log(ngx, conf)
core.log.info("full log entry: ", core.json.delay_encode(entry))
end
end There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We do not reference the code of another plugin in one plugin. Unless we pull some generic code into a common module. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Is the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes |
||||||||||||||||||||||||||||||||||||
return core.json.encode({ | ||||||||||||||||||||||||||||||||||||
create = { | ||||||||||||||||||||||||||||||||||||
_index = conf.endpoint.index, | ||||||||||||||||||||||||||||||||||||
_type = conf.endpoint.type | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
}) .. "\n" .. | ||||||||||||||||||||||||||||||||||||
core.json.encode({ | ||||||||||||||||||||||||||||||||||||
time = ngx_now(), | ||||||||||||||||||||||||||||||||||||
host = entry.server.hostname, | ||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why should we invent a format structure for a specific plugin? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I refer to
How about There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we just need to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remember that when we update the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I found that apisix/apisix/utils/log-util.lua Lines 196 to 211 in c4d5f2f
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. my mistake. json string is ok. we can follow this. |
||||||||||||||||||||||||||||||||||||
source = DEFAULT_ELASTICSEARCH_SOURCE, | ||||||||||||||||||||||||||||||||||||
request_url = entry.request.url, | ||||||||||||||||||||||||||||||||||||
request_method = entry.request.method, | ||||||||||||||||||||||||||||||||||||
request_headers = entry.request.headers, | ||||||||||||||||||||||||||||||||||||
request_query = entry.request.querystring, | ||||||||||||||||||||||||||||||||||||
request_size = entry.request.size, | ||||||||||||||||||||||||||||||||||||
response_headers = entry.response.headers, | ||||||||||||||||||||||||||||||||||||
response_status = entry.response.status, | ||||||||||||||||||||||||||||||||||||
response_size = entry.response.size, | ||||||||||||||||||||||||||||||||||||
latency = entry.latency, | ||||||||||||||||||||||||||||||||||||
upstream = entry.upstream, | ||||||||||||||||||||||||||||||||||||
}) .. "\n" | ||||||||||||||||||||||||||||||||||||
end | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
local function send_to_elasticsearch(conf, entries) | ||||||||||||||||||||||||||||||||||||
local httpc, err = http.new() | ||||||||||||||||||||||||||||||||||||
if not httpc then | ||||||||||||||||||||||||||||||||||||
return false, str_format("create http error: %s", err) | ||||||||||||||||||||||||||||||||||||
end | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
local uri = conf.endpoint.uri .. | ||||||||||||||||||||||||||||||||||||
(str_sub(conf.endpoint.uri, -1) == "/" and "_bulk" or "/_bulk") | ||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use string.byte would be better |
||||||||||||||||||||||||||||||||||||
local body = core.table.concat(entries, "") | ||||||||||||||||||||||||||||||||||||
local headers = {["Content-Type"] = "application/json"} | ||||||||||||||||||||||||||||||||||||
if conf.endpoint.username and conf.endpoint.password then | ||||||||||||||||||||||||||||||||||||
local authorization = "Basic " .. ngx.encode_base64( | ||||||||||||||||||||||||||||||||||||
conf.endpoint.username .. ":" .. conf.endpoint.password | ||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||
headers["Authorization"] = authorization | ||||||||||||||||||||||||||||||||||||
end | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
core.log.info("uri: ", uri, ", body: ", body, ", headers: ", core.json.encode(headers)) | ||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This |
||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
httpc:set_timeout(conf.endpoint.timeout * 1000) | ||||||||||||||||||||||||||||||||||||
local resp, err = httpc:request_uri(uri, { | ||||||||||||||||||||||||||||||||||||
ssl_verify = conf.endpoint.ssl_verify, | ||||||||||||||||||||||||||||||||||||
method = "POST", | ||||||||||||||||||||||||||||||||||||
headers = headers, | ||||||||||||||||||||||||||||||||||||
body = body | ||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||
if not resp then | ||||||||||||||||||||||||||||||||||||
return false, str_format("RequestError: %s", err or "") | ||||||||||||||||||||||||||||||||||||
ccxhwmy marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||
end | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
if resp.status ~= 200 then | ||||||||||||||||||||||||||||||||||||
return false, str_format("response status: %d, response body: %s", | ||||||||||||||||||||||||||||||||||||
ccxhwmy marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||
resp.status, resp.body or "") | ||||||||||||||||||||||||||||||||||||
end | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
return true | ||||||||||||||||||||||||||||||||||||
end | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
function _M.log(conf, ctx) | ||||||||||||||||||||||||||||||||||||
local entry = get_logger_entry(conf) | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
if batch_processor_manager:add_entry(conf, entry) then | ||||||||||||||||||||||||||||||||||||
return | ||||||||||||||||||||||||||||||||||||
end | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
local process = function(entries) | ||||||||||||||||||||||||||||||||||||
return send_to_elasticsearch(conf, entries) | ||||||||||||||||||||||||||||||||||||
end | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, process) | ||||||||||||||||||||||||||||||||||||
end | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
return _M |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
--- | ||
title: elasticsearch-logging | ||
keywords: | ||
- APISIX | ||
- Plugin | ||
ccxhwmy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
- Elasticsearch-logging | ||
description: This document contains information about the Apache APISIX elasticsearch-logging Plugin. | ||
--- | ||
|
||
<!-- | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
--> | ||
|
||
## Description | ||
|
||
The `elasticsearch-logging` Plugin is used to forward logs to [Elasticsearch](https://www.elastic.co/guide/en/welcome-to-elastic/current/getting-started-general-purpose.html) for analysis and storage. | ||
|
||
When the Plugin is enabled, APISIX will serialize the request context information to [Elasticsearch Bulk format](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk) and submit it to the batch queue. When the maximum batch size is exceeded, the data in the queue is pushed to Elasticsearch. See [batch processor](../batch-processor.md) for more details. | ||
|
||
## Attributes | ||
|
||
| Name | Required | Default | Description | | ||
| ------------------- | -------- | --------------------------- | ------------------------------------------------------------ | | ||
| endpoint | True | | Elasticsearch endpoint configurations. | | ||
| endpoint.uri | True | | Elasticsearch API endpoint. | | ||
| endpoint.index | True | | Elasticsearch [_index field](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-index-field.html#mapping-index-field) | | ||
| endpoint.type | False | Elasticsearch default value | Elasticsearch [_type field](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/mapping-type-field.html#mapping-type-field) | | ||
| endpoint.username | False | | Elasticsearch [authentication](https://www.elastic.co/guide/en/elasticsearch/reference/current/setting-up-authentication.html) username | | ||
| endpoint.password | False | | Elasticsearch [authentication](https://www.elastic.co/guide/en/elasticsearch/reference/current/setting-up-authentication.html) password | | ||
| endpoint.ssl_verify | False | true | When set to `true` enables SSL verification as per [OpenResty docs](https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake). | | ||
| endpoint.timeout | False | 10 | Elasticsearch send data timeout in seconds. | | ||
|
||
This Plugin supports using batch processors to aggregate and process entries (logs/data) in a batch. This avoids the need for frequently submitting the data. The batch processor submits data every `5` seconds or when the data in the queue reaches `1000`. See [Batch Processor](../batch-processor.md#configuration) for more information or setting your custom configuration. | ||
|
||
## Enabling the Plugin | ||
|
||
### Full configuration | ||
|
||
The example below shows a complete configuration of the Plugin on a specific Route: | ||
|
||
```shell | ||
$ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' | ||
ccxhwmy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
"plugins":{ | ||
"splunk-hec-logging":{ | ||
"endpoint":{ | ||
"uri": "http://127.0.0.1:9200", | ||
"index": "services", | ||
"type": "collector", | ||
"timeout": 60, | ||
"username": "elastic", | ||
"password": "123456", | ||
"ssl_verify": false | ||
}, | ||
"buffer_duration":60, | ||
"max_retry_count":0, | ||
"retry_delay":1, | ||
"inactive_timeout":2, | ||
"batch_max_size":10 | ||
} | ||
}, | ||
"upstream":{ | ||
"type":"roundrobin", | ||
"nodes":{ | ||
"127.0.0.1:1980":1 | ||
} | ||
}, | ||
"uri":"/elasticsearch.do" | ||
}' | ||
``` | ||
|
||
### Minimal configuration | ||
ccxhwmy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
The example below shows a bare minimum configuration of the Plugin on a Route: | ||
|
||
```shell | ||
$ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' | ||
ccxhwmy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
"plugins":{ | ||
"splunk-hec-logging":{ | ||
"endpoint":{ | ||
"uri": "http://127.0.0.1:9200", | ||
"index": "services" | ||
} | ||
} | ||
}, | ||
"upstream":{ | ||
"type":"roundrobin", | ||
"nodes":{ | ||
"127.0.0.1:1980":1 | ||
} | ||
}, | ||
"uri":"/elasticsearch.do" | ||
}' | ||
``` | ||
|
||
## Example usage | ||
|
||
Once you have configured the Route to use the Plugin, when you make a request to APISIX, it will be logged in your Elasticsearch server: | ||
|
||
```shell | ||
$ curl -i http://127.0.0.1:9080/elasticsearch.do?q=hello | ||
ccxhwmy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
HTTP/1.1 200 OK | ||
... | ||
hello, world | ||
``` | ||
|
||
You should be able to login and search these logs from your Kibana discover: | ||
|
||
![kibana search view](../../../assets/images/plugin/elasticsearch-admin-en.png) | ||
|
||
## Disable Plugin | ||
|
||
To disable the `elasticsearch-logging` Plugin, you can delete the corresponding JSON configuration from the Plugin configuration. APISIX will automatically reload and you do not have to restart for this to take effect. | ||
|
||
```shell | ||
$ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' | ||
ccxhwmy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
"plugins":{}, | ||
"upstream":{ | ||
"type":"roundrobin", | ||
"nodes":{ | ||
"127.0.0.1:1980":1 | ||
} | ||
}, | ||
"uri":"/elasticsearch.do" | ||
}' | ||
``` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should move the localized variable after
require ...