diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 6714e427791f..ea6eda1e3a1d 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -31,8 +31,8 @@ jobs: - linux_openresty_1_17 test_dir: - t/plugin - - t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc t/pubsub - - t/node t/router t/script t/stream-node t/utils t/wasm t/xds-library t/xrpc + - t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc + - t/node t/pubsub t/router t/script t/stream-node t/utils t/wasm t/xds-library t/xrpc runs-on: ${{ matrix.platform }} timeout-minutes: 90 diff --git a/.github/workflows/centos7-ci.yml b/.github/workflows/centos7-ci.yml index 03ae393b0d50..a63622e7c477 100644 --- a/.github/workflows/centos7-ci.yml +++ b/.github/workflows/centos7-ci.yml @@ -29,8 +29,8 @@ jobs: matrix: test_dir: - t/plugin - - t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc t/pubsub - - t/node t/router t/script t/stream-node t/utils t/wasm t/xds-library + - t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc + - t/node t/pubsub t/router t/script t/stream-node t/utils t/wasm t/xds-library steps: - name: Check out code diff --git a/apisix/core/pubsub.lua b/apisix/core/pubsub.lua index 14df27616690..1f392818c491 100644 --- a/apisix/core/pubsub.lua +++ b/apisix/core/pubsub.lua @@ -58,23 +58,42 @@ local function init_pb_state() end +-- parse command name and parameters from client message +local function get_cmd(data) + for key, value in pairs(data) do + -- There are sequence and command properties in the data, + -- select the handler according to the command value. + if key ~= "sequence" then + return key, value + end + end +end + + +-- send generic response to client +local function send_resp(ws, sequence, data) + data.sequence = sequence + local ok, encoded = pcall(pb.encode, "PubSubResp", data) + if not ok or not encoded then + log.error("failed to encode response message, err: ", encoded) + return + end + + local _, err = ws:send_binary(encoded) + if err then + log.error("failed to send response to client, err: ", err) + end +end + + -- send error response to client local function send_error(ws, sequence, err_msg) - local ok, data = pcall(pb.encode, "PubSubResp", { - sequence = sequence, + return send_resp(ws, sequence, { error_resp = { code = 0, message = err_msg, }, }) - if not ok or not data then - log.error("failed to encode error response message, err: ", data) - end - - local _, err = ws:send_binary(data) - if err then - log.error("failed to send response to client, err: ", err) - end end @@ -119,8 +138,8 @@ end -- no error exists. -- -- @function core.pubsub.on --- @tparam string command to add callback --- @tparam function handler callback on receipt of command +-- @tparam string command The command to add callback. +-- @tparam func handler The callback function on receipt of command. -- @usage -- pubsub:on(command, function (params) -- return data, err @@ -180,40 +199,28 @@ function _M.wait(self) -- command sequence code local sequence = data.sequence - -- call command handler to generate response data - for key, value in pairs(data) do - -- There are sequence and command properties in the data, - -- select the handler according to the command value. - if key ~= "sequence" then - local handler = self.cmd_handler[key] - if not handler then - log.error("pubsub callback handler not registered for the", - " command, command: ", key) - send_error(ws, sequence, "unknown command: " .. key) - break - end - - local resp, err = handler(value) - if not resp then - send_error(ws, sequence, err) - break - end - - -- write back the sequence - resp.sequence = sequence - local ok, data = pcall(pb.encode, "PubSubResp", resp) - if not ok or not data then - log.error("failed to encode response message, err: ", data) - break - end - local _, err = ws:send_binary(data) - if err then - log.error("failed to send response to client, err: ", err) - end - break - end + local cmd, params = get_cmd(data) + if not cmd and not params then log.warn("pubsub server receives empty command") + goto continue + end + + -- find the handler for the current command + local handler = self.cmd_handler[cmd] + if not handler then + log.error("pubsub callback handler not registered for the", + " command, command: ", cmd) + send_error(ws, sequence, "unknown command") + goto continue + end + + -- call command handler to generate response data + local resp, err = handler(params) + if not resp then + send_error(ws, sequence, err) + goto continue end + send_resp(ws, sequence, resp) ::continue:: end diff --git a/t/pubsub/pubsub.t b/t/pubsub/pubsub.t index 84b80f532294..5ce0b0b03367 100644 --- a/t/pubsub/pubsub.t +++ b/t/pubsub/pubsub.t @@ -151,7 +151,7 @@ ret: error } } --- response_body -unknown command: cmd_empty +unknown command --- error_log pubsub callback handler not registered for the command, command: cmd_empty