diff --git a/changelog.md b/changelog.md index 80fc86d..bbd1b60 100644 --- a/changelog.md +++ b/changelog.md @@ -1,7 +1,8 @@ # 1.13.4 -- Handle `tcp_error` and `ssl_error` at `waiting_for_connack` state. -- Change log level for `reconnect_due_to_connection_error` from `error` to `info` +- Handle CONNECT packet send error asynchronously so to allow a retry. +- Handle `tcp_error` and `ssl_error` at `waiting_for_connack` state so to allow a retry. +- Change log level for `reconnect_due_to_connection_error` from `error` to `info`. - Fix compile warnings on OTP 27. # 1.13.3 diff --git a/src/emqtt.erl b/src/emqtt.erl index 657af97..01c8591 100644 --- a/src/emqtt.erl +++ b/src/emqtt.erl @@ -129,6 +129,7 @@ -type(host() :: inet:ip_address() | inet:hostname() | binary_host()). -define(NO_HANDLER, undefined). +-define(socket_reconnecting, socket_reconnecting). -type(mfas() :: {module(), atom(), list()} | {function(), list()}). @@ -214,12 +215,13 @@ }). %% 'Via' field add ability of multi stream support for QUIC transport -%% For TCP based it should be always 'default' +%% For TCP based, always try use 'default' -type via() :: default % via default socket | {new_data_stream, quicer:stream_opts()} % Create and use new long living data stream | {new_req_stream, quicer:stream_opts()} % @TODO create and use short lived req stream | {logic_stream_id, non_neg_integer(), quicer:stream_opts()} - | inet:socket() | emqtt_quic:quic_sock(). + | inet:socket() | emqtt_quic:quic_sock() + | ?socket_reconnecting. -opaque(mqtt_msg() :: #mqtt_msg{}). @@ -306,8 +308,15 @@ -type(sent_at() :: non_neg_integer()). %% in millisecond +-type opname() :: connect | subscribe | unsubscribe | ping. +-record(callid, {op :: opname(), + via :: via(), + packet_id :: packet_id() | undefined + }). +-type callid() :: #callid{}. -export_type([publish_success/0, publish_reply/0]). + -define(PUB_REQ(Msg, Via, ExpireAt, Callback), {publish, Via, Msg, ExpireAt, Callback}). -define(INFLIGHT_PUBLISH(Via, Msg, SentAt, ExpireAt, Callback), @@ -960,7 +969,7 @@ initialized({call, From}, {connect, ConnMod}, State) -> case do_connect(ConnMod, qoe_inject(?FUNCTION_NAME, State)) of {ok, #state{connect_timeout = Timeout, socket = Via} = NewState} -> {next_state, waiting_for_connack, - add_call(new_call({connect, Via}, From), NewState), + add_call(new_call(call_id(connect, Via), From), NewState), {state_timeout, Timeout, Timeout}}; {error, Reason} -> Error = {error, Reason}, @@ -985,9 +994,10 @@ initialized({call, From}, quic_mqtt_connect, #state{socket = {quic, Conn, undefi case mqtt_connect(maybe_update_ctrl_sock(emqtt_quic, maybe_init_quic_state(emqtt_quic, State), NewSocket)) of {ok, #state{socket = Via} = NewState} -> {keep_state, - add_call(new_call({connect, Via}, From), NewState), + add_call(new_call(call_id(connect, Via), From), NewState), {reply, From, ok}}; {error, Reason} = Error-> + %% TODO: handle error async to allow reconnect {stop_and_reply, {shutdown, Reason}, [{reply, From, Error}]} end; initialized({call, From}, {new_data_stream, _StreamOpts} = Via0, State) -> @@ -999,18 +1009,38 @@ initialized(info, ?PUB_REQ(#mqtt_msg{}, _Via, _ExpireAt, _Callback) = PubReq, initialized(EventType, EventContent, State) -> handle_event(EventType, EventContent, initialized, State). -do_connect(ConnMod, #state{sock_opts = SockOpts, - connect_timeout = Timeout} = State) -> +do_connect(ConnMod, #state{pending_calls = Pendings, + sock_opts = SockOpts, + connect_timeout = Timeout + } = State) -> State0 = maybe_init_quic_state(ConnMod, State), - IsConnOpened = proplists:is_defined(handle, SockOpts), + IsUsingQuicHandle = proplists:is_defined(handle, SockOpts), case sock_connect(ConnMod, hosts(State0), SockOpts, Timeout) of skip -> {ok, State0}; - {ok, Sock} when not IsConnOpened -> - State1 = maybe_update_ctrl_sock(ConnMod, State0, Sock), + {ok, NewSock} when not IsUsingQuicHandle -> + State1 = maybe_update_ctrl_sock(ConnMod, State0, NewSock), State2 = qoe_inject(handshaked, State1), - mqtt_connect(run_sock(State2#state{conn_mod = ConnMod, socket = Sock})); + NewPendings = refresh_calls(Pendings, NewSock), + State3 = run_sock(State2#state{conn_mod = ConnMod, + socket = NewSock, + pending_calls = NewPendings + }), + case mqtt_connect(State3) of + {ok, State4} -> + {ok, State4}; + {error, Reason} -> + ?LOG(info, "failed_to_send_CONNECT", #{reason => Reason}, State), + %% Failed to send CONNECT packet. + %% wait for the async socket close or error event + {ok, State3} + end; + {error, econnreset} -> + %% TODO: handle econnreset. + %% For now this may lead to immediate shtudown even if reconnect is allowed + {error, econnreset}; {error, Reason} -> + %% cannot think of other reasons for delayed retry {error, Reason} end. @@ -1075,6 +1105,15 @@ reconnect({call, From}, stop, _State) -> {stop_and_reply, normal, [{reply, From, ok}]}; reconnect({call, From}, status, _State) -> {keep_state_and_data, {reply, From, reconnect}}; +reconnect(info, {Close, _StaleSock}, _State) when ?SOCK_CLOSED(Close) -> + %% ignore stale socket close events + keep_state_and_data; +reconnect(info, {Error, StaleSock, _Reason}, #state{socket = StaleSock}) when ?SOCK_ERROR(Error) -> + %% ignore stale socket error events + keep_state_and_data; +reconnect(info, {'EXIT', Owner, Reason}, State = #state{owner = Owner}) -> + ?LOG(debug, "EXIT_from_owner", #{reason => Reason}, State), + {stop, {shutdown, {owner, Owner, Reason}}, State}; reconnect(_EventType, _, _State) -> {keep_state_and_data, postpone}. @@ -1108,7 +1147,7 @@ waiting_for_connack(cast, {?CONNACK_PACKET(?RC_SUCCESS, Inflight1 = emqtt_inflight:limit(ReceiveMaximum, Inflight), State4 = State3#state{inflight = Inflight1}, Retry = [{next_event, info, immediate_retry} || not emqtt_inflight:is_empty(Inflight1)], - case take_call({connect, Via}, State4) of + case take_call(call_id(connect, Via), State4) of {value, #call{from = From}, State5} -> {next_state, connected, State5, [{reply, From, Reply} | Retry]}; false -> @@ -1122,7 +1161,7 @@ waiting_for_connack(cast, {?CONNACK_PACKET(ReasonCode, Properties), Via}, State = #state{proto_ver = ProtoVer, socket = Via, reconnect = Re}) -> Reason = reason_code_name(ReasonCode, ProtoVer), - case take_call({connect, Via}, State) of + case take_call(call_id(connect, Via), State) of {value, #call{from = From}, _State} -> Reply = {error, {Reason, Properties}}, {stop_and_reply, {shutdown, Reason}, [{reply, From, Reply}]}; @@ -1160,7 +1199,7 @@ waiting_for_connack(info, ?PUB_REQ(_Msg, _Via, _ExpireAt, _Callback), _State) -> {keep_state_and_data, postpone}; waiting_for_connack(state_timeout, _Timeout, #state{reconnect = Re} = State) -> - case take_call({connect, default_via(State)}, State) of + case take_call(call_id(connect, default_via(State)), State) of {value, #call{from = From}, _State} -> Reply = {error, connack_timeout}, {stop_and_reply, connack_timeout, [{reply, From, Reply}]}; @@ -1171,7 +1210,7 @@ waiting_for_connack(state_timeout, _Timeout, #state{reconnect = Re} = State) -> waiting_for_connack(EventType, EventContent, State) -> case handle_event(EventType, EventContent, waiting_for_connack, #state{socket = Via} = State) of {stop, Reason, NewState} -> - case take_call({connect, Via}, NewState) of + case take_call(call_id(connect, Via), NewState) of {value, #call{from = From}, _State} -> Reply = case Reason of {shutdown, _ShutdownReason} -> @@ -1220,7 +1259,7 @@ connected({call, From}, SubReq = {subscribe, Via0, Properties, Topics}, {Via, State1} = maybe_new_stream(Via0, State), case send(Via, ?SUBSCRIBE_PACKET(PacketId, Properties, Topics), State1) of {ok, NewState} -> - Call = new_call({subscribe, Via, PacketId}, From, SubReq), + Call = new_call(call_id(subscribe, Via, PacketId), From, SubReq), Subscriptions1 = lists:foldl(fun({Topic, Opts}, Acc) -> maps:put(Topic, Opts, Acc) @@ -1239,7 +1278,7 @@ connected({call, From}, UnsubReq = {unsubscribe, Via0, Properties, Topics}, {Via, State1} = maybe_new_stream(Via0, State), case send(Via, ?UNSUBSCRIBE_PACKET(PacketId, Properties, Topics), State1) of {ok, NewState} -> - Call = new_call({unsubscribe, Via, PacketId}, From, UnsubReq), + Call = new_call(call_id(unsubscribe, Via, PacketId), From, UnsubReq), {keep_state, ensure_ack_timer(add_call(Call, NewState))}; Error = {error, Reason} -> {stop_and_reply, Reason, [{reply, From, Error}]} @@ -1251,7 +1290,7 @@ connected({call, From}, {ping, Via0}, State) -> {Via, State1} = maybe_new_stream(Via0, State), case send(Via, ?PACKET(?PINGREQ), State1) of {ok, NewState} -> - Call = new_call({ping, Via}, From), + Call = new_call(call_id(ping, Via), From), {keep_state, ensure_ack_timer(add_call(Call, NewState))}; Error = {error, Reason} -> {stop_and_reply, Reason, [{reply, From, Error}]} @@ -1320,7 +1359,7 @@ connected(cast, {?PUBCOMP_PACKET(_PacketId, _ReasonCode, _Properties) = PubComp, connected(cast, {?SUBACK_PACKET(PacketId, Properties, ReasonCodes), Via}, State = #state{subscriptions = _Subscriptions}) -> - case take_call({subscribe, Via, PacketId}, State) of + case take_call(call_id(subscribe, Via, PacketId), State) of {value, #call{from = From}, NewState} -> NewProperties = case Properties of undefined -> #{via => Via}; @@ -1335,7 +1374,7 @@ connected(cast, {?SUBACK_PACKET(PacketId, Properties, ReasonCodes), Via}, connected(cast, {?UNSUBACK_PACKET(PacketId, Properties, ReasonCodes), Via}, State = #state{subscriptions = Subscriptions}) -> - case take_call({unsubscribe, Via, PacketId}, State) of + case take_call(call_id(unsubscribe, Via, PacketId), State) of {value, #call{from = From, req = {_, Via, _, Topics}}, NewState} -> Subscriptions1 = lists:foldl(fun(Topic, Acc) -> @@ -1356,7 +1395,7 @@ connected(cast, {?UNSUBACK_PACKET(PacketId, Properties, ReasonCodes), Via}, connected(cast, {?PACKET(?PINGRESP), _Via}, #state{pending_calls = []}) -> keep_state_and_data; connected(cast, {?PACKET(?PINGRESP), Via}, State) -> - case take_call({ping, Via}, State) of + case take_call(call_id(ping, Via), State) of {value, #call{from = From}, NewState} -> {keep_state, NewState, [{reply, From, pong}]}; false -> @@ -1365,8 +1404,8 @@ connected(cast, {?PACKET(?PINGRESP), Via}, State) -> connected(cast, {?DISCONNECT_PACKET(_ReasonCode), _Via}, #state{reconnect = Re, conn_mod = ConnMod, socket = Sock} = State) when ?NEED_RECONNECT(Re) -> - _ = ConnMod:close(Sock), - next_reconnect(State#state{socket = undefined}); + _ = close_socket(ConnMod, Sock), + next_reconnect(State); connected(cast, {?DISCONNECT_PACKET(ReasonCode, Properties), _Via}, State) -> {stop, {disconnected, ReasonCode, Properties}, State}; @@ -1489,37 +1528,37 @@ handle_event(info, {Error, Sock, Reason}, connected, when ?SOCK_ERROR(Error) andalso ?NEED_RECONNECT(Re) -> ?LOG(info, "reconnect_due_to_connection_error", #{error => Error, reason => Reason}, State), - next_reconnect(State#state{socket = undefined}); + next_reconnect(State); handle_event(info, {Error, Sock, Reason}, waiting_for_connack, #state{reconnect = Re, socket = Sock} = State) when ?SOCK_ERROR(Error) andalso ?NEED_RECONNECT(Re) -> ?LOG(info, "socket_error_before_connack", #{error => Error, reason => Reason}, State), - next_reconnect(State#state{socket = undefined}); + next_reconnect(State); handle_event(info, {ssl_error = Error, SSLSock, Reason}, connected, #state{reconnect = Re, socket = #ssl_socket{ssl = SSLSock}} = State) when ?NEED_RECONNECT(Re) -> ?LOG(info, "reconnect_due_to_connection_error", #{error => Error, reason => Reason}, State), - next_reconnect(State#state{socket = undefined}); + next_reconnect(State); handle_event(info, {ssl_error = Error, SSLSock, Reason}, waiting_for_connack, #state{reconnect = Re, socket = #ssl_socket{ssl = SSLSock}} = State) when ?NEED_RECONNECT(Re) -> ?LOG(info, "socket_error_before_connack", #{error => Error, reason => Reason}, State), - next_reconnect(State#state{socket = undefined}); + next_reconnect(State); handle_event(info, {Error, Sock, Reason}, _StateName, #state{socket = Sock} = State) - when Error =:= tcp_error; Error =:= ssl_error; Error =:= 'EXIT' -> + when ?SOCK_ERROR(Error) -> ?LOG(error, "connection_error", - #{error => Error, reason =>Reason}, State), + #{error => Error, reason => Reason}, State), {stop, {shutdown, Reason}, State}; handle_event(info, {ssl_error = Error, SSLSock, Reason}, _StateName, #state{socket = #ssl_socket{ssl = SSLSock}} = State) -> - ?LOG(error, "connection_error", + ?LOG(error, "TLS connection_error", #{error => Error, reason => Reason}, State), {stop, {shutdown, Reason}, State}; @@ -1536,12 +1575,12 @@ handle_event(info, {tcp_closed, Sock} = Event, StateName, #state{socket = SockIn handle_event(info, {Closed, _Sock}, connected, #state{ reconnect = Re} = State) when ?SOCK_CLOSED(Closed) andalso ?NEED_RECONNECT(Re) -> ?LOG(info, "socket_closed_when_connected", #{}, State), - next_reconnect(State#state{socket = undefined}); + next_reconnect(State); handle_event(info, {Closed, _Sock}, waiting_for_connack, #state{ reconnect = Re} = State) when ?SOCK_CLOSED(Closed) andalso ?NEED_RECONNECT(Re) -> ?LOG(info, "socket_closed_before_connack", #{}, State), - next_reconnect(State#state{socket = undefined}); + next_reconnect(State); handle_event(info, {Closed, Sock}, StateName, State) when Closed =:= tcp_closed; Closed =:= ssl_closed -> @@ -1889,17 +1928,22 @@ ensure_keepalive_timer(I, State) when is_integer(I) -> new_call(Id, From) -> new_call(Id, From, undefined). + new_call(Id, From, Req) -> #call{id = Id, from = From, req = Req, ts = os:timestamp()}. +set_call_via(#call{id = OldId} = Call, Via) -> + Call#call{id = call_id(OldId, Via)}. + add_call(Call, Data = #state{pending_calls = Calls}) -> Data#state{pending_calls = [Call | Calls]}. -take_call(Id, Data = #state{pending_calls = Calls}) -> +take_call(#callid{} = Id, Data = #state{pending_calls = Calls}) -> case lists:keytake(Id, #call.id, Calls) of {value, Call, Left} -> {value, Call, Data#state{pending_calls = Left}}; - false -> false + false -> + false end. timeout_calls(Timeout, Calls) -> @@ -2269,7 +2313,7 @@ next_reconnect(#state{retry_timer = RetryTimer, sock_opts = OldSockOpts, reconnect = Reconnect, reconnect_timeout = Timeout, - ack_timer = AckTimer, + ack_timer = AckTimer, socket = OldSocket, conn_mod = ConnMod } = State, Actions) -> @@ -2277,8 +2321,8 @@ next_reconnect(#state{retry_timer = RetryTimer, ok = cancel_timer(RetryTimer), ok = cancel_timer(KeepAliveTimer), ok = cancel_timer(AckTimer), - {next_state, reconnect, State#state{socket = undefined, - sock_opts = proplists:delete(handle, OldSockOpts), + State1 = update_for_reconnecting(State), + {next_state, reconnect, State1#state{sock_opts = proplists:delete(handle, OldSockOpts), retry_timer = undefined, keepalive_timer = undefined }, @@ -2286,6 +2330,8 @@ next_reconnect(#state{retry_timer = RetryTimer, close_socket(_, undefined) -> ok; +close_socket(_, ?socket_reconnecting) -> + ok; close_socket(ConnMod, Socket) -> _ = ConnMod:close(Socket), ok. @@ -2315,16 +2361,19 @@ now_ts() -> maybe_init_quic_state(emqtt_quic, Old = #state{extra = #{control_stream_sock := {quic, _, _} }}) -> %% Already opened Old; -maybe_init_quic_state(emqtt_quic, #state{extra = Extra, clientid = Cid, - reconnect = Re, parse_state = PS} = Old) -> +maybe_init_quic_state(emqtt_quic, State) -> + do_init_quic_state(State); +maybe_init_quic_state(_, Old) -> + Old. + +do_init_quic_state(#state{extra = Extra, clientid = Cid, + reconnect = Re, parse_state = PS} = Old) -> Old#state{extra = emqtt_quic:init_state(Extra#{ clientid => Cid , conn_parse_state => PS %% set once , data_stream_socks => [] , logic_stream_map => #{} , control_stream_sock => undefined - , reconnect => ?NEED_RECONNECT(Re)})}; -maybe_init_quic_state(_, Old) -> - Old. + , reconnect => ?NEED_RECONNECT(Re)})}. update_data_streams(#{ data_stream_socks := Socks , conn_parse_state := PS @@ -2397,8 +2446,37 @@ maybe_new_stream(Def, State) -> default_via(#state{socket = Via})-> Via. +%% Update #state{} for reconnecting, forget about old connections. +update_for_reconnecting(#state{socket = Socket, pending_calls = Calls} = State0) + when Socket =/= ?socket_reconnecting -> + NewSocket = ?socket_reconnecting, + PendingCalls = refresh_calls(Calls, ?socket_reconnecting), + State1 = maybe_reinit_quic_state(State0), + State1#state{socket = NewSocket, pending_calls = PendingCalls}. + +maybe_reinit_quic_state(#state{extra = #{control_stream_sock := _}} = S) -> + do_init_quic_state(S); +maybe_reinit_quic_state(S) -> + S. + +refresh_calls(Calls, Via) -> + lists:map(fun(X)-> set_call_via(X, Via) end, Calls). + %% @doc avoid sensitive data leakage in the debug log redact_packet(#mqtt_packet{variable = #mqtt_packet_connect{} = Conn} = Packet) -> Packet#mqtt_packet{variable = Conn#mqtt_packet_connect{password = <<"******">>}}; redact_packet(Packet) -> Packet. + +-spec call_id(opname(), + via(), + packet_id() | undefined + ) -> callid(). +call_id(Op, Via, PacketId) -> + #callid{op = Op, via = Via, packet_id = PacketId}. + +-spec call_id(callid() | opname(), via()) -> callid(). +call_id(#callid{} = C, Via) -> + C#callid{via = Via}; +call_id(Op, Via) when is_atom(Op) -> + call_id(Op, Via, undefined). diff --git a/src/emqtt_quic_stream.erl b/src/emqtt_quic_stream.erl index 1fcd523..2318ae2 100644 --- a/src/emqtt_quic_stream.erl +++ b/src/emqtt_quic_stream.erl @@ -115,14 +115,19 @@ handle_stream_data(Stream, Bin, _Flags, #{ is_local := true , stream_parse_state := PSS} = S) -> ?LOG(debug, "RECV_Data", #{data => Bin}, S), Via = {quic, Conn, Stream}, - #{ Via := PS } = PSS, - case parse(Bin, PS, []) of - {keep_state, NewPS, Packets} -> - {keep_state, S#{stream_parse_state := PSS#{Via := NewPS}}, - [{next_event, cast, {P, Via} } - || P <- lists:reverse(Packets)]}; - {stop, _} = Stop -> - Stop + case maps:get(Via, PSS, undefined) of + undefined -> + ?LOG(warning, "unknown stream data", #{stream => Stream}, S), + {stop, {unknown_stream_data, Stream}, S}; + PS -> + case parse(Bin, PS, []) of + {keep_state, NewPS, Packets} -> + {keep_state, S#{stream_parse_state := maps:update(Via, NewPS, PSS)}, + [{next_event, cast, {P, Via} } + || P <- lists:reverse(Packets)]}; + {stop, _} = Stop -> + Stop + end end; %% Remote stream handle_stream_data(_Stream, _Bin, _Flags, diff --git a/test/emqtt_SUITE.erl b/test/emqtt_SUITE.erl index 6d9ccd9..d28b5a3 100644 --- a/test/emqtt_SUITE.erl +++ b/test/emqtt_SUITE.erl @@ -83,6 +83,8 @@ groups() -> t_pubcomp, t_reconnect_disabled, t_reconnect_enabled, + t_retry_CONNECT_packet_send, + t_retry_CONNECT_asyn_socket_error, t_reconnect_enabled_server_disconnect, t_reconnect_stop, t_reconnect_reach_max_attempts, @@ -301,8 +303,8 @@ t_ssl_error_server_reject_client(Config) -> , {verify, verify_none} ]} ]), - ?assertMatch({error, {ssl_error, _Sock, {tls_alert, {unknown_ca, _}}}}, - emqtt:connect(C)), + {error, Reason} = emqtt:connect(C), + ?assertMatch({ssl_error, _Sock, {tls_alert, {unknown_ca, _}}}, Reason), ok. t_reconnect_enabled(Config) -> @@ -349,6 +351,63 @@ t_reconnect_enabled(Config) -> ], receive_messages(1)) end. +t_retry_CONNECT_packet_send(Config) -> + retry_CONNECT_packet_send(true, Config). + +t_retry_CONNECT_asyn_socket_error(Config) -> + retry_CONNECT_packet_send(false, Config). + +retry_CONNECT_packet_send(SendError, Config) -> + ConnFun = ?config(conn_fun, Config), + Port = ?config(port, Config), + process_flag(trap_exit, true), + meck:new(emqtt_sock, [passthrough, no_history]), + meck:new(emqtt_quic, [passthrough, no_history]), + Tester = self(), + F = fun(Sock, _Packet) -> + Msg = case ConnFun of + connect -> + {tcp_error, Sock, closed}; + quic_connect -> + {quic_error, Sock, closed} + end, + _ = erlang:send_after(10, self(), Msg), + Tester ! sync, + case SendError of + true -> + {error, einval}; + false -> + ok + end + end, + meck:expect(emqtt_sock, send, F), + meck:expect(emqtt_quic, send, F), + {ok, C} = emqtt:start_link([{port, Port}, + {reconnect, true}, + {clean_start, false}, + {reconnect_timeout, 1}]), % 1 sec + spawn_link(fun() -> + Res = emqtt:ConnFun(C), + Tester ! {connect_result, Res} + end), + receive + sync -> ok + after + 1000 -> + error(timeout) + end, + %% unload mock, expect reconnect to succeed + meck:unload(emqtt_sock), + meck:unload(emqtt_quic), + receive + {connect_result, Res} -> + ?assertMatch({ok, _}, Res) + after + 3000 -> + error(timeout) + end, + ok. + t_reconnect_enabled_server_disconnect(Config) -> ConnFun = ?config(conn_fun, Config), Port = ?config(port, Config), @@ -685,7 +744,6 @@ test_publish_port_error(Config) -> exit(Sock, kill), PublishResult = receive {publish_result, R} -> R after 5000 -> - io:format(user, "~p\n", [sys:get_state(C)]), ct:fail(timeout) end, ?assertEqual({error, {shutdown, killed}}, PublishResult), @@ -753,7 +811,6 @@ test_publish_port_error_retry(Config) -> end after 10_000 -> - io:format(user, "~p~n", [sys:get_state(C)]), error(timeout) end end,