Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Socket Addrs #812

Merged
merged 5 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions ingest/src/server_iot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,6 @@ impl poc_lora::PocLora for GrpcServer {
}

pub async fn grpc_server(settings: &Settings) -> Result<()> {
let grpc_addr = settings.listen_addr()?;

// Initialize uploader
let (file_upload, file_upload_server) =
file_upload::FileUpload::from_settings_tm(&settings.output).await?;
Expand Down Expand Up @@ -374,6 +372,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
.create()
.await?;

let grpc_addr = settings.listen;
let grpc_server = GrpcServer {
beacon_report_sink,
witness_report_sink,
Expand Down
3 changes: 1 addition & 2 deletions ingest/src/server_mobile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,6 @@ impl poc_mobile::PocMobile for GrpcServer {
}

pub async fn grpc_server(settings: &Settings) -> Result<()> {
let grpc_addr = settings.listen_addr()?;

// Initialize uploader
let (file_upload, file_upload_server) =
file_upload::FileUpload::from_settings_tm(&settings.output).await?;
Expand Down Expand Up @@ -451,6 +449,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
bail!("expected valid api token in settings");
};

let grpc_addr = settings.listen;
let grpc_server = GrpcServer {
heartbeat_report_sink,
wifi_heartbeat_report_sink,
Expand Down
16 changes: 4 additions & 12 deletions ingest/src/settings.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
use config::{Config, Environment, File};
use helium_crypto::Network;
use serde::Deserialize;
use std::{
net::{AddrParseError, SocketAddr},
path::Path,
str::FromStr,
};
use std::{net::SocketAddr, path::Path};

#[derive(Debug, Deserialize)]
pub struct Settings {
Expand All @@ -20,7 +16,7 @@ pub struct Settings {
pub mode: Mode,
/// Listen address. Required. Default is 0.0.0.0:9081
#[serde(default = "default_listen_addr")]
pub listen: String,
pub listen: SocketAddr,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this operation is fallible clap is able to do this automatically when not using the default method? this doesn't need any additional annotations for the serde macro or anything?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

afaik the only requirement is the type implements FromStr, then Clap and Serde will attempt to convert into the type using that.

#[test]
fn test_config_parse() {
    use std::net::SocketAddr;

    #[derive(Debug, serde::Deserialize)]
    struct TestSettings {
        one: SocketAddr,
        #[serde(default = "default_two")]
        two: SocketAddr,
    }

    fn default_two() -> SocketAddr {
        "1.2.3.4:19001".parse().unwrap()
    }

    let x: TestSettings = config::Config::builder()
        .add_source(File::with_name("./socket_addr_test.toml"))
        .build()
        .and_then(|config| config.try_deserialize())
        .expect("valid settings");

    println!("config: {x:?}");
}

/// Local folder for storing intermediate files
pub cache: String,
/// Network required in all public keys: mainnet | testnet
Expand Down Expand Up @@ -49,8 +45,8 @@ pub fn default_session_key_offer_timeout() -> u64 {
5
}

pub fn default_listen_addr() -> String {
"0.0.0.0:9081".to_string()
pub fn default_listen_addr() -> SocketAddr {
"0.0.0.0:9081".parse().unwrap()
}

pub fn default_log() -> String {
Expand Down Expand Up @@ -97,10 +93,6 @@ impl Settings {
.and_then(|config| config.try_deserialize())
}

pub fn listen_addr(&self) -> Result<SocketAddr, AddrParseError> {
SocketAddr::from_str(&self.listen)
}

pub fn session_key_offer_timeout(&self) -> std::time::Duration {
std::time::Duration::from_secs(self.session_key_offer_timeout)
}
Expand Down
71 changes: 36 additions & 35 deletions ingest/tests/iot_ingest.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{net::SocketAddr, str::FromStr};
use std::net::{SocketAddr, TcpListener};

use backon::{ExponentialBuilder, Retryable};
use file_store::file_sink::{FileSinkClient, Message as SinkMessage};
Expand All @@ -12,7 +12,7 @@ use helium_proto::services::poc_lora::{
};
use ingest::server_iot::GrpcServer;
use prost::Message;
use rand::{rngs::OsRng, Rng};
use rand::rngs::OsRng;
use task_manager::TaskManager;
use tokio::{sync::mpsc::error::TryRecvError, task::LocalSet, time::timeout};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
Expand All @@ -22,12 +22,12 @@ use tonic::{transport::Channel, Streaming};
async fn initialize_session_and_send_beacon_and_witness() {
let (beacon_client, mut beacons) = create_file_sink();
let (witness_client, mut witnesses) = create_file_sink();
let port = get_port();
let addr = get_socket_addr().expect("socket addr");

LocalSet::new()
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
let server = create_test_server(addr, beacon_client, witness_client, None, None);
TaskManager::builder()
.add_task(server)
.build()
Expand All @@ -38,7 +38,7 @@ async fn initialize_session_and_send_beacon_and_witness() {
let pub_key = generate_keypair();
let session_key = generate_keypair();

let mut client = connect_and_stream(port).await;
let mut client = connect_and_stream(addr).await;
let offer = client.receive_offer().await;

client
Expand Down Expand Up @@ -75,12 +75,12 @@ async fn initialize_session_and_send_beacon_and_witness() {
async fn stream_stops_after_incorrectly_signed_init_request() {
let (beacon_client, _) = create_file_sink();
let (witness_client, _) = create_file_sink();
let port = get_port();
let addr = get_socket_addr().expect("socket addr");

LocalSet::new()
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
let server = create_test_server(addr, beacon_client, witness_client, None, None);
TaskManager::builder()
.add_task(server)
.build()
Expand All @@ -91,7 +91,7 @@ async fn stream_stops_after_incorrectly_signed_init_request() {
let pub_key = generate_keypair();
let session_key = generate_keypair();

let mut client = connect_and_stream(port).await;
let mut client = connect_and_stream(addr).await;
let offer = client.receive_offer().await;

client
Expand All @@ -113,12 +113,12 @@ async fn stream_stops_after_incorrectly_signed_init_request() {
async fn stream_stops_after_incorrectly_signed_beacon() {
let (beacon_client, beacons) = create_file_sink();
let (witness_client, _) = create_file_sink();
let port = get_port();
let addr = get_socket_addr().expect("socket addr");

LocalSet::new()
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
let server = create_test_server(addr, beacon_client, witness_client, None, None);
TaskManager::builder()
.add_task(server)
.build()
Expand All @@ -129,7 +129,7 @@ async fn stream_stops_after_incorrectly_signed_beacon() {
let pub_key = generate_keypair();
let session_key = generate_keypair();

let mut client = connect_and_stream(port).await;
let mut client = connect_and_stream(addr).await;
let offer = client.receive_offer().await;

client
Expand All @@ -154,12 +154,12 @@ async fn stream_stops_after_incorrectly_signed_beacon() {
async fn stream_stops_after_incorrect_beacon_pubkey() {
let (beacon_client, beacons) = create_file_sink();
let (witness_client, _) = create_file_sink();
let port = get_port();
let addr = get_socket_addr().expect("socket addr");

LocalSet::new()
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
let server = create_test_server(addr, beacon_client, witness_client, None, None);
TaskManager::builder()
.add_task(server)
.build()
Expand All @@ -170,7 +170,7 @@ async fn stream_stops_after_incorrect_beacon_pubkey() {
let pub_key = generate_keypair();
let session_key = generate_keypair();

let mut client = connect_and_stream(port).await;
let mut client = connect_and_stream(addr).await;
let offer = client.receive_offer().await;

client
Expand Down Expand Up @@ -198,12 +198,12 @@ async fn stream_stops_after_incorrect_beacon_pubkey() {
async fn stream_stops_after_incorrectly_signed_witness() {
let (beacon_client, _) = create_file_sink();
let (witness_client, witnesses) = create_file_sink();
let port = get_port();
let addr = get_socket_addr().expect("socket addr");

LocalSet::new()
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
let server = create_test_server(addr, beacon_client, witness_client, None, None);
TaskManager::builder()
.add_task(server)
.build()
Expand All @@ -214,7 +214,7 @@ async fn stream_stops_after_incorrectly_signed_witness() {
let pub_key = generate_keypair();
let session_key = generate_keypair();

let mut client = connect_and_stream(port).await;
let mut client = connect_and_stream(addr).await;
let offer = client.receive_offer().await;

client
Expand All @@ -239,12 +239,12 @@ async fn stream_stops_after_incorrectly_signed_witness() {
async fn stream_stops_after_incorrect_witness_pubkey() {
let (beacon_client, _) = create_file_sink();
let (witness_client, witnesses) = create_file_sink();
let port = get_port();
let addr = get_socket_addr().expect("socket addr");

LocalSet::new()
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
let server = create_test_server(addr, beacon_client, witness_client, None, None);
TaskManager::builder()
.add_task(server)
.build()
Expand All @@ -255,7 +255,7 @@ async fn stream_stops_after_incorrect_witness_pubkey() {
let pub_key = generate_keypair();
let session_key = generate_keypair();

let mut client = connect_and_stream(port).await;
let mut client = connect_and_stream(addr).await;
let offer = client.receive_offer().await;

client
Expand Down Expand Up @@ -283,12 +283,12 @@ async fn stream_stops_after_incorrect_witness_pubkey() {
async fn stream_stop_if_client_attempts_to_initiliaze_2nd_session() {
let (beacon_client, mut beacons) = create_file_sink();
let (witness_client, _) = create_file_sink();
let port = get_port();
let addr = get_socket_addr().expect("socket addr");

LocalSet::new()
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
let server = create_test_server(addr, beacon_client, witness_client, None, None);
TaskManager::builder()
.add_task(server)
.build()
Expand All @@ -299,7 +299,7 @@ async fn stream_stop_if_client_attempts_to_initiliaze_2nd_session() {
let pub_key = generate_keypair();
let session_key = generate_keypair();

let mut client = connect_and_stream(port).await;
let mut client = connect_and_stream(addr).await;
let offer = client.receive_offer().await;

client
Expand Down Expand Up @@ -337,21 +337,21 @@ async fn stream_stop_if_client_attempts_to_initiliaze_2nd_session() {
async fn stream_stops_if_init_not_sent_within_timeout() {
let (beacon_client, _) = create_file_sink();
let (witness_client, _) = create_file_sink();
let port = get_port();
let addr = get_socket_addr().expect("socket addr");

LocalSet::new()
.run_until(async move {
tokio::task::spawn_local(async move {
let server =
create_test_server(port, beacon_client, witness_client, Some(500), None);
create_test_server(addr, beacon_client, witness_client, Some(500), None);
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let mut client = connect_and_stream(port).await;
let mut client = connect_and_stream(addr).await;
let _offer = client.receive_offer().await;

client.assert_closed().await;
Expand All @@ -363,21 +363,21 @@ async fn stream_stops_if_init_not_sent_within_timeout() {
async fn stream_stops_on_session_timeout() {
let (beacon_client, mut beacons) = create_file_sink();
let (witness_client, _) = create_file_sink();
let port = get_port();
let addr = get_socket_addr().expect("socket addr");

LocalSet::new()
.run_until(async move {
tokio::task::spawn_local(async move {
let server =
create_test_server(port, beacon_client, witness_client, Some(500), Some(900));
create_test_server(addr, beacon_client, witness_client, Some(500), Some(900));
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let mut client = connect_and_stream(port).await;
let mut client = connect_and_stream(addr).await;
let offer = client.receive_offer().await;

let pub_key = generate_keypair();
Expand Down Expand Up @@ -449,8 +449,8 @@ fn create_file_sink() -> (FileSinkClient, MockFileSinkReceiver) {
)
}

async fn connect_and_stream(port: u64) -> TestClient {
let mut client = (|| PocLoraClient::connect(format!("http://127.0.0.1:{port}")))
async fn connect_and_stream(socket_addr: SocketAddr) -> TestClient {
let mut client = (|| PocLoraClient::connect(format!("http://{socket_addr}")))
.retry(&ExponentialBuilder::default())
.await
.expect("client connect");
Expand Down Expand Up @@ -572,7 +572,7 @@ impl TestClient {
}

fn create_test_server(
port: u64,
socket_addr: SocketAddr,
beacon_file_sink: FileSinkClient,
witness_file_sink: FileSinkClient,
offer_timeout: Option<u64>,
Expand All @@ -584,7 +584,7 @@ fn create_test_server(
beacon_report_sink: beacon_file_sink,
witness_report_sink: witness_file_sink,
required_network: Network::MainNet,
address: SocketAddr::from_str(&format!("127.0.0.1:{port}")).expect("socket address"),
address: socket_addr,
session_key_offer_timeout: std::time::Duration::from_millis(offer_timeout),
session_key_timeout: std::time::Duration::from_millis(timeout),
}
Expand All @@ -598,6 +598,7 @@ fn seconds(s: u64) -> std::time::Duration {
std::time::Duration::from_secs(s)
}

fn get_port() -> u64 {
rand::thread_rng().gen_range(6000..10000)
fn get_socket_addr() -> anyhow::Result<SocketAddr> {
let listener = TcpListener::bind("127.0.0.1:0")?;
Ok(listener.local_addr()?)
}
3 changes: 1 addition & 2 deletions iot_config/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ impl Daemon {
// Create on-chain metadata pool
let metadata_pool = settings.metadata.connect("iot-config-metadata").await?;

let listen_addr = settings.listen_addr()?;

let (auth_updater, auth_cache) = AuthCache::new(settings.admin_pubkey()?, &pool).await?;
let (region_updater, region_map) = RegionMapReader::new(&pool).await?;
let (delegate_key_updater, delegate_key_cache) = org::delegate_keys_cache(&pool).await?;
Expand Down Expand Up @@ -104,6 +102,7 @@ impl Daemon {
region_updater,
)?;

let listen_addr = settings.listen;
let pubkey = settings
.signing_keypair()
.map(|keypair| keypair.public_key().to_string())?;
Expand Down
Loading
Loading