diff --git a/config/ct.config b/config/ct.config index 2b96267f..756bff94 100644 --- a/config/ct.config +++ b/config/ct.config @@ -1,6 +1,6 @@ [ {hpr, [ - {key, "data/hpr.key"}, + {data_dir, "data"}, {packet_reporter, #{ aws_bucket => <<"test-bucket">>, aws_bucket_region => <<"local">>, @@ -64,7 +64,8 @@ grpc_opts => #{ service_protos => [iot_config_pb], services => #{ - 'helium.iot_config.route' => hpr_test_iot_config_service_route + 'helium.iot_config.gateway' => hpr_test_ics_gateway_service, + 'helium.iot_config.route' => hpr_test_ics_route_service } }, transport_opts => #{ssl => false}, diff --git a/config/sys.config b/config/sys.config index 00dddf31..07306632 100644 --- a/config/sys.config +++ b/config/sys.config @@ -1,6 +1,6 @@ [ {hpr, [ - {key, "/var/data/hpr.key"}, + {data_dir, "/var/data"}, {no_routes, [{"localhost", 8080}, {"localhost", 8080}]}, {packet_reporter, #{ aws_bucket => <<"test-bucket">>, diff --git a/config/sys.config.src b/config/sys.config.src index d0c17e5c..3e01fa60 100644 --- a/config/sys.config.src +++ b/config/sys.config.src @@ -1,6 +1,6 @@ [ {hpr, [ - {key, "/var/data/hpr.key"}, + {data_dir, "/var/data"}, {http_roaming_sender_nsid, <<"${HPR_ROAMING_SENDER_NSID}">>}, {no_routes, [{"${HPR_PP_HOST_1}", 8080}, {"${HPR_PP_HOST_2}", 8080}]}, {packet_reporter, #{ diff --git a/include/hpr.hrl b/include/hpr.hrl index f4780a2a..7b7f146d 100644 --- a/include/hpr.hrl +++ b/include/hpr.hrl @@ -1,5 +1,7 @@ -define(APP, hpr). +-define(DATA_DIR, "/var/data"). + -define(IOT_CONFIG_CHANNEL, iot_config_channel). -define(DOWNLINK_CHANNEL, downlink_channel). -define(MULTI_BUY_CHANNEL, multi_buy_channel). diff --git a/include/hpr_metrics.hrl b/include/hpr_metrics.hrl index cdc5353d..fdb18c96 100644 --- a/include/hpr_metrics.hrl +++ b/include/hpr_metrics.hrl @@ -19,6 +19,9 @@ -define(METRICS_VM_ETS_MEMORY, "hpr_vm_ets_memory"). -define(METRICS_VM_PROC_Q, "hpr_vm_process_queue"). -define(METRICS_ICS_UPDATES_COUNTER, "hpr_iot_config_service_updates_counter"). +-define(METRICS_ICS_GATEWAY_LOCATION_HISTOGRAM, + "hpr_iot_config_service_gateway_location_histogram" +). -define(METRICS, [ {?METRICS_GRPC_CONNECTION_GAUGE, prometheus_gauge, [], "Number of active GRPC Connections"}, @@ -36,5 +39,7 @@ {?METRICS_FIND_ROUTES_HISTOGRAM, prometheus_histogram, [], "Find Routes"}, {?METRICS_VM_ETS_MEMORY, prometheus_gauge, [name], "HPR ets memory"}, {?METRICS_VM_PROC_Q, prometheus_gauge, [name], "Process queue"}, - {?METRICS_ICS_UPDATES_COUNTER, prometheus_counter, [type, action], "ICS updates counter"} + {?METRICS_ICS_UPDATES_COUNTER, prometheus_counter, [type, action], "ICS updates counter"}, + {?METRICS_ICS_GATEWAY_LOCATION_HISTOGRAM, prometheus_histogram, [status], + "ICS gateway location req"} ]). diff --git a/rebar.config b/rebar.config index ba6dded1..ca1b1fef 100644 --- a/rebar.config +++ b/rebar.config @@ -23,7 +23,8 @@ {iso8601, ".*", {git, "https://github.com/erlsci/iso8601.git", {tag, "1.3.1"}}}, {xxhash, {git, "https://github.com/pierreis/erlang-xxhash", {branch, "master"}}}, {erl_angry_purple_tiger, - {git, "https://github.com/helium/erl_angry_purple_tiger.git", {branch, "master"}}} + {git, "https://github.com/helium/erl_angry_purple_tiger.git", {branch, "master"}}}, + {h3, ".*", {git, "https://github.com/helium/erlang-h3.git", {branch, "master"}}} ]}. {erl_opts, [ diff --git a/rebar.lock b/rebar.lock index f7fbc39e..581cd3f0 100644 --- a/rebar.lock +++ b/rebar.lock @@ -56,6 +56,10 @@ {git,"https://github.com/novalabsxyz/grpcbox.git", {ref,"6243151a7c54c714a018b3d0b92dbf057c033730"}}, 0}, + {<<"h3">>, + {git,"https://github.com/helium/erlang-h3.git", + {ref,"90e1b6ebf93f88702ce8d24d9142833a8401e3ab"}}, + 0}, {<<"hackney">>,{pkg,<<"hackney">>,<<"1.17.0">>},0}, {<<"helium_proto">>, {git,"https://github.com/helium/proto.git", diff --git a/src/grpc/iot_config/hpr_gateway_location.erl b/src/grpc/iot_config/hpr_gateway_location.erl new file mode 100644 index 00000000..9e38436a --- /dev/null +++ b/src/grpc/iot_config/hpr_gateway_location.erl @@ -0,0 +1,159 @@ +%%%------------------------------------------------------------------- +%% @doc +%% == HPR Gateway Location == +%% @end +%%%------------------------------------------------------------------- +-module(hpr_gateway_location). + +-include("hpr.hrl"). +-include("../autogen/iot_config_pb.hrl"). + +-export([ + init/0, + get/1, + expire_locations/0 +]). + +-define(ETS, hpr_gateway_location_ets). +-define(DETS, hpr_gateway_location_dets). +-define(DEFAULT_DETS_FILE, "hpr_gateway_location_dets"). +-define(CLEANUP_INTERVAL, timer:hours(1)). +-define(CACHE_TIME, timer:hours(24)). +-define(NOT_FOUND, not_found). + +-record(location, { + gateway :: libp2p_crypto:pubkey_bin(), + timestamp :: non_neg_integer(), + h3_index :: h3:index() | undefined, + lat :: float() | undefined, + long :: float() | undefined +}). + +-type loc() :: {h3:index(), float(), float()} | undefined. + +-export_type([loc/0]). + +-spec init() -> ok. +init() -> + ?ETS = ets:new(?ETS, [ + public, + named_table, + set, + {read_concurrency, true}, + {keypos, #location.gateway} + ]), + ok = open_dets(), + case dets:to_ets(?DETS, ?ETS) of + {error, _Reason} -> + lager:error("failed to hydrate ets ~p", [_Reason]); + _ -> + lager:info("ets hydrated") + end, + {ok, _} = timer:apply_interval(?CLEANUP_INTERVAL, ?MODULE, expire_locations, []), + ok. + +-spec get(libp2p_crypto:pubkey_bin()) -> {ok, h3:index(), float(), float()} | {error, any()}. +get(PubKeyBin) -> + Yesterday = erlang:system_time(millisecond) - ?CACHE_TIME, + case ets:lookup(?ETS, PubKeyBin) of + [] -> + update_location(PubKeyBin); + [#location{timestamp = T}] when T < Yesterday -> + update_location(PubKeyBin); + [#location{h3_index = undefined}] -> + {error, ?NOT_FOUND}; + [#location{h3_index = H3Index, lat = Lat, long = Long}] -> + {ok, H3Index, Lat, Long} + end. + +-spec expire_locations() -> ok. +expire_locations() -> + Time = erlang:system_time(millisecond) - ?CACHE_TIME, + DETSDeleted = dets:select_delete(?DETS, [ + {{'_', '_', '$3', '_', '_', '_'}, [{'<', '$3', Time}], [true]} + ]), + lager:info("expiring ~w dets keys", [DETSDeleted]). + +%% ------------------------------------------------------------------ +%% Internal Function Definitions +%% ------------------------------------------------------------------ +-spec update_location(libp2p_crypto:pubkey_bin()) -> + {ok, h3:index(), float(), float()} | {error, any()}. +update_location(PubKeyBin) -> + Start = erlang:system_time(millisecond), + NewLoc = #location{ + gateway = PubKeyBin, + timestamp = erlang:system_time(millisecond), + h3_index = undefined, + lat = undefined, + long = undefined + }, + case get_location_from_ics(PubKeyBin) of + {error, Reason} -> + hpr_metrics:observe_gateway_location(Start, error), + GatewauName = hpr_utils:gateway_name(PubKeyBin), + lager:warning( + "fail to get_location_from_ics ~p for ~s", + [Reason, GatewauName] + ), + ok = insert(NewLoc), + {error, not_found}; + {ok, H3IndexString} -> + hpr_metrics:observe_gateway_location(Start, ok), + H3Index = h3:from_string(H3IndexString), + {Lat, Long} = h3:to_geo(H3Index), + ok = insert(NewLoc#location{ + h3_index = H3Index, + lat = Lat, + long = Long + }), + {ok, H3Index, Lat, Long} + end. + +-spec insert(Loc :: #location{}) -> ok. +insert(Loc) -> + true = ets:insert(?ETS, Loc), + _ = erlang:spawn(dets, insert, [?DETS, Loc]), + ok. + +%% We have to do this because the call to `helium_iot_config_gateway_client:location` can return +%% `{error, {Status, Reason}, _}` but is not in the spec... +-dialyzer({nowarn_function, get_location_from_ics/1}). +-spec get_location_from_ics(PubKeyBin :: libp2p_crypto:pubkey_bin()) -> + {ok, string()} | {error, any()}. +get_location_from_ics(PubKeyBin) -> + SigFun = hpr_utils:sig_fun(), + Req = #iot_config_gateway_location_req_v1_pb{ + gateway = PubKeyBin, + signer = hpr_utils:pubkey_bin() + }, + EncodedReq = iot_config_pb:encode_msg(Req, iot_config_gateway_location_req_v1_pb), + SignedReq = Req#iot_config_gateway_location_req_v1_pb{signature = SigFun(EncodedReq)}, + case + helium_iot_config_gateway_client:location(SignedReq, #{ + channel => ?IOT_CONFIG_CHANNEL + }) + of + {error, {Status, Reason}, _} when is_binary(Status) -> + {error, {grpcbox_utils:status_to_string(Status), Reason}}; + {grpc_error, Reason} -> + {error, Reason}; + {error, Reason} -> + {error, Reason}; + {ok, #iot_config_gateway_location_res_v1_pb{location = Location}, _Meta} -> + {ok, Location} + end. + +-spec open_dets() -> ok. +open_dets() -> + DataDir = hpr_utils:base_data_dir(), + DETSFile = filename:join(DataDir, ?DEFAULT_DETS_FILE), + ok = filelib:ensure_dir(DETSFile), + case dets:open_file(?DETS, [{file, DETSFile}, {keypos, #location.gateway}]) of + {error, _Reason} -> + Deleted = file:delete(DETSFile), + lager:error("failed to open dets ~p deleting file ~p", [_Reason, Deleted]), + open_dets(); + {ok, _} -> + ok + end. diff --git a/src/grpc/packet_router/hpr_packet_router_service.erl b/src/grpc/packet_router/hpr_packet_router_service.erl index bea859fd..9f6751cd 100644 --- a/src/grpc/packet_router/hpr_packet_router_service.erl +++ b/src/grpc/packet_router/hpr_packet_router_service.erl @@ -105,7 +105,7 @@ register(PubKeyBin) -> {error, not_found} -> true = gproc:add_local_name(?REG_KEY(PubKeyBin)), lager:debug("register"), - hpr_protocol_router:register(PubKeyBin, Self), + ok = hpr_protocol_router:register(PubKeyBin, Self), ok; {ok, Self} -> lager:info("nothing to do, already registered"), @@ -148,6 +148,8 @@ handle_register(Reg, StreamState0) -> {stop, StreamState0}; true -> ok = ?MODULE:register(PubKeyBin), + %% Atttempt to get location from ICS to pre-cache data + _ = hpr_gateway_location:get(PubKeyBin), HandlerState = grpcbox_stream:stream_handler_state(StreamState0), StreamState1 = grpcbox_stream:stream_handler_state( StreamState0, HandlerState#handler_state{pubkey_bin = PubKeyBin} @@ -267,6 +269,8 @@ route_packet_test() -> route_register_test() -> meck:new(hpr_metrics, [passthrough]), meck:expect(hpr_metrics, observe_grpc_connection, fun(_, _) -> ok end), + meck:new(hpr_gateway_location, [passthrough]), + meck:expect(hpr_gateway_location, get, fun(_) -> ok end), application:ensure_all_started(gproc), Self = self(), @@ -297,6 +301,7 @@ route_register_test() -> application:stop(gproc), meck:unload(hpr_metrics), + meck:unload(hpr_gateway_location), ok. handle_info_test() -> diff --git a/src/hpr_routing.erl b/src/hpr_routing.erl index ccecf887..11003192 100644 --- a/src/hpr_routing.erl +++ b/src/hpr_routing.erl @@ -257,7 +257,7 @@ maybe_deliver_no_routes(PacketUp) -> lists:foreach( fun({Host, Port}) -> Route = hpr_route:new_packet_router(Host, Port), - hpr_protocol_router:send(PacketUp, Route) + hpr_protocol_router:send(PacketUp, Route, undefined) end, HostsAndPorts ) @@ -336,7 +336,16 @@ maybe_deliver_packet_to_route(PacketUp, RouteETS, SKFMaxCopies) -> Error; {ok, IsFree} -> RouteID = hpr_route:id(Route), - case deliver_packet(Protocol, PacketUp, Route) of + PubKeyBin = hpr_packet_up:gateway(PacketUp), + GatewayLocation = + case hpr_gateway_location:get(PubKeyBin) of + {error, _Reason} -> + lager:debug("failed to get gateway location ~p", [_Reason]), + undefined; + {ok, H3Index, Lat, Long} -> + {H3Index, Lat, Long} + end, + case deliver_packet(Protocol, PacketUp, Route, GatewayLocation) of {error, Reason} = Error -> lager:warning(RouteMD, "error ~p", [Reason]), ok = hpr_route_ets:inc_backoff(RouteID), @@ -354,15 +363,16 @@ maybe_deliver_packet_to_route(PacketUp, RouteETS, SKFMaxCopies) -> -spec deliver_packet( hpr_route:protocol(), PacketUp :: hpr_packet_up:packet(), - Route :: hpr_route:route() + Route :: hpr_route:route(), + GatewayLocation :: hpr_gateway_location:loc() ) -> hpr_routing_response(). -deliver_packet({packet_router, _}, PacketUp, Route) -> - hpr_protocol_router:send(PacketUp, Route); -deliver_packet({gwmp, _}, PacketUp, Route) -> - hpr_protocol_gwmp:send(PacketUp, Route); -deliver_packet({http_roaming, _}, PacketUp, Route) -> - hpr_protocol_http_roaming:send(PacketUp, Route); -deliver_packet(_OtherProtocol, _PacketUp, _Route) -> +deliver_packet({packet_router, _}, PacketUp, Route, GatewayLocation) -> + hpr_protocol_router:send(PacketUp, Route, GatewayLocation); +deliver_packet({gwmp, _}, PacketUp, Route, GatewayLocation) -> + hpr_protocol_gwmp:send(PacketUp, Route, GatewayLocation); +deliver_packet({http_roaming, _}, PacketUp, Route, GatewayLocation) -> + hpr_protocol_http_roaming:send(PacketUp, Route, GatewayLocation); +deliver_packet(_OtherProtocol, _PacketUp, _Route, _GatewayLocation) -> lager:warning([{protocol, _OtherProtocol}], "protocol unimplemented"). -spec maybe_report_packet( @@ -466,6 +476,8 @@ foreach_setup() -> true = erlang:register(hpr_sup, self()), ok = hpr_route_ets:init(), ok = hpr_multi_buy:init(), + meck:new(hpr_gateway_location, [passthrough]), + meck:expect(hpr_gateway_location, get, fun(_) -> {error, not_implemented} end), ok. foreach_cleanup(ok) -> @@ -481,6 +493,7 @@ foreach_cleanup(ok) -> ), true = ets:delete(hpr_routes_ets), true = erlang:unregister(hpr_sup), + meck:unload(hpr_gateway_location), ok. find_routes_for_uplink_no_skf() -> @@ -835,7 +848,7 @@ find_routes_for_uplink_ignore_empty_skf() -> maybe_deliver_packet_to_route_locked() -> meck:new(hpr_protocol_router, [passthrough]), - meck:expect(hpr_protocol_router, send, fun(_, _) -> ok end), + meck:expect(hpr_protocol_router, send, fun(_, _, _) -> ok end), RouteID1 = "route_id_1", Route1 = hpr_route:test_new(#{ @@ -860,13 +873,13 @@ maybe_deliver_packet_to_route_locked() -> {error, locked}, maybe_deliver_packet_to_route(PacketUp, RouteETS1, 1) ), - ?assertEqual(0, meck:num_calls(hpr_protocol_router, send, 2)), + ?assertEqual(0, meck:num_calls(hpr_protocol_router, send, 3)), meck:unload(hpr_protocol_router), ok. maybe_deliver_packet_to_route_inactive() -> meck:new(hpr_protocol_router, [passthrough]), - meck:expect(hpr_protocol_router, send, fun(_, _) -> ok end), + meck:expect(hpr_protocol_router, send, fun(_, _, _) -> ok end), RouteID1 = "route_id_1", Route1 = hpr_route:test_new(#{ @@ -891,13 +904,13 @@ maybe_deliver_packet_to_route_inactive() -> {error, inactive}, maybe_deliver_packet_to_route(PacketUp, RouteETS1, 1) ), - ?assertEqual(0, meck:num_calls(hpr_protocol_router, send, 2)), + ?assertEqual(0, meck:num_calls(hpr_protocol_router, send, 3)), meck:unload(hpr_protocol_router), ok. maybe_deliver_packet_to_route_in_cooldown() -> meck:new(hpr_protocol_router, [passthrough]), - meck:expect(hpr_protocol_router, send, fun(_, _) -> ok end), + meck:expect(hpr_protocol_router, send, fun(_, _, _) -> ok end), RouteID1 = "route_id_1", Route1 = hpr_route:test_new(#{ @@ -923,13 +936,13 @@ maybe_deliver_packet_to_route_in_cooldown() -> {error, in_cooldown}, maybe_deliver_packet_to_route(PacketUp, RouteETS1, 1) ), - ?assertEqual(0, meck:num_calls(hpr_protocol_router, send, 2)), + ?assertEqual(0, meck:num_calls(hpr_protocol_router, send, 3)), meck:unload(hpr_protocol_router), ok. maybe_deliver_packet_to_route_multi_buy() -> meck:new(hpr_protocol_router, [passthrough]), - meck:expect(hpr_protocol_router, send, fun(_, _) -> ok end), + meck:expect(hpr_protocol_router, send, fun(_, _, _) -> ok end), meck:new(hpr_metrics, [passthrough]), meck:expect(hpr_metrics, observe_multi_buy, fun(_, _) -> ok end), @@ -972,7 +985,7 @@ maybe_deliver_packet_to_route_multi_buy() -> {error, multi_buy}, maybe_deliver_packet_to_route(PacketUp, RouteETS1, 0) ), - ?assertEqual(2, meck:num_calls(hpr_protocol_router, send, 2)), + ?assertEqual(2, meck:num_calls(hpr_protocol_router, send, 3)), meck:unload(hpr_protocol_router), meck:unload(hpr_metrics), ok. diff --git a/src/hpr_sup.erl b/src/hpr_sup.erl index 7b299287..1cf59991 100644 --- a/src/hpr_sup.erl +++ b/src/hpr_sup.erl @@ -46,8 +46,9 @@ start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). init([]) -> - KeyFileName = application:get_env(?APP, key, "/var/data/hpr.key"), + DataDir = hpr_utils:base_data_dir(), + KeyFileName = filename:join(DataDir, "hpr.key"), lager:info("KeyFileName ~s", [KeyFileName]), ok = filelib:ensure_dir(KeyFileName), @@ -58,6 +59,7 @@ init([]) -> ok = hpr_multi_buy:init(), ok = hpr_protocol_router:init(), ok = hpr_route_ets:init(), + ok = hpr_gateway_location:init(), PacketReporterConfig = application:get_env(?APP, packet_reporter, #{}), ConfigServiceConfig = application:get_env(?APP, iot_config_service, #{}), diff --git a/src/hpr_utils.erl b/src/hpr_utils.erl index dcd60d07..74b0c76a 100644 --- a/src/hpr_utils.erl +++ b/src/hpr_utils.erl @@ -32,59 +32,12 @@ pubkey_bin/0, sig_fun/0, sender_nsid/0, - b58/0 + b58/0, + base_data_dir/0 ]). -type trace() :: packet_gateway | stream_gateway | devaddr | app_eui | dev_eui. --spec load_key(KeyFileName :: string()) -> ok. -load_key(KeyFileName) -> - {PubKey, SigFun} = - Key = - case libp2p_crypto:load_keys(KeyFileName) of - {ok, #{secret := PrivKey, public := PubKey0}} -> - {PubKey0, libp2p_crypto:mk_sig_fun(PrivKey)}; - {error, enoent} -> - KeyMap = - #{secret := PrivKey, public := PubKey0} = libp2p_crypto:generate_keys( - ed25519 - ), - ok = libp2p_crypto:save_keys(KeyMap, KeyFileName), - {PubKey0, libp2p_crypto:mk_sig_fun(PrivKey)} - end, - - PubKeyBin = libp2p_crypto:pubkey_to_bin(PubKey), - B58 = libp2p_crypto:bin_to_b58(PubKeyBin), - ok = persistent_term:put(?HPR_PUBKEY_BIN, PubKeyBin), - - %% Keep as binary for http protocol jsx encoding/decoding - SenderNSID = - case application:get_env(hpr, http_roaming_sender_nsid, erlang:list_to_binary(B58)) of - <<"">> -> erlang:list_to_binary(B58); - Val -> Val - end, - ok = persistent_term:put(?HPR_SENDER_NSID, SenderNSID), - - ok = persistent_term:put(?HPR_B58, B58), - ok = persistent_term:put(?HPR_SIG_FUN, SigFun), - ok = persistent_term:put(?HPR_KEY, Key). - --spec pubkey_bin() -> libp2p_crypto:pubkey_bin(). -pubkey_bin() -> - persistent_term:get(?HPR_PUBKEY_BIN, undefined). - --spec sig_fun() -> libp2p_crypto:sig_fun(). -sig_fun() -> - persistent_term:get(?HPR_SIG_FUN, undefined). - --spec sender_nsid() -> string(). -sender_nsid() -> - persistent_term:get(?HPR_SENDER_NSID, undefined). - --spec b58() -> binary(). -b58() -> - persistent_term:get(?HPR_B58, undefined). - -spec gateway_name(PubKeyBin :: libp2p_crypto:pubkey_bin() | string()) -> string(). gateway_name(PubKeyBin) when is_binary(PubKeyBin) -> B58 = libp2p_crypto:bin_to_b58(PubKeyBin), @@ -229,6 +182,61 @@ get_env_int(Key, Default) -> I -> I end. +-spec load_key(KeyFileName :: string()) -> ok. +load_key(KeyFileName) -> + {PubKey, SigFun} = + Key = + case libp2p_crypto:load_keys(KeyFileName) of + {ok, #{secret := PrivKey, public := PubKey0}} -> + {PubKey0, libp2p_crypto:mk_sig_fun(PrivKey)}; + {error, enoent} -> + KeyMap = + #{secret := PrivKey, public := PubKey0} = libp2p_crypto:generate_keys( + ed25519 + ), + ok = libp2p_crypto:save_keys(KeyMap, KeyFileName), + {PubKey0, libp2p_crypto:mk_sig_fun(PrivKey)} + end, + + PubKeyBin = libp2p_crypto:pubkey_to_bin(PubKey), + B58 = libp2p_crypto:bin_to_b58(PubKeyBin), + ok = persistent_term:put(?HPR_PUBKEY_BIN, PubKeyBin), + + %% Keep as binary for http protocol jsx encoding/decoding + SenderNSID = + case application:get_env(hpr, http_roaming_sender_nsid, erlang:list_to_binary(B58)) of + <<"">> -> erlang:list_to_binary(B58); + Val -> Val + end, + ok = persistent_term:put(?HPR_SENDER_NSID, SenderNSID), + + ok = persistent_term:put(?HPR_B58, B58), + ok = persistent_term:put(?HPR_SIG_FUN, SigFun), + ok = persistent_term:put(?HPR_KEY, Key). + +-spec pubkey_bin() -> libp2p_crypto:pubkey_bin(). +pubkey_bin() -> + persistent_term:get(?HPR_PUBKEY_BIN, undefined). + +-spec sig_fun() -> libp2p_crypto:sig_fun(). +sig_fun() -> + persistent_term:get(?HPR_SIG_FUN, undefined). + +-spec sender_nsid() -> string(). +sender_nsid() -> + persistent_term:get(?HPR_SENDER_NSID, undefined). + +-spec b58() -> binary(). +b58() -> + persistent_term:get(?HPR_B58, undefined). + +-spec base_data_dir() -> string(). +base_data_dir() -> + DataDir = application:get_env(?APP, data_dir, ?DATA_DIR), + lager:info("base data dir ~s", [DataDir]), + ok = filelib:ensure_dir(DataDir), + DataDir. + %% ------------------------------------------------------------------ %% Internal Function Definitions %% ------------------------------------------------------------------ diff --git a/src/metrics/hpr_metrics.erl b/src/metrics/hpr_metrics.erl index 96c0b70f..615ede2b 100644 --- a/src/metrics/hpr_metrics.erl +++ b/src/metrics/hpr_metrics.erl @@ -16,7 +16,8 @@ observe_multi_buy/2, observe_find_routes/1, observe_grpc_connection/2, - ics_update/2 + ics_update/2, + observe_gateway_location/2 ]). %% ------------------------------------------------------------------ @@ -126,6 +127,17 @@ ics_update(Type, Action) -> _ = prometheus_counter:inc(?METRICS_ICS_UPDATES_COUNTER, [Type, Action]), ok. +-spec observe_gateway_location( + Start :: non_neg_integer(), + Status :: ok | error +) -> ok. +observe_gateway_location(Start, Status) -> + prometheus_histogram:observe( + ?METRICS_ICS_GATEWAY_LOCATION_HISTOGRAM, + [Status], + erlang:system_time(millisecond) - Start + ). + %% ------------------------------------------------------------------ %% gen_server Function Definitions %% ------------------------------------------------------------------ diff --git a/src/protocols/gwmp/hpr_gwmp_worker.erl b/src/protocols/gwmp/hpr_gwmp_worker.erl index e2625daa..73edcaa8 100644 --- a/src/protocols/gwmp/hpr_gwmp_worker.erl +++ b/src/protocols/gwmp/hpr_gwmp_worker.erl @@ -19,7 +19,7 @@ %% ------------------------------------------------------------------ -export([ start_link/1, - push_data/3 + push_data/4 ]). %% ------------------------------------------------------------------ @@ -72,10 +72,14 @@ start_link(Args) -> -spec push_data( WorkerPid :: pid(), PacketUp :: hpr_packet_up:packet(), - SocketDest :: socket_dest() + SocketDest :: socket_dest(), + GatewayLocation :: hpr_gateway_location:loc() ) -> ok | {error, any()}. -push_data(WorkerPid, PacketUp, SocketDest) -> - gen_server:cast(WorkerPid, {push_data, PacketUp, SocketDest, erlang:system_time(millisecond)}). +push_data(WorkerPid, PacketUp, SocketDest, GatewayLocation) -> + gen_server:cast( + WorkerPid, + {push_data, PacketUp, SocketDest, erlang:system_time(millisecond), GatewayLocation} + ). %% ------------------------------------------------------------------ %% gen_server Function Definitions @@ -111,7 +115,7 @@ handle_call(Msg, _From, State) -> {stop, {unimplemented_call, Msg}, State}. handle_cast( - {push_data, PacketUp, SocketDest, Timestamp}, + {push_data, PacketUp, SocketDest, Timestamp, GatewayLocation}, #state{ push_data = PushData, socket = Socket @@ -119,7 +123,7 @@ handle_cast( State0 ) -> ok = hpr_packet_up:md(PacketUp), - {Token, Payload} = packet_up_to_push_data(PacketUp, Timestamp), + {Token, Payload} = packet_up_to_push_data(PacketUp, Timestamp, GatewayLocation), State = maybe_send_pull_data(SocketDest, State0), {_Reply, TimerRef} = send_push_data(Token, Payload, Socket, SocketDest), NewPushData = maps:put(Token, {Payload, TimerRef}, PushData), @@ -222,16 +226,34 @@ terminate(_Reason, _State = #state{socket = Socket}) -> -spec packet_up_to_push_data( PacketUp :: hpr_packet_up:packet(), - PacketTime :: non_neg_integer() + PacketTime :: non_neg_integer(), + GatewayLocation :: hpr_gateway_location:loc() ) -> {Token :: binary(), Payload :: binary()}. -packet_up_to_push_data(Up, GatewayTime) -> +packet_up_to_push_data(Up, GatewayTime, GatewayLocation) -> Token = semtech_udp:token(), PubKeyBin = hpr_packet_up:gateway(Up), MAC = hpr_utils:pubkeybin_to_mac(PubKeyBin), B58 = erlang:list_to_binary(libp2p_crypto:bin_to_b58(PubKeyBin)), Name = erlang:list_to_binary(hpr_utils:gateway_name(PubKeyBin)), + BaseMeta = #{ + gateway_id => B58, + gateway_name => Name, + regi => hpr_packet_up:region(Up) + }, + Meta = + case GatewayLocation of + undefined -> + BaseMeta; + {H3Index, Lat, Long} -> + BaseMeta#{ + gateway_h3index => H3Index, + gateway_lat => Lat, + gateway_long => Long + } + end, + Data = semtech_udp:push_data( Token, MAC, @@ -251,11 +273,7 @@ packet_up_to_push_data(Up, GatewayTime) -> lsnr => hpr_packet_up:snr(Up), size => erlang:byte_size(hpr_packet_up:payload(Up)), data => base64:encode(hpr_packet_up:payload(Up)), - meta => #{ - gateway_id => B58, - gateway_name => Name, - regi => hpr_packet_up:region(Up) - } + meta => Meta } ), {Token, Data}. diff --git a/src/protocols/hpr_protocol_gwmp.erl b/src/protocols/hpr_protocol_gwmp.erl index 7f8fec0e..cb01d3b4 100644 --- a/src/protocols/hpr_protocol_gwmp.erl +++ b/src/protocols/hpr_protocol_gwmp.erl @@ -1,12 +1,13 @@ -module(hpr_protocol_gwmp). --export([send/2]). +-export([send/3]). -spec send( Packet :: hpr_packet_up:packet(), - Route :: hpr_route:route() + Route :: hpr_route:route(), + GatewayLocation :: hpr_gateway_location:loc() ) -> ok | {error, any()}. -send(PacketUp, Route) -> +send(PacketUp, Route, GatewayLocation) -> Gateway = hpr_packet_up:gateway(PacketUp), case hpr_gwmp_sup:maybe_start_worker(Gateway, #{}) of {error, Reason} -> @@ -14,7 +15,7 @@ send(PacketUp, Route) -> {ok, Pid} -> Region = hpr_packet_up:region(PacketUp), Dest = hpr_route:gwmp_region_lns(Region, Route), - try hpr_gwmp_worker:push_data(Pid, PacketUp, Dest) of + try hpr_gwmp_worker:push_data(Pid, PacketUp, Dest, GatewayLocation) of _ -> ok catch Type:Err:Stack -> diff --git a/src/protocols/hpr_protocol_http_roaming.erl b/src/protocols/hpr_protocol_http_roaming.erl index e9a8714d..498c623d 100644 --- a/src/protocols/hpr_protocol_http_roaming.erl +++ b/src/protocols/hpr_protocol_http_roaming.erl @@ -11,13 +11,15 @@ -include("hpr_http_roaming.hrl"). --export([send/2]). +-export([send/3]). -spec send( PacketUp :: hpr_packet_up:packet(), - Route :: hpr_route:route() + Route :: hpr_route:route(), + GatewayLocation :: hpr_gateway_location:loc() ) -> ok | {error, any()}. -send(PacketUp, Route) -> +send(PacketUp, Route, GatewayLocation) -> + RecvTime = erlang:system_time(millisecond), WorkerKey = worker_key_from(PacketUp, Route), PubKeyBin = hpr_packet_up:gateway(PacketUp), Protocol = protocol_from(Route), @@ -35,8 +37,7 @@ send(PacketUp, Route) -> ), {error, worker_not_started}; {ok, WorkerPid} -> - RecvTime = erlang:system_time(millisecond), - hpr_http_roaming_worker:handle_packet(WorkerPid, PacketUp, RecvTime), + hpr_http_roaming_worker:handle_packet(WorkerPid, PacketUp, RecvTime, GatewayLocation), ok end. diff --git a/src/protocols/hpr_protocol_router.erl b/src/protocols/hpr_protocol_router.erl index 6e083bc8..940030bf 100644 --- a/src/protocols/hpr_protocol_router.erl +++ b/src/protocols/hpr_protocol_router.erl @@ -8,7 +8,7 @@ -export([ start_link/1, init/0, - send/2, + send/3, get_stream/3, remove_stream/1, remove_stream/2, register/2 @@ -64,9 +64,10 @@ init() -> -spec send( PacketUp :: hpr_packet_up:packet(), - Route :: hpr_route:route() + Route :: hpr_route:route(), + GatewayLocation :: hpr_gateway_location:loc() ) -> ok | {error, any()}. -send(PacketUp, Route) -> +send(PacketUp, Route, _GatewayLocation) -> Gateway = hpr_packet_up:gateway(PacketUp), LNS = hpr_route:lns(Route), Server = hpr_route:server(Route), @@ -428,7 +429,7 @@ test_full() -> Pids = lists:map( fun(_X) -> erlang:spawn(fun() -> - R = ?MODULE:send(HprPacketUp, Route), + R = ?MODULE:send(HprPacketUp, Route, undefined), Self ! {send_result, R}, receive stop -> ok @@ -501,7 +502,7 @@ test_cannot_locate_stream() -> %% So we wrap the sender in a process so we it can be killed and start the cleanup. SenderPid = spawn(fun() -> ?MODULE:register(PubKeyBin, self()), - ?assertEqual(ok, ?MODULE:send(HprPacketUp, Route)), + ?assertEqual(ok, ?MODULE:send(HprPacketUp, Route, undefined)), receive stop -> Self ! stopped, diff --git a/src/protocols/http/hpr_http_roaming.erl b/src/protocols/http/hpr_http_roaming.erl index 1a586831..5cf34e2a 100644 --- a/src/protocols/http/hpr_http_roaming.erl +++ b/src/protocols/http/hpr_http_roaming.erl @@ -32,7 +32,7 @@ auth_headers/1 ]). --export([new_packet/2]). +-export([new_packet/3]). -define(NO_ROAMING_AGREEMENT, <<"NoRoamingAgreement">>). @@ -64,7 +64,8 @@ -record(packet, { packet_up :: hpr_packet_up:packet(), - received_time :: received_time() + received_time :: received_time(), + gateway_location :: hpr_gateway_location:loc() }). -type packet() :: #packet{}. @@ -81,12 +82,14 @@ -spec new_packet( PacketUp :: hpr_packet_up:packet(), - ReceivedTime :: received_time() + ReceivedTime :: received_time(), + GatewayLocation :: hpr_gateway_location:loc() ) -> #packet{}. -new_packet(PacketUp, ReceivedTime) -> +new_packet(PacketUp, ReceivedTime, GatewayLocation) -> #packet{ packet_up = PacketUp, - received_time = ReceivedTime + received_time = ReceivedTime, + gateway_location = GatewayLocation }. -spec make_uplink_payload( @@ -516,7 +519,7 @@ select_best(Copies) -> Best. -spec gw_info(packet()) -> map(). -gw_info(#packet{packet_up = PacketUp}) -> +gw_info(#packet{packet_up = PacketUp, gateway_location = GatewayLocation}) -> PubKeyBin = hpr_packet_up:gateway(PacketUp), Region = hpr_packet_up:region(PacketUp), @@ -530,7 +533,15 @@ gw_info(#packet{packet_up = PacketUp}) -> 'SNR' => SNR, 'DLAllowed' => true }, - GW. + case GatewayLocation of + undefined -> + GW; + {_h3Index, Lat, Long} -> + GW#{ + 'Lat' => Lat, + 'Lon' => Long + } + end. -spec encode_deveui(non_neg_integer()) -> binary(). encode_deveui(Num) -> diff --git a/src/protocols/http/hpr_http_roaming_worker.erl b/src/protocols/http/hpr_http_roaming_worker.erl index 76575468..9c9d8f9b 100644 --- a/src/protocols/http/hpr_http_roaming_worker.erl +++ b/src/protocols/http/hpr_http_roaming_worker.erl @@ -18,7 +18,7 @@ %% ------------------------------------------------------------------ -export([ start_link/1, - handle_packet/3 + handle_packet/4 ]). %% ------------------------------------------------------------------ @@ -64,10 +64,11 @@ start_link(Args) -> -spec handle_packet( WorkerPid :: pid(), PacketUp :: hpr_packet_up:packet(), - ReceivedTime :: hpr_http_roaming:received_timee() + ReceivedTime :: hpr_http_roaming:received_timee(), + GatewayLocation :: hpr_gateway_location:loc() ) -> ok | {error, any()}. -handle_packet(Pid, PacketUp, ReceivedTime) -> - gen_server:cast(Pid, {handle_packet, PacketUp, ReceivedTime}). +handle_packet(Pid, PacketUp, ReceivedTime, GatewayLocation) -> + gen_server:cast(Pid, {handle_packet, PacketUp, ReceivedTime, GatewayLocation}). %% ------------------------------------------------------------------ %% gen_server Function Definitions @@ -101,18 +102,18 @@ handle_call(_Msg, _From, State) -> {reply, ok, State}. handle_cast( - {handle_packet, PacketUp, ReceiveTime}, + {handle_packet, PacketUp, ReceiveTime, GatewayLocation}, #state{send_data_timer = 0, shutdown_timer_ref = ShutdownTimerRef0} = State ) -> ok = hpr_packet_up:md(PacketUp), {ok, StateWithPacket} = do_handle_packet( - PacketUp, ReceiveTime, State + PacketUp, ReceiveTime, GatewayLocation, State ), ok = send_data(StateWithPacket), {ok, ShutdownTimerRef1} = maybe_schedule_shutdown(ShutdownTimerRef0), {noreply, State#state{shutdown_timer_ref = ShutdownTimerRef1}}; handle_cast( - {handle_packet, PacketUp, ReceiveTime}, + {handle_packet, PacketUp, ReceiveTime, GatewayLocation}, #state{ should_shutdown = false, send_data_timer = Timeout, @@ -120,7 +121,7 @@ handle_cast( } = State0 ) -> ok = hpr_packet_up:md(PacketUp), - {ok, State1} = do_handle_packet(PacketUp, ReceiveTime, State0), + {ok, State1} = do_handle_packet(PacketUp, ReceiveTime, GatewayLocation, State0), {ok, TimerRef1} = maybe_schedule_send_data(Timeout, TimerRef0), {noreply, State1#state{send_data_timer_ref = TimerRef1}}; handle_cast( @@ -177,14 +178,15 @@ next_transaction_id() -> -spec do_handle_packet( PacketUp :: hpr_packet_up:packet(), ReceiveTime :: hpr_http_roaming:received_timee(), + GatewayLocation :: hpr_gateway_location:loc(), State :: #state{} ) -> {ok, #state{}}. do_handle_packet( - PacketUp, ReceiveTime, #state{packets = Packets} = State + PacketUp, ReceiveTime, GatewayLocation, #state{packets = Packets} = State ) -> State1 = State#state{ packets = [ - hpr_http_roaming:new_packet(PacketUp, ReceiveTime) | Packets + hpr_http_roaming:new_packet(PacketUp, ReceiveTime, GatewayLocation) | Packets ] }, {ok, State1}. diff --git a/test/hpr_gateway_location_SUITE.erl b/test/hpr_gateway_location_SUITE.erl new file mode 100644 index 00000000..b6ff3739 --- /dev/null +++ b/test/hpr_gateway_location_SUITE.erl @@ -0,0 +1,108 @@ +-module(hpr_gateway_location_SUITE). + +-include_lib("eunit/include/eunit.hrl"). +-include("../src/grpc/autogen/iot_config_pb.hrl"). + +-export([ + all/0, + init_per_testcase/2, + end_per_testcase/2 +]). + +-export([ + main_test/1 +]). + +-record(location, { + gateway :: libp2p_crypto:pubkey_bin(), + timestamp :: non_neg_integer(), + h3_index :: h3:index() | undefined, + lat :: float() | undefined, + long :: float() | undefined +}). + +%%-------------------------------------------------------------------- +%% COMMON TEST CALLBACK FUNCTIONS +%%-------------------------------------------------------------------- + +%%-------------------------------------------------------------------- +%% @public +%% @doc +%% Running tests for this suite +%% @end +%%-------------------------------------------------------------------- +all() -> + [ + main_test + ]. + +%%-------------------------------------------------------------------- +%% TEST CASE SETUP +%%-------------------------------------------------------------------- +init_per_testcase(TestCase, Config) -> + persistent_term:put(hpr_test_ics_gateway_service, self()), + test_utils:init_per_testcase(TestCase, Config). + +%%-------------------------------------------------------------------- +%% TEST CASE TEARDOWN +%%-------------------------------------------------------------------- +end_per_testcase(TestCase, Config) -> + test_utils:end_per_testcase(TestCase, Config). + +%%-------------------------------------------------------------------- +%% TEST CASES +%%-------------------------------------------------------------------- + +main_test(_Config) -> + %% Create gateway and add location to service + #{public := PubKey1} = libp2p_crypto:generate_keys(ecc_compact), + PubKeyBin1 = libp2p_crypto:pubkey_to_bin(PubKey1), + IndexString = "8828308281fffff", + ExpectedIndex = h3:from_string(IndexString), + ok = hpr_test_ics_gateway_service:register_gateway_location( + PubKeyBin1, + IndexString + ), + {ExpectedLat, ExpectedLong} = h3:to_geo(ExpectedIndex), + + %% Make request to get gateway location + Before = erlang:system_time(millisecond) - 1, + ?assertEqual( + {ok, ExpectedIndex, ExpectedLat, ExpectedLong}, hpr_gateway_location:get(PubKeyBin1) + ), + + %% Check that req was received + [{location, Req1}] = rcv_loop([]), + ?assertEqual(PubKeyBin1, Req1#iot_config_gateway_location_req_v1_pb.gateway), + + %% Verify ETS data + [ETSLocationRec] = ets:lookup(hpr_gateway_location_ets, PubKeyBin1), + ?assertEqual(PubKeyBin1, ETSLocationRec#location.gateway), + ?assertEqual(ExpectedIndex, ETSLocationRec#location.h3_index), + ?assertEqual(ExpectedLat, ETSLocationRec#location.lat), + ?assertEqual(ExpectedLong, ETSLocationRec#location.long), + ?assert(ETSLocationRec#location.timestamp > Before), + ?assert(ETSLocationRec#location.timestamp =< erlang:system_time(millisecond)), + + %% Verify DETS data + [DETSLocationRec] = dets:lookup(hpr_gateway_location_dets, PubKeyBin1), + ?assertEqual(PubKeyBin1, DETSLocationRec#location.gateway), + ?assertEqual(ExpectedIndex, DETSLocationRec#location.h3_index), + ?assertEqual(ExpectedLat, DETSLocationRec#location.lat), + ?assertEqual(ExpectedLong, DETSLocationRec#location.long), + ?assert(DETSLocationRec#location.timestamp > Before), + ?assert(DETSLocationRec#location.timestamp =< erlang:system_time(millisecond)), + + ok. + +%% ------------------------------------------------------------------ +%% Helper functions +%% ------------------------------------------------------------------ + +rcv_loop(Acc) -> + receive + {hpr_test_ics_gateway_service, Type, Req} -> + lager:notice("got hpr_test_ics_gateway_service ~p req ~p", [Type, Req]), + rcv_loop([{Type, Req} | Acc]) + after timer:seconds(2) -> Acc + end. diff --git a/test/hpr_metrics_SUITE.erl b/test/hpr_metrics_SUITE.erl index 499f7245..e15a5a01 100644 --- a/test/hpr_metrics_SUITE.erl +++ b/test/hpr_metrics_SUITE.erl @@ -187,4 +187,13 @@ main_test(_Config) -> end ), + ok = test_utils:wait_until( + fun() -> + undefined =/= + prometheus_histogram:value(?METRICS_ICS_GATEWAY_LOCATION_HISTOGRAM, [ + error + ]) + end + ), + ok. diff --git a/test/hpr_protocol_gwmp_SUITE.erl b/test/hpr_protocol_gwmp_SUITE.erl index 05cf8386..570a9aa4 100644 --- a/test/hpr_protocol_gwmp_SUITE.erl +++ b/test/hpr_protocol_gwmp_SUITE.erl @@ -8,6 +8,7 @@ -export([ full_test/1, + with_location_test/1, single_lns_test/1, multi_lns_test/1, single_lns_downlink_test/1, @@ -37,6 +38,7 @@ all() -> [ full_test, + with_location_test, single_lns_test, multi_lns_test, single_lns_downlink_test, @@ -97,6 +99,42 @@ full_test(_Config) -> ok. +with_location_test(_Config) -> + {ok, RcvSocket} = gen_udp:open(1777, [binary, {active, true}]), + + {Route, EUIPairs, DevAddrRanges} = test_route(1777), + IndexString = "8828308281fffff", + {ok, GatewayPid} = hpr_test_gateway:start(#{ + forward => self(), + route => Route, + eui_pairs => EUIPairs, + devaddr_ranges => DevAddrRanges, + h3_index_str => IndexString + }), + + %% Send packet and route directly through interface + ok = hpr_test_gateway:send_packet(GatewayPid, #{}), + + PacketUp = + case hpr_test_gateway:receive_send_packet(GatewayPid) of + {ok, EnvUp} -> + {packet, PUp} = hpr_envelope_up:data(EnvUp), + PUp; + {error, timeout} -> + ct:fail(receive_send_packet) + end, + + %% Initial PULL_DATA + {ok, _Token, _MAC} = expect_pull_data(RcvSocket, route_pull_data), + %% PUSH_DATA + {ok, Data} = expect_push_data(RcvSocket, router_push_data), + ok = verify_push_data_with_location(PacketUp, Data, IndexString), + + ok = gen_udp:close(RcvSocket), + ok = gen_server:stop(GatewayPid), + + ok. + single_lns_test(_Config) -> PacketUp = fake_join_up_packet(), @@ -104,7 +142,7 @@ single_lns_test(_Config) -> {ok, RcvSocket} = gen_udp:open(1777, [binary, {active, true}]), - hpr_protocol_gwmp:send(PacketUp, Route), + hpr_protocol_gwmp:send(PacketUp, Route, undefined), %% Initial PULL_DATA {ok, _Token, _MAC} = expect_pull_data(RcvSocket, route_pull_data), %% PUSH_DATA @@ -125,17 +163,17 @@ multi_lns_test(_Config) -> {ok, RcvSocket2} = gen_udp:open(1778, [binary, {active, true}]), %% Send packet to route 1 - hpr_protocol_gwmp:send(PacketUp, Route1), + hpr_protocol_gwmp:send(PacketUp, Route1, undefined), {ok, _Token, _MAC} = expect_pull_data(RcvSocket1, route1_pull_data), {ok, _} = expect_push_data(RcvSocket1, route1_push_data), %% Same packet to route 2 - hpr_protocol_gwmp:send(PacketUp, Route2), + hpr_protocol_gwmp:send(PacketUp, Route2, undefined), {ok, _Token2, _MAC2} = expect_pull_data(RcvSocket2, route2_pull_data), {ok, _} = expect_push_data(RcvSocket2, route2_push_data), %% Another packet to route 1 - hpr_protocol_gwmp:send(PacketUp, Route1), + hpr_protocol_gwmp:send(PacketUp, Route1, undefined), {ok, _} = expect_push_data(RcvSocket1, route1_push_data_repeat), ok = no_more_messages(), @@ -152,7 +190,7 @@ single_lns_downlink_test(_Config) -> {ok, LnsSocket} = gen_udp:open(1777, [binary, {active, true}]), %% Send packet - _ = hpr_protocol_gwmp:send(PacketUp, Route1), + _ = hpr_protocol_gwmp:send(PacketUp, Route1, undefined), %% Eat the pull_data {ok, _Token, _MAC} = expect_pull_data(LnsSocket, downlink_test_initiate_connection), @@ -218,7 +256,7 @@ single_lns_class_c_downlink_test(_Config) -> {ok, LnsSocket} = gen_udp:open(1777, [binary, {active, true}]), %% Send packet - _ = hpr_protocol_gwmp:send(PacketUp, Route1), + _ = hpr_protocol_gwmp:send(PacketUp, Route1, undefined), %% Eat the pull_data {ok, _Token, _MAC} = expect_pull_data(LnsSocket, downlink_test_initiate_connection), @@ -289,7 +327,7 @@ multi_lns_downlink_test(_Config) -> {ok, LNSSocket2} = gen_udp:open(1778, [binary, {active, true}]), %% Send packet to LNS 1 - _ = hpr_protocol_gwmp:send(PacketUp, Route1), + _ = hpr_protocol_gwmp:send(PacketUp, Route1, undefined), {ok, _Token, _Data} = expect_pull_data(LNSSocket1, downlink_test_initiate_connection_lns1), %% Receive the uplink from LNS 1 (mostly to get the return address) {ok, UDPWorkerAddress} = @@ -301,7 +339,7 @@ multi_lns_downlink_test(_Config) -> end, %% Send packet to LNS 2 - _ = hpr_protocol_gwmp:send(PacketUp, Route2), + _ = hpr_protocol_gwmp:send(PacketUp, Route2, undefined), {ok, _Token2, _Data2} = expect_pull_data(LNSSocket2, downlink_test_initiate_connection_lns2), {ok, _} = expect_push_data(LNSSocket2, route2_push_data), @@ -340,12 +378,12 @@ multi_gw_single_lns_test(_Config) -> {ok, RcvSocket} = gen_udp:open(1777, [binary, {active, true}]), %% Send the packet from the first hotspot - hpr_protocol_gwmp:send(PacketUp1, Route), + hpr_protocol_gwmp:send(PacketUp1, Route, undefined), {ok, _Token, _Data} = expect_pull_data(RcvSocket, first_gw_pull_data), {ok, _} = expect_push_data(RcvSocket, first_gw_push_data), %% Send the same packet from the second hotspot - hpr_protocol_gwmp:send(PacketUp2, Route), + hpr_protocol_gwmp:send(PacketUp2, Route, undefined), {ok, _Token2, _Data2} = expect_pull_data(RcvSocket, second_gw_pull_data), {ok, _} = expect_push_data(RcvSocket, second_gw_push_data), @@ -362,7 +400,7 @@ pull_data_test(_Config) -> {ok, RcvSocket} = gen_udp:open(1777, [binary, {active, true}]), - hpr_protocol_gwmp:send(PacketUp, Route), + hpr_protocol_gwmp:send(PacketUp, Route, undefined), %% Initial PULL_DATA {ok, Token, MAC} = expect_pull_data(RcvSocket, route_pull_data), @@ -379,7 +417,7 @@ pull_ack_test(_Config) -> {ok, RcvSocket} = gen_udp:open(1777, [binary, {active, true}]), - hpr_protocol_gwmp:send(PacketUp, Route), + hpr_protocol_gwmp:send(PacketUp, Route, undefined), %% Initial PULL_DATA, grab the address and port for responding {ok, Token, Address, Port} = @@ -415,7 +453,7 @@ pull_ack_test(_Config) -> %% Sending the same packet again shouldn't matter here, we only want to %% trigger the push_data/pull_data logic. - hpr_protocol_gwmp:send(PacketUp, Route), + hpr_protocol_gwmp:send(PacketUp, Route, undefined), ?assertEqual( #{{{127, 0, 0, 1}, 1777} => acknowledged}, @@ -445,7 +483,7 @@ pull_ack_hostname_test(_Config) -> {Route, _, _} = test_route(TestURL, 1777), {ok, RcvSocket} = gen_udp:open(1777, [binary, {active, true}]), - hpr_protocol_gwmp:send(PacketUp, Route), + hpr_protocol_gwmp:send(PacketUp, Route, undefined), %% Initial PULL_DATA, grab the address and port for responding {ok, Token, Address, Port} = @@ -516,12 +554,12 @@ region_port_redirect_test(_Config) -> CNPacketUp = USPacketUp#packet_router_packet_up_v1_pb{gateway = CNPubKeyBin, region = 'CN470'}, %% US send packet - hpr_protocol_gwmp:send(USPacketUp, Route), + hpr_protocol_gwmp:send(USPacketUp, Route, undefined), {ok, _, _} = expect_pull_data(USSocket, us_redirected_pull_data), {ok, _} = expect_push_data(USSocket, us_redirected_push_data), %% EU send packet - hpr_protocol_gwmp:send(EUPacketUp, Route), + hpr_protocol_gwmp:send(EUPacketUp, Route, undefined), {ok, _, _} = expect_pull_data(EUSocket, eu_redirected_pull_data), {ok, _} = expect_push_data(EUSocket, eu_redirected_push_data), @@ -533,7 +571,7 @@ region_port_redirect_test(_Config) -> end, %% Send from the last region to make sure fallback port is chosen - hpr_protocol_gwmp:send(CNPacketUp, Route), + hpr_protocol_gwmp:send(CNPacketUp, Route, undefined), {ok, _, _} = expect_pull_data(FallbackSocket, fallback_pull_data), {ok, _} = expect_push_data(FallbackSocket, fallback_push_data), @@ -741,7 +779,54 @@ verify_push_data(PacketUp, PushDataBinary) -> hpr_utils:gateway_name(PubKeyBin) ), <<"regi">> => erlang:atom_to_binary( - hpr_packet_up:region(PacketUp)) + hpr_packet_up:region(PacketUp) + ) + } + } + ] + }, + ?assert(test_utils:match_map(MapFromPacketUp, JsonData)). + +verify_push_data_with_location(PacketUp, PushDataBinary, IndexString) -> + JsonData = semtech_udp:json_data(PushDataBinary), + ExpectedIndex = h3:from_string(IndexString), + {ExpectedLat, ExpectedLong} = h3:to_geo(ExpectedIndex), + + PubKeyBin = hpr_packet_up:gateway(PacketUp), + MapFromPacketUp = #{ + <<"rxpk">> => + [ + #{ + <<"chan">> => 0, + <<"codr">> => <<"4/5">>, + <<"data">> => base64:encode(hpr_packet_up:payload(PacketUp)), + <<"datr">> => erlang:atom_to_binary(hpr_packet_up:datarate(PacketUp)), + <<"freq">> => list_to_float( + float_to_list(hpr_packet_up:frequency_mhz(PacketUp), [ + {decimals, 4}, compact + ]) + ), + <<"lsnr">> => hpr_packet_up:snr(PacketUp), + <<"modu">> => <<"LORA">>, + <<"rfch">> => 0, + <<"rssi">> => hpr_packet_up:rssi(PacketUp), + <<"size">> => erlang:byte_size(hpr_packet_up:payload(PacketUp)), + <<"stat">> => 1, + <<"time">> => fun erlang:is_binary/1, + <<"tmst">> => hpr_packet_up:timestamp(PacketUp) band 16#FFFF_FFFF, + <<"meta">> => #{ + <<"gateway_id">> => erlang:list_to_binary( + libp2p_crypto:bin_to_b58(PubKeyBin) + ), + <<"gateway_name">> => erlang:list_to_binary( + hpr_utils:gateway_name(PubKeyBin) + ), + <<"regi">> => erlang:atom_to_binary( + hpr_packet_up:region(PacketUp) + ), + <<"gateway_h3index">> => ExpectedIndex, + <<"gateway_lat">> => ExpectedLat, + <<"gateway_long">> => ExpectedLong } } ] diff --git a/test/hpr_protocol_http_roaming_packet_SUITE.erl b/test/hpr_protocol_http_roaming_packet_SUITE.erl index c44f59ab..c5c370ed 100644 --- a/test/hpr_protocol_http_roaming_packet_SUITE.erl +++ b/test/hpr_protocol_http_roaming_packet_SUITE.erl @@ -22,6 +22,7 @@ http_async_downlink_test/1, http_uplink_packet_no_roaming_agreement_test/1, http_uplink_packet_test/1, + uplink_with_gateway_location_test/1, http_class_c_downlink_test/1, http_multiple_gateways_test/1, http_multiple_joins_same_dest_test/1, @@ -77,6 +78,7 @@ all() -> http_async_downlink_test, http_uplink_packet_no_roaming_agreement_test, http_uplink_packet_test, + uplink_with_gateway_location_test, http_class_c_downlink_test, http_multiple_gateways_test, http_multiple_joins_same_dest_test, @@ -747,6 +749,94 @@ http_uplink_packet_test(_Config) -> ok. +uplink_with_gateway_location_test(_Config) -> + %% One Gateway is going to be sending all the packets. + #{secret := PrivKey, public := PubKey} = libp2p_crypto:generate_keys(ed25519), + SigFun = libp2p_crypto:mk_sig_fun(PrivKey), + PubKeyBin = libp2p_crypto:pubkey_to_bin(PubKey), + IndexString = "8828308281fffff", + ok = hpr_test_ics_gateway_service:register_gateway_location( + PubKeyBin, + IndexString + ), + + ok = start_uplink_listener(), + + SendPacketFun = fun(DevAddr) -> + GatewayTime = erlang:system_time(millisecond), + PacketUp = test_utils:uplink_packet_up(#{ + gateway => PubKeyBin, + devaddr => DevAddr, + fcnt => 0, + sig_fun => SigFun, + timestamp => GatewayTime + }), + ok = hpr_routing:handle_packet(PacketUp, #{gateway => PubKeyBin}), + {ok, PacketUp, GatewayTime} + end, + + uplink_test_route(), + + {ok, PacketUp, GatewayTime} = SendPacketFun(?DEVADDR_ACTILITY), + Payload = hpr_packet_up:payload(PacketUp), + Region = hpr_packet_up:region(PacketUp), + PacketTime = hpr_packet_up:timestamp(PacketUp), + ExpectedIndex = h3:from_string(IndexString), + {ExpectedLat, ExpectedLong} = h3:to_geo(ExpectedIndex), + { + ok, + #{<<"ULMetaData">> := #{<<"FNSULToken">> := Token}}, + _Request, + {200, _RespBody} + } = http_rcv( + #{ + <<"ProtocolVersion">> => <<"1.1">>, + <<"SenderNSID">> => hpr_utils:sender_nsid(), + <<"ReceiverNSID">> => <<"test-uplink-receiver-id">>, + <<"DedupWindowSize">> => fun erlang:is_integer/1, + <<"TransactionID">> => fun erlang:is_number/1, + <<"SenderID">> => <<"0xC00053">>, + <<"ReceiverID">> => ?NET_ID_ACTILITY_BIN, + <<"MessageType">> => <<"PRStartReq">>, + <<"PHYPayload">> => hpr_http_roaming_utils:binary_to_hexstring(Payload), + <<"ULMetaData">> => #{ + <<"DevAddr">> => ?DEVADDR_ACTILITY_BIN, + <<"DataRate">> => hpr_lorawan:datarate_to_index( + Region, + hpr_packet_up:datarate(PacketUp) + ), + <<"ULFreq">> => hpr_packet_up:frequency_mhz(PacketUp), + <<"RFRegion">> => erlang:atom_to_binary(Region), + <<"RecvTime">> => formatted_timestamp_within_one_second( + hpr_http_roaming_utils:format_time(GatewayTime) + ), + + <<"FNSULToken">> => fun erlang:is_binary/1, + <<"GWCnt">> => 1, + <<"GWInfo">> => [ + #{ + <<"RFRegion">> => erlang:atom_to_binary(Region), + <<"RSSI">> => hpr_packet_up:rssi(PacketUp), + <<"SNR">> => hpr_packet_up:snr(PacketUp), + <<"Lat">> => ExpectedLat, + <<"Lon">> => ExpectedLong, + <<"DLAllowed">> => true, + <<"GWID">> => hpr_http_roaming_utils:binary_to_hexstring( + hpr_utils:pubkeybin_to_mac(PubKeyBin) + ) + } + ] + } + } + ), + + ?assertMatch( + {ok, PubKeyBin, 'US915', PacketTime, "route1"}, + hpr_http_roaming:parse_uplink_token(Token) + ), + + ok. + http_class_c_downlink_test(_Config) -> %% %% Forwarder : HPR diff --git a/test/hpr_protocol_router_SUITE.erl b/test/hpr_protocol_router_SUITE.erl index 8fcf7117..f845794a 100644 --- a/test/hpr_protocol_router_SUITE.erl +++ b/test/hpr_protocol_router_SUITE.erl @@ -109,7 +109,7 @@ connection_refused_test(_Config) -> ?assertEqual( {error, {shutdown, econnrefused}}, - hpr_protocol_router:send(PacketUp, Route) + hpr_protocol_router:send(PacketUp, Route, undefined) ), ok. diff --git a/test/hpr_route_stream_worker_SUITE.erl b/test/hpr_route_stream_worker_SUITE.erl index 2809fc19..f6cfc489 100644 --- a/test/hpr_route_stream_worker_SUITE.erl +++ b/test/hpr_route_stream_worker_SUITE.erl @@ -81,16 +81,16 @@ main_test(_Config) -> session_key => SessionKey1, max_copies => 1 }), - ok = hpr_test_iot_config_service_route:stream_resp( + ok = hpr_test_ics_route_service:stream_resp( hpr_route_stream_res:test_new(#{action => add, data => {route, Route1}}) ), - ok = hpr_test_iot_config_service_route:stream_resp( + ok = hpr_test_ics_route_service:stream_resp( hpr_route_stream_res:test_new(#{action => add, data => {eui_pair, EUIPair1}}) ), - ok = hpr_test_iot_config_service_route:stream_resp( + ok = hpr_test_ics_route_service:stream_resp( hpr_route_stream_res:test_new(#{action => add, data => {devaddr_range, DevAddrRange1}}) ), - ok = hpr_test_iot_config_service_route:stream_resp( + ok = hpr_test_ics_route_service:stream_resp( hpr_route_stream_res:test_new(#{action => add, data => {skf, SessionKeyFilter}}) ), @@ -141,7 +141,7 @@ main_test(_Config) -> ), %% Delete EUI Pairs / DevAddr Ranges / SKF - ok = hpr_test_iot_config_service_route:stream_resp( + ok = hpr_test_ics_route_service:stream_resp( hpr_route_stream_res:test_new(#{action => remove, data => {eui_pair, EUIPair1}}) ), @@ -160,7 +160,7 @@ main_test(_Config) -> ?assertEqual([], hpr_route_ets:lookup_eui_pair(1, 100)), ?assertEqual([], hpr_route_ets:lookup_eui_pair(3, 3)), - ok = hpr_test_iot_config_service_route:stream_resp( + ok = hpr_test_ics_route_service:stream_resp( hpr_route_stream_res:test_new(#{action => remove, data => {devaddr_range, DevAddrRange1}}) ), @@ -174,7 +174,7 @@ main_test(_Config) -> ?assertEqual([], hpr_route_ets:lookup_devaddr_range(16#00000006)), ?assertEqual([], hpr_route_ets:lookup_devaddr_range(16#00000020)), - ok = hpr_test_iot_config_service_route:stream_resp( + ok = hpr_test_ics_route_service:stream_resp( hpr_route_stream_res:test_new(#{action => remove, data => {skf, SessionKeyFilter}}) ), @@ -187,13 +187,13 @@ main_test(_Config) -> ?assertEqual([], hpr_route_ets:lookup_skf(SKFETS1, DevAddr1)), %% Add back euis, ranges and skf to test full route delete - ok = hpr_test_iot_config_service_route:stream_resp( + ok = hpr_test_ics_route_service:stream_resp( hpr_route_stream_res:test_new(#{action => add, data => {eui_pair, EUIPair1}}) ), - ok = hpr_test_iot_config_service_route:stream_resp( + ok = hpr_test_ics_route_service:stream_resp( hpr_route_stream_res:test_new(#{action => add, data => {devaddr_range, DevAddrRange1}}) ), - ok = hpr_test_iot_config_service_route:stream_resp( + ok = hpr_test_ics_route_service:stream_resp( hpr_route_stream_res:test_new(#{action => add, data => {skf, SessionKeyFilter}}) ), ok = test_utils:wait_until( @@ -220,7 +220,7 @@ main_test(_Config) -> ), %% Remove route should delete eveything - ok = hpr_test_iot_config_service_route:stream_resp( + ok = hpr_test_ics_route_service:stream_resp( hpr_route_stream_res:test_new(#{action => remove, data => {route, Route1}}) ), @@ -265,16 +265,16 @@ refresh_route_test(_Config) -> route_id => Route1ID, devaddr => DevAddr1, session_key => SessionKey1, max_copies => 1 }), - ok = hpr_test_iot_config_service_route:stream_resp( + ok = hpr_test_ics_route_service:stream_resp( hpr_route_stream_res:test_new(#{action => add, data => {route, Route1}}) ), - ok = hpr_test_iot_config_service_route:stream_resp( + ok = hpr_test_ics_route_service:stream_resp( hpr_route_stream_res:test_new(#{action => add, data => {eui_pair, EUIPair1}}) ), - ok = hpr_test_iot_config_service_route:stream_resp( + ok = hpr_test_ics_route_service:stream_resp( hpr_route_stream_res:test_new(#{action => add, data => {devaddr_range, DevAddrRange1}}) ), - ok = hpr_test_iot_config_service_route:stream_resp( + ok = hpr_test_ics_route_service:stream_resp( hpr_route_stream_res:test_new(#{action => add, data => {skf, SessionKeyFilter1}}) ), diff --git a/test/hpr_routing_SUITE.erl b/test/hpr_routing_SUITE.erl index 9d352172..8a5c0666 100644 --- a/test/hpr_routing_SUITE.erl +++ b/test/hpr_routing_SUITE.erl @@ -357,7 +357,7 @@ skf_max_copies_test(_Config) -> lists:seq(1, 3) ), - ?assertEqual(3, meck:num_calls(hpr_protocol_http_roaming, send, 2)), + ?assertEqual(3, meck:num_calls(hpr_protocol_http_roaming, send, 3)), lists:foreach( fun(_) -> @@ -376,14 +376,14 @@ skf_max_copies_test(_Config) -> lists:seq(1, 3) ), - ?assertEqual(3, meck:num_calls(hpr_protocol_http_roaming, send, 2)), + ?assertEqual(3, meck:num_calls(hpr_protocol_http_roaming, send, 3)), meck:unload(hpr_protocol_http_roaming), ok. multi_buy_without_service_test(_Config) -> meck:new(hpr_protocol_router, [passthrough]), - meck:expect(hpr_protocol_router, send, fun(_, _) -> ok end), + meck:expect(hpr_protocol_router, send, fun(_, _, _) -> ok end), meck:new(hpr_packet_reporter, [passthrough]), meck:expect(hpr_packet_reporter, report_packet, fun(_, _, _, _) -> ok end), @@ -479,14 +479,16 @@ multi_buy_without_service_test(_Config) -> {Self, {hpr_protocol_router, send, [ UplinkPacketUp1, - Route + Route, + undefined ]}, ok}, Received2 = {Self, {hpr_protocol_router, send, [ UplinkPacketUp2, - Route + Route, + undefined ]}, ok}, @@ -507,7 +509,8 @@ multi_buy_without_service_test(_Config) -> {Self, {hpr_protocol_router, send, [ UplinkPacketUp4, - Route + Route, + undefined ]}, ok}, @@ -538,7 +541,7 @@ multi_buy_without_service_test(_Config) -> multi_buy_with_service_test(_Config) -> meck:new(hpr_protocol_router, [passthrough]), - meck:expect(hpr_protocol_router, send, fun(_, _) -> ok end), + meck:expect(hpr_protocol_router, send, fun(_, _, _) -> ok end), meck:new(hpr_packet_reporter, [passthrough]), meck:expect(hpr_packet_reporter, report_packet, fun(_, _, _, _) -> ok end), @@ -626,14 +629,16 @@ multi_buy_with_service_test(_Config) -> {Self, {hpr_protocol_router, send, [ UplinkPacketUp1, - Route + Route, + undefined ]}, ok}, Received2 = {Self, {hpr_protocol_router, send, [ UplinkPacketUp2, - Route + Route, + undefined ]}, ok}, @@ -654,7 +659,8 @@ multi_buy_with_service_test(_Config) -> {Self, {hpr_protocol_router, send, [ UplinkPacketUp4, - Route + Route, + undefined ]}, ok}, @@ -698,7 +704,7 @@ multi_buy_requests_test(_Config) -> end), meck:new(hpr_protocol_router, [passthrough]), - meck:expect(hpr_protocol_router, send, fun(_, _) -> ok end), + meck:expect(hpr_protocol_router, send, fun(_, _, _) -> ok end), MaxCopies = 3, DevAddr = 16#00000000, @@ -872,7 +878,7 @@ active_locked_route_test(_Config) -> ok = lists:foreach(fun hpr_route_ets:insert_devaddr_range/1, DevAddrRanges), meck:new(hpr_protocol_router, [passthrough]), - meck:expect(hpr_protocol_router, send, fun(_, _) -> ok end), + meck:expect(hpr_protocol_router, send, fun(_, _, _) -> ok end), AppSessionKey = crypto:strong_rand_bytes(16), NwkSessionKey = crypto:strong_rand_bytes(16), @@ -901,7 +907,8 @@ active_locked_route_test(_Config) -> {Self, {hpr_protocol_router, send, [ PacketUp0, - Route1 + Route1, + undefined ]}, ok}, @@ -999,25 +1006,25 @@ in_cooldown_route_test(_Config) -> %% We setup protocol router to fail every call meck:new(hpr_protocol_router, [passthrough]), - meck:expect(hpr_protocol_router, send, fun(_, _) -> {error, not_implemented} end), + meck:expect(hpr_protocol_router, send, fun(_, _, _) -> {error, not_implemented} end), %% We send first packet a call should be made to hpr_protocol_router ?assertEqual(ok, hpr_routing:handle_packet(PacketUp(0), #{gateway => Gateway1})), - ?assertEqual(1, meck:num_calls(hpr_protocol_router, send, 2)), + ?assertEqual(1, meck:num_calls(hpr_protocol_router, send, 3)), %% We send second packet NO call should be made to hpr_protocol_router as the route would be in cooldown ?assertEqual(ok, hpr_routing:handle_packet(PacketUp(1), #{gateway => Gateway1})), - ?assertEqual(1, meck:num_calls(hpr_protocol_router, send, 2)), + ?assertEqual(1, meck:num_calls(hpr_protocol_router, send, 3)), %% We wait the initial first timeout 1s %% Send another packet and watch another call made to hpr_protocol_router timer:sleep(1000), ?assertEqual(ok, hpr_routing:handle_packet(PacketUp(2), #{gateway => Gateway1})), - ?assertEqual(2, meck:num_calls(hpr_protocol_router, send, 2)), + ?assertEqual(2, meck:num_calls(hpr_protocol_router, send, 3)), %% We send couple more packets and check that we still only 2 calls to hpr_protocol_router ?assertEqual(ok, hpr_routing:handle_packet(PacketUp(3), #{gateway => Gateway1})), ?assertEqual(ok, hpr_routing:handle_packet(PacketUp(4), #{gateway => Gateway1})), - ?assertEqual(2, meck:num_calls(hpr_protocol_router, send, 2)), + ?assertEqual(2, meck:num_calls(hpr_protocol_router, send, 3)), %% We check the route and make sure that the backoff is setup properly [RouteETS1] = hpr_route_ets:lookup_route(RouteID), @@ -1027,10 +1034,10 @@ in_cooldown_route_test(_Config) -> %% Wait another time out (2s now) and reset hpr_protocol_router to return ok timer:sleep(2000), - meck:expect(hpr_protocol_router, send, fun(_, _) -> ok end), + meck:expect(hpr_protocol_router, send, fun(_, _, _) -> ok end), %% Sending another packet should trigger a new call to hpr_protocol_router ?assertEqual(ok, hpr_routing:handle_packet(PacketUp(5), #{gateway => Gateway1})), - ?assertEqual(3, meck:num_calls(hpr_protocol_router, send, 2)), + ?assertEqual(3, meck:num_calls(hpr_protocol_router, send, 3)), %% The route backoff should be back to undefined [RouteETS2] = hpr_route_ets:lookup_route(RouteID), @@ -1047,7 +1054,7 @@ success_test(_Config) -> Gateway = libp2p_crypto:pubkey_to_bin(PubKey), meck:new(hpr_protocol_router, [passthrough]), - meck:expect(hpr_protocol_router, send, fun(_, _) -> ok end), + meck:expect(hpr_protocol_router, send, fun(_, _, _) -> ok end), DevAddr = 16#00000000, {ok, NetID} = lora_subnet:parse_netid(DevAddr, big), @@ -1089,7 +1096,8 @@ success_test(_Config) -> {Self, {hpr_protocol_router, send, [ JoinPacketUpValid, - Route + Route, + undefined ]}, ok}, ?assertEqual([Received1], meck:history(hpr_protocol_router)), @@ -1103,7 +1111,8 @@ success_test(_Config) -> {Self, {hpr_protocol_router, send, [ UplinkPacketUp, - Route + Route, + undefined ]}, ok}, ?assertEqual( @@ -1233,7 +1242,7 @@ maybe_report_packet_test(_Config) -> meck:new(hpr_protocol_router, [passthrough]), meck:new(hpr_packet_reporter, [passthrough]), - meck:expect(hpr_protocol_router, send, fun(_, _) -> ok end), + meck:expect(hpr_protocol_router, send, fun(_, _, _) -> ok end), DevAddr = 16#00000000, {ok, NetID} = lora_subnet:parse_netid(DevAddr, big), @@ -1275,7 +1284,8 @@ maybe_report_packet_test(_Config) -> {Self, {hpr_protocol_router, send, [ JoinPacketUpValid, - Route + Route, + undefined ]}, ok}, ?assertEqual([Received1], meck:history(hpr_protocol_router)), @@ -1289,7 +1299,8 @@ maybe_report_packet_test(_Config) -> {Self, {hpr_protocol_router, send, [ UplinkPacketUp1, - Route + Route, + undefined ]}, ok}, ?assertEqual( @@ -1334,12 +1345,14 @@ maybe_report_packet_test(_Config) -> CallExpected3 = {hpr_protocol_router, send, [ UplinkPacketUp2, - BadRoute + BadRoute, + undefined ]}, CallExpected4 = {hpr_protocol_router, send, [ UplinkPacketUp2, - Route + Route, + undefined ]}, %% Packet is still send to both Routes @@ -1532,7 +1545,7 @@ routing_cleanup_test(_Config) -> Gateway = libp2p_crypto:pubkey_to_bin(PubKey), meck:new(hpr_protocol_router, [passthrough]), - meck:expect(hpr_protocol_router, send, fun(_, _) -> ok end), + meck:expect(hpr_protocol_router, send, fun(_, _, _) -> ok end), DevAddr = 16#00000000, {ok, NetID} = lora_subnet:parse_netid(DevAddr, big), diff --git a/test/hpr_test_gateway.erl b/test/hpr_test_gateway.erl index 93c8a2da..e54213e3 100644 --- a/test/hpr_test_gateway.erl +++ b/test/hpr_test_gateway.erl @@ -142,12 +142,22 @@ init( ok = lists:foreach(fun hpr_route_ets:insert_eui_pair/1, EUIPairs), ok = lists:foreach(fun hpr_route_ets:insert_devaddr_range/1, DevAddrRanges), self() ! ?CONNECT, + PubKeyBin = libp2p_crypto:pubkey_to_bin(PubKey), + case maps:get(h3_index_str, Args, false) of + IndexString when is_list(IndexString) -> + ok = hpr_test_ics_gateway_service:register_gateway_location( + PubKeyBin, + IndexString + ); + false -> + ok + end, {ok, #state{ forward = Pid, route = Route, eui_pairs = EUIPairs, devaddr_ranges = DevAddrRanges, - pubkey_bin = libp2p_crypto:pubkey_to_bin(PubKey), + pubkey_bin = PubKeyBin, sig_fun = libp2p_crypto:mk_sig_fun(PrivKey) }}. diff --git a/test/hpr_test_ics_gateway_service.erl b/test/hpr_test_ics_gateway_service.erl new file mode 100644 index 00000000..7ab9e152 --- /dev/null +++ b/test/hpr_test_ics_gateway_service.erl @@ -0,0 +1,104 @@ +-module(hpr_test_ics_gateway_service). + +-behaviour(helium_iot_config_gateway_bhvr). +-include("../src/grpc/autogen/iot_config_pb.hrl"). + +-export([ + init/2, + handle_info/2 +]). + +-export([ + region_params/2, + load_region/2, + location/2, + info/2, + info_stream/2 +]). + +-export([ + register_gateway_location/2 +]). + +-define(KNOWN_LOCS, hpr_test_ics_gateway_service_known_locations). + +-spec init(atom(), StreamState :: grpcbox_stream:t()) -> grpcbox_stream:t(). +init(_RPC, StreamState) -> + StreamState. + +-spec handle_info(Msg :: any(), StreamState :: grpcbox_stream:t()) -> grpcbox_stream:t(). +handle_info(_Msg, StreamState) -> + StreamState. + +region_params(_Ctx, _Msg) -> + {grpc_error, {grpcbox_stream:code_to_status(12), <<"UNIMPLEMENTED">>}}. + +load_region(_Ctx, _Msg) -> + {grpc_error, {grpcbox_stream:code_to_status(12), <<"UNIMPLEMENTED">>}}. + +info(_Ctx, _Msg) -> + {grpc_error, {grpcbox_stream:code_to_status(12), <<"UNIMPLEMENTED">>}}. + +info_stream(_Ctx, _Msg) -> + {grpc_error, {grpcbox_stream:code_to_status(12), <<"UNIMPLEMENTED">>}}. + +location(Ctx, Req) -> + case verify_location_req(Req) of + true -> + PubKeyBin = Req#iot_config_gateway_location_req_v1_pb.gateway, + case maybe_get_registered_location(PubKeyBin) of + {ok, Location} -> + lager:info("got location req ~p", [Req]), + Res = #iot_config_gateway_location_res_v1_pb{ + location = Location + }, + catch persistent_term:get(?MODULE) ! {?MODULE, location, Req}, + {ok, Res, Ctx}; + {error, not_found} -> + {grpc_error, {grpcbox_stream:code_to_status(5), <<"gateway not asserted">>}} + end; + false -> + lager:error("failed to verify location req ~p", [Req]), + {grpc_error, {7, <<"PERMISSION_DENIED">>}} + end. + +-spec verify_location_req(Req :: #iot_config_gateway_location_req_v1_pb{}) -> boolean(). +verify_location_req(Req) -> + EncodedReq = iot_config_pb:encode_msg( + Req#iot_config_gateway_location_req_v1_pb{ + signature = <<>> + }, + iot_config_gateway_location_req_v1_pb + ), + libp2p_crypto:verify( + EncodedReq, + Req#iot_config_gateway_location_req_v1_pb.signature, + libp2p_crypto:bin_to_pubkey(hpr_utils:pubkey_bin()) + ). + +-spec register_gateway_location( + PubKeyBin :: libp2p_crypto:pubkey_bin(), + Location :: string() +) -> ok. +register_gateway_location(PubKeyBin, Location) -> + Map = persistent_term:get(?KNOWN_LOCS, #{}), + ok = persistent_term:put(?KNOWN_LOCS, Map#{PubKeyBin => Location}). + +-spec maybe_get_registered_location(PubKeyBin :: libp2p_crypto:pubkey_bin()) -> + {ok, string()} | {error, not_found}. +maybe_get_registered_location(PubKeyBin) -> + Map = persistent_term:get(?KNOWN_LOCS, #{}), + case maps:get(PubKeyBin, Map, undefined) of + undefined -> {error, not_found}; + Location -> {ok, Location} + end. + +%% NOTE: if more asserted gateways are needed, use these locations. +%% location = "8828308281fffff", %% original from location worker +%% location = "8c29a962ed5b3ff" %% from blockchain init +%% +%% all locations inserted into chain +%% ["8C29A962ED5B3FF","8C29A975818B3FF","8C29A97497733FF", +%% "8C29A92809AEDFF","8C29A92A98DE7FF","8C29A92E404ABFF", +%% "8C29A92552F31FF","8C29A924C86E7FF","8C2834535A1B5FF", +%% "8C2834CD22653FF","8C2834CCE41C3FF","8C28341B06945FF"] diff --git a/test/hpr_test_iot_config_service_route.erl b/test/hpr_test_ics_route_service.erl similarity index 99% rename from test/hpr_test_iot_config_service_route.erl rename to test/hpr_test_ics_route_service.erl index 39cc290d..b215a8f5 100644 --- a/test/hpr_test_iot_config_service_route.erl +++ b/test/hpr_test_ics_route_service.erl @@ -1,4 +1,4 @@ --module(hpr_test_iot_config_service_route). +-module(hpr_test_ics_route_service). -behaviour(helium_iot_config_route_bhvr). diff --git a/test/test_utils.erl b/test/test_utils.erl index b7b16f12..f7a6ea91 100644 --- a/test/test_utils.erl +++ b/test/test_utils.erl @@ -24,10 +24,11 @@ init_per_testcase(TestCase, Config) -> Suite = proplists:get_value(suite, proplists:get_value(tc_group_properties, Config)), BaseDir = filename:join([Suite, TestCase]), - KeyFilePath = filename:join([BaseDir, "hpr.key"]), - ok = application:set_env(hpr, key, KeyFilePath, [{persistent, true}]), + BaseDirPath = filename:join([BaseDir, "data"]), + ok = application:set_env(hpr, data_dir, BaseDirPath, [{persistent, true}]), - ct:pal("BaseDir ~p", [BaseDir]), + ct:pal("BaseDirPath ~p", [BaseDirPath]), + ok = filelib:ensure_dir(BaseDirPath), FormatStr = [ "[", @@ -90,7 +91,7 @@ init_per_testcase(TestCase, Config) -> fun() -> {state, Stream, _Backoff} = sys:get_state(hpr_route_stream_worker), Stream =/= undefined andalso - erlang:is_pid(erlang:whereis(hpr_test_iot_config_service_route)) + erlang:is_pid(erlang:whereis(hpr_test_ics_route_service)) end, 20, 500