Skip to content

Commit

Permalink
Fixed store_ management.
Browse files Browse the repository at this point in the history
Improved protocol error checking.
  • Loading branch information
redboltz committed Oct 20, 2023
1 parent 5e90794 commit 07a6247
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 172 deletions.
39 changes: 31 additions & 8 deletions include/mqtt/endpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8140,7 +8140,16 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
ep_.pid_man_.release_id(packet_id_);
return true;
} ();
if (erased) ep_.on_serialize_remove(packet_id_);
if (erased) {
ep_.on_serialize_remove(packet_id_);
}
else {
MQTT_LOG("mqtt_impl", error)
<< MQTT_ADD_VALUE(address, &ep_)
<< "invalid puback received. packet_id:" << packet_id_;
ep_.call_protocol_error_handlers();
return;

Check warning on line 8151 in include/mqtt/endpoint.hpp

View check run for this annotation

Codecov / codecov/patch

include/mqtt/endpoint.hpp#L8147-L8151

Added lines #L8147 - L8151 were not covered by tests
}
switch (ep_.version_) {
case protocol_version::v3_1_1:
if (ep_.on_puback(packet_id_)) {
Expand Down Expand Up @@ -8288,6 +8297,13 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
if (is_error(reason_code_)) ep_.pid_man_.release_id(packet_id_);
return true;
} ();
if (!erased) {
MQTT_LOG("mqtt_impl", error)
<< MQTT_ADD_VALUE(address, &ep_)
<< "invalid pubrec received. packet_id:" << packet_id_;
ep_.call_protocol_error_handlers();
return;
}
{
auto res =
[&] {
Expand Down Expand Up @@ -8647,7 +8663,16 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
ep_.pid_man_.release_id(packet_id_);
return true;
} ();
if (erased) ep_.on_serialize_remove(packet_id_);
if (erased) {
ep_.on_serialize_remove(packet_id_);
}
else {
MQTT_LOG("mqtt_impl", error)
<< MQTT_ADD_VALUE(address, &ep_)
<< "invalid pubcomp received. packet_id:" << packet_id_;
ep_.call_protocol_error_handlers();
return;

Check warning on line 8674 in include/mqtt/endpoint.hpp

View check run for this annotation

Codecov / codecov/patch

include/mqtt/endpoint.hpp#L8670-L8674

Added lines #L8670 - L8674 were not covered by tests
}
switch (ep_.version_) {
case protocol_version::v3_1_1:
if (ep_.on_pubcomp(packet_id_)) {
Expand Down Expand Up @@ -9993,9 +10018,8 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
// publish store is erased when pubrec is received.
// pubrel store is erased when pubcomp is received.
// If invalid client send pubrec twice with the same packet id,
// then send corresponding pubrel twice is a possible client/server
// implementation.
// In this case, overwrite store_.
// then send disconnect with protocol_error reason_code (v5), or
// simply close the socket (v3.1.1).
if (store_.insert_or_update(
packet_id,
control_packet_type::pubcomp,
Expand Down Expand Up @@ -10794,9 +10818,8 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
// publish store is erased when pubrec is received.
// pubrel store is erased when pubcomp is received.
// If invalid client send pubrec twice with the same packet id,
// then send corresponding pubrel twice is a possible client/server
// implementation.
// In this case, overwrite store_.
// then send disconnect with protocol_error reason_code (v5), or
// simply close the socket (v3.1.1).
MQTT_LOG("mqtt_impl", warning)
<< MQTT_ADD_VALUE(address, this)
<< "overwrite pubrel"
Expand Down
2 changes: 1 addition & 1 deletion include/mqtt/store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class store {
>
>
>,
mi::ordered_non_unique<
mi::ordered_unique<
mi::tag<tag_packet_id>,
mi::const_mem_fun<
elem_t, packet_id_t,
Expand Down
88 changes: 6 additions & 82 deletions test/system/st_async_pubsub_2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -621,23 +621,12 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
cont("h_suback"),
// publish topic1 QoS2
cont("h_pubrec"),
cont("h_pubcomp"),
deps("h_publish", "h_suback"),
// pubrec send twice
cont("h_pubrel1"),
cont("h_pubrel2"),
deps("h_unsuback", "h_pubcomp", "h_pubrel2"),
// disconnect
cont("h_close"),
cont("h_error"),
};

auto g = MQTT_NS::shared_scope_guard(
[&c] {
auto unsub_pid = c->acquire_unique_packet_id();
c->async_unsubscribe(unsub_pid, "topic1");
}
);

switch (c->get_protocol_version()) {
case MQTT_NS::protocol_version::v3_1_1:
c->set_connack_handler(
Expand All @@ -663,13 +652,6 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
c->async_pubrel(packet_id);
return true;
});
c->set_pubcomp_handler(
[&chk, g]
(packet_id_t) mutable {
MQTT_CHK("h_pubcomp");
g.reset();
return true;
});
c->set_suback_handler(
[&chk, &c]
(packet_id_t, std::vector<MQTT_NS::suback_return_code> results) {
Expand All @@ -680,13 +662,6 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
c->async_publish(pid_pub, "topic1", "topic1_contents", MQTT_NS::qos::exactly_once);
return true;
});
c->set_unsuback_handler(
[&chk, &c]
(packet_id_t) {
MQTT_CHK("h_unsuback");
c->async_disconnect();
return true;
});
c->set_publish_handler(
[&chk, &c]
(MQTT_NS::optional<packet_id_t> packet_id,
Expand All @@ -705,23 +680,6 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
c->async_pubrec(*packet_id);
return true;
});
c->set_pubrel_handler(
[&chk, &c, g]
(packet_id_t packet_id) mutable {
auto ret = MQTT_ORDERED(
[&] {
MQTT_CHK("h_pubrel1");
c->async_pubcomp(packet_id);
},
[&] () {
MQTT_CHK("h_pubrel2");
c->async_pubcomp(packet_id);
g.reset();
}
);
BOOST_TEST(ret);
return true;
});
break;
case MQTT_NS::protocol_version::v5:
c->set_v5_connack_handler(
Expand All @@ -747,13 +705,6 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
c->async_pubrel(packet_id);
return true;
});
c->set_v5_pubcomp_handler(
[&chk, g]
(packet_id_t, MQTT_NS::v5::pubcomp_reason_code, MQTT_NS::v5::properties /*props*/) mutable {
MQTT_CHK("h_pubcomp");
g.reset();
return true;
});
c->set_v5_suback_handler(
[&chk, &c]
(packet_id_t, std::vector<MQTT_NS::v5::suback_reason_code> reasons, MQTT_NS::v5::properties /*props*/) {
Expand All @@ -764,15 +715,6 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
c->async_publish(pid_pub, "topic1", "topic1_contents", MQTT_NS::qos::exactly_once);
return true;
});
c->set_v5_unsuback_handler(
[&chk, &c]
(packet_id_t, std::vector<MQTT_NS::v5::unsuback_reason_code> reasons, MQTT_NS::v5::properties /*props*/) {
MQTT_CHK("h_unsuback");
BOOST_TEST(reasons.size() == 1U);
BOOST_TEST(reasons[0] == MQTT_NS::v5::unsuback_reason_code::success);
c->async_disconnect();
return true;
});
c->set_v5_publish_handler(
[&chk, &c]
(MQTT_NS::optional<packet_id_t> packet_id,
Expand All @@ -792,41 +734,23 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
c->async_pubrec(*packet_id);
return true;
});
c->set_v5_pubrel_handler(
[&chk, &c, g]
(packet_id_t packet_id, MQTT_NS::v5::pubrel_reason_code, MQTT_NS::v5::properties /*props*/) mutable {
auto ret = MQTT_ORDERED(
[&] {
MQTT_CHK("h_pubrel1");
c->async_pubcomp(packet_id);
},
[&] () {
MQTT_CHK("h_pubrel2");
c->async_pubcomp(packet_id);
g.reset();
}
);
BOOST_TEST(ret);
return true;
});
break;
default:
BOOST_CHECK(false);
break;
}

c->set_close_handler(
[&chk, &finish]
[]
() {
MQTT_CHK("h_close");
finish();
BOOST_CHECK(false);
});
c->set_error_handler(
[]
[&chk, &finish]
(MQTT_NS::error_code) {
BOOST_CHECK(false);
MQTT_CHK("h_error");
finish();
});
g.reset();
c->async_connect();
ioc.run();
BOOST_TEST(chk.all());
Expand Down
87 changes: 6 additions & 81 deletions test/system/st_pubsub_1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1757,22 +1757,12 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
cont("h_suback"),
// publish topic1 QoS2
cont("h_pubrec"),
cont("h_pubcomp"),
deps("h_publish", "h_suback"),
// pubrec send twice
cont("h_pubrel1"),
cont("h_pubrel2"),
deps("h_unsuback", "h_pubcomp", "h_pubrel2"),
// disconnect
cont("h_close"),
cont("h_error"),
};

auto g = MQTT_NS::shared_scope_guard(
[&c] {
c->unsubscribe("topic1");
}
);

switch (c->get_protocol_version()) {
case MQTT_NS::protocol_version::v3_1_1:
c->set_connack_handler(
Expand All @@ -1797,13 +1787,6 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
c->pubrel(packet_id);
return true;
});
c->set_pubcomp_handler(
[&chk, g]
(packet_id_t) mutable {
MQTT_CHK("h_pubcomp");
g.reset();
return true;
});
c->set_suback_handler(
[&chk, &c]
(packet_id_t, std::vector<MQTT_NS::suback_return_code> results) {
Expand All @@ -1813,13 +1796,6 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
c->publish("topic1", "topic1_contents", MQTT_NS::qos::exactly_once);
return true;
});
c->set_unsuback_handler(
[&chk, &c]
(packet_id_t) {
MQTT_CHK("h_unsuback");
c->disconnect();
return true;
});
c->set_publish_handler(
[&chk, &c]
(MQTT_NS::optional<packet_id_t> packet_id,
Expand All @@ -1838,23 +1814,6 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
c->pubrec(*packet_id);
return true;
});
c->set_pubrel_handler(
[&chk, &c, g]
(packet_id_t packet_id) mutable {
auto ret = MQTT_ORDERED(
[&] {
MQTT_CHK("h_pubrel1");
c->pubcomp(packet_id);
},
[&] () {
MQTT_CHK("h_pubrel2");
c->pubcomp(packet_id);
g.reset();
}
);
BOOST_TEST(ret);
return true;
});
break;
case MQTT_NS::protocol_version::v5:
c->set_v5_connack_handler(
Expand All @@ -1879,13 +1838,6 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
c->pubrel(packet_id);
return true;
});
c->set_v5_pubcomp_handler(
[&chk, g]
(packet_id_t, MQTT_NS::v5::pubcomp_reason_code, MQTT_NS::v5::properties /*props*/) mutable {
MQTT_CHK("h_pubcomp");
g.reset();
return true;
});
c->set_v5_suback_handler(
[&chk, &c]
(packet_id_t, std::vector<MQTT_NS::v5::suback_reason_code> reasons, MQTT_NS::v5::properties /*props*/) {
Expand All @@ -1895,15 +1847,6 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
c->publish("topic1", "topic1_contents", MQTT_NS::qos::exactly_once);
return true;
});
c->set_v5_unsuback_handler(
[&chk, &c]
(packet_id_t, std::vector<MQTT_NS::v5::unsuback_reason_code> reasons, MQTT_NS::v5::properties /*props*/) {
MQTT_CHK("h_unsuback");
BOOST_TEST(reasons.size() == 1U);
BOOST_TEST(reasons[0] == MQTT_NS::v5::unsuback_reason_code::success);
c->disconnect();
return true;
});
c->set_v5_publish_handler(
[&chk, &c]
(MQTT_NS::optional<packet_id_t> packet_id,
Expand All @@ -1923,42 +1866,24 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
c->pubrec(*packet_id);
return true;
});
c->set_v5_pubrel_handler(
[&chk, &c, g]
(packet_id_t packet_id, MQTT_NS::v5::pubrel_reason_code, MQTT_NS::v5::properties /*props*/) mutable {
auto ret = MQTT_ORDERED(
[&] {
MQTT_CHK("h_pubrel1");
c->pubcomp(packet_id);
},
[&] () {
MQTT_CHK("h_pubrel2");
c->pubcomp(packet_id);
g.reset();
}
);
BOOST_TEST(ret);
return true;
});
break;
default:
BOOST_CHECK(false);
break;
}

c->set_close_handler(
[&chk, &finish]
[]
() {
MQTT_CHK("h_close");
finish();
BOOST_CHECK(false);
});
c->set_error_handler(
[]
[&chk, &finish]
(MQTT_NS::error_code) {
BOOST_CHECK(false);
MQTT_CHK("h_error");
finish();
});

g.reset();
c->connect();
ioc.run();
BOOST_TEST(chk.all());
Expand Down

0 comments on commit 07a6247

Please sign in to comment.