From 7aba67e949d50cb10ddebd1f162c1bbf9df890d0 Mon Sep 17 00:00:00 2001 From: macpie Date: Wed, 31 Jul 2024 14:21:31 -0700 Subject: [PATCH] ETS leaks (#304) * format * Use gproc instead of ets for http roaming * Replace ets by gproc for gwmp protocol * Fix dialyzer --- src/grpc/iot_config/hpr_org_list_req.erl | 2 +- src/grpc/iot_config/hpr_route_list_req.erl | 10 +-- src/protocols/gwmp/hpr_gwmp_sup.erl | 57 +++-------------- src/protocols/gwmp/hpr_gwmp_worker.erl | 4 +- src/protocols/hpr_protocol_gwmp.erl | 3 +- src/protocols/hpr_protocol_http_roaming.erl | 29 ++++----- src/protocols/http/hpr_http_roaming_sup.erl | 63 +++---------------- .../http/hpr_http_roaming_worker.erl | 4 +- test/hpr_protocol_gwmp_SUITE.erl | 13 +++- 9 files changed, 53 insertions(+), 132 deletions(-) diff --git a/src/grpc/iot_config/hpr_org_list_req.erl b/src/grpc/iot_config/hpr_org_list_req.erl index e75fe8ed..c9581efb 100644 --- a/src/grpc/iot_config/hpr_org_list_req.erl +++ b/src/grpc/iot_config/hpr_org_list_req.erl @@ -8,4 +8,4 @@ -spec new() -> req(). new() -> - #iot_config_org_list_req_v1_pb{}. + #iot_config_org_list_req_v1_pb{}. diff --git a/src/grpc/iot_config/hpr_route_list_req.erl b/src/grpc/iot_config/hpr_route_list_req.erl index d52339a9..9e9e1f86 100644 --- a/src/grpc/iot_config/hpr_route_list_req.erl +++ b/src/grpc/iot_config/hpr_route_list_req.erl @@ -26,15 +26,15 @@ new(Signer, Oui) -> -spec timestamp(Req :: req()) -> non_neg_integer(). timestamp(Req) -> - Req#iot_config_route_list_req_v1_pb.timestamp. + Req#iot_config_route_list_req_v1_pb.timestamp. --spec signer(Req:: req()) -> binary(). +-spec signer(Req :: req()) -> binary(). signer(Req) -> - Req#iot_config_route_list_req_v1_pb.signer. + Req#iot_config_route_list_req_v1_pb.signer. --spec signature(Req:: req()) -> binary(). +-spec signature(Req :: req()) -> binary(). signature(Req) -> - Req#iot_config_route_list_req_v1_pb.signature. + Req#iot_config_route_list_req_v1_pb.signature. -spec sign(RouteListReq :: req(), SigFun :: fun()) -> req(). sign(RouteListReq, SigFun) -> diff --git a/src/protocols/gwmp/hpr_gwmp_sup.erl b/src/protocols/gwmp/hpr_gwmp_sup.erl index 74b2f532..fef4448a 100644 --- a/src/protocols/gwmp/hpr_gwmp_sup.erl +++ b/src/protocols/gwmp/hpr_gwmp_sup.erl @@ -11,12 +11,10 @@ -export([ start_link/0, init/1, - maybe_start_worker/2, - lookup_worker/1 + maybe_start_worker/1 ]). -define(UDP_WORKER, hpr_gwmp_worker). --define(ETS, hpr_gwmp_sup_ets). -define(WORKER(I), #{ id => I, @@ -40,31 +38,13 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). --spec maybe_start_worker(WorkerKey :: binary(), Args :: map()) -> {ok, pid()} | {error, any()}. -maybe_start_worker(WorkerKey, Args) -> - case ets:lookup(?ETS, WorkerKey) of - [] -> - start_worker(WorkerKey, Args); - [{WorkerKey, Pid}] -> - case erlang:is_process_alive(Pid) of - true -> - {ok, Pid}; - false -> - _ = ets:delete(?ETS, WorkerKey), - start_worker(WorkerKey, Args) - end - end. - --spec lookup_worker(PubKeyBin :: binary()) -> {ok, pid()} | {error, not_found}. -lookup_worker(WorkerKey) -> - case ets:lookup(?ETS, WorkerKey) of - [] -> - {error, not_found}; - [{WorkerKey, Pid}] -> - case erlang:is_process_alive(Pid) of - true -> {ok, Pid}; - false -> {error, not_found} - end +-spec maybe_start_worker(Args :: map()) -> {ok, pid()} | {error, any()}. +maybe_start_worker(#{key := Key} = Args) -> + case gproc:lookup_local_name(Key) of + Pid when is_pid(Pid) -> + {ok, Pid}; + undefined -> + supervisor:start_child(?MODULE, [Args]) end. %%==================================================================== @@ -72,29 +52,8 @@ lookup_worker(WorkerKey) -> %%==================================================================== init([]) -> - ?ETS = ets:new(?ETS, [public, named_table, set]), {ok, {?FLAGS, [?WORKER(?UDP_WORKER)]}}. %% ------------------------------------------------------------------ %% Internal Function Definitions %% ------------------------------------------------------------------ - --spec start_worker(PubKeyBin :: binary(), Args :: map()) -> - {ok, pid()} | {error, any()}. -start_worker(PubKeyBin, Args) -> - ChildArgs = maps:merge( - #{pubkeybin => PubKeyBin}, - Args - ), - case supervisor:start_child(?MODULE, [ChildArgs]) of - {error, Err} -> - {error, Err}; - {ok, Pid} = OK -> - case ets:insert_new(?ETS, {PubKeyBin, Pid}) of - true -> - OK; - false -> - supervisor:terminate_child(?UDP_WORKER, Pid), - maybe_start_worker(PubKeyBin, Args) - end - end. diff --git a/src/protocols/gwmp/hpr_gwmp_worker.erl b/src/protocols/gwmp/hpr_gwmp_worker.erl index 78252a62..80e8d91a 100644 --- a/src/protocols/gwmp/hpr_gwmp_worker.erl +++ b/src/protocols/gwmp/hpr_gwmp_worker.erl @@ -86,10 +86,12 @@ push_data(WorkerPid, PacketUp, SocketDest, Timestamp, GatewayLocation) -> %% gen_server Function Definitions %% ------------------------------------------------------------------ -init(#{pubkeybin := PubKeyBin} = Args) -> +init(#{key := Key, pubkeybin := PubKeyBin} = Args) -> process_flag(trap_exit, true), lager:info("~p init with ~p", [?SERVER, Args]), + true = gproc:add_local_name(Key), + PullDataTimer = maps:get(pull_data_timer, Args, ?PULL_DATA_TIMER), lager:md([ diff --git a/src/protocols/hpr_protocol_gwmp.erl b/src/protocols/hpr_protocol_gwmp.erl index c62cd772..119de08c 100644 --- a/src/protocols/hpr_protocol_gwmp.erl +++ b/src/protocols/hpr_protocol_gwmp.erl @@ -10,7 +10,8 @@ ) -> ok | {error, any()}. send(PacketUp, Route, Timestamp, GatewayLocation) -> Gateway = hpr_packet_up:gateway(PacketUp), - case hpr_gwmp_sup:maybe_start_worker(Gateway, #{}) of + Key = {?MODULE, Gateway}, + case hpr_gwmp_sup:maybe_start_worker(#{key => Key, pubkeybin => Gateway}) of {error, Reason} -> {error, {gwmp_sup_err, Reason}}; {ok, Pid} -> diff --git a/src/protocols/hpr_protocol_http_roaming.erl b/src/protocols/hpr_protocol_http_roaming.erl index cc70d39a..48f11835 100644 --- a/src/protocols/hpr_protocol_http_roaming.erl +++ b/src/protocols/hpr_protocol_http_roaming.erl @@ -20,37 +20,31 @@ GatewayLocation :: hpr_gateway_location:loc() ) -> ok | {error, any()}. send(PacketUp, Route, Timestamp, GatewayLocation) -> - WorkerKey = worker_key_from(PacketUp, Route), - PubKeyBin = hpr_packet_up:gateway(PacketUp), Protocol = protocol_from(Route), + WorkerKey = worker_key_from(PacketUp, Protocol), + PubKeyBin = hpr_packet_up:gateway(PacketUp), %% start worker case - hpr_http_roaming_sup:maybe_start_worker( - WorkerKey, - #{protocol => Protocol, net_id => hpr_route:net_id(Route)} - ) + hpr_http_roaming_sup:maybe_start_worker(#{ + key => WorkerKey, protocol => Protocol, net_id => hpr_route:net_id(Route) + }) of - {error, worker_not_started, _} = Err -> + {error, Reason} = Err -> lager:error( "failed to start http connector for ~s: ~p", - [hpr_utils:gateway_name(PubKeyBin), Err] + [hpr_utils:gateway_name(PubKeyBin), Reason] ), - {error, worker_not_started}; + Err; {ok, WorkerPid} -> hpr_http_roaming_worker:handle_packet(WorkerPid, PacketUp, Timestamp, GatewayLocation), ok end. --spec worker_key_from(hpr_packet_up:packet(), hpr_route:route()) -> +-spec worker_key_from(PacketUp :: hpr_packet_up:packet(), Protocol :: #http_protocol{}) -> hpr_http_roaming_sup:worker_key(). -worker_key_from(PacketUp, Route) -> - %% get phash +worker_key_from(PacketUp, Protocol) -> Phash = hpr_packet_up:phash(PacketUp), - NetId = hpr_route:net_id(Route), - - %% get protocol - Protocol = protocol_from(Route), - {Phash, Protocol, NetId}. + {?MODULE, Protocol#http_protocol.route_id, Phash}. -spec protocol_from(hpr_route:route()) -> hpr_http_roaming_sup:http_protocol(). protocol_from(Route) -> @@ -62,7 +56,6 @@ protocol_from(Route) -> end, AuthHeader = hpr_route:http_auth_header(Route), ReceiverNSID = hpr_route:http_receiver_nsid(Route), - #http_protocol{ route_id = hpr_route:id(Route), flow_type = FlowType, diff --git a/src/protocols/http/hpr_http_roaming_sup.erl b/src/protocols/http/hpr_http_roaming_sup.erl index 03d7001b..6b9a488e 100644 --- a/src/protocols/http/hpr_http_roaming_sup.erl +++ b/src/protocols/http/hpr_http_roaming_sup.erl @@ -18,8 +18,7 @@ %% API -export([ start_link/0, - maybe_start_worker/2, - lookup_worker/1 + maybe_start_worker/1 ]). %% Supervisor callbacks @@ -40,15 +39,7 @@ period => 60 }). --define(ETS, hpr_http_sup_ets). - --type worker_key() :: { - PHash :: binary(), - Protocol :: http_protocol(), - NetId :: non_neg_integer() -}. - --export_type([worker_key/0, http_protocol/0]). +-export_type([http_protocol/0]). %%==================================================================== %% API functions @@ -58,33 +49,14 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -spec maybe_start_worker( - WorkerKey :: worker_key(), Args :: map() -) -> {ok, pid()} | {error, any()} | {error, worker_not_started, any()}. -maybe_start_worker(WorkerKey, Args) -> - case ets:lookup(?ETS, WorkerKey) of - [] -> - start_worker(WorkerKey, Args); - [{WorkerKey, Pid}] -> - case erlang:is_process_alive(Pid) of - true -> - {ok, Pid}; - false -> - _ = ets:delete(?ETS, WorkerKey), - start_worker(WorkerKey, Args) - end - end. - --spec lookup_worker(WorkerKey :: worker_key()) -> {ok, pid()} | {error, not_found}. -lookup_worker(WorkerKey) -> - case ets:lookup(?ETS, WorkerKey) of - [] -> - {error, not_found}; - [{WorkerKey, Pid}] -> - case erlang:is_process_alive(Pid) of - true -> {ok, Pid}; - false -> {error, not_found} - end +) -> {ok, pid()} | {error, any()}. +maybe_start_worker(#{key := Key} = Args) -> + case gproc:lookup_local_name(Key) of + Pid when is_pid(Pid) -> + {ok, Pid}; + undefined -> + supervisor:start_child(?MODULE, [Args]) end. %%==================================================================== @@ -92,25 +64,8 @@ lookup_worker(WorkerKey) -> %%==================================================================== init([]) -> - ets:new(?ETS, [public, named_table, set]), {ok, {?FLAGS, [?WORKER(hpr_http_roaming_worker)]}}. %% ------------------------------------------------------------------ %% Internal Function Definitions %% ------------------------------------------------------------------ - --spec start_worker(WorkerKey :: worker_key(), map()) -> - {ok, pid()} | {error, worker_not_started, any()}. -start_worker(WorkerKey, Args) -> - case supervisor:start_child(?MODULE, [Args]) of - {error, Err} -> - {error, worker_not_started, Err}; - {ok, Pid} = OK -> - case ets:insert_new(?ETS, {WorkerKey, Pid}) of - true -> - OK; - false -> - supervisor:terminate_child(?MODULE, Pid), - maybe_start_worker(WorkerKey, Args) - end - end. diff --git a/src/protocols/http/hpr_http_roaming_worker.erl b/src/protocols/http/hpr_http_roaming_worker.erl index fdfb4f21..40574b01 100644 --- a/src/protocols/http/hpr_http_roaming_worker.erl +++ b/src/protocols/http/hpr_http_roaming_worker.erl @@ -83,9 +83,11 @@ init(Args) -> auth_header = Auth, receiver_nsid = ReceiverNSID }, - net_id := NetID + net_id := NetID, + key := Key } = Args, lager:debug("~p init with ~p", [?MODULE, Args]), + true = gproc:add_local_name(Key), {ok, #state{ net_id = NetID, route_id = RouteID, diff --git a/test/hpr_protocol_gwmp_SUITE.erl b/test/hpr_protocol_gwmp_SUITE.erl index 84f30f19..13f73a97 100644 --- a/test/hpr_protocol_gwmp_SUITE.erl +++ b/test/hpr_protocol_gwmp_SUITE.erl @@ -442,7 +442,7 @@ pull_ack_test(_Config) -> ?assert(erlang:is_binary(Token)), %% There is an outstanding pull_data - {ok, WorkerPid} = hpr_gwmp_sup:lookup_worker(PubKeyBin), + {ok, WorkerPid} = lookup_worker(PubKeyBin), ?assertEqual( 1, maps:size(element(5, sys:get_state(WorkerPid))), @@ -508,7 +508,7 @@ pull_ack_hostname_test(_Config) -> ?assert(erlang:is_binary(Token)), %% There is an outstanding pull_data - {ok, WorkerPid} = hpr_gwmp_sup:lookup_worker(PubKeyBin), + {ok, WorkerPid} = lookup_worker(PubKeyBin), ?assertEqual( 1, maps:size(element(5, sys:get_state(WorkerPid))), @@ -636,6 +636,15 @@ gateway_disconnect_test(_Config) -> %% Helpers %% =================================================================== +-spec lookup_worker(Key :: binary()) -> {ok, pid()} | {error, not_found}. +lookup_worker(Key) -> + case gproc:lookup_local_name({hpr_protocol_gwmp, Key}) of + Pid when is_pid(Pid) -> + {ok, Pid}; + undefined -> + {error, not_found} + end. + test_route(Port) -> test_route("127.0.0.1", Port).