Skip to content

Commit

Permalink
添加kcp连接支持
Browse files Browse the repository at this point in the history
  • Loading branch information
tkzcfc committed Dec 6, 2024
1 parent e30788c commit 21d50bd
Show file tree
Hide file tree
Showing 16 changed files with 508 additions and 68 deletions.
47 changes: 47 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions np_base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ log = "0.4.0"
async-trait = "0.1.75"
byteorder = "1.5.0"
anyhow = "1.0.79"
tokio_kcp = { git = "https://github.com/tkzcfc/tokio_kcp.git" }

# 加密相关
rand = "0.8"
Expand Down
175 changes: 175 additions & 0 deletions np_base/src/net/kcp_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
use crate::net::session_delegate::CreateSessionDelegateCallback;
use crate::net::{net_session, tls};
use log::{debug, error};
use log::{info, trace};
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::ToSocketAddrs;
use tokio::select;
use tokio::sync::{broadcast, mpsc};
use tokio_kcp::{KcpConfig, KcpListener};
use tokio_rustls::rustls::ServerConfig;
use tokio_rustls::TlsAcceptor;

struct Server {
notify_shutdown: broadcast::Sender<()>,
shutdown_complete_tx: mpsc::Sender<()>,
}

impl Server {
async fn start_server(
&self,
mut listener: KcpListener,
on_create_session_delegate_callback: CreateSessionDelegateCallback,
tls_configuration: Option<tls::TlsConfiguration>,
) -> anyhow::Result<()> {
let tls_acceptor: Option<TlsAcceptor> = match tls_configuration {
Some(tls_configuration) => {
let certs = super::tls::load_certs(&tls_configuration.certificate)?;
let keys = super::tls::load_private_key(&tls_configuration.key)?;

let server_config = ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(certs, keys)?;

Some(TlsAcceptor::from(Arc::new(server_config)))
}
None => None,
};

loop {
let (stream, addr) = listener.accept().await?;

let tls_acceptor = tls_acceptor.clone();
let delegate = on_create_session_delegate_callback();
let shutdown = self.notify_shutdown.subscribe();
let shutdown_complete = self.shutdown_complete_tx.clone();

// 新连接单独起一个异步任务处理
tokio::spawn(async move {
trace!("KCP Server new connection: {}", addr);

if let Some(tls_acceptor) = tls_acceptor {
match tls::try_tls(stream, tls_acceptor).await {
Ok(stream) => {
net_session::run(
net_session::create_session_id(),
addr,
delegate,
shutdown,
stream,
)
.await;
}
Err(err) => {
debug!("KCP Server tls error: {err}");
}
}
} else {
net_session::run(
net_session::create_session_id(),
addr,
delegate,
shutdown,
stream,
)
.await;
}

trace!("KCP Server disconnect: {}", addr);
// 反向通知此会话结束
drop(shutdown_complete);
});
}
}
}

pub struct Builder {
create_session_delegate_callback: CreateSessionDelegateCallback,
kcp_config: KcpConfig,
tls_configuration: Option<tls::TlsConfiguration>,
}

impl Builder {
pub fn new(create_session_delegate_callback: CreateSessionDelegateCallback) -> Self {
Self {
create_session_delegate_callback,
kcp_config: KcpConfig::default(),
tls_configuration: None,
}
}

pub fn set_kcp_config(mut self, config: KcpConfig) -> Self {
self.kcp_config = config;
self
}

pub fn set_tls_configuration<A: ToString>(mut self, certificate: A, key: A) -> Self {
self.tls_configuration = Some(tls::TlsConfiguration {
certificate: certificate.to_string(),
key: key.to_string(),
});
self
}

pub async fn build_with_listener(
self,
listener: KcpListener,
shutdown_condition: impl Future,
) -> anyhow::Result<()> {
let (notify_shutdown, _) = broadcast::channel::<()>(1);
let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::channel(1);

let server = Server {
notify_shutdown,
shutdown_complete_tx,
};

select! {
res = server.start_server(listener, self.create_session_delegate_callback, self.tls_configuration) => {
if let Err(err) = res {
error!("KCP Server error: {}", err);
}
},
_ = shutdown_condition => {
info!("KCP Server shutting down");
}
}

// 解构server中的变量
let Server {
notify_shutdown,
shutdown_complete_tx,
} = server;

// 销毁notify_shutdown 是为了触发 net_session run函数中shutdown.recv()返回
drop(notify_shutdown);
// 此处必须将 shutdown_complete_tx 并销毁,否则会一直卡在shutdown_complete_rx.recv().await
drop(shutdown_complete_tx);

// 等待服务器优雅退出任务
let wait_task = async {
let _ = shutdown_complete_rx.recv().await;
};

// 设置超时时间,无法优雅退出则强制退出
if let Err(_) = tokio::time::timeout(Duration::from_secs(600), wait_task).await {
error!("KCP Server exit timeout, forced exit");
}

info!("KCP Server shutdown finish");

Ok(())
}

pub async fn build<A: ToSocketAddrs>(
self,
addr: A,
shutdown_condition: impl Future,
) -> anyhow::Result<()> {
let listener = KcpListener::bind(self.kcp_config, &addr).await?;
self.build_with_listener(listener, shutdown_condition).await
}
}
3 changes: 2 additions & 1 deletion np_base/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use std::net::SocketAddr;
use std::pin::Pin;
use std::time::Duration;

pub mod kcp_server;
pub mod net_session;
pub mod session_delegate;
pub mod tcp_server;
pub mod tcp_session;
pub mod tls;
pub mod udp_server;
pub mod udp_session;
Expand Down
11 changes: 11 additions & 0 deletions np_base/src/net/tcp_session.rs → np_base/src/net/net_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use anyhow::anyhow;
use bytes::BytesMut;
use log::{error, info};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU32, Ordering};
use tokio::io::{
AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufWriter, ReadHalf, WriteHalf,
};
Expand All @@ -13,6 +14,16 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::task::yield_now;
use tokio::time::sleep;

static SESSION_COUNTER: AtomicU32 = AtomicU32::new(0);
pub fn create_session_id() -> u32 {
loop {
let id = SESSION_COUNTER.fetch_add(1, Ordering::Relaxed);
if id > 0 {
return id;
}
}
}

/// run
///
/// [`session_id`] 会话id
Expand Down
Loading

0 comments on commit 21d50bd

Please sign in to comment.