-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: clickhouse logger #6215
feat: clickhouse logger #6215
Changes from 17 commits
a2d395b
234ac04
26830e5
ac2a148
5a3ed37
bdd1893
d9a8985
c268d5c
f234955
e0b5f3d
27995aa
74d59b5
d4a9c2c
234b920
d89631d
22c3fb0
34f3dcb
9118fb0
cede016
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,179 @@ | ||
-- | ||
-- Licensed to the Apache Software Foundation (ASF) under one or more | ||
-- contributor license agreements. See the NOTICE file distributed with | ||
-- this work for additional information regarding copyright ownership. | ||
-- The ASF licenses this file to You under the Apache License, Version 2.0 | ||
-- (the "License"); you may not use this file except in compliance with | ||
-- the License. You may obtain a copy of the License at | ||
-- | ||
-- http://www.apache.org/licenses/LICENSE-2.0 | ||
-- | ||
-- Unless required by applicable law or agreed to in writing, software | ||
-- distributed under the License is distributed on an "AS IS" BASIS, | ||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
-- See the License for the specific language governing permissions and | ||
-- limitations under the License. | ||
-- | ||
|
||
local bp_manager_mod = require("apisix.utils.batch-processor-manager") | ||
local log_util = require("apisix.utils.log-util") | ||
local core = require("apisix.core") | ||
local http = require("resty.http") | ||
local url = require("net.url") | ||
local plugin = require("apisix.plugin") | ||
|
||
local ngx = ngx | ||
local tostring = tostring | ||
|
||
local plugin_name = "clickhouse-logger" | ||
local batch_processor_manager = bp_manager_mod.new(plugin_name) | ||
|
||
local schema = { | ||
type = "object", | ||
properties = { | ||
endpoint_addr = core.schema.uri_def, | ||
user = {type = "string", default = ""}, | ||
password = {type = "string", default = ""}, | ||
database = {type = "string", default = ""}, | ||
logtable = {type = "string", default = ""}, | ||
timeout = {type = "integer", minimum = 1, default = 3}, | ||
name = {type = "string", default = "clickhouse logger"}, | ||
ssl_verify = {type = "boolean", default = true}, | ||
}, | ||
required = {"endpoint_addr", "user", "password", "database", "logtable"} | ||
} | ||
|
||
|
||
local metadata_schema = { | ||
type = "object", | ||
properties = { | ||
log_format = log_util.metadata_schema_log_format, | ||
}, | ||
} | ||
|
||
|
||
local _M = { | ||
version = 0.1, | ||
priority = 398, | ||
name = plugin_name, | ||
schema = batch_processor_manager:wrap_schema(schema), | ||
metadata_schema = metadata_schema, | ||
} | ||
|
||
|
||
function _M.check_schema(conf, schema_type) | ||
if schema_type == core.schema.TYPE_METADATA then | ||
return core.schema.check(metadata_schema, conf) | ||
end | ||
return core.schema.check(schema, conf) | ||
end | ||
|
||
|
||
local function send_http_data(conf, log_message) | ||
local err_msg | ||
local res = true | ||
local url_decoded = url.parse(conf.endpoint_addr) | ||
local host = url_decoded.host | ||
local port = url_decoded.port | ||
|
||
core.log.info("sending a batch logs to ", conf.endpoint_addr) | ||
|
||
if not port then | ||
if url_decoded.scheme == "https" then | ||
port = 443 | ||
else | ||
port = 80 | ||
end | ||
end | ||
|
||
local httpc = http.new() | ||
httpc:set_timeout(conf.timeout * 1000) | ||
local ok, err = httpc:connect(host, port) | ||
|
||
if not ok then | ||
return false, "failed to connect to host[" .. host .. "] port[" | ||
.. tostring(port) .. "] " .. err | ||
end | ||
|
||
if url_decoded.scheme == "https" and conf.ssl_verify then | ||
qizhendong1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ok, err = httpc:ssl_handshake(true, host, conf.ssl_verify) | ||
if not ok then | ||
return false, "failed to perform SSL with host[" .. host .. "] " | ||
.. "port[" .. tostring(port) .. "] " .. err | ||
end | ||
end | ||
|
||
local httpc_res, httpc_err = httpc:request({ | ||
method = "POST", | ||
path = url_decoded.path, | ||
query = url_decoded.query, | ||
body = "INSERT INTO " .. conf.logtable .." FORMAT JSONEachRow " .. log_message, | ||
headers = { | ||
["Host"] = url_decoded.host, | ||
["Content-Type"] = "application/json", | ||
["X-ClickHouse-User"] = conf.user, | ||
["X-ClickHouse-Key"] = conf.password, | ||
["X-ClickHouse-Database"] = conf.database | ||
} | ||
}) | ||
|
||
if not httpc_res then | ||
return false, "error while sending data to [" .. host .. "] port[" | ||
.. tostring(port) .. "] " .. httpc_err | ||
end | ||
|
||
-- some error occurred in the server | ||
if httpc_res.status >= 400 then | ||
res = false | ||
err_msg = "server returned status code[" .. httpc_res.status .. "] host[" | ||
.. host .. "] port[" .. tostring(port) .. "] " | ||
.. "body[" .. httpc_res:read_body() .. "]" | ||
end | ||
|
||
return res, err_msg | ||
end | ||
|
||
|
||
function _M.log(conf, ctx) | ||
local metadata = plugin.plugin_metadata(plugin_name) | ||
core.log.info("metadata: ", core.json.delay_encode(metadata)) | ||
local entry | ||
|
||
if metadata and metadata.value.log_format | ||
and core.table.nkeys(metadata.value.log_format) > 0 | ||
then | ||
entry = log_util.get_custom_format_log(ctx, metadata.value.log_format) | ||
else | ||
entry = log_util.get_full_log(ngx, conf) | ||
end | ||
|
||
if batch_processor_manager:add_entry(conf, entry) then | ||
return | ||
end | ||
|
||
-- Generate a function to be executed by the batch processor | ||
local func = function(entries, batch_max_size) | ||
local data, err | ||
|
||
if batch_max_size == 1 then | ||
data, err = core.json.encode(entries[1]) -- encode as single {} | ||
else | ||
local log_table = {} | ||
for i = 1, #entries do | ||
core.table.insert(log_table, core.json.encode(entries[i])) | ||
end | ||
data = core.table.concat(log_table, " ") -- assemble multi items as string "{} {}" | ||
end | ||
|
||
if not data then | ||
return false, 'error occurred while encoding the data: ' .. err | ||
end | ||
|
||
return send_http_data(conf, data) | ||
end | ||
|
||
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func) | ||
end | ||
|
||
|
||
return _M |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
--- | ||
title: clickhouse-logger | ||
--- | ||
|
||
<!-- | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
--> | ||
|
||
## Summary | ||
|
||
- [**Name**](#name) | ||
- [**Attributes**](#attributes) | ||
- [**How To Enable**](#how-to-enable) | ||
- [**Test Plugin**](#test-plugin) | ||
- [**Metadata**](#metadata) | ||
- [**Disable Plugin**](#disable-plugin) | ||
|
||
## Name | ||
|
||
`clickhouse-logger` is a plugin which push Log data requests to clickhouse. | ||
|
||
## Attributes | ||
|
||
| 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述 | | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @zhendongcmss, here have some Chinese 😄 Would you like to translate them? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi, @yzeng25, do you have interest to do this? 🤗 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I created an issue, let's fix it: #6414 |
||
| ---------------- | ------- | ------ | ------------- | ------- | ------------------------------------------------ | | ||
| endpoint_addr | string | required | | | The `clickhouse` endpoint. | | ||
| database | string | required | | | The DB name to store log. | | ||
| logtable | string | required | | | The table name. | | ||
| user | string | required | | | clickhouse user. | | ||
| password | string | required | | | clickhouse password. | | ||
| timeout | integer | optional | 3 | [1,...] | Time to keep the connection alive after sending a request. | | ||
| name | string | optional | "clickhouse logger" | | A unique identifier to identity the logger. | | ||
| batch_max_size | integer | optional | 100 | [1,...] | Set the maximum number of logs sent in each batch. When the number of logs reaches the set maximum, all logs will be automatically pushed to the clickhouse. | | ||
| max_retry_count | integer | optional | 0 | [0,...] | Maximum number of retries before removing from the processing pipe line. | | ||
| retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails. | | ||
| ssl_verify | boolean | optional | true | [true,false] | verify ssl. | | ||
|
||
## How To Enable | ||
|
||
The following is an example of how to enable the `clickhouse-logger` for a specific route. | ||
|
||
```shell | ||
curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' | ||
{ | ||
"plugins": { | ||
"clickhouse-logger": { | ||
"user": "default", | ||
"password": "a", | ||
"database": "default", | ||
"logtable": "test", | ||
"endpoint_addr": "http://127.0.0.1:8123" | ||
} | ||
}, | ||
"upstream": { | ||
"type": "roundrobin", | ||
"nodes": { | ||
"127.0.0.1:1980": 1 | ||
} | ||
}, | ||
"uri": "/hello" | ||
}' | ||
``` | ||
|
||
## Test Plugin | ||
|
||
> success: | ||
|
||
```shell | ||
$ curl -i http://127.0.0.1:9080/hello | ||
HTTP/1.1 200 OK | ||
... | ||
hello, world | ||
``` | ||
|
||
## Metadata | ||
|
||
| Name | Type | Requirement | Default | Valid | Description | | ||
| ---------------- | ------- | ----------- | ------------- | ------- | ---------------------------------------------------------------------------------------- | | ||
| log_format | object | optional | {"host": "$host", "@timestamp": "$time_iso8601", "client_ip": "$remote_addr"} | | Log format declared as key value pair in JSON format. Only string is supported in the `value` part. If the value starts with `$`, it means to get [APISIX variable](../apisix-variable.md) or [Nginx variable](http://nginx.org/en/docs/varindex.html). | | ||
|
||
Note that **the metadata configuration is applied in global scope**, which means it will take effect on all Route or Service which use clickhouse-logger plugin. | ||
|
||
### Example | ||
|
||
```shell | ||
curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/clickhouse-logger -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' | ||
{ | ||
"log_format": { | ||
"host": "$host", | ||
"@timestamp": "$time_iso8601", | ||
"client_ip": "$remote_addr" | ||
} | ||
}' | ||
``` | ||
|
||
create clickhouse log table | ||
|
||
```sql | ||
CREATE TABLE default.test ( | ||
`host` String, | ||
`client_ip` String, | ||
`route_id` String, | ||
`@timestamp` String, | ||
PRIMARY KEY(`@timestamp`) | ||
) ENGINE = MergeTree() | ||
``` | ||
|
||
On clickhouse run `select * from default.test;`, will got below row: | ||
|
||
``` | ||
┌─host──────┬─client_ip─┬─route_id─┬─@timestamp────────────────┐ | ||
│ 127.0.0.1 │ 127.0.0.1 │ 1 │ 2022-01-17T10:03:10+08:00 │ | ||
└───────────┴───────────┴──────────┴───────────────────────────┘ | ||
``` | ||
|
||
## Disable Plugin | ||
|
||
Remove the corresponding json configuration in the plugin configuration to disable the `clickhouse-logger`. | ||
APISIX plugins are hot-reloaded, therefore no need to restart APISIX. | ||
|
||
```shell | ||
$ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' | ||
{ | ||
"uri": "/hello", | ||
"plugins": {}, | ||
"upstream": { | ||
"type": "roundrobin", | ||
"nodes": { | ||
"127.0.0.1:1980": 1 | ||
} | ||
} | ||
}' | ||
``` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use pattern to validate logtable to prevent SQL injection?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clickhouse providers HTTP restful style to access DB, prevent SQL injection need DBA reasonably assigns permissions for apisix. Other than that, I don't know what protection means there are.