Skip to content

Commit

Permalink
Remove most of the fd related FHC code
Browse files Browse the repository at this point in the history
Stats were not removed, including management UI stats
relating to FDs.

Web-MQTT and Web-STOMP configuration relating to FHC
were not removed.

The file_handle_cache itself must be kept until we
remove CQv1.
  • Loading branch information
lhoguin committed Jun 13, 2024
1 parent 0f00374 commit 7ca27e2
Show file tree
Hide file tree
Showing 25 changed files with 60 additions and 394 deletions.
8 changes: 0 additions & 8 deletions deps/amqp_client/src/amqp_network_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ do_connect({Addr, Family},
connection_timeout = Timeout,
socket_options = ExtraOpts},
SIF, State) ->
ok = obtain(),
case gen_tcp:connect(Addr, Port,
[Family | ?RABBIT_TCP_OPTS] ++ ExtraOpts,
Timeout) of
Expand All @@ -134,7 +133,6 @@ do_connect({Addr, Family},
SIF, State) ->
{ok, GlobalSslOpts} = application:get_env(amqp_client, ssl_options),
app_utils:start_applications([asn1, crypto, public_key, ssl]),
ok = obtain(),
case gen_tcp:connect(Addr, Port,
[Family | ?RABBIT_TCP_OPTS] ++ ExtraOpts,
Timeout) of
Expand Down Expand Up @@ -379,11 +377,5 @@ handshake_recv(Expecting) ->
end
end.

obtain() ->
case code:is_loaded(file_handle_cache) of
false -> ok;
_ -> file_handle_cache:obtain()
end.

get_reason(#'connection.close'{reply_code = ErrCode}) ->
?PROTOCOL:amqp_exception(ErrCode).
7 changes: 2 additions & 5 deletions deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -751,9 +751,6 @@ status() ->
get_disk_free_limit, []}},
{disk_free, {rabbit_disk_monitor,
get_disk_free, []}}]),
S3 = rabbit_misc:with_exit_handler(
fun () -> [] end,
fun () -> [{file_descriptors, file_handle_cache:info()}] end),
S4 = [{processes, [{limit, erlang:system_info(process_limit)},
{used, erlang:system_info(process_count)}]},
{run_queue, erlang:statistics(run_queue)},
Expand Down Expand Up @@ -788,7 +785,7 @@ status() ->
(_) -> false
end,
maps:to_list(product_info())),
S1 ++ S2 ++ S3 ++ S4 ++ S5 ++ S6 ++ S7 ++ S8.
S1 ++ S2 ++ S4 ++ S5 ++ S6 ++ S7 ++ S8.

alarms() ->
Alarms = rabbit_misc:with_exit_handler(rabbit_misc:const([]),
Expand Down Expand Up @@ -1663,7 +1660,7 @@ config_files() ->
start_fhc() ->
ok = rabbit_sup:start_restartable_child(
file_handle_cache,
[fun rabbit_alarm:set_alarm/1, fun rabbit_alarm:clear_alarm/1]),
[fun(_) -> ok end, fun(_) -> ok end]),
ensure_working_fhc().

ensure_working_fhc() ->
Expand Down
21 changes: 0 additions & 21 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

-module(rabbit_amqqueue).

-export([warn_file_limit/0]).
-export([recover/1, stop/1, start/1, declare/6, declare/7,
delete_immediately/1, delete_exclusive/2, delete/4, purge/1,
forget_all_durable/1]).
Expand Down Expand Up @@ -119,21 +118,6 @@
active, activity_status, arguments]).
-define(KILL_QUEUE_DELAY_INTERVAL, 100).

warn_file_limit() ->
DurableQueues = find_recoverable_queues(),
L = length(DurableQueues),

%% if there are not enough file handles, the server might hang
%% when trying to recover queues, warn the user:
case file_handle_cache:get_limit() < L of
true ->
rabbit_log:warning(
"Recovering ~tp queues, available file handles: ~tp. Please increase max open file handles limit to at least ~tp!",
[L, file_handle_cache:get_limit(), L]);
false ->
ok
end.

-spec recover(rabbit_types:vhost()) ->
{Recovered :: [amqqueue:amqqueue()],
Failed :: [amqqueue:amqqueue()]}.
Expand Down Expand Up @@ -183,11 +167,6 @@ find_local_durable_queues(VHostName) ->
rabbit_queue_type:is_recoverable(Q)
end).

find_recoverable_queues() ->
rabbit_db_queue:filter_all_durable(fun(Q) ->
rabbit_queue_type:is_recoverable(Q)
end).

-spec declare(name(),
boolean(),
boolean(),
Expand Down
7 changes: 0 additions & 7 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,6 @@ init_it2(Recover, From, State = #q{q = Q,
(Res == created orelse Res == existing) ->
case matches(Recover, Q, Q1) of
true ->
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [self()]),
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
Expand Down Expand Up @@ -1194,7 +1192,6 @@ prioritise_cast(Msg, _Len, State) ->
delete_immediately -> 8;
{delete_exclusive, _Pid} -> 8;
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
{ack, _AckTags, _ChPid} -> 4; %% [1]
{resume, _ChPid} -> 3;
Expand Down Expand Up @@ -1510,10 +1507,6 @@ handle_cast({set_ram_duration_target, Duration},
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
noreply(State#q{backing_queue_state = BQS1});

handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);

handle_cast({credit, SessionPid, CTag, Credit, Drain},
#q{q = Q,
backing_queue = BQ,
Expand Down
25 changes: 2 additions & 23 deletions deps/rabbit/src/rabbit_classic_queue_index_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,6 @@
-define(HEADER_SIZE, 64). %% bytes
-define(ENTRY_SIZE, 32). %% bytes

%% The file_handle_cache module tracks reservations at
%% the level of the process. This means we cannot
%% handle them independently in the store and index.
%% Because the index may reserve more FDs than the
%% store the index becomes responsible for this and
%% will always reserve at least 2 FDs, and release
%% everything when terminating.
-define(STORE_FD_RESERVATIONS, 2).

-include_lib("rabbit_common/include/rabbit.hrl").
%% Set to true to get an awful lot of debug logs.
-if(false).
Expand Down Expand Up @@ -538,7 +529,6 @@ terminate(VHost, Terms, State0 = #qi { dir = Dir,
ok = file:sync(Fd),
ok = file:close(Fd)
end, OpenFds),
file_handle_cache:release_reservation(),
%% Write recovery terms for faster recovery.
_ = rabbit_recovery_terms:store(VHost,
filename:basename(rabbit_file:binary_to_filename(Dir)),
Expand All @@ -555,7 +545,6 @@ delete_and_terminate(State = #qi { dir = Dir,
_ = maps:map(fun(_, Fd) ->
ok = file:close(Fd)
end, OpenFds),
file_handle_cache:release_reservation(),
%% Erase the data on disk.
ok = erase_index_dir(rabbit_file:binary_to_filename(Dir)),
State#qi{ segments = #{},
Expand Down Expand Up @@ -626,18 +615,9 @@ new_segment_file(Segment, SegmentEntryCount, State = #qi{ segments = Segments })
%% using too many FDs when the consumer lags a lot. We
%% limit at 4 because we try to keep up to 2 for reading
%% and 2 for writing.
reduce_fd_usage(SegmentToOpen, State = #qi{ fds = OpenFds })
reduce_fd_usage(_SegmentToOpen, State = #qi{ fds = OpenFds })
when map_size(OpenFds) < 4 ->
%% The only case where we need to update reservations is
%% when we are opening a segment that wasn't already open,
%% and we are not closing another segment at the same time.
case OpenFds of
#{SegmentToOpen := _} ->
State;
_ ->
file_handle_cache:set_reservation(?STORE_FD_RESERVATIONS + map_size(OpenFds) + 1),
State
end;
State;
reduce_fd_usage(SegmentToOpen, State = #qi{ fds = OpenFds0 }) ->
case OpenFds0 of
#{SegmentToOpen := _} ->
Expand Down Expand Up @@ -868,7 +848,6 @@ delete_segment(Segment, State0 = #qi{ fds = OpenFds0 }) ->
State = case maps:take(Segment, OpenFds0) of
{Fd, OpenFds} ->
ok = file:close(Fd),
file_handle_cache:set_reservation(?STORE_FD_RESERVATIONS + map_size(OpenFds)),
State0#qi{ fds = OpenFds };
error ->
State0
Expand Down
5 changes: 0 additions & 5 deletions deps/rabbit/src/rabbit_classic_queue_store_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@
%% need to look into the store to discard them. Messages on disk
%% will be dropped at the same time as the index deletes the
%% corresponding segment file.
%%
%% The file_handle_cache reservations are done by the v2 index
%% because they are handled at a pid level. Since we are using
%% up to 2 FDs in this module we make the index reserve 2 extra
%% FDs.

-module(rabbit_classic_queue_store_v2).

Expand Down
11 changes: 2 additions & 9 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -894,10 +894,8 @@ state_enter0(leader, #?MODULE{consumers = Cons,
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
FHReservation = [{mod_call, rabbit_quorum_queue,
file_handle_leader_reservation, [Resource]}],
NotifyDecs = notify_decorators_startup(Resource),
Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ FHReservation ++ [NotifyDecs],
Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ [NotifyDecs],
case BLH of
undefined ->
Effects;
Expand All @@ -914,12 +912,7 @@ state_enter0(eol, #?MODULE{enqueuers = Enqs,
AllConsumers = maps:merge(Custs, WaitingConsumers1),
[{send_msg, P, eol, ra_event}
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
[{aux, eol},
{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []} | Effects];
state_enter0(State, #?MODULE{cfg = #cfg{resource = _Resource}}, Effects)
when State =/= leader ->
FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
[FHReservation | Effects];
[{aux, eol} | Effects];
state_enter0(_, _, Effects) ->
%% catch all as not handling all states
Effects.
Expand Down
10 changes: 2 additions & 8 deletions deps/rabbit/src/rabbit_fifo_v0.erl
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,6 @@ state_enter(leader, #?STATE{consumers = Cons,
enqueuers = Enqs,
waiting_consumers = WaitingConsumers,
cfg = #cfg{name = Name,
resource = Resource,
become_leader_handler = BLH},
prefix_msgs = {0, [], 0, []}
}) ->
Expand All @@ -559,8 +558,7 @@ state_enter(leader, #?STATE{consumers = Cons,
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}],
Effects = Mons ++ Nots ++ NodeMons ++ FHReservation,
Effects = Mons ++ Nots ++ NodeMons,
case BLH of
undefined ->
Effects;
Expand All @@ -575,11 +573,7 @@ state_enter(eol, #?STATE{enqueuers = Enqs,
#{}, WaitingConsumers0),
AllConsumers = maps:merge(Custs, WaitingConsumers1),
[{send_msg, P, eol, ra_event}
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
[{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}];
state_enter(State, #?STATE{cfg = #cfg{resource = _Resource}}) when State =/= leader ->
FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
[FHReservation];
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))];
state_enter(_, _) ->
%% catch all as not handling all states
[].
Expand Down
10 changes: 2 additions & 8 deletions deps/rabbit/src/rabbit_fifo_v1.erl
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,6 @@ state_enter(leader, #?STATE{consumers = Cons,
enqueuers = Enqs,
waiting_consumers = WaitingConsumers,
cfg = #cfg{name = Name,
resource = Resource,
become_leader_handler = BLH},
prefix_msgs = {0, [], 0, []}
}) ->
Expand All @@ -687,8 +686,7 @@ state_enter(leader, #?STATE{consumers = Cons,
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}],
Effects = Mons ++ Nots ++ NodeMons ++ FHReservation,
Effects = Mons ++ Nots ++ NodeMons,
case BLH of
undefined ->
Effects;
Expand All @@ -704,11 +702,7 @@ state_enter(eol, #?STATE{enqueuers = Enqs,
AllConsumers = maps:merge(Custs, WaitingConsumers1),
[{send_msg, P, eol, ra_event}
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
[{aux, eol},
{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}];
state_enter(State, #?STATE{cfg = #cfg{resource = _Resource}}) when State =/= leader ->
FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
[FHReservation];
[{aux, eol}];
state_enter(_, _) ->
%% catch all as not handling all states
[].
Expand Down
Loading

0 comments on commit 7ca27e2

Please sign in to comment.