From e864956734af72bab07a3e01c9665bc1b7c96e5e Mon Sep 17 00:00:00 2001 From: Jonathan Reem Date: Wed, 15 Apr 2015 12:32:16 -0700 Subject: [PATCH] fix(server): Use thread::spawn instead of thread::scoped. --- .travis.yml | 4 ++-- src/server/listener.rs | 45 +++++++++++++++++++----------------------- src/server/mod.rs | 9 ++++++--- 3 files changed, 28 insertions(+), 30 deletions(-) diff --git a/.travis.yml b/.travis.yml index f10d54bf0e..0c15e0c9df 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,8 +6,8 @@ cache: - target script: - - cargo build - - cargo test + - cargo build --features nightly + - cargo test --features nightly - cargo bench --features nightly after_success: | diff --git a/src/server/listener.rs b/src/server/listener.rs index b258dcd768..7c4463f1d3 100644 --- a/src/server/listener.rs +++ b/src/server/listener.rs @@ -1,12 +1,13 @@ -use std::thread::{self, JoinGuard}; -use std::sync::mpsc; +use std::sync::{Arc, mpsc}; +use std::thread; + use net::NetworkListener; pub struct ListenerPool { acceptor: A } -impl<'a, A: NetworkListener + Send + 'a> ListenerPool { +impl ListenerPool { /// Create a thread pool to manage the acceptor. pub fn new(acceptor: A) -> ListenerPool { ListenerPool { acceptor: acceptor } @@ -18,31 +19,32 @@ impl<'a, A: NetworkListener + Send + 'a> ListenerPool { /// /// Panics if threads == 0. pub fn accept(self, work: F, threads: usize) - where F: Fn(A::Stream) + Send + Sync + 'a { + where F: Fn(A::Stream) + Send + Sync + 'static { assert!(threads != 0, "Can't accept on 0 threads."); let (super_tx, supervisor_rx) = mpsc::channel(); - let work = &work; - let spawn = move |id| { - spawn_with(super_tx.clone(), work, self.acceptor.clone(), id) - }; + let work = Arc::new(work); - // Go - let mut guards: Vec<_> = (0..threads).map(|id| spawn(id)).collect(); + // Begin work. + for _ in (0..threads) { + spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone()) + } - for id in supervisor_rx.iter() { - guards[id] = spawn(id); + // Monitor for panics. + // FIXME(reem): This won't ever exit since we still have a super_tx handle. + for _ in supervisor_rx.iter() { + spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone()); } } } -fn spawn_with<'a, A, F>(supervisor: mpsc::Sender, work: &'a F, mut acceptor: A, id: usize) -> thread::JoinGuard<'a, ()> -where A: NetworkListener + Send + 'a, - F: Fn(::Stream) + Send + Sync + 'a { +fn spawn_with(supervisor: mpsc::Sender<()>, work: Arc, mut acceptor: A) +where A: NetworkListener + Send + 'static, + F: Fn(::Stream) + Send + Sync + 'static { - thread::scoped(move || { - let _sentinel = Sentinel::new(supervisor, id); + thread::spawn(move || { + let _sentinel = Sentinel::new(supervisor, ()); loop { match acceptor.accept() { @@ -52,13 +54,12 @@ where A: NetworkListener + Send + 'a, } } } - }) + }); } struct Sentinel { value: Option, supervisor: mpsc::Sender, - //active: bool } impl Sentinel { @@ -66,18 +67,12 @@ impl Sentinel { Sentinel { value: Some(data), supervisor: channel, - //active: true } } - - //fn cancel(mut self) { self.active = false; } } impl Drop for Sentinel { fn drop(&mut self) { - // If we were cancelled, get out of here. - //if !self.active { return; } - // Respawn ourselves let _ = self.supervisor.send(self.value.take().unwrap()); } diff --git a/src/server/mod.rs b/src/server/mod.rs index 61e9ade6fe..f660f5a177 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -3,7 +3,7 @@ use std::io::{BufWriter, Write}; use std::marker::PhantomData; use std::net::{SocketAddr, ToSocketAddrs}; use std::path::Path; -use std::thread::{self, JoinGuard}; +use std::thread::{self, JoinHandle}; use num_cpus; @@ -104,7 +104,7 @@ S: NetworkStream + Clone + Send> Server<'a, H, L> { let pool = ListenerPool::new(listener.clone()); let work = move |mut stream| handle_connection(&mut stream, &handler); - let guard = thread::scoped(move || pool.accept(work, threads)); + let guard = thread::spawn(move || pool.accept(work, threads)); Ok(Listening { _guard: guard, @@ -175,7 +175,10 @@ where S: NetworkStream + Clone, H: Handler { /// A listening server, which can later be closed. pub struct Listening { - _guard: JoinGuard<'static, ()>, + #[cfg(feature = "nightly")] + _guard: JoinHandle<()>, + #[cfg(not(feature = "nightly"))] + _guard: JoinHandle, /// The socket addresses that the server is bound to. pub socket: SocketAddr, }