Skip to content

Commit

Permalink
chore: improve the implementation of pubsub module (#7043)
Browse files Browse the repository at this point in the history
  • Loading branch information
bzp2010 authored and spacewander committed Jun 30, 2022
1 parent b660d53 commit f7e2cde
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 49 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ jobs:
- linux_openresty_1_17
test_dir:
- t/plugin
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc t/pubsub
- t/node t/router t/script t/stream-node t/utils t/wasm t/xds-library t/xrpc
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc
- t/node t/pubsub t/router t/script t/stream-node t/utils t/wasm t/xds-library t/xrpc

runs-on: ${{ matrix.platform }}
timeout-minutes: 90
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/centos7-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ jobs:
matrix:
test_dir:
- t/plugin
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc t/pubsub
- t/node t/router t/script t/stream-node t/utils t/wasm t/xds-library
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc
- t/node t/pubsub t/router t/script t/stream-node t/utils t/wasm t/xds-library

steps:
- name: Check out code
Expand Down
95 changes: 51 additions & 44 deletions apisix/core/pubsub.lua
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,42 @@ local function init_pb_state()
end


-- parse command name and parameters from client message
local function get_cmd(data)
for key, value in pairs(data) do
-- There are sequence and command properties in the data,
-- select the handler according to the command value.
if key ~= "sequence" then
return key, value
end
end
end


-- send generic response to client
local function send_resp(ws, sequence, data)
data.sequence = sequence
local ok, encoded = pcall(pb.encode, "PubSubResp", data)
if not ok or not encoded then
log.error("failed to encode response message, err: ", encoded)
return
end

local _, err = ws:send_binary(encoded)
if err then
log.error("failed to send response to client, err: ", err)
end
end


-- send error response to client
local function send_error(ws, sequence, err_msg)
local ok, data = pcall(pb.encode, "PubSubResp", {
sequence = sequence,
return send_resp(ws, sequence, {
error_resp = {
code = 0,
message = err_msg,
},
})
if not ok or not data then
log.error("failed to encode error response message, err: ", data)
end

local _, err = ws:send_binary(data)
if err then
log.error("failed to send response to client, err: ", err)
end
end


Expand Down Expand Up @@ -119,8 +138,8 @@ end
-- no error exists.
--
-- @function core.pubsub.on
-- @tparam string command to add callback
-- @tparam function handler callback on receipt of command
-- @tparam string command The command to add callback.
-- @tparam func handler The callback function on receipt of command.
-- @usage
-- pubsub:on(command, function (params)
-- return data, err
Expand Down Expand Up @@ -180,40 +199,28 @@ function _M.wait(self)
-- command sequence code
local sequence = data.sequence

-- call command handler to generate response data
for key, value in pairs(data) do
-- There are sequence and command properties in the data,
-- select the handler according to the command value.
if key ~= "sequence" then
local handler = self.cmd_handler[key]
if not handler then
log.error("pubsub callback handler not registered for the",
" command, command: ", key)
send_error(ws, sequence, "unknown command: " .. key)
break
end

local resp, err = handler(value)
if not resp then
send_error(ws, sequence, err)
break
end

-- write back the sequence
resp.sequence = sequence
local ok, data = pcall(pb.encode, "PubSubResp", resp)
if not ok or not data then
log.error("failed to encode response message, err: ", data)
break
end
local _, err = ws:send_binary(data)
if err then
log.error("failed to send response to client, err: ", err)
end
break
end
local cmd, params = get_cmd(data)
if not cmd and not params then
log.warn("pubsub server receives empty command")
goto continue
end

-- find the handler for the current command
local handler = self.cmd_handler[cmd]
if not handler then
log.error("pubsub callback handler not registered for the",
" command, command: ", cmd)
send_error(ws, sequence, "unknown command")
goto continue
end

-- call command handler to generate response data
local resp, err = handler(params)
if not resp then
send_error(ws, sequence, err)
goto continue
end
send_resp(ws, sequence, resp)

::continue::
end
Expand Down
2 changes: 1 addition & 1 deletion t/pubsub/pubsub.t
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ ret: error
}
}
--- response_body
unknown command: cmd_empty
unknown command
--- error_log
pubsub callback handler not registered for the command, command: cmd_empty
Expand Down

0 comments on commit f7e2cde

Please sign in to comment.