diff --git a/apisix/core/ctx.lua b/apisix/core/ctx.lua index 6e6d3295fc2a..300cf757407c 100644 --- a/apisix/core/ctx.lua +++ b/apisix/core/ctx.lua @@ -143,14 +143,16 @@ do var_x_forwarded_proto = true, } + -- sort in alphabetical local apisix_var_names = { + balancer_ip = true, + balancer_port = true, + consumer_name = true, + mqtt_client_id = true, route_id = true, route_name = true, service_id = true, service_name = true, - consumer_name = true, - balancer_ip = true, - balancer_port = true, } local mt = { diff --git a/apisix/stream/plugins/mqtt-proxy.lua b/apisix/stream/plugins/mqtt-proxy.lua index 7c890505aa9b..3a285890cea4 100644 --- a/apisix/stream/plugins/mqtt-proxy.lua +++ b/apisix/stream/plugins/mqtt-proxy.lua @@ -29,6 +29,7 @@ local schema = { protocol_name = {type = "string"}, protocol_level = {type = "integer"}, upstream = { + description = "Deprecated. We should configure upstream outside of the plugin", type = "object", properties = { ip = {type = "string"}, -- deprecated, use "host" instead @@ -57,13 +58,7 @@ local _M = { function _M.check_schema(conf) - local ok, err = core.schema.check(schema, conf) - - if not ok then - return false, err - end - - return true + return core.schema.check(schema, conf) end @@ -185,6 +180,11 @@ function _M.preread(conf, ctx) core.log.info("mqtt client id: ", res.client_id) + -- when client id is missing, fallback to balance by client IP + if res.client_id ~= "" then + ctx.mqtt_client_id = res.client_id + end + if not conf.upstream then return end diff --git a/docs/en/latest/apisix-variable.md b/docs/en/latest/apisix-variable.md index 08b1f813799a..9591010e3670 100644 --- a/docs/en/latest/apisix-variable.md +++ b/docs/en/latest/apisix-variable.md @@ -34,6 +34,7 @@ List in alphabetical order: | graphql_name | the [operation name](https://graphql.org/learn/queries/#operation-name) of GraphQL | HeroComparison | | graphql_operation | the operation type of GraphQL | mutation | | graphql_root_fields | the top level fields of GraphQL | ["hero"] | +| mqtt_client_id | the client id in MQTT protocol | | | route_id | id of `route` | | | route_name | name of `route` | | | service_id | id of `service` | | diff --git a/docs/en/latest/plugins/mqtt-proxy.md b/docs/en/latest/plugins/mqtt-proxy.md index 353069cfe256..cda882d5a46f 100644 --- a/docs/en/latest/plugins/mqtt-proxy.md +++ b/docs/en/latest/plugins/mqtt-proxy.md @@ -90,6 +90,38 @@ curl http://127.0.0.1:9080/apisix/admin/stream_routes/1 -H 'X-API-KEY: edd1c9f03 In case Docker is used in combination with MacOS `host.docker.internal` is the right parameter for `host`. +This plugin exposes a variable `mqtt_client_id`, and we can use it to load balance via the client id. For example: + +```shell +curl http://127.0.0.1:9080/apisix/admin/stream_routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' +{ + "plugins": { + "mqtt-proxy": { + "protocol_name": "MQTT", + "protocol_level": 4 + } + }, + "upstream": { + "type": "chash", + "key": "mqtt_client_id", + "nodes": [ + { + "host": "127.0.0.1", + "port": 1995, + "weight": 1 + }, + { + "host": "127.0.0.2", + "port": 1995, + "weight": 1 + } + ] + } +}' +``` + +MQTT connections with different client ID will be forwarded to different node via the consistent hash algorithm. If the client ID is missing, we will balance via client IP instead. + ## Delete Plugin ```shell diff --git a/docs/zh/latest/plugins/mqtt-proxy.md b/docs/zh/latest/plugins/mqtt-proxy.md index 7514b2054213..05101d046b42 100644 --- a/docs/zh/latest/plugins/mqtt-proxy.md +++ b/docs/zh/latest/plugins/mqtt-proxy.md @@ -88,6 +88,38 @@ curl http://127.0.0.1:9080/apisix/admin/stream_routes/1 -H 'X-API-KEY: edd1c9f03 在 Docker 与 MacOS 结合使用的情况下,`host.docker.internal` 是 `host` 的正确参数。 +这个插件暴露了一个变量 `mqtt_client_id`,我们可以用它来通过客户端 ID 进行负载均衡。比如说: + +```shell +curl http://127.0.0.1:9080/apisix/admin/stream_routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' +{ + "plugins": { + "mqtt-proxy": { + "protocol_name": "MQTT", + "protocol_level": 4 + } + }, + "upstream": { + "type": "chash", + "key": "mqtt_client_id", + "nodes": [ + { + "host": "127.0.0.1", + "port": 1995, + "weight": 1 + }, + { + "host": "127.0.0.2", + "port": 1995, + "weight": 1 + } + ] + } +}' +``` + +不同客户端 ID 的 MQTT 连接将通过一致性哈希算法被转发到不同的节点。如果客户端 ID 为空,我们将通过客户端 IP 进行均衡。 + #### 禁用插件 当你想去掉插件的时候,很简单,在插件的配置中把对应的 json 配置删除即可,无须重启服务,即刻生效: diff --git a/t/stream-plugin/mqtt-proxy.t b/t/stream-plugin/mqtt-proxy.t index df3eb10a539c..4a59e376d362 100644 --- a/t/stream-plugin/mqtt-proxy.t +++ b/t/stream-plugin/mqtt-proxy.t @@ -398,3 +398,97 @@ qr/mqtt client id: \S+/ mqtt client id: clint-111 --- no_error_log [error] + + + +=== TEST 17: balance with mqtt_client_id +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + [[{ + "remote_addr": "127.0.0.1", + "server_port": 1985, + "plugins": { + "mqtt-proxy": { + "protocol_name": "MQTT", + "protocol_level": 5 + } + }, + "upstream": { + "type": "chash", + "key": "mqtt_client_id", + "nodes": [ + { + "host": "0.0.0.0", + "port": 1995, + "weight": 1 + }, + { + "host": "127.0.0.1", + "port": 1995, + "weight": 1 + } + ] + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 18: hit route with empty id +--- stream_request eval +"\x10\x0d\x00\x04\x4d\x51\x54\x54\x05\x02\x00\x3c\x00\x00\x00" +--- stream_response +hello world +--- grep_error_log eval +qr/(mqtt client id: \w+|proxy request to \S+)/ +--- grep_error_log_out +proxy request to 127.0.0.1:1995 +--- no_error_log +[error] + + + +=== TEST 19: hit route with different client id, part 1 +--- stream_request eval +"\x10\x0e\x00\x04\x4d\x51\x54\x54\x05\x02\x00\x3c\x00\x00\x01\x66" +--- stream_response +hello world +--- grep_error_log eval +qr/(mqtt client id: \w+|proxy request to \S+)/ +--- grep_error_log_out +mqtt client id: f +proxy request to 0.0.0.0:1995 +--- no_error_log +[error] + + + +=== TEST 20: hit route with different client id, part 2 +--- stream_request eval +"\x10\x0e\x00\x04\x4d\x51\x54\x54\x05\x02\x00\x3c\x00\x00\x01\x67" +--- stream_response +hello world +--- grep_error_log eval +qr/(mqtt client id: \w+|proxy request to \S+)/ +--- grep_error_log_out +mqtt client id: g +proxy request to 127.0.0.1:1995 +--- no_error_log +[error]