From 36585105b47650de5d5b753895a0812c70ab0955 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Fri, 22 Sep 2023 13:41:33 +0000 Subject: [PATCH 1/2] Do not confirm MQTT messages if in partition Since MQTT publishers might publish to classic mirrored queues, add the same check as in rabbit_channel:send_confirms_and_nacks/1: ``` If we are in a minority and pause_minority mode then a) we are going to shut down imminently and b) we should not confirm anything until then, since anything we confirm is likely to be lost. ``` (cherry picked from commit d8ecc66a8ec1ce2f59481ddb28b3bd3aba164fbb) # Conflicts: # deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl --- deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index c92625f70172..735d658b11de 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -1169,6 +1169,7 @@ process_routing_confirm(#delivery{confirm = true, send_puback(PktIds0, State) when is_list(PktIds0) -> +<<<<<<< HEAD %% Classic queues confirm messages unordered. %% Let's sort them here assuming most MQTT clients send with an increasing packet identifier. PktIds = lists:usort(PktIds0), @@ -1176,6 +1177,20 @@ send_puback(PktIds0, State) send_puback(Id, State) end, PktIds); send_puback(PktId, State = #state{cfg = #cfg{proto_ver = ProtoVer}}) -> +======= + case rabbit_node_monitor:pause_partition_guard() of + ok -> + %% Classic queues confirm messages unordered. + %% Let's sort them here assuming most MQTT clients send with an increasing packet identifier. + PktIds = lists:usort(PktIds0), + lists:foreach(fun(Id) -> + send_puback(Id, ReasonCode, State) + end, PktIds); + pausing -> + ok + end; +send_puback(PktId, ReasonCode, State = #state{cfg = #cfg{proto_ver = ProtoVer}}) -> +>>>>>>> d8ecc66a8e (Do not confirm MQTT messages if in partition) rabbit_global_counters:messages_confirmed(ProtoVer, 1), Packet = #mqtt_packet{fixed = #mqtt_packet_fixed{type = ?PUBACK}, variable = #mqtt_packet_publish{packet_id = PktId}}, From 08c74f5dad4b00e3602659ce550775aa5c331b34 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 26 Sep 2023 08:57:31 +0200 Subject: [PATCH 2/2] Fix conflict --- deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 735d658b11de..e055a3c6a0a5 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -1169,28 +1169,18 @@ process_routing_confirm(#delivery{confirm = true, send_puback(PktIds0, State) when is_list(PktIds0) -> -<<<<<<< HEAD - %% Classic queues confirm messages unordered. - %% Let's sort them here assuming most MQTT clients send with an increasing packet identifier. - PktIds = lists:usort(PktIds0), - lists:foreach(fun(Id) -> - send_puback(Id, State) - end, PktIds); -send_puback(PktId, State = #state{cfg = #cfg{proto_ver = ProtoVer}}) -> -======= case rabbit_node_monitor:pause_partition_guard() of ok -> %% Classic queues confirm messages unordered. %% Let's sort them here assuming most MQTT clients send with an increasing packet identifier. PktIds = lists:usort(PktIds0), lists:foreach(fun(Id) -> - send_puback(Id, ReasonCode, State) + send_puback(Id, State) end, PktIds); pausing -> ok end; -send_puback(PktId, ReasonCode, State = #state{cfg = #cfg{proto_ver = ProtoVer}}) -> ->>>>>>> d8ecc66a8e (Do not confirm MQTT messages if in partition) +send_puback(PktId, State = #state{cfg = #cfg{proto_ver = ProtoVer}}) -> rabbit_global_counters:messages_confirmed(ProtoVer, 1), Packet = #mqtt_packet{fixed = #mqtt_packet_fixed{type = ?PUBACK}, variable = #mqtt_packet_publish{packet_id = PktId}},