Skip to content

Commit

Permalink
Merge pull request #986 from redboltz/fix_v5_pubrec_error_send
Browse files Browse the repository at this point in the history
Fixed qos2_publish_handled_ invalidly remains issue.
  • Loading branch information
redboltz authored Oct 3, 2023
2 parents 8e9c8f9 + 0310600 commit ec7bc54
Showing 1 changed file with 27 additions and 3 deletions.
30 changes: 27 additions & 3 deletions include/mqtt/endpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4386,6 +4386,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* @return set of packet_ids
*/
std::set<packet_id_t> get_qos2_publish_handled_pids() const {
LockGuard<Mutex> lck(qos2_publish_handled_mtx_);
return qos2_publish_handled_;
}

Expand All @@ -4395,6 +4396,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* @param pids packet ids
*/
void restore_qos2_publish_handled_pids(std::set<packet_id_t> pids) {
LockGuard<Mutex> lck(qos2_publish_handled_mtx_);
qos2_publish_handled_ = force_move(pids);
}

Expand Down Expand Up @@ -7951,7 +7953,12 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
}
break;
case qos::exactly_once:
if (ep_.qos2_publish_handled_.find(*packet_id_) == ep_.qos2_publish_handled_.end()) {
if (
[&] {
LockGuard<Mutex> lck(ep_.qos2_publish_handled_mtx_);
return ep_.qos2_publish_handled_.find(*packet_id_) == ep_.qos2_publish_handled_.end();
} ()
) {
if (handler_call()) {
ep_.on_mqtt_message_processed(
force_move(
Expand All @@ -7962,7 +7969,10 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
)
)
);
ep_.qos2_publish_handled_.emplace(*packet_id_);
{
LockGuard<Mutex> lck(ep_.qos2_publish_handled_mtx_);
ep_.qos2_publish_handled_.emplace(*packet_id_);
}
ep_.auto_pub_response(
[this] {
if (ep_.connected_) {
Expand Down Expand Up @@ -8488,7 +8498,10 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
}
);
};
ep_.qos2_publish_handled_.erase(packet_id_);
{
LockGuard<Mutex> lck(ep_.qos2_publish_handled_mtx_);
ep_.qos2_publish_handled_.erase(packet_id_);
}
switch (ep_.version_) {
case protocol_version::v3_1_1:
if (ep_.on_pubrel(packet_id_)) {
Expand Down Expand Up @@ -9929,6 +9942,10 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
v5::pubrec_reason_code reason,
v5::properties props
) {
if (is_error(reason)) {
LockGuard<Mutex> lck(qos2_publish_handled_mtx_);
qos2_publish_handled_.erase(packet_id);
}
switch (version_) {
case protocol_version::v3_1_1: {
auto msg = v3_1_1::basic_pubrec_message<PacketIdBytes>(packet_id);
Expand Down Expand Up @@ -10692,6 +10709,10 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
v5::properties props,
async_handler_t func
) {
if (is_error(reason)) {
LockGuard<Mutex> lck(qos2_publish_handled_mtx_);
qos2_publish_handled_.erase(packet_id);
}
switch (version_) {
case protocol_version::v3_1_1: {
auto msg = v3_1_1::basic_pubrec_message<PacketIdBytes>(packet_id);
Expand Down Expand Up @@ -11669,7 +11690,10 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,

Mutex store_mtx_;
store<PacketIdBytes> store_;

mutable Mutex qos2_publish_handled_mtx_;
std::set<packet_id_t> qos2_publish_handled_;

std::deque<async_packet> queue_;

packet_id_manager<packet_id_t> pid_man_;
Expand Down

0 comments on commit ec7bc54

Please sign in to comment.