Skip to content

Commit

Permalink
添加Ping操作,设置TcpKeepalive
Browse files Browse the repository at this point in the history
  • Loading branch information
tkzcfc committed Jul 24, 2024
1 parent 0bbe581 commit 8d4f784
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 31 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion np_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ anyhow = "1.0.86"
log = "0.4.20"
flexi_logger = { version = "0.27.3", features = ["async", "dont_minimize_extra_stacks"] }
bytes = "1.5.0"
byteorder = "1.5.0"
byteorder = "1.5.0"
socket2 = "0.5"
94 changes: 64 additions & 30 deletions np_client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,20 @@ use np_base::proxy::outlet::Outlet;
use np_base::proxy::{OutputFuncType, ProxyMessage};
use np_proto::class_def::{Tunnel, TunnelPoint};
use np_proto::client_server::LoginReq;
use np_proto::generic::{
I2oConnect, I2oDisconnect, I2oSendData, O2iConnect, O2iDisconnect, O2iRecvData,
};
use np_proto::generic::{I2oConnect, I2oDisconnect, I2oSendData, O2iConnect, O2iDisconnect, O2iRecvData, Ping};
use np_proto::message_map::{encode_raw_message, get_message_id, get_message_size, MessageType};
use np_proto::server_client::ModifyTunnelNtf;
use np_proto::{generic, message_map};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use tokio::select;
use tokio::sync::{Mutex, RwLock};
use tokio::time::sleep;
use socket2::{SockRef, TcpKeepalive};


type WriterType = Arc<Mutex<WriteHalf<TcpStream>>>;

Expand All @@ -34,13 +37,19 @@ struct Client {
}

pub async fn run(opts: &Opts) -> anyhow::Result<()> {
info!("Start connecting to server {}", opts.server);
let stream = TcpStream::connect(&opts.server).await?;
let ka = TcpKeepalive::new().with_time(std::time::Duration::from_secs(30));
let sf = SockRef::from(&stream);
sf.set_tcp_keepalive(&ka)?;
info!("Successful connection with server {}", opts.server);

let (reader, writer) = tokio::io::split(stream);

let writer = Arc::new(Mutex::new(writer));

let mut client = Client {
writer: Arc::new(Mutex::new(writer)),
writer: writer.clone(),
username: opts.username.clone(),
password: opts.password.clone(),
player_id: 0u32,
Expand All @@ -50,11 +59,35 @@ pub async fn run(opts: &Opts) -> anyhow::Result<()> {
};

client.send_login().await?;
let result = client.run(reader).await;
let result;
select! {
r1= client.run(reader) => { result = r1 },
r2= ping_forever(writer) => { result = r2 },
}
client.sync_tunnels(&Vec::new()).await;
result
}

async fn ping_forever(writer: WriterType) -> anyhow::Result<()> {
loop {
sleep(Duration::from_secs(5)).await;

// 获取当前时间
let now = SystemTime::now();

// 计算自UNIX_EPOCH以来的持续时间
let since_epoch = now.duration_since(UNIX_EPOCH)
.expect("Time went backwards");

// 将时间转换为毫秒
let nanos = since_epoch.as_millis();

package_and_send_message(writer.clone(), -2, &MessageType::GenericPing(Ping{
ticks: nanos as i64
})).await?;
}
}

impl Client {
async fn run(&mut self, mut reader: ReadHalf<TcpStream>) -> anyhow::Result<()> {
let mut buffer = BytesMut::with_capacity(1024);
Expand Down Expand Up @@ -88,7 +121,7 @@ impl Client {

async fn send_login(&self) -> anyhow::Result<()> {
info!("Start Login");
Self::package_and_send_message(
package_and_send_message(
self.writer.clone(),
-1,
&MessageType::ClientServerLoginReq(LoginReq {
Expand Down Expand Up @@ -392,35 +425,12 @@ impl Client {
match message {
MessageType::None => {}
_ => {
let _ = Self::package_and_send_message(writer, 0, &message).await;
let _ = package_and_send_message(writer, 0, &message).await;
}
}
}
}

#[inline]
pub(crate) async fn package_and_send_message(
writer: WriterType,
serial: i32,
message: &MessageType,
) -> anyhow::Result<()> {
if let Some(message_id) = get_message_id(message) {
let message_size = get_message_size(message);
let mut buf = Vec::with_capacity(message_size + 14);

byteorder::WriteBytesExt::write_u8(&mut buf, 33u8)?;
byteorder::WriteBytesExt::write_u32::<BigEndian>(&mut buf, (8 + message_size) as u32)?;
byteorder::WriteBytesExt::write_i32::<BigEndian>(&mut buf, serial)?;
byteorder::WriteBytesExt::write_u32::<BigEndian>(&mut buf, message_id)?;
encode_raw_message(message, &mut buf);

writer.lock().await.write_all(&buf).await?;
Ok(())
} else {
Err(anyhow!("Message id not found"))
}
}

// 收到玩家向服务器推送消息
pub(crate) async fn handle_push(&mut self, message: MessageType) -> anyhow::Result<()> {
match message {
Expand Down Expand Up @@ -551,6 +561,30 @@ impl Client {
}
}


#[inline]
pub(crate) async fn package_and_send_message(
writer: WriterType,
serial: i32,
message: &MessageType,
) -> anyhow::Result<()> {
if let Some(message_id) = get_message_id(message) {
let message_size = get_message_size(message);
let mut buf = Vec::with_capacity(message_size + 14);

byteorder::WriteBytesExt::write_u8(&mut buf, 33u8)?;
byteorder::WriteBytesExt::write_u32::<BigEndian>(&mut buf, (8 + message_size) as u32)?;
byteorder::WriteBytesExt::write_i32::<BigEndian>(&mut buf, serial)?;
byteorder::WriteBytesExt::write_u32::<BigEndian>(&mut buf, message_id)?;
encode_raw_message(message, &mut buf);

writer.lock().await.write_all(&buf).await?;
Ok(())
} else {
Err(anyhow!("Message id not found"))
}
}

/// 数据粘包处理
///
/// 注意:这个函数只能使用消耗 buffer 数据的函数,否则框架会一直循环调用本函数来驱动处理消息
Expand Down

0 comments on commit 8d4f784

Please sign in to comment.