diff --git a/src/server/acceptor.rs b/src/server/acceptor.rs index a4d8c5b12c..38e0652d9b 100644 --- a/src/server/acceptor.rs +++ b/src/server/acceptor.rs @@ -1,13 +1,13 @@ use std::thread::{self, JoinGuard}; -use std::sync::Arc; use std::sync::mpsc; +use std::collections::VecMap; use net::NetworkAcceptor; pub struct AcceptorPool { acceptor: A } -impl AcceptorPool { +impl<'a, A: NetworkAcceptor + 'a> AcceptorPool { /// Create a thread pool to manage the acceptor. pub fn new(acceptor: A) -> AcceptorPool { AcceptorPool { acceptor: acceptor } @@ -18,33 +18,39 @@ impl AcceptorPool { /// ## Panics /// /// Panics if threads == 0. - pub fn accept(self, work: F, threads: usize) -> JoinGuard<'static, ()> - where F: Fn(A::Stream) + Send + Sync + 'static { + pub fn accept(self, work: F, threads: usize) + where F: Fn(A::Stream) + Send + Sync + 'a { assert!(threads != 0, "Can't accept on 0 threads."); - // Replace with &F when Send changes land. - let work = Arc::new(work); - let (super_tx, supervisor_rx) = mpsc::channel(); - let spawn = - move || spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone()); + let counter = &mut 0; + let work = &work; + let mut spawn = move || { + let id = *counter; + let guard = spawn_with(super_tx.clone(), work, self.acceptor.clone(), id); + *counter += 1; + (id, guard) + }; // Go - for _ in 0..threads { spawn() } + let mut guards: VecMap<_> = (0..threads).map(|_| spawn()).collect(); - // Spawn the supervisor - thread::scoped(move || for () in supervisor_rx.iter() { spawn() }) + for id in supervisor_rx.iter() { + guards.remove(&id); + let (id, guard) = spawn(); + guards.insert(id, guard); + } } } -fn spawn_with(supervisor: mpsc::Sender<()>, work: Arc, mut acceptor: A) -where A: NetworkAcceptor + 'static, - F: Fn(::Stream) + Send + Sync + 'static { +fn spawn_with<'a, A, F>(supervisor: mpsc::Sender, work: &'a F, mut acceptor: A, id: usize) -> JoinGuard<'a, ()> +where A: NetworkAcceptor + 'a, + F: Fn(::Stream) + Send + Sync + 'a { use std::old_io::EndOfFile; - thread::spawn(move || { - let sentinel = Sentinel::new(supervisor, ()); + thread::scoped(move || { + let sentinel = Sentinel::new(supervisor, id); loop { match acceptor.accept() { @@ -60,7 +66,7 @@ where A: NetworkAcceptor + 'static, } } } - }); + }) } struct Sentinel { diff --git a/src/server/mod.rs b/src/server/mod.rs index bbb4a4c626..169979d4a4 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -2,7 +2,7 @@ use std::old_io::{Listener, BufferedReader, BufferedWriter}; use std::old_io::net::ip::{IpAddr, Port, SocketAddr}; use std::os; -use std::thread::JoinGuard; +use std::thread::{self, JoinGuard}; pub use self::request::Request; pub use self::response::Response; @@ -77,8 +77,10 @@ S: NetworkStream + Clone + Send> Server { let pool = AcceptorPool::new(acceptor.clone()); let work = move |stream| handle_connection(stream, &handler); + let guard = thread::scoped(move || pool.accept(work, threads)); + Ok(Listening { - _guard: pool.accept(work, threads), + _guard: guard, socket: socket, acceptor: acceptor })