diff --git a/src/grpc/packet_router/hpr_packet_router_service.erl b/src/grpc/packet_router/hpr_packet_router_service.erl index 3af1a38c..bea859fd 100644 --- a/src/grpc/packet_router/hpr_packet_router_service.erl +++ b/src/grpc/packet_router/hpr_packet_router_service.erl @@ -15,20 +15,20 @@ ]). -define(REG_KEY(Gateway), {?MODULE, Gateway}). --define(SESSION_TIMER, timer:minutes(30)). --define(SESSION_RESET, session_reset). +-define(SESSION_TIMER, timer:minutes(35)). +-define(SESSION_KILL, session_kill). -record(handler_state, { started :: non_neg_integer(), pubkey_bin :: undefined | binary(), nonce :: undefined | binary(), - session_key :: undefined | binary(), - session_timer :: undefined | reference() + session_key :: undefined | binary() }). -spec init(atom(), grpcbox_stream:t()) -> grpcbox_stream:t(). init(_Rpc, StreamState) -> HandlerState = #handler_state{started = erlang:system_time(millisecond)}, + ok = schedule_session_kill(), grpcbox_stream:stream_handler_state(StreamState, HandlerState). -spec route(hpr_envelope_up:envelope(), grpcbox_stream:t()) -> @@ -68,9 +68,9 @@ handle_info({give_away, NewPid, PubKeyBin}, StreamState) -> lager:info("give_away registration to ~p", [NewPid]), gproc:give_away({n, l, ?REG_KEY(PubKeyBin)}, NewPid), grpcbox_stream:send(true, hpr_envelope_down:new(undefined), StreamState); -handle_info(?SESSION_RESET, StreamState0) -> - {EnvDown, StreamState1} = create_session_offer(StreamState0), - grpcbox_stream:send(false, EnvDown, StreamState1); +handle_info(?SESSION_KILL, StreamState0) -> + lager:debug("received session kill for stream"), + grpcbox_stream:send(true, hpr_envelope_down:new(undefined), StreamState0); handle_info(_Msg, StreamState) -> StreamState. @@ -185,12 +185,7 @@ handle_session_init(SessionInit, StreamState) -> hpr_utils:bin_to_hex_string(Nonce), libp2p_crypto:bin_to_b58(SessionKey) ]), - HandlerState1 = HandlerState0#handler_state{ - session_key = SessionKey, - session_timer = schedule_session_reset( - HandlerState0#handler_state.session_timer - ) - }, + HandlerState1 = HandlerState0#handler_state{session_key = SessionKey}, {ok, grpcbox_stream:stream_handler_state(StreamState, HandlerState1)} end end. @@ -209,12 +204,10 @@ create_session_offer(StreamState0) -> ]), {EnvDown, StreamState1}. --spec schedule_session_reset(OldTimer :: undefined | reference()) -> reference(). -schedule_session_reset(OldTimer) when is_reference(OldTimer) -> - _ = erlang:cancel_timer(OldTimer), - erlang:send_after(?SESSION_TIMER, self(), ?SESSION_RESET); -schedule_session_reset(_OldTimer) -> - erlang:send_after(?SESSION_TIMER, self(), ?SESSION_RESET). +-spec schedule_session_kill() -> ok. +schedule_session_kill() -> + erlang:send_after(?SESSION_TIMER, self(), ?SESSION_KILL), + ok. %% ------------------------------------------------------------------ %% EUnit tests diff --git a/test/hpr_packet_router_service_SUITE.erl b/test/hpr_packet_router_service_SUITE.erl index 0fd69cb8..c2fd5405 100644 --- a/test/hpr_packet_router_service_SUITE.erl +++ b/test/hpr_packet_router_service_SUITE.erl @@ -7,7 +7,8 @@ ]). -export([ - session_test/1 + session_test/1, + session_timeout_test/1 ]). -include_lib("eunit/include/eunit.hrl"). @@ -24,7 +25,8 @@ %%-------------------------------------------------------------------- all() -> [ - session_test + session_test, + session_timeout_test ]. %%-------------------------------------------------------------------- @@ -137,6 +139,61 @@ session_test(_Config) -> ok. +session_timeout_test(_Config) -> + RouteID = "8d502f32-4d58-4746-965e-8c7dfdcfc625", + Route = hpr_route:test_new(#{ + id => RouteID, + net_id => 0, + oui => 4020, + server => #{ + host => "127.0.0.1", + port => 8082, + protocol => {packet_router, #{}} + }, + max_copies => 2 + }), + EUIPairs = [ + hpr_eui_pair:test_new(#{ + route_id => RouteID, app_eui => 802041902051071031, dev_eui => 8942655256770396549 + }) + ], + DevAddrRanges = [ + hpr_devaddr_range:test_new(#{ + route_id => RouteID, start_addr => 16#00000000, end_addr => 16#00000010 + }) + ], + + %% Normal test with session reset + {ok, GatewayPid} = hpr_test_gateway:start(#{ + forward => self(), + route => Route, + eui_pairs => EUIPairs, + devaddr_ranges => DevAddrRanges, + ignore_session_offer => false + }), + + {ok, _} = hpr_test_gateway:receive_session_init(GatewayPid, timer:seconds(1)), + {error, timeout} = hpr_test_gateway:receive_stream_down(GatewayPid), + + SessionKey = hpr_test_gateway:session_key(GatewayPid), + PubKeyBin = hpr_test_gateway:pubkey_bin(GatewayPid), + {ok, Pid} = hpr_packet_router_service:locate(PubKeyBin), + + %% Checking that session keys are the same + ?assertEqual(SessionKey, session_key_from_stream(Pid)), + Pid ! session_kill, + + ok = hpr_test_gateway:receive_stream_down(GatewayPid), + + ok. + %% =================================================================== %% Helpers %% =================================================================== + +session_key_from_stream(Pid) -> + State = sys:get_state(Pid), + StreamState = erlang:element(2, State), + CallbackState = erlang:element(20, StreamState), + HandlerState = erlang:element(3, CallbackState), + erlang:element(5, HandlerState). diff --git a/test/hpr_protocol_router_SUITE.erl b/test/hpr_protocol_router_SUITE.erl index fcb91607..8fcf7117 100644 --- a/test/hpr_protocol_router_SUITE.erl +++ b/test/hpr_protocol_router_SUITE.erl @@ -320,20 +320,19 @@ gateway_disconnect_test(_Config) -> _ConnPid = h2_stream_set:connection(StreamSet), ok = gen_server:stop(GatewayPid), - ok = - receive - {hpr_test_gateway, GatewayPid, - {terminate, #{channel := GatewayStreamSet, stream_pid := GatewayStreamPid}}} -> - GatewayConnPid = h2_stream_set:connection(GatewayStreamSet), - ok = test_utils:wait_until( - fun() -> - false == erlang:is_process_alive(GatewayConnPid) andalso - false == erlang:is_process_alive(GatewayStreamPid) andalso - false == erlang:is_process_alive(GatewayPid) - end - ) - after timer:seconds(3) -> ct:fail(no_terminate_rcvd) - end, + case hpr_test_gateway:receive_terminate(GatewayPid) of + {error, timeout} -> + ct:fail(no_terminate_rcvd); + {ok, #{channel := GatewayStreamSet, stream_pid := GatewayStreamPid}} -> + GatewayConnPid = h2_stream_set:connection(GatewayStreamSet), + ok = test_utils:wait_until( + fun() -> + false == erlang:is_process_alive(GatewayConnPid) andalso + false == erlang:is_process_alive(GatewayStreamPid) andalso + false == erlang:is_process_alive(GatewayPid) + end + ) + end, ok = test_utils:wait_until( fun() -> diff --git a/test/hpr_test_gateway.erl b/test/hpr_test_gateway.erl index cbf1fdfe..93c8a2da 100644 --- a/test/hpr_test_gateway.erl +++ b/test/hpr_test_gateway.erl @@ -8,10 +8,14 @@ -export([ start/1, pubkey_bin/1, + session_key/1, send_packet/2, receive_send_packet/1, receive_env_down/1, - receive_register/1 + receive_register/1, + receive_session_init/2, + receive_stream_down/1, + receive_terminate/1 ]). %% ------------------------------------------------------------------ @@ -31,6 +35,8 @@ -define(RCV_TIMEOUT, 100). -define(SEND_PACKET, send_packet). -define(REGISTER, register). +-define(SESSION_INIT, session_init). +-define(STREAM_DOWN, stream_down). -record(state, { forward :: pid(), @@ -57,6 +63,10 @@ start(Args) -> pubkey_bin(Pid) -> gen_server:call(Pid, pubkey_bin). +-spec session_key(Pid :: pid()) -> binary(). +session_key(Pid) -> + gen_server:call(Pid, session_key). + -spec send_packet(Pid :: pid(), Args :: map()) -> ok. send_packet(Pid, Args) -> gen_server:cast(Pid, {?SEND_PACKET, Args}). @@ -82,7 +92,7 @@ receive_env_down(GatewayPid) -> end. -spec receive_register(GatewayPid :: pid()) -> - {ok, EnvDown :: hpr_envelope_up:envelope()} | {error, timeout}. + {ok, EnvUp :: hpr_envelope_up:envelope()} | {error, timeout}. receive_register(GatewayPid) -> receive {?MODULE, GatewayPid, {?REGISTER, EnvUp}} -> @@ -91,6 +101,34 @@ receive_register(GatewayPid) -> {error, timeout} end. +-spec receive_session_init(GatewayPid :: pid(), Timeout :: non_neg_integer()) -> + {ok, EnvUp :: hpr_envelope_up:envelope()} | {error, timeout}. +receive_session_init(GatewayPid, Timeout) -> + receive + {?MODULE, GatewayPid, {?SESSION_INIT, EnvUp}} -> + {ok, EnvUp} + after Timeout -> + {error, timeout} + end. + +-spec receive_stream_down(GatewayPid :: pid()) -> ok | {error, timeout}. +receive_stream_down(GatewayPid) -> + receive + {?MODULE, GatewayPid, ?STREAM_DOWN} -> + ok + after timer:seconds(2) -> + {error, timeout} + end. + +-spec receive_terminate(GatewayPid :: pid()) -> {ok, any()} | {error, timeout}. +receive_terminate(GatewayPid) -> + receive + {?MODULE, GatewayPid, {terminate, Stream}} -> + {ok, Stream} + after timer:seconds(2) -> + {error, timeout} + end. + %% ------------------------------------------------------------------ %%% gen_server Function Definitions %% ------------------------------------------------------------------ @@ -115,6 +153,8 @@ init( handle_call(pubkey_bin, _From, #state{pubkey_bin = PubKeyBin} = State) -> {reply, PubKeyBin, State}; +handle_call(session_key, _From, #state{session_key = {SessionKey, _}} = State) -> + {reply, SessionKey, State}; handle_call(_Msg, _From, State) -> lager:debug("unknown call ~p", [_Msg]), {reply, ok, State}. @@ -199,7 +239,7 @@ handle_info(?CONNECT, #state{forward = Pid, pubkey_bin = PubKeyBin, sig_fun = Si EnvUp = hpr_envelope_up:new(SignedReg), ok = grpcbox_client:send(Stream, EnvUp), Pid ! {?MODULE, self(), {?REGISTER, EnvUp}}, - lager:debug("connected and registered"), + lager:debug("connected and registering"), {noreply, State#state{stream = Stream}} end; %% GRPC stream callbacks @@ -209,6 +249,8 @@ handle_info( ) -> lager:debug("got EnvDown ~p", [EnvDown]), case hpr_envelope_down:data(EnvDown) of + undefined -> + {noreply, State}; {packet, _Packet} -> Pid ! {?MODULE, self(), {data, EnvDown}}, {noreply, State}; @@ -224,25 +266,30 @@ handle_info( lager:debug("session initialized"), {noreply, State#state{session_key = {SessionKey, libp2p_crypto:mk_sig_fun(PrivKey)}}} end; -handle_info( - {'DOWN', Ref, process, Pid, _Reason}, - #state{stream = #{stream_pid := Pid, monitor_ref := Ref}} = State -) -> - lager:debug("test gateway stream went down"), - {noreply, State#state{stream = undefined}}; handle_info({headers, _StreamID, _Headers}, State) -> + lager:debug("test gateway got headers ~p for ~w", [_Headers, _StreamID]), {noreply, State}; handle_info({trailers, _StreamID, _Trailers}, State) -> + lager:debug("test gateway got trailers ~p for ~w", [_Trailers, _StreamID]), {noreply, State}; +handle_info({eos, StreamID}, #state{forward = ForwardPid} = State) -> + lager:debug("test gateway got eos for ~w", [StreamID]), + ForwardPid ! {?MODULE, self(), ?STREAM_DOWN}, + {noreply, State#state{stream = undefined}}; handle_info(_Msg, State) -> lager:debug("unknown info ~p", [_Msg]), {noreply, State}. +terminate(_Reason, #state{forward = Pid, pubkey_bin = PubKeyBin, stream = undefined}) -> + ok = grpcbox_channel:stop(PubKeyBin), + lager:debug("terminate ~p", [_Reason]), + Pid ! {?MODULE, self(), {terminate, undefined}}, + ok; terminate(_Reason, #state{forward = Pid, pubkey_bin = PubKeyBin, stream = Stream}) -> ok = grpcbox_client:close_send(Stream), ok = grpcbox_channel:stop(PubKeyBin), - Pid ! {?MODULE, self(), {terminate, Stream}}, lager:debug("terminate ~p", [_Reason]), + Pid ! {?MODULE, self(), {terminate, Stream}}, ok. %% ------------------------------------------------------------------