Skip to content

Commit

Permalink
Bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
dcorbacho committed Feb 16, 2023
1 parent 2c97741 commit be22e98
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 47 deletions.
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ migrate_tables_to_khepri(FeatureName, TablesAndOwners) ->
"Feature flag `~s`: migration from Mnesia to Khepri "
"finished",
[FeatureName]),
rabbit_khepri:set_ready(),
_ = rabbit_khepri:set_ready(),
ok;
{'DOWN', MonitorRef, process, Pid, Info} ->
?LOG_ERROR(
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ recover_mnesia_tables() ->
%% the feature flag. See rabbit_core_ff:final_sync_from_mnesia_to_khepri/2
%% Unlock them here as mnesia is still fully functional.
Tables = [Table || {Table, _} <- rabbit_table:definitions()],
[mnesia:change_table_access_mode(Table, read_write) || Table <- Tables],
_ = [mnesia:change_table_access_mode(Table, read_write) || Table <- Tables],
ok.

init_in_khepri() ->
Expand Down
6 changes: 4 additions & 2 deletions deps/rabbit/src/rabbit_db_rtparams.erl
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ get_or_set_in_khepri(Key, Default) ->
{ok, undefined} ->
Record = #runtime_parameters{key = Key,
value = Default},
khepri_tx:put(Path, Record),
ok = khepri_tx:put(Path, Record),
Record;
{ok, R} ->
R
Expand Down Expand Up @@ -282,7 +282,9 @@ get_all_in_khepri_tx(VHost, Component) ->
%% Inside of a transaction, using `rabbit_vhost:exists` will cause
%% a deadlock and timeout on the transaction, as it uses `rabbit_khepri:exists`.
%% The `with` function uses the `khepri_tx` API instead
_ -> rabbit_db_vhost:with_fun_in_khepri_tx(VHost, fun() -> ok end)
_ ->
Fun = rabbit_db_vhost:with_fun_in_khepri_tx(VHost, fun() -> ok end),
Fun()
end,
case khepri_tx:get_many(Path) of
{ok, Map} ->
Expand Down
3 changes: 2 additions & 1 deletion deps/rabbit/src/rabbit_mirror_queue_master.erl
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ migrate_queue_record_in_khepri(QName, GM, Self) ->
%% start HA queues
end)
end,
ok = rabbit_khepri:transaction(Fun, rw).
_ = rabbit_khepri:transaction(Fun, rw),
ok.

-spec stop_mirroring(master_state()) -> {atom(), any()}.

Expand Down
12 changes: 6 additions & 6 deletions deps/rabbit/src/rabbit_mirror_queue_misc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ remove_from_queue_in_mnesia(QueueName, Self, DeadGMPids) ->
Q1 = amqqueue:set_pid(Q0, QPid1),
Q2 = amqqueue:set_slave_pids(Q1, SPids1),
Q3 = amqqueue:set_gm_pids(Q2, AliveGM),
store_updated_slaves_in_mnesia(Q3),
_ = store_updated_slaves_in_mnesia(Q3),
%% If we add and remove nodes at the
%% same time we might tell the old
%% master we need to sync and then
Expand All @@ -131,7 +131,7 @@ remove_from_queue_in_mnesia(QueueName, Self, DeadGMPids) ->
%% [1].
Q1 = amqqueue:set_slave_pids(Q0, Alive),
Q2 = amqqueue:set_gm_pids(Q1, AliveGM),
store_updated_slaves_in_mnesia(Q2),
_ = store_updated_slaves_in_mnesia(Q2),
{ok, QPid1, DeadPids, []}
end
end
Expand Down Expand Up @@ -209,7 +209,7 @@ remove_from_queue_in_khepri(QueueName, Self, DeadGMPids) ->
Q1 = amqqueue:set_pid(Q0, QPid1),
Q2 = amqqueue:set_slave_pids(Q1, SPids1),
Q3 = amqqueue:set_gm_pids(Q2, AliveGM),
store_updated_slaves_in_khepri(Q3, Decorators),
_ = store_updated_slaves_in_khepri(Q3, Decorators),
%% If we add and remove nodes at the
%% same time we might tell the old
%% master we need to sync and then
Expand All @@ -223,7 +223,7 @@ remove_from_queue_in_khepri(QueueName, Self, DeadGMPids) ->
%% [1].
Q1 = amqqueue:set_slave_pids(Q0, Alive),
Q2 = amqqueue:set_gm_pids(Q1, AliveGM),
store_updated_slaves_in_khepri(Q2, Decorators),
_ = store_updated_slaves_in_khepri(Q2, Decorators),
{ok, QPid1, DeadPids, []}
end
end
Expand All @@ -249,7 +249,7 @@ on_vhost_up(VHost) ->
on_vhost_up_in_khepri(VHost)
end
}),
[add_mirror(QName, node(), async) || QName <- QNames],
_ = [add_mirror(QName, node(), async) || QName <- QNames],
ok.

on_vhost_up_in_mnesia(VHost) ->
Expand Down Expand Up @@ -453,7 +453,7 @@ store_updated_slaves_in_khepri(Q0, Decorators) ->
%% HA queues are not supported in Khepri. This update is just enough to make
%% some of the current tests work, which might start some HA queue.
%% It will be removed before Khepri is released.
rabbit_db_queue:update_in_khepri_tx(amqqueue:get_name(Q0), fun(_) -> Q4 end),
_ = rabbit_db_queue:update_in_khepri_tx(amqqueue:get_name(Q0), fun(_) -> Q4 end),
%% Wake it up so that we emit a stats event
rabbit_amqqueue:notify_policy_changed(Q3),
Q3.
Expand Down
6 changes: 3 additions & 3 deletions deps/rabbit/src/rabbit_mirror_queue_slave.erl
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,9 @@ init_it_in_khepri(Self, GM, Node, QName) ->
PSPids = amqqueue:get_slave_pids_pending_shutdown(Q),
%% TODO we can't kill processes!
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
[] -> stop_pending_slaves(QName, PSPids),
[] -> _ = stop_pending_slaves(QName, PSPids),
%% TODO make add_slave_in_khepri and add_slave_in_mnesia
add_slave(Q, Self, GM),
_ = add_slave(Q, Self, GM),
{new, QPid, GMPids};
%% TODO is_process_alive should never go on a khepri transaction!
[QPid] -> case rabbit_mnesia:is_process_alive(QPid) of
Expand All @@ -236,7 +236,7 @@ init_it_in_khepri(Self, GM, Node, QName) ->
SPids1 = SPids -- [SPid],
Q1 = amqqueue:set_slave_pids(Q, SPids1),
Q2 = amqqueue:set_gm_pids(Q1, GMPids1),
add_slave(Q2, Self, GM),
_ = add_slave(Q2, Self, GM),
{new, QPid, GMPids1}
end
end;
Expand Down
3 changes: 1 addition & 2 deletions deps/rabbit/src/rabbit_vhost_limit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,5 @@ get_limit(VirtualHost, Limit) ->
N when N < 0 -> undefined;
N when N >= 0 -> {ok, N}
end
end;
{error, {timeout, _}} -> throw({error, {cannot_get_limit, VirtualHost, timeout}})
end
end.
4 changes: 0 additions & 4 deletions deps/rabbit/src/rabbit_vhost_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@ handle_info(check_vhost, VHost) ->
fun() ->
rabbit_vhost_sup_sup:stop_and_delete_vhost(VHost)
end),
{noreply, VHost};
_ ->
%% An error just happened, the node could be down.
%% There is nothing we can do, just wait until the next check
{noreply, VHost}
end;
handle_info(_, VHost) ->
Expand Down
56 changes: 29 additions & 27 deletions deps/rabbit/test/rabbit_db_binding_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ create(Config) ->
create1(_Config) ->
XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
Exchange1 = #exchange{name = XName1, durable = true},
Exchange2 = #exchange{name = XName2, durable = true},
Exchange1 = #exchange{name = XName1, durable = true, decorators = {[], []}},
Exchange2 = #exchange{name = XName2, durable = true, decorators = {[], []}},
Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
?assertMatch({error, {resources_missing, [_, _]}},
rabbit_db_binding:create(Binding, fun(_, _) -> ok end)),
Expand All @@ -105,12 +105,14 @@ exists(Config) ->
exists1(_Config) ->
XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
Exchange1 = #exchange{name = XName1, durable = true},
Exchange2 = #exchange{name = XName2, durable = true},
Exchange1 = #exchange{name = XName1, durable = true, decorators = {[], []}},
Exchange2 = #exchange{name = XName2, durable = true, decorators = {[], []}},
Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
?assertEqual(false, rabbit_db_exchange:exists(Binding)),
?assertMatch({error, {resources_missing, [{not_found, _}, {not_found, _}]}},
rabbit_db_binding:exists(Binding)),
?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange1)),
?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange2)),
?assertEqual(false, rabbit_db_binding:exists(Binding)),
?assertMatch(ok, rabbit_db_binding:create(Binding, fun(_, _) -> ok end)),
?assertEqual(true, rabbit_db_binding:exists(Binding)),
passed.
Expand All @@ -121,8 +123,8 @@ delete(Config) ->
delete1(_Config) ->
XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
Exchange1 = #exchange{name = XName1, durable = true, auto_delete = false},
Exchange2 = #exchange{name = XName2, durable = true, auto_delete = false},
Exchange1 = #exchange{name = XName1, durable = true, auto_delete = false, decorators = {[], []}},
Exchange2 = #exchange{name = XName2, durable = true, auto_delete = false, decorators = {[], []}},
Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
?assertEqual(ok, rabbit_db_binding:delete(Binding, fun(_, _) -> ok end)),
?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange1)),
Expand All @@ -142,8 +144,8 @@ auto_delete(Config) ->
auto_delete1(_Config) ->
XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
Exchange1 = #exchange{name = XName1, durable = true, auto_delete = true},
Exchange2 = #exchange{name = XName2, durable = true, auto_delete = false},
Exchange1 = #exchange{name = XName1, durable = true, auto_delete = true, decorators = {[], []}},
Exchange2 = #exchange{name = XName2, durable = true, auto_delete = false, decorators = {[], []}},
Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
?assertEqual(ok, rabbit_db_binding:delete(Binding, fun(_, _) -> ok end)),
?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange1)),
Expand All @@ -163,8 +165,8 @@ get_all(Config) ->
get_all1(_Config) ->
XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
Exchange1 = #exchange{name = XName1, durable = true},
Exchange2 = #exchange{name = XName2, durable = true},
Exchange1 = #exchange{name = XName1, durable = true, decorators = {[], []}},
Exchange2 = #exchange{name = XName2, durable = true, decorators = {[], []}},
Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
?assertEqual([], rabbit_db_binding:get_all()),
?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange1)),
Expand All @@ -179,8 +181,8 @@ get_all_by_vhost(Config) ->
get_all_by_vhost1(_Config) ->
XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
Exchange1 = #exchange{name = XName1, durable = true},
Exchange2 = #exchange{name = XName2, durable = true},
Exchange1 = #exchange{name = XName1, durable = true, decorators = {[], []}},
Exchange2 = #exchange{name = XName2, durable = true, decorators = {[], []}},
Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
?assertEqual([], rabbit_db_binding:get_all(?VHOST)),
?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange1)),
Expand All @@ -197,8 +199,8 @@ get_all_for_source(Config) ->
get_all_for_source1(_Config) ->
XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
Exchange1 = #exchange{name = XName1, durable = true},
Exchange2 = #exchange{name = XName2, durable = true},
Exchange1 = #exchange{name = XName1, durable = true, decorators = {[], []}},
Exchange2 = #exchange{name = XName2, durable = true, decorators = {[], []}},
Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
?assertEqual([], rabbit_db_binding:get_all_for_source(XName1)),
?assertEqual([], rabbit_db_binding:get_all_for_source(XName2)),
Expand All @@ -216,8 +218,8 @@ get_all_for_destination(Config) ->
get_all_for_destination1(_Config) ->
XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
Exchange1 = #exchange{name = XName1, durable = true},
Exchange2 = #exchange{name = XName2, durable = true},
Exchange1 = #exchange{name = XName1, durable = true, decorators = {[], []}},
Exchange2 = #exchange{name = XName2, durable = true, decorators = {[], []}},
Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
?assertEqual([], rabbit_db_binding:get_all_for_destination(XName1)),
?assertEqual([], rabbit_db_binding:get_all_for_destination(XName2)),
Expand All @@ -235,8 +237,8 @@ get_all_for_source_and_destination(Config) ->
get_all_for_source_and_destination1(_Config) ->
XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
Exchange1 = #exchange{name = XName1, durable = true},
Exchange2 = #exchange{name = XName2, durable = true},
Exchange1 = #exchange{name = XName1, durable = true, decorators = {[], []}},
Exchange2 = #exchange{name = XName2, durable = true, decorators = {[], []}},
Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
?assertEqual([], rabbit_db_binding:get_all(XName1, XName2, false)),
?assertEqual([], rabbit_db_binding:get_all(XName2, XName1, false)),
Expand All @@ -256,8 +258,8 @@ get_all_for_source_and_destination_reverse(Config) ->
get_all_for_source_and_destination_reverse1(_Config) ->
XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
Exchange1 = #exchange{name = XName1, durable = true},
Exchange2 = #exchange{name = XName2, durable = true},
Exchange1 = #exchange{name = XName1, durable = true, decorators = {[], []}},
Exchange2 = #exchange{name = XName2, durable = true, decorators = {[], []}},
Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
?assertEqual([], rabbit_db_binding:get_all(XName1, XName2, true)),
?assertEqual([], rabbit_db_binding:get_all(XName2, XName1, true)),
Expand All @@ -276,8 +278,8 @@ fold(Config) ->
fold1(_Config) ->
XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
Exchange1 = #exchange{name = XName1, durable = true},
Exchange2 = #exchange{name = XName2, durable = true},
Exchange1 = #exchange{name = XName1, durable = true, decorators = {[], []}},
Exchange2 = #exchange{name = XName2, durable = true, decorators = {[], []}},
Binding = #binding{source = XName1, key = <<"">>, destination = XName2, args = #{}},
?assertEqual([], rabbit_db_binding:fold(fun(B, Acc) -> [B | Acc] end, [])),
?assertMatch({new, #exchange{}}, rabbit_db_exchange:create_or_get(Exchange1)),
Expand All @@ -292,8 +294,8 @@ match(Config) ->
match1(_Config) ->
XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
Exchange1 = #exchange{name = XName1, durable = true},
Exchange2 = #exchange{name = XName2, durable = true},
Exchange1 = #exchange{name = XName1, durable = true, decorators = {[], []}},
Exchange2 = #exchange{name = XName2, durable = true, decorators = {[], []}},
Binding = #binding{source = XName1, key = <<"">>, destination = XName2,
args = #{foo => bar}},
?assertEqual([], rabbit_db_binding:match(XName1, fun(#binding{args = Args}) ->
Expand All @@ -318,8 +320,8 @@ match_routing_key(Config) ->
match_routing_key1(_Config) ->
XName1 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange1">>),
XName2 = rabbit_misc:r(?VHOST, exchange, <<"test-exchange2">>),
Exchange1 = #exchange{name = XName1, durable = true},
Exchange2 = #exchange{name = XName2, durable = true},
Exchange1 = #exchange{name = XName1, durable = true, decorators = {[], []}},
Exchange2 = #exchange{name = XName2, durable = true, decorators = {[], []}},
Binding = #binding{source = XName1, key = <<"*.*">>, destination = XName2,
args = #{foo => bar}},
?assertEqual([], rabbit_db_binding:match_routing_key(XName1, [<<"a.b.c">>], false)),
Expand Down

0 comments on commit be22e98

Please sign in to comment.