diff --git a/src/http/conn.rs b/src/http/conn.rs index dc15e95834..17d09b1a5d 100644 --- a/src/http/conn.rs +++ b/src/http/conn.rs @@ -24,7 +24,13 @@ const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100; /// The connection will determine when a message begins and ends, creating /// a new message `MessageHandler` for each one, as well as determine if this /// connection can be kept alive after the message, or if it is complete. -pub struct Conn> { +pub struct Conn>(Box>); + + +/// `ConnInner` contains all of a connections state which Conn proxies for in a way +/// that allows Conn to maintain convenient move and self consuming method call +/// semantics but avoiding many costly memcpy calls. +struct ConnInner> { buf: Buffer, ctrl: (channel::Sender, channel::Receiver), keep_alive_enabled: bool, @@ -33,7 +39,7 @@ pub struct Conn> { transport: T, } -impl> fmt::Debug for Conn { +impl> fmt::Debug for ConnInner { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Conn") .field("keep_alive_enabled", &self.keep_alive_enabled) @@ -43,23 +49,14 @@ impl> fmt::Debug for Conn { } } -impl> Conn { - pub fn new(key: K, transport: T, notify: rotor::Notifier) -> Conn { - Conn { - buf: Buffer::new(), - ctrl: channel::new(notify), - keep_alive_enabled: true, - key: key, - state: State::Init, - transport: transport, - } +impl> fmt::Debug for Conn { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.0.fmt(f) } +} - pub fn keep_alive(mut self, val: bool) -> Conn { - self.keep_alive_enabled = val; - self - } +impl> ConnInner { /// Desired Register interest based on state of current connection. /// /// This includes the user interest, such as when they return `Next::read()`. @@ -434,17 +431,71 @@ impl> Conn { } } + fn on_error(&mut self, err: ::Error) { + debug!("on_error err = {:?}", err); + trace!("on_error state = {:?}", self.state); + let next = match self.state { + State::Init => Next::remove(), + State::Http1(ref mut http1) => http1.handler.on_error(err), + State::Closed => Next::remove(), + }; + self.state.update(next); + } + + fn on_readable(&mut self, scope: &mut Scope) + where F: MessageHandlerFactory { + trace!("on_readable -> {:?}", self.state); + let state = mem::replace(&mut self.state, State::Closed); + self.state = self.read(scope, state); + trace!("on_readable <- {:?}", self.state); + } + + fn on_writable(&mut self, scope: &mut Scope) + where F: MessageHandlerFactory { + trace!("on_writable -> {:?}", self.state); + let state = mem::replace(&mut self.state, State::Closed); + self.state = self.write(scope, state); + trace!("on_writable <- {:?}", self.state); + } + + fn on_remove(self) { + debug!("on_remove"); + match self.state { + State::Init | State::Closed => (), + State::Http1(http1) => http1.handler.on_remove(self.transport), + } + } + +} + +impl> Conn { + pub fn new(key: K, transport: T, notify: rotor::Notifier) -> Conn { + Conn(Box::new(ConnInner { + buf: Buffer::new(), + ctrl: channel::new(notify), + keep_alive_enabled: true, + key: key, + state: State::Init, + transport: transport, + })) + } + + pub fn keep_alive(mut self, val: bool) -> Conn { + self.0.keep_alive_enabled = val; + self + } + pub fn ready(mut self, events: EventSet, scope: &mut Scope) -> Option<(Self, Option)> where F: MessageHandlerFactory { - trace!("Conn::ready events='{:?}', blocked={:?}", events, self.transport.blocked()); + trace!("Conn::ready events='{:?}', blocked={:?}", events, self.0.transport.blocked()); if events.is_error() { - match self.transport.take_socket_error() { + match self.0.transport.take_socket_error() { Ok(_) => { trace!("is_error, but not socket error"); // spurious? }, - Err(e) => self.on_error(e.into()) + Err(e) => self.0.on_error(e.into()) } } @@ -462,8 +513,8 @@ impl> Conn { // - hyper needs to translate that `readable` event back into a `write`, // since that is actually what the Handler wants. - let events = if let Some(blocked) = self.transport.blocked() { - let interest = self.interest(); + let events = if let Some(blocked) = self.0.transport.blocked() { + let interest = self.0.interest(); trace!("translating blocked={:?}, interest={:?}", blocked, interest); match (blocked, interest) { (Blocked::Read, Reg::Write) => EventSet::writable(), @@ -476,39 +527,39 @@ impl> Conn { }; if events.is_readable() { - self.on_readable(scope); + self.0.on_readable(scope); } if events.is_writable() { - self.on_writable(scope); + self.0.on_writable(scope); } - let events = match self.register() { + let events = match self.0.register() { Reg::Read => EventSet::readable(), Reg::Write => EventSet::writable(), Reg::ReadWrite => EventSet::readable() | EventSet::writable(), Reg::Wait => EventSet::none(), Reg::Remove => { trace!("removing transport"); - let _ = scope.deregister(&self.transport); + let _ = scope.deregister(&self.0.transport); self.on_remove(); return None; }, }; - if events.is_readable() && self.can_read_more() { + if events.is_readable() && self.0.can_read_more() { return self.ready(events, scope); } trace!("scope.reregister({:?})", events); - match scope.reregister(&self.transport, events, PollOpt::level()) { + match scope.reregister(&self.0.transport, events, PollOpt::level()) { Ok(..) => { - let timeout = self.state.timeout(); + let timeout = self.0.state.timeout(); Some((self, timeout)) }, Err(e) => { trace!("error reregistering: {:?}", e); - self.on_error(e.into()); + self.0.on_error(e.into()); None } } @@ -516,9 +567,9 @@ impl> Conn { pub fn wakeup(mut self, scope: &mut Scope) -> Option<(Self, Option)> where F: MessageHandlerFactory { - while let Ok(next) = self.ctrl.1.try_recv() { + while let Ok(next) = self.0.ctrl.1.try_recv() { trace!("woke up with {:?}", next); - self.state.update(next); + self.0.state.update(next); } self.ready(EventSet::readable() | EventSet::writable(), scope) } @@ -526,44 +577,14 @@ impl> Conn { pub fn timeout(mut self, scope: &mut Scope) -> Option<(Self, Option)> where F: MessageHandlerFactory { //TODO: check if this was a spurious timeout? - self.on_error(::Error::Timeout); + self.0.on_error(::Error::Timeout); self.ready(EventSet::none(), scope) } - fn on_error(&mut self, err: ::Error) { - debug!("on_error err = {:?}", err); - trace!("on_error state = {:?}", self.state); - let next = match self.state { - State::Init => Next::remove(), - State::Http1(ref mut http1) => http1.handler.on_error(err), - State::Closed => Next::remove(), - }; - self.state.update(next); - } - fn on_remove(self) { - debug!("on_remove"); - match self.state { - State::Init | State::Closed => (), - State::Http1(http1) => http1.handler.on_remove(self.transport), - } + self.0.on_remove() } - fn on_readable(&mut self, scope: &mut Scope) - where F: MessageHandlerFactory { - trace!("on_readable -> {:?}", self.state); - let state = mem::replace(&mut self.state, State::Closed); - self.state = self.read(scope, state); - trace!("on_readable <- {:?}", self.state); - } - - fn on_writable(&mut self, scope: &mut Scope) - where F: MessageHandlerFactory { - trace!("on_writable -> {:?}", self.state); - let state = mem::replace(&mut self.state, State::Closed); - self.state = self.write(scope, state); - trace!("on_writable <- {:?}", self.state); - } } enum State, T: Transport> {