Skip to content

Commit

Permalink
[#106] Fixed bug, should not hold UDP connections forever
Browse files Browse the repository at this point in the history
  • Loading branch information
zonyitoo committed May 9, 2018
1 parent 42d51ba commit ca85f75
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 1 deletion.
27 changes: 27 additions & 0 deletions src/relay/udprelay/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ use std::io::{self, Cursor, ErrorKind, Read};
use std::net::SocketAddr;
use std::net::{IpAddr, Ipv4Addr};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use futures::{self, Future, Stream};

use tokio;
use tokio::net::UdpSocket;
use tokio::util::FutureExt;
use tokio_io::IoFuture;

use config::{Config, ServerAddr, ServerConfig};
Expand Down Expand Up @@ -47,6 +49,7 @@ fn listen(config: Arc<Config>, l: UdpSocket) -> IoFuture<()> {
let svr_cfg_cloned_cloned = svr_cfg.clone();
let socket = socket.clone();
let config = config.clone();
let timeout = *svr_cfg.timeout();

let rel = futures::lazy(|| UdpAssociateHeader::read_from(Cursor::new(pkt)))
.map_err(From::from)
Expand Down Expand Up @@ -82,12 +85,36 @@ fn listen(config: Arc<Config>, l: UdpSocket) -> IoFuture<()> {
})
.and_then(move |(remote_udp, remote_addr, payload, addr)| {
info!("UDP ASSOCIATE {} -> {}, payload length {} bytes", src, addr, payload.len());
let to = timeout.unwrap_or(Duration::from_secs(5));
let caddr = addr.clone();
remote_udp.send_dgram(payload, &remote_addr)
.deadline(Instant::now() + to)
.map_err(move |err| {
match err.into_inner() {
Some(e) => e,
None => {
error!("Udp associate sending datagram {} -> {} timed out in {:?}", src, caddr, to);
io::Error::new(io::ErrorKind::TimedOut, "udp send timed out")
}
}
})
.map(|(remote_udp, _)| (remote_udp, addr))
})
.and_then(move |(remote_udp, addr)| {
let buf = vec![0u8; MAXIMUM_UDP_PAYLOAD_SIZE];
let to = timeout.unwrap_or(Duration::from_secs(5));
let caddr = addr.clone();
remote_udp.recv_dgram(buf)
.deadline(Instant::now() + to)
.map_err(move |err| {
match err.into_inner() {
Some(e) => e,
None => {
error!("Udp associate waiting datagram {} <- {} timed out in {:?}", src, caddr, to);
io::Error::new(io::ErrorKind::TimedOut, "udp recv timed out")
}
}
})
.and_then(move |(_remote_udp, buf, n, _from)| {
let svr_cfg = svr_cfg_cloned;
decrypt_payload(svr_cfg.method(), svr_cfg.key(), &buf[..n])
Expand Down
30 changes: 29 additions & 1 deletion src/relay/udprelay/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
use std::io::{self, Cursor, ErrorKind};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use futures::{self, Future, Stream};

use tokio;
use tokio::net::UdpSocket;
use tokio::util::FutureExt;
use tokio_io::IoFuture;

use config::{Config, ServerConfig};
Expand Down Expand Up @@ -50,6 +52,7 @@ fn listen(config: Arc<Config>, svr_cfg: Arc<ServerConfig>) -> IoFuture<()> {
let svr_cfg_cloned = svr_cfg.clone();
let socket = socket.clone();
let config = config.clone();
let timeout = *svr_cfg.timeout();
let rel = futures::lazy(move || decrypt_payload(svr_cfg.method(), svr_cfg.key(), &pkt))
.and_then(move |payload| {
// Read Address in the front (ShadowSocks protocol)
Expand All @@ -75,9 +78,21 @@ fn listen(config: Arc<Config>, svr_cfg: Arc<ServerConfig>) -> IoFuture<()> {
.map(|(remote_udp, _)| (remote_udp, addr))
})
})
.and_then(|(remote_udp, addr)| {
.and_then(move |(remote_udp, addr)| {
let buf = vec![0u8; MAXIMUM_UDP_PAYLOAD_SIZE];
let to = timeout.unwrap_or(Duration::from_secs(5));
let caddr = addr.clone();
remote_udp.recv_dgram(buf)
.deadline(Instant::now() + to)
.map_err(move |err| {
match err.into_inner() {
Some(e) => e,
None => {
error!("Udp associate waiting datagram {} -> {} timed out in {:?}", src, caddr, to);
io::Error::new(io::ErrorKind::TimedOut, "udp recv timed out")
}
}
})
.and_then(|(_remote_udp, buf, n, _from)| {
let svr_cfg = svr_cfg_cloned;

Expand All @@ -89,7 +104,20 @@ fn listen(config: Arc<Config>, svr_cfg: Arc<ServerConfig>) -> IoFuture<()> {
})
.and_then(move |(buf, addr)| {
info!("UDP ASSOCIATE {} <- {}, payload length {} bytes", src, addr, buf.len());

let to = timeout.unwrap_or(Duration::from_secs(5));
let caddr = addr.clone();
SendDgramRc::new(socket, buf, src)
.deadline(Instant::now() + to)
.map_err(move |err| {
match err.into_inner() {
Some(e) => e,
None => {
error!("Udp associate sending datagram {} <- {} timed out in {:?}", src, caddr, to);
io::Error::new(io::ErrorKind::TimedOut, "udp recv timed out")
}
}
})
})
.map(|_| ());

Expand Down

0 comments on commit ca85f75

Please sign in to comment.