Skip to content

Commit

Permalink
Add hibernate_after option for c2s and receiver.
Browse files Browse the repository at this point in the history
The default of 0 means that the processes hibernate on every
opportunity.
  • Loading branch information
kzemek committed Apr 30, 2018
1 parent 596fbc5 commit 5f4a249
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 58 deletions.
3 changes: 3 additions & 0 deletions doc/advanced-configuration/Listener-modules.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ You only need to declare running `ejabberd_c2s`, to have the other 2 modules sta
* `protocol_options` List of supported SSL protocols, default "no_sslv3".
It also accepts "no_tlsv1" and "no_tlsv1_1"
* `dhfile` (string, default: no DH file will be used) - Path to the Diffie Hellman parameter file
* `hibernate_after` (integer, default: 0) - Time in milliseconds after which a client process spawned by this listener will hibernate.
Hibernation greatly reduces memory consumption of client processes, but *may* result in increased CPU consumption if a client is used *very* frequently.
The default, recommended value of 0 means that the client processes will hibernate at every opportunity.

## HTTP-based services (BOSH, WebSocket, REST): `ejabberd_cowboy`

Expand Down
4 changes: 2 additions & 2 deletions include/ejabberd_c2s.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@
stream_mgmt_resume_tref,
stream_mgmt_constraint_check_tref,
csi_state = active :: mod_csi:state(),
csi_buffer = []
csi_buffer = [],
hibernate_after = 0 :: non_neg_integer()
}).
-type aux_key() :: atom().
-type aux_value() :: any().
Expand Down Expand Up @@ -138,7 +139,6 @@
%% This is the timeout to apply between event when starting a new
%% session:
-define(C2S_OPEN_TIMEOUT, 60000).
-define(C2S_HIBERNATE_TIMEOUT, 90000).

-define(STREAM_HEADER,
"<?xml version='1.0'?>"
Expand Down
42 changes: 28 additions & 14 deletions src/ejabberd_c2s.erl
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ init([{SockMod, Socket}, Opts]) ->
true -> verify_peer;
false -> verify_none
end,
HibernateAfter =
case lists:keyfind(hibernate_after, 1, Opts) of
{_, HA} -> HA;
_ -> 0
end,
StartTLS = lists:member(starttls, Opts) orelse Verify =:= verify_peer,
StartTLSRequired = lists:member(starttls_required, Opts),
TLSEnabled = lists:member(tls, Opts),
Expand Down Expand Up @@ -225,7 +230,8 @@ init([{SockMod, Socket}, Opts]) ->
access = Access,
shaper = Shaper,
ip = IP,
lang = default_language()},
lang = default_language(),
hibernate_after= HibernateAfter},
?C2S_OPEN_TIMEOUT}
end.

Expand Down Expand Up @@ -878,7 +884,8 @@ do_open_session(Acc, JID, StateData) ->
{stop, normal, NextStateData} -> % error, resume not possible
c2s_stream_error(mongoose_xmpp_errors:stream_internal_server_error(), NextStateData),
{stop, Acc1, NStateData};
{_, _, NextStateData, _} ->
Result when is_tuple(Result) ->
NextStateData = element(3, Result),
do_open_session_common(Acc1, JID, NextStateData)
end
end.
Expand Down Expand Up @@ -993,11 +1000,7 @@ session_established({xmlstreamelement, El}, StateData) ->
%% We hibernate the process to reduce memory consumption after a
%% configurable activity timeout
session_established(timeout, StateData) ->
%% TODO: Options must be stored in state:
Options = [],
proc_lib:hibernate(p1_fsm_old, enter_loop,
[?MODULE, Options, session_established, StateData]),
fsm_next_state(session_established, StateData);
hibernate({next_state, session_established, StateData});
session_established({xmlstreamend, _Name}, StateData) ->
send_trailer(StateData),
{stop, normal, StateData};
Expand Down Expand Up @@ -1094,18 +1097,18 @@ resume_session({xmlstreamelement, _}, StateData) ->
<<"session in resume state cannot accept incoming stanzas">>),
maybe_send_element_safe(StateData, Err),
maybe_send_trailer_safe(StateData),
{next_state, resume_session, StateData, hibernate};
hibernate({next_state, resume_session, StateData});

%%-------------------------------------------------------------------------
%% ignore mod_ping closed messages because we are already in resume session
%% state
resume_session(closed, StateData) ->
{next_state, resume_session, StateData, hibernate};
hibernate({next_state, resume_session, StateData});
resume_session(timeout, StateData) ->
{next_state, resume_session, StateData, hibernate};
hibernate({next_state, resume_session, StateData});
resume_session(Msg, StateData) ->
?WARNING_MSG("unexpected message ~p", [Msg]),
{next_state, resume_session, StateData, hibernate}.
hibernate({next_state, resume_session, StateData}).


%%----------------------------------------------------------------------
Expand Down Expand Up @@ -2579,15 +2582,15 @@ fsm_next_state_gc(StateName, PackedStateData) ->
%% @doc fsm_next_state: Generate the next_state FSM tuple with different
%% timeout, depending on the future state
fsm_next_state(session_established, StateData) ->
{next_state, session_established, StateData, ?C2S_HIBERNATE_TIMEOUT};
maybe_hibernate({next_state, session_established, StateData});
fsm_next_state(StateName, StateData) ->
{next_state, StateName, StateData, ?C2S_OPEN_TIMEOUT}.


%% @doc fsm_reply: Generate the reply FSM tuple with different timeout,
%% depending on the future state
fsm_reply(Reply, session_established, StateData) ->
{reply, Reply, session_established, StateData, ?C2S_HIBERNATE_TIMEOUT};
maybe_hibernate({reply, Reply, session_established, StateData});
fsm_reply(Reply, StateName, StateData) ->
{reply, Reply, StateName, StateData, ?C2S_OPEN_TIMEOUT}.

Expand Down Expand Up @@ -3106,7 +3109,7 @@ maybe_enter_resume_session(_SMID, #state{} = SD) ->
_TRef ->
SD
end,
{next_state, resume_session, NSD, hibernate}.
hibernate({next_state, resume_session, NSD}).

maybe_resume_session(NextState, El, StateData) ->
case {xml:get_tag_attr_s(<<"xmlns">>, El),
Expand Down Expand Up @@ -3353,3 +3356,14 @@ setup_accum(Acc, StateData) ->
Server = StateData#state.server,
mongoose_acc:update(Acc, #{server => Server, user => User}).

hibernate(Result) ->
case process_info(self(), message_queue_len) of
{_, 0} -> erlang:append_element(Result, hibernate);
_ -> Result
end.

maybe_hibernate(Result) ->
case element(tuple_size(Result), Result) of
#state{hibernate_after = 0} -> hibernate(Result);
#state{hibernate_after = HA} -> erlang:append_element(Result, HA)
end.
82 changes: 48 additions & 34 deletions src/ejabberd_receiver.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

%% API
-export([start_link/4,
start/3,
start/4,
change_shaper/2,
starttls/2,
Expand All @@ -54,11 +53,10 @@
max_stanza_size,
stanza_chunk_size,
parser,
timeout}).
timeout,
hibernate_after = 0 :: non_neg_integer()}).
-type state() :: #state{}.

-define(HIBERNATE_TIMEOUT, 90000).

%%====================================================================
%% API
%%====================================================================
Expand All @@ -67,21 +65,18 @@
%% Description: Starts the server
%%--------------------------------------------------------------------
-spec start_link(_, _, _, _) -> 'ignore' | {'error', _} | {'ok', pid()}.
start_link(Socket, SockMod, Shaper, MaxStanzaSize) ->
start_link(Socket, SockMod, Shaper, ConnOpts) ->
gen_server:start_link(
?MODULE, [Socket, SockMod, Shaper, MaxStanzaSize], []).
?MODULE, [Socket, SockMod, Shaper, ConnOpts], []).

%%--------------------------------------------------------------------
%% Function: start() -> {ok, Pid} | ignore | {error, Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
start(Socket, SockMod, Shaper) ->
start(Socket, SockMod, Shaper, infinity).

start(Socket, SockMod, Shaper, MaxStanzaSize) ->
start(Socket, SockMod, Shaper, ConnOpts) ->
{ok, Pid} = supervisor:start_child(
ejabberd_receiver_sup,
[Socket, SockMod, Shaper, MaxStanzaSize]),
[Socket, SockMod, Shaper, ConnOpts]),
Pid.

-spec change_shaper(atom() | pid() | {atom(), _} | {'via', _, _}, _) -> 'ok'.
Expand Down Expand Up @@ -116,20 +111,31 @@ close(Pid) ->
%% Description: Initiates the server
%%--------------------------------------------------------------------
-spec init([any(), ...]) -> {'ok', state()}.
init([Socket, SockMod, Shaper, MaxStanzaSize]) ->
init([Socket, SockMod, Shaper, ConnOpts]) ->
ShaperState = shaper:new(Shaper),
Timeout = case SockMod of
ssl ->
20;
_ ->
infinity
end,
MaxStanzaSize =
case lists:keyfind(max_stanza_size, 1, ConnOpts) of
{_, Size} -> Size;
_ -> infinity
end,
HibernateAfter =
case lists:keyfind(hibernate_after, 1, ConnOpts) of
{_, HA} -> HA;
_ -> 0
end,
{ok, #state{socket = Socket,
sock_mod = SockMod,
shaper_state = ShaperState,
max_stanza_size = MaxStanzaSize,
stanza_chunk_size = 0,
timeout = Timeout}}.
timeout = Timeout,
hibernate_after = HibernateAfter}}.

%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
Expand All @@ -141,9 +147,8 @@ init([Socket, SockMod, Shaper, MaxStanzaSize]) ->
%% Description: Handling call messages
%%--------------------------------------------------------------------
handle_call(get_socket, _From, #state{socket = Socket} = State) ->
{reply, {ok, Socket}, State, ?HIBERNATE_TIMEOUT};
handle_call({starttls, TLSOpts}, From, #state{parser = Parser,
socket = TCPSocket} = State) ->
maybe_hibernate({reply, {ok, Socket}, State});
handle_call({starttls, TLSOpts}, From, #state{socket = TCPSocket} = State) ->
%% the next message from client is part of TLS handshake, it must
%% be handled by TLS library (another process in case of just_tls)
%% so deactivating the socket.
Expand All @@ -161,7 +166,7 @@ handle_call({starttls, TLSOpts}, From, #state{parser = Parser,
%% handshake. such call is simply ignored by just_tls backend.
case ejabberd_tls:recv_data(TLSSocket, <<"">>) of
{ok, TLSData} ->
{noreply, process_data(TLSData, NewState), ?HIBERNATE_TIMEOUT};
maybe_hibernate({noreply, process_data(TLSData, NewState)});
{error, Reason} ->
?WARNING_MSG("tcp_to_tls failed with reason ~p~n", [Reason]),
{stop, normal, NewState}
Expand All @@ -177,11 +182,11 @@ handle_call({compress, ZlibSocket}, _From,
sock_mod = ejabberd_zlib},
case ejabberd_zlib:recv_data(ZlibSocket, "") of
{ok, ZlibData} ->
{reply, ok, process_data(ZlibData, NewState), ?HIBERNATE_TIMEOUT};
maybe_hibernate({reply, ok, process_data(ZlibData, NewState)});
{error, inflate_size_exceeded} ->
apply(gen_fsm(), send_event,
[C2SPid, {xmlstreamerror, <<"child element too big">>}]),
{reply, ok, NewState, ?HIBERNATE_TIMEOUT};
maybe_hibernate({reply, ok, NewState});
{error, inflate_error} ->
{stop, normal, ok, NewState}
end;
Expand All @@ -190,10 +195,10 @@ handle_call({become_controller, C2SPid}, _From, State) ->
NewState = StateAfterReset#state{c2s_pid = C2SPid},
activate_socket(NewState),
Reply = ok,
{reply, Reply, NewState, ?HIBERNATE_TIMEOUT};
maybe_hibernate({reply, Reply, NewState});
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State, ?HIBERNATE_TIMEOUT}.
maybe_hibernate({reply, Reply, State}).

%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
Expand All @@ -203,11 +208,11 @@ handle_call(_Request, _From, State) ->
%%--------------------------------------------------------------------
handle_cast({change_shaper, Shaper}, State) ->
NewShaperState = shaper:new(Shaper),
{noreply, State#state{shaper_state = NewShaperState}, ?HIBERNATE_TIMEOUT};
maybe_hibernate({noreply, State#state{shaper_state = NewShaperState}});
handle_cast(close, State) ->
{stop, normal, State};
handle_cast(_Msg, State) ->
{noreply, State, ?HIBERNATE_TIMEOUT}.
maybe_hibernate({noreply, State}).

%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
Expand All @@ -226,8 +231,7 @@ handle_info({Tag, _TCPSocket, Data},
[data, xmpp, received, encrypted_size], size(Data)),
case ejabberd_tls:recv_data(Socket, Data) of
{ok, TLSData} ->
{noreply, process_data(TLSData, State),
?HIBERNATE_TIMEOUT};
maybe_hibernate({noreply, process_data(TLSData, State)});
{error, _Reason} ->
{stop, normal, State}
end;
Expand All @@ -236,17 +240,16 @@ handle_info({Tag, _TCPSocket, Data},
[data, xmpp, received, compressed_size], size(Data)),
case ejabberd_zlib:recv_data(Socket, Data) of
{ok, ZlibData} ->
{noreply, process_data(ZlibData, State),
?HIBERNATE_TIMEOUT};
maybe_hibernate({noreply, process_data(ZlibData, State)});
{error, inflate_size_exceeded} ->
apply(gen_fsm(), send_event,
[C2SPid, {xmlstreamerror, <<"child element too big">>}]),
{noreply, State, ?HIBERNATE_TIMEOUT};
maybe_hibernate({noreply, State});
{error, inflate_error} ->
{stop, normal, State}
end;
_ ->
{noreply, process_data(Data, State), ?HIBERNATE_TIMEOUT}
maybe_hibernate({noreply, process_data(Data, State)})
end;
handle_info({Tag, _TCPSocket}, State)
when (Tag == tcp_closed) or (Tag == ssl_closed) ->
Expand All @@ -255,18 +258,17 @@ handle_info({Tag, _TCPSocket, Reason}, State)
when (Tag == tcp_error) or (Tag == ssl_error) ->
case Reason of
timeout ->
{noreply, State, ?HIBERNATE_TIMEOUT};
maybe_hibernate({noreply, State});
_ ->
{stop, normal, State}
end;
handle_info({timeout, _Ref, activate}, State) ->
activate_socket(State),
{noreply, State, ?HIBERNATE_TIMEOUT};
maybe_hibernate({noreply, State});
handle_info(timeout, State) ->
proc_lib:hibernate(gen_server, enter_loop, [?MODULE, [], State]),
{noreply, State, ?HIBERNATE_TIMEOUT};
hibernate({noreply, State});
handle_info(_Info, State) ->
{noreply, State, ?HIBERNATE_TIMEOUT}.
maybe_hibernate({noreply, State}).

%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> void()
Expand Down Expand Up @@ -421,3 +423,15 @@ gen_server_call_or_noproc(Pid, Message) ->
end.

gen_fsm() -> p1_fsm.

hibernate(Result) ->
case process_info(self(), message_queue_len) of
{_, 0} -> erlang:append_element(Result, hibernate);
_ -> Result
end.

maybe_hibernate(Result) ->
case element(tuple_size(Result), Result) of
#state{hibernate_after = 0} -> hibernate(Result);
#state{hibernate_after = HA} -> erlang:append_element(Result, HA)
end.
10 changes: 2 additions & 8 deletions src/ejabberd_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,12 @@
start(Module, SockMod, Socket, Opts) ->
case Module:socket_type() of
xml_stream ->
MaxStanzaSize =
case lists:keysearch(max_stanza_size, 1, Opts) of
{value, {_, Size}} -> Size;
_ -> infinity
end,
{ReceiverMod, Receiver, RecRef} =
case catch SockMod:custom_receiver(Socket) of
{receiver, RecMod, RecPid} ->
{RecMod, RecPid, RecMod};
_ ->
RecPid = ejabberd_receiver:start(
Socket, SockMod, none, MaxStanzaSize),
RecPid = ejabberd_receiver:start(Socket, SockMod, none, Opts),
{ejabberd_receiver, RecPid, RecPid}
end,
SocketData = #socket_state{sockmod = SockMod,
Expand Down Expand Up @@ -143,7 +137,7 @@ connect(Addr, Port, Opts) ->
connect(Addr, Port, Opts, Timeout) ->
case gen_tcp:connect(Addr, Port, Opts, Timeout) of
{ok, Socket} ->
Receiver = ejabberd_receiver:start(Socket, gen_tcp, none),
Receiver = ejabberd_receiver:start(Socket, gen_tcp, none, Opts),
SocketData = #socket_state{sockmod = gen_tcp,
socket = Socket,
receiver = Receiver},
Expand Down

0 comments on commit 5f4a249

Please sign in to comment.