Skip to content

Commit

Permalink
feat(pubsub): support kafka (#7032)
Browse files Browse the repository at this point in the history
  • Loading branch information
bzp2010 authored and spacewander committed Jun 30, 2022
1 parent 0580935 commit 6993631
Show file tree
Hide file tree
Showing 12 changed files with 608 additions and 13 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,9 @@ install: runtime
$(ENV_INSTALL) apisix/discovery/kubernetes/*.lua $(ENV_INST_LUADIR)/apisix/discovery/kubernetes
$(ENV_INSTALL) apisix/discovery/tars/*.lua $(ENV_INST_LUADIR)/apisix/discovery/tars

$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/pubsub
$(ENV_INSTALL) apisix/pubsub/*.lua $(ENV_INST_LUADIR)/apisix/pubsub/

$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/http
$(ENV_INSTALL) apisix/http/*.lua $(ENV_INST_LUADIR)/apisix/http/

Expand Down
57 changes: 52 additions & 5 deletions apisix/include/apisix/model/pubsub.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,24 @@ message CmdPing {
*/
message CmdEmpty {}

/**
* Get the offset of the specified topic partition from Apache Kafka.
*/
message CmdKafkaListOffset {
string topic = 1;
int32 partition = 2;
int64 timestamp = 3;
}

/**
* Fetch messages of the specified topic partition from Apache Kafka.
*/
message CmdKafkaFetch {
string topic = 1;
int32 partition = 2;
int64 offset = 3;
}

/**
* Client request definition for pubsub scenarios
*
Expand All @@ -55,16 +73,18 @@ message CmdEmpty {}
message PubSubReq {
int64 sequence = 1;
oneof req {
CmdEmpty cmd_empty = 31;
CmdPing cmd_ping = 32;
CmdEmpty cmd_empty = 31;
CmdPing cmd_ping = 32;
CmdKafkaFetch cmd_kafka_fetch = 33;
CmdKafkaListOffset cmd_kafka_list_offset = 34;
};
}

/**
* The response body of the service when an error occurs,
* containing the error code and the error message.
*/
message ErrorResp {
message ErrorResp {
int32 code = 1;
string message = 2;
}
Expand All @@ -77,6 +97,31 @@ message PongResp {
bytes state = 1;
}

/**
* The definition of a message in Kafka with the current message
* offset, production timestamp, Key, and message content.
*/
message KafkaMessage {
int64 offset = 1;
int64 timestamp = 2;
bytes key = 3;
bytes value = 4;
}

/**
* The response of Fetch messages from Apache Kafka.
*/
message KafkaFetchResp {
repeated KafkaMessage messages = 1;
}

/**
* The response of list offset from Apache Kafka.
*/
message KafkaListOffsetResp {
int64 offset = 1;
}

/**
* Server response definition for pubsub scenarios
*
Expand All @@ -90,7 +135,9 @@ message PongResp {
message PubSubResp {
int64 sequence = 1;
oneof resp {
ErrorResp error_resp = 31;
PongResp pong_resp = 32;
ErrorResp error_resp = 31;
PongResp pong_resp = 32;
KafkaFetchResp kafka_fetch_resp = 33;
KafkaListOffsetResp kafka_list_offset_resp = 34;
};
}
8 changes: 8 additions & 0 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ local xrpc = require("apisix.stream.xrpc")
local ctxdump = require("resty.ctxdump")
local ngx_balancer = require("ngx.balancer")
local debug = require("apisix.debug")
local pubsub_kafka = require("apisix.pubsub.kafka")
local ngx = ngx
local get_method = ngx.req.get_method
local ngx_exit = ngx.exit
Expand Down Expand Up @@ -504,6 +505,13 @@ function _M.http_access_phase()
api_ctx.upstream_scheme = "grpc"
end

-- load balancer is not required by kafka upstream, so the upstream
-- node selection process is intercepted and left to kafka to
-- handle on its own
if api_ctx.matched_upstream and api_ctx.matched_upstream.scheme == "kafka" then
return pubsub_kafka.access(api_ctx)
end

local code, err = set_upstream(route, api_ctx)
if code then
core.log.error("failed to set upstream: ", err)
Expand Down
137 changes: 137 additions & 0 deletions apisix/pubsub/kafka.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

local core = require("apisix.core")
local bconsumer = require("resty.kafka.basic-consumer")
local ffi = require("ffi")
local C = ffi.C
local tostring = tostring
local type = type
local ipairs = ipairs
local str_sub = string.sub

ffi.cdef[[
int64_t atoll(const char *num);
]]


local _M = {}


-- Handles the conversion of 64-bit integers in the lua-protobuf.
--
-- Because of the limitations of luajit, we cannot use native 64-bit
-- numbers, so pb decode converts int64 to a string in #xxx format
-- to avoid loss of precision, by this function, we convert this
-- string to int64 cdata numbers.
local function pb_convert_to_int64(src)
if type(src) == "string" then
-- the format is #1234, so there is a small minimum length of 2
if #src < 2 then
return 0
end
return C.atoll(ffi.cast("char *", src) + 1)
else
return src
end
end


-- Takes over requests of type kafka upstream in the http_access phase.
function _M.access(api_ctx)
local pubsub, err = core.pubsub.new()
if not pubsub then
core.log.error("failed to initialize pubsub module, err: ", err)
core.response.exit(400)
return
end

local up_nodes = api_ctx.matched_upstream.nodes

-- kafka client broker-related configuration
local broker_list = {}
for i, node in ipairs(up_nodes) do
broker_list[i] = {
host = node.host,
port = node.port,
}
end

local client_config = {refresh_interval = 30 * 60 * 1000}

-- load and create the consumer instance when it is determined
-- that the websocket connection was created successfully
local consumer = bconsumer:new(broker_list, client_config)

pubsub:on("cmd_kafka_list_offset", function (params)
-- The timestamp parameter uses a 64-bit integer, which is difficult
-- for luajit to handle well, so the int64_as_string option in
-- lua-protobuf is used here. Smaller numbers will be decoded as
-- lua number, while overly larger numbers will be decoded as strings
-- in the format #number, where the # symbol at the beginning of the
-- string will be removed and converted to int64_t with the atoll function.
local timestamp = pb_convert_to_int64(params.timestamp)

local offset, err = consumer:list_offset(params.topic, params.partition, timestamp)

if not offset then
return nil, "failed to list offset, topic: " .. params.topic ..
", partition: " .. params.partition .. ", err: " .. err
end

offset = tostring(offset)
return {
kafka_list_offset_resp = {
offset = str_sub(offset, 1, #offset - 2)
}
}
end)

pubsub:on("cmd_kafka_fetch", function (params)
local offset = pb_convert_to_int64(params.offset)

local ret, err = consumer:fetch(params.topic, params.partition, offset)
if not ret then
return nil, "failed to fetch message, topic: " .. params.topic ..
", partition: " .. params.partition .. ", err: " .. err
end

-- split into multiple messages when the amount of data in
-- a single batch is too large
local messages = ret.records

-- special handling of int64 for luajit compatibility
for _, message in ipairs(messages) do
local timestamp = tostring(message.timestamp)
message.timestamp = str_sub(timestamp, 1, #timestamp - 2)
local offset = tostring(message.offset)
message.offset = str_sub(offset, 1, #offset - 2)
end

return {
kafka_fetch_resp = {
messages = messages,
},
}
end)

-- start processing client commands
pubsub:wait()
end


return _M
6 changes: 4 additions & 2 deletions apisix/schema_def.lua
Original file line number Diff line number Diff line change
Expand Up @@ -451,10 +451,12 @@ local upstream_schema = {
},
scheme = {
default = "http",
enum = {"grpc", "grpcs", "http", "https", "tcp", "tls", "udp"},
enum = {"grpc", "grpcs", "http", "https", "tcp", "tls", "udp",
"kafka"},
description = "The scheme of the upstream." ..
" For L7 proxy, it can be one of grpc/grpcs/http/https." ..
" For L4 proxy, it can be one of tcp/tls/udp."
" For L4 proxy, it can be one of tcp/tls/udp." ..
" For specific protocols, it can be kafka."
},
labels = labels_def,
discovery_type = {
Expand Down
9 changes: 9 additions & 0 deletions ci/linux-ci-init-service.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@
docker exec -i apache-apisix_kafka-server1_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 1 --topic test2
docker exec -i apache-apisix_kafka-server1_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 3 --topic test3
docker exec -i apache-apisix_kafka-server2_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server2:2181 --replication-factor 1 --partitions 1 --topic test4
docker exec -i apache-apisix_kafka-server1_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 1 --topic test-consumer

# create messages for test-consumer
for i in `seq 30`
do
docker exec -i apache-apisix_kafka-server1_1 bash -c "echo "testmsg$i" | /opt/bitnami/kafka/bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test-consumer"
echo "Produces messages to the test-consumer topic, msg: testmsg$i"
done
echo "Kafka service initialization completed"

# prepare openwhisk env
docker pull openwhisk/action-nodejs-v14:nightly
Expand Down
3 changes: 2 additions & 1 deletion docs/en/latest/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@
"type": "category",
"label": "PubSub",
"items": [
"pubsub"
"pubsub",
"pubsub/kafka"
]
},
{
Expand Down
12 changes: 8 additions & 4 deletions docs/en/latest/pubsub.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
title: PubSub
keywords:
- APISIX
- Pub-Sub
- PubSub
description: This document contains information about the Apache APISIX pubsub framework.
---

Expand All @@ -25,7 +25,7 @@ description: This document contains information about the Apache APISIX pubsub f
#
-->

## What is Pub-Sub
## What is PubSub

Publish-subscribe is a messaging paradigm:

Expand All @@ -38,9 +38,13 @@ In Apache APISIX, the most common scenario is handling north-south traffic from

## Architecture

![pub-sub architecture](../../assets/images/pubsub-architecture.svg)
![pubsub architecture](../../assets/images/pubsub-architecture.svg)

Currently, Apache APISIX supports WebSocket communication with the client, which can be any application that supports WebSocket, with Protocol Buffer as the serialization mechanism, see the [protocol definition](../../../apisix/pubsub.proto).
Currently, Apache APISIX supports WebSocket communication with the client, which can be any application that supports WebSocket, with Protocol Buffer as the serialization mechanism, see the [protocol definition](https://github.com/apache/apisix/blob/master/apisix/include/apisix/model/pubsub.proto).

## Supported messaging systems

- [Aapche Kafka](pubsub/kafka.md)

## How to support other messaging systems

Expand Down
Loading

0 comments on commit 6993631

Please sign in to comment.