Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: improve the implementation of pubsub module #7043

Merged
merged 7 commits into from
May 13, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ jobs:
- linux_openresty_1_17
test_dir:
- t/plugin
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc t/pubsub
- t/node t/router t/script t/stream-node t/utils t/wasm t/xds-library t/xrpc
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc
- t/node t/pubsub t/router t/script t/stream-node t/utils t/wasm t/xds-library t/xrpc
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pubsub < node

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused, according to the principle you mentioned at #6995 (comment), p should be in the middle of n and r in alphabetical order except for plugin. lmnopqrst


runs-on: ${{ matrix.platform }}
timeout-minutes: 90
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/centos7-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ jobs:
matrix:
test_dir:
- t/plugin
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc t/pubsub
- t/node t/router t/script t/stream-node t/utils t/wasm t/xds-library
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc
- t/node t/pubsub t/router t/script t/stream-node t/utils t/wasm t/xds-library

steps:
- name: Check out code
Expand Down
95 changes: 51 additions & 44 deletions apisix/core/pubsub.lua
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,42 @@ local function init_pb_state()
end


-- parse command name and parameters from client message
local function get_cmd(data)
for key, value in pairs(data) do
-- There are sequence and command properties in the data,
-- select the handler according to the command value.
if key ~= "sequence" then
return key, value
end
end
end


-- send generic response to client
local function send_resp(ws, sequence, data)
data.sequence = sequence
local ok, encoded = pcall(pb.encode, "PubSubResp", data)
if not ok or not data then
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not data?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A careless mistake that I will fix

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.error("failed to encode response message, err: ", encoded)
return
end

local _, err = ws:send_binary(encoded)
if err then
log.error("failed to send response to client, err: ", err)
end
end


-- send error response to client
local function send_error(ws, sequence, err_msg)
local ok, data = pcall(pb.encode, "PubSubResp", {
sequence = sequence,
return send_resp(ws, sequence, {
error_resp = {
code = 0,
message = err_msg,
},
})
if not ok or not data then
log.error("failed to encode error response message, err: ", data)
end

local _, err = ws:send_binary(data)
if err then
log.error("failed to send response to client, err: ", err)
end
end


Expand Down Expand Up @@ -119,8 +138,8 @@ end
-- no error exists.
--
-- @function core.pubsub.on
-- @tparam string command to add callback
-- @tparam function handler callback on receipt of command
-- @tparam string The command to add callback.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
-- @tparam string The command to add callback.
-- @tparam string command The command to add callback.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

-- @tparam func handler The callback function on receipt of command.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-- @tparam function ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

The documentation indicates that func should be used.

-- @usage
-- pubsub:on(command, function (params)
-- return data, err
Expand Down Expand Up @@ -180,40 +199,28 @@ function _M.wait(self)
-- command sequence code
local sequence = data.sequence

-- call command handler to generate response data
for key, value in pairs(data) do
-- There are sequence and command properties in the data,
-- select the handler according to the command value.
if key ~= "sequence" then
local handler = self.cmd_handler[key]
if not handler then
log.error("pubsub callback handler not registered for the",
" command, command: ", key)
send_error(ws, sequence, "unknown command: " .. key)
break
end

local resp, err = handler(value)
if not resp then
send_error(ws, sequence, err)
break
end

-- write back the sequence
resp.sequence = sequence
local ok, data = pcall(pb.encode, "PubSubResp", resp)
if not ok or not data then
log.error("failed to encode response message, err: ", data)
break
end
local _, err = ws:send_binary(data)
if err then
log.error("failed to send response to client, err: ", err)
end
break
end
local cmd, params = get_cmd(data)
if not cmd and not params then
log.warn("pubsub server receives empty command")
goto continue
end

-- find the handler for the current command
local handler = self.cmd_handler[cmd]
if not handler then
log.error("pubsub callback handler not registered for the",
" command, command: ", cmd)
send_error(ws, sequence, "unknown command")
goto continue
end

-- call command handler to generate response data
local resp, err = handler(params)
if not resp then
send_error(ws, sequence, err)
goto continue
end
send_resp(ws, sequence, resp)

::continue::
end
Expand Down
2 changes: 1 addition & 1 deletion t/pubsub/pubsub.t
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ ret: error
}
}
--- response_body
unknown command: cmd_empty
unknown command
--- error_log
pubsub callback handler not registered for the command, command: cmd_empty

Expand Down