Skip to content

Commit

Permalink
feat(kafka-logger): add required_acks option (#4878)
Browse files Browse the repository at this point in the history
  • Loading branch information
okaybase authored Aug 25, 2021
1 parent 6c6ad38 commit 8c793ed
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 0 deletions.
6 changes: 6 additions & 0 deletions apisix/plugins/kafka-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ local schema = {
default = "async",
enum = {"async", "sync"},
},
required_acks = {
type = "integer",
default = 1,
enum = { 0, 1, -1 },
},
key = {type = "string"},
timeout = {type = "integer", minimum = 1, default = 3},
name = {type = "string", default = "kafka logger"},
Expand Down Expand Up @@ -207,6 +212,7 @@ function _M.log(conf, ctx)

broker_config["request_timeout"] = conf.timeout * 1000
broker_config["producer_type"] = conf.producer_type
broker_config["required_acks"] = conf.required_acks

local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer,
broker_list, broker_config)
Expand Down
1 change: 1 addition & 0 deletions docs/en/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ For more info on Batch-Processor in Apache APISIX please refer.
| broker_list | object | required | | | An array of Kafka brokers. |
| kafka_topic | string | required | | | Target topic to push data. |
| producer_type | string | optional | async | ["async", "sync"] | Producer's mode of sending messages. |
| required_acks | integer | optional | 1 | [0, 1, -1] | The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. Semantics is the same as kafka producer acks(If set `acks=0` then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. `acks=1` This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. `acks=-1` This means the leader will wait for the full set of in-sync replicas to acknowledge the record.). |
| key | string | optional | | | Used for partition allocation of messages. |
| timeout | integer | optional | 3 | [1,...] | Timeout for the upstream to send data. |
| name | string | optional | "kafka logger" | | A unique identifier to identity the batch processor. |
Expand Down
1 change: 1 addition & 0 deletions docs/zh/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ title: kafka-logger
| broker_list | object | 必须 | | | 要推送的 kafka 的 broker 列表。 |
| kafka_topic | string | 必须 | | | 要推送的 topic。 |
| producer_type | string | 可选 | async | ["async", "sync"] | 生产者发送消息的模式。 |
| required_acks | integer | 可选 | 1 | [0, 1, -1] | 生产者在确认一个请求发送完成之前需要收到的反馈信息的数量。这个参数是为了保证发送请求的可靠性。语义同 kafka 生产者的 acks 参数(如果设置 `acks=0`,则 producer 不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成。如果设置 `acks=1`,leader 节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功。如果设置 `acks=-1`,这就意味着 leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成。)。 |
| key | string | 可选 | | | 用于消息的分区分配。 |
| timeout | integer | 可选 | 3 | [1,...] | 发送数据的超时时间。 |
| name | string | 可选 | "kafka logger" | | batch processor 的唯一标识。 |
Expand Down
135 changes: 135 additions & 0 deletions t/plugin/kafka-logger.t
Original file line number Diff line number Diff line change
Expand Up @@ -722,3 +722,138 @@ GET /t
[qr/partition_id: 1/,
qr/partition_id: 0/,
qr/partition_id: 2/]



=== TEST 20: required_acks, matches none of the enum values
--- config
location /t {
content_by_lua_block {
local plugin = require("apisix.plugins.kafka-logger")
local ok, err = plugin.check_schema({
broker_list = {
["127.0.0.1"] = 3000
},
required_acks = 10,
kafka_topic ="test",
key= "key1"
})
if not ok then
ngx.say(err)
end
ngx.say("done")
}
}
--- request
GET /t
--- response_body
property "required_acks" validation failed: matches none of the enum values
done
--- no_error_log
[error]



=== TEST 21: report log to kafka, with required_acks(1, 0, -1)
--- config
location /t {
content_by_lua_block {
local data = {
{
input = {
plugins = {
["kafka-logger"] = {
broker_list = {
["127.0.0.1"] = 9092
},
kafka_topic = "test2",
producer_type = "sync",
timeout = 1,
batch_max_size = 1,
required_acks = 1,
meta_format = "origin",
}
},
upstream = {
nodes = {
["127.0.0.1:1980"] = 1
},
type = "roundrobin"
},
uri = "/hello",
},
},
{
input = {
plugins = {
["kafka-logger"] = {
broker_list = {
["127.0.0.1"] = 9092
},
kafka_topic = "test2",
producer_type = "sync",
timeout = 1,
batch_max_size = 1,
required_acks = -1,
meta_format = "origin",
}
},
upstream = {
nodes = {
["127.0.0.1:1980"] = 1
},
type = "roundrobin"
},
uri = "/hello",
},
},
{
input = {
plugins = {
["kafka-logger"] = {
broker_list = {
["127.0.0.1"] = 9092
},
kafka_topic = "test2",
producer_type = "sync",
timeout = 1,
batch_max_size = 1,
required_acks = 0,
meta_format = "origin",
}
},
upstream = {
nodes = {
["127.0.0.1:1980"] = 1
},
type = "roundrobin"
},
uri = "/hello",
},
},
}

local t = require("lib.test_admin").test
local err_count = 0
for i in ipairs(data) do
local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, data[i].input)

if code >= 300 then
err_count = err_count + 1
end
ngx.print(body)

t('/hello', ngx.HTTP_GET)
end

assert(err_count == 0)
}
}
--- request
GET /t
--- no_error_log
[error]
--- error_log
send data to kafka: GET /hello
send data to kafka: GET /hello
send data to kafka: GET /hello

0 comments on commit 8c793ed

Please sign in to comment.