diff --git a/src/emqtt.erl b/src/emqtt.erl index fe0130b..e37cac1 100644 --- a/src/emqtt.erl +++ b/src/emqtt.erl @@ -1639,9 +1639,10 @@ handle_unknown_event(EventType, EventContent, StateName, State) -> %% Mandatory callback functions terminate(Reason, _StateName, State = #state{conn_mod = ConnMod, socket = Socket}) -> - reply_all_inflight_reqs(Reason, State), - ok = reply_all_pendings_reqs(Reason, State), - ok = eval_msg_handler(State, disconnected, Reason), + Reason1 = unwrap_shutdown(Reason), + ok = reply_all_inflight_reqs(Reason1, State), + ok = reply_all_pendings_reqs(Reason1, State), + ok = eval_msg_handler(State, disconnected, Reason1), ok = close_socket(ConnMod, Socket). %% Downgrade @@ -2018,7 +2019,7 @@ deliver(_Via, {pubrel, Msg}, State) -> State. eval_msg_handler(#state{msg_handler = ?NO_HANDLER, owner = Owner}, disconnected, - {shutdown, {disconnected, ReasonCode, Properties}}) when is_integer(ReasonCode) -> + {disconnected, ReasonCode, Properties}) when is_integer(ReasonCode) -> %% Special handling for disconnected message when there is no handler callback Owner ! {disconnected, ReasonCode, Properties}, ok; @@ -2077,19 +2078,11 @@ maybe_reconnect(Reason, #state{reconnect = Re} = State) when ?NEED_RECONNECT(Re) maybe_reconnect(Reason, State) -> shutdown(Reason, State). -shutdown(normal, State) -> - {stop, normal, State}; -shutdown({shutdown, _} = Reason, State) -> - {stop, Reason, State}; shutdown(Reason, State) -> - {stop, {shutdown, Reason}, State}. + {stop, wrap_shutdown(Reason), State}. -shutdown_reply(normal, From, Reply) -> - {stop_and_reply, normal, [{reply, From, Reply}]}; -shutdown_reply({shutdown, _} = Reason, From, Reply) -> - {stop_and_reply, Reason, [{reply, From, Reply}]}; shutdown_reply(Reason, From, Reply) -> - {stop_and_reply, {shutdown, Reason}, [{reply, From, Reply}]}. + {stop_and_reply, wrap_shutdown(Reason), [{reply, From, Reply}]}. reply_all_inflight_reqs(Reason, #state{inflight = Inflight}) -> %% reply error to all pendings caller @@ -2318,6 +2311,13 @@ prepare_reconnect(#state{ next_retry_cnt(infinity) -> infinity; next_retry_cnt(Cnt) -> Cnt - 1. +wrap_shutdown(normal) -> normal; +wrap_shutdown({shutdown, _} = Reason) -> Reason; +wrap_shutdown(Reason) -> {shutdown, Reason}. + +unwrap_shutdown({shutdown, Reason}) -> Reason; +unwrap_shutdown(Reason) -> Reason. + close_socket(_, undefined) -> ok; close_socket(_, ?socket_reconnecting) -> diff --git a/test/emqtt_SUITE.erl b/test/emqtt_SUITE.erl index e41d867..dbae4f8 100644 --- a/test/emqtt_SUITE.erl +++ b/test/emqtt_SUITE.erl @@ -740,7 +740,7 @@ test_publish_port_error(Config) -> after 5000 -> ct:fail(timeout) end, - ?assertEqual({error, {shutdown, killed}}, PublishResult), + ?assertEqual({error, killed}, PublishResult), meck:unload(emqtt_sock). t_publish_port_error_retry(Config) -> @@ -937,10 +937,10 @@ t_eval_callback_in_order(Config) -> mock_quic_send(fun(_, _) -> {error, closed} end), ?assertMatch([{1, ok}, %% qos0: treat send as successfully - {2, {error, {shutdown, closed}}}, %% from inflight - {3, {error, {shutdown, closed}}}, - {4, {error, {shutdown, closed}}}, %% from pending request queue - {5, {error, {shutdown, closed}}}, + {2, {error, closed}}, %% from inflight + {3, {error, closed}}, + {4, {error, closed}}, %% from pending request queue + {5, {error, closed}}, {'EXIT', C, {shutdown, closed}}], ?COLLECT_ASYNC_RESULT(C)), meck:unload(emqtt_sock),