Skip to content

Commit

Permalink
Merge pull request #21468 from BenPope/backport-pr-21323-v24.1.x
Browse files Browse the repository at this point in the history
[v24.1.x] [CORE-5670] Pandaproxy: Avoid large allocations
  • Loading branch information
BenPope authored Jul 17, 2024
2 parents 7937fab + 8b68ac5 commit dbf848a
Show file tree
Hide file tree
Showing 46 changed files with 720 additions and 308 deletions.
6 changes: 6 additions & 0 deletions src/v/bytes/include/bytes/iobuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ class iobuf {
using byte_iterator = details::io_byte_iterator;
using placeholder = details::io_placeholder;

static iobuf from(std::string_view view) {
iobuf i;
i.append(view.data(), view.size());
return i;
}

// NOLINTNEXTLINE
iobuf() noexcept {
// nothing allocates memory, but boost intrusive list is not marked as
Expand Down
40 changes: 40 additions & 0 deletions src/v/bytes/include/bytes/streambuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,43 @@ class iobuf_ostreambuf final : public std::streambuf {
private:
iobuf* _buf;
};

///\brief Wrap a std::istream around an iobuf
///
/// iobuf buf;
/// iobuf_istream is(std::move(buf));
/// std::string out;
/// is.istream() >> out;
class iobuf_istream {
public:
explicit iobuf_istream(iobuf buf)
: _buf(std::move(buf))
, _isb(_buf)
, _sis{&_isb} {}
std::istream& istream() { return _sis; }

private:
iobuf _buf;
iobuf_istreambuf _isb;
std::istream _sis;
};

///\brief Wrap a std::ostream around an iobuf
///
/// iobuf_ostream os;
/// os.ostream() << "Hello World";
/// iobuf buf = std::move(os).buf();
class iobuf_ostream {
public:
iobuf_ostream()
: _buf()
, _osb(_buf)
, _sos{&_osb} {}
std::ostream& ostream() { return _sos; }
iobuf buf() && { return std::move(_buf); }

private:
iobuf _buf;
iobuf_ostreambuf _osb;
std::ostream _sos;
};
26 changes: 25 additions & 1 deletion src/v/json/chunked_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@

namespace json {

template<
typename OutputStream,
typename SourceEncoding,
typename TargetEncoding,
unsigned writeFlags>
class generic_iobuf_writer;

namespace impl {

/**
Expand All @@ -37,15 +44,32 @@ struct generic_chunked_buffer {
//! Get the length of string in Ch in the string buffer.
size_t GetLength() const { return _impl.size_bytes() / sizeof(Ch); }

void Reserve(size_t s) { _impl.reserve(s); }
void Reserve(size_t s) { _impl.reserve_memory(s); }

void Clear() { _impl.clear(); }

/**@}*/

/**
* Append a fragment to this chunked_buffer. This takes ownership of the
* fragment and is a zero-copy operation.
*/
void append(std::unique_ptr<iobuf::fragment> frag) {
_impl.append(std::move(frag));
}

/**
* Return the underlying iobuf, this is destructive and zero-copy.
*/
iobuf as_iobuf() && { return std::move(_impl); }

private:
template<
typename OutputStream,
typename SourceEncoding,
typename TargetEncoding,
unsigned writeFlags>
friend class json::generic_iobuf_writer;
iobuf _impl;
};

Expand Down
117 changes: 117 additions & 0 deletions src/v/json/iobuf_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#pragma once

#include "bytes/iobuf.h"
#include "bytes/iobuf_parser.h"
#include "json/chunked_buffer.h"
#include "json/writer.h"

#include <rapidjson/rapidjson.h>

namespace json {

///\brief a json::Writer that can accept an iobuf as a String payload.
template<
typename OutputStream,
typename SourceEncoding = json::UTF8<>,
typename TargetEncoding = json::UTF8<>,
unsigned writeFlags = rapidjson::kWriteDefaultFlags>
class generic_iobuf_writer
: public Writer<OutputStream, SourceEncoding, TargetEncoding, writeFlags> {
using Base
= Writer<OutputStream, SourceEncoding, TargetEncoding, writeFlags>;

public:
explicit generic_iobuf_writer(OutputStream& os)
: Base{os} {}

using Base::String;
bool String(const iobuf& buf) {
constexpr bool buffer_is_chunked
= std::same_as<OutputStream, json::chunked_buffer>;
if constexpr (buffer_is_chunked) {
return write_chunked_string(buf);
} else {
iobuf_const_parser p{buf};
auto str = p.read_string(p.bytes_left());
return this->String(str.data(), str.size(), true);
}
}

private:
bool write_chunked_string(const iobuf& buf) {
const auto last_frag = [this]() {
return std::prev(this->os_->_impl.end());
};
using Ch = Base::Ch;
this->Prefix(rapidjson::kStringType);
const auto beg = buf.begin();
const auto end = buf.end();
const auto last = std::prev(end);
Ch stashed{};
Ch* stash_pos{};
// Base::WriteString is used to JSON encode the string, and requires a
// contiguous range (pointer, len), so we pass it each fragment.
//
// Unfortunately it also encloses the encoded fragment with double
// quotes:
// R"("A string made of ""fragments will need ""fixing")"
//
// This algorithm efficiently removes the extra quotes without
// additional copying:
// For each encoded fragment that is written (except the last one):
// 1. Trim the suffix quote
// 2. Stash the final character, and where it is to be written
// 3. Drop the final character
// For each encoded fragment that is written (except the first one):
// 4. Restore the stashed character over the prefix-quote
for (auto i = beg; i != end; ++i) {
if (!Base::WriteString(i->get(), i->size())) {
return false;
}
if (i != beg) {
// 4. Restore the stashed character over the prefix-quote
*stash_pos = stashed;
}
if (i != last) {
// 1. Trim the suffix quote
this->os_->_impl.trim_back(1);

// 2. Stash the final character, ...
auto last = last_frag();
stashed = *std::prev(last->get_current());
// 3. Drop the final character
this->os_->_impl.trim_back(1);

// Ensure a stable address to restore the stashed character
if (last != last_frag()) {
this->os_->_impl.reserve_memory(1);
}
// 2. ...and where it is to be written.
stash_pos = last_frag()->get_current();
}
}
return this->EndValue(true);
}
};

template<
typename OutputStream,
typename SourceEncoding = json::UTF8<>,
typename TargetEncoding = json::UTF8<>,
unsigned writeFlags = rapidjson::kWriteDefaultFlags>
using iobuf_writer = generic_iobuf_writer<
OutputStream,
SourceEncoding,
TargetEncoding,
writeFlags>;

} // namespace json
10 changes: 10 additions & 0 deletions src/v/json/json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "json/json.h"

#include "json/chunked_buffer.h"
#include "json/chunked_input_stream.h"
#include "json/stringbuffer.h"

namespace json {
Expand Down Expand Up @@ -128,6 +129,15 @@ ss::sstring minify(std::string_view json) {
return ss::sstring(out.GetString(), out.GetSize());
}

iobuf minify(iobuf json) {
json::Reader r;
json::chunked_input_stream in(std::move(json));
json::chunked_buffer out;
json::Writer<json::chunked_buffer> w{out};
r.Parse(in, w);
return std::move(out).as_iobuf();
}

ss::sstring prettify(std::string_view json) {
json::Reader r;
json::StringStream in(json.data());
Expand Down
2 changes: 2 additions & 0 deletions src/v/json/json.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#pragma once

#include "bytes/iobuf.h"
#include "json/_include_first.h"
#include "json/prettywriter.h"
#include "json/reader.h"
Expand Down Expand Up @@ -132,6 +133,7 @@ void rjson_serialize(
}

ss::sstring minify(std::string_view json);
iobuf minify(iobuf json);

ss::sstring prettify(std::string_view json);

Expand Down
57 changes: 57 additions & 0 deletions src/v/json/tests/json_serialization_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
// by the Apache License, Version 2.0

#include "base/seastarx.h"
#include "bytes/iobuf_parser.h"
#include "json/chunked_buffer.h"
#include "json/chunked_input_stream.h"
#include "json/document.h"
#include "json/iobuf_writer.h"
#include "json/json.h"
#include "json/stringbuffer.h"
#include "json/writer.h"
Expand Down Expand Up @@ -134,3 +138,56 @@ SEASTAR_THREAD_TEST_CASE(json_serialization_test) {

BOOST_TEST(res_doc["obj"].IsObject());
}

static constexpr std::string_view input_string{
R"(The quick brown fox jumps over the lazy dog)"};

static constexpr auto make_chunked_str = []() {
constexpr auto half = input_string.size() / 2;
iobuf in;
in.append_fragments(iobuf::from(input_string.substr(0, half)));
in.append_fragments(iobuf::from(input_string.substr(half)));
BOOST_REQUIRE_EQUAL(std::distance(in.begin(), in.end()), 2);
return in;
};

static constexpr auto make_chunked_json = []() {
iobuf in;
in.append_fragments(iobuf::from("\""));
in.append_fragments(make_chunked_str());
in.append_fragments(iobuf::from("\""));
BOOST_REQUIRE_EQUAL(std::distance(in.begin(), in.end()), 4);
return in;
};

SEASTAR_THREAD_TEST_CASE(json_chunked_input_stream_test) {
{
json::chunked_input_stream is{make_chunked_json()};
json::Document doc;
doc.ParseStream(is);
BOOST_REQUIRE(!doc.HasParseError());

BOOST_REQUIRE(doc.IsString());
auto out_str = std::string_view{doc.GetString(), doc.GetStringLength()};
BOOST_REQUIRE_EQUAL(out_str, input_string);
}
}

SEASTAR_THREAD_TEST_CASE(json_iobuf_writer_test) {
constexpr auto to_string = [](const iobuf& buf) {
iobuf_const_parser p{std::move(buf)};
auto b = p.read_bytes(p.bytes_left());
return std::string{b.begin(), b.end()};
};

{
json::chunked_buffer out;
json::iobuf_writer<json::chunked_buffer> os{out};
auto buf = make_chunked_str();
os.String(buf);
auto out_buf = std::move(out).as_iobuf();
auto expected = make_chunked_json();
BOOST_CHECK_EQUAL(out_buf, expected);
BOOST_CHECK_EQUAL(to_string(out_buf), to_string(expected));
}
}
12 changes: 4 additions & 8 deletions src/v/pandaproxy/json/iobuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
#pragma once

#include "bytes/iobuf.h"
#include "bytes/iobuf_parser.h"
#include "json/iobuf_writer.h"
#include "json/reader.h"
#include "json/stream.h"
#include "json/writer.h"
#include "pandaproxy/json/rjson_util.h"
#include "utils/base64.h"
Expand Down Expand Up @@ -65,7 +64,7 @@ class rjson_serialize_impl<iobuf> {
: _fmt(fmt) {}

template<typename Buffer>
bool operator()(::json::Writer<Buffer>& w, iobuf buf) {
bool operator()(::json::iobuf_writer<Buffer>& w, iobuf buf) {
switch (_fmt) {
case serialization_format::none:
[[fallthrough]];
Expand All @@ -81,7 +80,7 @@ class rjson_serialize_impl<iobuf> {
}

template<typename Buffer>
bool encode_base64(::json::Writer<Buffer>& w, iobuf buf) {
bool encode_base64(::json::iobuf_writer<Buffer>& w, iobuf buf) {
if (buf.empty()) {
return w.Null();
}
Expand All @@ -94,11 +93,8 @@ class rjson_serialize_impl<iobuf> {
if (buf.empty()) {
return w.Null();
}
iobuf_parser p{std::move(buf)};
auto str = p.read_string(p.bytes_left());
static_assert(str.padding(), "StringStream requires null termination");
::json::chunked_input_stream ss{std::move(buf)};
::json::Reader reader;
::json::StringStream ss{str.c_str()};
return reader.Parse(ss, w);
};

Expand Down
5 changes: 3 additions & 2 deletions src/v/pandaproxy/json/requests/fetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class rjson_serialize_impl<model::record> {
, _base_offset(base_offset) {}

template<typename Buffer>
bool operator()(::json::Writer<Buffer>& w, model::record record) {
bool operator()(::json::iobuf_writer<Buffer>& w, model::record record) {
auto offset = _base_offset() + record.offset_delta();

w.StartObject();
Expand Down Expand Up @@ -93,7 +93,8 @@ class rjson_serialize_impl<kafka::fetch_response> {
: _fmt(fmt) {}

template<typename Buffer>
bool operator()(::json::Writer<Buffer>& w, kafka::fetch_response&& res) {
bool
operator()(::json::iobuf_writer<Buffer>& w, kafka::fetch_response&& res) {
// Eager check for errors
for (auto& v : res) {
if (v.partition_response->error_code != kafka::error_code::none) {
Expand Down
Loading

0 comments on commit dbf848a

Please sign in to comment.