Skip to content
This repository has been archived by the owner on Nov 17, 2020. It is now read-only.

Commit

Permalink
Merge pull request #57 from rabbitmq/rabbitmq-mqtt-21
Browse files Browse the repository at this point in the history
Downgrade QoS2 messages to QoS1
  • Loading branch information
michaelklishin committed Jan 27, 2016
2 parents 5fdccca + f7e0431 commit 7454d00
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 7 deletions.
11 changes: 8 additions & 3 deletions src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,14 @@ process_request(?PUBACK,
end;

process_request(?PUBLISH,
#mqtt_frame{
fixed = #mqtt_frame_fixed{ qos = ?QOS_2 }}, PState) ->
{error, qos2_not_supported, PState};
Frame = #mqtt_frame{
fixed = Fixed = #mqtt_frame_fixed{ qos = ?QOS_2 }},
PState) ->
% Downgrade QOS_2 to QOS_1
process_request(?PUBLISH,
Frame#mqtt_frame{
fixed = Fixed#mqtt_frame_fixed{ qos = ?QOS_1 }},
PState);
process_request(?PUBLISH,
#mqtt_frame{
fixed = #mqtt_frame_fixed{ qos = Qos,
Expand Down
16 changes: 12 additions & 4 deletions test/src/com/rabbitmq/mqtt/test/MqttTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,18 +263,24 @@ public void testSubscribeQos1() throws MqttException, InterruptedException {

publish(client, topic, 0, payload);
publish(client, topic, 1, payload);
publish(client, topic, 2, payload);
Thread.sleep(testDelay);

Assert.assertEquals(2, receivedMessages.size());
Assert.assertEquals(3, receivedMessages.size());
MqttMessage msg1 = receivedMessages.get(0);
MqttMessage msg2 = receivedMessages.get(1);
MqttMessage msg3 = receivedMessages.get(1);

Assert.assertEquals(true, Arrays.equals(msg1.getPayload(), payload));
Assert.assertEquals(0, msg1.getQos());

Assert.assertEquals(true, Arrays.equals(msg2.getPayload(), payload));
Assert.assertEquals(1, msg2.getQos());

// Downgraded QoS 2 to QoS 1
Assert.assertEquals(true, Arrays.equals(msg3.getPayload(), payload));
Assert.assertEquals(1, msg3.getQos());

client.disconnect();
}

Expand Down Expand Up @@ -478,18 +484,20 @@ public void testSubscribeMultiple() throws MqttException {

publish(client, "/topic/2", 0, "msq2-qos0".getBytes());
publish(client, "/topic/3", 1, "msq3-qos1".getBytes());
publish(client, "/topic/4", 2, "msq3-qos2".getBytes());
publish(client, topic, 0, "msq4-qos0".getBytes());
publish(client, topic, 1, "msq4-qos1".getBytes());

Assert.assertEquals(2, receivedMessages.size());

Assert.assertEquals(3, receivedMessages.size());
client.disconnect();
client2.disconnect();
}

public void testPublishMultiple() throws MqttException, InterruptedException {
int pubCount = 50;
for (int subQos=0; subQos < 2; subQos++){
for (int pubQos=0; pubQos < 2; pubQos++){
for (int subQos=0; subQos <= 2; subQos++){
for (int pubQos=0; pubQos <= 2; pubQos++){
client.connect(conOpt);
client.subscribe(topic, subQos);
client.setCallback(this);
Expand Down

0 comments on commit 7454d00

Please sign in to comment.