Skip to content

Commit

Permalink
Merge pull request #21450 from BenPope/backport-pr-20827-v24.1.x
Browse files Browse the repository at this point in the history
[CORE-5540] [v24.1.x] Pandaproxy: Avoid large allocations whilst serializing JSON
  • Loading branch information
BenPope authored Jul 17, 2024
2 parents cc371b9 + 84acab6 commit a022cde
Show file tree
Hide file tree
Showing 36 changed files with 625 additions and 396 deletions.
2 changes: 1 addition & 1 deletion src/v/compat/json.h
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ inline void rjson_serialize(
ss.rdstate()));
}
w.Key("address");
rjson_serialize<std::string_view>(w, ss.str());
rjson_serialize(w, std::string_view{ss.str()});
w.EndObject();
}

Expand Down
5 changes: 2 additions & 3 deletions src/v/container/include/container/json.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@

namespace json {

template<typename T, size_t max_fragment_size>
template<typename Buffer, typename T, size_t max_fragment_size>
void rjson_serialize(
json::Writer<json::StringBuffer>& w,
const fragmented_vector<T, max_fragment_size>& v) {
json::Writer<Buffer>& w, const fragmented_vector<T, max_fragment_size>& v) {
w.StartArray();
for (const auto& e : v) {
rjson_serialize(w, e);
Expand Down
59 changes: 59 additions & 0 deletions src/v/json/chunked_buffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#pragma once

#include "bytes/iobuf.h"
#include "json/encodings.h"

namespace json {

namespace impl {

/**
* \brief An in-memory output stream with non-contiguous memory allocation.
*/
template<typename Encoding>
struct generic_chunked_buffer {
using Ch = Encoding::Ch;

/**
* \defgroup Implement rapidjson::Stream
*/
/**@{*/

void Put(Ch c) { _impl.append(&c, sizeof(Ch)); }
void Flush() {}

//! Get the size of string in bytes in the string buffer.
size_t GetSize() const { return _impl.size_bytes(); }

//! Get the length of string in Ch in the string buffer.
size_t GetLength() const { return _impl.size_bytes() / sizeof(Ch); }

void Reserve(size_t s) { _impl.reserve(s); }

void Clear() { _impl.clear(); }

/**@}*/

iobuf as_iobuf() && { return std::move(_impl); }

private:
iobuf _impl;
};

} // namespace impl

template<typename Encoding>
using generic_chunked_buffer = impl::generic_chunked_buffer<Encoding>;

using chunked_buffer = generic_chunked_buffer<UTF8<>>;

} // namespace json
64 changes: 64 additions & 0 deletions src/v/json/chunked_input_stream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#pragma once

#include "bytes/streambuf.h"
#include "json/encodings.h"
#include "json/istreamwrapper.h"

namespace json {

namespace impl {

/**
* \brief An in-memory input stream with non-contiguous memory allocation.
*/
template<typename Encoding = ::json::UTF8<>>
class chunked_input_stream {
public:
using Ch = Encoding::Ch;

explicit chunked_input_stream(iobuf&& buf)
: _buf(std::move(buf))
, _is(_buf)
, _sis{&_is}
, _isw(_sis) {}

/**
* \defgroup Implement rapidjson::Stream
*/
/**@{*/

Ch Peek() const { return _isw.Peek(); }
Ch Peek4() const { return _isw.Peek4(); }
Ch Take() { return _isw.Take(); }
size_t Tell() const { return _isw.Tell(); }
void Put(Ch ch) { return _isw.Put(ch); }
Ch* PutBegin() { return _isw.PutBegin(); }
size_t PutEnd(Ch* ch) { return _isw.PutEnd(ch); }
void Flush() { return _isw.Flush(); }

/**@}*/

private:
iobuf _buf;
iobuf_istreambuf _is;
std::istream _sis;
::json::IStreamWrapper _isw;
};

} // namespace impl

template<typename Encoding>
using generic_chunked_input_stream = impl::chunked_input_stream<Encoding>;

using chunked_input_stream = generic_chunked_input_stream<UTF8<>>;

} // namespace json
131 changes: 115 additions & 16 deletions src/v/json/json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,40 +9,58 @@

#include "json/json.h"

#include "json/chunked_buffer.h"
#include "json/stringbuffer.h"

namespace json {

void rjson_serialize(json::Writer<json::StringBuffer>& w, short v) { w.Int(v); }
template<typename Buffer>
void rjson_serialize(json::Writer<Buffer>& w, short v) {
w.Int(v);
}

void rjson_serialize(json::Writer<json::StringBuffer>& w, bool v) { w.Bool(v); }
template<typename Buffer>
void rjson_serialize(json::Writer<Buffer>& w, bool v) {
w.Bool(v);
}

void rjson_serialize(json::Writer<json::StringBuffer>& w, long long v) {
template<typename Buffer>
void rjson_serialize(json::Writer<Buffer>& w, long long v) {
w.Int64(v);
}

void rjson_serialize(json::Writer<json::StringBuffer>& w, int v) { w.Int(v); }
template<typename Buffer>
void rjson_serialize(json::Writer<Buffer>& w, int v) {
w.Int(v);
}

void rjson_serialize(json::Writer<json::StringBuffer>& w, unsigned int v) {
template<typename Buffer>
void rjson_serialize(json::Writer<Buffer>& w, unsigned int v) {
w.Uint(v);
}

void rjson_serialize(json::Writer<json::StringBuffer>& w, long v) {
template<typename Buffer>
void rjson_serialize(json::Writer<Buffer>& w, long v) {
w.Int64(v);
}

void rjson_serialize(json::Writer<json::StringBuffer>& w, unsigned long v) {
template<typename Buffer>
void rjson_serialize(json::Writer<Buffer>& w, unsigned long v) {
w.Uint64(v);
}

void rjson_serialize(json::Writer<json::StringBuffer>& w, double v) {
template<typename Buffer>
void rjson_serialize(json::Writer<Buffer>& w, double v) {
w.Double(v);
}

void rjson_serialize(json::Writer<json::StringBuffer>& w, std::string_view v) {
template<typename Buffer>
void rjson_serialize(json::Writer<Buffer>& w, std::string_view v) {
w.String(v.data(), v.size());
}

void rjson_serialize(
json::Writer<json::StringBuffer>& w, const ss::socket_address& v) {
template<typename Buffer>
void rjson_serialize(json::Writer<Buffer>& w, const ss::socket_address& v) {
w.StartObject();

std::ostringstream a;
Expand All @@ -68,8 +86,9 @@ void rjson_serialize(
w.EndObject();
}

template<typename Buffer>
void rjson_serialize(
json::Writer<json::StringBuffer>& w, const net::unresolved_address& v) {
json::Writer<Buffer>& w, const net::unresolved_address& v) {
w.StartObject();

w.Key("address");
Expand All @@ -81,20 +100,22 @@ void rjson_serialize(
w.EndObject();
}

template<typename Buffer>
void rjson_serialize(
json::Writer<json::StringBuffer>& w, const std::chrono::milliseconds& v) {
json::Writer<Buffer>& w, const std::chrono::milliseconds& v) {
uint64_t _tmp = v.count();
rjson_serialize(w, _tmp);
}

void rjson_serialize(
json::Writer<json::StringBuffer>& w, const std::chrono::seconds& v) {
template<typename Buffer>
void rjson_serialize(json::Writer<Buffer>& w, const std::chrono::seconds& v) {
uint64_t _tmp = v.count();
rjson_serialize(w, _tmp);
}

template<typename Buffer>
void rjson_serialize(
json::Writer<json::StringBuffer>& w, const std::filesystem::path& path) {
json::Writer<Buffer>& w, const std::filesystem::path& path) {
rjson_serialize(w, std::string_view{path.native()});
}

Expand All @@ -116,4 +137,82 @@ ss::sstring prettify(std::string_view json) {
return ss::sstring(out.GetString(), out.GetSize());
}

template void rjson_serialize<json::StringBuffer>(
json::Writer<json::StringBuffer>& w, short v);

template void rjson_serialize<json::StringBuffer>(
json::Writer<json::StringBuffer>& w, bool v);

template void rjson_serialize<json::StringBuffer>(
json::Writer<json::StringBuffer>& w, long long v);

template void
rjson_serialize<json::StringBuffer>(json::Writer<json::StringBuffer>& w, int v);

template void rjson_serialize<json::StringBuffer>(
json::Writer<json::StringBuffer>& w, unsigned int v);

template void rjson_serialize<json::StringBuffer>(
json::Writer<json::StringBuffer>& w, long v);

template void rjson_serialize<json::StringBuffer>(
json::Writer<json::StringBuffer>& w, unsigned long v);

template void rjson_serialize<json::StringBuffer>(
json::Writer<json::StringBuffer>& w, double v);

template void rjson_serialize<json::StringBuffer>(
json::Writer<json::StringBuffer>& w, std::string_view s);

template void rjson_serialize<json::StringBuffer>(
json::Writer<json::StringBuffer>& w, const net::unresolved_address& v);

template void rjson_serialize<json::StringBuffer>(
json::Writer<json::StringBuffer>& w, const std::chrono::milliseconds& v);

template void rjson_serialize<json::StringBuffer>(
json::Writer<json::StringBuffer>& w, const std::chrono::seconds& v);

template void rjson_serialize<json::StringBuffer>(
json::Writer<json::StringBuffer>& w, const std::filesystem::path& path);

template void
rjson_serialize<chunked_buffer>(json::Writer<chunked_buffer>& w, short v);

template void
rjson_serialize<chunked_buffer>(json::Writer<chunked_buffer>& w, bool v);

template void
rjson_serialize<chunked_buffer>(json::Writer<chunked_buffer>& w, long long v);

template void
rjson_serialize<chunked_buffer>(json::Writer<chunked_buffer>& w, int v);

template void rjson_serialize<chunked_buffer>(
json::Writer<chunked_buffer>& w, unsigned int v);

template void
rjson_serialize<chunked_buffer>(json::Writer<chunked_buffer>& w, long v);

template void rjson_serialize<chunked_buffer>(
json::Writer<chunked_buffer>& w, unsigned long v);

template void
rjson_serialize<chunked_buffer>(json::Writer<chunked_buffer>& w, double v);

template void rjson_serialize<chunked_buffer>(
json::Writer<chunked_buffer>& w, std::string_view s);

template void rjson_serialize<chunked_buffer>(
json::Writer<chunked_buffer>& w, const net::unresolved_address& v);

template void rjson_serialize<chunked_buffer>(
json::Writer<chunked_buffer>& w, const std::chrono::milliseconds& v);

template void rjson_serialize<chunked_buffer>(
json::Writer<chunked_buffer>& w, const std::chrono::seconds& v);

template void rjson_serialize<chunked_buffer>(
json::Writer<chunked_buffer>& w, const std::filesystem::path& path);

} // namespace json
Loading

0 comments on commit a022cde

Please sign in to comment.