Skip to content

Commit

Permalink
[feature] add zlib stream decompress
Browse files Browse the repository at this point in the history
  • Loading branch information
BusyStudent committed Sep 20, 2024
1 parent 3aa6961 commit 76b891f
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 51 deletions.
54 changes: 35 additions & 19 deletions include/ilias/http/http1.1.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include <ilias/log.hpp>
#include <source_location>


#undef min
#undef max

Expand Down Expand Up @@ -98,7 +97,7 @@ class Http1Stream final : public HttpStream {
mCon->markBroken();
return Unexpected(err);
}
#endif
#endif // !defined(NDEBUG)

auto send(std::string_view method, const Url &url, const HttpHeaders &hheaders, std::span<const std::byte> payload) -> Task<void> override {
auto &client = mCon->mClient;
Expand Down Expand Up @@ -176,20 +175,11 @@ class Http1Stream final : public HttpStream {
co_return num;
}
// Chunked
if (!mChunkSize) {
// Oh, we did not receive the chunk size
auto line = co_await mCon->mClient.getline("\r\n");
if (!line || line->empty()) {
co_return returnError(line.error_or(Error::HttpBadReply));
}
size_t size = 0;
auto [ptr, ec] = std::from_chars(line->data(), line->data() + line->size(), size, 16);
if (ec != std::errc{}) {
co_return returnError(Error::HttpBadReply);
if (!mChunkSize) [[unlikely]] { //< We didn't get the first chunk size
ILIAS_TRACE("Http1.1", "Try Get the first chunk size");
if (auto ret = co_await readChunkSize(); !ret) {
co_return returnError(ret.error());
}
ILIAS_TRACE("Http1.1", "Reach new chunk, size = {}", size);
mChunkSize = size;
mChunkRemain = size;
}
// Read the chunk
auto num = co_await mCon->mClient.readAll(buffer.subspan(0, std::min(buffer.size(), mChunkRemain)));
Expand All @@ -200,15 +190,41 @@ class Http1Stream final : public HttpStream {
ILIAS_TRACE("Http1.1", "Current chunk was all read = {}", *mChunkSize);
// Drop the \r\n, Every chunk end is \r\n
auto str = co_await mCon->mClient.getline("\r\n");
if (mChunkSize == 0 && str) {
if (!str || !str->empty()) {
co_return returnError(str.error_or(Error::HttpBadReply));
}
// Try Get the next chunk size
if (auto ret = co_await readChunkSize(); !ret) {
co_return returnError(ret.error());
}
if (mChunkSize == 0) { //< Discard last chunk \r\n
str = co_await mCon->mClient.getline("\r\n");
if (!str || !str->empty()) {
co_return returnError(str.error_or(Error::HttpBadReply));
}
ILIAS_TRACE("Http1.1", "All chunks were read");
mContentEnd = true;
}
mChunkSize = std::nullopt;
}
co_return num;
}

auto readChunkSize() -> Task<void> {
auto line = co_await mCon->mClient.getline("\r\n");
if (!line || line->empty()) {
co_return returnError(line.error_or(Error::HttpBadReply));
}
size_t size = 0;
auto [ptr, ec] = std::from_chars(line->data(), line->data() + line->size(), size, 16);
if (ec != std::errc{}) {
co_return returnError(Error::HttpBadReply);
}
ILIAS_TRACE("Http1.1", "Reach new chunk, size = {}", size);
mChunkSize = size;
mChunkRemain = size;
co_return {};
}

auto readHeaders(int &statusCode, std::string &statusMessage, HttpHeaders &headers) -> Task<void> override {
ILIAS_ASSERT(mHeaderSent && !mHeaderReceived);
ILIAS_TRACE("Http1.1", "Recv header Begin");
Expand Down Expand Up @@ -282,7 +298,7 @@ class Http1Stream final : public HttpStream {
else if (transferEncoding == "chunked") {
mChunked = true;
}
else if (mKeepAlive) { //< If keep alive and also no content length, ill-formed
else if (mKeepAlive && !mMethodHead) { //< If keep alive and also no content length, and not head, ill-formed
co_return returnError(Error::HttpBadReply);
}

Expand Down Expand Up @@ -323,7 +339,7 @@ inline auto Http1Stream::sprintf(std::string &buf, const char *fmt, ...) -> void
s = ::_vscprintf(fmt, varg);
#else
s = ::vsnprintf(nullptr, 0, fmt, varg);
#endif
#endif // define _WIN32
va_end(varg);

int len = buf.length();
Expand Down
73 changes: 46 additions & 27 deletions include/ilias/http/reply.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,10 @@ class HttpReply final : public ReadableMethod<HttpReply> {
*
* @param stream
* @param streamMode
* @param noContent if true, the content will not be read
* @return Task<HttpReply>
*/
static auto make(std::unique_ptr<HttpStream> stream, bool streamMode) -> Task<HttpReply>;
static auto make(std::unique_ptr<HttpStream> stream, bool streamMode, bool noContent) -> Task<HttpReply>;
private:
Url mUrl;
int mStatusCode = 0;
Expand All @@ -104,10 +105,15 @@ class HttpReply final : public ReadableMethod<HttpReply> {
std::optional<Error> mLastError; //< The last error that occurred while reading the reply
std::vector<std::byte> mContent; //< The received content of the reply (not in stream mode)
std::unique_ptr<HttpStream> mStream; //< The stream used to read the whole reply

#if !defined(ILIAS_NO_ZLIB)
std::unique_ptr<zlib::Decompressor> mDecompressor; //< Used to decompress the content
#endif // !defined(ILIAS_NO_ZLIB)

friend class HttpSession;
};

inline auto HttpReply::make(std::unique_ptr<HttpStream> stream, bool streamMode) -> Task<HttpReply> {
inline auto HttpReply::make(std::unique_ptr<HttpStream> stream, bool streamMode, bool noContent) -> Task<HttpReply> {
ILIAS_ASSERT(stream);

HttpReply reply;
Expand All @@ -116,32 +122,34 @@ inline auto HttpReply::make(std::unique_ptr<HttpStream> stream, bool streamMode)
}
reply.mStream = std::move(stream);
reply.mUrl = reply.mRequest.url();

#if !defined(ILIAS_NO_ZLIB)
auto contentEncoding = reply.mHeaders.value("Content-Encoding");
std::optional<zlib::ZFormat> format;
if (contentEncoding == "gzip") {
format = zlib::GzipFormat;
}
else if (contentEncoding == "deflate") {
format = zlib::DeflateFormat;
}
if (format) {
reply.mDecompressor = std::make_unique<zlib::Decompressor>(*format);
if (!*reply.mDecompressor) {
co_return Unexpected(Error::Unknown);
}
}
#endif // !defined(ILIAS_NO_ZLIB)

if (noContent) {
reply.mStream.reset();
}

if (!streamMode) {
auto ret = co_await reply.readAll<std::vector<std::byte> >();
if (!ret) {
co_return Unexpected(ret.error());
}
reply.mContent = std::move(*ret);

#if !defined(ILIAS_NO_ZLIB)
auto contentEncoding = reply.mHeaders.value("Content-Encoding");
std::optional<zlib::ZFormat> format;
if (contentEncoding == "gzip") {
format = zlib::GzipFormat;
}
else if (contentEncoding == "deflate") {
format = zlib::DeflateFormat;
}
if (format) {
ILIAS_TRACE("Http", "Decompressing content by format: {}", contentEncoding);
auto ret = zlib::decompress(makeBuffer(reply.mContent), *format);
if (!ret) {
co_return Unexpected(ret.error());
}
reply.mContent = std::move(*ret);
}
#endif

}
co_return reply;
}
Expand All @@ -153,16 +161,27 @@ inline auto HttpReply::read(std::span<std::byte> buffer) -> Task<size_t> {
}
co_return 0;
}
auto ret = co_await mStream->read(buffer);
// TODO: Handle decompression here, not in the make function



Result<size_t> ret;
#if !defined(ILIAS_NO_ZLIB)
if (mDecompressor) {
ret = co_await mDecompressor->decompressTo(buffer, *mStream);
}
#else
if (false) { }
#endif // !defined(ILIAS_NO_ZLIB)
else { // No compression
ret = co_await mStream->read(buffer);
}
if (!ret) {
mLastError = ret.error();
}
if (!ret || *ret == 0) { //< Error or EOF
mStream.reset();

#if !defined(ILIAS_NO_ZLIB)
mDecompressor.reset();
#endif // !defined(ILIAS_NO_ZLIB)

}
co_return ret;
}
Expand Down
8 changes: 5 additions & 3 deletions include/ilias/http/session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ class HttpSession {
* @param request
* @return Task<HttpReply>
*/
auto head(const HttpRequest &request) -> Task<HttpReply> { return sendRequest("HEAD", request); }
auto head(const HttpRequest &request) -> Task<HttpReply> {
return sendRequest("HEAD", request);
}

/**
* @brief Send a PUT request to the server
Expand Down Expand Up @@ -161,7 +163,7 @@ class HttpSession {
nullptr; //< The cookie jar to use for this session (if null, no cookies will be accepted)

// State ...
std::multimap<Endpoint, std::unique_ptr<HttpConnection>> mConnections; //< Conenction Pool
std::multimap<Endpoint, std::unique_ptr<HttpConnection> > mConnections; //< Conenction Pool
};

inline HttpSession::HttpSession(IoContext &ctxt) : mCtxt(ctxt) {
Expand Down Expand Up @@ -222,7 +224,7 @@ inline auto HttpSession::sendRequestImpl(std::string_view method, const Url &url
co_return Unexpected(ret.error());
}
// Build the reply
auto reply = co_await HttpReply::make(std::move(streamPtr), streamMode);
auto reply = co_await HttpReply::make(std::move(streamPtr), streamMode, method == "HEAD");
if (!reply) {
if (fromPool) {
continue;
Expand Down
96 changes: 94 additions & 2 deletions include/ilias/zlib.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
#if __has_include(<zlib.h>) && !defined(ILIAS_NO_ZLIB)

#include <ilias/detail/expected.hpp>
#include <ilias/io/traits.hpp>
#include <ilias/buffer.hpp>
#include <ilias/error.hpp>
#include <ilias/ilias.hpp>
#include <ilias/log.hpp>
#include <vector>
#include <zlib.h>
#include <span>

Expand Down Expand Up @@ -81,6 +83,96 @@ class ZCategory : public ErrorCategory {

ILIAS_DECLARE_ERROR(ZError, ZCategory);

/**
* @brief The zlib decompressor
*
*/
class Decompressor {
public:
/**
* @brief Construct a new Decompressor object by format
*
* @param wbits The wbits parameter for zlib (use the ZFormat enum)
*/
Decompressor(int wbits) {
::memset(&mStream, 0, sizeof(mStream));
mInitialized = (::inflateInit2(&mStream, wbits) == Z_OK);
}

Decompressor(const Decompressor&) = delete;

~Decompressor() {
if (mInitialized) {
::inflateEnd(&mStream);
}
}

/**
* @brief Doing the decompression on an async stream
*
* @tparam T rquires Readable
* @param output The output buffer for writing the decompressed data
* @param source The source stream for reading the data, which need to be decompressed
* @return Task<size_t> (The number of bytes written to the output buffer, 0 on EOF)
*/
template <Readable T>
auto decompressTo(std::span<std::byte> output, T &source) -> Task<size_t> {
if (mStreamEnd) {
co_return 0; //< EOF
}
mStream.next_out = (Bytef*) output.data();
mStream.avail_out = output.size_bytes();

if (mStream.avail_in == 0) {
// We need to fill the source buffer
if (mBuffer.empty()) {
mBuffer.resize(1024);
}
if (mBufferFullFilled) { //< Trying to increase the buffer size, improve the performance
mBufferFullFilled = false;
mBuffer.resize(mBuffer.size() * 2);
}
auto n = co_await source.read(makeBuffer(mBuffer));
if (!n || *n == 0) { //< Error from lower layer or lower layer return 0, We can not continue
ILIAS_ERROR("Zlib", "Failed to read data from source stream");
co_return Unexpected(n.error_or(Error::ZeroReturn));
}
mStream.next_in = (Bytef*) mBuffer.data();
mStream.avail_in = *n;
mBufferFullFilled = (*n == mBuffer.size());
}
do {
auto ret = ::inflate(&mStream, Z_NO_FLUSH);
if (ret == Z_STREAM_ERROR || ret == Z_NEED_DICT || ret == Z_DATA_ERROR || ret == Z_MEM_ERROR) {
ILIAS_ERROR("Zlib", "inflate error: {}", mStream.msg);
co_return Unexpected(ZError(ret));
}
if (ret == Z_STREAM_END) {
mStreamEnd = true;
break;
}
} // Output buffer is full or source buffer is empty or end of stream
while (mStream.avail_out != 0 && mStream.avail_in != 0);
auto readed = mStream.next_out - (Bytef*) output.data(); //< Calculate the number of bytes readed
co_return readed;
}

/**
* @brief Check the decompressor is initialized
*
* @return true
* @return false
*/
explicit operator bool() const noexcept {
return mInitialized;
}
private:
::z_stream mStream {};
bool mInitialized = false;
std::vector<std::byte> mBuffer {}; //< Buffer for the source, waiting to be decompressed
bool mBufferFullFilled = false; //< Does the previous action make the buffer full filled ?
bool mStreamEnd = false; //< Does the stream end ?
};

/**
* @brief decompress a byte buffer using zlib
Expand Down Expand Up @@ -152,4 +244,4 @@ ILIAS_NS_END

#else
#define ILIAS_NO_ZLIB
#endif
#endif
13 changes: 13 additions & 0 deletions tests/unit/http/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@ TEST(Session, GET) {
std::cout << text2.value() << std::endl;
}

TEST(Session, HEAD) {
auto reply = session->head("https://www.baidu.com").wait();
ASSERT_TRUE(reply);
auto text = reply->text().wait();
ASSERT_TRUE(text);
ASSERT_TRUE(text.value().empty());

auto reply2 = session->head("http://www.bilibili.com").wait();
ASSERT_TRUE(reply2);
auto text2 = reply2->text().wait();
ASSERT_TRUE(text2);
}

auto main(int argc, char **argv) -> int {

#if defined(_WIN32)
Expand Down

0 comments on commit 76b891f

Please sign in to comment.