Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.1.x] [CORE-5670] Pandaproxy: Avoid large allocations #21468

Merged
merged 12 commits into from
Jul 17, 2024
6 changes: 6 additions & 0 deletions src/v/bytes/include/bytes/iobuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ class iobuf {
using byte_iterator = details::io_byte_iterator;
using placeholder = details::io_placeholder;

static iobuf from(std::string_view view) {
iobuf i;
i.append(view.data(), view.size());
return i;
}

// NOLINTNEXTLINE
iobuf() noexcept {
// nothing allocates memory, but boost intrusive list is not marked as
Expand Down
40 changes: 40 additions & 0 deletions src/v/bytes/include/bytes/streambuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,43 @@ class iobuf_ostreambuf final : public std::streambuf {
private:
iobuf* _buf;
};

///\brief Wrap a std::istream around an iobuf
///
/// iobuf buf;
/// iobuf_istream is(std::move(buf));
/// std::string out;
/// is.istream() >> out;
class iobuf_istream {
public:
explicit iobuf_istream(iobuf buf)
: _buf(std::move(buf))
, _isb(_buf)
, _sis{&_isb} {}
std::istream& istream() { return _sis; }

private:
iobuf _buf;
iobuf_istreambuf _isb;
std::istream _sis;
};

///\brief Wrap a std::ostream around an iobuf
///
/// iobuf_ostream os;
/// os.ostream() << "Hello World";
/// iobuf buf = std::move(os).buf();
class iobuf_ostream {
public:
iobuf_ostream()
: _buf()
, _osb(_buf)
, _sos{&_osb} {}
std::ostream& ostream() { return _sos; }
iobuf buf() && { return std::move(_buf); }

private:
iobuf _buf;
iobuf_ostreambuf _osb;
std::ostream _sos;
};
26 changes: 25 additions & 1 deletion src/v/json/chunked_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@

namespace json {

template<
typename OutputStream,
typename SourceEncoding,
typename TargetEncoding,
unsigned writeFlags>
class generic_iobuf_writer;

namespace impl {

/**
Expand All @@ -37,15 +44,32 @@ struct generic_chunked_buffer {
//! Get the length of string in Ch in the string buffer.
size_t GetLength() const { return _impl.size_bytes() / sizeof(Ch); }

void Reserve(size_t s) { _impl.reserve(s); }
void Reserve(size_t s) { _impl.reserve_memory(s); }

void Clear() { _impl.clear(); }

/**@}*/

/**
* Append a fragment to this chunked_buffer. This takes ownership of the
* fragment and is a zero-copy operation.
*/
void append(std::unique_ptr<iobuf::fragment> frag) {
_impl.append(std::move(frag));
}

/**
* Return the underlying iobuf, this is destructive and zero-copy.
*/
iobuf as_iobuf() && { return std::move(_impl); }

private:
template<
typename OutputStream,
typename SourceEncoding,
typename TargetEncoding,
unsigned writeFlags>
friend class json::generic_iobuf_writer;
iobuf _impl;
};

Expand Down
117 changes: 117 additions & 0 deletions src/v/json/iobuf_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#pragma once

#include "bytes/iobuf.h"
#include "bytes/iobuf_parser.h"
#include "json/chunked_buffer.h"
#include "json/writer.h"

#include <rapidjson/rapidjson.h>

namespace json {

///\brief a json::Writer that can accept an iobuf as a String payload.
template<
typename OutputStream,
typename SourceEncoding = json::UTF8<>,
typename TargetEncoding = json::UTF8<>,
unsigned writeFlags = rapidjson::kWriteDefaultFlags>
class generic_iobuf_writer
: public Writer<OutputStream, SourceEncoding, TargetEncoding, writeFlags> {
using Base
= Writer<OutputStream, SourceEncoding, TargetEncoding, writeFlags>;

public:
explicit generic_iobuf_writer(OutputStream& os)
: Base{os} {}

using Base::String;
bool String(const iobuf& buf) {
constexpr bool buffer_is_chunked
= std::same_as<OutputStream, json::chunked_buffer>;
if constexpr (buffer_is_chunked) {
return write_chunked_string(buf);
} else {
iobuf_const_parser p{buf};
auto str = p.read_string(p.bytes_left());
return this->String(str.data(), str.size(), true);
}
}

private:
bool write_chunked_string(const iobuf& buf) {
const auto last_frag = [this]() {
return std::prev(this->os_->_impl.end());
};
using Ch = Base::Ch;
this->Prefix(rapidjson::kStringType);
const auto beg = buf.begin();
const auto end = buf.end();
const auto last = std::prev(end);
Ch stashed{};
Ch* stash_pos{};
// Base::WriteString is used to JSON encode the string, and requires a
// contiguous range (pointer, len), so we pass it each fragment.
//
// Unfortunately it also encloses the encoded fragment with double
// quotes:
// R"("A string made of ""fragments will need ""fixing")"
//
// This algorithm efficiently removes the extra quotes without
// additional copying:
// For each encoded fragment that is written (except the last one):
// 1. Trim the suffix quote
// 2. Stash the final character, and where it is to be written
// 3. Drop the final character
// For each encoded fragment that is written (except the first one):
// 4. Restore the stashed character over the prefix-quote
for (auto i = beg; i != end; ++i) {
if (!Base::WriteString(i->get(), i->size())) {
return false;
}
if (i != beg) {
// 4. Restore the stashed character over the prefix-quote
*stash_pos = stashed;
}
if (i != last) {
// 1. Trim the suffix quote
this->os_->_impl.trim_back(1);

// 2. Stash the final character, ...
auto last = last_frag();
stashed = *std::prev(last->get_current());
// 3. Drop the final character
this->os_->_impl.trim_back(1);

// Ensure a stable address to restore the stashed character
if (last != last_frag()) {
this->os_->_impl.reserve_memory(1);
}
// 2. ...and where it is to be written.
stash_pos = last_frag()->get_current();
}
}
return this->EndValue(true);
}
};

template<
typename OutputStream,
typename SourceEncoding = json::UTF8<>,
typename TargetEncoding = json::UTF8<>,
unsigned writeFlags = rapidjson::kWriteDefaultFlags>
using iobuf_writer = generic_iobuf_writer<
OutputStream,
SourceEncoding,
TargetEncoding,
writeFlags>;

} // namespace json
10 changes: 10 additions & 0 deletions src/v/json/json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "json/json.h"

#include "json/chunked_buffer.h"
#include "json/chunked_input_stream.h"
#include "json/stringbuffer.h"

namespace json {
Expand Down Expand Up @@ -128,6 +129,15 @@ ss::sstring minify(std::string_view json) {
return ss::sstring(out.GetString(), out.GetSize());
}

iobuf minify(iobuf json) {
json::Reader r;
json::chunked_input_stream in(std::move(json));
json::chunked_buffer out;
json::Writer<json::chunked_buffer> w{out};
r.Parse(in, w);
return std::move(out).as_iobuf();
}

ss::sstring prettify(std::string_view json) {
json::Reader r;
json::StringStream in(json.data());
Expand Down
2 changes: 2 additions & 0 deletions src/v/json/json.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#pragma once

#include "bytes/iobuf.h"
#include "json/_include_first.h"
#include "json/prettywriter.h"
#include "json/reader.h"
Expand Down Expand Up @@ -132,6 +133,7 @@ void rjson_serialize(
}

ss::sstring minify(std::string_view json);
iobuf minify(iobuf json);

ss::sstring prettify(std::string_view json);

Expand Down
57 changes: 57 additions & 0 deletions src/v/json/tests/json_serialization_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
// by the Apache License, Version 2.0

#include "base/seastarx.h"
#include "bytes/iobuf_parser.h"
#include "json/chunked_buffer.h"
#include "json/chunked_input_stream.h"
#include "json/document.h"
#include "json/iobuf_writer.h"
#include "json/json.h"
#include "json/stringbuffer.h"
#include "json/writer.h"
Expand Down Expand Up @@ -134,3 +138,56 @@ SEASTAR_THREAD_TEST_CASE(json_serialization_test) {

BOOST_TEST(res_doc["obj"].IsObject());
}

static constexpr std::string_view input_string{
R"(The quick brown fox jumps over the lazy dog)"};

static constexpr auto make_chunked_str = []() {
constexpr auto half = input_string.size() / 2;
iobuf in;
in.append_fragments(iobuf::from(input_string.substr(0, half)));
in.append_fragments(iobuf::from(input_string.substr(half)));
BOOST_REQUIRE_EQUAL(std::distance(in.begin(), in.end()), 2);
return in;
};

static constexpr auto make_chunked_json = []() {
iobuf in;
in.append_fragments(iobuf::from("\""));
in.append_fragments(make_chunked_str());
in.append_fragments(iobuf::from("\""));
BOOST_REQUIRE_EQUAL(std::distance(in.begin(), in.end()), 4);
return in;
};

SEASTAR_THREAD_TEST_CASE(json_chunked_input_stream_test) {
{
json::chunked_input_stream is{make_chunked_json()};
json::Document doc;
doc.ParseStream(is);
BOOST_REQUIRE(!doc.HasParseError());

BOOST_REQUIRE(doc.IsString());
auto out_str = std::string_view{doc.GetString(), doc.GetStringLength()};
BOOST_REQUIRE_EQUAL(out_str, input_string);
}
}

SEASTAR_THREAD_TEST_CASE(json_iobuf_writer_test) {
constexpr auto to_string = [](const iobuf& buf) {
iobuf_const_parser p{std::move(buf)};
auto b = p.read_bytes(p.bytes_left());
return std::string{b.begin(), b.end()};
};

{
json::chunked_buffer out;
json::iobuf_writer<json::chunked_buffer> os{out};
auto buf = make_chunked_str();
os.String(buf);
auto out_buf = std::move(out).as_iobuf();
auto expected = make_chunked_json();
BOOST_CHECK_EQUAL(out_buf, expected);
BOOST_CHECK_EQUAL(to_string(out_buf), to_string(expected));
}
}
12 changes: 4 additions & 8 deletions src/v/pandaproxy/json/iobuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
#pragma once

#include "bytes/iobuf.h"
#include "bytes/iobuf_parser.h"
#include "json/iobuf_writer.h"
#include "json/reader.h"
#include "json/stream.h"
#include "json/writer.h"
#include "pandaproxy/json/rjson_util.h"
#include "utils/base64.h"
Expand Down Expand Up @@ -65,7 +64,7 @@ class rjson_serialize_impl<iobuf> {
: _fmt(fmt) {}

template<typename Buffer>
bool operator()(::json::Writer<Buffer>& w, iobuf buf) {
bool operator()(::json::iobuf_writer<Buffer>& w, iobuf buf) {
switch (_fmt) {
case serialization_format::none:
[[fallthrough]];
Expand All @@ -81,7 +80,7 @@ class rjson_serialize_impl<iobuf> {
}

template<typename Buffer>
bool encode_base64(::json::Writer<Buffer>& w, iobuf buf) {
bool encode_base64(::json::iobuf_writer<Buffer>& w, iobuf buf) {
if (buf.empty()) {
return w.Null();
}
Expand All @@ -94,11 +93,8 @@ class rjson_serialize_impl<iobuf> {
if (buf.empty()) {
return w.Null();
}
iobuf_parser p{std::move(buf)};
auto str = p.read_string(p.bytes_left());
static_assert(str.padding(), "StringStream requires null termination");
::json::chunked_input_stream ss{std::move(buf)};
::json::Reader reader;
::json::StringStream ss{str.c_str()};
return reader.Parse(ss, w);
};

Expand Down
5 changes: 3 additions & 2 deletions src/v/pandaproxy/json/requests/fetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class rjson_serialize_impl<model::record> {
, _base_offset(base_offset) {}

template<typename Buffer>
bool operator()(::json::Writer<Buffer>& w, model::record record) {
bool operator()(::json::iobuf_writer<Buffer>& w, model::record record) {
auto offset = _base_offset() + record.offset_delta();

w.StartObject();
Expand Down Expand Up @@ -93,7 +93,8 @@ class rjson_serialize_impl<kafka::fetch_response> {
: _fmt(fmt) {}

template<typename Buffer>
bool operator()(::json::Writer<Buffer>& w, kafka::fetch_response&& res) {
bool
operator()(::json::iobuf_writer<Buffer>& w, kafka::fetch_response&& res) {
// Eager check for errors
for (auto& v : res) {
if (v.partition_response->error_code != kafka::error_code::none) {
Expand Down
Loading
Loading