Skip to content

Commit

Permalink
Retrieve tcp port from server (#331)
Browse files Browse the repository at this point in the history
* ServerHandle can now retrieve the local_addr

* use ServerHandle::local_addr and port 0 in the perf example
  • Loading branch information
jadamcrain authored Dec 21, 2023
1 parent 386339b commit 1ee7b36
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 24 deletions.
27 changes: 12 additions & 15 deletions dnp3/examples/perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@ enum Action {
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Cli {
#[clap(short, long, value_parser, default_value_t = 20000)]
port: u16,
#[clap(long, value_parser, default_value_t = 1)]
sessions: u16,
sessions: usize,
#[clap(short, long, value_parser, default_value_t = 100)]
values: usize,
#[clap(long, value_parser, default_value_t = 10)]
Expand Down Expand Up @@ -77,9 +75,7 @@ async fn run(args: Cli) {
.init();
}

let port_range: std::ops::Range<u16> = args.port..args.port + args.sessions;

let mut harness = TestHarness::create(port_range, config).await;
let mut harness = TestHarness::create(args.sessions, config).await;
println!("settings: {:?}", args);

println!("starting up...");
Expand Down Expand Up @@ -123,10 +119,10 @@ struct TestHarness {
}

impl TestHarness {
async fn create(ports: std::ops::Range<u16>, config: TestConfig) -> Self {
async fn create(count: usize, config: TestConfig) -> Self {
let mut pairs = Vec::new();
for port in ports {
pairs.push(Pair::spawn(port, config).await)
for _ in 0..count {
pairs.push(Pair::spawn(config).await)
}
Self { pairs }
}
Expand Down Expand Up @@ -222,7 +218,7 @@ struct Pair {
}

impl Pair {
const LOCALHOST: std::net::IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST);

fn update_values(&mut self) {
self.outstation.transaction(|db| {
Expand All @@ -240,9 +236,10 @@ impl Pair {
assert_eq!(self.rx.recv().await.unwrap(), self.values.len());
}

async fn spawn(port: u16, config: TestConfig) -> Self {
let (server, outstation) = Self::spawn_outstation(port, config).await;
let (master, assoc, measurements, rx) = Self::spawn_master(port, config).await;
async fn spawn(config: TestConfig) -> Self {
let (server, outstation) = Self::spawn_outstation(config).await;
let assigned_port = server.local_addr().unwrap().port();
let (master, assoc, measurements, rx) = Self::spawn_master(assigned_port, config).await;

Self {
values: measurements,
Expand All @@ -254,9 +251,9 @@ impl Pair {
}
}

async fn spawn_outstation(port: u16, config: TestConfig) -> (ServerHandle, OutstationHandle) {
async fn spawn_outstation(config: TestConfig) -> (ServerHandle, OutstationHandle) {
let mut server =
Server::new_tcp_server(LinkErrorMode::Close, SocketAddr::new(Self::LOCALHOST, port));
Server::new_tcp_server(LinkErrorMode::Close, SocketAddr::new(Self::LOCALHOST, 0));
let outstation = server
.add_outstation(
Self::get_outstation_config(config.outstation_level),
Expand Down
28 changes: 19 additions & 9 deletions dnp3/src/tcp/outstation.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::net::SocketAddr;
use tracing::Instrument;

use crate::app::{ConnectStrategy, Listener, Shutdown};
Expand Down Expand Up @@ -71,16 +72,26 @@ struct OutstationInfo {
pub struct Server {
link_error_mode: LinkErrorMode,
connection_id: u64,
address: std::net::SocketAddr,
address: SocketAddr,
outstations: Vec<OutstationInfo>,
connection_handler: ServerConnectionHandler,
}

/// Handle to a running server. Dropping the handle, shuts down the server.
pub struct ServerHandle {
addr: Option<SocketAddr>,
_tx: tokio::sync::oneshot::Sender<()>,
}

impl ServerHandle {
/// Returns the local address to which this server is bound.
///
/// This can be useful, for example, when binding to port 0 to figure out which port was actually bound.
pub fn local_addr(&self) -> Option<SocketAddr> {
self.addr
}
}

enum ServerConnectionHandler {
Tcp,
#[cfg(feature = "tls")]
Expand All @@ -100,7 +111,7 @@ impl ServerConnectionHandler {
impl Server {
/// create a TCP server builder object that will eventually be bound
/// to the specified address
pub fn new_tcp_server(link_error_mode: LinkErrorMode, address: std::net::SocketAddr) -> Self {
pub fn new_tcp_server(link_error_mode: LinkErrorMode, address: SocketAddr) -> Self {
Self {
link_error_mode,
connection_id: 0,
Expand All @@ -114,7 +125,7 @@ impl Server {
#[cfg(feature = "tls")]
pub fn new_tls_server(
link_error_mode: LinkErrorMode,
address: std::net::SocketAddr,
address: SocketAddr,
tls_config: crate::tcp::tls::TlsServerConfig,
) -> Self {
Self {
Expand Down Expand Up @@ -206,6 +217,8 @@ impl Server {
) -> Result<(ServerHandle, impl std::future::Future<Output = Shutdown>), tokio::io::Error> {
let listener = tokio::net::TcpListener::bind(self.address).await?;

let addr = listener.local_addr().ok();

let (tx, rx) = tokio::sync::oneshot::channel();

let task = async move {
Expand All @@ -215,7 +228,7 @@ impl Server {
.await
};

let handle = ServerHandle { _tx: tx };
let handle = ServerHandle { addr, _tx: tx };

Ok((handle, task))
}
Expand All @@ -224,6 +237,7 @@ impl Server {
/// task onto the Tokio runtime. Returns a ServerHandle that will shut down the server and all
/// associated outstations when dropped.
///
///
/// This must be called from within the Tokio runtime
pub async fn bind(self) -> Result<ServerHandle, tokio::io::Error> {
let (handle, future) = self.bind_no_spawn().await?;
Expand Down Expand Up @@ -279,11 +293,7 @@ impl Server {
}
}

async fn process_connection(
&mut self,
stream: tokio::net::TcpStream,
addr: std::net::SocketAddr,
) {
async fn process_connection(&mut self, stream: tokio::net::TcpStream, addr: SocketAddr) {
let id = self.connection_id;
self.connection_id = self.connection_id.wrapping_add(1);

Expand Down

0 comments on commit 1ee7b36

Please sign in to comment.