Skip to content

Commit

Permalink
Macpie/route sync (#302)
Browse files Browse the repository at this point in the history
* Cleanup refreshes

* Export ips (needs cleanup)

* Make export ips a cli command

* Fix eunit
  • Loading branch information
macpie authored Jul 8, 2024
1 parent 62cb334 commit 5222fc7
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 43 deletions.
29 changes: 28 additions & 1 deletion src/cli/hpr_cli_info.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ info_usage() ->
[
"\n\n",
"info key - Print HPR's Public Key\n"
"info ips - Export all connected hotspots IPs to /tmp/hotspot_ip.json\n"
]
].

info_cmd() ->
[
[["info", "key"], [], [], fun info_key/3]
[["info", "key"], [], [], fun info_key/3],
[["info", "ips"], [], [], fun info_ips/3]
].

info_key(["info", "key"], [], []) ->
Expand All @@ -50,6 +52,31 @@ info_key(["info", "key"], [], []) ->
info_key(_, _, _) ->
usage.

info_ips(["info", "ips"], [], []) ->
List = lists:map(
fun({_Pid, {IP, PubKeyBin}}) ->
B58 = libp2p_crypto:bin_to_b58(PubKeyBin),
Name = hpr_utils:gateway_name(B58),
#{
key => binary:list_to_bin(B58),
name => binary:list_to_bin(Name),
ip => binary:list_to_bin(IP)
}
end,
gproc:lookup_local_properties(hpr_packet_router_service:ip_key())
),
Json = jsx:encode(List),
case file:open("/tmp/hotspot_ip.json", [write]) of
{ok, File} ->
file:write(File, Json),
file:close(File),
c_text("Exported to /tmp/hotspot_ip.json");
{error, Reason} ->
c_text("Failed to export ~p", [Reason])
end;
info_ips(_, _, _) ->
usage.

%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
Expand Down
82 changes: 41 additions & 41 deletions src/grpc/iot_config/hpr_route_stream_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -471,43 +471,48 @@ process_route_stream_res(remove, {devaddr_range, DevAddrRange}) ->
process_route_stream_res(remove, {skf, SKF}) ->
hpr_skf_storage:delete(SKF).

-spec refresh_skfs(hpr_route:id()) ->
-spec refresh_devaddrs(hpr_route:id()) ->
{ok, {
Old :: list({{SessionKey :: binary(), Devaddr :: binary()}, MaxCopies :: non_neg_integer()}),
Current :: list(hpr_skf:skf())
Old :: list(hpr_devaddr_range:devaddr_range()),
Current :: list(hpr_devaddr_range:devaddr_range())
}}
| {error, any()}.
refresh_skfs(RouteID) ->
SKFReq = #iot_config_route_skf_list_req_v1_pb{
refresh_devaddrs(RouteID) ->
DevaddrReq = #iot_config_route_get_devaddr_ranges_req_v1_pb{
route_id = RouteID,
timestamp = erlang:system_time(millisecond),
signer = hpr_utils:pubkey_bin()
},
SigFun = hpr_utils:sig_fun(),
EncodedReq = iot_config_pb:encode_msg(SKFReq),
Signed = SKFReq#iot_config_route_skf_list_req_v1_pb{signature = SigFun(EncodedReq)},

EncodedReq = iot_config_pb:encode_msg(DevaddrReq),
Signed = DevaddrReq#iot_config_route_get_devaddr_ranges_req_v1_pb{
signature = SigFun(EncodedReq)
},
case
helium_iot_config_route_client:list_skfs(
helium_iot_config_route_client:get_devaddr_ranges(
Signed,
#{channel => ?IOT_CONFIG_CHANNEL}
)
of
{ok, Stream} ->
case recv_from_stream(Stream) of
SKFs when erlang:is_list(SKFs) ->
Previous = hpr_skf_storage:lookup_route(RouteID),
PreviousCnt = hpr_skf_storage:replace_route(RouteID, SKFs),
Devaddrs when erlang:is_list(Devaddrs) ->
PreviousDevaddrs = hpr_devaddr_range_storage:lookup_for_route(RouteID),
PreviousCnt = hpr_devaddr_range_storage:replace_route(RouteID, Devaddrs),
lager:info(
"route refresh skfs ~p",
[{{previous, PreviousCnt}, {current, length(SKFs)}}]
[{previous, PreviousCnt}, {current, length(Devaddrs)}],
"route refresh devaddrs"
),
{ok, {Previous, SKFs}};
Devaddrs1 = [
{hpr_devaddr_range:start_addr(Range), hpr_devaddr_range:end_addr(Range)}
|| Range <- Devaddrs
],
{ok, {PreviousDevaddrs, Devaddrs1}};
Err ->
Err
end;
{error, _} = Err ->
lager:error([{route_id, RouteID}, Err], "failed to refresh route skfs"),
{error, _E} = Err ->
lager:error([{route_id, RouteID}, Err], "failed to refresh route devaddrs"),
Err
end.

Expand All @@ -533,14 +538,14 @@ refresh_euis(RouteID) ->
{ok, Stream} ->
case recv_from_stream(Stream) of
EUIs when erlang:is_list(EUIs) ->
Previous = hpr_eui_pair_storage:lookup_for_route(RouteID),
PreviousEUIS = hpr_eui_pair_storage:lookup_for_route(RouteID),
PreviousCnt = hpr_eui_pair_storage:replace_route(RouteID, EUIs),
lager:info(
[{previous, PreviousCnt}, {current, length(EUIs)}],
"route refresh euis"
),
EUIs0 = [{hpr_eui_pair:app_eui(EUI), hpr_eui_pair:dev_eui(EUI)} || EUI <- EUIs],
{ok, {Previous, EUIs0}};
{ok, {PreviousEUIS, EUIs0}};
Err ->
Err
end;
Expand All @@ -549,49 +554,44 @@ refresh_euis(RouteID) ->
Err
end.

-spec refresh_devaddrs(hpr_route:id()) ->
-spec refresh_skfs(hpr_route:id()) ->
{ok, {
Old :: list(hpr_devaddr_range:devaddr_range()),
Current :: list(hpr_devaddr_range:devaddr_range())
Old :: list({{SessionKey :: binary(), Devaddr :: binary()}, MaxCopies :: non_neg_integer()}),
Current :: list(hpr_skf:skf())
}}
| {error, any()}.
refresh_devaddrs(RouteID) ->
DevaddrReq = #iot_config_route_get_devaddr_ranges_req_v1_pb{
refresh_skfs(RouteID) ->
SKFReq = #iot_config_route_skf_list_req_v1_pb{
route_id = RouteID,
timestamp = erlang:system_time(millisecond),
signer = hpr_utils:pubkey_bin()
},
SigFun = hpr_utils:sig_fun(),
EncodedReq = iot_config_pb:encode_msg(DevaddrReq),
Signed = DevaddrReq#iot_config_route_get_devaddr_ranges_req_v1_pb{
signature = SigFun(EncodedReq)
},
EncodedReq = iot_config_pb:encode_msg(SKFReq),
Signed = SKFReq#iot_config_route_skf_list_req_v1_pb{signature = SigFun(EncodedReq)},

case
helium_iot_config_route_client:get_devaddr_ranges(
helium_iot_config_route_client:list_skfs(
Signed,
#{channel => ?IOT_CONFIG_CHANNEL}
)
of
{ok, Stream} ->
case recv_from_stream(Stream) of
Devaddrs when erlang:is_list(Devaddrs) ->
Previous = hpr_devaddr_range_storage:lookup_for_route(RouteID),
PreviousCnt = hpr_devaddr_range_storage:replace_route(RouteID, Devaddrs),
SKFs when erlang:is_list(SKFs) ->
PreviousSKFs = hpr_skf_storage:lookup_route(RouteID),
% An error should not happen here as we only take routes that are in ETS already
{ok, PreviousCnt} = hpr_skf_storage:replace_route(RouteID, SKFs),
lager:info(
[{previous, PreviousCnt}, {current, length(Devaddrs)}],
"route refresh devaddrs"
"route refresh skfs ~p",
[{{previous, PreviousCnt}, {current, length(SKFs)}}]
),
Devaddrs1 = [
{hpr_devaddr_range:start_addr(Range), hpr_devaddr_range:end_addr(Range)}
|| Range <- Devaddrs
],
{ok, {Previous, Devaddrs1}};
{ok, {PreviousSKFs, SKFs}};
Err ->
Err
end;
{error, _E} = Err ->
lager:error([{route_id, RouteID}, Err], "failed to refresh route devaddrs"),
{error, _} = Err ->
lager:error([{route_id, RouteID}, Err], "failed to refresh route skfs"),
Err
end.

Expand Down
37 changes: 36 additions & 1 deletion src/grpc/packet_router/hpr_packet_router_service.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@

-behaviour(helium_packet_router_packet_bhvr).

-export([ip_key/0]).

-export([
init/2,
route/2,
handle_info/2
]).

-export([
send_packet_down/2,
locate/1,
register/1
]).

-define(REG_KEY(Gateway), {?MODULE, Gateway}).
-define(IP_KEY, {?MODULE, ip}).
-define(SESSION_TIMER, timer:minutes(35)).
-define(SESSION_KILL, session_kill).

Expand All @@ -26,6 +28,9 @@
last_phash = <<>> :: binary()
}).

-spec ip_key() -> ?IP_KEY.
ip_key() -> ?IP_KEY.

-spec init(atom(), grpcbox_stream:t()) -> grpcbox_stream:t().
init(_Rpc, StreamState) ->
HandlerState = #handler_state{started = erlang:system_time(millisecond)},
Expand Down Expand Up @@ -159,6 +164,7 @@ handle_packet(PacketUp, Timestamp, StreamState) ->
| {stop, grpcbox_stream:t()}.
handle_register(Reg, StreamState0) ->
PubKeyBin = hpr_register:gateway(Reg),
ok = record_ip(StreamState0, PubKeyBin),
lager:md([{stream_gateway, hpr_utils:gateway_name(PubKeyBin)}]),
case hpr_register:verify(Reg) of
false ->
Expand Down Expand Up @@ -239,6 +245,33 @@ schedule_session_kill() ->
erlang:send_after(?SESSION_TIMER, self(), ?SESSION_KILL),
ok.

-spec record_ip(StreamState :: grpcbox_stream:t(), PubKeyBin :: libp2p_crypto:pubkey_bin()) -> ok.
record_ip(StreamState, PubKeyBin) ->
case get_ip_port(StreamState) of
{ok, IP} ->
true = gproc:add_local_property(?IP_KEY, {IP, PubKeyBin}),
lager:debug("IP recorded ~p for ~p", [IP, PubKeyBin]);
{error, _R} ->
lager:warning("failed to get IP for ~p : ~p", [PubKeyBin, _R])
end.

-spec get_ip_port(StreamState :: grpcbox_stream:t()) -> {ok, string()} | {error, any()}.
get_ip_port(StreamState) ->
try
{_, Socket} = element(4, StreamState),
case inet:peername(Socket) of
{ok, {IP, Port}} ->
IPString = inet_parse:ntoa(IP),
{ok, lists:concat([IPString, ":", integer_to_list(Port)])};
{error, Reason} ->
{error, Reason}
end
catch
C:E:S ->
lager:debug("ERROR ~p ~p ~p", [C, E, S]),
{error, E}
end.

%% ------------------------------------------------------------------
%% EUnit tests
%% ------------------------------------------------------------------
Expand Down Expand Up @@ -300,6 +333,7 @@ route_register_test() ->
meck:new(hpr_gateway_location, [passthrough]),
meck:expect(hpr_gateway_location, get, fun(_) -> ok end),
application:ensure_all_started(gproc),
application:ensure_all_started(lager),

Self = self(),
#{secret := PrivKey, public := PubKey} = libp2p_crypto:generate_keys(ed25519),
Expand Down Expand Up @@ -328,6 +362,7 @@ route_register_test() ->
?assertEqual(Pid, gproc:lookup_local_name(?REG_KEY(Gateway))),

application:stop(gproc),
application:stop(lager),
meck:unload(hpr_metrics),
meck:unload(hpr_gateway_location),
ok.
Expand Down

0 comments on commit 5222fc7

Please sign in to comment.