Skip to content

Commit

Permalink
Merge pull request #829 from hyperium/822
Browse files Browse the repository at this point in the history
perf(http): reduce memcpy calls using boxed pimpl
  • Loading branch information
seanmonstar committed Jun 15, 2016
2 parents dabe3ac + 13a6a59 commit 2a56094
Showing 1 changed file with 84 additions and 63 deletions.
147 changes: 84 additions & 63 deletions src/http/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@ const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
/// The connection will determine when a message begins and ends, creating
/// a new message `MessageHandler` for each one, as well as determine if this
/// connection can be kept alive after the message, or if it is complete.
pub struct Conn<K: Key, T: Transport, H: MessageHandler<T>> {
pub struct Conn<K: Key, T: Transport, H: MessageHandler<T>>(Box<ConnInner<K, T, H>>);


/// `ConnInner` contains all of a connections state which Conn proxies for in a way
/// that allows Conn to maintain convenient move and self consuming method call
/// semantics but avoiding many costly memcpy calls.
struct ConnInner<K: Key, T: Transport, H: MessageHandler<T>> {
buf: Buffer,
ctrl: (channel::Sender<Next>, channel::Receiver<Next>),
keep_alive_enabled: bool,
Expand All @@ -33,7 +39,7 @@ pub struct Conn<K: Key, T: Transport, H: MessageHandler<T>> {
transport: T,
}

impl<K: Key, T: Transport, H: MessageHandler<T>> fmt::Debug for Conn<K, T, H> {
impl<K: Key, T: Transport, H: MessageHandler<T>> fmt::Debug for ConnInner<K, T, H> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Conn")
.field("keep_alive_enabled", &self.keep_alive_enabled)
Expand All @@ -43,23 +49,14 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> fmt::Debug for Conn<K, T, H> {
}
}

impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
pub fn new(key: K, transport: T, notify: rotor::Notifier) -> Conn<K, T, H> {
Conn {
buf: Buffer::new(),
ctrl: channel::new(notify),
keep_alive_enabled: true,
key: key,
state: State::Init,
transport: transport,
}
impl<K: Key, T: Transport, H: MessageHandler<T>> fmt::Debug for Conn<K, T, H> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.0.fmt(f)
}
}

pub fn keep_alive(mut self, val: bool) -> Conn<K, T, H> {
self.keep_alive_enabled = val;
self
}

impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
/// Desired Register interest based on state of current connection.
///
/// This includes the user interest, such as when they return `Next::read()`.
Expand Down Expand Up @@ -434,17 +431,71 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
}
}

fn on_error(&mut self, err: ::Error) {
debug!("on_error err = {:?}", err);
trace!("on_error state = {:?}", self.state);
let next = match self.state {
State::Init => Next::remove(),
State::Http1(ref mut http1) => http1.handler.on_error(err),
State::Closed => Next::remove(),
};
self.state.update(next);
}

fn on_readable<F>(&mut self, scope: &mut Scope<F>)
where F: MessageHandlerFactory<K, T, Output=H> {
trace!("on_readable -> {:?}", self.state);
let state = mem::replace(&mut self.state, State::Closed);
self.state = self.read(scope, state);
trace!("on_readable <- {:?}", self.state);
}

fn on_writable<F>(&mut self, scope: &mut Scope<F>)
where F: MessageHandlerFactory<K, T, Output=H> {
trace!("on_writable -> {:?}", self.state);
let state = mem::replace(&mut self.state, State::Closed);
self.state = self.write(scope, state);
trace!("on_writable <- {:?}", self.state);
}

fn on_remove(self) {
debug!("on_remove");
match self.state {
State::Init | State::Closed => (),
State::Http1(http1) => http1.handler.on_remove(self.transport),
}
}

}

impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
pub fn new(key: K, transport: T, notify: rotor::Notifier) -> Conn<K, T, H> {
Conn(Box::new(ConnInner {
buf: Buffer::new(),
ctrl: channel::new(notify),
keep_alive_enabled: true,
key: key,
state: State::Init,
transport: transport,
}))
}

pub fn keep_alive(mut self, val: bool) -> Conn<K, T, H> {
self.0.keep_alive_enabled = val;
self
}

pub fn ready<F>(mut self, events: EventSet, scope: &mut Scope<F>) -> Option<(Self, Option<Duration>)>
where F: MessageHandlerFactory<K, T, Output=H> {
trace!("Conn::ready events='{:?}', blocked={:?}", events, self.transport.blocked());
trace!("Conn::ready events='{:?}', blocked={:?}", events, self.0.transport.blocked());

if events.is_error() {
match self.transport.take_socket_error() {
match self.0.transport.take_socket_error() {
Ok(_) => {
trace!("is_error, but not socket error");
// spurious?
},
Err(e) => self.on_error(e.into())
Err(e) => self.0.on_error(e.into())
}
}

Expand All @@ -462,8 +513,8 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
// - hyper needs to translate that `readable` event back into a `write`,
// since that is actually what the Handler wants.

let events = if let Some(blocked) = self.transport.blocked() {
let interest = self.interest();
let events = if let Some(blocked) = self.0.transport.blocked() {
let interest = self.0.interest();
trace!("translating blocked={:?}, interest={:?}", blocked, interest);
match (blocked, interest) {
(Blocked::Read, Reg::Write) => EventSet::writable(),
Expand All @@ -476,94 +527,64 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
};

if events.is_readable() {
self.on_readable(scope);
self.0.on_readable(scope);
}

if events.is_writable() {
self.on_writable(scope);
self.0.on_writable(scope);
}

let events = match self.register() {
let events = match self.0.register() {
Reg::Read => EventSet::readable(),
Reg::Write => EventSet::writable(),
Reg::ReadWrite => EventSet::readable() | EventSet::writable(),
Reg::Wait => EventSet::none(),
Reg::Remove => {
trace!("removing transport");
let _ = scope.deregister(&self.transport);
let _ = scope.deregister(&self.0.transport);
self.on_remove();
return None;
},
};

if events.is_readable() && self.can_read_more() {
if events.is_readable() && self.0.can_read_more() {
return self.ready(events, scope);
}

trace!("scope.reregister({:?})", events);
match scope.reregister(&self.transport, events, PollOpt::level()) {
match scope.reregister(&self.0.transport, events, PollOpt::level()) {
Ok(..) => {
let timeout = self.state.timeout();
let timeout = self.0.state.timeout();
Some((self, timeout))
},
Err(e) => {
trace!("error reregistering: {:?}", e);
self.on_error(e.into());
self.0.on_error(e.into());
None
}
}
}

pub fn wakeup<F>(mut self, scope: &mut Scope<F>) -> Option<(Self, Option<Duration>)>
where F: MessageHandlerFactory<K, T, Output=H> {
while let Ok(next) = self.ctrl.1.try_recv() {
while let Ok(next) = self.0.ctrl.1.try_recv() {
trace!("woke up with {:?}", next);
self.state.update(next);
self.0.state.update(next);
}
self.ready(EventSet::readable() | EventSet::writable(), scope)
}

pub fn timeout<F>(mut self, scope: &mut Scope<F>) -> Option<(Self, Option<Duration>)>
where F: MessageHandlerFactory<K, T, Output=H> {
//TODO: check if this was a spurious timeout?
self.on_error(::Error::Timeout);
self.0.on_error(::Error::Timeout);
self.ready(EventSet::none(), scope)
}

fn on_error(&mut self, err: ::Error) {
debug!("on_error err = {:?}", err);
trace!("on_error state = {:?}", self.state);
let next = match self.state {
State::Init => Next::remove(),
State::Http1(ref mut http1) => http1.handler.on_error(err),
State::Closed => Next::remove(),
};
self.state.update(next);
}

fn on_remove(self) {
debug!("on_remove");
match self.state {
State::Init | State::Closed => (),
State::Http1(http1) => http1.handler.on_remove(self.transport),
}
self.0.on_remove()
}

fn on_readable<F>(&mut self, scope: &mut Scope<F>)
where F: MessageHandlerFactory<K, T, Output=H> {
trace!("on_readable -> {:?}", self.state);
let state = mem::replace(&mut self.state, State::Closed);
self.state = self.read(scope, state);
trace!("on_readable <- {:?}", self.state);
}

fn on_writable<F>(&mut self, scope: &mut Scope<F>)
where F: MessageHandlerFactory<K, T, Output=H> {
trace!("on_writable -> {:?}", self.state);
let state = mem::replace(&mut self.state, State::Closed);
self.state = self.write(scope, state);
trace!("on_writable <- {:?}", self.state);
}
}

enum State<H: MessageHandler<T>, T: Transport> {
Expand Down

0 comments on commit 2a56094

Please sign in to comment.