Skip to content

Commit

Permalink
Merge pull request #265 from emqx/fix-shutdown-handler
Browse files Browse the repository at this point in the history
chore: improve code for wrapping shutdown
  • Loading branch information
terry-xiaoyu authored Nov 25, 2024
2 parents f670f3a + a7016ff commit c98ad4c
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 19 deletions.
28 changes: 14 additions & 14 deletions src/emqtt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1639,9 +1639,10 @@ handle_unknown_event(EventType, EventContent, StateName, State) ->

%% Mandatory callback functions
terminate(Reason, _StateName, State = #state{conn_mod = ConnMod, socket = Socket}) ->
reply_all_inflight_reqs(Reason, State),
ok = reply_all_pendings_reqs(Reason, State),
ok = eval_msg_handler(State, disconnected, Reason),
Reason1 = unwrap_shutdown(Reason),
ok = reply_all_inflight_reqs(Reason1, State),
ok = reply_all_pendings_reqs(Reason1, State),
ok = eval_msg_handler(State, disconnected, Reason1),
ok = close_socket(ConnMod, Socket).

%% Downgrade
Expand Down Expand Up @@ -2018,7 +2019,7 @@ deliver(_Via, {pubrel, Msg}, State) ->
State.

eval_msg_handler(#state{msg_handler = ?NO_HANDLER, owner = Owner}, disconnected,
{shutdown, {disconnected, ReasonCode, Properties}}) when is_integer(ReasonCode) ->
{disconnected, ReasonCode, Properties}) when is_integer(ReasonCode) ->
%% Special handling for disconnected message when there is no handler callback
Owner ! {disconnected, ReasonCode, Properties},
ok;
Expand Down Expand Up @@ -2077,19 +2078,11 @@ maybe_reconnect(Reason, #state{reconnect = Re} = State) when ?NEED_RECONNECT(Re)
maybe_reconnect(Reason, State) ->
shutdown(Reason, State).

shutdown(normal, State) ->
{stop, normal, State};
shutdown({shutdown, _} = Reason, State) ->
{stop, Reason, State};
shutdown(Reason, State) ->
{stop, {shutdown, Reason}, State}.
{stop, wrap_shutdown(Reason), State}.

shutdown_reply(normal, From, Reply) ->
{stop_and_reply, normal, [{reply, From, Reply}]};
shutdown_reply({shutdown, _} = Reason, From, Reply) ->
{stop_and_reply, Reason, [{reply, From, Reply}]};
shutdown_reply(Reason, From, Reply) ->
{stop_and_reply, {shutdown, Reason}, [{reply, From, Reply}]}.
{stop_and_reply, wrap_shutdown(Reason), [{reply, From, Reply}]}.

reply_all_inflight_reqs(Reason, #state{inflight = Inflight}) ->
%% reply error to all pendings caller
Expand Down Expand Up @@ -2318,6 +2311,13 @@ prepare_reconnect(#state{
next_retry_cnt(infinity) -> infinity;
next_retry_cnt(Cnt) -> Cnt - 1.

wrap_shutdown(normal) -> normal;
wrap_shutdown({shutdown, _} = Reason) -> Reason;
wrap_shutdown(Reason) -> {shutdown, Reason}.

unwrap_shutdown({shutdown, Reason}) -> Reason;
unwrap_shutdown(Reason) -> Reason.

close_socket(_, undefined) ->
ok;
close_socket(_, ?socket_reconnecting) ->
Expand Down
10 changes: 5 additions & 5 deletions test/emqtt_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ test_publish_port_error(Config) ->
after 5000 ->
ct:fail(timeout)
end,
?assertEqual({error, {shutdown, killed}}, PublishResult),
?assertEqual({error, killed}, PublishResult),
meck:unload(emqtt_sock).

t_publish_port_error_retry(Config) ->
Expand Down Expand Up @@ -937,10 +937,10 @@ t_eval_callback_in_order(Config) ->
mock_quic_send(fun(_, _) -> {error, closed} end),

?assertMatch([{1, ok}, %% qos0: treat send as successfully
{2, {error, {shutdown, closed}}}, %% from inflight
{3, {error, {shutdown, closed}}},
{4, {error, {shutdown, closed}}}, %% from pending request queue
{5, {error, {shutdown, closed}}},
{2, {error, closed}}, %% from inflight
{3, {error, closed}},
{4, {error, closed}}, %% from pending request queue
{5, {error, closed}},
{'EXIT', C, {shutdown, closed}}], ?COLLECT_ASYNC_RESULT(C)),

meck:unload(emqtt_sock),
Expand Down

0 comments on commit c98ad4c

Please sign in to comment.