feat(pubsub): support kafka #7032

Merged · 20 commits · May 16, 2022 · Changes from 12 commits
3 changes: 3 additions & 0 deletions Makefile
@@ -295,6 +295,9 @@ install: runtime
$(ENV_INSTALL) apisix/discovery/kubernetes/*.lua $(ENV_INST_LUADIR)/apisix/discovery/kubernetes
$(ENV_INSTALL) apisix/discovery/tars/*.lua $(ENV_INST_LUADIR)/apisix/discovery/tars

$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/pubsub
$(ENV_INSTALL) apisix/pubsub/*.lua $(ENV_INST_LUADIR)/apisix/pubsub/

$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/http
$(ENV_INSTALL) apisix/http/*.lua $(ENV_INST_LUADIR)/apisix/http/

57 changes: 52 additions & 5 deletions apisix/include/apisix/model/pubsub.proto
@@ -37,6 +37,24 @@ message CmdPing {
*/
message CmdEmpty {}

/**
* Get the offset of the specified topic partition from Apache Kafka.
*/
message CmdKafkaListOffset {
string topic = 1;
int32 partition = 2;
int64 timestamp = 3;
}

/**
* Fetch messages of the specified topic partition from Apache Kafka.
*/
message CmdKafkaFetch {
string topic = 1;
int32 partition = 2;
int64 offset = 3;
}

/**
* Client request definition for pubsub scenarios
*
@@ -55,16 +73,18 @@ message CmdEmpty {}
message PubSubReq {
int64 sequence = 1;
oneof req {
CmdEmpty cmd_empty = 31;
Member: We can remove cmd_empty, which is test-only? Using cmd_kafka_fetch in pubsub.t is enough.

Contributor Author: @spacewander This would make the pubsub module tests depend on code that relies on Kafka, and I'm not sure if I should do that.

Member: What about adding a comment to show that this cmd is test-only?

Contributor Author: After rechecking, I found that CmdEmpty already carries a test-only note:

/**
 * An empty command, a placeholder for testing purposes only
 */
message CmdEmpty {}

CmdPing cmd_ping = 32;
CmdKafkaFetch cmd_kafka_fetch = 33;
CmdKafkaListOffset cmd_kafka_list_offset = 34;
};
}

/**
* The response body of the service when an error occurs,
* containing the error code and the error message.
*/
message ErrorResp {
int32 code = 1;
string message = 2;
}
@@ -77,6 +97,31 @@ message PongResp {
bytes state = 1;
}

/**
 * The definition of a message in Kafka, containing the message offset,
 * production timestamp, key, and message content.
 */
message KafkaMessage {
int64 offset = 1;
int64 timestamp = 2;
bytes key = 3;
bytes value = 4;
}

/**
 * The response to fetching messages from Apache Kafka.
 */
message KafkaFetchResp {
repeated KafkaMessage messages = 1;
}

/**
 * The response to listing the offset from Apache Kafka.
 */
message KafkaListOffsetResp {
int64 offset = 1;
}

/**
* Server response definition for pubsub scenarios
*
@@ -90,7 +135,9 @@ message PongResp {
message PubSubResp {
int64 sequence = 1;
oneof resp {
ErrorResp error_resp = 31;
PongResp pong_resp = 32;
KafkaFetchResp kafka_fetch_resp = 33;
KafkaListOffsetResp kafka_list_offset_resp = 34;
};
}
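As a sketch of how these definitions are used on the wire (not part of this PR), the following example encodes a fetch request and decodes a response with lua-protobuf, the library APISIX uses for this protocol. The schema path, topic name, and all field values are illustrative assumptions.

local pb = require("pb")
local protoc = require("protoc")

-- load the schema; assumes the working directory is the repository root and
-- that the schema declares no package (otherwise prefix the type names)
assert(protoc.new():loadfile("apisix/include/apisix/model/pubsub.proto"))

-- encode a fetch request; "sequence" correlates a response with its request
local frame = assert(pb.encode("PubSubReq", {
    sequence = 1,
    cmd_kafka_fetch = { topic = "test-consumer", partition = 0, offset = 0 },
}))

-- a client would send "frame" as a binary WebSocket message; here we build
-- and decode a response locally just to show the shape of the reply
local resp_frame = assert(pb.encode("PubSubResp", {
    sequence = 1,
    kafka_fetch_resp = {
        messages = {{ offset = 0, timestamp = 0, key = "", value = "testmsg1" }},
    },
}))
local resp = assert(pb.decode("PubSubResp", resp_frame))
for _, msg in ipairs(resp.kafka_fetch_resp.messages) do
    print(msg.offset, msg.value)
end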
8 changes: 8 additions & 0 deletions apisix/init.lua
@@ -42,6 +42,7 @@ local xrpc = require("apisix.stream.xrpc")
local ctxdump = require("resty.ctxdump")
local ngx_balancer = require("ngx.balancer")
local debug = require("apisix.debug")
local pubsub_kafka = require("apisix.pubsub.kafka")
local ngx = ngx
local get_method = ngx.req.get_method
local ngx_exit = ngx.exit
@@ -504,6 +505,13 @@ function _M.http_access_phase()
api_ctx.upstream_scheme = "grpc"
end

-- the load balancer is not required by a kafka upstream, so the
-- upstream node selection process is intercepted here and left to
-- the kafka module to handle on its own
if api_ctx.matched_upstream and api_ctx.matched_upstream.scheme == "kafka" then
return pubsub_kafka.access(api_ctx)
end
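-- For illustration only (not part of this diff): an upstream that takes
-- this branch might look like the following sketch, where the node
-- address and weight are assumptions:
--   {
--     scheme = "kafka",
--     nodes = { { host = "127.0.0.1", port = 9092, weight = 1 } },
--   }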

local code, err = set_upstream(route, api_ctx)
if code then
core.log.error("failed to set upstream: ", err)
133 changes: 133 additions & 0 deletions apisix/pubsub/kafka.lua
@@ -0,0 +1,133 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

local core = require("apisix.core")
local bconsumer = require("resty.kafka.basic-consumer")
local ffi = require("ffi")
local C = ffi.C
local tostring = tostring
local type = type
local ipairs = ipairs
local str_sub = string.sub

ffi.cdef[[
int64_t atoll(const char *num);
]]


local _M = {}


-- Handles the conversion of 64-bit integers from lua-protobuf.
--
-- Because of LuaJIT's limitations, we cannot use native 64-bit numbers,
-- so pb decode converts int64 values to strings in the "#xxx" format to
-- avoid loss of precision; this function converts such strings to int64
-- cdata numbers.
local function pb_convert_to_int64(src)
if type(src) == "string" then
return C.atoll(ffi.cast("char *", src) + 1)
Member: Let's check the src length to avoid out-of-bounds access.

Contributor Author: added
else
return src
end
end
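-- For example (illustrative value): pb_convert_to_int64("#1652683200000")
-- returns the int64_t cdata 1652683200000LL, while an input that was
-- already decoded as a plain Lua number is returned unchanged.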


-- Takes over requests of type kafka upstream in the http_access phase.
function _M.access(api_ctx)
local pubsub, err = core.pubsub.new()
if not pubsub then
core.log.error("failed to initialize pubsub module, err: ", err)
core.response.exit(400)
return
end

local up_nodes = api_ctx.matched_upstream.nodes

-- kafka client broker-related configuration
local broker_list = {}
for i, node in ipairs(up_nodes) do
broker_list[i] = {
host = node.host,
port = node.port,
}
end

local client_config = {refresh_interval = 30 * 60 * 1000}
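-- (the refresh_interval above is in milliseconds: broker metadata is
-- refreshed every 30 minutes; the field name follows lua-resty-kafka's
-- client configuration)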

-- load and create the consumer instance once it is determined that
-- the websocket connection has been created successfully
local consumer = bconsumer:new(broker_list, client_config)

pubsub:on("cmd_kafka_list_offset", function (params)
-- The timestamp parameter uses a 64-bit integer, which is difficult
-- for LuaJIT to handle well, so the int64_as_string option in
-- lua-protobuf is used here. Smaller numbers are decoded as Lua
-- numbers, while larger numbers are decoded as strings in the
-- "#number" format; the leading "#" is removed and the rest is
-- converted to int64_t with the atoll function.
local timestamp = pb_convert_to_int64(params.timestamp)

local offset, err = consumer:list_offset(params.topic, params.partition, timestamp)

if not offset then
return nil, "failed to list offset, topic: " .. params.topic ..
", partition: " .. params.partition .. ", err: " .. err
end

offset = tostring(offset)
return {
kafka_list_offset_resp = {
offset = str_sub(offset, 1, #offset - 2)
}
}
end)

pubsub:on("cmd_kafka_fetch", function (params)
local offset = pb_convert_to_int64(params.offset)

local ret, err = consumer:fetch(params.topic, params.partition, offset)
if not ret then
return nil, "failed to fetch message, topic: " .. params.topic ..
", partition: " .. params.partition .. ", err: " .. err
end

-- split into multiple messages when the amount of data in
-- a single batch is too large
local messages = ret.records

-- special handling of int64 for luajit compatibility
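-- (tostring() renders a LuaJIT int64 cdata as, e.g., "42LL"; the
-- trailing "LL" suffix is stripped below so each value serializes
-- as a plain numeric string)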
for _, message in ipairs(messages) do
local timestamp = tostring(message.timestamp)
message.timestamp = str_sub(timestamp, 1, #timestamp - 2)
local offset = tostring(message.offset)
message.offset = str_sub(offset, 1, #offset - 2)
end

return {
kafka_fetch_resp = {
messages = messages,
},
}
end)

-- start processing client commands
pubsub:wait()
end


return _M
6 changes: 4 additions & 2 deletions apisix/schema_def.lua
@@ -451,10 +451,12 @@ local upstream_schema = {
},
scheme = {
default = "http",
enum = {"grpc", "grpcs", "http", "https", "tcp", "tls", "udp",
"kafka"},
description = "The scheme of the upstream." ..
" For L7 proxy, it can be one of grpc/grpcs/http/https." ..
" For L4 proxy, it can be one of tcp/tls/udp." ..
" For specific protocols, it can be kafka."
},
labels = labels_def,
discovery_type = {
9 changes: 9 additions & 0 deletions ci/linux-ci-init-service.sh
@@ -19,6 +19,15 @@
docker exec -i apache-apisix_kafka-server1_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 1 --topic test2
docker exec -i apache-apisix_kafka-server1_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 3 --topic test3
docker exec -i apache-apisix_kafka-server2_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server2:2181 --replication-factor 1 --partitions 1 --topic test4
docker exec -i apache-apisix_kafka-server1_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 1 --topic test-consumer

# create messages for test-consumer
for i in `seq 30`
do
docker exec -i apache-apisix_kafka-server1_1 bash -c "echo testmsg$i | /opt/bitnami/kafka/bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test-consumer"
echo "Produced message to the test-consumer topic, msg: testmsg$i"
done
echo "Kafka service initialization completed"

# prepare openwhisk env
docker pull openwhisk/action-nodejs-v14:nightly
3 changes: 2 additions & 1 deletion docs/en/latest/config.json
@@ -216,7 +216,8 @@
"type": "category",
"label": "PubSub",
"items": [
"pubsub"
"pubsub",
"pubsub/kafka"
]
},
{
4 changes: 4 additions & 0 deletions docs/en/latest/pubsub.md
@@ -42,6 +42,10 @@ In Apache APISIX, the most common scenario is handling north-south traffic from

Currently, Apache APISIX supports WebSocket communication with the client, which can be any application that supports WebSocket, with Protocol Buffers as the serialization mechanism; see the [protocol definition](../../../apisix/pubsub.proto).
Member: Let's update the path of the definition. We should use an absolute path, as it's not in the website.

Contributor Author: changed

## Supported messaging systems

- [Apache Kafka](pubsub/kafka.md)

## How to support other messaging systems

Apache APISIX implements an extensible pubsub module, which is responsible for starting the WebSocket server, encoding and decoding the communication protocol, and handling client commands, making it straightforward to add support for a new messaging system.
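As a sketch under assumptions: the handler below follows the pattern of apisix/pubsub/kafka.lua from this PR; the command name cmd_example_fetch and the response field example_fetch_resp are hypothetical and would first have to be defined in pubsub.proto.

local core = require("apisix.core")

local _M = {}

function _M.access(api_ctx)
    local pubsub, err = core.pubsub.new()
    if not pubsub then
        core.log.error("failed to initialize pubsub module, err: ", err)
        core.response.exit(400)
        return
    end

    -- register one callback per protocol command; return a table whose key
    -- matches a field of the PubSubResp oneof, or nil plus an error string
    pubsub:on("cmd_example_fetch", function (params)
        return {
            example_fetch_resp = { messages = {} },
        }
    end)

    -- block here and serve client commands over the WebSocket connection
    pubsub:wait()
end

return _M

The module is then wired in by dispatching to its access function for the matching upstream scheme, as init.lua does for kafka in this PR.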