Skip to content

Commit

Permalink
consolidate rpc metadata parsing logic in tests
Browse files Browse the repository at this point in the history
Summary: We should let all metadata parsing logic go through PayloadUtils.

Reviewed By: thedavekwon

Differential Revision: D63565995

fbshipit-source-id: 2db4b7228819a52321b507b1bbf402905aad358d
  • Loading branch information
avalonalex authored and facebook-github-bot committed Sep 29, 2024
1 parent 23dcf0f commit c13c188
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 22 deletions.
8 changes: 8 additions & 0 deletions thrift/lib/cpp2/transport/rocket/PayloadUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ size_t unpackCompact(T& output, const folly::IOBuf* buffer) {
return reader.getCursorPosition();
}

template <typename T>
size_t unpackCompact(T& output, const folly::io::Cursor& cursor) {
CompactProtocolReader reader;
reader.setInput(cursor);
output.read(&reader);
return reader.getCursorPosition();
}

namespace detail {
template <class PayloadType, bool uncompressPayload>
inline PayloadType unpackPayload(rocket::Payload&& payload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@

#include <thrift/lib/cpp2/async/ServerSinkBridge.h>
#include <thrift/lib/cpp2/async/StreamCallbacks.h>
#include <thrift/lib/cpp2/protocol/CompactProtocol.h>
#include <thrift/lib/cpp2/protocol/Serializer.h>
#include <thrift/lib/cpp2/transport/core/TryUtil.h>
#include <thrift/lib/cpp2/transport/rocket/PayloadUtils.h>
#include <thrift/lib/cpp2/transport/rocket/Types.h>
#include <thrift/lib/cpp2/transport/rocket/client/RocketClient.h>
#include <thrift/lib/cpp2/transport/rocket/framing/test/Util.h>
Expand Down Expand Up @@ -112,18 +112,16 @@ makeTestResponse(
rocket::SetupFrame RocketTestClient::makeTestSetupFrame(
MetadataOpaqueMap<std::string, std::string> md) {
RequestSetupMetadata meta;
meta.opaque_ref() = {};
*meta.opaque_ref() = std::move(md);
meta.maxVersion_ref() = kClientVersion;
CompactProtocolWriter compactProtocolWriter;
folly::IOBufQueue paramQueue;
compactProtocolWriter.setOutput(&paramQueue);
meta.write(&compactProtocolWriter);
meta.opaque() = {};
*meta.opaque() = std::move(md);
meta.maxVersion() = kClientVersion;

auto serializedMeta = packCompact(std::move(meta));

// Serialize RocketClient's major/minor version (which is separate from the
// rsocket protocol major/minor version) into setup metadata.
auto buf = folly::IOBuf::createCombined(
sizeof(int32_t) + meta.serializedSize(&compactProtocolWriter));
sizeof(int32_t) + serializedMeta->computeChainDataLength());
folly::IOBufQueue queue;
queue.append(std::move(buf));
folly::io::QueueAppender appender(&queue, /* do not grow */ 0);
Expand All @@ -132,7 +130,7 @@ rocket::SetupFrame RocketTestClient::makeTestSetupFrame(
appender.writeBE<uint16_t>(0); // Thrift RocketClient major version
appender.writeBE<uint16_t>(1); // Thrift RocketClient minor version
// Append serialized setup parameters to setup frame metadata
appender.insert(paramQueue.move());
appender.insert(std::move(serializedMeta));
return rocket::SetupFrame(
rocket::Payload::makeFromMetadataAndData(queue.move(), {}), false);
}
Expand Down Expand Up @@ -380,21 +378,15 @@ class RocketTestServer::RocketTestServerHandler : public RocketServerHandler {
cursor.retreat(4);
}
// Validate RequestSetupMetadata
CompactProtocolReader reader;
reader.setInput(cursor);
RequestSetupMetadata meta;
meta.read(&reader);
EXPECT_EQ(reader.getCursorPosition(), frame.payload().metadataSize());
size_t unpackedSize = unpackCompact(meta, cursor);
EXPECT_EQ(unpackedSize, frame.payload().metadataSize());
EXPECT_EQ(expectedSetupMetadata_, meta.opaque_ref().value_or({}));
version_ = std::min(kServerVersion, meta.maxVersion_ref().value_or(0));
ServerPushMetadata serverMeta;
serverMeta.set_setupResponse();
serverMeta.setupResponse_ref()->version_ref() = version_;
CompactProtocolWriter compactProtocolWriter;
folly::IOBufQueue queue;
compactProtocolWriter.setOutput(&queue);
serverMeta.write(&compactProtocolWriter);
connection.sendMetadataPush(std::move(queue).move());
connection.sendMetadataPush(packCompact(std::move(serverMeta)));
}

void handleRequestResponseFrame(
Expand Down Expand Up @@ -450,7 +442,7 @@ class RocketTestServer::RocketTestServerHandler : public RocketServerHandler {
void onStreamCancel() override { delete this; }

bool onSinkHeaders(HeadersPayload&& payload) override {
auto metadata_ref = payload.payload.otherMetadata_ref();
auto metadata_ref = payload.payload.otherMetadata();
EXPECT_TRUE(metadata_ref);
if (metadata_ref) {
EXPECT_EQ(
Expand Down Expand Up @@ -508,8 +500,7 @@ class RocketTestServer::RocketTestServerHandler : public RocketServerHandler {

for (size_t i = 1; i <= nHeaders; ++i) {
HeadersPayloadContent header;
header.otherMetadata_ref() = {
{"expected_header", folly::to<std::string>(i)}};
header.otherMetadata() = {{"expected_header", folly::to<std::string>(i)}};
auto alive = clientCallback->onStreamHeaders({std::move(header), {}});
DCHECK(alive);
}
Expand Down

0 comments on commit c13c188

Please sign in to comment.