feat: clickhouse logger #6215

Merged 19 commits on Feb 16, 2022
179 changes: 179 additions & 0 deletions apisix/plugins/clickhouse-logger.lua
@@ -0,0 +1,179 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local log_util = require("apisix.utils.log-util")
local core = require("apisix.core")
local http = require("resty.http")
local url = require("net.url")
local plugin = require("apisix.plugin")

local ngx = ngx
local tostring = tostring

local plugin_name = "clickhouse-logger"
local batch_processor_manager = bp_manager_mod.new(plugin_name)

local schema = {
    type = "object",
    properties = {
        endpoint_addr = core.schema.uri_def,
        user = {type = "string", default = ""},
        password = {type = "string", default = ""},
        database = {type = "string", default = ""},
        logtable = {type = "string", default = ""},
Review comment (Member): Should we use a pattern to validate `logtable` to prevent SQL injection?

Reply (Contributor Author): ClickHouse provides a RESTful HTTP interface for accessing the database, so preventing SQL injection relies on the DBA assigning reasonable permissions to APISIX. Other than that, I don't know what other means of protection there are.

        timeout = {type = "integer", minimum = 1, default = 3},
        name = {type = "string", default = "clickhouse logger"},
        max_retry_count = {type = "integer", minimum = 0, default = 0},
        retry_delay = {type = "integer", minimum = 0, default = 1},
        batch_max_size = {type = "integer", minimum = 1, default = 100}
    },
    required = {"endpoint_addr", "user", "password", "database", "logtable"}
}


local metadata_schema = {
    type = "object",
    properties = {
        log_format = log_util.metadata_schema_log_format,
    },
}


local _M = {
    version = 0.1,
    priority = 398,
    name = plugin_name,
    schema = schema,
    metadata_schema = metadata_schema,
}


function _M.check_schema(conf, schema_type)
    if schema_type == core.schema.TYPE_METADATA then
        return core.schema.check(metadata_schema, conf)
    end
    return core.schema.check(schema, conf)
end


local function send_http_data(conf, log_message)
    local err_msg
    local res = true
    local url_decoded = url.parse(conf.endpoint_addr)
    local host = url_decoded.host
    local port = url_decoded.port

    core.log.info("sending a batch logs to ", conf.endpoint_addr)

    if ((not port) and url_decoded.scheme == "https") then
        port = 443
    elseif not port then
        port = 80
    end

    local httpc = http.new()
    httpc:set_timeout(conf.timeout * 1000)
    local ok, err = httpc:connect(host, port)

    if not ok then
        return false, "failed to connect to host[" .. host .. "] port["
            .. tostring(port) .. "] " .. err
    end

    if url_decoded.scheme == "https" then
        ok, err = httpc:ssl_handshake(true, host, false)
        if not ok then
            return false, "failed to perform SSL with host[" .. host .. "] "
                .. "port[" .. tostring(port) .. "] " .. err
        end
    end

    local httpc_res, httpc_err = httpc:request({
        method = "POST",
        path = url_decoded.path,
        query = url_decoded.query,
        body = "INSERT INTO " .. conf.logtable .." FORMAT JSONEachRow " .. log_message,
        headers = {
            ["Host"] = url_decoded.host,
            ["Content-Type"] = "application/json",
            ["X-ClickHouse-User"] = conf.user,
            ["X-ClickHouse-Key"] = conf.password,
            ["X-ClickHouse-Database"] = conf.database
        }
    })

    if not httpc_res then
        return false, "error while sending data to [" .. host .. "] port["
            .. tostring(port) .. "] " .. httpc_err
    end

    -- some error occurred in the server
    if httpc_res.status >= 400 then
        res = false
        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
            .. host .. "] port[" .. tostring(port) .. "] "
            .. "body[" .. httpc_res:read_body() .. "]"
    end

    return res, err_msg
end


function _M.log(conf, ctx)
    local metadata = plugin.plugin_metadata("http-logger")
    core.log.info("metadata: ", core.json.delay_encode(metadata))
    local entry

    if metadata and metadata.value.log_format
       and core.table.nkeys(metadata.value.log_format) > 0
    then
        entry = log_util.get_custom_format_log(ctx, metadata.value.log_format)
    else
        entry = log_util.get_full_log(ngx, conf)
    end

    if batch_processor_manager:add_entry(conf, entry) then
        return
    end

    -- Generate a function to be executed by the batch processor
    local func = function(entries, batch_max_size)
        local data, err

        if batch_max_size == 1 then
            data, err = core.json.encode(entries[1]) -- encode as single {}
        else
            local log_table = {}
            for i = 1, #entries do
                core.table.insert(log_table, core.json.encode(entries[i]))
            end
            data = core.table.concat(log_table, " ") -- assemble multi items as string "{} {}"
        end

        if not data then
            return false, 'error occurred while encoding the data: ' .. err
        end

        return send_http_data(conf, data)
    end

    batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
end


return _M
1 change: 1 addition & 0 deletions conf/config-default.yaml
@@ -382,6 +382,7 @@ plugins: # plugin list (sorted by priority)
  - syslog # priority: 401
  - udp-logger # priority: 400
  - file-logger # priority: 399
  #- clickhouse-logger # priority: 398
  #- log-rotate # priority: 100
  # <- recommend to use priority (0, 100) for your custom plugins
  - example-plugin # priority: 0
1 change: 1 addition & 0 deletions docs/en/latest/config.json
@@ -124,6 +124,7 @@
"plugins/kafka-logger",
"plugins/rocketmq-logger",
"plugins/udp-logger",
"plugins/clickhouse-logger",
"plugins/syslog",
"plugins/log-rotate",
"plugins/error-log-logger",
157 changes: 157 additions & 0 deletions docs/en/latest/plugins/clickhouse-logger.md
@@ -0,0 +1,157 @@
---
title: clickhouse-logger
---

<!--
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
-->

## Summary

- [**Name**](#name)
- [**Attributes**](#attributes)
- [**How To Enable**](#how-to-enable)
- [**Test Plugin**](#test-plugin)
- [**Metadata**](#metadata)
- [**Disable Plugin**](#disable-plugin)

## Name

`clickhouse-logger` is a plugin that pushes request log data to ClickHouse.

## Attributes

Review comment (Member), on the table header below, which was committed in Chinese: Hi @zhendongcmss, there is some Chinese here 😄 Would you like to translate it?

Reply (Member, @juzhiyuan, Feb 22, 2022): Hi @yzeng25, would you be interested in doing this? 🤗

Reply (Member): I created an issue, let's fix it: #6414

| Name | Type | Requirement | Default | Valid | Description |
| ---------------- | ------- | ------ | ------------- | ------- | ------------------------------------------------ |
| endpoint_addr | string | required | | | The ClickHouse endpoint. |
| database | string | required | | | The name of the database that stores the logs. |
| logtable | string | required | | | The name of the table that stores the logs. |
| user | string | required | | | ClickHouse user. |
| password | string | required | | | ClickHouse password. |
| timeout | integer | optional | 3 | [1,...] | Timeout, in seconds, for the HTTP connection to the ClickHouse endpoint. |
| name | string | optional | "clickhouse logger" | | A unique identifier to identify the logger. |
| batch_max_size | integer | optional | 100 | [1,...] | Maximum number of log entries sent in each batch. When the number of buffered entries reaches this maximum, the whole batch is pushed to ClickHouse. |
| max_retry_count | integer | optional | 0 | [0,...] | Maximum number of retries before a batch is removed from the processing pipeline. |
| retry_delay | integer | optional | 1 | [0,...] | Number of seconds to wait before retrying if the execution fails. |
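
The plugin source in this PR concatenates `logtable` directly into the `INSERT INTO ... FORMAT JSONEachRow` statement it sends, and the schema does not restrict the value. The following is a minimal sketch, assuming you want to vet a table name yourself before writing it into a route. The helper name `is_safe_table_name` and the allowed character set (letters, digits, underscores, and one optional `db.table` dot) are assumptions for illustration, not something the plugin enforces.

```shell
# Hypothetical helper (not part of APISIX): succeed only for names such as
# "test" or "default.test". The body runs in a subshell so that LC_ALL=C
# does not leak into the caller.
is_safe_table_name() (
    LC_ALL=C
    case "$1" in
        ''|*[!A-Za-z0-9_.]*) exit 1 ;;   # empty, or contains any other character
        .*|*.|*.*.*) exit 1 ;;           # leading, trailing, or repeated dot
        [0-9]*|*.[0-9]*) exit 1 ;;       # a name part starts with a digit
    esac
    exit 0
)

for name in test default.test 'test; DROP TABLE x'; do
    if is_safe_table_name "$name"; then
        echo "ok: $name"
    else
        echo "rejected: $name"
    fi
done
```

A check like this complements, rather than replaces, restricting the permissions of the ClickHouse user that APISIX connects as.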

## How To Enable

The following is an example of how to enable the `clickhouse-logger` for a specific route.

```shell
curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "plugins": {
        "clickhouse-logger": {
            "user": "default",
            "password": "a",
            "database": "default",
            "logtable": "test",
            "endpoint_addr": "http://127.0.0.1:8123"
        }
    },
    "upstream": {
        "type": "roundrobin",
        "nodes": {
            "127.0.0.1:1980": 1
        }
    },
    "uri": "/hello"
}'
```
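
Note that the port in `endpoint_addr` matters: per the plugin source in this PR, when the URL carries no port the plugin falls back to 443 for `https` and to 80 otherwise, so the ClickHouse HTTP port (8123 in the example above) has to be written out. The following sketch mimics that fallback in shell. `endpoint_port` is a hypothetical helper, and it deliberately ignores IPv6 literals and userinfo.

```shell
# Hypothetical helper mirroring the plugin's port fallback:
# explicit port if present, else 443 for https, else 80.
endpoint_port() {
    url=$1
    scheme=${url%%://*}        # text before "://"
    rest=${url#*://}           # text after "://"
    hostport=${rest%%/*}       # drop the path, if any
    case "$hostport" in
        *:*) echo "${hostport##*:}" ;;
        *)
            if [ "$scheme" = "https" ]; then
                echo 443
            else
                echo 80
            fi
            ;;
    esac
}

endpoint_port "http://127.0.0.1:8123"             # prints 8123
endpoint_port "https://clickhouse.example.com/"   # prints 443
endpoint_port "http://clickhouse.example.com"     # prints 80
```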

## Test Plugin

> success:

```shell
$ curl -i http://127.0.0.1:9080/hello
HTTP/1.1 200 OK
...
hello, world
```

## Metadata

| Name | Type | Requirement | Default | Valid | Description |
| ---------------- | ------- | ----------- | ------------- | ------- | ---------------------------------------------------------------------------------------- |
| log_format | object | optional | {"host": "$host", "@timestamp": "$time_iso8601", "client_ip": "$remote_addr"} | | Log format declared as key-value pairs in JSON format. Only strings are supported as values. A value that starts with `$` is resolved as an `APISIX` variable or an [Nginx variable](http://nginx.org/en/docs/varindex.html). |

Note that **the metadata configuration is applied in global scope**, which means it takes effect on all Routes and Services that use the `clickhouse-logger` plugin.

**APISIX Variables**

| Variable Name | Description | Usage Example |
|------------------|-------------------------|----------------|
| route_id | id of `route` | $route_id |
| route_name | name of `route` | $route_name |
| service_id | id of `service` | $service_id |
| service_name | name of `service` | $service_name |
| consumer_name | username of `consumer` | $consumer_name |

### Example

```shell
curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/http-logger -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "log_format": {
        "host": "$host",
        "@timestamp": "$time_iso8601",
        "client_ip": "$remote_addr"
    }
}'
```

Create the log table:

```sql
CREATE TABLE default.test (
    `host` String,
    `client_ip` String,
    `route_id` String,
    `@timestamp` String,
    PRIMARY KEY(`@timestamp`)
) ENGINE = MergeTree()
```

Running `select * from default.test;` on ClickHouse then returns a row like the following:

```
┌─host──────┬─client_ip─┬─route_id─┬─@timestamp────────────────┐
│ 127.0.0.1 │ 127.0.0.1 │ 1 │ 2022-01-17T10:03:10+08:00 │
└───────────┴───────────┴──────────┴───────────────────────────┘
```
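
For reference, the plugin's batch function in this PR joins the JSON-encoded entries with spaces and prefixes them with `INSERT INTO <logtable> FORMAT JSONEachRow`. The sketch below rebuilds that request body in shell so that a single row can be replayed by hand when debugging. `build_insert_body` is a hypothetical helper, the sample row is taken from the output above, and the commented `curl` call assumes the endpoint and credentials from the route example.

```shell
# Hypothetical helper mirroring how the plugin assembles its request body:
# "INSERT INTO <table> FORMAT JSONEachRow <json> <json> ..."
build_insert_body() {
    table=$1
    shift
    # "$*" joins the remaining arguments with single spaces (default IFS)
    printf 'INSERT INTO %s FORMAT JSONEachRow %s' "$table" "$*"
}

row='{"host":"127.0.0.1","client_ip":"127.0.0.1","route_id":"1","@timestamp":"2022-01-17T10:03:10+08:00"}'
body=$(build_insert_body test "$row")
echo "$body"

# To replay it against the ClickHouse HTTP interface (not executed here):
# curl -X POST 'http://127.0.0.1:8123' \
#     -H 'X-ClickHouse-User: default' \
#     -H 'X-ClickHouse-Key: a' \
#     -H 'X-ClickHouse-Database: default' \
#     --data-binary "$body"
```

If the manual insert succeeds but no rows arrive from APISIX, the table schema and credentials are probably fine and the problem is more likely on the plugin side, for example a batch that has not been flushed yet.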

## Disable Plugin

To disable `clickhouse-logger`, remove its JSON configuration from the plugin configuration.
APISIX plugins are hot-reloaded, so there is no need to restart APISIX.

```shell
$ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "uri": "/hello",
    "plugins": {},
    "upstream": {
        "type": "roundrobin",
        "nodes": {
            "127.0.0.1:1980": 1
        }
    }
}'
```
1 change: 1 addition & 0 deletions docs/zh/latest/config.json
@@ -119,6 +119,7 @@
"plugins/kafka-logger",
"plugins/rocketmq-logger",
"plugins/udp-logger",
"plugins/clickhouse-logger",
"plugins/syslog",
"plugins/log-rotate",
"plugins/error-log-logger",