From 21d50bd46931708f45882243e080cacbbec5cfab Mon Sep 17 00:00:00 2001 From: fang cheng Date: Fri, 6 Dec 2024 18:52:41 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0kcp=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 47 +++++ np_base/Cargo.toml | 1 + np_base/src/net/kcp_server.rs | 175 ++++++++++++++++++ np_base/src/net/mod.rs | 3 +- .../net/{tcp_session.rs => net_session.rs} | 11 ++ np_base/src/net/tcp_server.rs | 56 +++--- np_base/src/net/tls.rs | 26 +++ np_base/src/proxy/outlet.rs | 4 +- np_client/Cargo.toml | 1 + np_client/src/client.rs | 98 ++++++++-- np_client/src/main.rs | 53 +++++- np_client/src/winservice.rs | 1 + np_server/Cargo.toml | 1 + np_server/src/global/config.rs | 10 +- np_server/src/main.rs | 61 +++++- root-ca.key.pem | 28 +++ 16 files changed, 508 insertions(+), 68 deletions(-) create mode 100644 np_base/src/net/kcp_server.rs rename np_base/src/net/{tcp_session.rs => net_session.rs} (94%) create mode 100644 root-ca.key.pem diff --git a/Cargo.lock b/Cargo.lock index 5defc3f..17f9a57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -677,6 +677,12 @@ version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" +[[package]] +name = "byte_string" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11aade7a05aa8c3a351cedc44c3fc45806430543382fcc4743a9b757a2a0b4ed" + [[package]] name = "bytecheck" version = "0.6.12" @@ -1243,6 +1249,17 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.72", +] + [[package]] name = "futures-sink" version = "0.3.30" @@ -1264,6 +1281,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -1580,6 +1598,17 @@ dependencies = [ "serde", ] +[[package]] +name = "kcp" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e387a924f42063d380be14857714a2f0c177e44dbeab8895244e5370d8a8e68" +dependencies = [ + "bytes", + "log", + "thiserror", +] + [[package]] name = "language-tags" version = "0.3.2" @@ -1782,6 +1811,7 @@ dependencies = [ "socket2", "tokio", "tokio-rustls", + "tokio_kcp", ] [[package]] @@ -1801,6 +1831,7 @@ dependencies = [ "socket2", "tokio", "tokio-rustls", + "tokio_kcp", "webpki-roots 0.22.6", "windows-service", ] @@ -1849,6 +1880,7 @@ dependencies = [ "socket2", "sqlx", "tokio", + "tokio_kcp", ] [[package]] @@ -3394,6 +3426,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio_kcp" +version = "0.10.0" +source = "git+https://github.com/tkzcfc/tokio_kcp.git#3ef0f5f40be764a780b7010e9a5fc4634f5fa923" +dependencies = [ + "byte_string", + "bytes", + "futures-util", + "kcp", + "log", + "rand 0.8.5", + "spin 0.9.8", + "tokio", +] + [[package]] name = "toml" version = "0.5.11" diff --git a/np_base/Cargo.toml b/np_base/Cargo.toml index d90bc75..4f87b0c 100644 --- a/np_base/Cargo.toml +++ b/np_base/Cargo.toml @@ -14,6 +14,7 @@ log = "0.4.0" async-trait = "0.1.75" byteorder = "1.5.0" anyhow = "1.0.79" +tokio_kcp = { git = "https://github.com/tkzcfc/tokio_kcp.git" } # 加密相关 rand = "0.8" diff --git a/np_base/src/net/kcp_server.rs b/np_base/src/net/kcp_server.rs new file mode 100644 index 0000000..64f3f71 --- /dev/null +++ b/np_base/src/net/kcp_server.rs @@ -0,0 +1,175 @@ +use crate::net::session_delegate::CreateSessionDelegateCallback; +use crate::net::{net_session, tls}; +use log::{debug, error}; +use log::{info, trace}; +use std::future::Future; +use std::sync::Arc; +use std::time::Duration; +use tokio::net::ToSocketAddrs; +use tokio::select; +use tokio::sync::{broadcast, mpsc}; +use tokio_kcp::{KcpConfig, KcpListener}; +use tokio_rustls::rustls::ServerConfig; +use tokio_rustls::TlsAcceptor; + +struct Server { + notify_shutdown: broadcast::Sender<()>, + shutdown_complete_tx: mpsc::Sender<()>, +} + +impl Server { + async fn start_server( + &self, + mut listener: KcpListener, + on_create_session_delegate_callback: CreateSessionDelegateCallback, + tls_configuration: Option, + ) -> anyhow::Result<()> { + let tls_acceptor: Option = match tls_configuration { + Some(tls_configuration) => { + let certs = super::tls::load_certs(&tls_configuration.certificate)?; + let keys = super::tls::load_private_key(&tls_configuration.key)?; + + let server_config = ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth() + .with_single_cert(certs, keys)?; + + Some(TlsAcceptor::from(Arc::new(server_config))) + } + None => None, + }; + + loop { + let (stream, addr) = listener.accept().await?; + + let tls_acceptor = tls_acceptor.clone(); + let delegate = on_create_session_delegate_callback(); + let shutdown = self.notify_shutdown.subscribe(); + let shutdown_complete = self.shutdown_complete_tx.clone(); + + // 新连接单独起一个异步任务处理 + tokio::spawn(async move { + trace!("KCP Server new connection: {}", addr); + + if let Some(tls_acceptor) = tls_acceptor { + match tls::try_tls(stream, tls_acceptor).await { + Ok(stream) => { + net_session::run( + net_session::create_session_id(), + addr, + delegate, + shutdown, + stream, + ) + .await; + } + Err(err) => { + debug!("KCP Server tls error: {err}"); + } + } + } else { + net_session::run( + net_session::create_session_id(), + addr, + delegate, + shutdown, + stream, + ) + .await; + } + + trace!("KCP Server disconnect: {}", addr); + // 反向通知此会话结束 + drop(shutdown_complete); + }); + } + } +} + +pub struct Builder { + create_session_delegate_callback: CreateSessionDelegateCallback, + kcp_config: KcpConfig, + tls_configuration: Option, +} + +impl Builder { + pub fn new(create_session_delegate_callback: CreateSessionDelegateCallback) -> Self { + Self { + create_session_delegate_callback, + kcp_config: KcpConfig::default(), + tls_configuration: None, + } + } + + pub fn set_kcp_config(mut self, config: KcpConfig) -> Self { + self.kcp_config = config; + self + } + + pub fn set_tls_configuration(mut self, certificate: A, key: A) -> Self { + self.tls_configuration = Some(tls::TlsConfiguration { + certificate: certificate.to_string(), + key: key.to_string(), + }); + self + } + + pub async fn build_with_listener( + self, + listener: KcpListener, + shutdown_condition: impl Future, + ) -> anyhow::Result<()> { + let (notify_shutdown, _) = broadcast::channel::<()>(1); + let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::channel(1); + + let server = Server { + notify_shutdown, + shutdown_complete_tx, + }; + + select! { + res = server.start_server(listener, self.create_session_delegate_callback, self.tls_configuration) => { + if let Err(err) = res { + error!("KCP Server error: {}", err); + } + }, + _ = shutdown_condition => { + info!("KCP Server shutting down"); + } + } + + // 解构server中的变量 + let Server { + notify_shutdown, + shutdown_complete_tx, + } = server; + + // 销毁notify_shutdown 是为了触发 net_session run函数中shutdown.recv()返回 + drop(notify_shutdown); + // 此处必须将 shutdown_complete_tx 并销毁,否则会一直卡在shutdown_complete_rx.recv().await + drop(shutdown_complete_tx); + + // 等待服务器优雅退出任务 + let wait_task = async { + let _ = shutdown_complete_rx.recv().await; + }; + + // 设置超时时间,无法优雅退出则强制退出 + if let Err(_) = tokio::time::timeout(Duration::from_secs(600), wait_task).await { + error!("KCP Server exit timeout, forced exit"); + } + + info!("KCP Server shutdown finish"); + + Ok(()) + } + + pub async fn build( + self, + addr: A, + shutdown_condition: impl Future, + ) -> anyhow::Result<()> { + let listener = KcpListener::bind(self.kcp_config, &addr).await?; + self.build_with_listener(listener, shutdown_condition).await + } +} diff --git a/np_base/src/net/mod.rs b/np_base/src/net/mod.rs index 5cf9b49..d94204d 100644 --- a/np_base/src/net/mod.rs +++ b/np_base/src/net/mod.rs @@ -3,9 +3,10 @@ use std::net::SocketAddr; use std::pin::Pin; use std::time::Duration; +pub mod kcp_server; +pub mod net_session; pub mod session_delegate; pub mod tcp_server; -pub mod tcp_session; pub mod tls; pub mod udp_server; pub mod udp_session; diff --git a/np_base/src/net/tcp_session.rs b/np_base/src/net/net_session.rs similarity index 94% rename from np_base/src/net/tcp_session.rs rename to np_base/src/net/net_session.rs index 83dac4d..565e7ce 100644 --- a/np_base/src/net/tcp_session.rs +++ b/np_base/src/net/net_session.rs @@ -4,6 +4,7 @@ use anyhow::anyhow; use bytes::BytesMut; use log::{error, info}; use std::net::SocketAddr; +use std::sync::atomic::{AtomicU32, Ordering}; use tokio::io::{ AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufWriter, ReadHalf, WriteHalf, }; @@ -13,6 +14,16 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use tokio::task::yield_now; use tokio::time::sleep; +static SESSION_COUNTER: AtomicU32 = AtomicU32::new(0); +pub fn create_session_id() -> u32 { + loop { + let id = SESSION_COUNTER.fetch_add(1, Ordering::Relaxed); + if id > 0 { + return id; + } + } +} + /// run /// /// [`session_id`] 会话id diff --git a/np_base/src/net/tcp_server.rs b/np_base/src/net/tcp_server.rs index 140d826..62563d0 100644 --- a/np_base/src/net/tcp_server.rs +++ b/np_base/src/net/tcp_server.rs @@ -1,5 +1,5 @@ use crate::net::session_delegate::CreateSessionDelegateCallback; -use crate::net::tcp_session; +use crate::net::{net_session, tls}; use log::{debug, error}; use log::{info, trace}; use std::future::Future; @@ -9,7 +9,6 @@ use std::time::Duration; use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; use tokio::select; use tokio::sync::{broadcast, mpsc}; -use tokio::time::timeout; use tokio_rustls::rustls::ServerConfig; use tokio_rustls::TlsAcceptor; @@ -19,11 +18,6 @@ pub type StreamInitCallbackType = Arc< + Sync, >; -struct TlsConfiguration { - certificate: String, - key: String, -} - struct Server { notify_shutdown: broadcast::Sender<()>, shutdown_complete_tx: mpsc::Sender<()>, @@ -35,7 +29,7 @@ impl Server { listener: TcpListener, on_create_session_delegate_callback: CreateSessionDelegateCallback, on_stream_init_callback: Option, - tls_configuration: Option, + tls_configuration: Option, ) -> anyhow::Result<()> { let tls_acceptor: Option = match tls_configuration { Some(tls_configuration) => { @@ -52,7 +46,6 @@ impl Server { None => None, }; - let mut session_id_seed = 0; loop { let (mut stream, addr) = listener.accept().await?; @@ -68,9 +61,6 @@ impl Server { } } - session_id_seed += 1; - - let session_id = session_id_seed; let tls_acceptor = tls_acceptor.clone(); let delegate = on_create_session_delegate_callback(); let shutdown = self.notify_shutdown.subscribe(); @@ -81,17 +71,30 @@ impl Server { trace!("TCP Server new connection: {}", addr); if let Some(tls_acceptor) = tls_acceptor { - match Self::try_tls(stream, tls_acceptor).await { + match tls::try_tls(stream, tls_acceptor).await { Ok(stream) => { - tcp_session::run(session_id, addr, delegate, shutdown, stream).await; + net_session::run( + net_session::create_session_id(), + addr, + delegate, + shutdown, + stream, + ) + .await; } Err(err) => { - println!("TCP Server tls error: {err}"); debug!("TCP Server tls error: {err}"); } } } else { - tcp_session::run(session_id, addr, delegate, shutdown, stream).await; + net_session::run( + net_session::create_session_id(), + addr, + delegate, + shutdown, + stream, + ) + .await; } trace!("TCP Server disconnect: {}", addr); @@ -100,26 +103,11 @@ impl Server { }); } } - - const TIMEOUT_TLS: u64 = 15; - - // ref https://github.com/netskillzgh/rollo/blob/master/rollo/src/server/world_socket_mgr.rs#L183 - async fn try_tls( - socket: TcpStream, - tls_acceptor: TlsAcceptor, - ) -> anyhow::Result> { - let stream = timeout( - Duration::from_secs(Self::TIMEOUT_TLS), - tls_acceptor.accept(socket), - ) - .await??; - Ok(tokio_rustls::TlsStream::Server(stream)) - } } pub struct Builder { create_session_delegate_callback: CreateSessionDelegateCallback, - tls_configuration: Option, + tls_configuration: Option, steam_init_callback: Option, } @@ -141,7 +129,7 @@ impl Builder { } pub fn set_tls_configuration(mut self, certificate: A, key: A) -> Self { - self.tls_configuration = Some(TlsConfiguration { + self.tls_configuration = Some(tls::TlsConfiguration { certificate: certificate.to_string(), key: key.to_string(), }); @@ -178,7 +166,7 @@ impl Builder { shutdown_complete_tx, } = server; - // 销毁notify_shutdown 是为了触发 tcp_session run函数中shutdown.recv()返回 + // 销毁notify_shutdown 是为了触发 net_session run函数中shutdown.recv()返回 drop(notify_shutdown); // 此处必须将 shutdown_complete_tx 并销毁,否则会一直卡在shutdown_complete_rx.recv().await drop(shutdown_complete_tx); diff --git a/np_base/src/net/tls.rs b/np_base/src/net/tls.rs index 0c908b1..0476e35 100644 --- a/np_base/src/net/tls.rs +++ b/np_base/src/net/tls.rs @@ -1,7 +1,15 @@ use anyhow::anyhow; use std::fs::File; use std::io::BufReader; +use std::time::Duration; +use tokio::io::{AsyncRead, AsyncWrite}; use tokio_rustls::rustls::{Certificate, PrivateKey}; +use tokio_rustls::TlsAcceptor; + +pub struct TlsConfiguration { + pub certificate: String, + pub key: String, +} pub fn load_certs(path: &str) -> anyhow::Result> { let cert_file = File::open(path)?; @@ -25,3 +33,21 @@ pub fn load_private_key(path: &str) -> anyhow::Result { Err(anyhow!("The private key file ({path}) format is incorrect")) }; } + +const TIMEOUT_TLS: u64 = 15; + +// ref https://github.com/netskillzgh/rollo/blob/master/rollo/src/server/world_socket_mgr.rs#L183 +pub async fn try_tls( + stream: IO, + tls_acceptor: TlsAcceptor, +) -> anyhow::Result> +where + IO: AsyncRead + AsyncWrite + Unpin, +{ + let stream = tokio::time::timeout( + Duration::from_secs(TIMEOUT_TLS), + tls_acceptor.accept(stream), + ) + .await??; + Ok(tokio_rustls::TlsStream::Server(stream)) +} diff --git a/np_base/src/proxy/outlet.rs b/np_base/src/proxy/outlet.rs index 51e276a..4419edb 100644 --- a/np_base/src/proxy/outlet.rs +++ b/np_base/src/proxy/outlet.rs @@ -1,5 +1,5 @@ use crate::net::session_delegate::SessionDelegate; -use crate::net::{tcp_session, udp_session, SendMessageFuncType, WriterMessage}; +use crate::net::{net_session, udp_session, SendMessageFuncType, WriterMessage}; use crate::proxy::common::{InputSenderType, SessionCommonInfo}; use crate::proxy::crypto::get_method; use crate::proxy::inlet::InletProxyType; @@ -322,7 +322,7 @@ impl Outlet { let shutdown = self.receiver_shutdown.resubscribe(); tokio::spawn(async move { - tcp_session::run( + net_session::run( session_id, addr, Box::new(OutletSession::new( diff --git a/np_client/Cargo.toml b/np_client/Cargo.toml index 5f767c5..3247828 100644 --- a/np_client/Cargo.toml +++ b/np_client/Cargo.toml @@ -20,6 +20,7 @@ bytes = "1.5.0" byteorder = "1.5.0" socket2 = "0.5" once_cell = "1.19" +tokio_kcp = { git = "https://github.com/tkzcfc/tokio_kcp.git" } [target.'cfg(windows)'.dependencies] diff --git a/np_client/src/client.rs b/np_client/src/client.rs index f3c431b..1e87443 100644 --- a/np_client/src/client.rs +++ b/np_client/src/client.rs @@ -19,10 +19,11 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf}; -use tokio::net::TcpStream; +use tokio::net::{TcpStream, ToSocketAddrs}; use tokio::select; use tokio::sync::{Mutex, RwLock}; use tokio::time::{sleep, timeout, Instant}; +use tokio_kcp::{KcpConfig, KcpNoDelayConfig, KcpStream}; use tokio_rustls::rustls::client::ServerCertVerified; use tokio_rustls::rustls::{ClientConfig, OwnedTrustAnchor, RootCertStore, ServerName}; use tokio_rustls::{rustls, TlsConnector}; @@ -59,15 +60,54 @@ impl rustls::client::ServerCertVerifier for NoCertificateVerifier { } } -pub async fn run(common_args: &CommonArgs) -> anyhow::Result<()> { - info!("Start connecting to server {}", common_args.server); +async fn connect_with_tcp(addr: &str) -> anyhow::Result { + let stream = TcpStream::connect(&addr).await?; + info!("TCP successfully connected to serve {}", addr); - let stream = TcpStream::connect(&common_args.server).await?; let ka = TcpKeepalive::new().with_time(Duration::from_secs(30)); let sf = SockRef::from(&stream); sf.set_tcp_keepalive(&ka)?; + Ok(stream) +} - info!("Successful connection with server {}", common_args.server); +async fn connect_with_kcp(addr: &str) -> anyhow::Result { + let addrs = tokio::net::lookup_host(addr).await?; + + let mut last_err = None; + + for addr in addrs { + match KcpStream::connect( + &KcpConfig { + mtu: 1400, + nodelay: KcpNoDelayConfig::fastest(), + wnd_size: (1024, 1024), + session_expire: Some(Duration::from_secs(60)), + flush_write: false, + flush_acks_input: false, + stream: true, + allow_recv_empty_packet: false, + }, + addr, + ) + .await + { + Ok(stream) => { + info!("KCP successfully connected to serve {}", addr); + return Ok(stream); + } + Err(e) => last_err = Some(e), + } + } + + if let Some(err) = last_err { + Err(anyhow!(err)) + } else { + Err(anyhow!("could not resolve to any address")) + } +} + +pub async fn run(common_args: &CommonArgs, use_tcp: bool) -> anyhow::Result<()> { + info!("Start connecting to server {}", common_args.server); // 升级为TLS连接 if common_args.enable_tls { @@ -112,15 +152,33 @@ pub async fn run(common_args: &CommonArgs) -> anyhow::Result<()> { } let connector = TlsConnector::from(Arc::new(config)); - let stream = timeout( - Duration::from_secs(TIMEOUT_TLS), - connector.connect(ServerName::try_from(str_vec[0])?, stream), - ) - .await??; + if use_tcp { + let stream = timeout( + Duration::from_secs(TIMEOUT_TLS), + connector.connect( + ServerName::try_from(str_vec[0])?, + connect_with_tcp(&common_args.server).await?, + ), + ) + .await??; + + run_client(common_args, stream).await + } else { + let kcp_stream = connect_with_kcp(&common_args.server).await?; + let stream = timeout( + Duration::from_secs(TIMEOUT_TLS), + connector.connect(ServerName::try_from(str_vec[0])?, kcp_stream), + ) + .await??; - run_client(common_args, stream).await + run_client(common_args, stream).await + } } else { - run_client(common_args, stream).await + if use_tcp { + run_client(common_args, connect_with_tcp(&common_args.server).await?).await + } else { + run_client(common_args, connect_with_kcp(&common_args.server).await?).await + } } } @@ -163,6 +221,7 @@ where S: AsyncRead + AsyncWrite + Send + 'static, { const PING_INTERVAL: Duration = Duration::from_secs(5); + const PING_TIMEOUT: Duration = Duration::from_secs(15); loop { sleep(Duration::from_secs(1)).await; @@ -170,6 +229,10 @@ where continue; } + if last_active_time.read().await.elapsed() > PING_TIMEOUT { + return Err(anyhow!("ping timeout")); + } + // 获取当前时间 let now = SystemTime::now(); @@ -203,17 +266,16 @@ where let mut buffer = BytesMut::with_capacity(65536); loop { let len = reader.read_buf(&mut buffer).await?; - - if last_active_time.read().await.elapsed() >= WRITE_TIMEOUT { - let mut instant_write = last_active_time.write().await; - *instant_write = Instant::now(); - } - // len为0表示对端已经关闭连接。 if len == 0 { info!("Disconnect from the server"); break; } else { + if last_active_time.read().await.elapsed() >= WRITE_TIMEOUT { + let mut instant_write = last_active_time.write().await; + *instant_write = Instant::now(); + } + // 循环解包 loop { if buffer.is_empty() { diff --git a/np_client/src/main.rs b/np_client/src/main.rs index e546959..45c595c 100644 --- a/np_client/src/main.rs +++ b/np_client/src/main.rs @@ -12,6 +12,28 @@ mod client; #[cfg(windows)] mod winservice; +#[derive(clap::ValueEnum, Clone, Default, Debug)] +enum NetType { + /// use TCP connection + #[default] + Tcp, + /// use KCP connection + Kcp, + /// alternate between using TCP and KCP connections + Auto, +} + +impl NetType { + #[cfg(windows)] + fn to_string(&self) -> String { + match self { + NetType::Tcp => String::from("tcp"), + NetType::Kcp => String::from("kcp"), + NetType::Auto => String::from("auto"), + } + } +} + #[derive(Args)] pub(crate) struct CommonArgs { /// print backtracking information @@ -34,7 +56,7 @@ pub(crate) struct CommonArgs { #[arg(long, default_value = "false")] pub enable_tls: bool, - /// If true, the validity of the SSL certificate is not verified. + /// if true, the validity of the SSL certificate is not verified. #[arg(long, default_value = "false")] pub insecure: bool, @@ -49,6 +71,10 @@ pub(crate) struct CommonArgs { /// set log level #[arg(long, default_value = "error")] pub base_log_level: String, + + /// net type + #[clap(long, default_value_t, value_enum)] + pub net_type: NetType, } #[derive(Parser)] @@ -133,13 +159,28 @@ pub(crate) fn init_logger(common_args: &CommonArgs) -> anyhow::Result<()> { Ok(()) } +async fn run_and_handle_errors(common_args: &CommonArgs, is_tcp: bool) { + if let Err(err) = client::run(&common_args, is_tcp).await { + error!("{err}"); + sleep(Duration::from_secs(5)).await; + } else { + sleep(Duration::from_secs(1)).await; + } +} + pub(crate) async fn run_with_args(common_args: CommonArgs) -> anyhow::Result<()> { loop { - if let Err(err) = client::run(&common_args).await { - error!("{err}"); - sleep(Duration::from_secs(5)).await; - } else { - sleep(Duration::from_millis(100)).await; + match common_args.net_type { + NetType::Tcp => { + run_and_handle_errors(&common_args, true).await; + } + NetType::Kcp => { + run_and_handle_errors(&common_args, false).await; + } + NetType::Auto => { + run_and_handle_errors(&common_args, false).await; + run_and_handle_errors(&common_args, true).await; + } } } } diff --git a/np_client/src/winservice.rs b/np_client/src/winservice.rs index fb0dc02..f349feb 100644 --- a/np_client/src/winservice.rs +++ b/np_client/src/winservice.rs @@ -198,6 +198,7 @@ pub fn install_service(common_args: CommonArgs) -> anyhow::Result<()> { OsString::from(format!("--log-level={}", common_args.log_level)), OsString::from(format!("--base-log-level={}", common_args.base_log_level)), OsString::from(format!("--ca-cert={}", common_args.ca_cert)), + OsString::from(format!("--net-type={}", common_args.net_type.to_string())), ]; if common_args.enable_tls { diff --git a/np_server/Cargo.toml b/np_server/Cargo.toml index 2f1c2eb..bda47d8 100644 --- a/np_server/Cargo.toml +++ b/np_server/Cargo.toml @@ -32,6 +32,7 @@ actix-session = { version = "0.9", features = ["cookie-session"] } chrono = "0.4.31" md5 = "0.7" socket2 = "0.5" +tokio_kcp = { git = "https://github.com/tkzcfc/tokio_kcp.git" } [dev-dependencies] diff --git a/np_server/src/global/config.rs b/np_server/src/global/config.rs index b636237..cddf901 100644 --- a/np_server/src/global/config.rs +++ b/np_server/src/global/config.rs @@ -8,8 +8,12 @@ use std::io::BufReader; pub struct Config { /// 数据库地址 pub database_url: String, - /// 服务器监听地址 + /// tcp服务监听地址 + #[serde(default = "default_config_string_function")] pub listen_addr: String, + /// kcp服务监听地址 + #[serde(default = "default_config_string_function")] + pub kcp_listen_addr: String, /// 启用tls pub enable_tls: bool, /// tls证书 @@ -25,11 +29,11 @@ pub struct Config { /// web目录 pub web_base_dir: String, /// 非法流量转发地址 - #[serde(default = "default_illegal_traffic_forward")] + #[serde(default = "default_config_string_function")] pub illegal_traffic_forward: String, } -fn default_illegal_traffic_forward() -> String { +fn default_config_string_function() -> String { "".to_string() } diff --git a/np_server/src/main.rs b/np_server/src/main.rs index 35d495c..5a39401 100644 --- a/np_server/src/main.rs +++ b/np_server/src/main.rs @@ -10,13 +10,16 @@ use crate::global::opts::GLOBAL_OPTS; use crate::peer::Peer; use anyhow::anyhow; use log::info; +use np_base::net::kcp_server; use np_base::net::session_delegate::SessionDelegate; use np_base::net::tcp_server; use once_cell::sync::Lazy; use std::net::SocketAddr; +use std::time::Duration; use tokio::{select, signal}; +use tokio_kcp::{KcpConfig, KcpNoDelayConfig}; -pub async fn run_tcp_server() -> anyhow::Result<()> { +async fn run_tcp_server() -> anyhow::Result<()> { let mut builder = tcp_server::Builder::new(Box::new(|| -> Box { Box::new(Peer::new()) })); @@ -30,7 +33,57 @@ pub async fn run_tcp_server() -> anyhow::Result<()> { .await } -pub async fn run_web_server() -> anyhow::Result<()> { +async fn run_kcp_server() -> anyhow::Result<()> { + let mut builder = kcp_server::Builder::new(Box::new(|| -> Box { + Box::new(Peer::new()) + })) + .set_kcp_config(KcpConfig { + mtu: 1400, + nodelay: KcpNoDelayConfig::fastest(), + wnd_size: (1024, 1024), + session_expire: Some(Duration::from_secs(15)), + flush_write: false, + flush_acks_input: false, + stream: true, + allow_recv_empty_packet: false, + }); + + if GLOBAL_CONFIG.enable_tls { + builder = builder.set_tls_configuration(&GLOBAL_CONFIG.tls_cert, &GLOBAL_CONFIG.tls_key); + } + + builder + .build(GLOBAL_CONFIG.listen_addr.as_str(), signal::ctrl_c()) + .await +} + +async fn run_logic_server() -> anyhow::Result<()> { + match ( + !GLOBAL_CONFIG.kcp_listen_addr.is_empty(), + !GLOBAL_CONFIG.listen_addr.is_empty(), + ) { + (true, true) => { + select! { + res = run_tcp_server() => { + if let Err(e) = res { + return Err(anyhow!("TCP server error: {}", e)); + } + }, + res = run_kcp_server() => { + if let Err(e) = res { + return Err(anyhow!("KCP server error: {}", e)); + } + }, + } + Ok(()) + } + (false, true) => run_tcp_server().await, + (true, false) => run_kcp_server().await, + (false, false) => Err(anyhow!("No listening address configured")), + } +} + +async fn run_web_server() -> anyhow::Result<()> { info!("HttpServer listening: {}", GLOBAL_CONFIG.web_addr); let addr = GLOBAL_CONFIG.web_addr.parse::(); return match addr { @@ -49,12 +102,12 @@ pub async fn main() -> anyhow::Result<()> { || GLOBAL_CONFIG.web_password.is_empty() || GLOBAL_CONFIG.web_addr.is_empty() { - run_tcp_server().await + run_logic_server().await } else { let result: anyhow::Result<()>; select! { - r1 = run_tcp_server() => { result = r1 }, + r1 = run_logic_server() => { result = r1 }, r2 = run_web_server() => { result = r2 }, } diff --git a/root-ca.key.pem b/root-ca.key.pem new file mode 100644 index 0000000..4c43732 --- /dev/null +++ b/root-ca.key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCykhHhnEevSzGL +rqIoieoW5576CPjgcsT9WCxVFy2QxrB2xkW4XrFio7CvzoRoCIgtCrXon+WM/1qm +V0wiIXb2xLc9/Xs+n+tUELQEKz3h39aREy/f5KrtgmLplE6BBiM7ZHRE7T3R31AH +OGyoA+BI1XcSnqK5y/+LSzdE6MJduAmgqSIWb+zN5Ox6LrjDeooxkElb6uw7o4/Z +FVv5yqMaKZmIvfZ0zG3zmdVf7mjo3LzCWuXjC4ugMD4MSItHaK+1UITiIWvo3Co0 +smLi0za3uLvPxhCxJ7aqZ2RAgHgt5HzqkS8QhjAFWnjz3yQyzo0cnfwPlWixdqr7 +DePYaXfpAgMBAAECggEALoVKXkXaEQtcXYEB4uDHGO9/pbEHsWWE92Fsr0BKESEK +IO9Xg4g72zA83BhHu1iZrW5QZMmfQmPWCGbOwUeJmBE4kdBDNtXvt2dgloNzbDBl +1Ea03rXSZ+zjeum1p/xi2lq5xakuxUnRrgffmsfjuuXNzw5/4zqHmRDf1bZzyHk1 +QNvoAGS6kYbE7AkaLjr1V+/sHjeJ2o0zyIIPGJEeTdvKvVkhXtk43DuZP98ieTjW +14fs75yj6P8d60l7PtOJkXn7KLq8VZc7w/lBXFPDl27PrPFhawK1bfgXYrA76fCk +j2MBwGB2rHiuAdGDhErNfiqNpiWtbHfazdjH1TEapwKBgQDqDk+EukySPWZ2Gate +6LpXM3/rEcVB1VKwTOvABqXoDFgL2WT2hV/2X9/ipT8mn8aInyioKMnPtE40ynpB +ZBh7XNF97DxRtYTFol/wNfHmopqQ5PSmUYkGGJtmq+BFRZEjkYh7e/3mdZNKEdbR +Elk4MUYWC1ZgdYQpGRCw1tDjQwKBgQDDUAd6y1SDHKNH68jSiy7cfHOsOAZIOaIJ +yE5KWb6C06uf6mGHLnEsDY+JtEiuCNUwUOLuTcYpJKRvQVHroY2J+9M3aRSUcv9T +6P6aoCX6ouFKV0LieiCbKbVMn+3w8kIdCLPg3SM4NQGwpgTLzSeGCkSNMGvPiD11 +UJVgzF5HYwKBgA5yNaw+h21bRtCxQueHu8CYGCQdI6ytiv4AsWhXwLOy11hb4aRp +lQOy5TGq+1vZ7roOAMdoRRSBz3XorWUN1Yj/sJpyE0MDzh8Yp40+9yBCFwIsb4E4 +dRmgOS88GoqMAapNWDwu6fvoc9/VfAnJdSJHwEiqCHnJpw+WUXZou9nbAoGBAK3W +XkHqFfd4ls/dyemzner7Dg2b8+N3AIVKdnxwskNJWPlFqLakHHDERRygjQvxqqaN +Z94I9TYLehvX/K0IJq1IqW8pJPsX+6/Ysjw4DHWvYWxEjLB9WOYh0k84DZUmWrfy +1f9vc/iEHVP4AE7Q87u3CdrM3ThjH6h6J+p3G/bXAoGAesf67SoeYAWbwiFxmayj +JSJe6bXPIU5DPSUQ/fJAOGCALYr7VKKyyX+bhPuth93Dju3AAsawNS/yDF/ksJE1 +5mbJ0vDauQQl0zX7GR8c5TGntiBh6iuPPUQ5ft3eJks0Jo2uMKJ3HQeMkFs0/q9/ +KHqVjsExKOIGWppeYQy1sBo= +-----END PRIVATE KEY-----