Skip to content

Commit

Permalink
Merge pull request #12738 from rabbitmq/mergify/bp/v4.0.x/pr-12674
Browse files Browse the repository at this point in the history
rabbit_amqqueue: Add `is_feature_used` callback to `transient_nonexcl_queues` depr. feature (backport #12674)
  • Loading branch information
dumbbell authored Nov 15, 2024
2 parents 954ef30 + 1e9932b commit 7afb420
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
15 changes: 14 additions & 1 deletion deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
emit_consumers_local/3, internal_delete/3]).

%% Deprecated feature callback.
-export([are_transient_nonexcl_used/1]).

-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
-include("amqqueue.hrl").
Expand Down Expand Up @@ -110,9 +113,19 @@
-rabbit_deprecated_feature(
{transient_nonexcl_queues,
#{deprecation_phase => permitted_by_default,
doc_url => "https://blog.rabbitmq.com/posts/2021/08/4.0-deprecation-announcements/#removal-of-transient-non-exclusive-queues"
doc_url => "https://blog.rabbitmq.com/posts/2021/08/4.0-deprecation-announcements/#removal-of-transient-non-exclusive-queues",
callbacks => #{is_feature_used => {?MODULE, are_transient_nonexcl_used}}
}}).

are_transient_nonexcl_used(_) ->
case rabbit_db_queue:list_transient() of
{ok, Queues} ->
NonExclQueues = [Q || Q <- Queues, not is_exclusive(Q)],
length(NonExclQueues) > 0;
{error, _} ->
undefined
end.

-define(CONSUMER_INFO_KEYS,
[queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
active, activity_status, arguments]).
Expand Down
37 changes: 36 additions & 1 deletion deps/rabbit/src/rabbit_db_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@

%% Used by on_node_up and on_node_down.
%% Can be deleted once transient entities/mnesia are removed.
-export([foreach_transient/1,
-export([list_transient/0,
foreach_transient/1,
delete_transient/1]).

%% Only used by rabbit_amqqueue:forget_node_for_queue, which is only called
Expand Down Expand Up @@ -965,6 +966,40 @@ set_in_khepri(Q) ->
Path = khepri_queue_path(amqqueue:get_name(Q)),
rabbit_khepri:put(Path, Q).

%% -------------------------------------------------------------------
%% list_transient().
%% -------------------------------------------------------------------

-spec list_transient() -> {ok, Queues} | {error, any()} when
Queues :: [amqqueue:amqqueue()].
%% @doc Applies `UpdateFun' to all transient queue records.
%%
%% @private

list_transient() ->
rabbit_khepri:handle_fallback(
#{mnesia => fun() -> list_transient_in_mnesia() end,
khepri => fun() -> list_transient_in_khepri() end
}).

list_transient_in_mnesia() ->
Pattern = amqqueue:pattern_match_all(),
AllQueues = mnesia:dirty_match_object(
?MNESIA_TABLE,
Pattern),
{ok, AllQueues}.

list_transient_in_khepri() ->
try
List = ets:match_object(
?KHEPRI_PROJECTION,
amqqueue:pattern_match_on_durable(false)),
{ok, List}
catch
error:badarg ->
{error, {khepri_projection_missing, ?KHEPRI_WILDCARD_STAR}}
end.

%% -------------------------------------------------------------------
%% delete_transient().
%% -------------------------------------------------------------------
Expand Down

0 comments on commit 7afb420

Please sign in to comment.