diff --git a/Makefile b/Makefile
index de2e7a52a692..989fe3714f8a 100644
--- a/Makefile
+++ b/Makefile
@@ -295,6 +295,9 @@ install: runtime
 	$(ENV_INSTALL) apisix/discovery/kubernetes/*.lua $(ENV_INST_LUADIR)/apisix/discovery/kubernetes
 	$(ENV_INSTALL) apisix/discovery/tars/*.lua $(ENV_INST_LUADIR)/apisix/discovery/tars

+	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/pubsub
+	$(ENV_INSTALL) apisix/pubsub/*.lua $(ENV_INST_LUADIR)/apisix/pubsub/
+
 	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/http
 	$(ENV_INSTALL) apisix/http/*.lua $(ENV_INST_LUADIR)/apisix/http/

diff --git a/apisix/include/apisix/model/pubsub.proto b/apisix/include/apisix/model/pubsub.proto
index fdaa5e8cbc8d..a4d98915654c 100644
--- a/apisix/include/apisix/model/pubsub.proto
+++ b/apisix/include/apisix/model/pubsub.proto
@@ -37,6 +37,24 @@ message CmdPing {
  */
 message CmdEmpty {}

+/**
+ * Get the offset of the specified topic partition from Apache Kafka.
+ */
+message CmdKafkaListOffset {
+    string topic = 1;
+    int32 partition = 2;
+    int64 timestamp = 3;
+}
+
+/**
+ * Fetch messages of the specified topic partition from Apache Kafka.
+ */
+message CmdKafkaFetch {
+    string topic = 1;
+    int32 partition = 2;
+    int64 offset = 3;
+}
+
 /**
  * Client request definition for pubsub scenarios
  *
@@ -55,8 +73,10 @@ message CmdEmpty {}
 message PubSubReq {
     int64 sequence = 1;
     oneof req {
-        CmdEmpty cmd_empty            = 31;
-        CmdPing cmd_ping              = 32;
+        CmdEmpty cmd_empty                       = 31;
+        CmdPing cmd_ping                         = 32;
+        CmdKafkaFetch cmd_kafka_fetch            = 33;
+        CmdKafkaListOffset cmd_kafka_list_offset = 34;
     };
 }

@@ -64,7 +84,7 @@ message PubSubReq {
  * The response body of the service when an error occurs,
  * containing the error code and the error message.
  */
- message ErrorResp {
+message ErrorResp {
     int32 code = 1;
     string message = 2;
 }
@@ -77,6 +97,31 @@ message PongResp {
     bytes state = 1;
 }

+/**
+ * The definition of a message in Kafka with the current message
+ * offset, production timestamp, Key, and message content.
+ */
+message KafkaMessage {
+    int64 offset = 1;
+    int64 timestamp = 2;
+    bytes key = 3;
+    bytes value = 4;
+}
+
+/**
+ * The response of Fetch messages from Apache Kafka.
+ */
+message KafkaFetchResp {
+    repeated KafkaMessage messages = 1;
+}
+
+/**
+ * The response of list offset from Apache Kafka.
+ */
+message KafkaListOffsetResp {
+    int64 offset = 1;
+}
+
 /**
  * Server response definition for pubsub scenarios
  *
@@ -90,7 +135,9 @@ message PongResp {
 message PubSubResp {
     int64 sequence = 1;
     oneof resp {
-        ErrorResp error_resp            = 31;
-        PongResp pong_resp              = 32;
+        ErrorResp error_resp                       = 31;
+        PongResp pong_resp                         = 32;
+        KafkaFetchResp kafka_fetch_resp            = 33;
+        KafkaListOffsetResp kafka_list_offset_resp = 34;
     };
 }
diff --git a/apisix/init.lua b/apisix/init.lua
index 4306809d05dc..fd2e69a7f48e 100644
--- a/apisix/init.lua
+++ b/apisix/init.lua
@@ -42,6 +42,7 @@ local xrpc            = require("apisix.stream.xrpc")
 local ctxdump         = require("resty.ctxdump")
 local ngx_balancer    = require("ngx.balancer")
 local debug           = require("apisix.debug")
+local pubsub_kafka    = require("apisix.pubsub.kafka")
 local ngx             = ngx
 local get_method      = ngx.req.get_method
 local ngx_exit        = ngx.exit
@@ -504,6 +505,13 @@ function _M.http_access_phase()
         api_ctx.upstream_scheme = "grpc"
     end

+    -- load balancer is not required by kafka upstream, so the upstream
+    -- node selection process is intercepted and left to kafka to
+    -- handle on its own
+    if api_ctx.matched_upstream and api_ctx.matched_upstream.scheme == "kafka" then
+        return pubsub_kafka.access(api_ctx)
+    end
+
     local code, err = set_upstream(route, api_ctx)
     if code then
         core.log.error("failed to set upstream: ", err)
diff --git a/apisix/pubsub/kafka.lua b/apisix/pubsub/kafka.lua
new file mode 100644
index 000000000000..ff035ce4e362
--- /dev/null
+++ b/apisix/pubsub/kafka.lua
@@ -0,0 +1,137 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local core = require("apisix.core")
+local bconsumer = require("resty.kafka.basic-consumer")
+local ffi = require("ffi")
+local C = ffi.C
+local tostring = tostring
+local type = type
+local ipairs = ipairs
+local str_sub = string.sub
+
+ffi.cdef[[
+    int64_t atoll(const char *num);
+]]
+
+
+local _M = {}
+
+
+-- Handles the conversion of 64-bit integers in lua-protobuf.
+--
+-- Because of the limitations of LuaJIT, we cannot use native 64-bit
+-- numbers, so pb decode converts an int64 into a string in the #xxx
+-- format to avoid loss of precision; this function converts such a
+-- string into an int64 cdata number.
+local function pb_convert_to_int64(src)
+    if type(src) == "string" then
+        -- the format is #1234, so there is a small minimum length of 2
+        if #src < 2 then
+            return 0
+        end
+        return C.atoll(ffi.cast("char *", src) + 1)
+    else
+        return src
+    end
+end
+
+
+-- Takes over requests of type kafka upstream in the http_access phase.
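+--
+-- It upgrades the client connection to a WebSocket via core.pubsub,
+-- builds the broker list from the matched upstream nodes, creates a
+-- lua-resty-kafka basic consumer, registers handlers for the
+-- "cmd_kafka_list_offset" and "cmd_kafka_fetch" commands, and then
+-- calls pubsub:wait() to start serving commands on that connection.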
+function _M.access(api_ctx)
+    local pubsub, err = core.pubsub.new()
+    if not pubsub then
+        core.log.error("failed to initialize pubsub module, err: ", err)
+        core.response.exit(400)
+        return
+    end
+
+    local up_nodes = api_ctx.matched_upstream.nodes
+
+    -- kafka client broker-related configuration
+    local broker_list = {}
+    for i, node in ipairs(up_nodes) do
+        broker_list[i] = {
+            host = node.host,
+            port = node.port,
+        }
+    end
+
+    local client_config = {refresh_interval = 30 * 60 * 1000}
+
+    -- load and create the consumer instance only after the websocket
+    -- connection has been created successfully
+    local consumer = bconsumer:new(broker_list, client_config)
+
+    pubsub:on("cmd_kafka_list_offset", function (params)
+        -- The timestamp parameter uses a 64-bit integer, which is difficult
+        -- for LuaJIT to handle well, so the int64_as_string option in
+        -- lua-protobuf is used here. Smaller numbers will be decoded as
+        -- Lua numbers, while larger numbers will be decoded as strings
+        -- in the format #number, where the leading # symbol is removed
+        -- and the rest converted to int64_t with the atoll function.
+        local timestamp = pb_convert_to_int64(params.timestamp)
+
+        local offset, err = consumer:list_offset(params.topic, params.partition, timestamp)
+
+        if not offset then
+            return nil, "failed to list offset, topic: " .. params.topic ..
+                ", partition: " .. params.partition .. ", err: " .. err
+        end
+
+        offset = tostring(offset)
+        return {
+            kafka_list_offset_resp = {
+                offset = str_sub(offset, 1, #offset - 2)
+            }
+        }
+    end)
+
+    pubsub:on("cmd_kafka_fetch", function (params)
+        local offset = pb_convert_to_int64(params.offset)
+
+        local ret, err = consumer:fetch(params.topic, params.partition, offset)
+        if not ret then
+            return nil, "failed to fetch message, topic: " .. params.topic ..
+                ", partition: " .. params.partition .. ", err: " .. err
+        end
+
+        -- split into multiple messages when the amount of data in
+        -- a single batch is too large
+        local messages = ret.records
+
+        -- special handling of int64 for LuaJIT compatibility
+        for _, message in ipairs(messages) do
+            local timestamp = tostring(message.timestamp)
+            message.timestamp = str_sub(timestamp, 1, #timestamp - 2)
+            local offset = tostring(message.offset)
+            message.offset = str_sub(offset, 1, #offset - 2)
+        end
+
+        return {
+            kafka_fetch_resp = {
+                messages = messages,
+            },
+        }
+    end)
+
+    -- start processing client commands
+    pubsub:wait()
+end
+
+
+return _M
diff --git a/apisix/schema_def.lua b/apisix/schema_def.lua
index fbeef89bb17c..5a7830e878af 100644
--- a/apisix/schema_def.lua
+++ b/apisix/schema_def.lua
@@ -451,10 +451,12 @@ local upstream_schema = {
         },
         scheme = {
             default = "http",
-            enum = {"grpc", "grpcs", "http", "https", "tcp", "tls", "udp"},
+            enum = {"grpc", "grpcs", "http", "https", "tcp", "tls", "udp",
+                    "kafka"},
             description = "The scheme of the upstream." ..
                 " For L7 proxy, it can be one of grpc/grpcs/http/https." ..
-                " For L4 proxy, it can be one of tcp/tls/udp."
+                " For L4 proxy, it can be one of tcp/tls/udp." ..
+                " For specific protocols, it can be kafka."
         },
         labels = labels_def,
         discovery_type = {
diff --git a/ci/linux-ci-init-service.sh b/ci/linux-ci-init-service.sh
index 5f468502304d..73477a5febca 100755
--- a/ci/linux-ci-init-service.sh
+++ b/ci/linux-ci-init-service.sh
@@ -19,6 +19,15 @@
 docker exec -i apache-apisix_kafka-server1_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 1 --topic test2
 docker exec -i apache-apisix_kafka-server1_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 3 --topic test3
 docker exec -i apache-apisix_kafka-server2_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server2:2181 --replication-factor 1 --partitions 1 --topic test4
+docker exec -i apache-apisix_kafka-server1_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 1 --topic test-consumer
+
+# create messages for test-consumer
+for i in `seq 30`
+do
+    docker exec -i apache-apisix_kafka-server1_1 bash -c "echo "testmsg$i" | /opt/bitnami/kafka/bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test-consumer"
+    echo "Produced message to the test-consumer topic, msg: testmsg$i"
+done
+echo "Kafka service initialization completed"

 # prepare openwhisk env
 docker pull openwhisk/action-nodejs-v14:nightly
diff --git a/docs/en/latest/config.json b/docs/en/latest/config.json
index d706ac028272..5e18f58a507d 100644
--- a/docs/en/latest/config.json
+++ b/docs/en/latest/config.json
@@ -216,7 +216,8 @@
       "type": "category",
       "label": "PubSub",
       "items": [
-        "pubsub"
+        "pubsub",
+        "pubsub/kafka"
       ]
     },
     {
diff --git a/docs/en/latest/pubsub.md b/docs/en/latest/pubsub.md
index 96209a48609f..83af556baf8c 100644
--- a/docs/en/latest/pubsub.md
+++ b/docs/en/latest/pubsub.md
@@ -2,7 +2,7 @@
 title: PubSub
 keywords:
   - APISIX
-  - Pub-Sub
+  - PubSub
 description: This document contains information about the Apache APISIX pubsub framework.
 ---

@@ -25,7 +25,7 @@ description: This document contains information about the Apache APISIX pubsub f
 #
 -->

-## What is Pub-Sub
+## What is PubSub

 Publish-subscribe is a messaging paradigm:

@@ -38,9 +38,13 @@ In Apache APISIX, the most common scenario is handling north-south traffic from

 ## Architecture

-![pub-sub architecture](../../assets/images/pubsub-architecture.svg)
+![pubsub architecture](../../assets/images/pubsub-architecture.svg)

-Currently, Apache APISIX supports WebSocket communication with the client, which can be any application that supports WebSocket, with Protocol Buffer as the serialization mechanism, see the [protocol definition](../../../apisix/pubsub.proto).
+Currently, Apache APISIX supports WebSocket communication with the client, which can be any application that supports WebSocket, with Protocol Buffer as the serialization mechanism; see the [protocol definition](https://github.com/apache/apisix/blob/master/apisix/include/apisix/model/pubsub.proto).
+
+## Supported messaging systems
+
+- [Apache Kafka](pubsub/kafka.md)

 ## How to support other messaging systems

diff --git a/docs/en/latest/pubsub/kafka.md b/docs/en/latest/pubsub/kafka.md
new file mode 100644
index 000000000000..1eb8ff2ff2b9
--- /dev/null
+++ b/docs/en/latest/pubsub/kafka.md
@@ -0,0 +1,94 @@
+---
+title: Apache Kafka
+keywords:
+  - APISIX
+  - PubSub
+  - Kafka
+description: This document contains information about the Apache APISIX Kafka pubsub scenario.
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## Connect to Apache Kafka
+
+Connecting to Apache Kafka in Apache APISIX is very simple.
+
+Currently, we provide a simple way to integrate Kafka by combining two of its APIs, ListOffsets and Fetch, to pull messages. APISIX does not yet support Apache Kafka's consumer group feature, so offsets cannot be managed by Apache Kafka itself.
+
+### Limitations
+
+- Offsets need to be managed manually
+
+Offsets can be stored by a custom backend service, or obtained with the `list_offset` command before fetching messages; the command accepts a timestamp to get the offset of the first message after that point in time, or special values to get the initial and end offsets.
+
+- Batch data acquisition is not supported
+
+A single command can only fetch the data of one topic partition; fetching data from multiple partitions with a single command is not supported.
+
+### Prepare
+
+First, compile the [communication protocol](https://github.com/apache/apisix/blob/master/apisix/include/apisix/model/pubsub.proto) into a language-specific SDK using `protoc`. It provides the command and response definitions needed to connect to Kafka through APISIX over WebSocket; a minimal client sketch is shown at the end of this section.
+
+The `sequence` field in the protocol is used to associate a request with its response; they correspond one to one. Clients can manage it in any way they want; APISIX does not modify it and only passes it back to the client in the response body.
+
+The following commands are currently used to connect to Apache Kafka:
+
+- CmdKafkaFetch
+- CmdKafkaListOffset
+
+> The `timestamp` field in the `CmdKafkaListOffset` command supports the following values:
+>
+> - `unix timestamp`: Offset of the first message after the specified timestamp
+> - `-1`: Offset of the last message of the current Partition
+> - `-2`: Offset of the first message of the current Partition
+>
+> For more information, see the [Apache Kafka Protocol Documentation](https://kafka.apache.org/protocol.html#The_Messages_ListOffsets)
+
+Possible response bodies: when an error occurs, an `ErrorResp` containing the error message is returned; otherwise, the response corresponding to the executed command is returned.
+
+- ErrorResp
+- KafkaFetchResp
+- KafkaListOffsetResp
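+
+#### Example: a minimal client
+
+The snippet below is only an illustrative sketch and is not shipped with APISIX. It assumes that `pubsub.proto` has been compiled with `protoc --python_out=. pubsub.proto` into a local `pubsub_pb2` module, that the third-party `websocket-client` package is installed, and that a route such as the one created in the next section is served at `127.0.0.1:9080`; adjust the address, topic, and partition to your environment.
+
+```python
+# Hypothetical example client; module names, address, and topic are assumptions.
+import pubsub_pb2
+from websocket import ABNF, create_connection
+
+ws = create_connection("ws://127.0.0.1:9080/kafka")
+
+# Ask for the earliest offset of partition 0 (timestamp -2, see above).
+req = pubsub_pb2.PubSubReq()
+req.sequence = 1
+req.cmd_kafka_list_offset.topic = "test-consumer"
+req.cmd_kafka_list_offset.partition = 0
+req.cmd_kafka_list_offset.timestamp = -2
+ws.send(req.SerializeToString(), opcode=ABNF.OPCODE_BINARY)
+
+resp = pubsub_pb2.PubSubResp()
+resp.ParseFromString(ws.recv())
+offset = resp.kafka_list_offset_resp.offset
+
+# Fetch messages starting from that offset and print them.
+req = pubsub_pb2.PubSubReq()
+req.sequence = 2
+req.cmd_kafka_fetch.topic = "test-consumer"
+req.cmd_kafka_fetch.partition = 0
+req.cmd_kafka_fetch.offset = offset
+ws.send(req.SerializeToString(), opcode=ABNF.OPCODE_BINARY)
+
+resp = pubsub_pb2.PubSubResp()
+resp.ParseFromString(ws.recv())
+if resp.HasField("error_resp"):
+    print("error:", resp.error_resp.message)
+else:
+    for message in resp.kafka_fetch_resp.messages:
+        print(message.offset, message.value)
+
+ws.close()
+```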
+
+### How to use
+
+#### Create route
+
+Create a route, set the upstream `scheme` field to `kafka`, and configure `nodes` to be the addresses of the Kafka brokers.
+
+```shell
+curl -X PUT 'http://127.0.0.1:9080/apisix/admin/routes/kafka' \
+    -H 'X-API-KEY: <api-key>' \
+    -H 'Content-Type: application/json' \
+    -d '{
+    "uri": "/kafka",
+    "upstream": {
+        "nodes": {
+            "kafka-server1:9092": 1,
+            "kafka-server2:9092": 1,
+            "kafka-server3:9092": 1
+        },
+        "type": "none",
+        "scheme": "kafka"
+    }
+}'
+```
+
+After configuring the route, you can use this feature.
diff --git a/rockspec/apisix-master-0.rockspec b/rockspec/apisix-master-0.rockspec
index af447f1bdb01..71cd813119c9 100644
--- a/rockspec/apisix-master-0.rockspec
+++ b/rockspec/apisix-master-0.rockspec
@@ -53,7 +53,7 @@ dependencies = {
     "nginx-lua-prometheus = 0.20220127",
     "jsonschema = 0.9.8",
     "lua-resty-ipmatcher = 0.6.1",
-    "lua-resty-kafka = 0.07",
+    "lua-resty-kafka = 0.20-0",
     "lua-resty-logger-socket = 2.0.1-0",
     "skywalking-nginx-lua = 0.6.0",
     "base64 = 1.5-2",
diff --git a/t/admin/upstream5.t b/t/admin/upstream5.t
new file mode 100644
index 000000000000..e06384175d39
--- /dev/null
+++ b/t/admin/upstream5.t
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+no_shuffle();
+log_level("info");
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if (!$block->request) {
+        $block->set_value("request", "GET /t");
+    }
+
+    if (!$block->no_error_log && !$block->error_log) {
+        $block->set_value("no_error_log", "[error]\n[alert]");
+    }
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: set upstream(kafka scheme)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin")
+            local code, body = t.test("/apisix/admin/upstreams/kafka", ngx.HTTP_PUT, [[{
+                "nodes": {
+                    "127.0.0.1:9092": 1
+                },
+                "type": "none",
+                "scheme": "kafka"
+            }]])
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
diff --git a/t/pubsub/kafka.t b/t/pubsub/kafka.t
new file mode 100644
index 000000000000..8303f5e940de
--- /dev/null
+++ b/t/pubsub/kafka.t
@@ -0,0 +1,229 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+        $block->set_value("no_error_log", "[error]");
+    }
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");
+    }
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: setup all-in-one test
+--- config
+    location /t {
+        content_by_lua_block {
+            local data = {
+                {
+                    url = "/apisix/admin/routes/kafka",
+                    data = [[{
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:9092": 1
+                            },
+                            "type": "none",
+                            "scheme": "kafka"
+                        },
+                        "uri": "/kafka"
+                    }]],
+                },
+                {
+                    url = "/apisix/admin/routes/kafka-invalid",
+                    data = [[{
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:59092": 1
+                            },
+                            "type": "none",
+                            "scheme": "kafka"
+                        },
+                        "uri": "/kafka-invalid"
+                    }]],
+                },
+            }
+
+            local t = require("lib.test_admin").test
+
+            for _, data in ipairs(data) do
+                local code, body = t(data.url, ngx.HTTP_PUT, data.data)
+                ngx.say(body)
+            end
+        }
+    }
+--- response_body eval
+"passed\n"x2
+
+
+
+=== TEST 2: hit route (with HTTP request)
+--- request
+GET /kafka
+--- error_code: 400
+--- error_log
+failed to initialize pubsub module, err: bad "upgrade" request header: nil
+
+
+
+=== TEST 3: hit route (Kafka)
+--- config
+    # The messages used in this test are produced in the linux-ci-init-service.sh
+    # script that prepares the CI environment
+    location /t {
+        content_by_lua_block {
+            local lib_pubsub = require("lib.pubsub")
+            local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/kafka")
+            local data = {
+                {
+                    sequence = 0,
+                    cmd_kafka_list_offset = {
+                        topic = "not-exist",
+                        partition = 0,
+                        timestamp = -1,
+                    },
+                },
+                {
+                    sequence = 1,
+                    cmd_kafka_fetch = {
+                        topic = "not-exist",
+                        partition = 0,
+                        offset = 0,
+                    },
+                },
+                {
+                    -- Query first message offset
+                    sequence = 2,
+                    cmd_kafka_list_offset = {
+                        topic = "test-consumer",
+                        partition = 0,
+                        timestamp = -2,
+                    },
+                },
+                {
+                    -- Query last message offset
+                    sequence = 3,
+                    cmd_kafka_list_offset = {
+                        topic = "test-consumer",
+                        partition = 0,
+                        timestamp = -1,
+                    },
+                },
+                {
+                    -- Query by timestamp, 9999999999999 later than the
+                    -- production time of any message
+                    sequence = 4,
+                    cmd_kafka_list_offset = {
+                        topic = "test-consumer",
+                        partition = 0,
+                        timestamp = "9999999999999",
+                    },
+                },
+                {
+                    -- Query by timestamp, 1500000000000 ms earlier than the
+                    -- production time of any message
+                    sequence = 5,
+                    cmd_kafka_list_offset = {
+                        topic = "test-consumer",
+                        partition = 0,
+                        timestamp = "1500000000000",
+                    },
+                },
+                {
+                    sequence = 6,
+                    cmd_kafka_fetch = {
+                        topic = "test-consumer",
+                        partition = 0,
+                        offset = 14,
+                    },
+                },
+                {
+                    sequence = 7,
+                    cmd_kafka_fetch = {
+                        topic = "test-consumer",
+                        partition = 0,
+                        offset = 999,
+                    },
+                },
+            }
+
+            for i = 1, #data do
+                local data = test_pubsub:send_recv_ws_binary(data[i])
+                if data.error_resp then
+                    ngx.say(data.sequence..data.error_resp.message)
+                end
+                if data.kafka_list_offset_resp then
+                    ngx.say(data.sequence.."offset: "..data.kafka_list_offset_resp.offset)
+                end
+                if data.kafka_fetch_resp then
+                    ngx.say(data.sequence.."offset: "..data.kafka_fetch_resp.messages[1].offset..
+ " msg: "..data.kafka_fetch_resp.messages[1].value) + end + end + test_pubsub:close_ws() + } + } +--- response_body +0failed to list offset, topic: not-exist, partition: 0, err: not found topic +1failed to fetch message, topic: not-exist, partition: 0, err: not found topic +2offset: 0 +3offset: 30 +4offset: -1 +5offset: 0 +6offset: 14 msg: testmsg15 +7failed to fetch message, topic: test-consumer, partition: 0, err: OFFSET_OUT_OF_RANGE + + + +=== TEST 4: hit route (Kafka with invalid node ip) +--- config + # The messages used in this test are produced in the linux-ci-init-service.sh + # script that prepares the CI environment + location /t { + content_by_lua_block { + local lib_pubsub = require("lib.pubsub") + local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/kafka-invalid") + + local data = test_pubsub:send_recv_ws_binary({ + sequence = 0, + cmd_kafka_list_offset = { + topic = "test-consumer", + partition = 0, + timestamp = -2, + }, + }) + if data.error_resp then + ngx.say(data.sequence..data.error_resp.message) + end + test_pubsub:close_ws() + } + } +--- response_body +0failed to list offset, topic: test-consumer, partition: 0, err: not found topic +--- error_log +all brokers failed in fetch topic metadata