Skip to content

Latest commit

 

History

History
87 lines (54 loc) · 4.61 KB

File metadata and controls

87 lines (54 loc) · 4.61 KB

Spring Boot Camel PAHO MQTT5 Shared Subscription Quickstart

This quickstart demonstrates how to set up multiple mqtt5 consumers that use shared subscription feature of mqtt5 in order to balance the load across the consumers.

MQTT 5.0 protocol introduction-shared subscription

Shared subscription is a new feature introduced by MQTT 5.0 protocol, which is equivalent to the load balancing function of the subscribers.

We know that the general non-shared subscription message publishing process is like this:

nonSharedSubscription

Under this structure, if the subscription node fails, it will cause the publisher’s message to be lost (QoS 0) or accumulate in the server (QoS 1, 2). Under normal circumstances, the solution to this problem is to directly increase the subscription node, but this generates a large number of duplicate messages, not only wasting performance, in some business scenarios, the subscription node also needs to remove itself, further increasing the the complexity.

Secondly, when the publisher’s production capacity is strong, there may be a situation where the subscriber’s consumption capacity cannot keep up in time. At this time, the subscriber can only solve it by implementing load balancing, which again increases the user’s development cost.

Protocol specification

Now, in the MQTT 5.0 protocol, you can solve the problems mentioned above through the shared subscription feature. When you use a shared subscription, the flow of messages becomes:

sharedSubscription

Like non-shared subscriptions, shared subscriptions include a topic filter and subscription options, the only difference is that the topic filter format for shared subscriptions must be in this form $share/{ShareName}/{filter}. The meanings of these fields are:

  • $share The prefix indicates that this will be a shared subscription

  • {ShareName} It is a string that does not contain "/", "+" and "#". Subscribe to the session by using the same {ShareName} Indicates sharing the same subscription, and messages matching the subscription will only be published to one of the sessions at a time

  • {filter} Is the topic filter in non-shared subscriptions

It should be noted that if the server is sending a QoS 2 message to the selected subscriber, and the network is interrupted before the distribution is completed, the server will continue to complete the distribution of the message when the subscriber reconnects. If the subscriber’s session is terminated before it reconnects, the server will discard the message without attempting to send it to other subscribers. If it is a QoS 1 message, the server can wait for the subscriber to reconnect and continue to complete the distribution, or it can immediately try to distribute the message to other subscribers when the subscriber disconnects. The MQTT protocol does not mandate it. It depends on the specific implementation. However, if its session is terminated while waiting for the subscriber to reconnect, the server will try to send the message to other subscribers.

Trying out the example on your local machine

Start the ActiveMQ broker in a docker container:

docker run --name artemis \
  -e AMQ_USER=admin \
  -e AMQ_PASSWORD=admin \
  -d -p 1883:1883 \
  quay.io/artemiscloud/activemq-artemis-broker

This quickstart contains one producer and six consumers that are part of the same conversation group (group1). This means that everytime a new message is produced to the topic, only one consumer from the group receives the message.

How to run the example

You can run this example using

mvn spring-boot:run

In the logs, you should see something like this:

427793

Trying out the example on OpenShift

First, start with creating a new OpenShift project:

oc new-project csb-example-paho-mqtt5

Start a new deployment for Artemis:

oc new-app quay.io/artemiscloud/activemq-artemis-broker -e AMQ_USER=admin -e AMQ_PASSWORD=admin

Patch the default activemq-artemis-broker service to serve the 1883 port:

oc patch service/activemq-artemis-broker -p '{"spec":{"ports":[{"name":"1883-tcp", "port": 1883, "protocol": "TCP", "targetPort": 1883}]}}'

How to run

The application is deployed using the openshift-maven-plugin that takes care of creating all the necessary OpenShift resources.

Simply use the following command to deploy the application:

mvn clean package -Popenshift

and wait until application pod reaches the Ready state. After that you will see the expected output in the application’s pod log.