Skip to content

Commit

Permalink
Also move the reply related protocol handling to the http1.server mod…
Browse files Browse the repository at this point in the history
…ule.
  • Loading branch information
s-ludwig committed May 27, 2024
1 parent 37ae67d commit 2dcaa5d
Show file tree
Hide file tree
Showing 2 changed files with 327 additions and 212 deletions.
270 changes: 266 additions & 4 deletions source/vibe/http/internal/http1/server.d
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import core.time;
import std.datetime : Clock, SysTime, UTC;
import std.encoding : sanitize;
import std.exception : enforce;
import std.format : format;
import std.format : format, formattedWrite;


/** Treats an existing connection as an HTTP connection and processes incoming
Expand Down Expand Up @@ -101,7 +101,8 @@ private bool handleRequest(TLSStreamType, Allocator)(StreamProxy http_stream, TC

// Create the response object
ConnectionStreamProxy cproxy = tcp_connection;
auto res = FreeListRef!HTTPServerResponse(http_stream, cproxy, settings, request_allocator/*.Scoped_payload*/);
auto exchange = new HTTP1ServerExchange(http_stream, cproxy);
auto res = FreeListRef!HTTPServerResponse(exchange, settings, request_allocator/*.Scoped_payload*/);
req.tls = res.m_tls = listen_info.tlsContext !is null;
if (req.tls) {
version (HaveNoTLS) assert(false);
Expand Down Expand Up @@ -240,7 +241,7 @@ private bool handleRequest(TLSStreamType, Allocator)(StreamProxy http_stream, TC
}

// write default headers
if (req.method == HTTPMethod.HEAD) res.m_isHeadResponse = true;
if (req.method == HTTPMethod.HEAD) exchange.m_isHeadResponse = true;
if (settings.serverString.length)
res.headers["Server"] = settings.serverString;
res.headers["Date"] = formatRFC822DateAlloc(reqtime);
Expand Down Expand Up @@ -315,7 +316,7 @@ private bool handleRequest(TLSStreamType, Allocator)(StreamProxy http_stream, TC
// finalize (e.g. for chunked encoding)
res.finalize();

if (res.m_requiresConnectionClose)
if (exchange.m_requiresConnectionClose)
keep_alive = false;

// NOTE: req.m_files may or may not be parsed/filled with actual data, as
Expand Down Expand Up @@ -410,3 +411,264 @@ private string formatRFC822DateAlloc(SysTime time)
return LAST.cachedDate;
}

class HTTP1ServerExchange : HTTPServerExchange {
import vibe.stream.counting : CountingOutputStream, createCountingOutputStreamFL;
import vibe.stream.wrapper : createConnectionProxyStream, createConnectionProxyStreamFL;
import vibe.stream.zlib : ZlibOutputStream, createDeflateOutputStreamFL, createGzipOutputStreamFL;

protected {
StreamProxy m_conn;
ConnectionStreamProxy m_rawConnection;
bool m_isHeadResponse = false;
OutputStreamProxy m_bodyWriter;
FreeListRef!ChunkedOutputStream m_chunkedBodyWriter;
FreeListRef!CountingOutputStream m_countingWriter;
FreeListRef!ZlibOutputStream m_zlibOutputStream;
bool m_headerWritten = false;
bool m_requiresConnectionClose;
}

this(StreamProxy conn, ConnectionStreamProxy raw_connection)
@safe {
m_conn = conn;
m_rawConnection = raw_connection;
m_countingWriter = createCountingOutputStreamFL(conn);
}

override @property bool isHeadResponse() const { return m_isHeadResponse; }
override @property bool headerWritten() const { return m_headerWritten; }
override @property ulong bytesWritten() @safe const { return m_countingWriter.bytesWritten; }

override void writeBody(HTTPServerResponse res, RandomAccessStreamProxy stream)
{
assert(!m_headerWritten, "A body was already written!");
writeHeader(res);
if (m_isHeadResponse) return;

auto bytes = stream.size - stream.tell();
stream.pipe(m_conn);
m_countingWriter.increment(bytes);
}

override void writeBody(HTTPServerResponse res, InputStreamProxy stream, ulong num_bytes = ulong.max)
{
assert(!m_headerWritten, "A body was already written!");
writeHeader(res);
if (m_isHeadResponse) return;

if (num_bytes != ulong.max) {
stream.pipe(m_conn, num_bytes);
m_countingWriter.increment(num_bytes);
} else stream.pipe(m_countingWriter);
}

override void writeVoidBody(HTTPServerResponse res)
{
if (!isHeadResponse) {
assert("Content-Length" !in res.headers);
assert("Transfer-Encoding" !in res.headers);
}
assert(!m_headerWritten);
writeHeader(res);
m_conn.flush();
}

override OutputStreamProxy bodyWriter(HTTPServerResponse res)
{
import std.conv : to;

assert(!!m_conn);
if (m_bodyWriter) {
// for test responses, the body writer is pre-set, without headers
// being written, so we may need to do that here
if (!m_headerWritten) writeHeader(res);

return m_bodyWriter;
}

assert(!m_headerWritten, "A void body was already written!");
assert(res.statusCode >= 200, "1xx responses can't have body");

if (m_isHeadResponse) {
// for HEAD requests, we define a NullOutputWriter for convenience
// - no body will be written. However, the request handler should call writeVoidBody()
// and skip writing of the body in this case.
if ("Content-Length" !in res.headers)
res.headers["Transfer-Encoding"] = "chunked";
writeHeader(res);
m_bodyWriter = nullSink;
return m_bodyWriter;
}

if ("Content-Encoding" in res.headers && "Content-Length" in res.headers) {
// we do not known how large the compressed body will be in advance
// so remove the content-length and use chunked transfer
res.headers.remove("Content-Length");
}

if (auto pcl = "Content-Length" in res.headers) {
writeHeader(res);
m_countingWriter.writeLimit = (*pcl).to!ulong;
m_bodyWriter = m_countingWriter;
} else if (res.httpVersion <= HTTPVersion.HTTP_1_0) {
if ("Connection" in res.headers)
res.headers.remove("Connection"); // default to "close"
writeHeader(res);
m_bodyWriter = m_conn;
} else {
res.headers["Transfer-Encoding"] = "chunked";
writeHeader(res);
m_chunkedBodyWriter = createChunkedOutputStreamFL(m_countingWriter);
m_bodyWriter = m_chunkedBodyWriter;
}

if (auto pce = "Content-Encoding" in res.headers) {
if (icmp2(*pce, "gzip") == 0) {
m_zlibOutputStream = createGzipOutputStreamFL(m_bodyWriter);
m_bodyWriter = m_zlibOutputStream;
} else if (icmp2(*pce, "deflate") == 0) {
m_zlibOutputStream = createDeflateOutputStreamFL(m_bodyWriter);
m_bodyWriter = m_zlibOutputStream;
} else {
logWarn("Unsupported Content-Encoding set in response: '"~*pce~"'");
}
}

return m_bodyWriter;
}

override ConnectionStream switchProtocol(HTTPServerResponse res, string protocol)
{
res.statusCode = HTTPStatus.switchingProtocols;
if (protocol.length) res.headers["Upgrade"] = protocol;
writeVoidBody(res);
m_requiresConnectionClose = true;
m_headerWritten = true;
return createConnectionProxyStream(m_conn, m_rawConnection);
}

override void switchProtocol(HTTPServerResponse res, string protocol, scope void delegate(scope ConnectionStream) @safe del)
{
res.statusCode = HTTPStatus.switchingProtocols;
if (protocol.length) res.headers["Upgrade"] = protocol;
writeVoidBody(res);
m_requiresConnectionClose = true;
m_headerWritten = true;
() @trusted {
auto conn = createConnectionProxyStreamFL(m_conn, m_rawConnection);
del(conn);
} ();
finalize(res);
}

override ConnectionStream connectProxy(HTTPServerResponse res)
{
return createConnectionProxyStream(m_conn, m_rawConnection);
}

override void connectProxy(HTTPServerResponse res, scope void delegate(scope ConnectionStream) @safe del)
{
() @trusted {
auto conn = createConnectionProxyStreamFL(m_conn, m_rawConnection);
del(conn);
} ();
finalize(res);
}

void finalize(HTTPServerResponse res)
{
import std.conv : to;

if (m_zlibOutputStream) {
m_zlibOutputStream.finalize();
m_zlibOutputStream.destroy();
}
if (m_chunkedBodyWriter) {
m_chunkedBodyWriter.finalize();
m_chunkedBodyWriter.destroy();
}

// ignore exceptions caused by an already closed connection - the client
// may have closed the connection already and this doesn't usually indicate
// a problem.
if (m_rawConnection && m_rawConnection.connected) {
try if (m_conn) m_conn.flush();
catch (Exception e) logDebug("Failed to flush connection after finishing HTTP response: %s", e.msg);
if (!isHeadResponse && m_countingWriter.bytesWritten < res.headers.get("Content-Length", "0").to!ulong) {
logDebug("HTTP response only written partially before finalization. Terminating connection.");
m_requiresConnectionClose = true;
}

m_rawConnection = ConnectionStreamProxy.init;
}

if (m_conn) {
m_conn = StreamProxy.init;
res.m_timeFinalized = Clock.currTime(UTC());
}
}

private void writeHeader(HTTPServerResponse res)
@safe {
import vibe.stream.wrapper;

assert(!m_headerWritten, "Try to write header after body has already begun.");
assert(res.httpVersion != HTTPVersion.HTTP_1_0 || res.statusCode >= 200, "Informational status codes aren't supported by HTTP/1.0.");

// Don't set m_headerWritten for 1xx status codes
if (res.statusCode >= 200) m_headerWritten = true;
auto dst = streamOutputRange!1024(m_conn);

void writeLine(T...)(string fmt, T args)
@safe {
formattedWrite(() @trusted { return &dst; } (), fmt, args);
dst.put("\r\n");
logTrace(fmt, args);
}

logTrace("---------------------");
logTrace("HTTP server response:");
logTrace("---------------------");

// write the status line
writeLine("%s %d %s",
getHTTPVersionString(res.httpVersion),
res.statusCode,
res.statusPhrase.length ? res.statusPhrase : httpStatusText(res.statusCode));

// write all normal headers
foreach (k, v; res.headers.byKeyValue) {
dst.put(k);
dst.put(": ");
dst.put(v);
dst.put("\r\n");
logTrace("%s: %s", k, v);
}

logTrace("---------------------");

// write cookies
foreach (n, cookie; () @trusted { return res.cookies.byKeyValue; } ()) {
dst.put("Set-Cookie: ");
cookie.writeString(() @trusted { return &dst; } (), n);
dst.put("\r\n");
}

// finalize response header
dst.put("\r\n");
}

bool waitForConnectionClose(Duration timeout)
{
if (!m_rawConnection || !m_rawConnection.connected) return true;
m_rawConnection.waitForData(timeout);
return !m_rawConnection.connected;
}

@property bool connected()
const {
if (!m_rawConnection) return false;
return m_rawConnection.connected;
}
}

Loading

0 comments on commit 2dcaa5d

Please sign in to comment.