Skip to content

Commit

Permalink
fix(server): Use thread::spawn instead of thread::scoped.
Browse files Browse the repository at this point in the history
  • Loading branch information
reem committed Apr 15, 2015
1 parent eab8fcd commit e864956
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 30 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ cache:
- target

script:
- cargo build
- cargo test
- cargo build --features nightly
- cargo test --features nightly
- cargo bench --features nightly

after_success: |
Expand Down
45 changes: 20 additions & 25 deletions src/server/listener.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::thread::{self, JoinGuard};
use std::sync::mpsc;
use std::sync::{Arc, mpsc};
use std::thread;

use net::NetworkListener;

pub struct ListenerPool<A: NetworkListener> {
acceptor: A
}

impl<'a, A: NetworkListener + Send + 'a> ListenerPool<A> {
impl<A: NetworkListener + Send + 'static> ListenerPool<A> {
/// Create a thread pool to manage the acceptor.
pub fn new(acceptor: A) -> ListenerPool<A> {
ListenerPool { acceptor: acceptor }
Expand All @@ -18,31 +19,32 @@ impl<'a, A: NetworkListener + Send + 'a> ListenerPool<A> {
///
/// Panics if threads == 0.
pub fn accept<F>(self, work: F, threads: usize)
where F: Fn(A::Stream) + Send + Sync + 'a {
where F: Fn(A::Stream) + Send + Sync + 'static {
assert!(threads != 0, "Can't accept on 0 threads.");

let (super_tx, supervisor_rx) = mpsc::channel();

let work = &work;
let spawn = move |id| {
spawn_with(super_tx.clone(), work, self.acceptor.clone(), id)
};
let work = Arc::new(work);

// Go
let mut guards: Vec<_> = (0..threads).map(|id| spawn(id)).collect();
// Begin work.
for _ in (0..threads) {
spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone())
}

for id in supervisor_rx.iter() {
guards[id] = spawn(id);
// Monitor for panics.
// FIXME(reem): This won't ever exit since we still have a super_tx handle.
for _ in supervisor_rx.iter() {
spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone());
}
}
}

fn spawn_with<'a, A, F>(supervisor: mpsc::Sender<usize>, work: &'a F, mut acceptor: A, id: usize) -> thread::JoinGuard<'a, ()>
where A: NetworkListener + Send + 'a,
F: Fn(<A as NetworkListener>::Stream) + Send + Sync + 'a {
fn spawn_with<A, F>(supervisor: mpsc::Sender<()>, work: Arc<F>, mut acceptor: A)
where A: NetworkListener + Send + 'static,
F: Fn(<A as NetworkListener>::Stream) + Send + Sync + 'static {

thread::scoped(move || {
let _sentinel = Sentinel::new(supervisor, id);
thread::spawn(move || {
let _sentinel = Sentinel::new(supervisor, ());

loop {
match acceptor.accept() {
Expand All @@ -52,32 +54,25 @@ where A: NetworkListener + Send + 'a,
}
}
}
})
});
}

struct Sentinel<T: Send + 'static> {
value: Option<T>,
supervisor: mpsc::Sender<T>,
//active: bool
}

impl<T: Send + 'static> Sentinel<T> {
fn new(channel: mpsc::Sender<T>, data: T) -> Sentinel<T> {
Sentinel {
value: Some(data),
supervisor: channel,
//active: true
}
}

//fn cancel(mut self) { self.active = false; }
}

impl<T: Send + 'static> Drop for Sentinel<T> {
fn drop(&mut self) {
// If we were cancelled, get out of here.
//if !self.active { return; }

// Respawn ourselves
let _ = self.supervisor.send(self.value.take().unwrap());
}
Expand Down
9 changes: 6 additions & 3 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::io::{BufWriter, Write};
use std::marker::PhantomData;
use std::net::{SocketAddr, ToSocketAddrs};
use std::path::Path;
use std::thread::{self, JoinGuard};
use std::thread::{self, JoinHandle};

use num_cpus;

Expand Down Expand Up @@ -104,7 +104,7 @@ S: NetworkStream + Clone + Send> Server<'a, H, L> {
let pool = ListenerPool::new(listener.clone());
let work = move |mut stream| handle_connection(&mut stream, &handler);

let guard = thread::scoped(move || pool.accept(work, threads));
let guard = thread::spawn(move || pool.accept(work, threads));

Ok(Listening {
_guard: guard,
Expand Down Expand Up @@ -175,7 +175,10 @@ where S: NetworkStream + Clone, H: Handler {

/// A listening server, which can later be closed.
pub struct Listening {
_guard: JoinGuard<'static, ()>,
#[cfg(feature = "nightly")]
_guard: JoinHandle<()>,
#[cfg(not(feature = "nightly"))]
_guard: JoinHandle,
/// The socket addresses that the server is bound to.
pub socket: SocketAddr,
}
Expand Down

0 comments on commit e864956

Please sign in to comment.