feat: support kafka consumer for pubsub scenario #6995

Status: Closed · 98 commits
Commits (98, all by bzp2010):

May 6, 2022:
a0d1743 feat: mvp
8a354ef chore: update dependency
2f71053 feat: add ssl verify to upstream
a6292fb chore: fix
d40d19a fix: typo
32f2182 chore: add license
3b3625f feat: separate pubsub module
53c4c21 refactor: kafka access phase
3a22a26 fix: typo
7058417 fix: locally global var
6f84c48 fix: ensure install pubsub.proto
0eea32b fix
3cb6506 fix: lint
eacdc7a fix: compatible upstream tls verify
0871a90 fix: update schema check rule
fd2dd12 fix: recovery check rule
07162a8 test

May 7, 2022:
abf9bee fix: avoid matched_upstream check errors
646a656 chore: update kafka consumer config define
621be00 test: support upstream tls
f6c60d3 test: improve plugin schema check
37539f1 test: add kafka upstream base cases
7a3ead7 test: create test conusmer topic
5ac5b38 test: reduce testmsg count and add msgid
20eedff test: add kafka consumer cases
be28226 feat: let kafka upstream obey tls
ca99c68 chore: remove unused ctx var
1cf7b4c fix: lint
d6434c2 fix: create test messages
b07dc70 feat: add pub-sub test in ci
dbec646 fix: ci
826f89a fix: ci
d6c8628 test: add pubsub in centos7
7de4bce chore: change plugin to kafka-proxy
464a028 chore: recover tls config
4238e62 feat: move tls and verify to plugin
491a16b docs: add plugin to sidebar
0bdfe23 docs: add tls to kafka-proxy plugin

May 8, 2022:
4eadcb7 fix: ci
a7b4b63 fix: typo
ef152fe fix: ci
0efa8f6 docs: add pubsub module documentations
448b9bc docs: update typo
0ce6fba docs: add kafka pubsub documentations
56bb7a6 fix: lint
dce5b4f docs: add to sidebar
9227083 docs: fix typo
bf1020c Merge branch 'master' into feat-kafka
7a6854f chore: add plugin to config-default
5a1f388 fix: typo
08bf6b8 docs: update architecture image
0b8ed17 docs: update architecture image
244a293 test: fix plugin list

May 9, 2022:
c75f918 docs: fix review
3eace42 docs: adjust desc
dc6832e docs: fix typo
a16cf7f Merge branch 'master' into feat-kafka
501efb6 feat: support tls and sasl in kafka test server
3d30ee9 fix: ci
4cecaf1 fix: ci
7529e7b fix: ci
db34949 test: add tls cases in kafka-proxy
f423ae4 feat: sasl change to table
359418b docs: add ldoc for pubsub module
66b1759 docs: fix typo
a467ba4 chore: move kafka_access to separate module
5792c59 docs: change category name
7384042 docs: update kafka-proxy keyword
58a973d docs: update
c732581 docs: add comments to pubsub proto
0130cf3 fix: kafka tls and sasl
78b601a fix: lint
0ed9d7c chore: install pubsub package
a572035 fix: lint
0a7a1f6 fix: lint
181d31d chore: change to dynamic generate kafka cert
ec377a1 ci: debug
b1ddf53 fix: ci
8d5b8f8 ci: debug

May 10, 2022:
9eef154 ci: add tips
8e15ebd fix: ci
b7e288d test: move kafka to admin case
49e30b1 feat: move back tls to upstream
c8b6696 chore: remove tls in kafka-proxy
73bed0a ci: move pubsub cases
ad8facf ci: move pubsub cases
1d1206c fix: ci
6d72f21 feat: config socket tls by upstream
53d21e7 test: tls cases
c9c88a6 feat: add tls default value
4c75286 test: add tls and sasl cases
23c273d fix: ci
d79d439 docs: remove enable tls
6fa0e26 chore: move pubsub proto
78cd57b chore: adjust pubsub event loop error handle

May 11, 2022:
6f651aa fix: install
9e9b7e2 fix: lint
f085dd9 fix
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
@@ -31,7 +31,7 @@ jobs:
- linux_openresty_1_17
test_dir:
- t/plugin
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc t/pubsub
- t/node t/router t/script t/stream-node t/utils t/wasm t/xds-library t/xrpc
spacewander (Member), May 9, 2022:
Should add the t/pubsub to this, as t/plugin is the most time-consuming.

bzp2010 (Contributor, Author):
Moved to the "t/admin t/cli ... t/pubsub" runner; it is currently the least time-consuming runner.

spacewander (Member):
Actually, the rule for dividing the test cases is not to balance the time cost, but to put t/plugin in a separate job and arrange the remaining directories into two groups in lexicographic order.

bzp2010 (Contributor, Author), May 13, 2022:
resolved in #7043

runs-on: ${{ matrix.platform }}
4 changes: 3 additions & 1 deletion .github/workflows/centos7-ci.yml
@@ -29,7 +29,7 @@ jobs:
matrix:
test_dir:
- t/plugin
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc t/pubsub
- t/node t/router t/script t/stream-node t/utils t/wasm t/xds-library

steps:
@@ -79,6 +79,8 @@ jobs:

- name: Run other docker containers for test
run: |
# generating SSL certificates for Kafka
keytool -genkeypair -keyalg RSA -dname "CN=127.0.0.1" -alias 127.0.0.1 -keystore ./ci/pod/kafka/kafka-server/selfsigned.jks -validity 365 -keysize 2048 -storepass changeit
make ci-env-up
./ci/linux-ci-init-service.sh

6 changes: 6 additions & 0 deletions Makefile
@@ -264,6 +264,9 @@ install: runtime
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix
$(ENV_INSTALL) apisix/*.lua $(ENV_INST_LUADIR)/apisix/

$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/include/apisix/model
$(ENV_INSTALL) apisix/include/apisix/model/*.proto $(ENV_INST_LUADIR)/apisix/include/apisix/model/

$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/admin
$(ENV_INSTALL) apisix/admin/*.lua $(ENV_INST_LUADIR)/apisix/admin/

@@ -340,6 +343,9 @@ install: runtime
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/zipkin
$(ENV_INSTALL) apisix/plugins/zipkin/*.lua $(ENV_INST_LUADIR)/apisix/plugins/zipkin/

$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/pubsub
$(ENV_INSTALL) apisix/pubsub/*.lua $(ENV_INST_LUADIR)/apisix/pubsub/

$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/ssl/router
$(ENV_INSTALL) apisix/ssl/router/*.lua $(ENV_INST_LUADIR)/apisix/ssl/router/

1 change: 1 addition & 0 deletions apisix/core.lua
@@ -52,4 +52,5 @@ return {
tablepool = require("tablepool"),
resolver = require("apisix.core.resolver"),
os = require("apisix.core.os"),
pubsub = require("apisix.core.pubsub"),
}
167 changes: 167 additions & 0 deletions apisix/core/pubsub.lua
@@ -0,0 +1,167 @@
--
Member:
Please add ldoc like other files under core/

Contributor (Author):
added

-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

--- Extensible framework to support publish-and-subscribe scenarios
--
-- @module core.pubsub

local log = require("apisix.core.log")
local ws_server = require("resty.websocket.server")
local protoc = require("protoc")
local pb = require("pb")
local setmetatable = setmetatable
local pcall = pcall
local pairs = pairs

protoc.reload()
pb.option("int64_as_string")
Member:
Will this call pollute the global option of pb?

Contributor (Author):
@spacewander I think it's possible: this pb module is a C module imported from pb.so, which is perhaps loaded only once; I can't be sure. 🤔
BTW, user-defined option lists are allowed in grpc-transcode, so if this is a problem here, grpc-transcode will also have problems (multiple different grpc routes allow different options to be defined).

Member:
Maybe we can solve it with pb_State, like how we store the proto in grpc-transcode?

Contributor (Author):
I need to research this further and will update it later if needed; you can review the other code first.

Member:
I helped do it today and put the result in #7025.

Contributor (Author):
resolved as in grpc-transcode, in #7028

local pubsub_protoc = protoc.new()


local _M = { version = 0.1 }
local mt = { __index = _M }


---
-- Create pubsub module instance
--
-- @function core.pubsub.new
-- @treturn pubsub module instance
-- @treturn string|nil error message if present
-- @usage
-- local pubsub, err = core.pubsub.new()
function _M.new()
-- compile the protobuf file on initial load module
-- ensure that each worker is loaded once
if not pubsub_protoc.loaded["pubsub.proto"] then
pubsub_protoc:addpath("apisix/include/apisix/model")
local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
if not ok then
pubsub_protoc:reset()
return nil, "failed to load pubsub protocol: "..err
end
end

local ws, err = ws_server:new()
if not ws then
return nil, err
end

local obj = setmetatable({
ws_server = ws,
cmd_handler = {},
}, mt)

return obj
end


---
-- Add command callbacks to pubsub module instances
--
-- The callback function prototype: function (params)
-- The params in the parameters contain the data defined in the requested command.
-- Its first return value is the data, which needs to contain the data needed for
-- the particular resp, returns nil if an error exists.
-- Its second return value is a string type error message, no need to return when
-- no error exists.
--
-- @function core.pubsub.on
Member:
Let's add some @tparam

Contributor (Author):
fixed in #7043

-- @usage
-- pubsub:on(command, function (params)
-- return data, err
-- end)
function _M.on(self, command, handler)
self.cmd_handler[command] = handler
end


---
-- Put the pubsub instance into an event loop, waiting to process client commands
--
-- @function core.pubsub.wait
-- @treturn string|nil error message if present, will terminate the event loop
-- @usage
-- local err = pubsub:wait()
function _M.wait(self)
local ws = self.ws_server
while true do
-- read raw data frames from websocket connection
local raw_data, raw_type, err = ws:recv_frame()
if err then
-- terminate the event loop when a fatal error occurs
if ws.fatal then
ws:send_close()
return "websocket server: "..err
Member:
Better to use break in the while loop and handle the error in the same place

Contributor (Author):
fixed

end

-- skip this loop for non-fatal errors
log.error("failed to receive websocket frame: "..err)
Member:
Suggested change:
- log.error("failed to receive websocket frame: "..err)
+ log.error("failed to receive websocket frame: ", err)

Contributor (Author):
fixed

goto continue
end

-- handle client close connection
if raw_type == "close" then
ws:send_close()
return
end

-- the pub-sub messages use binary, if the message is not
-- binary, skip this message
if raw_type ~= "binary" then
goto continue
Member:
Better to add a warn log for unexpected input

Contributor (Author):
fixed

end

local data = pb.decode("PubSubReq", raw_data)
Member:
Should we check for decode errors?

Contributor (Author):
checked

local sequence = data.sequence

-- call command handler to generate response data
for key, value in pairs(data) do
-- There are sequence and command properties in the data,
-- select the handler according to the command value.
if key ~= "sequence" then
local handler = self.cmd_handler[key]
if not handler then
log.error("callback handler not registered for this command, command: ", key)
goto continue
end

local resp, err = handler(value)
if not resp then
ws:send_binary(pb.encode("PubSubResp", {
sequence = sequence,
error_resp = {
code = 0,
message = err,
},
}))
goto continue
end

-- write back the sequence
resp.sequence = sequence
ws:send_binary(pb.encode("PubSubResp", resp))
end
end

::continue::
end
end


return _M
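The command dispatch in wait() relies on the proto3 oneof: a decoded PubSubReq carries a sequence field plus exactly one command field, and the handler registered via on() is looked up by that field's name. A minimal Python sketch of the same pattern (plain dicts stand in for decoded protobuf messages; this is illustrative only, not an APISIX API):

```python
# Sketch of the core.pubsub dispatch pattern: handlers are registered by
# command name; a request dict holds "sequence" plus one command field.
class PubSub:
    def __init__(self):
        self.cmd_handler = {}

    def on(self, command, handler):
        # register a callback for one command, mirroring core.pubsub.on
        self.cmd_handler[command] = handler

    def dispatch(self, data):
        sequence = data["sequence"]
        for key, value in data.items():
            if key == "sequence":
                continue
            handler = self.cmd_handler.get(key)
            if handler is None:
                return None  # unknown command: the Lua code logs and skips
            resp, err = handler(value)
            if resp is None:
                # error path: wrap the message in an error response
                return {"sequence": sequence,
                        "error_resp": {"code": 0, "message": err}}
            resp["sequence"] = sequence  # write back the sequence
            return resp

ps = PubSub()
ps.on("cmd_kafka_fetch",
      lambda params: ({"kafka_fetch_resp": {"messages": []}}, None))
ok = ps.dispatch({"sequence": 7,
                  "cmd_kafka_fetch": {"topic": "t", "partition": 0, "offset": 0}})
```

The same structure explains why unknown commands are merely skipped: the loop finds no handler for the oneof field name and moves on, keeping the event loop alive.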
117 changes: 117 additions & 0 deletions apisix/include/apisix/model/pubsub.proto
@@ -0,0 +1,117 @@
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

syntax = "proto3";

option java_package = "org.apache.apisix.api.pubsub";
option java_outer_classname = "PubSubProto";
option java_multiple_files = true;
option go_package = "github.com/apache/apisix/api/pubsub;pubsub";

/**
* Get the offset of the specified topic partition from Apache Kafka.
*/
message CmdKafkaListOffset {
string topic = 1;
int32 partition = 2;
int64 timestamp = 3;
}

/**
* Fetch messages of the specified topic partition from Apache Kafka.
*/
message CmdKafkaFetch {
string topic = 1;
int32 partition = 2;
int64 offset = 3;
}

/**
* Client request definition for pubsub scenarios
*
* The sequence field is used to associate requests and responses.
* Apache APISIX will set a consistent sequence for the associated
* requests and responses, and the client can explicitly know the
* response corresponding to any of the requests.
*
* The req field is the command data sent by the client, and its
* type will be chosen from any of the lists in the definition.
*
* Field numbers 1 to 30 in the definition are used to define basic
* information and future extensions, and numbers after 30 are used
* to define commands.
*/
message PubSubReq {
int64 sequence = 1;
oneof req {
CmdKafkaFetch cmd_kafka_fetch = 31;
CmdKafkaListOffset cmd_kafka_list_offset = 32;
};
}

/**
* The definition of a message in Kafka with the current message
* offset, production timestamp, Key, and message content.
*/
message KafkaMessage {
int64 offset = 1;
int64 timestamp = 2;
bytes key = 3;
bytes value = 4;
}

/**
* The response body of the service when an error occurs,
* containing the error code and the error message.
*/
message ErrorResp {
int32 code = 1;
string message = 2;
}

/**
* The response of Fetch messages from Apache Kafka.
*/
message KafkaFetchResp {
repeated KafkaMessage messages = 1;
}

/**
* The response of list offset from Apache Kafka.
*/
message KafkaListOffsetResp {
int64 offset = 1;
}

/**
* Server response definition for pubsub scenarios
*
* The sequence field will be the same as the value in the
* request, which is used to associate the associated request
* and response.
*
* The resp field is the response data sent by the server, and
* its type will be chosen from any of the lists in the definition.
*/
message PubSubResp {
int64 sequence = 1;
oneof resp {
ErrorResp error_resp = 31;
KafkaFetchResp kafka_fetch_resp = 32;
KafkaListOffsetResp kafka_list_offset_resp = 33;
};
}
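Because PubSubResp.sequence echoes PubSubReq.sequence, a client can pipeline several commands on one WebSocket connection and match each response back to the request that produced it. A hypothetical client-side correlator (plain dicts in place of encoded protobuf frames; all names here are illustrative):

```python
# Sketch of request/response correlation over the pubsub protocol: each
# outgoing request gets a fresh sequence number, and the echoed sequence in
# the response identifies the originating command.
import itertools

class Correlator:
    def __init__(self):
        self._seq = itertools.count(1)
        self._pending = {}  # sequence -> command name

    def request(self, cmd_name, cmd):
        # build the body of a PubSubReq and remember its sequence
        seq = next(self._seq)
        self._pending[seq] = cmd_name
        return {"sequence": seq, cmd_name: cmd}

    def resolve(self, resp):
        # match a PubSubResp back to its request via the echoed sequence;
        # returns None for an unknown (or already resolved) sequence
        return self._pending.pop(resp["sequence"], None)

c = Correlator()
r1 = c.request("cmd_kafka_fetch",
               {"topic": "t", "partition": 0, "offset": 0})
r2 = c.request("cmd_kafka_list_offset",
               {"topic": "t", "partition": 0, "timestamp": -1})
```

Responses may arrive out of order; the correlator resolves them regardless, which is the point of carrying the sequence in both directions.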
6 changes: 6 additions & 0 deletions apisix/init.lua
@@ -42,6 +42,7 @@ local xrpc = require("apisix.stream.xrpc")
local ctxdump = require("resty.ctxdump")
local ngx_balancer = require("ngx.balancer")
local debug = require("apisix.debug")
local pubsub_kafka = require("apisix.pubsub.kafka")
local ngx = ngx
local get_method = ngx.req.get_method
local ngx_exit = ngx.exit
@@ -504,6 +505,11 @@ function _M.http_access_phase()
api_ctx.upstream_scheme = "grpc"
end

-- load balancer is not required by kafka upstream
if api_ctx.matched_upstream and api_ctx.matched_upstream.scheme == "kafka" then
return pubsub_kafka.access(api_ctx)
end

local code, err = set_upstream(route, api_ctx)
if code then
core.log.error("failed to set upstream: ", err)