Skip to content

Commit

Permalink
ETS leaks (#304)
Browse files Browse the repository at this point in the history
* format

* Use gproc instead of ets for http roaming

* Replace ets by gproc for gwmp protocol

* Fix dialyzer
  • Loading branch information
macpie authored Jul 31, 2024
1 parent fe56018 commit 7aba67e
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 132 deletions.
2 changes: 1 addition & 1 deletion src/grpc/iot_config/hpr_org_list_req.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

-spec new() -> req().
new() ->
#iot_config_org_list_req_v1_pb{}.
#iot_config_org_list_req_v1_pb{}.
10 changes: 5 additions & 5 deletions src/grpc/iot_config/hpr_route_list_req.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ new(Signer, Oui) ->

-spec timestamp(Req :: req()) -> non_neg_integer().
timestamp(Req) ->
Req#iot_config_route_list_req_v1_pb.timestamp.
Req#iot_config_route_list_req_v1_pb.timestamp.

-spec signer(Req:: req()) -> binary().
-spec signer(Req :: req()) -> binary().
signer(Req) ->
Req#iot_config_route_list_req_v1_pb.signer.
Req#iot_config_route_list_req_v1_pb.signer.

-spec signature(Req:: req()) -> binary().
-spec signature(Req :: req()) -> binary().
signature(Req) ->
Req#iot_config_route_list_req_v1_pb.signature.
Req#iot_config_route_list_req_v1_pb.signature.

-spec sign(RouteListReq :: req(), SigFun :: fun()) -> req().
sign(RouteListReq, SigFun) ->
Expand Down
57 changes: 8 additions & 49 deletions src/protocols/gwmp/hpr_gwmp_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@
-export([
start_link/0,
init/1,
maybe_start_worker/2,
lookup_worker/1
maybe_start_worker/1
]).

-define(UDP_WORKER, hpr_gwmp_worker).
-define(ETS, hpr_gwmp_sup_ets).

-define(WORKER(I), #{
id => I,
Expand All @@ -40,61 +38,22 @@
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

-spec maybe_start_worker(WorkerKey :: binary(), Args :: map()) -> {ok, pid()} | {error, any()}.
maybe_start_worker(WorkerKey, Args) ->
case ets:lookup(?ETS, WorkerKey) of
[] ->
start_worker(WorkerKey, Args);
[{WorkerKey, Pid}] ->
case erlang:is_process_alive(Pid) of
true ->
{ok, Pid};
false ->
_ = ets:delete(?ETS, WorkerKey),
start_worker(WorkerKey, Args)
end
end.

-spec lookup_worker(PubKeyBin :: binary()) -> {ok, pid()} | {error, not_found}.
lookup_worker(WorkerKey) ->
case ets:lookup(?ETS, WorkerKey) of
[] ->
{error, not_found};
[{WorkerKey, Pid}] ->
case erlang:is_process_alive(Pid) of
true -> {ok, Pid};
false -> {error, not_found}
end
-spec maybe_start_worker(Args :: map()) -> {ok, pid()} | {error, any()}.
maybe_start_worker(#{key := Key} = Args) ->
case gproc:lookup_local_name(Key) of
Pid when is_pid(Pid) ->
{ok, Pid};
undefined ->
supervisor:start_child(?MODULE, [Args])
end.

%%====================================================================
%% Supervisor callbacks
%%====================================================================

init([]) ->
?ETS = ets:new(?ETS, [public, named_table, set]),
{ok, {?FLAGS, [?WORKER(?UDP_WORKER)]}}.

%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------

-spec start_worker(PubKeyBin :: binary(), Args :: map()) ->
{ok, pid()} | {error, any()}.
start_worker(PubKeyBin, Args) ->
ChildArgs = maps:merge(
#{pubkeybin => PubKeyBin},
Args
),
case supervisor:start_child(?MODULE, [ChildArgs]) of
{error, Err} ->
{error, Err};
{ok, Pid} = OK ->
case ets:insert_new(?ETS, {PubKeyBin, Pid}) of
true ->
OK;
false ->
supervisor:terminate_child(?UDP_WORKER, Pid),
maybe_start_worker(PubKeyBin, Args)
end
end.
4 changes: 3 additions & 1 deletion src/protocols/gwmp/hpr_gwmp_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,12 @@ push_data(WorkerPid, PacketUp, SocketDest, Timestamp, GatewayLocation) ->
%% gen_server Function Definitions
%% ------------------------------------------------------------------

init(#{pubkeybin := PubKeyBin} = Args) ->
init(#{key := Key, pubkeybin := PubKeyBin} = Args) ->
process_flag(trap_exit, true),
lager:info("~p init with ~p", [?SERVER, Args]),

true = gproc:add_local_name(Key),

PullDataTimer = maps:get(pull_data_timer, Args, ?PULL_DATA_TIMER),

lager:md([
Expand Down
3 changes: 2 additions & 1 deletion src/protocols/hpr_protocol_gwmp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
) -> ok | {error, any()}.
send(PacketUp, Route, Timestamp, GatewayLocation) ->
Gateway = hpr_packet_up:gateway(PacketUp),
case hpr_gwmp_sup:maybe_start_worker(Gateway, #{}) of
Key = {?MODULE, Gateway},
case hpr_gwmp_sup:maybe_start_worker(#{key => Key, pubkeybin => Gateway}) of
{error, Reason} ->
{error, {gwmp_sup_err, Reason}};
{ok, Pid} ->
Expand Down
29 changes: 11 additions & 18 deletions src/protocols/hpr_protocol_http_roaming.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,31 @@
GatewayLocation :: hpr_gateway_location:loc()
) -> ok | {error, any()}.
send(PacketUp, Route, Timestamp, GatewayLocation) ->
WorkerKey = worker_key_from(PacketUp, Route),
PubKeyBin = hpr_packet_up:gateway(PacketUp),
Protocol = protocol_from(Route),
WorkerKey = worker_key_from(PacketUp, Protocol),
PubKeyBin = hpr_packet_up:gateway(PacketUp),
%% start worker
case
hpr_http_roaming_sup:maybe_start_worker(
WorkerKey,
#{protocol => Protocol, net_id => hpr_route:net_id(Route)}
)
hpr_http_roaming_sup:maybe_start_worker(#{
key => WorkerKey, protocol => Protocol, net_id => hpr_route:net_id(Route)
})
of
{error, worker_not_started, _} = Err ->
{error, Reason} = Err ->
lager:error(
"failed to start http connector for ~s: ~p",
[hpr_utils:gateway_name(PubKeyBin), Err]
[hpr_utils:gateway_name(PubKeyBin), Reason]
),
{error, worker_not_started};
Err;
{ok, WorkerPid} ->
hpr_http_roaming_worker:handle_packet(WorkerPid, PacketUp, Timestamp, GatewayLocation),
ok
end.

-spec worker_key_from(hpr_packet_up:packet(), hpr_route:route()) ->
-spec worker_key_from(PacketUp :: hpr_packet_up:packet(), Protocol :: #http_protocol{}) ->
hpr_http_roaming_sup:worker_key().
worker_key_from(PacketUp, Route) ->
%% get phash
worker_key_from(PacketUp, Protocol) ->
Phash = hpr_packet_up:phash(PacketUp),
NetId = hpr_route:net_id(Route),

%% get protocol
Protocol = protocol_from(Route),
{Phash, Protocol, NetId}.
{?MODULE, Protocol#http_protocol.route_id, Phash}.

-spec protocol_from(hpr_route:route()) -> hpr_http_roaming_sup:http_protocol().
protocol_from(Route) ->
Expand All @@ -62,7 +56,6 @@ protocol_from(Route) ->
end,
AuthHeader = hpr_route:http_auth_header(Route),
ReceiverNSID = hpr_route:http_receiver_nsid(Route),

#http_protocol{
route_id = hpr_route:id(Route),
flow_type = FlowType,
Expand Down
63 changes: 9 additions & 54 deletions src/protocols/http/hpr_http_roaming_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
%% API
-export([
start_link/0,
maybe_start_worker/2,
lookup_worker/1
maybe_start_worker/1
]).

%% Supervisor callbacks
Expand All @@ -40,15 +39,7 @@
period => 60
}).

-define(ETS, hpr_http_sup_ets).

-type worker_key() :: {
PHash :: binary(),
Protocol :: http_protocol(),
NetId :: non_neg_integer()
}.

-export_type([worker_key/0, http_protocol/0]).
-export_type([http_protocol/0]).

%%====================================================================
%% API functions
Expand All @@ -58,59 +49,23 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

-spec maybe_start_worker(
WorkerKey :: worker_key(),
Args :: map()
) -> {ok, pid()} | {error, any()} | {error, worker_not_started, any()}.
maybe_start_worker(WorkerKey, Args) ->
case ets:lookup(?ETS, WorkerKey) of
[] ->
start_worker(WorkerKey, Args);
[{WorkerKey, Pid}] ->
case erlang:is_process_alive(Pid) of
true ->
{ok, Pid};
false ->
_ = ets:delete(?ETS, WorkerKey),
start_worker(WorkerKey, Args)
end
end.

-spec lookup_worker(WorkerKey :: worker_key()) -> {ok, pid()} | {error, not_found}.
lookup_worker(WorkerKey) ->
case ets:lookup(?ETS, WorkerKey) of
[] ->
{error, not_found};
[{WorkerKey, Pid}] ->
case erlang:is_process_alive(Pid) of
true -> {ok, Pid};
false -> {error, not_found}
end
) -> {ok, pid()} | {error, any()}.
maybe_start_worker(#{key := Key} = Args) ->
case gproc:lookup_local_name(Key) of
Pid when is_pid(Pid) ->
{ok, Pid};
undefined ->
supervisor:start_child(?MODULE, [Args])
end.

%%====================================================================
%% Supervisor callbacks
%%====================================================================

init([]) ->
ets:new(?ETS, [public, named_table, set]),
{ok, {?FLAGS, [?WORKER(hpr_http_roaming_worker)]}}.

%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------

-spec start_worker(WorkerKey :: worker_key(), map()) ->
{ok, pid()} | {error, worker_not_started, any()}.
start_worker(WorkerKey, Args) ->
case supervisor:start_child(?MODULE, [Args]) of
{error, Err} ->
{error, worker_not_started, Err};
{ok, Pid} = OK ->
case ets:insert_new(?ETS, {WorkerKey, Pid}) of
true ->
OK;
false ->
supervisor:terminate_child(?MODULE, Pid),
maybe_start_worker(WorkerKey, Args)
end
end.
4 changes: 3 additions & 1 deletion src/protocols/http/hpr_http_roaming_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,11 @@ init(Args) ->
auth_header = Auth,
receiver_nsid = ReceiverNSID
},
net_id := NetID
net_id := NetID,
key := Key
} = Args,
lager:debug("~p init with ~p", [?MODULE, Args]),
true = gproc:add_local_name(Key),
{ok, #state{
net_id = NetID,
route_id = RouteID,
Expand Down
13 changes: 11 additions & 2 deletions test/hpr_protocol_gwmp_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ pull_ack_test(_Config) ->
?assert(erlang:is_binary(Token)),

%% There is an outstanding pull_data
{ok, WorkerPid} = hpr_gwmp_sup:lookup_worker(PubKeyBin),
{ok, WorkerPid} = lookup_worker(PubKeyBin),
?assertEqual(
1,
maps:size(element(5, sys:get_state(WorkerPid))),
Expand Down Expand Up @@ -508,7 +508,7 @@ pull_ack_hostname_test(_Config) ->
?assert(erlang:is_binary(Token)),

%% There is an outstanding pull_data
{ok, WorkerPid} = hpr_gwmp_sup:lookup_worker(PubKeyBin),
{ok, WorkerPid} = lookup_worker(PubKeyBin),
?assertEqual(
1,
maps:size(element(5, sys:get_state(WorkerPid))),
Expand Down Expand Up @@ -636,6 +636,15 @@ gateway_disconnect_test(_Config) ->
%% Helpers
%% ===================================================================

-spec lookup_worker(Key :: binary()) -> {ok, pid()} | {error, not_found}.
lookup_worker(Key) ->
case gproc:lookup_local_name({hpr_protocol_gwmp, Key}) of
Pid when is_pid(Pid) ->
{ok, Pid};
undefined ->
{error, not_found}
end.

test_route(Port) ->
test_route("127.0.0.1", Port).

Expand Down

0 comments on commit 7aba67e

Please sign in to comment.