Skip to content

Commit

Permalink
Merge pull request #3380 from pleroy/3375
Browse files Browse the repository at this point in the history
Lift the maximum deserialization size to 2 GiB from 512 MiB
  • Loading branch information
pleroy authored Jun 20, 2022
2 parents 2100578 + 866ca1f commit c3bdba1
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 36 deletions.
7 changes: 7 additions & 0 deletions base/push_deserializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <cstdint>
#include <functional>
#include <memory>
#include <ostream>
#include <queue>
#include <thread>

Expand Down Expand Up @@ -38,6 +39,9 @@ class DelegatingArrayInputStream
bool Skip(int count) override;
std::int64_t ByteCount() const override;

friend std::ostream& operator<<(std::ostream& out,
DelegatingArrayInputStream const& stream);

private:
Array<std::uint8_t> bytes_;
std::function<Array<std::uint8_t>()> on_empty_;
Expand All @@ -48,6 +52,9 @@ class DelegatingArrayInputStream
// Next() was called.
};

// Writes a human-readable description of |stream| to |out|, for debugging: a
// summary line followed by a hexadecimal dump of the current chunk.
std::ostream& operator<<(std::ostream& out,
DelegatingArrayInputStream const& stream);

// This class supports deserialization which is "pushed" by the client. That is,
// the client creates a |PushDeserializer| object, calls |Start| to start the
// deserialization process, repeatedly calls |Push| to send chunks of data for
Expand Down
29 changes: 27 additions & 2 deletions base/push_deserializer_body.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,32 @@ inline std::int64_t DelegatingArrayInputStream::ByteCount() const {
return byte_count_;
}

// Writes a human-readable dump of |stream| to |out|, for debugging.  The first
// line gives the total byte count, the size of the current chunk, the position
// in that chunk and the size returned by the last call to Next.  It is followed
// by the bytes of the current chunk in hexadecimal, 32 per line, each complete
// line ending with the decimal offset of its first byte.  A "*" marks the byte
// at the current position; if the position is at the end of the chunk the "*"
// is written after the last byte.  The formatting state of |out| (flags and
// fill character) is the same on return as it was on entry.
inline std::ostream& operator<<(std::ostream& out,
                                DelegatingArrayInputStream const& stream) {
  // |std::hex| and |std::setfill| are sticky: without this save/restore the
  // caller's stream would keep printing hexadecimal, zero-filled numbers.
  std::ios_base::fmtflags const saved_flags = out.flags();
  char const saved_fill = out.fill();

  out << "Stream with " << stream.ByteCount() << " total bytes, "
      << "current chunk of size " << stream.bytes_.size
      << " at position " << stream.position_
      << ", last call to Next returned " << stream.last_returned_size_
      << " bytes\n";
  for (std::int64_t index = 0; index < stream.bytes_.size; ++index) {
    // Widen the byte so that it is printed as a number, not as a character.
    out << std::hex << std::setw(2) << std::setfill('0')
        << static_cast<std::int16_t>(stream.bytes_.data[index]);
    if (index == stream.position_) {
      out << "*";
    }
    if ((index + 1) % 32 == 0) {
      // End of a 32-byte line: append the offset of its first byte.
      out << " " << std::dec << index - 31 << "\n";
    } else {
      out << " ";
    }
  }
  if (stream.position_ == stream.bytes_.size) {
    out << "*";
  }
  out << "\n";

  out.flags(saved_flags);
  out.fill(saved_fill);
  return out;
}

inline PushDeserializer::PushDeserializer(
int const chunk_size,
int const number_of_chunks,
Expand All @@ -88,7 +114,7 @@ inline PushDeserializer::PushDeserializer(
number_of_chunks_(number_of_chunks),
uncompressed_data_(chunk_size_),
stream_(std::bind(&PushDeserializer::Pull, this)) {
// This sentinel ensures that the two queue are correctly out of step.
// This sentinel ensures that the two queues are correctly out of step.
done_.push(nullptr);
}

Expand All @@ -115,7 +141,6 @@ inline void PushDeserializer::Start(
// we have to copy code from MessageLite::ParseFromZeroCopyStream. Blame
// Kenton.
google::protobuf::io::CodedInputStream decoder(&stream_);
decoder.SetTotalBytesLimit(1 << 29, 1<< 29);
CHECK(message_->ParseFromCodedStream(&decoder));
CHECK(decoder.ConsumedEntireMessage());

Expand Down
70 changes: 40 additions & 30 deletions journal/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,37 @@ Player::Player(std::filesystem::path const& path)
}

// Reads the next message from the journal and executes it.  |index| is passed
// through to |Process|.  Returns false at the end of the input.
bool Player::Play(int const index) {
  // Read exactly once per call: reading into a local and then calling |Read|
  // again for the argument would consume two journal entries and drop the
  // first one.
  return Process(/*method_in=*/Read(), index, /*play=*/true);
}

// Reads the next message from the journal and hands it to |Process| with
// |play| set to false, so that the message is processed but not executed.
bool Player::Scan(int const index) {
  bool const play = false;
  return Process(/*method_in=*/Read(), index, play);
}

// Returns the message held by |last_method_in_|.
// NOTE(review): dereferences the pointer unconditionally; presumably only
// meaningful after a message has been processed — confirm with callers.
serialization::Method const& Player::last_method_in() const {
  serialization::Method const& method_in = *last_method_in_;
  return method_in;
}

// Returns the message held by |last_method_out_return_|.
// NOTE(review): dereferences the pointer unconditionally; presumably only
// meaningful after a message has been processed — confirm with callers.
serialization::Method const& Player::last_method_out_return() const {
  serialization::Method const& method_out_return = *last_method_out_return_;
  return method_out_return;
}

// Reads one line from the journal stream and decodes it from hexadecimal into
// a |Method| message.  Returns |nullptr| if the line is empty.  A line that
// cannot be parsed as a |Method| is a fatal error.
std::unique_ptr<serialization::Method> Player::Read() {
  std::string const hexadecimal_line = GetLine(stream_);
  if (!hexadecimal_line.empty()) {
    // Allocated on first use and never deleted.
    static auto* const decoder =
        new HexadecimalEncoder</*null_terminated=*/true>;
    auto const binary = decoder->Decode(
        {hexadecimal_line.c_str(), strlen(hexadecimal_line.c_str())});
    auto message = std::make_unique<serialization::Method>();
    CHECK(message->ParseFromArray(binary.data.get(),
                                  static_cast<int>(binary.size)));
    return message;
  }
  return nullptr;
}

bool Player::Process(std::unique_ptr<serialization::Method> method_in,
int const index, bool const play) {
if (method_in == nullptr) {
// End of input file.
return false;
Expand All @@ -58,8 +88,8 @@ bool Player::Play(int const index) {
(PRINCIPIA_PLAYER_ALLOW_VERSION_MISMATCH == 0))
<< "Journal version is " << get_version_out.version()
<< ", running with a binary built at version " << Version
<< "; set PRINCIPIA_PLAYER_ALLOW_VERSION_MISMATCH to 1 if this is "
<< "intended";
<< "; set PRINCIPIA_PLAYER_ALLOW_VERSION_MISMATCH to 1 in player.cpp "
<< "if this is intended.";
}

#if 0
Expand All @@ -77,14 +107,16 @@ bool Player::Play(int const index) {
<< method_out_return->ShortDebugString();
#endif

auto const before = std::chrono::system_clock::now();
if (play) {
auto const before = std::chrono::system_clock::now();

#include "journal/player.generated.cc"

auto const after = std::chrono::system_clock::now();
if (after - before > 100ms) {
LOG(ERROR) << "Long method (" << (after - before) / 1ms << " ms):\n"
<< method_in->DebugString();
auto const after = std::chrono::system_clock::now();
if (after - before > 100ms) {
LOG(ERROR) << "Long method (" << (after - before) / 1ms << " ms):\n"
<< method_in->DebugString();
}
}

last_method_in_.swap(method_in);
Expand All @@ -93,27 +125,5 @@ bool Player::Play(int const index) {
return true;
}

// Returns the message held by |last_method_in_|.
// NOTE(review): dereferences the pointer unconditionally; presumably only
// meaningful after a message has been processed — confirm with callers.
serialization::Method const& Player::last_method_in() const {
  serialization::Method const& method_in = *last_method_in_;
  return method_in;
}

// Returns the message held by |last_method_out_return_|.
// NOTE(review): dereferences the pointer unconditionally; presumably only
// meaningful after a message has been processed — confirm with callers.
serialization::Method const& Player::last_method_out_return() const {
  serialization::Method const& method_out_return = *last_method_out_return_;
  return method_out_return;
}

// Reads one line from the journal stream and decodes it from hexadecimal into
// a |Method| message.  Returns |nullptr| if the line is empty.  A line that
// cannot be parsed as a |Method| is a fatal error.
std::unique_ptr<serialization::Method> Player::Read() {
  std::string const hexadecimal_line = GetLine(stream_);
  if (!hexadecimal_line.empty()) {
    // Allocated on first use and never deleted.
    static auto* const decoder =
        new HexadecimalEncoder</*null_terminated=*/true>;
    auto const binary = decoder->Decode(
        {hexadecimal_line.c_str(), strlen(hexadecimal_line.c_str())});
    auto message = std::make_unique<serialization::Method>();
    CHECK(message->ParseFromArray(binary.data.get(),
                                  static_cast<int>(binary.size)));
    return message;
  }
  return nullptr;
}

} // namespace journal
} // namespace principia
7 changes: 7 additions & 0 deletions journal/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ class Player final {
// |index| is the 0-based index of the message in the journal.
bool Play(int index);

// Same as |Play|, but only parses the messages and does not execute them.
bool Scan(int index);

// Return the last replayed messages.
serialization::Method const& last_method_in() const;
serialization::Method const& last_method_out_return() const;
Expand All @@ -29,6 +32,10 @@ class Player final {
// Reads one message from the stream. Returns a |nullptr| at end of stream.
std::unique_ptr<serialization::Method> Read();

// Implementation of |Play| and |Scan|.
bool Process(std::unique_ptr<serialization::Method> method_in,
int const index, bool const play);

template<typename Profile>
bool RunIfAppropriate(serialization::Method const& method_in,
serialization::Method const& method_out_return);
Expand Down
25 changes: 22 additions & 3 deletions journal/player_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,30 @@ TEST_F(PlayerTest, DISABLED_SECULAR_Benchmarks) {
benchmark::RunSpecifiedBenchmarks();
}

// A convenience test which scans a journal from start to end without executing
// it, in order to find its last unpaired method: once the scan stops, the last
// messages that were successfully processed are logged.  You must set |path|.
TEST_F(PlayerTest, DISABLED_SECULAR_Scan) {
  std::string path =
      R"(P:\Public Mockingbird\Principia\Crashes\3375\JOURNAL.20220610-092143)";  // NOLINT
  Player player(path);
  int count = 0;
  for (; player.Scan(count);) {
    ++count;
    // Report progress every 100'000 entries.
    if ((count % 100'000) == 0) {
      LOG(ERROR) << count << " journal entries replayed";
    }
  }
  LOG(ERROR) << count << " journal entries in total";
  LOG(ERROR) << "Last successful method in:\n"
             << player.last_method_in().DebugString();
  LOG(ERROR) << "Last successful method out/return: \n"
             << player.last_method_out_return().DebugString();
}

// A test to debug a journal. You must set |path| and fill the |method_in| and
// |method_out_return| protocol buffers.
TEST_F(PlayerTest, DISABLED_SECULAR_Debug) {
// An example of how journaling may be used for debugging. You must set
// |path| and fill the |method_in| and |method_out_return| protocol buffers.
std::string path =
R"(P:\Public Mockingbird\Principia\Journals\JOURNAL.20211128-162300)"; // NOLINT
R"(P:\Public Mockingbird\Principia\Crashes\3375\JOURNAL.20220610-092143)"; // NOLINT
Player player(path);
int count = 0;
while (player.Play(count)) {
Expand Down
4 changes: 3 additions & 1 deletion journal/profiles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ std::uint64_t SerializePointer(T* t) {
#define PRINCIPIA_SET_VERBOSE_LOGGING 1

#if PRINCIPIA_PERFORM_RUN_CHECKS
// Fails fatally if |a| and |b| do not compare equal.  The failure message
// names the switch that turns these checks off.  The macro is defined exactly
// once in this branch, and the message names this file, profiles.cpp.
#define PRINCIPIA_CHECK_EQ(a, b)                                      \
  CHECK((a) == (b)) << "Set PRINCIPIA_PERFORM_RUN_CHECKS to 0 in "    \
                    << "profiles.cpp to disable this check."
#else
#define PRINCIPIA_CHECK_EQ(a, b) \
{ \
Expand Down
26 changes: 26 additions & 0 deletions ksp_plugin/interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,13 @@ Status* __cdecl principia__SayNotFound() {
absl::NotFoundError("Not found from native C++!")));
}

// Debugging switch.  When set to 1, |principia__SerializePlugin| passes every
// encoded chunk that it returns to |principia__DeserializePlugin| and, at the
// end of the serialization, checks that a plugin was produced and deletes it.
#define PRINCIPIA_VERIFY_SERIALIZATION 0

#if PRINCIPIA_VERIFY_SERIALIZATION
// State of the verification deserialization, kept across the successive calls
// to |principia__SerializePlugin|.
static PushDeserializer* verification_deserializer = nullptr;
static Plugin const* verification_plugin = nullptr;
#endif

// |plugin| must not be null. The caller takes ownership of the result, except
// when it is null (at the end of the stream). No transfer of ownership of
// |*plugin|. |*serializer| must be null on the first call and must be passed
Expand Down Expand Up @@ -1096,6 +1103,18 @@ char const* __cdecl principia__SerializePlugin(
// If this is the end of the serialization, delete the serializer and return a
// nullptr.
if (bytes.size == 0) {
#if PRINCIPIA_VERIFY_SERIALIZATION
principia__DeserializePlugin("",
&verification_deserializer,
&verification_plugin,
compressor,
encoder);
CHECK_NOTNULL(verification_plugin);
LOG(INFO) << "Deleting verification plugin";
delete verification_plugin;
verification_plugin = nullptr;
LOG(INFO) << "Verification plugin deleted";
#endif
LOG(INFO) << "End plugin serialization";
TakeOwnership(serializer);
arena->Reset();
Expand All @@ -1104,6 +1123,13 @@ char const* __cdecl principia__SerializePlugin(

// Encode and return to the client.
auto hexadecimal = NewEncoder(encoder)->Encode(bytes);
#if PRINCIPIA_VERIFY_SERIALIZATION
principia__DeserializePlugin(hexadecimal.data.get(),
&verification_deserializer,
&verification_plugin,
compressor,
encoder);
#endif
return m.Return(hexadecimal.data.release());
}

Expand Down
3 changes: 3 additions & 0 deletions ksp_plugin/vessel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <vector>

#include "absl/container/btree_set.h"
#include "base/macros.hpp"
#include "base/map_util.hpp"
#include "geometry/named_quantities.hpp"
#include "ksp_plugin/integrators.hpp"
Expand Down Expand Up @@ -630,6 +631,8 @@ void Vessel::WriteToMessage(not_null<serialization::Vessel*> const message,
}
message->set_is_collapsible(is_collapsible_);
checkpointer_->WriteToMessage(message->mutable_checkpoint());
LOG(INFO) << name_ << " " << NAMED(message->SpaceUsed()) << " "
<< NAMED(message->ByteSize());
}

not_null<std::unique_ptr<Vessel>> Vessel::ReadFromMessage(
Expand Down
3 changes: 3 additions & 0 deletions ksp_plugin_adapter/ksp_plugin_adapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,7 @@ public override void OnSave(ConfigNode node) {
base.OnSave(node);
if (PluginRunning()) {
IntPtr serializer = IntPtr.Zero;
int chunks = 0;
for (;;) {
string serialization = plugin_.SerializePlugin(
ref serializer,
Expand All @@ -781,7 +782,9 @@ public override void OnSave(ConfigNode node) {
break;
}
node.AddValue(principia_serialized_plugin, serialization);
++chunks;
}
Log.Info("Serialization has " + chunks + " chunks");
}
}

Expand Down

0 comments on commit c3bdba1

Please sign in to comment.