Skip to content

Commit

Permalink
Quorum queues v4 (#10637)
Browse files Browse the repository at this point in the history
 This commit contains the following new quorum queue features:

* Fair share high/low priorities
* SAC consumers honour consumer priorities
* Credited consumer refactoring to meet AMQP requirements.
* Use checkpoints feature to reduce memory use for queues with long backlogs
 * Consumer cancel option that immediately removes consumer and returns all pending messages.
 * More compact commands of the most common commands such as enqueue, settle and credit
 * Correctly track the delivery-count to be compatible with the AMQP spec
 * Support the "modified" AMQP 1.0 outcome better.

Commits:

* Quorum queues v4 scaffolding.

Create the new version but not including any changes yet.

QQ: force delete followers after leader has terminated.

Also try a longer sleep for mqtt_shared_SUITE so that the
delete operation stands a chance to time out and move on
to the forced deletion stage.

In some mixed machine version scenarios some followers will never
apply the poison pill command so we may as well force delete them
just in case.

QQ: skip test in amqp_client that cannot pass with mixed machine versions

QQ: remove dead code

Code relating to prior machine versions and state conversions.

rabbit_fifo_prop_SUITE fixes

* QQ: add v4 ff and new more compact enqueue command.

Also update rabbit_fifo_* suites to test more relevant code versions
where applicable.

QQ: always use the updated credit mode format

QQv4: use more compact consumer reference in settle, credit, return

This introudces a new type: consumer_key() which is either the consumer_id
or the raft index the checkout was processed at. If the consumer is
using one of the updated credit spec formats rabbit_fifo will use the
raft index as the primary key for the consumer such that the rabbit
fifo client can then use the more space efficient integer index
instead of the full consumer id in subsequent commands.

There is compatibility code to still accept the consumer id in
settle, return, discard and credit commands but this is slighlyt
slower and of course less space efficient.

The old form will be used in cases where the fifo client may have
already remove the local consumer state (as happens after a cancel).

Lots of test refactorings of the rabbit_fifo_SUITE to begin to use
the new forms.

* More test refactoring and new API fixes

rabbit_fifo_prop_SUITE refactoring and other fixes.

* First pass SAC consumer priority implementation.

Single active consumers will be activated if they have a higher priority
than the currently active consumer. if the currently active consumer
has pending messages, no further messages will be assigned to the
consumer and the activation of the new consumer will happen once
all pending messages are settled. This is to ensure processing order.

Consumers with the same priority will internally be ordered to
favour those with credit then those that attached first.

QQ: add SAC consumer priority integration tests

QQ: add check for ff in tests

* QQ: add new consumer cancel option: 'remove'

This option immediately removes and returns all messages for a
consumer instead of the softer 'cancel' option which keeps the
consumer around until all pending messages have been either
settled or returned.

This involves a change to the rabbit_queue_type:cancel/5 API
to rabbit_queue_type:cancel/3.

* QQ: capture checked out time for each consumer message.

This will form the basis for queue initiated consumer timeouts.

* QQ: Refactor to use the new ra_machine:handle_aux/5 API

Instead of the old ra_machine:handle_aux/6 callback.

* QQ hi/lo priority queue

* QQ: Avoid using mc:size/1 inside rabbit_fifo

As we dont want to depend on external functions for things that may
change the state of the queue.

* QQ bug fix: Maintain order when returning multiple

Prior to this commit, quorum queues requeued messages in an undefined
order, which is wrong.

This commit fixes this bug and requeues messages always in the order as
nacked / rejected / released by the client.

We ensure that order of requeues is deterministic from the client's
point of view and doesn't depend on whether the quorum queue soft limit
was exceeded temporarily.
So, even when rabbit_fifo_client batches requeues, the order as nacked
by the client is still maintained.

* Simplify

* Add rabbit_quorum_queue:file_handle* functions back.

For backwards compat.

* dialyzer fix

* dynamic_qq_SUITE: avoid mixed versions failure.

* QQ: track number of requeues for message.

To be able to calculate the correct value for the AMQP delivery_count
header we need to be able to distinguish between messages that were
"released" or returned in QQ speak and those that were returned
due to errors such as channel termination.

This commit implement such tracking as well as the calculation
of a new mc annotations `delivery_count` that AMQP makes use
of to set the header value accordingly.

* Use QQ consumer removal when AMQP client detaches

This enables us to unskip some AMQP tests.

* Use AMQP address v2 in fsharp-tests

* QQ: track number of requeues for message.

To be able to calculate the correct value for the AMQP delivery_count
header we need to be able to distinguish between messages that were
"released" or returned in QQ speak and those that were returned
due to errors such as channel termination.

This commit implement such tracking as well as the calculation
of a new mc annotations `delivery_count` that AMQP makes use
of to set the header value accordingly.

* rabbit_fifo: Use Ra checkpoints

* quorum queues: Use a custom interval for checkpoints

* rabbit_fifo_SUITE: List actual effects in ?ASSERT_EFF failure

* QQ: Checkpoints modifications

* fixes

* QQ: emit release cursors on tick for followers and leaders

else followers could end up holding on to segments a bit longer
after traffic stops.

* Support draining a QQ SAC waiting consumer

By issuing drain=true, the client says "either send a transfer or a flow frame".
Since there are no messages to send to an inactive consumer, the sending
queue should advance the delivery-count consuming all link-credit and send
a credit_reply with drain=true to the session proc which causes the session
proc to send a flow frame to the client.

* Extract applying #credit{} cmd into 2 functions

This commit is only refactoring and doesn't change any behaviour.

* Fix default priority level

Prior to this commit, when a message didn't have a priority level set,
it got enqueued as high prio.

This is wrong because the default priority is 4 and
"for example, if 2 distinct priorities are implemented,
then levels 0 to 4 are equivalent, and levels 5 to 9 are equivalent
and levels 4 and 5 are distinct."
Hence, by default a message without priority set, must be enqueued as
low prio.

* bazel run gazelle

* Avoid deprecated time unit

* Fix aux_test

* Delete dead code

* Fix rabbit_fifo_q:get_lowest_index/1

* Delete unused normalize functions

* Generate less garbage

* Add integration test for QQ SAC with consumer priority

* Improve readability

* Change modified outcome behaviour

With the new quorum queue v4 improvements where a requeue counter was
added in addition to the quorum queue delivery counter, the following
sentence from #6292 (comment)
doesn't apply anymore:

> Also the case where delivery_failed=false|undefined requires the release of the
> message without incrementing the delivery_count. Again this is not something
> that our queues are able to do so again we have to reject without requeue.

Therefore, we simplify the modified outcome behaviour:
RabbitMQ will from now on only discard the message if the modified's
undeliverable-here field is true.

* Introduce single feature flag rabbitmq_4.0.0

 ## What?

Merge all feature flags introduced in RabbitMQ 4.0.0 into a single
feature flag called rabbitmq_4.0.0.

 ## Why?

1. This fixes the crash in
#10637 (comment)
2. It's better user experience.

* QQ: expose priority metrics in UI

* Enable skipped test after rebasing onto main

* QQ: add new command "modify" to better handle AMQP modified outcomes.

This new command can be used to annotate returned or rejected messages.

This commit also retains the delivery-count across dead letter boundaries
such that the AMQP header delivery-count field can now include _all_ failed
deliver attempts since the message was originally received.

Internally the quorum queue has moved it's delivery_count header to
only track the AMQP protocol delivery attempts and now introduces
a new acquired_count to track all message acquisitions by consumers.

* Type tweaks and naming

* Add test for modified outcome with classic queue

* Add test routing on message-annotations in modified outcome

* Skip tests in mixed version tests

Skip tests in mixed version tests because feature flag
rabbitmq_4.0.0 is needed for the new #modify{} Ra command
being sent to quorum queues.

---------

Co-authored-by: David Ansari <david.ansari@gmx.de>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
(cherry picked from commit 194d4ba)
  • Loading branch information
kjnilsson authored and ansd committed Aug 8, 2024
1 parent cc8bc22 commit ba2388b
Show file tree
Hide file tree
Showing 40 changed files with 8,107 additions and 3,046 deletions.
3 changes: 2 additions & 1 deletion deps/amqp10_client/src/amqp10_msg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ header(first_acquirer = K,
header(delivery_count = K,
#amqp10_msg{header = #'v1_0.header'{delivery_count = D}}) ->
header_value(K, D);
header(K, #amqp10_msg{header = undefined}) -> header_value(K, undefined).
header(K, #amqp10_msg{header = undefined}) ->
header_value(K, undefined).

-spec delivery_annotations(amqp10_msg()) -> #{annotations_key() => any()}.
delivery_annotations(#amqp10_msg{delivery_annotations = undefined}) ->
Expand Down
13 changes: 13 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,9 @@ rabbitmq_suite(
rabbitmq_suite(
name = "rabbit_fifo_int_SUITE",
size = "medium",
additional_beam = [
":test_test_util_beam",
],
deps = [
"//deps/rabbit_common:erlang_app",
"@aten//:erlang_app",
Expand All @@ -722,6 +725,7 @@ rabbitmq_suite(
],
deps = [
"//deps/rabbit_common:erlang_app",
"@meck//:erlang_app",
"@proper//:erlang_app",
"@ra//:erlang_app",
],
Expand All @@ -735,6 +739,15 @@ rabbitmq_suite(
],
)

rabbitmq_suite(
name = "rabbit_fifo_q_SUITE",
size = "small",
deps = [
"//deps/rabbit_common:erlang_app",
"@proper//:erlang_app",
],
)

rabbitmq_integration_suite(
name = "rabbit_fifo_dlx_integration_SUITE",
size = "medium",
Expand Down
20 changes: 19 additions & 1 deletion deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,10 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_fifo_dlx_sup.erl",
"src/rabbit_fifo_dlx_worker.erl",
"src/rabbit_fifo_index.erl",
"src/rabbit_fifo_q.erl",
"src/rabbit_fifo_v0.erl",
"src/rabbit_fifo_v1.erl",
"src/rabbit_fifo_v3.erl",
"src/rabbit_file.erl",
"src/rabbit_global_counters.erl",
"src/rabbit_guid.erl",
Expand Down Expand Up @@ -399,8 +401,10 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_fifo_dlx_sup.erl",
"src/rabbit_fifo_dlx_worker.erl",
"src/rabbit_fifo_index.erl",
"src/rabbit_fifo_q.erl",
"src/rabbit_fifo_v0.erl",
"src/rabbit_fifo_v1.erl",
"src/rabbit_fifo_v3.erl",
"src/rabbit_file.erl",
"src/rabbit_global_counters.erl",
"src/rabbit_guid.erl",
Expand Down Expand Up @@ -541,6 +545,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_fifo_dlx.hrl",
"src/rabbit_fifo_v0.hrl",
"src/rabbit_fifo_v1.hrl",
"src/rabbit_fifo_v3.hrl",
"src/rabbit_stream_coordinator.hrl",
"src/rabbit_stream_sac_coordinator.hrl",
],
Expand Down Expand Up @@ -672,8 +677,10 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_fifo_dlx_sup.erl",
"src/rabbit_fifo_dlx_worker.erl",
"src/rabbit_fifo_index.erl",
"src/rabbit_fifo_q.erl",
"src/rabbit_fifo_v0.erl",
"src/rabbit_fifo_v1.erl",
"src/rabbit_fifo_v3.erl",
"src/rabbit_file.erl",
"src/rabbit_global_counters.erl",
"src/rabbit_guid.erl",
Expand Down Expand Up @@ -1288,7 +1295,8 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
testonly = True,
srcs = ["test/rabbit_fifo_SUITE.erl"],
outs = ["test/rabbit_fifo_SUITE.beam"],
hdrs = ["src/rabbit_fifo.hrl"],
hdrs = ["src/rabbit_fifo.hrl",
"src/rabbit_fifo_dlx.hrl"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/rabbit_common:erlang_app"],
Expand Down Expand Up @@ -2142,3 +2150,13 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
name = "rabbit_fifo_q_SUITE_beam_files",
testonly = True,
srcs = ["test/rabbit_fifo_q_SUITE.erl"],
outs = ["test/rabbit_fifo_q_SUITE.beam"],
hdrs = ["src/rabbit_fifo.hrl"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["@proper//:erlang_app"],
)
4 changes: 2 additions & 2 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ record_death(Reason, SourceQueue,
routing_keys = RKeys,
count = 1,
anns = DeathAnns},
ReasonBin = atom_to_binary(Reason),
Anns = case Anns0 of
#{deaths := Deaths0} ->
Deaths = case Deaths0 of
Expand All @@ -406,7 +407,7 @@ record_death(Reason, SourceQueue,
[{Key, NewDeath} | Deaths0]
end
end,
Anns0#{<<"x-last-death-reason">> := atom_to_binary(Reason),
Anns0#{<<"x-last-death-reason">> := ReasonBin,
<<"x-last-death-queue">> := SourceQueue,
<<"x-last-death-exchange">> := Exchange,
deaths := Deaths};
Expand All @@ -419,7 +420,6 @@ record_death(Reason, SourceQueue,
_ ->
[{Key, NewDeath}]
end,
ReasonBin = atom_to_binary(Reason),
Anns0#{<<"x-first-death-reason">> => ReasonBin,
<<"x-first-death-queue">> => SourceQueue,
<<"x-first-death-exchange">> => Exchange,
Expand Down
40 changes: 17 additions & 23 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,7 @@ get_property(priority, Msg) ->
-spec protocol_state(state(), mc:annotations()) -> iolist().
protocol_state(Msg0 = #msg_body_decoded{header = Header0,
message_annotations = MA0}, Anns) ->
FirstAcquirer = first_acquirer(Anns),
Header = case Header0 of
undefined ->
#'v1_0.header'{durable = true,
first_acquirer = FirstAcquirer};
#'v1_0.header'{} ->
Header0#'v1_0.header'{first_acquirer = FirstAcquirer}
end,
Header = update_header_from_anns(Header0, Anns),
MA = protocol_state_message_annotations(MA0, Anns),
Msg = Msg0#msg_body_decoded{header = Header,
message_annotations = MA},
Expand All @@ -238,14 +231,7 @@ protocol_state(Msg0 = #msg_body_decoded{header = Header0,
protocol_state(#msg_body_encoded{header = Header0,
message_annotations = MA0,
bare_and_footer = BareAndFooter}, Anns) ->
FirstAcquirer = first_acquirer(Anns),
Header = case Header0 of
undefined ->
#'v1_0.header'{durable = true,
first_acquirer = FirstAcquirer};
#'v1_0.header'{} ->
Header0#'v1_0.header'{first_acquirer = FirstAcquirer}
end,
Header = update_header_from_anns(Header0, Anns),
MA = protocol_state_message_annotations(MA0, Anns),
Sections = to_sections(Header, MA, []),
[encode(Sections), BareAndFooter];
Expand All @@ -269,10 +255,9 @@ protocol_state(#v1{message_annotations = MA0,
_ ->
undefined
end,
Header = #'v1_0.header'{durable = Durable,
priority = Priority,
ttl = Ttl,
first_acquirer = first_acquirer(Anns)},
Header = update_header_from_anns(#'v1_0.header'{durable = Durable,
priority = Priority,
ttl = Ttl}, Anns),
MA = protocol_state_message_annotations(MA0, Anns),
Sections = to_sections(Header, MA, []),
[encode(Sections), BareAndFooter].
Expand Down Expand Up @@ -573,13 +558,22 @@ msg_body_encoded([{{pos, Pos}, {body, Code}}], BarePos, Msg)
binary_part_bare_and_footer(Payload, Start) ->
binary_part(Payload, Start, byte_size(Payload) - Start).

-spec first_acquirer(mc:annotations()) -> boolean().
first_acquirer(Anns) ->
update_header_from_anns(undefined, Anns) ->
update_header_from_anns(#'v1_0.header'{durable = true}, Anns);
update_header_from_anns(Header, Anns) ->
DeliveryCount = case Anns of
#{delivery_count := C} -> C;
_ -> 0
end,
Redelivered = case Anns of
#{redelivered := R} -> R;
_ -> false
end,
not Redelivered.
FirstAcq = not Redelivered andalso
DeliveryCount =:= 0 andalso
not is_map_key(deaths, Anns),
Header#'v1_0.header'{first_acquirer = FirstAcq,
delivery_count = {uint, DeliveryCount}}.

encode_deaths(Deaths) ->
lists:map(
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/mc_amqpl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ convert_from(mc_amqp, Sections, Env) ->
{Headers2, CorrId091} = message_id(CorrId, <<"x-correlation-id">>, Headers1),

Headers = case Env of
#{message_containers_store_amqp_v1 := false} ->
#{'rabbitmq_4.0.0' := false} ->
Headers3 = case AProp of
undefined ->
Headers2;
Expand Down
4 changes: 3 additions & 1 deletion deps/rabbit/src/mc_compat.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ get_annotation(?ANN_ROUTING_KEYS, #basic_message{routing_keys = RKeys}) ->
get_annotation(?ANN_EXCHANGE, #basic_message{exchange_name = Ex}) ->
Ex#resource.name;
get_annotation(id, #basic_message{id = Id}) ->
Id.
Id;
get_annotation(_Key, #basic_message{}) ->
undefined.

set_annotation(id, Value, #basic_message{} = Msg) ->
Msg#basic_message{id = Value};
Expand Down
64 changes: 40 additions & 24 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@
send_settled :: boolean(),
max_message_size :: unlimited | pos_integer(),

%% When feature flag credit_api_v2 becomes required,
%% When feature flag rabbitmq_4.0.0 becomes required,
%% the following 2 fields should be deleted.
credit_api_version :: 1 | 2,
%% When credit API v1 is used, our session process holds the delivery-count
Expand Down Expand Up @@ -225,7 +225,7 @@
frames :: [transfer_frame_body(), ...],
queue_ack_required :: boolean(),
%% Queue that sent us this message.
%% When feature flag credit_api_v2 becomes required, this field should be deleted.
%% When feature flag rabbitmq_4.0.0 becomes required, this field should be deleted.
queue_pid :: pid() | credit_api_v2,
delivery_id :: delivery_number(),
outgoing_unsettled :: #outgoing_unsettled{}
Expand Down Expand Up @@ -1068,17 +1068,17 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
QType = amqqueue:get_type(Q),
%% Whether credit API v1 or v2 is used is decided only here at link attachment time.
%% This decision applies to the whole life time of the link.
%% This means even when feature flag credit_api_v2 will be enabled later, this consumer will
%% This means even when feature flag rabbitmq_4.0.0 will be enabled later, this consumer will
%% continue to use credit API v1. This is the safest and easiest solution avoiding
%% transferring link flow control state (the delivery-count) at runtime from this session
%% process to the queue process.
%% Eventually, after feature flag credit_api_v2 gets enabled and a subsequent rolling upgrade,
%% Eventually, after feature flag rabbitmq_4.0.0 gets enabled and a subsequent rolling upgrade,
%% all consumers will use credit API v2.
%% Streams always use credit API v2 since the stream client (rabbit_stream_queue) holds the link
%% flow control state. Hence, credit API mixed version isn't an issue for streams.
{CreditApiVsn, Mode, DeliveryCount, ClientFlowCtl,
QueueFlowCtl, CreditReqInFlight, StashedCreditReq} =
case rabbit_feature_flags:is_enabled(credit_api_v2) orelse
case rabbit_feature_flags:is_enabled('rabbitmq_4.0.0') orelse
QType =:= rabbit_stream_queue of
true ->
{2,
Expand Down Expand Up @@ -1861,20 +1861,30 @@ settle_op_from_outcome(#'v1_0.rejected'{}) ->
discard;
settle_op_from_outcome(#'v1_0.released'{}) ->
requeue;
%% Keep the same Modified behaviour as in RabbitMQ 3.x
settle_op_from_outcome(#'v1_0.modified'{delivery_failed = true,
undeliverable_here = UndelHere})
when UndelHere =/= true ->
requeue;
settle_op_from_outcome(#'v1_0.modified'{}) ->
%% If delivery_failed is not true, we can't increment its delivery_count.
%% So, we will have to reject without requeue.
%%
%% If undeliverable_here is true, this is not quite correct because
%% undeliverable_here refers to the link, and not the message in general.
%% However, we cannot filter messages from being assigned to individual consumers.
%% That's why we will have to reject it without requeue.
discard;

%% Not all queue types support the modified outcome fields correctly.
%% However, we still allow the client to settle with the modified outcome
%% because some client libraries such as Apache QPid make use of it:
%% https://github.com/apache/qpid-jms/blob/90eb60f59cb59b7b9ad8363ee8a843d6903b8e77/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java#L464
%% In such cases, it's better when RabbitMQ does not end the session.
%% See https://github.com/rabbitmq/rabbitmq-server/issues/6121
settle_op_from_outcome(#'v1_0.modified'{delivery_failed = DelFailed,
undeliverable_here = UndelHere,
message_annotations = Anns0
}) ->
Anns = case Anns0 of
#'v1_0.message_annotations'{content = C} ->
C;
_ ->
[]
end,
{modify,
default(DelFailed, false),
default(UndelHere, false),
%% TODO: this must exist elsewhere
lists:foldl(fun ({{symbol, K}, V}, Acc) ->
Acc#{K => unwrap(V)}
end, #{}, Anns)};
settle_op_from_outcome(Outcome) ->
protocol_error(
?V_1_0_AMQP_ERROR_INVALID_FIELD,
Expand Down Expand Up @@ -1981,7 +1991,7 @@ handle_queue_actions(Actions, State) ->
S0 = #state{outgoing_links = OutgoingLinks0,
outgoing_pending = Pending}) ->
%% credit API v1
%% Delete this branch when feature flag credit_api_v2 becomes required.
%% Delete this branch when feature flag rabbitmq_4.0.0 becomes required.
Handle = ctag_to_handle(Ctag),
Link = #outgoing_link{delivery_count = Count0} = maps:get(Handle, OutgoingLinks0),
{Count, Credit, S} = case Drain of
Expand Down Expand Up @@ -2788,7 +2798,7 @@ delivery_count_rcv(undefined) ->
%% credits to a queue has to synchronously wait for a credit reply from the queue:
%% https://github.com/rabbitmq/rabbitmq-server/blob/b9566f4d02f7ceddd2f267a92d46affd30fb16c8/deps/rabbitmq_codegen/credit_extension.json#L43
%% This blocks our entire AMQP 1.0 session process. Since the credit reply from the
%% queue did not contain the consumr tag prior to feature flag credit_api_v2, we
%% queue did not contain the consumr tag prior to feature flag rabbitmq_4.0.0, we
%% must behave here the same way as non-native AMQP 1.0: We wait until the queue
%% sends us a credit reply sucht that we can correlate that reply with our consumer tag.
process_credit_reply_sync(
Expand Down Expand Up @@ -2853,7 +2863,7 @@ process_credit_reply_sync_quorum_queue(Ctag, QName, Credit, State0) ->
no_return().
credit_reply_timeout(QType, QName) ->
Fmt = "Timed out waiting for credit reply from ~s ~s. "
"Hint: Enable feature flag credit_api_v2",
"Hint: Enable feature flag rabbitmq_4.0.0",
Args = [QType, rabbit_misc:rs(QName)],
rabbit_log:error(Fmt, Args),
protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, Fmt, Args).
Expand Down Expand Up @@ -3441,12 +3451,13 @@ cap_credit(DesiredCredit) ->
min(DesiredCredit, MaxCredit).

ensure_mc_cluster_compat(Mc) ->
IsEnabled = rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1),
Feature = 'rabbitmq_4.0.0',
IsEnabled = rabbit_feature_flags:is_enabled(Feature),
case IsEnabled of
true ->
Mc;
false ->
McEnv = #{message_containers_store_amqp_v1 => IsEnabled},
McEnv = #{Feature => IsEnabled},
%% other nodes in the cluster may not understand the new internal
%% amqp mc format - in this case we convert to AMQP legacy format
%% for compatibility
Expand Down Expand Up @@ -3497,3 +3508,8 @@ format_status(
permission_cache => PermissionCache,
topic_permission_cache => TopicPermissionCache},
maps:update(state, State, Status).

unwrap({_Tag, V}) ->
V;
unwrap(V) ->
V.
6 changes: 3 additions & 3 deletions deps/rabbit/src/rabbit_amqp_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ send_command_sync(Writer, ChannelNum, Performative) ->
Request = {send_command, ChannelNum, Performative},
gen_server:call(Writer, Request, ?CALL_TIMEOUT).

%% Delete this function when feature flag credit_api_v2 becomes required.
%% Delete this function when feature flag rabbitmq_4.0.0 becomes required.
-spec send_command_and_notify(pid(),
pid(),
rabbit_types:channel_number(),
Expand Down Expand Up @@ -111,7 +111,7 @@ handle_cast({send_command, SessionPid, ChannelNum, Performative, Payload}, State
State1 = internal_send_command_async(ChannelNum, Performative, Payload, State0),
State = credit_flow_ack(SessionPid, State1),
no_reply(State);
%% Delete below function clause when feature flag credit_api_v2 becomes required.
%% Delete below function clause when feature flag rabbitmq_4.0.0 becomes required.
handle_cast({send_command_and_notify, QueuePid, SessionPid, ChannelNum, Performative, Payload}, State0) ->
State1 = internal_send_command_async(ChannelNum, Performative, Payload, State0),
State = credit_flow_ack(SessionPid, State1),
Expand All @@ -131,7 +131,7 @@ handle_info({{'DOWN', session}, _MRef, process, SessionPid, _Reason},
credit_flow:peer_down(SessionPid),
State = State0#state{monitored_sessions = maps:remove(SessionPid, Sessions)},
no_reply(State);
%% Delete below function clause when feature flag credit_api_v2 becomes required.
%% Delete below function clause when feature flag rabbitmq_4.0.0 becomes required.
handle_info({'DOWN', _MRef, process, QueuePid, _Reason}, State) ->
rabbit_amqqueue:notify_sent_queue_down(QueuePid),
no_reply(State).
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1516,7 +1516,7 @@ handle_cast({credit, SessionPid, CTag, Credit, Drain},
backing_queue = BQ,
backing_queue_state = BQS0} = State) ->
%% Credit API v1.
%% Delete this function clause when feature flag credit_api_v2 becomes required.
%% Delete this function clause when feature flag rabbitmq_4.0.0 becomes required.
%% Behave like non-native AMQP 1.0: Send send_credit_reply before deliveries.
rabbit_classic_queue:send_credit_reply_credit_api_v1(
SessionPid, amqqueue:get_name(Q), BQ:len(BQS0)),
Expand Down
Loading

0 comments on commit ba2388b

Please sign in to comment.