Skip to content

Commit

Permalink
Allow deferring the sending of an HTTP response. references #425
Browse files Browse the repository at this point in the history
This allows processing of long running http handlers to defer their
response until it is ready without blocking the network thread.
  • Loading branch information
Peter Thorson committed Apr 29, 2015
1 parent f469b90 commit c18f210
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 23 deletions.
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ HEAD
code that was used as a guide for this implementation. Fixes #324
- Feature: Adds a vectored/scatter-gather write handler to the iostream
transport.
- Feature: Adds the ability to defer sending an HTTP response until sometime
after the `http_handler` is run. This allows processing of long running http
handlers to defer their response until it is ready without blocking the
network thread. references #425
- Improvement: `echo_server_tls` has been update to demonstrate how to configure
it for Mozilla's recommended intermediate and modern TLS security profiles.
- Improvement: `endpoint::set_timer` now uses a steady clock provided by
Expand Down
47 changes: 47 additions & 0 deletions test/connection/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,15 @@ void http_func(server* s, websocketpp::connection_hdl hdl) {
con->set_status(websocketpp::http::status_code::ok);
}

void defer_http_func(server* s, bool * deferred, websocketpp::connection_hdl hdl) {
*deferred = true;

server::connection_ptr con = s->get_con_from_hdl(hdl);

websocketpp::lib::error_code ec;
con->defer_http_response(ec);
}

void check_on_fail(server* s, websocketpp::lib::error_code ec, bool & called,
websocketpp::connection_hdl hdl)
{
Expand Down Expand Up @@ -223,6 +232,44 @@ BOOST_AUTO_TEST_CASE( http_request ) {
BOOST_CHECK_EQUAL(run_server_test(s,input), output);
}

BOOST_AUTO_TEST_CASE( deferred_http_request ) {
std::string input = "GET /foo/bar HTTP/1.1\r\nHost: www.example.com\r\nOrigin: http://www.example.com\r\n\r\n";
std::string output = "HTTP/1.1 200 OK\r\nContent-Length: 8\r\nServer: ";
output+=websocketpp::user_agent;
output+="\r\n\r\n/foo/bar";

server s;
server::connection_ptr con;
bool deferred = false;
s.set_http_handler(bind(&defer_http_func,&s, &deferred,::_1));

s.clear_access_channels(websocketpp::log::alevel::all);
s.clear_error_channels(websocketpp::log::elevel::all);

std::stringstream ostream;
s.register_ostream(&ostream);

con = s.get_connection();
con->start();

BOOST_CHECK(!deferred);
BOOST_CHECK_EQUAL(ostream.str(), "");
con->read_some(input.data(),input.size());
BOOST_CHECK(deferred);
BOOST_CHECK_EQUAL(ostream.str(), "");

con->set_body(con->get_resource());
con->set_status(websocketpp::http::status_code::ok);

websocketpp::lib::error_code ec;
con->send_http_response(ec);
BOOST_CHECK_EQUAL(ec, websocketpp::lib::error_code());
BOOST_CHECK_EQUAL(ostream.str(), output);
con->send_http_response(ec);
BOOST_CHECK_EQUAL(ec, make_error_code(websocketpp::error::invalid_state));

}

BOOST_AUTO_TEST_CASE( request_no_server_header ) {
std::string input = "GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: upgrade\r\nUpgrade: websocket\r\nSec-WebSocket-Version: 13\r\nSec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\nOrigin: http://www.example.com\r\n\r\n";
std::string output = "HTTP/1.1 101 Switching Protocols\r\nConnection: upgrade\r\nSec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=\r\nUpgrade: websocket\r\n\r\n";
Expand Down
71 changes: 67 additions & 4 deletions websocketpp/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,20 @@ namespace internal_state {
PROCESS_CONNECTION = 7
};
} // namespace internal_state


namespace http_state {
// states to keep track of the progress of http connections

enum value {
init = 0,
deferred = 1,
headers_written = 2,
body_written = 3,
closed = 4
};
} // namespace http_state

} // namespace session

/// Represents an individual WebSocket connection
Expand Down Expand Up @@ -312,6 +326,7 @@ class connection
, m_local_close_code(close::status::abnormal_close)
, m_remote_close_code(close::status::abnormal_close)
, m_is_http(false)
, m_http_state(session::http_state::init)
, m_was_clean(false)
{
m_alog.write(log::alevel::devel,"connection constructor");
Expand Down Expand Up @@ -1060,6 +1075,49 @@ class connection
request_type const & get_request() const {
return m_request;
}

/// Defer HTTP Response until later
/**
* Used in the http handler to defer the HTTP response for this connection
* until later. Handshake timers will be canceled and the connection will be
* left open until `send_http_response` or an equivalent is called.
*
* Warning: deferred connections won't time out and as a result can tie up
* resources.
*
* @since 0.6.0
*
* @param ec A status code, zero on success, non-zero otherwise
*/
void defer_http_response(lib::error_code & ec);

/// Send deferred HTTP Response
/**
* Sends an http response to an HTTP connection that was deferred. This will
* send a complete response including all headers, status line, and body
* text. The connection will be closed afterwards.
*
* @since 0.6.0
*
* @param ec A status code, zero on success, non-zero otherwise
*/
void send_http_response(lib::error_code & ec);

// TODO HTTPNBIO: write_headers
// function that processes headers + status so far and writes it to the wire
// beginning the HTTP response body state. This method will ignore anything
// in the response body.

// TODO HTTPNBIO: write_body_message
// queues the specified message_buffer for async writing

// TODO HTTPNBIO: finish connection
//

// TODO HTTPNBIO: write_response
// Writes the whole response, headers + body and closes the connection



/////////////////////////////////////////////////////////////
// Pass-through access to the other connection information //
Expand Down Expand Up @@ -1202,7 +1260,8 @@ class connection
void handle_read_http_response(lib::error_code const & ec,
size_t bytes_transferred);

void handle_send_http_response(lib::error_code const & ec);

void handle_write_http_response(lib::error_code const & ec);
void handle_send_http_request(lib::error_code const & ec);

void handle_open_handshake_timeout(lib::error_code const & ec);
Expand Down Expand Up @@ -1254,13 +1313,13 @@ class connection
lib::error_code process_handshake_request();
private:
/// Completes m_response, serializes it, and sends it out on the wire.
void send_http_response(lib::error_code const & ec);
void write_http_response(lib::error_code const & ec);

/// Sends an opening WebSocket connect request
void send_http_request();

/// Alternate path for send_http_response in error conditions
void send_http_response_error(lib::error_code const & ec);
/// Alternate path for write_http_response in error conditions
void write_http_response_error(lib::error_code const & ec);

/// Process control message
/**
Expand Down Expand Up @@ -1510,6 +1569,10 @@ class connection
/// A flag that gets set once it is determined that the connection is an
/// HTTP connection and not a WebSocket one.
bool m_is_http;

/// A flag that gets set when the completion of an http connection is
/// deferred until later.
session::http_state::value m_http_state;

bool m_was_clean;

Expand Down
94 changes: 75 additions & 19 deletions websocketpp/impl/connection_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,57 @@ void connection<config>::remove_header(std::string const & key)
}
}

/// Defer HTTP Response until later
/**
* Used in the http handler to defer the HTTP response for this connection
* until later. Handshake timers will be canceled and the connection will be
* left open until `send_http_response` or an equivalent is called.
*
* Warning: deferred connections won't time out and as a result can tie up
* resources.
*
* @param ec A status code, zero on success, non-zero otherwise
*/
template <typename config>
void connection<config>::defer_http_response(lib::error_code & ec) {
// Cancel handshake timer, otherwise the connection will time out and we'll
// close the connection before the app has a chance to send a response.
if (m_handshake_timer) {
m_handshake_timer->cancel();
m_handshake_timer.reset();
}

// Do something to signal deferral
m_http_state = session::http_state::deferred;

ec = lib::error_code();
}

/// Send deferred HTTP Response
/**
* Sends an http response to an HTTP connection that was deferred. This will
* send a complete response including all headers, status line, and body
* text. The connection will be closed afterwards.
*
* @since 0.6.0
*
* @param ec A status code, zero on success, non-zero otherwise
*/
template <typename config>
void connection<config>::send_http_response(lib::error_code & ec) {
{
scoped_lock_type lock(m_connection_state_lock);
if (m_http_state != session::http_state::deferred) {
ec = error::make_error_code(error::invalid_state);
return;
}

m_http_state = session::http_state::body_written;
}

this->write_http_response(lib::error_code());
ec = lib::error_code();
}



Expand Down Expand Up @@ -728,7 +778,7 @@ void connection<config>::read_handshake(size_t num_bytes) {
);
}

// All exit paths for this function need to call send_http_response() or submit
// All exit paths for this function need to call write_http_response() or submit
// a new read request with this function as the handler.
template <typename config>
void connection<config>::handle_read_handshake(lib::error_code const & ec,
Expand Down Expand Up @@ -784,7 +834,7 @@ void connection<config>::handle_read_handshake(lib::error_code const & ec,
// All HTTP exceptions will result in this request failing and an error
// response being returned. No more bytes will be read in this con.
m_response.set_status(e.m_error_code,e.m_error_msg);
this->send_http_response_error(error::make_error_code(error::http_parse_error));
this->write_http_response_error(error::make_error_code(error::http_parse_error));
return;
}

Expand All @@ -806,7 +856,7 @@ void connection<config>::handle_read_handshake(lib::error_code const & ec,
if (m_request.ready()) {
lib::error_code processor_ec = this->initialize_processor();
if (processor_ec) {
this->send_http_response_error(processor_ec);
this->write_http_response_error(processor_ec);
return;
}

Expand All @@ -823,7 +873,7 @@ void connection<config>::handle_read_handshake(lib::error_code const & ec,
// TODO: need more bytes
m_alog.write(log::alevel::devel,"short key3 read");
m_response.set_status(http::status_code::internal_server_error);
this->send_http_response_error(processor::error::make_error_code(processor::error::short_key3));
this->write_http_response_error(processor::error::make_error_code(processor::error::short_key3));
return;
}
}
Expand All @@ -847,7 +897,9 @@ void connection<config>::handle_read_handshake(lib::error_code const & ec,

// We have the complete request. Process it.
lib::error_code handshake_ec = this->process_handshake_request();
this->send_http_response(handshake_ec);
if (!m_is_http || m_http_state != session::http_state::deferred) {
this->write_http_response(handshake_ec);
}
} else {
// read at least 1 more byte
transport_con_type::async_read_at_least(
Expand All @@ -864,26 +916,26 @@ void connection<config>::handle_read_handshake(lib::error_code const & ec,
}
}

// send_http_response requires the request to be fully read and the connection
// write_http_response requires the request to be fully read and the connection
// to be in the PROCESS_HTTP_REQUEST state. In some cases we can detect errors
// before the request is fully read (specifically at a point where we aren't
// sure if the hybi00 key3 bytes need to be read). This method sets the correct
// state and calls send_http_response
// state and calls write_http_response
template <typename config>
void connection<config>::send_http_response_error(lib::error_code const & ec) {
void connection<config>::write_http_response_error(lib::error_code const & ec) {
if (m_internal_state != istate::READ_HTTP_REQUEST) {
m_alog.write(log::alevel::devel,
"send_http_response_error called in invalid state");
"write_http_response_error called in invalid state");
this->terminate(error::make_error_code(error::invalid_state));
return;
}

m_internal_state = istate::PROCESS_HTTP_REQUEST;

this->send_http_response(ec);
this->write_http_response(ec);
}

// All exit paths for this function need to call send_http_response() or submit
// All exit paths for this function need to call write_http_response() or submit
// a new read request with this function as the handler.
template <typename config>
void connection<config>::handle_read_frame(lib::error_code const & ec,
Expand Down Expand Up @@ -1113,6 +1165,7 @@ lib::error_code connection<config>::process_handshake_request() {
if (m_http_handler) {
m_is_http = true;
m_http_handler(m_connection_hdl);

if (m_state == session::state::closed) {
return error::make_error_code(error::http_connection_ended);
}
Expand Down Expand Up @@ -1207,8 +1260,8 @@ lib::error_code connection<config>::process_handshake_request() {
}

template <typename config>
void connection<config>::send_http_response(lib::error_code const & ec) {
m_alog.write(log::alevel::devel,"connection send_http_response");
void connection<config>::write_http_response(lib::error_code const & ec) {
m_alog.write(log::alevel::devel,"connection write_http_response");

if (ec == error::make_error_code(error::http_connection_ended)) {
m_alog.write(log::alevel::http,"An HTTP handler took over the connection.");
Expand Down Expand Up @@ -1254,16 +1307,16 @@ void connection<config>::send_http_response(lib::error_code const & ec) {
m_handshake_buffer.data(),
m_handshake_buffer.size(),
lib::bind(
&type::handle_send_http_response,
&type::handle_write_http_response,
type::get_shared(),
lib::placeholders::_1
)
);
}

template <typename config>
void connection<config>::handle_send_http_response(lib::error_code const & ec) {
m_alog.write(log::alevel::devel,"handle_send_http_response");
void connection<config>::handle_write_http_response(lib::error_code const & ec) {
m_alog.write(log::alevel::devel,"handle_write_http_response");

lib::error_code ecm = ec;

Expand All @@ -1279,7 +1332,7 @@ void connection<config>::handle_send_http_response(lib::error_code const & ec) {
// usually by the handshake timer. This is basically expected
// (though hopefully rare) and there is nothing we can do so ignore.
m_alog.write(log::alevel::devel,
"handle_send_http_response invoked after connection was closed");
"handle_write_http_response invoked after connection was closed");
return;
} else {
ecm = error::make_error_code(error::invalid_state);
Expand All @@ -1294,7 +1347,7 @@ void connection<config>::handle_send_http_response(lib::error_code const & ec) {
return;
}

log_err(log::elevel::rerror,"handle_send_http_response",ecm);
log_err(log::elevel::rerror,"handle_write_http_response",ecm);
this->terminate(ecm);
return;
}
Expand Down Expand Up @@ -1608,7 +1661,10 @@ void connection<config>::terminate(lib::error_code const & ec) {
m_local_close_reason = ec.message();
}

// TODO: does this need a mutex?
// TODO: does any of this need a mutex?
if (m_is_http) {
m_http_state = session::http_state::closed;
}
if (m_state == session::state::connecting) {
m_state = session::state::closed;
tstat = failed;
Expand Down

0 comments on commit c18f210

Please sign in to comment.