[ADDED] MQTT Support #1754

Merged
merged 11 commits into from
Dec 7, 2020

kozlovic
Member

This PR introduces native support for MQTT clients. It requires the use
of accounts with JetStream enabled. Since clustering is not available
as of now, MQTT is limited to a single server instance.

Only QoS 0 and 1 are supported at the moment. MQTT clients can
exchange messages with NATS clients and vice-versa.
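
To make the MQTT/NATS interoperability more concrete, here is a minimal sketch of how an MQTT topic filter could be turned into a NATS subject. It assumes the straightforward mapping ('/' becomes '.', '+' becomes '*', '#' becomes '>'). The name `topicFilterToSubject` is hypothetical, and this is not the conversion code from this PR, which has to deal with more edge cases.

```go
package main

import (
	"fmt"
	"strings"
)

// topicFilterToSubject is a hypothetical helper, not the PR's code.
// It maps MQTT levels separated by '/' to NATS tokens separated by '.',
// and the MQTT wildcards '+' and '#' to the NATS wildcards '*' and '>'.
// Edge cases (empty levels, '.' or ' ' inside a level) are rejected
// rather than handled.
func topicFilterToSubject(filter string) (string, error) {
	if filter == "" {
		return "", fmt.Errorf("empty topic filter")
	}
	levels := strings.Split(filter, "/")
	for i, l := range levels {
		switch {
		case l == "":
			return "", fmt.Errorf("empty level in %q is not handled by this sketch", filter)
		case l == "+":
			levels[i] = "*"
		case l == "#":
			if i != len(levels)-1 {
				return "", fmt.Errorf("'#' must be the last level in %q", filter)
			}
			levels[i] = ">"
		case strings.ContainsAny(l, ". *>+#"):
			return "", fmt.Errorf("level %q is not handled by this sketch", l)
		}
	}
	return strings.Join(levels, "."), nil
}

func main() {
	for _, f := range []string{"foo/bar", "foo/+/baz", "foo/#"} {
		s, err := topicFilterToSubject(f)
		fmt.Println(f, "->", s, err)
	}
}
```

Rejecting the unusual levels keeps the sketch short; a real implementation needs an explicit rule for each of them.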

Since JetStream is required, accounts with JetStream enabled must
exist in order for an MQTT client to connect to the NATS Server.
The administrator can limit the users that can use MQTT with the
allowed_connection_types option in the user section. For instance:

accounts {
  mqtt {
    users [
      {user: all, password: pwd, allowed_connection_types: ["STANDARD", "WEBSOCKET", "MQTT"]}
      {user: mqtt_only, password: pwd, allowed_connection_types: "MQTT"}
    ]
    jetstream: enabled
  }
}

The user "mqtt_only" can only be used for MQTT connections, while the user
"all" accepts standard, websocket and MQTT clients.
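
As a rough illustration of the check this option implies, here is a minimal sketch. The `user` type and its methods are hypothetical and are not the server's implementation; it also assumes that a user with no listed types accepts every connection type.

```go
package main

import "fmt"

// user is a hypothetical, simplified view of a configured user.
type user struct {
	name    string
	allowed map[string]struct{}
}

// newUser builds a user restricted to the given connection types.
// Assumption: with no types listed, every connection type is allowed.
func newUser(name string, types ...string) *user {
	u := &user{name: name, allowed: make(map[string]struct{}, len(types))}
	for _, t := range types {
		u.allowed[t] = struct{}{}
	}
	return u
}

// canConnect reports whether this user may be used for the given connection type.
func (u *user) canConnect(connType string) bool {
	if len(u.allowed) == 0 {
		return true
	}
	_, ok := u.allowed[connType]
	return ok
}

func main() {
	all := newUser("all", "STANDARD", "WEBSOCKET", "MQTT")
	mqttOnly := newUser("mqtt_only", "MQTT")
	// Prints: true false true
	fmt.Println(all.canConnect("STANDARD"), mqttOnly.canConnect("STANDARD"), mqttOnly.canConnect("MQTT"))
}
```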

Here is what a configuration to enable MQTT looks like:

mqtt {
  # Specify a host and port to listen for MQTT connections
  #
  # listen: "host:port"

  # It can also be configured with individual parameters,
  # namely host and port.
  #
  # host: "hostname"
  port: 1883

  # TLS configuration section
  #
  # tls {
  #  cert_file: "/path/to/cert.pem"
  #  key_file: "/path/to/key.pem"
  #  ca_file: "/path/to/ca.pem"
  #
  #  # Time allowed for the TLS handshake to complete
  #  timeout: 2.0
  #
  #  # Takes the user name from the certificate
  #  #
  #  # verify_and_map: true
  #}

  # Authentication override. Here are possible options.
  #
  # authorization {
  #   # Simple username/password
  #   #
  #   user: "some_user_name"
  #   password: "some_password"
  #
  #   # Token. The server will check the MQTT's password in the connect
  #   # protocol against this token.
  #   #
  #   # token: "some_token"
  #
  #   # Time allowed for the client to send the MQTT connect protocol
  #   # after the TCP connection is established.
  #   #
  #   timeout: 2.0
  #}

  # If an MQTT client connects and does not provide a username/password and
  # this option is set, the server will use this user (and therefore its account).
  #
  # no_auth_user: "some_user_name"

  # This is the time after which the server will redeliver a QoS 1 message
  # sent to a subscription that has not acknowledged (PUBACK) the message.
  # The default is 30 seconds.
  #
  # ack_wait: "1m"

  # This limits the number of QoS 1 messages sent to a session without receiving
  # acknowledgement (PUBACK) from that session. The MQTT specification defines
  # a packet identifier as an unsigned 16-bit integer, which means that the
  # maximum value is 65535. The default value is 1024.
  #
  # max_ack_pending: 100
}
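
To show how max_ack_pending relates to packet identifiers, here is a minimal sketch of a per-session allocator. The `session` type and its methods are hypothetical and do not reflect the PR's data structures; only the 1..65535 identifier range and the default of 1024 come from the comments above.

```go
package main

import "fmt"

// session is a hypothetical, simplified model of per-session QoS 1 state.
type session struct {
	maxAckPending int
	pending       map[uint16]struct{} // identifiers awaiting a PUBACK
	last          uint16
}

// newSession applies the default of 1024 and caps the limit at 65535,
// the largest value a 16-bit packet identifier can take.
func newSession(maxAckPending int) *session {
	if maxAckPending <= 0 {
		maxAckPending = 1024
	}
	if maxAckPending > 65535 {
		maxAckPending = 65535
	}
	return &session{maxAckPending: maxAckPending, pending: make(map[uint16]struct{})}
}

// nextPacketID returns an unused identifier, or false when maxAckPending
// messages are already awaiting acknowledgement.
func (s *session) nextPacketID() (uint16, bool) {
	if len(s.pending) >= s.maxAckPending {
		return 0, false
	}
	for {
		s.last++ // wraps from 65535 to 0
		if s.last == 0 {
			continue // 0 is not a valid MQTT packet identifier
		}
		if _, inUse := s.pending[s.last]; !inUse {
			s.pending[s.last] = struct{}{}
			return s.last, true
		}
	}
}

// ack releases the identifier once the PUBACK has been received.
func (s *session) ack(pi uint16) {
	delete(s.pending, pi)
}

func main() {
	s := newSession(2)
	a, _ := s.nextPacketID()
	b, _ := s.nextPacketID()
	_, ok := s.nextPacketID()
	fmt.Println(a, b, ok) // the third request is refused until something is acked
	s.ack(a)
	c, ok := s.nextPacketID()
	fmt.Println(c, ok)
}
```

The loop always terminates because fewer than 65535 identifiers can be pending whenever the limit check passes.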

Signed-off-by: Ivan Kozlovic ivan@synadia.com

@kozlovic
Member Author

@derekcollison This is obviously too much to review in full, but it would help if you could focus on the areas that existed before and have been modified for MQTT, namely consumer.go and jetstream.go. I also needed a reverse-match for sublist, so please have a look at that as well.
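
For readers unfamiliar with the idea, here is a minimal sketch of a reverse match, assuming it means matching a subject that may contain wildcards against stored literal subjects (the opposite of the usual lookup). The functions `matchLiteral` and `reverseMatch` are hypothetical; this is a linear scan for illustration, not the Sublist code from this PR.

```go
package main

import (
	"fmt"
	"strings"
)

// matchLiteral reports whether a literal subject is matched by a filter
// that may contain the NATS wildcards '*' (one token) and '>' (one or
// more trailing tokens).
func matchLiteral(literal, filter string) bool {
	lt := strings.Split(literal, ".")
	ft := strings.Split(filter, ".")
	for i, f := range ft {
		if f == ">" {
			// '>' must be the last token and needs at least one token to match.
			return i == len(ft)-1 && i < len(lt)
		}
		if i >= len(lt) {
			return false
		}
		if f != "*" && f != lt[i] {
			return false
		}
	}
	return len(lt) == len(ft)
}

// reverseMatch returns the stored literal subjects matched by the filter.
func reverseMatch(literals []string, filter string) []string {
	var out []string
	for _, l := range literals {
		if matchLiteral(l, filter) {
			out = append(out, l)
		}
	}
	return out
}

func main() {
	stored := []string{"foo.bar", "foo.baz.x", "bar"}
	fmt.Println(reverseMatch(stored, "foo.>"))
	fmt.Println(reverseMatch(stored, "foo.*"))
	fmt.Println(reverseMatch(stored, "*"))
}
```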

@derekcollison
Member

Will take a look.

Member

@derekcollison derekcollison left a comment

In general LGTM. I did not look too closely at the MQTT code and tests, only at the integration points.

server/client.go Outdated
server/client.go Outdated
server/consumer.go Outdated
server/jetstream.go Outdated
server/opts.go
server/stream.go Outdated
server/stream.go Outdated
server/stream.go Outdated
server/stream.go Outdated
server/sublist.go
Tests dealing with MQTT "will" needed to wait for the server to
process the MQTT client's close of the connection. Only then do we
have the guarantee that the server produced the "will" message.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
MQTT streams are special in that we do not set subjects in the config
since they capture all subjects. Otherwise, we would have been forced
to create a stream on say "MQTT.>" but then all publishes would have
to be prefixed with "MQTT." in order for them to be captured.

However, if one uses the "nats" tool to inspect those streams, the tool
would fail with:

```
server response is not a valid "io.nats.jetstream.api.v1.stream_info_response" message:
(root): Must validate one and only one schema (oneOf)
config: subjects is required
config: Must validate all the schemas (allOf)
```

To solve that, if we detect that user asks for the MQTT streams, we
artificially set the returned config's subject to ">".
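
A minimal sketch of that workaround follows. The `streamConfig` type, the `mqttStreamPrefix` value, and the `configForInfo` function are all hypothetical stand-ins, not the code from this commit; the only behavior taken from the description is that the returned config gets the subject ">" while the stored config is left alone.

```go
package main

import (
	"fmt"
	"strings"
)

// streamConfig is a hypothetical stand-in for the stream configuration
// returned by the stream info API.
type streamConfig struct {
	Name     string
	Subjects []string
}

// mqttStreamPrefix is a hypothetical prefix used to recognize MQTT streams.
const mqttStreamPrefix = "$MQTT_"

// configForInfo returns the config to send in an info response. The
// argument is a copy, and a new slice is assigned, so the caller's
// config is not modified.
func configForInfo(cfg streamConfig) streamConfig {
	if strings.HasPrefix(cfg.Name, mqttStreamPrefix) && len(cfg.Subjects) == 0 {
		cfg.Subjects = []string{">"}
	}
	return cfg
}

func main() {
	stored := streamConfig{Name: mqttStreamPrefix + "msgs"}
	returned := configForInfo(stored)
	fmt.Println(len(stored.Subjects), returned.Subjects)
}
```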

Alternatively, we may want to not return those streams at all, although
there may be value to see the info for mqtt streams/consumers.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
server/jetstream_api.go Outdated
- Added non-public stream and consumer configuration options to
achieve the "no subject" and "no interest" capabilities. Had
to implement custom FileStreamInfo and FileConsumerInfo marshal/
unmarshal methods so that those non public fields can be
persisted/recovered properly.
- Restored some of the original JS code (since we can now use the config
instead of passing booleans to the functions).
- Use RLock for deliveryFormsCycle() check (unrelated to MQTT).
- Removed restriction on creating streams with MQTT prefix.
- Preventing API deletion of internal streams and their consumers.
- Added comment on Sublist's ReverseMatch method.
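
The first item can be sketched as follows. This is a minimal example of persisting a non-public field through custom marshal/unmarshal methods with encoding/json; the types `consumerConfig`, `persistedConsumer`, and the field `allowNoInterest` are hypothetical and are not the PR's FileConsumerInfo code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// consumerConfig is a hypothetical config with one public and one
// non-public option. encoding/json ignores unexported fields.
type consumerConfig struct {
	Durable         string `json:"durable_name"`
	allowNoInterest bool
}

// persistedConsumer is a hypothetical stand-in for what is written to disk.
type persistedConsumer struct {
	Name   string
	Config consumerConfig
}

// persistedConsumerJSON is the on-disk shape: it carries the non-public
// option as an extra exported field.
type persistedConsumerJSON struct {
	Name            string         `json:"name"`
	Config          consumerConfig `json:"config"`
	AllowNoInterest bool           `json:"allow_no_interest,omitempty"`
}

func (p persistedConsumer) MarshalJSON() ([]byte, error) {
	return json.Marshal(persistedConsumerJSON{
		Name:            p.Name,
		Config:          p.Config,
		AllowNoInterest: p.Config.allowNoInterest,
	})
}

func (p *persistedConsumer) UnmarshalJSON(b []byte) error {
	var aux persistedConsumerJSON
	if err := json.Unmarshal(b, &aux); err != nil {
		return err
	}
	p.Name = aux.Name
	p.Config = aux.Config
	p.Config.allowNoInterest = aux.AllowNoInterest
	return nil
}

// roundTrip marshals and unmarshals a value, as persist/recover would.
func roundTrip(in persistedConsumer) (persistedConsumer, error) {
	var out persistedConsumer
	b, err := json.Marshal(in)
	if err != nil {
		return out, err
	}
	err = json.Unmarshal(b, &out)
	return out, err
}

func main() {
	in := persistedConsumer{Name: "c1", Config: consumerConfig{Durable: "d1", allowNoInterest: true}}
	out, err := roundTrip(in)
	fmt.Println(out.Config.allowNoInterest, err)
}
```

Using a separate auxiliary type avoids recursion, since it has no marshal methods of its own.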

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
@kozlovic
Member Author

kozlovic commented Dec 1, 2020

@derekcollison I have addressed some of the comments, namely using non public stream/consumer config options. I am now going to look at others, like createClient() and if I really need the subscription to be created before calling processSub, etc..

Based on how the MQTT callback operates, it is safe to finish setup
of the MQTT subscriptions after processSub() returns. So I have
reverted the changes to processSub() which will minimize changes
to non-MQTT related code.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
@kozlovic
Member Author

kozlovic commented Dec 1, 2020

@derekcollison Good news, I was able to revert changes to processSub() and still maintain code correctness/no race with mqtt subscriptions. So now moving to other remarks such as createClient(), etc..

@derekcollison
Member

Just ping me when you want me to take another look. We are very close IMO..

This duplicates quite a bit of code, but reduces the conditionals
in the createClient() function.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This hides the check on "c.mqtt != nil" or "c.ws != nil".
Added some tests.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Member

@derekcollison derekcollison left a comment

LGTM - minor comments.

server/auth.go Outdated
@@ -367,7 +366,7 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) boo
 noAuthUser string
 )
 tlsMap := opts.TLSMap
-if c.mqtt != nil {
+if c.isMqtt() {
Member

Maybe make more generic?

switch c.clientType() {
  case NATS:
  case MQTT:
  case WS:
}

Or something like that?

Member Author

Please check commit 2ca0794 when you get a chance. I am not sold on that and I am afraid that it gets mis-used.. meaning you still want to check for c.kind == CLIENT when you deal with routing of messages for instance. So you would need extendedKind() only in specific situations (like the places I use it in this commit).
I can easily revert this commit or have another approach.

Member

The enum for what gets returned should include a NON_CLIENT option as well.

Member Author

@kozlovic kozlovic Dec 3, 2020

I guess you miss the fact that this is extendedKind(), not Type() which means that it can be invoked on any type of connection and will return say ROUTER, LEAF, but for a client, it may return CLIENT (if regular) or the new CLIENT_MQTT or CLIENT_WS.
What you suggest (and I do have a stashed version of that) is to have a clientType() that should be invoked only for c.kind == CLIENT and that will then return NATS/MQTT/WS or NON_CLIENT if incorrectly invoked on a non CLIENT connection.

Member

Yes the second option is what I had in mind, but minor so..

Member Author

Ok, replaced that commit with 035cffa, which I believe does what you asked.

It returns NON_CLIENT if invoked from a non CLIENT connection.
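
The shape discussed in this thread can be sketched as below. The identifiers NATS, MQTT, WS, NON_CLIENT, CLIENT, ROUTER, LEAF, `c.kind`, `c.mqtt`, `c.ws` and `clientType()` come from the comments above; the field types and constant values are assumptions, and this is not the code from commit 035cffa.

```go
package main

import "fmt"

// Connection kinds (values are assumptions for this sketch).
const (
	CLIENT = iota
	ROUTER
	LEAF
)

// Client types returned by clientType().
const (
	NON_CLIENT = iota
	NATS
	MQTT
	WS
)

// client is a hypothetical, stripped-down connection. The mqtt and ws
// fields are non-nil only for MQTT and websocket clients respectively.
type client struct {
	kind int
	mqtt *struct{}
	ws   *struct{}
}

// clientType returns NON_CLIENT when invoked on a non CLIENT connection,
// otherwise the specific type of client.
func (c *client) clientType() int {
	switch {
	case c.kind != CLIENT:
		return NON_CLIENT
	case c.mqtt != nil:
		return MQTT
	case c.ws != nil:
		return WS
	default:
		return NATS
	}
}

func main() {
	conns := []*client{
		{kind: CLIENT},
		{kind: CLIENT, mqtt: &struct{}{}},
		{kind: CLIENT, ws: &struct{}{}},
		{kind: ROUTER},
	}
	for _, c := range conns {
		switch c.clientType() {
		case NATS:
			fmt.Println("NATS client")
		case MQTT:
			fmt.Println("MQTT client")
		case WS:
			fmt.Println("websocket client")
		default:
			fmt.Println("not a client connection")
		}
	}
}
```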

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Member

@derekcollison derekcollison left a comment

LGTM

The test TestMQTTPersistedSession() flapped once on GA. It turns
out that when the server was sending CONNACK the test was immediately
using a NATS publisher to send a message that was not received by
the MQTT subscription for the recovered session.

Sending the CONNACK before restoring subscriptions allowed for a
window where a different connection could publish and messages
would be missed. It is technically ok, I think, and the test could
have been easily fixed to ensure that we don't NATS publish before
the session is fully restored.

However, I have changed the order to first restore subscriptions
then send the CONNACK. The way locking happens with MQTT subscriptions
we are sure that the CONNACK will be sent first because even if
there are inflight messages, the MQTT callbacks will have to wait
for the session lock under which the subscriptions are restored
and the CONNACK sent.
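
The ordering argument can be illustrated with a small model. This is a sketch only: the `session` type, `resume`, `deliver` and the `afterRestore` hook are hypothetical, and the hook exists just to simulate a publish arriving between the restore and the CONNACK.

```go
package main

import (
	"fmt"
	"sync"
)

// session is a hypothetical model of an MQTT session.
type session struct {
	mu   sync.Mutex
	subs []string
	out  []string // packets queued to the client, in order
}

// resume restores the subscriptions and then queues the CONNACK, all
// under the session lock.
func (s *session) resume(persisted []string, afterRestore func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.subs = append(s.subs, persisted...)
	if afterRestore != nil {
		afterRestore() // a publish may arrive here, before the CONNACK
	}
	s.out = append(s.out, "CONNACK")
}

// deliver models the MQTT delivery callback, which needs the session lock.
func (s *session) deliver(subject, msg string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, sub := range s.subs {
		if sub == subject {
			s.out = append(s.out, "PUBLISH "+msg)
			return true
		}
	}
	return false
}

// demo publishes while resume still holds the lock and returns the
// order in which packets were queued.
func demo() []string {
	s := &session{}
	var wg sync.WaitGroup
	s.resume([]string{"foo"}, func() {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.deliver("foo", "hello") // blocks until resume releases the lock
		}()
	})
	wg.Wait()
	s.mu.Lock()
	defer s.mu.Unlock()
	return append([]string(nil), s.out...)
}

func main() {
	fmt.Println(demo())
}
```

Because the callback cannot acquire the lock until `resume` returns, the message is neither missed nor queued ahead of the CONNACK in this model.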

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Essentially makes publish zero-alloc. Use c.mqtt.pp as the parser's
publish packet structure. Messages were initially copied because
MQTT messages don't end with CR_LF, and the copy added it so that
delivery worked for both NATS pub/subs and MQTT pub/subs.
Now an MQTT producer sending to a NATS sub will queue CR_LF after
the payload.

Here is result of benchcmp for MQTT pub runs only:
```
benchmark                                     old ns/op     new ns/op     delta
BenchmarkMQTT_QoS0_Pub_______0b_Payload-8     157           55.6          -64.59%
BenchmarkMQTT_QoS0_Pub_______8b_Payload-8     167           61.0          -63.47%
BenchmarkMQTT_QoS0_Pub______32b_Payload-8     181           65.3          -63.92%
BenchmarkMQTT_QoS0_Pub_____128b_Payload-8     243           78.1          -67.86%
BenchmarkMQTT_QoS0_Pub_____256b_Payload-8     298           95.0          -68.12%
BenchmarkMQTT_QoS0_Pub_______1K_Payload-8     604           224           -62.91%
BenchmarkMQTT_QoS1_Pub_______0b_Payload-8     1713          1314          -23.29%
BenchmarkMQTT_QoS1_Pub_______8b_Payload-8     1703          1311          -23.02%
BenchmarkMQTT_QoS1_Pub______32b_Payload-8     1722          1345          -21.89%
BenchmarkMQTT_QoS1_Pub_____128b_Payload-8     2105          1432          -31.97%
BenchmarkMQTT_QoS1_Pub_____256b_Payload-8     2148          1503          -30.03%
BenchmarkMQTT_QoS1_Pub_______1K_Payload-8     3024          1889          -37.53%

benchmark                                     old MB/s     new MB/s     speedup
BenchmarkMQTT_QoS0_Pub_______0b_Payload-8     31.76        89.91        2.83x
BenchmarkMQTT_QoS0_Pub_______8b_Payload-8     77.79        213.01       2.74x
BenchmarkMQTT_QoS0_Pub______32b_Payload-8     204.52       566.26       2.77x
BenchmarkMQTT_QoS0_Pub_____128b_Payload-8     550.65       1715.96      3.12x
BenchmarkMQTT_QoS0_Pub_____256b_Payload-8     877.77       2757.16      3.14x
BenchmarkMQTT_QoS0_Pub_______1K_Payload-8     1705.02      4607.72      2.70x
BenchmarkMQTT_QoS1_Pub_______0b_Payload-8     6.42         8.37         1.30x
BenchmarkMQTT_QoS1_Pub_______8b_Payload-8     11.16        14.49        1.30x
BenchmarkMQTT_QoS1_Pub______32b_Payload-8     24.97        31.97        1.28x
BenchmarkMQTT_QoS1_Pub_____128b_Payload-8     66.52        97.74        1.47x
BenchmarkMQTT_QoS1_Pub_____256b_Payload-8     124.78       178.27       1.43x
BenchmarkMQTT_QoS1_Pub_______1K_Payload-8     342.64       548.32       1.60x

benchmark                                     old allocs     new allocs     delta
BenchmarkMQTT_QoS0_Pub_______0b_Payload-8     3              0              -100.00%
BenchmarkMQTT_QoS0_Pub_______8b_Payload-8     3              0              -100.00%
BenchmarkMQTT_QoS0_Pub______32b_Payload-8     3              0              -100.00%
BenchmarkMQTT_QoS0_Pub_____128b_Payload-8     4              0              -100.00%
BenchmarkMQTT_QoS0_Pub_____256b_Payload-8     4              0              -100.00%
BenchmarkMQTT_QoS0_Pub_______1K_Payload-8     4              0              -100.00%
BenchmarkMQTT_QoS1_Pub_______0b_Payload-8     5              2              -60.00%
BenchmarkMQTT_QoS1_Pub_______8b_Payload-8     5              2              -60.00%
BenchmarkMQTT_QoS1_Pub______32b_Payload-8     5              2              -60.00%
BenchmarkMQTT_QoS1_Pub_____128b_Payload-8     7              3              -57.14%
BenchmarkMQTT_QoS1_Pub_____256b_Payload-8     7              3              -57.14%
BenchmarkMQTT_QoS1_Pub_______1K_Payload-8     7              3              -57.14%

benchmark                                     old bytes     new bytes     delta
BenchmarkMQTT_QoS0_Pub_______0b_Payload-8     80            0             -100.00%
BenchmarkMQTT_QoS0_Pub_______8b_Payload-8     88            0             -100.00%
BenchmarkMQTT_QoS0_Pub______32b_Payload-8     120           0             -100.00%
BenchmarkMQTT_QoS0_Pub_____128b_Payload-8     224           0             -100.00%
BenchmarkMQTT_QoS0_Pub_____256b_Payload-8     369           1             -99.73%
BenchmarkMQTT_QoS0_Pub_______1K_Payload-8     1250          31            -97.52%
BenchmarkMQTT_QoS1_Pub_______0b_Payload-8     106           28            -73.58%
BenchmarkMQTT_QoS1_Pub_______8b_Payload-8     122           28            -77.05%
BenchmarkMQTT_QoS1_Pub______32b_Payload-8     154           28            -81.82%
BenchmarkMQTT_QoS1_Pub_____128b_Payload-8     381           157           -58.79%
BenchmarkMQTT_QoS1_Pub_____256b_Payload-8     655           287           -56.18%
BenchmarkMQTT_QoS1_Pub_______1K_Payload-8     2312          1078          -53.37%
```
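
The CR_LF change described in this commit can be sketched as follows: instead of copying the payload into a new buffer that ends with CR_LF, the payload and a shared CR_LF slice are queued as separate buffers. The functions `queueForNATSSub` and `wireBytes` and the use of net.Buffers are assumptions for illustration, not the server's outbound code.

```go
package main

import (
	"bytes"
	"fmt"
	"net"
)

// crlf is shared by every message, so queuing it does not copy anything.
var crlf = []byte("\r\n")

// queueForNATSSub queues the protocol line, the payload, and CR_LF as
// separate buffers. The payload bytes themselves are not copied.
func queueForNATSSub(out net.Buffers, header, payload []byte) net.Buffers {
	return append(out, header, payload, crlf)
}

// wireBytes returns what a NATS subscriber would receive for one message.
func wireBytes(subject string, sid int, payload []byte) string {
	header := []byte(fmt.Sprintf("MSG %s %d %d\r\n", subject, sid, len(payload)))
	out := queueForNATSSub(nil, header, payload)
	var buf bytes.Buffer
	if _, err := out.WriteTo(&buf); err != nil {
		return ""
	}
	return buf.String()
}

func main() {
	payload := []byte("hello")
	out := queueForNATSSub(nil, []byte("MSG foo 1 5\r\n"), payload)
	// The queued buffer aliases the original payload slice.
	fmt.Println(&out[1][0] == &payload[0])
	fmt.Printf("%q\n", wireBytes("foo", 1, payload))
}
```

Appending to the queue can still allocate when the queue itself grows, so this sketch only shows how the payload copy is avoided.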

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
@kozlovic
Member Author

kozlovic commented Dec 4, 2020

@derekcollison I could have merged and then worked on optimization, but I decided to include this one in the main PR. Could you review 1d7c471 when you have time? I still have some work to do on the conversion of topic/subject for delivery of messages to subscriptions. If I am not done with that soon, we could merge this PR and continue with perf improvements later.

Member

@derekcollison derekcollison left a comment

LGTM
