Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
sword-jin committed Jul 31, 2024
1 parent 68ded8b commit 8965351
Showing 1 changed file with 128 additions and 108 deletions.
236 changes: 128 additions & 108 deletions src/server/data_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,119 +83,130 @@ impl DataServer {
let seed = since_the_epoch.as_secs();
let mut rng = StdRng::seed_from_u64(seed);

while let Some(event) = receiver.recv().await {
match event.payload {
event::Payload::RegisterTcp { port } => {
let result: Result<(Available, TcpListener), tonic::Status> =
create_socket::<Tcp>(port, &mut this.port_manager.clone()).await;
match result {
Ok((available_port, listener)) => {
let cancel = event.close_listener;
let conn_event_chan = event.incoming_events;
event
.resp
.send(ClientEventResponse::registered(
this.entrypoint_config
.make_entrypoint(&event.payload, *available_port),
))
.unwrap(); // success
spawn(async move {
Tcp::new(listener, conn_event_chan.clone())
.serve(cancel)
.await;
info!(port = *available_port, "tcp server closed");
});
}
Err(status) => {
event
.resp
.send(ClientEventResponse::registered_failed(status))
.unwrap();
}
}
loop {
tokio::select! {
_ = shutdown.clone() => {
break;
}
event::Payload::RegisterUdp { port } => {
let result: Result<(Available, UdpSocket), tonic::Status> =
create_socket::<Udp>(port, &mut this.port_manager.clone()).await;

match result {
Ok((available_port, socket)) => {
let socket = socket;
let cancel = event.close_listener;
let conn_event_chan = event.incoming_events;
event
.resp
.send(ClientEventResponse::registered(
this.entrypoint_config
.make_entrypoint(&event.payload, *available_port),
))
.unwrap(); // success
spawn(async move {
Udp::new(socket, conn_event_chan.clone())
.serve(cancel)
.await;
info!(port = *available_port, "udp server closed");
});
event = receiver.recv() => {
let event = match event {
Some(event) => event,
None => break,
};
match event.payload {
event::Payload::RegisterTcp { port } => {
let result: Result<(Available, TcpListener), tonic::Status> =
create_socket::<Tcp>(port, &mut this.port_manager.clone()).await;
match result {
Ok((available_port, listener)) => {
let cancel = event.close_listener;
let conn_event_chan = event.incoming_events;
event
.resp
.send(ClientEventResponse::registered(
this.entrypoint_config
.make_entrypoint(&event.payload, *available_port),
))
.unwrap(); // success
spawn(async move {
Tcp::new(listener, conn_event_chan.clone())
.serve(cancel)
.await;
info!(port = *available_port, "tcp server closed");
});
}
Err(status) => {
event
.resp
.send(ClientEventResponse::registered_failed(status))
.unwrap();
}
}
}
Err(status) => {
event
.resp
.send(ClientEventResponse::registered_failed(status))
.unwrap();
event::Payload::RegisterUdp { port } => {
let result: Result<(Available, UdpSocket), tonic::Status> =
create_socket::<Udp>(port, &mut this.port_manager.clone()).await;

match result {
Ok((available_port, socket)) => {
let socket = socket;
let cancel = event.close_listener;
let conn_event_chan = event.incoming_events;
event
.resp
.send(ClientEventResponse::registered(
this.entrypoint_config
.make_entrypoint(&event.payload, *available_port),
))
.unwrap(); // success
spawn(async move {
Udp::new(socket, conn_event_chan.clone())
.serve(cancel)
.await;
info!(port = *available_port, "udp server closed");
});
}
Err(status) => {
event
.resp
.send(ClientEventResponse::registered_failed(status))
.unwrap();
}
}
}
}
}
event::Payload::RegisterHttp {
mut port,
mut subdomain,
domain,
random_subdomain,
} => {
let subdomain_c = subdomain.clone();
let domain_c = domain.clone();
let domain_c2 = domain.clone();
let resp_status = shutdown
.wrap_cancel(this.register_http(
event.close_listener.clone(),
event::Payload::RegisterHttp {
mut port,
mut subdomain,
domain,
&mut subdomain,
random_subdomain,
&mut port,
event.incoming_events,
&mut rng,
))
.await;
let this = Arc::clone(&this);
if let Ok(Some(status)) = resp_status {
event
.resp
.send(ClientEventResponse::registered_failed(status))
.unwrap();
} else {
// means register successfully
// listen the close_listener to cancel the unregister domain/subdomain.
let payload = Payload::RegisterHttp {
port,
subdomain,
domain: domain_c2,
random_subdomain,
};
event
.resp
.send(ClientEventResponse::registered(
this.entrypoint_config.make_entrypoint(&payload, port),
))
.unwrap();
} => {
let subdomain_c = subdomain.clone();
let domain_c = domain.clone();
let domain_c2 = domain.clone();
let resp_status = shutdown
.wrap_cancel(this.register_http(
event.close_listener.clone(),
domain,
&mut subdomain,
random_subdomain,
&mut port,
event.incoming_events,
&mut rng,
))
.await;
let this = Arc::clone(&this);
if let Ok(Some(status)) = resp_status {
event
.resp
.send(ClientEventResponse::registered_failed(status))
.unwrap();
} else {
// means register successfully
// listen the close_listener to cancel the unregister domain/subdomain.
let payload = Payload::RegisterHttp {
port,
subdomain,
domain: domain_c2,
random_subdomain,
};
event
.resp
.send(ClientEventResponse::registered(
this.entrypoint_config.make_entrypoint(&payload, port),
))
.unwrap();

tokio::spawn(async move {
event.close_listener.cancelled().await;
if !subdomain_c.is_empty() {
this.http_registry.unregister_subdomain(subdomain_c);
}
if !domain_c.is_empty() {
this.http_registry.unregister_domain(domain_c);
tokio::spawn(async move {
event.close_listener.cancelled().await;
if !subdomain_c.is_empty() {
this.http_registry.unregister_subdomain(subdomain_c);
}
if !domain_c.is_empty() {
this.http_registry.unregister_domain(domain_c);
}
});
}
});
}
}
}
}
Expand Down Expand Up @@ -296,24 +307,33 @@ impl DataServer {
#[cfg(test)]
mod test {
use async_shutdown::ShutdownManager;
use tokio::time::sleep;

use crate::debug;

use super::*;

#[tokio::test]
async fn test_cannot_listen_on_same_vhttp_port() {
debug::setup_logging(6669);
let (_t1, r1) = mpsc::channel(10);
let shutdown1 = ShutdownManager::new();
let signal1 = shutdown1.wait_shutdown_triggered();
let s1 = DataServer::new(3000, EntrypointConfig::default());
let s1 = DataServer::new(3100, EntrypointConfig::default());
let h1 = tokio::spawn(async move { s1.listen(signal1, r1).await });

sleep(std::time::Duration::from_millis(10)).await;

let (_t2, r2) = mpsc::channel(10);
let s2 = DataServer::new(3000, EntrypointConfig::default());
let s2 = DataServer::new(3100, EntrypointConfig::default());
let shutdown2 = ShutdownManager::new();
let h2 = s2.listen(shutdown2.wait_shutdown_triggered(), r2).await;
assert!(h2.is_err());
shutdown1.trigger_shutdown(()).unwrap();
let h1 = tokio::join!(h1);
assert!(h1.0.is_ok());
shutdown2.trigger_shutdown(()).unwrap();

sleep(std::time::Duration::from_secs(1000)).await;
}
}

0 comments on commit 8965351

Please sign in to comment.