From 1c0c445fa00fbbde8329434ac316f92911f6749f Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Mon, 1 Jul 2024 15:26:16 +0100 Subject: [PATCH 01/15] json: Template rjson_serialize on Buffer Some structures serialize quite large, so in order to avoid oversize allocs, it's necessary to use a buffer type that is not based onn contiguous memmory. Explicitly instantiate all overloads with StringBuffer. Fix the test due to the way name lookup works. Signed-off-by: Ben Pope (cherry picked from commit 7cea48d60091e79ff8e85a0f2f7aacd07b30eef2) --- src/v/compat/json.h | 2 +- src/v/container/include/container/json.h | 5 +- src/v/json/json.cc | 91 +++++++++++++++++---- src/v/json/json.h | 76 +++++++++-------- src/v/json/tests/json_serialization_test.cc | 9 +- 5 files changed, 126 insertions(+), 57 deletions(-) diff --git a/src/v/compat/json.h b/src/v/compat/json.h index 58c79547a8372..8ff3a8760b94b 100644 --- a/src/v/compat/json.h +++ b/src/v/compat/json.h @@ -523,7 +523,7 @@ inline void rjson_serialize( ss.rdstate())); } w.Key("address"); - rjson_serialize(w, ss.str()); + rjson_serialize(w, std::string_view{ss.str()}); w.EndObject(); } diff --git a/src/v/container/include/container/json.h b/src/v/container/include/container/json.h index a2e03954e5413..602ae285458dc 100644 --- a/src/v/container/include/container/json.h +++ b/src/v/container/include/container/json.h @@ -15,10 +15,9 @@ namespace json { -template +template void rjson_serialize( - json::Writer& w, - const fragmented_vector& v) { + json::Writer& w, const fragmented_vector& v) { w.StartArray(); for (const auto& e : v) { rjson_serialize(w, e); diff --git a/src/v/json/json.cc b/src/v/json/json.cc index 61736942d7196..465feb5b4de6c 100644 --- a/src/v/json/json.cc +++ b/src/v/json/json.cc @@ -9,40 +9,57 @@ #include "json/json.h" +#include "json/stringbuffer.h" + namespace json { -void rjson_serialize(json::Writer& w, short v) { w.Int(v); } +template +void rjson_serialize(json::Writer& w, short v) { + w.Int(v); +} -void rjson_serialize(json::Writer& w, bool v) { w.Bool(v); } +template +void rjson_serialize(json::Writer& w, bool v) { + w.Bool(v); +} -void rjson_serialize(json::Writer& w, long long v) { +template +void rjson_serialize(json::Writer& w, long long v) { w.Int64(v); } -void rjson_serialize(json::Writer& w, int v) { w.Int(v); } +template +void rjson_serialize(json::Writer& w, int v) { + w.Int(v); +} -void rjson_serialize(json::Writer& w, unsigned int v) { +template +void rjson_serialize(json::Writer& w, unsigned int v) { w.Uint(v); } -void rjson_serialize(json::Writer& w, long v) { +template +void rjson_serialize(json::Writer& w, long v) { w.Int64(v); } -void rjson_serialize(json::Writer& w, unsigned long v) { +template +void rjson_serialize(json::Writer& w, unsigned long v) { w.Uint64(v); } -void rjson_serialize(json::Writer& w, double v) { +template +void rjson_serialize(json::Writer& w, double v) { w.Double(v); } -void rjson_serialize(json::Writer& w, std::string_view v) { +template +void rjson_serialize(json::Writer& w, std::string_view v) { w.String(v.data(), v.size()); } -void rjson_serialize( - json::Writer& w, const ss::socket_address& v) { +template +void rjson_serialize(json::Writer& w, const ss::socket_address& v) { w.StartObject(); std::ostringstream a; @@ -68,8 +85,9 @@ void rjson_serialize( w.EndObject(); } +template void rjson_serialize( - json::Writer& w, const net::unresolved_address& v) { + json::Writer& w, const net::unresolved_address& v) { w.StartObject(); w.Key("address"); @@ -81,20 +99,22 @@ void rjson_serialize( w.EndObject(); } +template void rjson_serialize( - json::Writer& w, const std::chrono::milliseconds& v) { + json::Writer& w, const std::chrono::milliseconds& v) { uint64_t _tmp = v.count(); rjson_serialize(w, _tmp); } -void rjson_serialize( - json::Writer& w, const std::chrono::seconds& v) { +template +void rjson_serialize(json::Writer& w, const std::chrono::seconds& v) { uint64_t _tmp = v.count(); rjson_serialize(w, _tmp); } +template void rjson_serialize( - json::Writer& w, const std::filesystem::path& path) { + json::Writer& w, const std::filesystem::path& path) { rjson_serialize(w, std::string_view{path.native()}); } @@ -116,4 +136,43 @@ ss::sstring prettify(std::string_view json) { return ss::sstring(out.GetString(), out.GetSize()); } +template void rjson_serialize( + json::Writer& w, short v); + +template void rjson_serialize( + json::Writer& w, bool v); + +template void rjson_serialize( + json::Writer& w, long long v); + +template void +rjson_serialize(json::Writer& w, int v); + +template void rjson_serialize( + json::Writer& w, unsigned int v); + +template void rjson_serialize( + json::Writer& w, long v); + +template void rjson_serialize( + json::Writer& w, unsigned long v); + +template void rjson_serialize( + json::Writer& w, double v); + +template void rjson_serialize( + json::Writer& w, std::string_view s); + +template void rjson_serialize( + json::Writer& w, const net::unresolved_address& v); + +template void rjson_serialize( + json::Writer& w, const std::chrono::milliseconds& v); + +template void rjson_serialize( + json::Writer& w, const std::chrono::seconds& v); + +template void rjson_serialize( + json::Writer& w, const std::filesystem::path& path); + } // namespace json diff --git a/src/v/json/json.h b/src/v/json/json.h index 261ccdeb67afd..b70f8d1961b4a 100644 --- a/src/v/json/json.h +++ b/src/v/json/json.h @@ -28,50 +28,62 @@ namespace json { -void rjson_serialize(json::Writer& w, short v); +template +void rjson_serialize(json::Writer& w, short v); -void rjson_serialize(json::Writer& w, bool v); +template +void rjson_serialize(json::Writer& w, bool v); -void rjson_serialize(json::Writer& w, long long v); +template +void rjson_serialize(json::Writer& w, long long v); -void rjson_serialize(json::Writer& w, int v); +template +void rjson_serialize(json::Writer& w, int v); -void rjson_serialize(json::Writer& w, unsigned int v); +template +void rjson_serialize(json::Writer& w, unsigned int v); -void rjson_serialize(json::Writer& w, long v); +template +void rjson_serialize(json::Writer& w, long v); -void rjson_serialize(json::Writer& w, unsigned long v); +template +void rjson_serialize(json::Writer& w, unsigned long v); -void rjson_serialize(json::Writer& w, double v); +template +void rjson_serialize(json::Writer& w, double v); -void rjson_serialize(json::Writer& w, std::string_view s); +template +void rjson_serialize(json::Writer& w, std::string_view s); -void rjson_serialize( - json::Writer& w, const net::unresolved_address& v); +template +void rjson_serialize(json::Writer& w, const net::unresolved_address& v); +template void rjson_serialize( - json::Writer& w, const std::chrono::milliseconds& v); + json::Writer& w, const std::chrono::milliseconds& v); -void rjson_serialize( - json::Writer& w, const std::chrono::seconds& v); +template +void rjson_serialize(json::Writer& w, const std::chrono::seconds& v); +template void rjson_serialize( - json::Writer& w, const std::filesystem::path& path); + json::Writer& w, const std::filesystem::path& path); -template>> -void rjson_serialize(json::Writer& w, T v) { +template< + typename Buffer, + typename T, + typename = std::enable_if_t>> +void rjson_serialize(json::Writer& w, T v) { rjson_serialize(w, static_cast>(v)); } -template -void rjson_serialize( - json::Writer& w, const named_type& v) { +template +void rjson_serialize(json::Writer& w, const named_type& v) { rjson_serialize(w, v()); } -template -void rjson_serialize( - json::Writer& w, const std::optional& v) { +template +void rjson_serialize(json::Writer& w, const std::optional& v) { if (v) { rjson_serialize(w, *v); return; @@ -79,9 +91,8 @@ void rjson_serialize( w.Null(); } -template -void rjson_serialize( - json::Writer& w, const std::vector& v) { +template +void rjson_serialize(json::Writer& w, const std::vector& v) { w.StartArray(); for (const auto& e : v) { rjson_serialize(w, e); @@ -89,10 +100,9 @@ void rjson_serialize( w.EndArray(); } -template +template void rjson_serialize( - json::Writer& w, - const ss::chunked_fifo& v) { + json::Writer& w, const ss::chunked_fifo& v) { w.StartArray(); for (const auto& e : v) { rjson_serialize(w, e); @@ -100,9 +110,9 @@ void rjson_serialize( w.EndArray(); } -template +template void rjson_serialize( - json::Writer& w, + json::Writer& w, const std::unordered_map& v) { w.StartArray(); for (const auto& e : v) { @@ -111,9 +121,9 @@ void rjson_serialize( w.EndArray(); } -template +template void rjson_serialize( - json::Writer& w, const ss::circular_buffer& v) { + json::Writer& w, const ss::circular_buffer& v) { w.StartArray(); for (const auto& e : v) { rjson_serialize(w, e); diff --git a/src/v/json/tests/json_serialization_test.cc b/src/v/json/tests/json_serialization_test.cc index 95d8258686245..31aee1f7d34c8 100644 --- a/src/v/json/tests/json_serialization_test.cc +++ b/src/v/json/tests/json_serialization_test.cc @@ -41,8 +41,8 @@ struct personne_t { } // namespace -void rjson_serialize( - json::Writer& w, const personne_t::nested& obj) { +template +void rjson_serialize(json::Writer& w, const personne_t::nested& obj) { w.StartObject(); w.Key("x"); @@ -57,11 +57,12 @@ void rjson_serialize( w.EndObject(); } -void rjson_serialize(json::Writer& w, const personne_t& p) { +template +void rjson_serialize(json::Writer& w, const personne_t& p) { w.StartObject(); w.Key("full_name"); - json::rjson_serialize(w, p.full_name); + json::rjson_serialize(w, std::string_view{p.full_name}); w.Key("nic"); json::rjson_serialize(w, p.nic); From 1228340d4b9f22fab4a6c175d716bcb3a8bce6b5 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Tue, 2 Jul 2024 13:18:53 +0100 Subject: [PATCH 02/15] pandaproxy/json: Template rjson_serialize on Buffer Signed-off-by: Ben Pope (cherry picked from commit ebc4c1347ef8cb9955dd671fb69a7bfbe103a34c) --- src/v/pandaproxy/json/iobuf.h | 9 ++++++--- src/v/pandaproxy/json/requests/brokers.h | 5 +++-- src/v/pandaproxy/json/requests/create_consumer.h | 6 +++--- src/v/pandaproxy/json/requests/error_reply.h | 4 ++-- src/v/pandaproxy/json/requests/fetch.h | 8 ++++---- src/v/pandaproxy/json/requests/offset_fetch.h | 12 ++++++------ src/v/pandaproxy/json/requests/produce.h | 12 ++++++------ src/v/pandaproxy/json/rjson_util.h | 4 ++-- 8 files changed, 32 insertions(+), 28 deletions(-) diff --git a/src/v/pandaproxy/json/iobuf.h b/src/v/pandaproxy/json/iobuf.h index 79ec11e176b62..e13be0c8584c0 100644 --- a/src/v/pandaproxy/json/iobuf.h +++ b/src/v/pandaproxy/json/iobuf.h @@ -65,7 +65,8 @@ class rjson_serialize_impl { explicit rjson_serialize_impl(serialization_format fmt) : _fmt(fmt) {} - bool operator()(::json::Writer<::json::StringBuffer>& w, iobuf buf) { + template + bool operator()(::json::Writer& w, iobuf buf) { switch (_fmt) { case serialization_format::none: [[fallthrough]]; @@ -80,7 +81,8 @@ class rjson_serialize_impl { } } - bool encode_base64(::json::Writer<::json::StringBuffer>& w, iobuf buf) { + template + bool encode_base64(::json::Writer& w, iobuf buf) { if (buf.empty()) { return w.Null(); } @@ -88,7 +90,8 @@ class rjson_serialize_impl { return w.String(iobuf_to_base64(buf)); }; - bool encode_json(::json::Writer<::json::StringBuffer>& w, iobuf buf) { + template + bool encode_json(::json::Writer& w, iobuf buf) { if (buf.empty()) { return w.Null(); } diff --git a/src/v/pandaproxy/json/requests/brokers.h b/src/v/pandaproxy/json/requests/brokers.h index 6a0e57ef55028..4229bc4656b75 100644 --- a/src/v/pandaproxy/json/requests/brokers.h +++ b/src/v/pandaproxy/json/requests/brokers.h @@ -21,8 +21,9 @@ struct get_brokers_res { std::vector ids; }; -inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, const get_brokers_res& brokers) { +template +void rjson_serialize( + ::json::Writer& w, const get_brokers_res& brokers) { w.StartObject(); w.Key("brokers"); ::json::rjson_serialize(w, brokers.ids); diff --git a/src/v/pandaproxy/json/requests/create_consumer.h b/src/v/pandaproxy/json/requests/create_consumer.h index cd7b0a0b6d1f4..9db53b836bd87 100644 --- a/src/v/pandaproxy/json/requests/create_consumer.h +++ b/src/v/pandaproxy/json/requests/create_consumer.h @@ -107,9 +107,9 @@ struct create_consumer_response { ss::sstring base_uri; }; -inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, - const create_consumer_response& res) { +template +void rjson_serialize( + ::json::Writer& w, const create_consumer_response& res) { w.StartObject(); w.Key("instance_id"); w.String(res.instance_id()); diff --git a/src/v/pandaproxy/json/requests/error_reply.h b/src/v/pandaproxy/json/requests/error_reply.h index 7ac84d300ab5c..57cf38317476f 100644 --- a/src/v/pandaproxy/json/requests/error_reply.h +++ b/src/v/pandaproxy/json/requests/error_reply.h @@ -26,8 +26,8 @@ struct error_body { ss::sstring message; }; -inline void -rjson_serialize(::json::Writer<::json::StringBuffer>& w, const error_body& v) { +template +void rjson_serialize(::json::Writer& w, const error_body& v) { w.StartObject(); w.Key("error_code"); ::json::rjson_serialize(w, v.ec.value()); diff --git a/src/v/pandaproxy/json/requests/fetch.h b/src/v/pandaproxy/json/requests/fetch.h index f69280210e255..f70e98e37ade0 100644 --- a/src/v/pandaproxy/json/requests/fetch.h +++ b/src/v/pandaproxy/json/requests/fetch.h @@ -43,8 +43,8 @@ class rjson_serialize_impl { , _tpv(tpv) , _base_offset(base_offset) {} - bool - operator()(::json::Writer<::json::StringBuffer>& w, model::record record) { + template + bool operator()(::json::Writer& w, model::record record) { auto offset = _base_offset() + record.offset_delta(); w.StartObject(); @@ -97,8 +97,8 @@ class rjson_serialize_impl { explicit rjson_serialize_impl(serialization_format fmt) : _fmt(fmt) {} - bool operator()( - ::json::Writer<::json::StringBuffer>& w, kafka::fetch_response&& res) { + template + bool operator()(::json::Writer& w, kafka::fetch_response&& res) { // Eager check for errors for (auto& v : res) { if (v.partition_response->error_code != kafka::error_code::none) { diff --git a/src/v/pandaproxy/json/requests/offset_fetch.h b/src/v/pandaproxy/json/requests/offset_fetch.h index 293926cd9e9ad..32bcd9f211ad2 100644 --- a/src/v/pandaproxy/json/requests/offset_fetch.h +++ b/src/v/pandaproxy/json/requests/offset_fetch.h @@ -40,9 +40,9 @@ partitions_request_to_offset_request(std::vector tps) { return res; } -inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, - const kafka::offset_fetch_response_topic& v) { +template +void rjson_serialize( + ::json::Writer& w, const kafka::offset_fetch_response_topic& v) { for (const auto& p : v.partitions) { w.StartObject(); w.Key("topic"); @@ -57,9 +57,9 @@ inline void rjson_serialize( } } -inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, - const kafka::offset_fetch_response& v) { +template +void rjson_serialize( + ::json::Writer& w, const kafka::offset_fetch_response& v) { w.StartObject(); w.Key("offsets"); w.StartArray(); diff --git a/src/v/pandaproxy/json/requests/produce.h b/src/v/pandaproxy/json/requests/produce.h index 83dfeb5aac782..df15bd923c619 100644 --- a/src/v/pandaproxy/json/requests/produce.h +++ b/src/v/pandaproxy/json/requests/produce.h @@ -265,9 +265,9 @@ class produce_request_handler { std::optional _json_writer; }; -inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, - const kafka::produce_response::partition& v) { +template +void rjson_serialize( + ::json::Writer& w, const kafka::produce_response::partition& v) { w.StartObject(); w.Key("partition"); w.Int(v.partition_index); @@ -280,9 +280,9 @@ inline void rjson_serialize( w.EndObject(); } -inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, - const kafka::produce_response::topic& v) { +template +void rjson_serialize( + ::json::Writer& w, const kafka::produce_response::topic& v) { w.StartObject(); w.Key("offsets"); w.StartArray(); diff --git a/src/v/pandaproxy/json/rjson_util.h b/src/v/pandaproxy/json/rjson_util.h index 2fe5d465c07f4..1c219d29b3143 100644 --- a/src/v/pandaproxy/json/rjson_util.h +++ b/src/v/pandaproxy/json/rjson_util.h @@ -48,8 +48,8 @@ struct rjson_serialize_fmt_impl { return rjson_serialize_impl>{fmt}( std::forward(t)); } - template - bool operator()(::json::Writer<::json::StringBuffer>& w, T&& t) { + template + bool operator()(::json::Writer& w, T&& t) { return rjson_serialize_impl>{fmt}( w, std::forward(t)); } From 3c97452faff753666e2ab334cbb917ec63662c30 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Tue, 2 Jul 2024 13:19:13 +0100 Subject: [PATCH 03/15] schema_registry: Template rjson_serialize on Buffer Signed-off-by: Ben Pope (cherry picked from commit 9fcb311283019d0dd359cb89dfd99a7edf289438) --- .../schema_registry/requests/compatibility.h | 5 ++- .../schema_registry/requests/config.h | 12 ++--- .../requests/get_schemas_ids_id.h | 6 +-- .../requests/get_schemas_ids_id_versions.h | 6 +-- .../requests/get_subject_versions_version.h | 3 +- .../schema_registry/requests/mode.h | 6 +-- .../requests/post_subject_versions.h | 5 ++- src/v/pandaproxy/schema_registry/storage.h | 44 +++++++++---------- .../test/post_subjects_subject_version.cc | 4 +- 9 files changed, 47 insertions(+), 44 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/requests/compatibility.h b/src/v/pandaproxy/schema_registry/requests/compatibility.h index db9759f515225..a39608b689ee8 100644 --- a/src/v/pandaproxy/schema_registry/requests/compatibility.h +++ b/src/v/pandaproxy/schema_registry/requests/compatibility.h @@ -20,8 +20,9 @@ struct post_compatibility_res { bool is_compat{false}; }; -inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, +template +void rjson_serialize( + ::json::Writer& w, const schema_registry::post_compatibility_res& res) { w.StartObject(); w.Key("is_compatible"); diff --git a/src/v/pandaproxy/schema_registry/requests/config.h b/src/v/pandaproxy/schema_registry/requests/config.h index d219ad6cfa02c..688e66f2e67fb 100644 --- a/src/v/pandaproxy/schema_registry/requests/config.h +++ b/src/v/pandaproxy/schema_registry/requests/config.h @@ -82,18 +82,18 @@ class put_config_handler : public json::base_handler { } }; -inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, - const schema_registry::get_config_req_rep& res) { +template +void rjson_serialize( + ::json::Writer& w, const schema_registry::get_config_req_rep& res) { w.StartObject(); w.Key(get_config_req_rep::field_name.data()); ::json::rjson_serialize(w, to_string_view(res.compat)); w.EndObject(); } -inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, - const schema_registry::put_config_req_rep& res) { +template +void rjson_serialize( + ::json::Writer& w, const schema_registry::put_config_req_rep& res) { w.StartObject(); w.Key(put_config_req_rep::field_name.data()); ::json::rjson_serialize(w, to_string_view(res.compat)); diff --git a/src/v/pandaproxy/schema_registry/requests/get_schemas_ids_id.h b/src/v/pandaproxy/schema_registry/requests/get_schemas_ids_id.h index 6eec85d6a497f..c6d75a6854649 100644 --- a/src/v/pandaproxy/schema_registry/requests/get_schemas_ids_id.h +++ b/src/v/pandaproxy/schema_registry/requests/get_schemas_ids_id.h @@ -20,9 +20,9 @@ struct get_schemas_ids_id_response { canonical_schema_definition definition; }; -inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, - const get_schemas_ids_id_response& res) { +template +void rjson_serialize( + ::json::Writer& w, const get_schemas_ids_id_response& res) { w.StartObject(); if (res.definition.type() != schema_type::avro) { w.Key("schemaType"); diff --git a/src/v/pandaproxy/schema_registry/requests/get_schemas_ids_id_versions.h b/src/v/pandaproxy/schema_registry/requests/get_schemas_ids_id_versions.h index c7700303d0d43..b840de34e5fa9 100644 --- a/src/v/pandaproxy/schema_registry/requests/get_schemas_ids_id_versions.h +++ b/src/v/pandaproxy/schema_registry/requests/get_schemas_ids_id_versions.h @@ -21,9 +21,9 @@ struct get_schemas_ids_id_versions_response { chunked_vector subject_versions; }; -inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, - const get_schemas_ids_id_versions_response& res) { +template +void rjson_serialize( + ::json::Writer& w, const get_schemas_ids_id_versions_response& res) { w.StartArray(); for (const auto& sv : res.subject_versions) { w.StartObject(); diff --git a/src/v/pandaproxy/schema_registry/requests/get_subject_versions_version.h b/src/v/pandaproxy/schema_registry/requests/get_subject_versions_version.h index 23da030df1766..0b6b03d5cf7c2 100644 --- a/src/v/pandaproxy/schema_registry/requests/get_subject_versions_version.h +++ b/src/v/pandaproxy/schema_registry/requests/get_subject_versions_version.h @@ -22,8 +22,9 @@ struct post_subject_versions_version_response { schema_version version; }; +template inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, + ::json::Writer& w, const post_subject_versions_version_response& res) { w.StartObject(); w.Key("subject"); diff --git a/src/v/pandaproxy/schema_registry/requests/mode.h b/src/v/pandaproxy/schema_registry/requests/mode.h index d5159a623a74a..b94eae35fdb5c 100644 --- a/src/v/pandaproxy/schema_registry/requests/mode.h +++ b/src/v/pandaproxy/schema_registry/requests/mode.h @@ -77,9 +77,9 @@ class mode_handler : public json::base_handler { } }; -inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, - const schema_registry::mode_req_rep& res) { +template +void rjson_serialize( + ::json::Writer& w, const schema_registry::mode_req_rep& res) { w.StartObject(); w.Key(mode_req_rep::field_name.data()); ::json::rjson_serialize(w, to_string_view(res.mode)); diff --git a/src/v/pandaproxy/schema_registry/requests/post_subject_versions.h b/src/v/pandaproxy/schema_registry/requests/post_subject_versions.h index 500f014f92926..c83427b002758 100644 --- a/src/v/pandaproxy/schema_registry/requests/post_subject_versions.h +++ b/src/v/pandaproxy/schema_registry/requests/post_subject_versions.h @@ -241,8 +241,9 @@ struct post_subject_versions_response { schema_id id; }; -inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, +template +void rjson_serialize( + ::json::Writer& w, const schema_registry::post_subject_versions_response& res) { w.StartObject(); w.Key("id"); diff --git a/src/v/pandaproxy/schema_registry/storage.h b/src/v/pandaproxy/schema_registry/storage.h index cdc60bf16833e..9b5a4cd008fac 100644 --- a/src/v/pandaproxy/schema_registry/storage.h +++ b/src/v/pandaproxy/schema_registry/storage.h @@ -161,9 +161,9 @@ struct schema_key { } }; -inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, - const schema_registry::schema_key& key) { +template +void rjson_serialize( + ::json::Writer& w, const schema_registry::schema_key& key) { w.StartObject(); w.Key("keytype"); ::json::rjson_serialize(w, to_string_view(key.keytype)); @@ -324,9 +324,8 @@ struct schema_value { using unparsed_schema_value = schema_value; using canonical_schema_value = schema_value; -template -inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, const schema_value& val) { +template +void rjson_serialize(::json::Writer& w, const schema_value& val) { w.StartObject(); w.Key("subject"); ::json::rjson_serialize(w, val.schema.sub()); @@ -645,9 +644,9 @@ struct config_key { } }; -inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, - const schema_registry::config_key& key) { +template +void rjson_serialize( + ::json::Writer& w, const schema_registry::config_key& key) { w.StartObject(); w.Key("keytype"); ::json::rjson_serialize(w, to_string_view(key.keytype)); @@ -783,9 +782,9 @@ struct config_value { } }; -inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, - const schema_registry::config_value& val) { +template +void rjson_serialize( + ::json::Writer& w, const schema_registry::config_value& val) { w.StartObject(); if (val.sub.has_value()) { w.Key("subject"); @@ -881,9 +880,9 @@ struct mode_key { } }; -inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, - const schema_registry::mode_key& key) { +template +void rjson_serialize( + ::json::Writer& w, const schema_registry::mode_key& key) { w.StartObject(); w.Key("keytype"); ::json::rjson_serialize(w, to_string_view(key.keytype)); @@ -1019,9 +1018,9 @@ struct mode_value { } }; -inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, - const schema_registry::mode_value& val) { +template +void rjson_serialize( + ::json::Writer& w, const schema_registry::mode_value& val) { w.StartObject(); if (val.sub.has_value()) { w.Key("subject"); @@ -1118,8 +1117,8 @@ struct delete_subject_key { } }; -inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, const delete_subject_key& key) { +template +void rjson_serialize(::json::Writer& w, const delete_subject_key& key) { w.StartObject(); w.Key("keytype"); ::json::rjson_serialize(w, to_string_view(key.keytype)); @@ -1259,8 +1258,9 @@ struct delete_subject_value { } }; -inline void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, const delete_subject_value& val) { +template +void rjson_serialize( + ::json::Writer& w, const delete_subject_value& val) { w.StartObject(); w.Key("subject"); ::json::rjson_serialize(w, val.sub); diff --git a/src/v/pandaproxy/schema_registry/test/post_subjects_subject_version.cc b/src/v/pandaproxy/schema_registry/test/post_subjects_subject_version.cc index 77a8094d07b6c..12afe44806ad7 100644 --- a/src/v/pandaproxy/schema_registry/test/post_subjects_subject_version.cc +++ b/src/v/pandaproxy/schema_registry/test/post_subjects_subject_version.cc @@ -26,8 +26,8 @@ struct request { pps::canonical_schema schema; }; -void rjson_serialize( - ::json::Writer<::json::StringBuffer>& w, const request& r) { +template +void rjson_serialize(::json::Writer& w, const request& r) { w.StartObject(); w.Key("schema"); ::json::rjson_serialize(w, r.schema.def().raw()); From 03345c4c279a88758e059d582a56b440a742b624 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Wed, 3 Jul 2024 10:35:18 +0100 Subject: [PATCH 04/15] pandaproxy/json: Cleanup header includes Namespace changed due to lookup rules for templates. Signed-off-by: Ben Pope (cherry picked from commit a455d2374ec799c14a627989df09ff320ddf0447) Conflicts: src/v/pandaproxy/json/requests/produce.h (utils/tristate)) --- src/v/pandaproxy/json/iobuf.h | 3 +-- src/v/pandaproxy/json/requests/brokers.h | 4 ++-- src/v/pandaproxy/json/requests/create_consumer.h | 7 +------ src/v/pandaproxy/json/requests/error_reply.h | 1 - src/v/pandaproxy/json/requests/fetch.h | 5 ----- src/v/pandaproxy/json/requests/offset_fetch.h | 7 ++++--- src/v/pandaproxy/json/requests/produce.h | 10 ++++++---- src/v/pandaproxy/json/requests/test/fetch.cc | 5 ----- src/v/pandaproxy/json/rjson_util.h | 9 ++++++--- src/v/pandaproxy/json/types.h | 8 +------- .../test/post_subjects_subject_version.cc | 3 --- 11 files changed, 21 insertions(+), 41 deletions(-) diff --git a/src/v/pandaproxy/json/iobuf.h b/src/v/pandaproxy/json/iobuf.h index e13be0c8584c0..b3068b3f4251a 100644 --- a/src/v/pandaproxy/json/iobuf.h +++ b/src/v/pandaproxy/json/iobuf.h @@ -15,9 +15,8 @@ #include "bytes/iobuf_parser.h" #include "json/reader.h" #include "json/stream.h" -#include "json/stringbuffer.h" #include "json/writer.h" -#include "pandaproxy/json/types.h" +#include "pandaproxy/json/rjson_util.h" #include "utils/base64.h" #include diff --git a/src/v/pandaproxy/json/requests/brokers.h b/src/v/pandaproxy/json/requests/brokers.h index 4229bc4656b75..a9c715fdbfe60 100644 --- a/src/v/pandaproxy/json/requests/brokers.h +++ b/src/v/pandaproxy/json/requests/brokers.h @@ -11,9 +11,9 @@ #pragma once -#include "json/stringbuffer.h" +#include "json/json.h" #include "json/writer.h" -#include "model/metadata.h" +#include "model/fundamental.h" namespace pandaproxy::json { diff --git a/src/v/pandaproxy/json/requests/create_consumer.h b/src/v/pandaproxy/json/requests/create_consumer.h index 9db53b836bd87..9a959a261e28a 100644 --- a/src/v/pandaproxy/json/requests/create_consumer.h +++ b/src/v/pandaproxy/json/requests/create_consumer.h @@ -12,14 +12,9 @@ #pragma once #include "base/seastarx.h" -#include "bytes/iobuf.h" -#include "json/stringbuffer.h" #include "json/types.h" #include "json/writer.h" -#include "kafka/protocol/errors.h" -#include "kafka/protocol/produce.h" -#include "kafka/types.h" -#include "pandaproxy/json/iobuf.h" +#include "kafka/protocol/join_group.h" #include "pandaproxy/json/rjson_parse.h" #include "strings/string_switch.h" diff --git a/src/v/pandaproxy/json/requests/error_reply.h b/src/v/pandaproxy/json/requests/error_reply.h index 57cf38317476f..6d0350021a6ee 100644 --- a/src/v/pandaproxy/json/requests/error_reply.h +++ b/src/v/pandaproxy/json/requests/error_reply.h @@ -13,7 +13,6 @@ #include "base/seastarx.h" #include "json/json.h" -#include "json/stringbuffer.h" #include "json/writer.h" #include diff --git a/src/v/pandaproxy/json/requests/fetch.h b/src/v/pandaproxy/json/requests/fetch.h index f70e98e37ade0..c8abd9c76fa44 100644 --- a/src/v/pandaproxy/json/requests/fetch.h +++ b/src/v/pandaproxy/json/requests/fetch.h @@ -12,18 +12,13 @@ #pragma once #include "base/seastarx.h" -#include "bytes/iobuf.h" -#include "bytes/iobuf_parser.h" -#include "json/stringbuffer.h" #include "json/writer.h" #include "kafka/protocol/errors.h" #include "kafka/protocol/fetch.h" #include "model/fundamental.h" #include "model/record.h" -#include "model/record_batch_reader.h" #include "pandaproxy/json/exceptions.h" #include "pandaproxy/json/iobuf.h" -#include "pandaproxy/json/requests/produce.h" #include "pandaproxy/json/rjson_util.h" #include "pandaproxy/json/types.h" #include "storage/parser_utils.h" diff --git a/src/v/pandaproxy/json/requests/offset_fetch.h b/src/v/pandaproxy/json/requests/offset_fetch.h index 32bcd9f211ad2..f04d7a71b7ac2 100644 --- a/src/v/pandaproxy/json/requests/offset_fetch.h +++ b/src/v/pandaproxy/json/requests/offset_fetch.h @@ -12,9 +12,7 @@ #pragma once #include "base/seastarx.h" -#include "json/stringbuffer.h" #include "json/writer.h" -#include "kafka/protocol/errors.h" #include "kafka/protocol/offset_fetch.h" #include @@ -39,6 +37,9 @@ partitions_request_to_offset_request(std::vector tps) { } return res; } +} // namespace pandaproxy::json + +namespace kafka { template void rjson_serialize( @@ -70,4 +71,4 @@ void rjson_serialize( w.EndObject(); } -} // namespace pandaproxy::json +} // namespace kafka diff --git a/src/v/pandaproxy/json/requests/produce.h b/src/v/pandaproxy/json/requests/produce.h index df15bd923c619..65ff9b53b9c3d 100644 --- a/src/v/pandaproxy/json/requests/produce.h +++ b/src/v/pandaproxy/json/requests/produce.h @@ -13,7 +13,6 @@ #include "base/seastarx.h" #include "bytes/iobuf.h" -#include "json/json.h" #include "json/stringbuffer.h" #include "json/types.h" #include "json/writer.h" @@ -21,7 +20,7 @@ #include "kafka/protocol/errors.h" #include "kafka/protocol/produce.h" #include "pandaproxy/json/iobuf.h" -#include "pandaproxy/json/types.h" +#include "pandaproxy/json/rjson_util.h" #include "tristate.h" #include @@ -265,6 +264,9 @@ class produce_request_handler { std::optional _json_writer; }; +} // namespace pandaproxy::json + +namespace kafka { template void rjson_serialize( ::json::Writer& w, const kafka::produce_response::partition& v) { @@ -273,7 +275,7 @@ void rjson_serialize( w.Int(v.partition_index); if (v.error_code != kafka::error_code::none) { w.Key("error_code"); - ::json::rjson_serialize(w, v.error_code); + rjson_serialize(w, v.error_code); } w.Key("offset"); w.Int64(v.base_offset); @@ -293,4 +295,4 @@ void rjson_serialize( w.EndObject(); } -} // namespace pandaproxy::json +} // namespace kafka diff --git a/src/v/pandaproxy/json/requests/test/fetch.cc b/src/v/pandaproxy/json/requests/test/fetch.cc index a12290fec63d8..b32b43c680c06 100644 --- a/src/v/pandaproxy/json/requests/test/fetch.cc +++ b/src/v/pandaproxy/json/requests/test/fetch.cc @@ -15,12 +15,9 @@ #include "json/writer.h" #include "kafka/client/test/utils.h" #include "kafka/protocol/errors.h" -#include "kafka/protocol/fetch.h" #include "kafka/protocol/wire.h" #include "model/fundamental.h" -#include "model/record.h" #include "model/timestamp.h" -#include "pandaproxy/json/requests/fetch.h" #include "pandaproxy/json/rjson_util.h" #include "pandaproxy/json/types.h" @@ -29,8 +26,6 @@ #include #include -#include - namespace ppj = pandaproxy::json; std::optional diff --git a/src/v/pandaproxy/json/rjson_util.h b/src/v/pandaproxy/json/rjson_util.h index 1c219d29b3143..3aed103580bbc 100644 --- a/src/v/pandaproxy/json/rjson_util.h +++ b/src/v/pandaproxy/json/rjson_util.h @@ -12,7 +12,6 @@ #pragma once #include "json/json.h" -#include "json/prettywriter.h" #include "json/reader.h" #include "json/stream.h" #include "json/stringbuffer.h" @@ -22,10 +21,14 @@ #include -#include - namespace pandaproxy::json { +template +class rjson_parse_impl; + +template +class rjson_serialize_impl; + template ss::sstring rjson_serialize(const T& v) { ::json::StringBuffer str_buf; diff --git a/src/v/pandaproxy/json/types.h b/src/v/pandaproxy/json/types.h index d698bcef65624..e4b9d910335b7 100644 --- a/src/v/pandaproxy/json/types.h +++ b/src/v/pandaproxy/json/types.h @@ -12,7 +12,7 @@ #pragma once #include -#include +#include namespace pandaproxy::json { @@ -52,10 +52,4 @@ inline std::string_view name(serialization_format fmt) { return "(unknown format)"; } -template -class rjson_parse_impl; - -template -class rjson_serialize_impl; - } // namespace pandaproxy::json diff --git a/src/v/pandaproxy/schema_registry/test/post_subjects_subject_version.cc b/src/v/pandaproxy/schema_registry/test/post_subjects_subject_version.cc index 12afe44806ad7..d6df5b3392c7f 100644 --- a/src/v/pandaproxy/schema_registry/test/post_subjects_subject_version.cc +++ b/src/v/pandaproxy/schema_registry/test/post_subjects_subject_version.cc @@ -8,7 +8,6 @@ // by the Apache License, Version 2.0 #include "pandaproxy/error.h" -#include "pandaproxy/json/error.h" #include "pandaproxy/json/rjson_util.h" #include "pandaproxy/schema_registry/test/avro_payloads.h" #include "pandaproxy/schema_registry/test/client_utils.h" @@ -16,8 +15,6 @@ #include "pandaproxy/test/pandaproxy_fixture.h" #include "pandaproxy/test/utils.h" -#include - namespace pp = pandaproxy; namespace ppj = pp::json; namespace pps = pp::schema_registry; From fdd8d6ceae33040e05b5891a831d7486d3cb871d Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Wed, 3 Jul 2024 10:43:33 +0100 Subject: [PATCH 05/15] pandaproxy/json: Avoid temporary for calls to write_body Signed-off-by: Ben Pope (cherry picked from commit 5d90218887aec7e665626a8ba1ed5deb40ab3a21) --- src/v/pandaproxy/reply.h | 3 +-- src/v/pandaproxy/rest/handlers.cc | 19 ++++++------------- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/src/v/pandaproxy/reply.h b/src/v/pandaproxy/reply.h index b314b58ed48ed..a69703286cf10 100644 --- a/src/v/pandaproxy/reply.h +++ b/src/v/pandaproxy/reply.h @@ -76,8 +76,7 @@ errored_body(std::error_condition ec, ss::sstring msg) { pandaproxy::json::error_body body{.ec = ec, .message = std::move(msg)}; auto rep = std::make_unique(); rep->set_status(error_code_to_status(ec)); - auto b = json::rjson_serialize(body); - rep->write_body("json", b); + rep->write_body("json", json::rjson_serialize(body)); return rep; } diff --git a/src/v/pandaproxy/rest/handlers.cc b/src/v/pandaproxy/rest/handlers.cc index 3d46b391ea33c..a839885a633a6 100644 --- a/src/v/pandaproxy/rest/handlers.cc +++ b/src/v/pandaproxy/rest/handlers.cc @@ -78,8 +78,7 @@ get_brokers(server::request_t rq, server::reply_t rp) { return b.node_id; }); - auto json_rslt = ppj::rjson_serialize(brokers); - rp.rep->write_body("json", json_rslt); + rp.rep->write_body("json", ppj::rjson_serialize(brokers)); rp.mime_type = res_fmt; return std::move(rp); @@ -109,8 +108,7 @@ get_topics_names(server::request_t rq, server::reply_t rp) { } } - auto json_rslt = ppj::rjson_serialize(names); - rp.rep->write_body("json", json_rslt); + rp.rep->write_body("json", ppj::rjson_serialize(names)); rp.mime_type = res_fmt; return std::move(rp); }); @@ -186,8 +184,8 @@ post_topics_name(server::request_t rq, server::reply_t rp) { return client.produce_records(topic, std::move(records)) .then([rp{std::move(rp)}, res_fmt](kafka::produce_response res) mutable { - auto json_rslt = ppj::rjson_serialize(res.data.responses[0]); - rp.rep->write_body("json", json_rslt); + rp.rep->write_body( + "json", ppj::rjson_serialize(res.data.responses[0])); rp.mime_type = res_fmt; return std::move(rp); }); @@ -278,8 +276,7 @@ create_consumer(server::request_t rq, server::reply_t rp) { kafka::member_id name) mutable { json::create_consumer_response res{ .instance_id = name, .base_uri = base_uri + name}; - auto json_rslt = ppj::rjson_serialize(res); - rp.rep->write_body("json", json_rslt); + rp.rep->write_body("json", ppj::rjson_serialize(res)); rp.mime_type = res_fmt; return std::move(rp); }); @@ -444,11 +441,7 @@ get_consumer_offsets(server::request_t rq, server::reply_t rp) { return client .consumer_offset_fetch(group_id, member_id, std::move(req_data)) .then([rp{std::move(rp)}, res_fmt](auto res) mutable { - ::json::StringBuffer str_buf; - ::json::Writer<::json::StringBuffer> w(str_buf); - ppj::rjson_serialize(w, res); - ss::sstring json_rslt = str_buf.GetString(); - rp.rep->write_body("json", json_rslt); + rp.rep->write_body("json", ppj::rjson_serialize(res)); rp.mime_type = res_fmt; return std::move(rp); }); From e4631da0886d3c9c5095a1cf9c75d4c5ab2b6225 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Wed, 3 Jul 2024 10:47:36 +0100 Subject: [PATCH 06/15] schema_registry: Avoid temporary for calls to write_body Signed-off-by: Ben Pope (cherry picked from commit d74ec2aef846f895d0ef5f6e5b05c08120baeb14) Conflicts: src/v/pandaproxy/schema_registry/handlers.cc (no JSON in this branch) --- src/v/pandaproxy/schema_registry/handlers.cc | 89 +++++++++----------- 1 file changed, 40 insertions(+), 49 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/handlers.cc b/src/v/pandaproxy/schema_registry/handlers.cc index 070bab5ced4ee..07cc744f0f006 100644 --- a/src/v/pandaproxy/schema_registry/handlers.cc +++ b/src/v/pandaproxy/schema_registry/handlers.cc @@ -85,8 +85,8 @@ get_config(server::request_t rq, server::reply_t rp) { auto res = co_await rq.service().schema_store().get_compatibility(); - auto json_rslt = ppj::rjson_serialize(get_config_req_rep{.compat = res}); - rp.rep->write_body("json", json_rslt); + rp.rep->write_body( + "json", ppj::rjson_serialize(get_config_req_rep{.compat = res})); co_return rp; } @@ -100,8 +100,7 @@ put_config(server::request_t rq, server::reply_t rp) { co_await rq.service().writer().write_config(std::nullopt, config.compat); - auto json_rslt = ppj::rjson_serialize(config); - rp.rep->write_body("json", json_rslt); + rp.rep->write_body("json", ppj::rjson_serialize(config)); co_return rp; } @@ -120,8 +119,8 @@ get_config_subject(server::request_t rq, server::reply_t rp) { auto res = co_await rq.service().schema_store().get_compatibility( sub, fallback); - auto json_rslt = ppj::rjson_serialize(get_config_req_rep{.compat = res}); - rp.rep->write_body("json", json_rslt); + rp.rep->write_body( + "json", ppj::rjson_serialize(get_config_req_rep{.compat = res})); co_return rp; } @@ -169,8 +168,7 @@ put_config_subject(server::request_t rq, server::reply_t rp) { co_await rq.service().writer().read_sync(); co_await rq.service().writer().write_config(sub, config.compat); - auto json_rslt = ppj::rjson_serialize(config); - rp.rep->write_body("json", json_rslt); + rp.rep->write_body("json", ppj::rjson_serialize(config)); co_return rp; } @@ -199,8 +197,8 @@ delete_config_subject(server::request_t rq, server::reply_t rp) { co_await rq.service().writer().delete_config(sub); - auto json_rslt = ppj::rjson_serialize(get_config_req_rep{.compat = lvl}); - rp.rep->write_body("json", json_rslt); + rp.rep->write_body( + "json", ppj::rjson_serialize(get_config_req_rep{.compat = lvl})); co_return rp; } @@ -213,8 +211,7 @@ ss::future get_mode(server::request_t rq, server::reply_t rp) { auto res = co_await rq.service().schema_store().get_mode(); - auto json_rslt = ppj::rjson_serialize(mode_req_rep{.mode = res}); - rp.rep->write_body("json", json_rslt); + rp.rep->write_body("json", ppj::rjson_serialize(mode_req_rep{.mode = res})); co_return rp; } @@ -228,8 +225,7 @@ ss::future put_mode(server::request_t rq, server::reply_t rp) { co_await rq.service().writer().write_mode(std::nullopt, res.mode, frc); - auto json_rslt = ppj::rjson_serialize(res); - rp.rep->write_body("json", json_rslt); + rp.rep->write_body("json", ppj::rjson_serialize(res)); co_return rp; } @@ -247,8 +243,7 @@ get_mode_subject(server::request_t rq, server::reply_t rp) { auto res = co_await rq.service().schema_store().get_mode(sub, fallback); - auto json_rslt = ppj::rjson_serialize(mode_req_rep{.mode = res}); - rp.rep->write_body("json", json_rslt); + rp.rep->write_body("json", ppj::rjson_serialize(mode_req_rep{.mode = res})); co_return rp; } @@ -266,8 +261,7 @@ put_mode_subject(server::request_t rq, server::reply_t rp) { co_await rq.service().writer().read_sync(); co_await rq.service().writer().write_mode(sub, res.mode, frc); - auto json_rslt = ppj::rjson_serialize(res); - rp.rep->write_body("json", json_rslt); + rp.rep->write_body("json", ppj::rjson_serialize(res)); co_return rp; } @@ -295,8 +289,7 @@ delete_mode_subject(server::request_t rq, server::reply_t rp) { co_await rq.service().writer().delete_mode(sub); - auto json_rslt = ppj::rjson_serialize(mode_req_rep{.mode = m}); - rp.rep->write_body("json", json_rslt); + rp.rep->write_body("json", ppj::rjson_serialize(mode_req_rep{.mode = m})); co_return rp; } @@ -307,8 +300,7 @@ get_schemas_types(server::request_t rq, server::reply_t rp) { static const std::vector schemas_types{ "PROTOBUF", "AVRO"}; - auto json_rslt = ppj::rjson_serialize(schemas_types); - rp.rep->write_body("json", json_rslt); + rp.rep->write_body("json", ppj::rjson_serialize(schemas_types)); return ss::make_ready_future(std::move(rp)); } @@ -322,9 +314,10 @@ get_schemas_ids_id(server::request_t rq, server::reply_t rp) { return rq.service().schema_store().get_schema_definition(id); }); - auto json_rslt = ppj::rjson_serialize( - get_schemas_ids_id_response{.definition{std::move(def)}}); - rp.rep->write_body("json", json_rslt); + rp.rep->write_body( + "json", + ppj::rjson_serialize( + get_schemas_ids_id_response{.definition{std::move(def)}})); co_return rp; } @@ -454,8 +447,7 @@ get_subject_versions(server::request_t rq, server::reply_t rp) { auto versions = co_await rq.service().schema_store().get_versions( sub, inc_del); - auto json_rslt{json::rjson_serialize(versions)}; - rp.rep->write_body("json", json_rslt); + rp.rep->write_body("json", ppj::rjson_serialize(versions)); co_return rp; } @@ -494,11 +486,12 @@ post_subject(server::request_t rq, server::reply_t rp) { auto sub_schema = co_await rq.service().schema_store().has_schema( schema, inc_del); - auto json_rslt{json::rjson_serialize(post_subject_versions_version_response{ - .schema{std::move(sub_schema.schema)}, - .id{sub_schema.id}, - .version{sub_schema.version}})}; - rp.rep->write_body("json", json_rslt); + rp.rep->write_body( + "json", + ppj::rjson_serialize(post_subject_versions_version_response{ + .schema{std::move(sub_schema.schema)}, + .id{sub_schema.id}, + .version{sub_schema.version}})); co_return rp; } @@ -534,9 +527,9 @@ post_subject_versions(server::request_t rq, server::reply_t rp) { std::move(schema)); } - auto json_rslt{ - json::rjson_serialize(post_subject_versions_response{.id{schema_id}})}; - rp.rep->write_body("json", json_rslt); + rp.rep->write_body( + "json", + ppj::rjson_serialize(post_subject_versions_response{.id{schema_id}})); co_return rp; } @@ -559,11 +552,12 @@ ss::future::reply_t> get_subject_versions_version( sub, version, inc_del); }); - auto json_rslt{json::rjson_serialize(post_subject_versions_version_response{ - .schema = std::move(get_res.schema), - .id = get_res.id, - .version = get_res.version})}; - rp.rep->write_body("json", json_rslt); + rp.rep->write_body( + "json", + ppj::rjson_serialize(post_subject_versions_version_response{ + .schema = std::move(get_res.schema), + .id = get_res.id, + .version = get_res.version})); co_return rp; } @@ -603,8 +597,7 @@ get_subject_versions_version_referenced_by( auto references = co_await rq.service().schema_store().referenced_by( sub, version); - auto json_rslt{json::rjson_serialize(references)}; - rp.rep->write_body("json", json_rslt); + rp.rep->write_body("json", ppj::rjson_serialize(references)); co_return rp; } @@ -627,8 +620,7 @@ delete_subject(server::request_t rq, server::reply_t rp) { sub, std::nullopt) : co_await rq.service().writer().delete_subject_impermanent(sub); - auto json_rslt{json::rjson_serialize(versions)}; - rp.rep->write_body("json", json_rslt); + rp.rep->write_body("json", ppj::rjson_serialize(versions)); co_return rp; } @@ -675,8 +667,7 @@ delete_subject_version(server::request_t rq, server::reply_t rp) { co_await rq.service().writer().delete_subject_version(sub, version); } - auto json_rslt{json::rjson_serialize(version)}; - rp.rep->write_body("json", json_rslt); + rp.rep->write_body("json", ppj::rjson_serialize(version)); co_return rp; } @@ -716,9 +707,9 @@ compatibility_subject_version(server::request_t rq, server::reply_t rp) { return rq.service().schema_store().is_compatible(version, schema); }); - auto json_rslt{ - json::rjson_serialize(post_compatibility_res{.is_compat = get_res})}; - rp.rep->write_body("json", json_rslt); + rp.rep->write_body( + "json", + ppj::rjson_serialize(post_compatibility_res{.is_compat = get_res})); co_return rp; } From 75e6e6c5ebf22c0a9b85cd8010cbf66b488e868c Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Wed, 3 Jul 2024 11:52:12 +0100 Subject: [PATCH 07/15] json: Introduce chunked_buffer Signed-off-by: Ben Pope (cherry picked from commit f17567fe898c37960896ef303e123ef4c43bcd17) --- src/v/json/chunked_buffer.h | 59 +++++++++++++++++++++++++++++++++++++ src/v/json/json.cc | 40 +++++++++++++++++++++++++ 2 files changed, 99 insertions(+) create mode 100644 src/v/json/chunked_buffer.h diff --git a/src/v/json/chunked_buffer.h b/src/v/json/chunked_buffer.h new file mode 100644 index 0000000000000..0b6d382497b86 --- /dev/null +++ b/src/v/json/chunked_buffer.h @@ -0,0 +1,59 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#pragma once + +#include "bytes/iobuf.h" +#include "json/encodings.h" + +namespace json { + +namespace impl { + +/** + * \brief An in-memory output stream with non-contiguous memory allocation. + */ +template +struct generic_chunked_buffer { + using Ch = Encoding::Ch; + + /** + * \defgroup Implement rapidjson::Stream + */ + /**@{*/ + + void Put(Ch c) { _impl.append(&c, sizeof(Ch)); } + void Flush() {} + + //! Get the size of string in bytes in the string buffer. + size_t GetSize() const { return _impl.size_bytes(); } + + //! Get the length of string in Ch in the string buffer. + size_t GetLength() const { return _impl.size_bytes() / sizeof(Ch); } + + void Reserve(size_t s) { _impl.reserve(s); } + + void Clear() { _impl.clear(); } + + /**@}*/ + + iobuf as_iobuf() && { return std::move(_impl); } + +private: + iobuf _impl; +}; + +} // namespace impl + +template +using generic_chunked_buffer = impl::generic_chunked_buffer; + +using chunked_buffer = generic_chunked_buffer>; + +} // namespace json diff --git a/src/v/json/json.cc b/src/v/json/json.cc index 465feb5b4de6c..8ca4ec5bcbe6d 100644 --- a/src/v/json/json.cc +++ b/src/v/json/json.cc @@ -9,6 +9,7 @@ #include "json/json.h" +#include "json/chunked_buffer.h" #include "json/stringbuffer.h" namespace json { @@ -175,4 +176,43 @@ template void rjson_serialize( template void rjson_serialize( json::Writer& w, const std::filesystem::path& path); +template void +rjson_serialize(json::Writer& w, short v); + +template void +rjson_serialize(json::Writer& w, bool v); + +template void +rjson_serialize(json::Writer& w, long long v); + +template void +rjson_serialize(json::Writer& w, int v); + +template void rjson_serialize( + json::Writer& w, unsigned int v); + +template void +rjson_serialize(json::Writer& w, long v); + +template void rjson_serialize( + json::Writer& w, unsigned long v); + +template void +rjson_serialize(json::Writer& w, double v); + +template void rjson_serialize( + json::Writer& w, std::string_view s); + +template void rjson_serialize( + json::Writer& w, const net::unresolved_address& v); + +template void rjson_serialize( + json::Writer& w, const std::chrono::milliseconds& v); + +template void rjson_serialize( + json::Writer& w, const std::chrono::seconds& v); + +template void rjson_serialize( + json::Writer& w, const std::filesystem::path& path); + } // namespace json From 12021ab0708e030dd87d6e881efde437b2fa4e1b Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Wed, 3 Jul 2024 11:52:49 +0100 Subject: [PATCH 08/15] pandaproxy: Use chunked_buffer Signed-off-by: Ben Pope (cherry picked from commit 744b0f01067395b993a48005387dae8d356c9f25) Conflicts: src/v/pandaproxy/json/requests/produce.h (utils/tristate) --- src/v/pandaproxy/json/requests/produce.h | 11 +++-- .../pandaproxy/json/requests/test/produce.cc | 2 +- src/v/pandaproxy/json/rjson_util.h | 44 ++++++++++++++++--- src/v/pandaproxy/rest/handlers.cc | 21 +++++---- .../test/get_subject_versions_version.cc | 2 +- src/v/pandaproxy/schema_registry/storage.h | 8 +--- .../test/post_subjects_subject_version.cc | 6 ++- .../schema_registry/test/storage.cc | 16 +++---- 8 files changed, 69 insertions(+), 41 deletions(-) diff --git a/src/v/pandaproxy/json/requests/produce.h b/src/v/pandaproxy/json/requests/produce.h index 65ff9b53b9c3d..22c73be92d7ca 100644 --- a/src/v/pandaproxy/json/requests/produce.h +++ b/src/v/pandaproxy/json/requests/produce.h @@ -13,14 +13,14 @@ #include "base/seastarx.h" #include "bytes/iobuf.h" -#include "json/stringbuffer.h" +#include "json/chunked_buffer.h" #include "json/types.h" #include "json/writer.h" #include "kafka/client/types.h" #include "kafka/protocol/errors.h" #include "kafka/protocol/produce.h" #include "pandaproxy/json/iobuf.h" -#include "pandaproxy/json/rjson_util.h" +#include "pandaproxy/json/types.h" #include "tristate.h" #include @@ -42,7 +42,7 @@ class produce_request_handler { serialization_format _fmt = serialization_format::none; state state = state::empty; - using json_writer = ::json::Writer<::json::StringBuffer>; + using json_writer = ::json::Writer<::json::chunked_buffer>; // If we're parsing json_v2, and the field is key or value (implied by // _json_writer being set), then forward calls to json_writer. @@ -55,8 +55,7 @@ class produce_request_handler { auto res = std::invoke( mem_func, *_json_writer, std::forward(args)...); if (_json_writer->IsComplete()) { - iobuf buf; - buf.append(_buf.GetString(), _buf.GetSize()); + iobuf buf = std::move(_buf).as_iobuf(); switch (state) { case state::key: result.back().key.emplace(std::move(buf)); @@ -260,7 +259,7 @@ class produce_request_handler { } private: - ::json::StringBuffer _buf; + ::json::chunked_buffer _buf; std::optional _json_writer; }; diff --git a/src/v/pandaproxy/json/requests/test/produce.cc b/src/v/pandaproxy/json/requests/test/produce.cc index 3c986208ff5da..6f065bb224f41 100644 --- a/src/v/pandaproxy/json/requests/test/produce.cc +++ b/src/v/pandaproxy/json/requests/test/produce.cc @@ -278,7 +278,7 @@ SEASTAR_THREAD_TEST_CASE(test_produce_response) { .log_append_time_ms = model::timestamp{}, .log_start_offset = model::offset{}}); - auto output = ppj::rjson_serialize(topic); + auto output = ppj::rjson_serialize_str(topic); BOOST_TEST(output == expected); } diff --git a/src/v/pandaproxy/json/rjson_util.h b/src/v/pandaproxy/json/rjson_util.h index 3aed103580bbc..f3414579e94b5 100644 --- a/src/v/pandaproxy/json/rjson_util.h +++ b/src/v/pandaproxy/json/rjson_util.h @@ -11,6 +11,8 @@ #pragma once +#include "bytes/iostream.h" +#include "json/chunked_buffer.h" #include "json/json.h" #include "json/reader.h" #include "json/stream.h" @@ -29,16 +31,46 @@ class rjson_parse_impl; template class rjson_serialize_impl; -template -ss::sstring rjson_serialize(const T& v) { - ::json::StringBuffer str_buf; - ::json::Writer<::json::StringBuffer> wrt(str_buf); +template +Buffer rjson_serialize_buf(T&& v) { + Buffer buf; + ::json::Writer wrt{buf}; using ::json::rjson_serialize; using ::pandaproxy::json::rjson_serialize; - rjson_serialize(wrt, v); + rjson_serialize(wrt, std::forward(v)); + + return buf; +} + +template +ss::sstring rjson_serialize_str(T&& v) { + auto str_buf = rjson_serialize_buf<::json::StringBuffer>( + std::forward(v)); + return {str_buf.GetString(), str_buf.GetSize()}; +} - return ss::sstring(str_buf.GetString(), str_buf.GetSize()); +template +iobuf rjson_serialize_iobuf(T&& v) { + return rjson_serialize_buf<::json::chunked_buffer>(std::forward(v)) + .as_iobuf(); +} + +inline ss::noncopyable_function(ss::output_stream&& os)> +as_body_writer(iobuf&& buf) { + return [buf{std::move(buf)}](ss::output_stream&& os) mutable { + return ss::do_with( + std::move(buf), std::move(os), [](auto& buf, auto& os) { + return write_iobuf_to_output_stream(std::move(buf), os) + .finally([&os] { return os.close(); }); + }); + }; +} + +template +ss::noncopyable_function(ss::output_stream&& os)> +rjson_serialize(T&& v) { + return as_body_writer(rjson_serialize_iobuf(std::forward(v))); } struct rjson_serialize_fmt_impl { diff --git a/src/v/pandaproxy/rest/handlers.cc b/src/v/pandaproxy/rest/handlers.cc index a839885a633a6..9ff46e6b45e16 100644 --- a/src/v/pandaproxy/rest/handlers.cc +++ b/src/v/pandaproxy/rest/handlers.cc @@ -144,16 +144,16 @@ get_topics_records(server::request_t rq, server::reply_t rp) { return client .fetch_partition(std::move(tp), offset, max_bytes, timeout) .then([res_fmt](kafka::fetch_response res) { - ::json::StringBuffer str_buf; - ::json::Writer<::json::StringBuffer> w(str_buf); + ::json::chunked_buffer buf; + ::json::Writer<::json::chunked_buffer> w(buf); ppj::rjson_serialize_fmt(res_fmt)(w, std::move(res)); - // TODO Ben: Prevent this linearization - return ss::make_ready_future(str_buf.GetString()); + return buf; }); }) - .then([res_fmt, rp = std::move(rp)](ss::sstring json_rslt) mutable { - rp.rep->write_body("json", json_rslt); + .then([res_fmt, rp = std::move(rp)](auto buf) mutable { + rp.rep->write_body( + "json", json::as_body_writer(std::move(buf).as_iobuf())); rp.mime_type = res_fmt; return std::move(rp); }); @@ -393,14 +393,13 @@ consumer_fetch(server::request_t rq, server::reply_t rp) { return client.consumer_fetch(group_id, name, timeout, max_bytes) .then([res_fmt, rp{std::move(rp)}](auto res) mutable { - ::json::StringBuffer str_buf; - ::json::Writer<::json::StringBuffer> w(str_buf); + ::json::chunked_buffer buf; + ::json::Writer<::json::chunked_buffer> w(buf); ppj::rjson_serialize_fmt(res_fmt)(w, std::move(res)); - // TODO Ben: Prevent this linearization - ss::sstring json_rslt = str_buf.GetString(); - rp.rep->write_body("json", json_rslt); + rp.rep->write_body( + "json", json::as_body_writer(std::move(buf).as_iobuf())); rp.mime_type = res_fmt; return std::move(rp); }); diff --git a/src/v/pandaproxy/schema_registry/requests/test/get_subject_versions_version.cc b/src/v/pandaproxy/schema_registry/requests/test/get_subject_versions_version.cc index 3a3d85b35d793..228e57fbad063 100644 --- a/src/v/pandaproxy/schema_registry/requests/test/get_subject_versions_version.cc +++ b/src/v/pandaproxy/schema_registry/requests/test/get_subject_versions_version.cc @@ -48,7 +48,7 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_version_response) { "schema": ")" + escaped_schema_def + R"("})"}; - auto result{ppj::rjson_serialize(response)}; + auto result = ppj::rjson_serialize_str(response); BOOST_REQUIRE_EQUAL(::json::minify(expected), result); } diff --git a/src/v/pandaproxy/schema_registry/storage.h b/src/v/pandaproxy/schema_registry/storage.h index 9b5a4cd008fac..2c84d16d0c37c 100644 --- a/src/v/pandaproxy/schema_registry/storage.h +++ b/src/v/pandaproxy/schema_registry/storage.h @@ -14,7 +14,6 @@ #include "base/vlog.h" #include "bytes/iobuf_parser.h" #include "json/json.h" -#include "json/stringbuffer.h" #include "json/types.h" #include "json/writer.h" #include "model/metadata.h" @@ -1354,11 +1353,8 @@ auto from_json_iobuf(iobuf&& iobuf, Args&&... args) { } template -auto to_json_iobuf(T t) { - auto val_js = json::rjson_serialize(t); - iobuf buf; - buf.append(val_js.data(), val_js.size()); - return buf; +auto to_json_iobuf(T&& t) { + return json::rjson_serialize_iobuf(std::forward(t)); } template diff --git a/src/v/pandaproxy/schema_registry/test/post_subjects_subject_version.cc b/src/v/pandaproxy/schema_registry/test/post_subjects_subject_version.cc index d6df5b3392c7f..54f41088e6693 100644 --- a/src/v/pandaproxy/schema_registry/test/post_subjects_subject_version.cc +++ b/src/v/pandaproxy/schema_registry/test/post_subjects_subject_version.cc @@ -566,7 +566,7 @@ FIXTURE_TEST(schema_registry_post_avro_references, pandaproxy_test_fixture) { info("Post company schema (expect schema_id=1)"); auto res = post_schema( - client, company_req.schema.sub(), ppj::rjson_serialize(company_req)); + client, company_req.schema.sub(), ppj::rjson_serialize_str(company_req)); BOOST_REQUIRE_EQUAL(res.body, R"({"id":1})"); BOOST_REQUIRE_EQUAL( res.headers.at(boost::beast::http::field::content_type), @@ -574,7 +574,9 @@ FIXTURE_TEST(schema_registry_post_avro_references, pandaproxy_test_fixture) { info("Post employee schema (expect schema_id=2)"); res = post_schema( - client, employee_req.schema.sub(), ppj::rjson_serialize(employee_req)); + client, + employee_req.schema.sub(), + ppj::rjson_serialize_str(employee_req)); BOOST_REQUIRE_EQUAL(res.body, R"({"id":2})"); BOOST_REQUIRE_EQUAL( res.headers.at(boost::beast::http::field::content_type), diff --git a/src/v/pandaproxy/schema_registry/test/storage.cc b/src/v/pandaproxy/schema_registry/test/storage.cc index d79b14fc60663..72a2153983371 100644 --- a/src/v/pandaproxy/schema_registry/test/storage.cc +++ b/src/v/pandaproxy/schema_registry/test/storage.cc @@ -131,7 +131,7 @@ BOOST_AUTO_TEST_CASE(test_storage_serde) { avro_schema_key_sv.data(), pps::schema_key_handler<>{}); BOOST_CHECK_EQUAL(avro_schema_key, key); - auto str = ppj::rjson_serialize(avro_schema_key); + auto str = ppj::rjson_serialize_str(avro_schema_key); BOOST_CHECK_EQUAL(str, ::json::minify(avro_schema_key_sv)); } @@ -140,7 +140,7 @@ BOOST_AUTO_TEST_CASE(test_storage_serde) { avro_schema_value_sv.data(), pps::canonical_schema_value_handler<>{}); BOOST_CHECK_EQUAL(avro_schema_value, val); - auto str = ppj::rjson_serialize(avro_schema_value); + auto str = ppj::rjson_serialize_str(avro_schema_value); BOOST_CHECK_EQUAL(str, ::json::minify(avro_schema_value_sv)); } @@ -149,7 +149,7 @@ BOOST_AUTO_TEST_CASE(test_storage_serde) { config_key_sv.data(), pps::config_key_handler<>{}); BOOST_CHECK_EQUAL(config_key, val); - auto str = ppj::rjson_serialize(config_key); + auto str = ppj::rjson_serialize_str(config_key); BOOST_CHECK_EQUAL(str, ::json::minify(config_key_sv)); } @@ -158,7 +158,7 @@ BOOST_AUTO_TEST_CASE(test_storage_serde) { config_key_sub_sv.data(), pps::config_key_handler<>{}); BOOST_CHECK_EQUAL(config_key_sub, val); - auto str = ppj::rjson_serialize(config_key_sub); + auto str = ppj::rjson_serialize_str(config_key_sub); BOOST_CHECK_EQUAL(str, ::json::minify(config_key_sub_sv)); } @@ -167,7 +167,7 @@ BOOST_AUTO_TEST_CASE(test_storage_serde) { config_value_sv.data(), pps::config_value_handler<>{}); BOOST_CHECK_EQUAL(config_value, val); - auto str = ppj::rjson_serialize(config_value); + auto str = ppj::rjson_serialize_str(config_value); BOOST_CHECK_EQUAL(str, ::json::minify(config_value_sv)); } @@ -176,7 +176,7 @@ BOOST_AUTO_TEST_CASE(test_storage_serde) { config_value_sub_sv.data(), pps::config_value_handler<>{}); BOOST_CHECK_EQUAL(config_value_sub, val); - auto str = ppj::rjson_serialize(config_value_sub); + auto str = ppj::rjson_serialize_str(config_value_sub); BOOST_CHECK_EQUAL(str, ::json::minify(config_value_sub_sv)); } @@ -185,7 +185,7 @@ BOOST_AUTO_TEST_CASE(test_storage_serde) { delete_subject_key_sv.data(), pps::delete_subject_key_handler<>{}); BOOST_CHECK_EQUAL(delete_subject_key, val); - auto str = ppj::rjson_serialize(delete_subject_key); + auto str = ppj::rjson_serialize_str(delete_subject_key); BOOST_CHECK_EQUAL(str, ::json::minify(delete_subject_key_sv)); } @@ -195,7 +195,7 @@ BOOST_AUTO_TEST_CASE(test_storage_serde) { pps::delete_subject_value_handler<>{}); BOOST_CHECK_EQUAL(delete_subject_value, val); - auto str = ppj::rjson_serialize(delete_subject_value); + auto str = ppj::rjson_serialize_str(delete_subject_value); BOOST_CHECK_EQUAL(str, ::json::minify(delete_subject_value_sv)); } } From 5ecdb7c6431977ba1de084517c90d8db1fe020ff Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Wed, 3 Jul 2024 17:31:02 +0100 Subject: [PATCH 09/15] Revert "sr: Streaming json lists" This reverts commit 313d2084299e33225694ebf0045cfbfdb0f550f2. Some edits required to avoid temporaries.. (cherry picked from commit 5f3236c0ead0f8e9c439e957e04bc342f0f136de) --- src/v/pandaproxy/schema_registry/handlers.cc | 62 +++----------------- 1 file changed, 7 insertions(+), 55 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/handlers.cc b/src/v/pandaproxy/schema_registry/handlers.cc index 07cc744f0f006..c4814cdc7fe15 100644 --- a/src/v/pandaproxy/schema_registry/handlers.cc +++ b/src/v/pandaproxy/schema_registry/handlers.cc @@ -10,7 +10,6 @@ #include "handlers.h" #include "container/json.h" -#include "container/lw_shared_container.h" #include "pandaproxy/json/rjson_util.h" #include "pandaproxy/json/types.h" #include "pandaproxy/parsing/httpd.h" @@ -28,7 +27,6 @@ #include #include -#include #include #include @@ -321,41 +319,6 @@ get_schemas_ids_id(server::request_t rq, server::reply_t rp) { co_return rp; } -struct schema_version_json : public ss::json::json_base { - ss::json::json_element subject; - ss::json::json_element version; - - void register_params() { - add(&subject, "subject"); - add(&version, "version"); - } - - schema_version_json() { register_params(); } - - schema_version_json(const schema_version_json& e) { - register_params(); - subject = e.subject; - version = e.version; - } - template - schema_version_json& operator=(const T& e) { - subject = e.subject; - version = e.version; - return *this; - } - schema_version_json& operator=(const schema_version_json& e) { - subject = e.subject; - version = e.version; - return *this; - } - template - schema_version_json& update(T& e) { - e.subject = subject; - e.version = version; - return *this; - } -}; - ss::future get_schemas_ids_id_versions(server::request_t rq, server::reply_t rp) { parse_accept_header(rq, rp); @@ -373,14 +336,8 @@ get_schemas_ids_id_versions(server::request_t rq, server::reply_t rp) { rp.rep->write_body( "json", - ss::json::stream_range_as_array( - lw_shared_container(std::move(svs)), [](const subject_version& sv) { - schema_version_json j; - j.subject = sv.sub; - j.version = sv.version; - return j; - })); - + ppj::rjson_serialize(get_schemas_ids_id_versions_response{ + .subject_versions{std::move(svs)}})); co_return rp; } @@ -399,13 +356,11 @@ ss::future::reply_t> get_schemas_ids_id_subjects( // Force early 40403 if the schema id isn't found co_await rq.service().schema_store().get_schema_definition(id); - auto subjects = co_await rq.service().schema_store().get_schema_subjects( - id, incl_del); rp.rep->write_body( "json", - ss::json::stream_range_as_array( - lw_shared_container(std::move(subjects)), - [](const subject& subj) { return subj; })); + ppj::rjson_serialize( + co_await rq.service().schema_store().get_schema_subjects( + id, incl_del))); co_return rp; } @@ -422,13 +377,10 @@ get_subjects(server::request_t rq, server::reply_t rp) { // List-type request: must ensure we see latest writes co_await rq.service().writer().read_sync(); - auto subjects = co_await rq.service().schema_store().get_subjects( - inc_del, subject_prefix); rp.rep->write_body( "json", - ss::json::stream_range_as_array( - lw_shared_container(std::move(subjects)), - [](const subject& subj) { return subj; })); + json::rjson_serialize(co_await rq.service().schema_store().get_subjects( + inc_del, subject_prefix))); co_return rp; } From 76b49f3a3eddd0b6e9935c94078ce10760c0ede4 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Wed, 3 Jul 2024 21:45:06 +0100 Subject: [PATCH 10/15] json: Introduce chunked_input_stream Signed-off-by: Ben Pope (cherry picked from commit c676f6e8b7294019cd4d517dd93a6c4002473c4f) --- src/v/json/chunked_input_stream.h | 64 +++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 src/v/json/chunked_input_stream.h diff --git a/src/v/json/chunked_input_stream.h b/src/v/json/chunked_input_stream.h new file mode 100644 index 0000000000000..d405b2ba438fd --- /dev/null +++ b/src/v/json/chunked_input_stream.h @@ -0,0 +1,64 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#pragma once + +#include "bytes/streambuf.h" +#include "json/encodings.h" +#include "json/istreamwrapper.h" + +namespace json { + +namespace impl { + +/** + * \brief An in-memory input stream with non-contiguous memory allocation. + */ +template> +class chunked_input_stream { +public: + using Ch = Encoding::Ch; + + explicit chunked_input_stream(iobuf&& buf) + : _buf(std::move(buf)) + , _is(_buf) + , _sis{&_is} + , _isw(_sis) {} + + /** + * \defgroup Implement rapidjson::Stream + */ + /**@{*/ + + Ch Peek() const { return _isw.Peek(); } + Ch Peek4() const { return _isw.Peek4(); } + Ch Take() { return _isw.Take(); } + size_t Tell() const { return _isw.Tell(); } + void Put(Ch ch) { return _isw.Put(ch); } + Ch* PutBegin() { return _isw.PutBegin(); } + size_t PutEnd(Ch* ch) { return _isw.PutEnd(ch); } + void Flush() { return _isw.Flush(); } + + /**@}*/ + +private: + iobuf _buf; + iobuf_istreambuf _is; + std::istream _sis; + ::json::IStreamWrapper _isw; +}; + +} // namespace impl + +template +using generic_chunked_input_stream = impl::chunked_input_stream; + +using chunked_input_stream = generic_chunked_input_stream>; + +} // namespace json From 3a4ec229324f97a6f27f9d5495e3787cb6b9f53f Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Wed, 3 Jul 2024 21:46:03 +0100 Subject: [PATCH 11/15] pandaproxy/json: Introduce chunked parsing with rjson_parse Signed-off-by: Ben Pope (cherry picked from commit 240c682d9fa655d3b19525260d31f5326fb48f54) --- src/v/pandaproxy/json/rjson_util.h | 68 +++++++++++++++++++++++++++--- 1 file changed, 62 insertions(+), 6 deletions(-) diff --git a/src/v/pandaproxy/json/rjson_util.h b/src/v/pandaproxy/json/rjson_util.h index f3414579e94b5..70bbb262094f7 100644 --- a/src/v/pandaproxy/json/rjson_util.h +++ b/src/v/pandaproxy/json/rjson_util.h @@ -13,6 +13,7 @@ #include "bytes/iostream.h" #include "json/chunked_buffer.h" +#include "json/chunked_input_stream.h" #include "json/json.h" #include "json/reader.h" #include "json/stream.h" @@ -21,7 +22,12 @@ #include "pandaproxy/json/exceptions.h" #include "pandaproxy/json/types.h" +#include #include +#include +#include + +#include namespace pandaproxy::json { @@ -94,18 +100,68 @@ inline rjson_serialize_fmt_impl rjson_serialize_fmt(serialization_format fmt) { return rjson_serialize_fmt_impl{fmt}; } +namespace impl { + template -requires std::is_same_v< - decltype(std::declval().result), - typename Handler::rjson_parse_result> +concept RjsonParseHandler = requires { + std::same_as< + decltype(std::declval().result), + typename Handler::rjson_parse_result>; +}; + +template typename Handler::rjson_parse_result -rjson_parse(const char* const s, Handler&& handler) { +rjson_parse_buf(Arg&& arg, Handler&& handler) { ::json::Reader reader; - ::json::StringStream ss(s); + IStream ss(std::forward(arg)); if (!reader.Parse(ss, handler)) { throw parse_error(reader.GetErrorOffset()); } - return std::move(handler.result); + return std::forward(handler).result; +} + +} // namespace impl + +/// \brief Parse a payload using the handler. +/// +/// \warning rjson_parse is preferred, since it can be chunked. +template +typename Handler::rjson_parse_result +rjson_parse(const char* const s, Handler&& handler) { + return impl::rjson_parse_buf<::json::StringStream>( + s, std::forward(handler)); +} + +///\brief Parse a payload using the handler. +template +typename Handler::rjson_parse_result rjson_parse(iobuf buf, Handler&& handler) { + return impl::rjson_parse_buf<::json::chunked_input_stream>( + std::move(buf), std::forward(handler)); +} + +///\brief Parse a request body using the handler. +template +typename ss::future +rjson_parse(std::unique_ptr req, Handler handler) { + if (!req->content.empty()) { + co_return rjson_parse(req->content.data(), std::move(handler)); + } + + iobuf buf; + auto is = req->content_stream; + co_await ss::repeat([&buf, &is]() { + return is->read().then([&buf](ss::temporary_buffer tmp_buf) { + if (tmp_buf.empty()) { + return ss::make_ready_future( + ss::stop_iteration::yes); + } + buf.append(std::move(tmp_buf)); + return ss::make_ready_future( + ss::stop_iteration::no); + }); + }); + + co_return rjson_parse(std::move(buf), std::move(handler)); } } // namespace pandaproxy::json From 7903d3843f6e99d04b85f2f4c2afc56a3646bc0e Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Wed, 3 Jul 2024 22:20:52 +0100 Subject: [PATCH 12/15] pandaproxy/rest: Use alternative rjson_parse Signed-off-by: Ben Pope (cherry picked from commit 8560e0ab3653ea3235f170043741a213dca1a66b) --- src/v/pandaproxy/rest/handlers.cc | 32 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/src/v/pandaproxy/rest/handlers.cc b/src/v/pandaproxy/rest/handlers.cc index 9ff46e6b45e16..e7652215d3de4 100644 --- a/src/v/pandaproxy/rest/handlers.cc +++ b/src/v/pandaproxy/rest/handlers.cc @@ -173,14 +173,11 @@ post_topics_name(server::request_t rq, server::reply_t rp) { vlog(plog.debug, "get_topics_name: topic: {}", topic); + auto records = co_await ppj::rjson_parse( + std::move(rq.req), ppj::produce_request_handler(req_fmt)); co_return co_await rq.dispatch( - [data{rq.req->content.data()}, - topic, - req_fmt, - res_fmt, - rp{std::move(rp)}](kafka::client::client& client) mutable { - auto records = ppj::rjson_parse( - data, ppj::produce_request_handler(req_fmt)); + [records{std::move(records)}, topic, res_fmt, rp{std::move(rp)}]( + kafka::client::client& client) mutable { return client.produce_records(topic, std::move(records)) .then([rp{std::move(rp)}, res_fmt](kafka::produce_response res) mutable { @@ -213,8 +210,10 @@ create_consumer(server::request_t rq, server::reply_t rp) { auto group_id = parse::request_param( *rq.req, "group_name"); - auto req_data = ppj::rjson_parse( - rq.req->content.data(), ppj::create_consumer_request_handler()); + auto base_uri = make_consumer_uri_base(rq, group_id); + + auto req_data = co_await ppj::rjson_parse( + std::move(rq.req), ppj::create_consumer_request_handler()); validate_no_control( req_data.name(), parse::pp_parsing_error{req_data.name()}); @@ -233,8 +232,6 @@ create_consumer(server::request_t rq, server::reply_t rp) { parse::error_code::invalid_param, "auto.commit must be false"); } - auto base_uri = make_consumer_uri_base(rq, group_id); - co_return co_await rq.dispatch( group_id, [group_id, @@ -326,8 +323,8 @@ subscribe_consumer(server::request_t rq, server::reply_t rp) { auto member_id = parse::request_param( *rq.req, "instance"); - auto req_data = ppj::rjson_parse( - rq.req->content.data(), ppj::subscribe_consumer_request_handler()); + auto req_data = co_await ppj::rjson_parse( + std::move(rq.req), ppj::subscribe_consumer_request_handler()); std::for_each( req_data.topics.begin(), req_data.topics.end(), @@ -416,8 +413,9 @@ get_consumer_offsets(server::request_t rq, server::reply_t rp) { auto group_id{parse::request_param(*rq.req, "group_name")}; auto member_id{parse::request_param(*rq.req, "instance")}; - auto req_data = ppj::partitions_request_to_offset_request(ppj::rjson_parse( - rq.req->content.data(), ppj::partitions_request_handler())); + auto req_data = ppj::partitions_request_to_offset_request( + co_await ppj::rjson_parse( + std::move(rq.req), ppj::partitions_request_handler())); std::for_each(req_data.begin(), req_data.end(), [](const auto& r) { validate_no_control(r.name(), parse::pp_parsing_error{r.name()}); @@ -461,8 +459,8 @@ post_consumer_offsets(server::request_t rq, server::reply_t rp) { auto req_data = rq.req->content.length() == 0 ? std::vector() : ppj::partition_offsets_request_to_offset_commit_request( - ppj::rjson_parse( - rq.req->content.data(), + co_await ppj::rjson_parse( + std::move(rq.req), ppj::partition_offsets_request_handler())); std::for_each(req_data.begin(), req_data.end(), [](const auto& r) { From b098de6ce532df08511eaf72d4cc266afee11a3c Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Wed, 3 Jul 2024 22:21:11 +0100 Subject: [PATCH 13/15] schema_registry: Use alternative rjson_parse Signed-off-by: Ben Pope (cherry picked from commit 6795578e7cd2d171b9733e4da95e829dc0476105) --- src/v/pandaproxy/schema_registry/handlers.cc | 32 ++++++++------------ src/v/pandaproxy/schema_registry/storage.h | 5 ++- 2 files changed, 14 insertions(+), 23 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/handlers.cc b/src/v/pandaproxy/schema_registry/handlers.cc index c4814cdc7fe15..bea8d14fcdb45 100644 --- a/src/v/pandaproxy/schema_registry/handlers.cc +++ b/src/v/pandaproxy/schema_registry/handlers.cc @@ -92,9 +92,8 @@ ss::future put_config(server::request_t rq, server::reply_t rp) { parse_content_type_header(rq); parse_accept_header(rq, rp); - auto config = ppj::rjson_parse( - rq.req->content.data(), put_config_handler<>{}); - rq.req.reset(); + auto config = co_await ppj::rjson_parse( + std::move(rq.req), put_config_handler<>{}); co_await rq.service().writer().write_config(std::nullopt, config.compat); @@ -158,9 +157,8 @@ put_config_subject(server::request_t rq, server::reply_t rp) { parse_content_type_header(rq); parse_accept_header(rq, rp); auto sub = parse::request_param(*rq.req, "subject"); - auto config = ppj::rjson_parse( - rq.req->content.data(), put_config_handler<>{}); - rq.req.reset(); + auto config = co_await ppj::rjson_parse( + std::move(rq.req), put_config_handler<>{}); // Ensure we see latest writes co_await rq.service().writer().read_sync(); @@ -218,8 +216,7 @@ ss::future put_mode(server::request_t rq, server::reply_t rp) { parse_accept_header(rq, rp); auto frc = parse::query_param>(*rq.req, "force") .value_or(force::no); - auto res = ppj::rjson_parse(rq.req->content.data(), mode_handler<>{}); - rq.req.reset(); + auto res = co_await ppj::rjson_parse(std::move(rq.req), mode_handler<>{}); co_await rq.service().writer().write_mode(std::nullopt, res.mode, frc); @@ -252,8 +249,7 @@ put_mode_subject(server::request_t rq, server::reply_t rp) { auto frc = parse::query_param>(*rq.req, "force") .value_or(force::no); auto sub = parse::request_param(*rq.req, "subject"); - auto res = ppj::rjson_parse(rq.req->content.data(), mode_handler<>{}); - rq.req.reset(); + auto res = co_await ppj::rjson_parse(std::move(rq.req), mode_handler<>{}); // Ensure we see latest writes co_await rq.service().writer().read_sync(); @@ -420,8 +416,8 @@ post_subject(server::request_t rq, server::reply_t rp) { canonical_schema schema; try { - auto unparsed = ppj::rjson_parse( - rq.req->content.data(), post_subject_versions_request_handler<>{sub}); + auto unparsed = co_await ppj::rjson_parse( + std::move(rq.req), post_subject_versions_request_handler<>{sub}); schema = co_await rq.service().schema_store().make_canonical_schema( std::move(unparsed.def)); } catch (const exception& e) { @@ -433,8 +429,6 @@ post_subject(server::request_t rq, server::reply_t rp) { throw as_exception(invalid_subject_schema(sub)); } - rq.req.reset(); - auto sub_schema = co_await rq.service().schema_store().has_schema( schema, inc_del); @@ -456,9 +450,8 @@ post_subject_versions(server::request_t rq, server::reply_t rp) { co_await rq.service().writer().read_sync(); - auto unparsed = ppj::rjson_parse( - rq.req->content.data(), post_subject_versions_request_handler<>{sub}); - rq.req.reset(); + auto unparsed = co_await ppj::rjson_parse( + std::move(rq.req), post_subject_versions_request_handler<>{sub}); subject_schema schema{ co_await rq.service().schema_store().make_canonical_schema( @@ -629,9 +622,8 @@ compatibility_subject_version(server::request_t rq, server::reply_t rp) { parse_accept_header(rq, rp); auto ver = parse::request_param(*rq.req, "version"); auto sub = parse::request_param(*rq.req, "subject"); - auto unparsed = ppj::rjson_parse( - rq.req->content.data(), post_subject_versions_request_handler<>{sub}); - rq.req.reset(); + auto unparsed = co_await ppj::rjson_parse( + std::move(rq.req), post_subject_versions_request_handler<>{sub}); // Must read, in case we have the subject in cache with an outdated config co_await rq.service().writer().read_sync(); diff --git a/src/v/pandaproxy/schema_registry/storage.h b/src/v/pandaproxy/schema_registry/storage.h index 2c84d16d0c37c..1f3a3e23e9f80 100644 --- a/src/v/pandaproxy/schema_registry/storage.h +++ b/src/v/pandaproxy/schema_registry/storage.h @@ -1347,9 +1347,8 @@ class delete_subject_value_handler : public json::base_handler { template auto from_json_iobuf(iobuf&& iobuf, Args&&... args) { - auto p = iobuf_parser(std::move(iobuf)); - auto str = p.read_string(p.bytes_left()); - return json::rjson_parse(str.data(), Handler{std::forward(args)...}); + return json::rjson_parse( + std::move(iobuf), Handler{std::forward(args)...}); } template From bba5270dea5c4a215b6ca1abbe84ba228292f0d6 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Wed, 3 Jul 2024 23:33:20 +0100 Subject: [PATCH 14/15] pandaproxy: Enable streaming requests Signed-off-by: Ben Pope (cherry picked from commit c93475720e00301e2c192bd2a44eac8b9a564c6e) --- src/v/pandaproxy/rest/handlers.cc | 2 +- src/v/pandaproxy/server.cc | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/v/pandaproxy/rest/handlers.cc b/src/v/pandaproxy/rest/handlers.cc index e7652215d3de4..0f1f956468c8e 100644 --- a/src/v/pandaproxy/rest/handlers.cc +++ b/src/v/pandaproxy/rest/handlers.cc @@ -456,7 +456,7 @@ post_consumer_offsets(server::request_t rq, server::reply_t rp) { auto member_id{parse::request_param(*rq.req, "instance")}; // If the request is empty, commit all offsets - auto req_data = rq.req->content.length() == 0 + auto req_data = rq.req->content_length == 0 ? std::vector() : ppj::partition_offsets_request_to_offset_commit_request( co_await ppj::rjson_parse( diff --git a/src/v/pandaproxy/server.cc b/src/v/pandaproxy/server.cc index 9ac81f15df3dd..22507fdc51353 100644 --- a/src/v/pandaproxy/server.cc +++ b/src/v/pandaproxy/server.cc @@ -154,6 +154,7 @@ server::server( _api20.set_api_doc(_server._routes); _api20.register_api_file(_server._routes, header); _api20.add_definitions_file(_server._routes, definitions); + _server.set_content_streaming(true); } /* From 84acab67b56f20db3e3882f44008e6372b7fa011 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Thu, 4 Jul 2024 13:36:29 +0100 Subject: [PATCH 15/15] pandaproxy/json: Move string version of rjson_parse It shouldn't be used in the general case. Signed-off-by: Ben Pope (cherry picked from commit ac7c7dd8f2a7177e88a886753c687f46e84c3f1b) --- .../pandaproxy/json/requests/test/produce.cc | 22 +++++++++---------- src/v/pandaproxy/json/rjson_util.h | 6 ++--- src/v/pandaproxy/json/test/parse.cc | 3 ++- src/v/pandaproxy/rest/test/consumer_group.cc | 4 ++-- .../requests/test/post_subject_versions.cc | 2 +- .../schema_registry/test/storage.cc | 16 +++++++------- 6 files changed, 27 insertions(+), 26 deletions(-) diff --git a/src/v/pandaproxy/json/requests/test/produce.cc b/src/v/pandaproxy/json/requests/test/produce.cc index 6f065bb224f41..b067dc8f56865 100644 --- a/src/v/pandaproxy/json/requests/test/produce.cc +++ b/src/v/pandaproxy/json/requests/test/produce.cc @@ -46,7 +46,7 @@ SEASTAR_THREAD_TEST_CASE(test_produce_binary_request) { ] })"; - auto records = ppj::rjson_parse(input, make_binary_v2_handler()); + auto records = ppj::impl::rjson_parse(input, make_binary_v2_handler()); BOOST_TEST(records.size() == 2); BOOST_TEST(!!records[0].value); @@ -77,7 +77,7 @@ SEASTAR_THREAD_TEST_CASE(test_produce_json_request) { ] })"; - auto records = ppj::rjson_parse(input, make_json_v2_handler()); + auto records = ppj::impl::rjson_parse(input, make_json_v2_handler()); BOOST_REQUIRE_EQUAL(records.size(), 2); BOOST_REQUIRE_EQUAL(records[0].partition_id, model::partition_id(0)); BOOST_REQUIRE(!records[0].key); @@ -111,7 +111,7 @@ SEASTAR_THREAD_TEST_CASE(test_produce_invalid_json_request) { })"; BOOST_CHECK_THROW( - ppj::rjson_parse(input, make_json_v2_handler()), + ppj::impl::rjson_parse(input, make_json_v2_handler()), pandaproxy::json::parse_error); } @@ -121,7 +121,7 @@ SEASTAR_THREAD_TEST_CASE(test_produce_request_empty) { "records": [] })"; - auto records = ppj::rjson_parse(input, make_binary_v2_handler()); + auto records = ppj::impl::rjson_parse(input, make_binary_v2_handler()); BOOST_TEST(records.size() == 0); } @@ -137,7 +137,7 @@ SEASTAR_THREAD_TEST_CASE(test_produce_request_error_records_name) { })"; BOOST_CHECK_EXCEPTION( - ppj::rjson_parse(input, make_binary_v2_handler()), + ppj::impl::rjson_parse(input, make_binary_v2_handler()), ppj::parse_error, [](ppj::parse_error const& e) { return e.what() == std::string_view("parse error at offset 25"); @@ -156,7 +156,7 @@ SEASTAR_THREAD_TEST_CASE(test_produce_request_error_partition_name) { })"; BOOST_CHECK_EXCEPTION( - ppj::rjson_parse(input, make_binary_v2_handler()), + ppj::impl::rjson_parse(input, make_binary_v2_handler()), ppj::parse_error, [](ppj::parse_error const& e) { return e.what() == std::string_view("parse error at offset 99"); @@ -175,7 +175,7 @@ SEASTAR_THREAD_TEST_CASE(test_produce_request_error_partition_type) { })"; BOOST_CHECK_EXCEPTION( - ppj::rjson_parse(input, make_binary_v2_handler()), + ppj::impl::rjson_parse(input, make_binary_v2_handler()), ppj::parse_error, [](ppj::parse_error const& e) { return e.what() == std::string_view("parse error at offset 112"); @@ -195,7 +195,7 @@ SEASTAR_THREAD_TEST_CASE(test_produce_request_error_before_records) { })"; BOOST_CHECK_EXCEPTION( - ppj::rjson_parse(input, make_binary_v2_handler()), + ppj::impl::rjson_parse(input, make_binary_v2_handler()), ppj::parse_error, [](ppj::parse_error const& e) { return e.what() == std::string_view("parse error at offset 28"); @@ -215,7 +215,7 @@ SEASTAR_THREAD_TEST_CASE(test_produce_request_error_after_records) { })"; BOOST_CHECK_EXCEPTION( - ppj::rjson_parse(input, make_binary_v2_handler()), + ppj::impl::rjson_parse(input, make_binary_v2_handler()), ppj::parse_error, [](ppj::parse_error const& e) { return e.what() == std::string_view("parse error at offset 152"); @@ -235,7 +235,7 @@ SEASTAR_THREAD_TEST_CASE(test_produce_request_error_between_records) { })"; BOOST_CHECK_EXCEPTION( - ppj::rjson_parse(input, make_binary_v2_handler()), + ppj::impl::rjson_parse(input, make_binary_v2_handler()), ppj::parse_error, [](ppj::parse_error const& e) { return e.what() == std::string_view("parse error at offset 144"); @@ -250,7 +250,7 @@ SEASTAR_THREAD_TEST_CASE(test_produce_request_error_no_records) { })"; BOOST_CHECK_EXCEPTION( - ppj::rjson_parse(input, make_binary_v2_handler()), + ppj::impl::rjson_parse(input, make_binary_v2_handler()), ppj::parse_error, [](ppj::parse_error const& e) { return e.what() == std::string_view("parse error at offset 24"); diff --git a/src/v/pandaproxy/json/rjson_util.h b/src/v/pandaproxy/json/rjson_util.h index 70bbb262094f7..a3d3358f2078d 100644 --- a/src/v/pandaproxy/json/rjson_util.h +++ b/src/v/pandaproxy/json/rjson_util.h @@ -120,8 +120,6 @@ rjson_parse_buf(Arg&& arg, Handler&& handler) { return std::forward(handler).result; } -} // namespace impl - /// \brief Parse a payload using the handler. /// /// \warning rjson_parse is preferred, since it can be chunked. @@ -132,6 +130,8 @@ rjson_parse(const char* const s, Handler&& handler) { s, std::forward(handler)); } +} // namespace impl + ///\brief Parse a payload using the handler. template typename Handler::rjson_parse_result rjson_parse(iobuf buf, Handler&& handler) { @@ -144,7 +144,7 @@ template typename ss::future rjson_parse(std::unique_ptr req, Handler handler) { if (!req->content.empty()) { - co_return rjson_parse(req->content.data(), std::move(handler)); + co_return impl::rjson_parse(req->content.data(), std::move(handler)); } iobuf buf; diff --git a/src/v/pandaproxy/json/test/parse.cc b/src/v/pandaproxy/json/test/parse.cc index cb75470644411..d7207474bf5d4 100644 --- a/src/v/pandaproxy/json/test/parse.cc +++ b/src/v/pandaproxy/json/test/parse.cc @@ -43,7 +43,8 @@ inline void parse_test(size_t data_size) { auto input = gen(data_size); perf_tests::start_measuring_time(); - auto records = ppj::rjson_parse(input.c_str(), make_binary_v2_handler()); + auto records = ppj::impl::rjson_parse( + input.c_str(), make_binary_v2_handler()); perf_tests::stop_measuring_time(); } diff --git a/src/v/pandaproxy/rest/test/consumer_group.cc b/src/v/pandaproxy/rest/test/consumer_group.cc index 0c0ea3ae13bb9..655d9384cfeb4 100644 --- a/src/v/pandaproxy/rest/test/consumer_group.cc +++ b/src/v/pandaproxy/rest/test/consumer_group.cc @@ -84,7 +84,7 @@ FIXTURE_TEST(pandaproxy_consumer_group, pandaproxy_test_fixture) { BOOST_REQUIRE_EQUAL( res.headers.result(), boost::beast::http::status::ok); - auto res_data = ppj::rjson_parse( + auto res_data = ppj::impl::rjson_parse( res.body.data(), ppj::create_consumer_response_handler()); BOOST_REQUIRE_EQUAL(res_data.instance_id, "test_consumer"); member_id = res_data.instance_id; @@ -315,7 +315,7 @@ FIXTURE_TEST( BOOST_REQUIRE_EQUAL( res.headers.result(), boost::beast::http::status::ok); - auto res_data = ppj::rjson_parse( + auto res_data = ppj::impl::rjson_parse( res.body.data(), ppj::create_consumer_response_handler()); BOOST_REQUIRE_EQUAL(res_data.instance_id, "test_consumer"); member_id = res_data.instance_id; diff --git a/src/v/pandaproxy/schema_registry/requests/test/post_subject_versions.cc b/src/v/pandaproxy/schema_registry/requests/test/post_subject_versions.cc index 3cc171a12aba6..a0befd798c81c 100644 --- a/src/v/pandaproxy/schema_registry/requests/test/post_subject_versions.cc +++ b/src/v/pandaproxy/schema_registry/requests/test/post_subject_versions.cc @@ -52,7 +52,7 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_parser) { const parse_result expected{ {sub, expected_schema_def}, std::nullopt, std::nullopt}; - auto result{ppj::rjson_parse( + auto result{ppj::impl::rjson_parse( payload.data(), pps::post_subject_versions_request_handler{sub})}; // canonicalisation now requires a sharded_store, for now, minify. diff --git a/src/v/pandaproxy/schema_registry/test/storage.cc b/src/v/pandaproxy/schema_registry/test/storage.cc index 72a2153983371..aef243d10295e 100644 --- a/src/v/pandaproxy/schema_registry/test/storage.cc +++ b/src/v/pandaproxy/schema_registry/test/storage.cc @@ -127,7 +127,7 @@ const pps::delete_subject_value delete_subject_value{ BOOST_AUTO_TEST_CASE(test_storage_serde) { { - auto key = ppj::rjson_parse( + auto key = ppj::impl::rjson_parse( avro_schema_key_sv.data(), pps::schema_key_handler<>{}); BOOST_CHECK_EQUAL(avro_schema_key, key); @@ -136,7 +136,7 @@ BOOST_AUTO_TEST_CASE(test_storage_serde) { } { - auto val = ppj::rjson_parse( + auto val = ppj::impl::rjson_parse( avro_schema_value_sv.data(), pps::canonical_schema_value_handler<>{}); BOOST_CHECK_EQUAL(avro_schema_value, val); @@ -145,7 +145,7 @@ BOOST_AUTO_TEST_CASE(test_storage_serde) { } { - auto val = ppj::rjson_parse( + auto val = ppj::impl::rjson_parse( config_key_sv.data(), pps::config_key_handler<>{}); BOOST_CHECK_EQUAL(config_key, val); @@ -154,7 +154,7 @@ BOOST_AUTO_TEST_CASE(test_storage_serde) { } { - auto val = ppj::rjson_parse( + auto val = ppj::impl::rjson_parse( config_key_sub_sv.data(), pps::config_key_handler<>{}); BOOST_CHECK_EQUAL(config_key_sub, val); @@ -163,7 +163,7 @@ BOOST_AUTO_TEST_CASE(test_storage_serde) { } { - auto val = ppj::rjson_parse( + auto val = ppj::impl::rjson_parse( config_value_sv.data(), pps::config_value_handler<>{}); BOOST_CHECK_EQUAL(config_value, val); @@ -172,7 +172,7 @@ BOOST_AUTO_TEST_CASE(test_storage_serde) { } { - auto val = ppj::rjson_parse( + auto val = ppj::impl::rjson_parse( config_value_sub_sv.data(), pps::config_value_handler<>{}); BOOST_CHECK_EQUAL(config_value_sub, val); @@ -181,7 +181,7 @@ BOOST_AUTO_TEST_CASE(test_storage_serde) { } { - auto val = ppj::rjson_parse( + auto val = ppj::impl::rjson_parse( delete_subject_key_sv.data(), pps::delete_subject_key_handler<>{}); BOOST_CHECK_EQUAL(delete_subject_key, val); @@ -190,7 +190,7 @@ BOOST_AUTO_TEST_CASE(test_storage_serde) { } { - auto val = ppj::rjson_parse( + auto val = ppj::impl::rjson_parse( delete_subject_value_sv.data(), pps::delete_subject_value_handler<>{}); BOOST_CHECK_EQUAL(delete_subject_value, val);