Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reduce ship memory usage #577

Closed
wants to merge 14 commits into from
Closed
8 changes: 5 additions & 3 deletions libraries/libfc/include/fc/io/datastream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,12 @@ class datastream<size_t, void> {
};

template <typename Streambuf>
class datastream<Streambuf, typename std::enable_if_t<std::is_base_of_v<std::streambuf, Streambuf>>> {
class datastream<Streambuf, typename std::enable_if_t<std::is_base_of_v<std::streambuf, std::remove_reference_t<Streambuf>>>> {
private:
Streambuf buf;

using reference_type = std::add_lvalue_reference_t<Streambuf>;

public:
template <typename... Args>
datastream(Args&&... args)
Expand All @@ -120,8 +122,8 @@ class datastream<Streambuf, typename std::enable_if_t<std::is_base_of_v<std::str
}
bool remaining() { return buf.in_avail(); }

Streambuf& storage() { return buf; }
const Streambuf& storage() const { return buf; }
reference_type storage() { return buf; }
const reference_type storage() const { return buf; }
};

template <typename Container>
Expand Down
148 changes: 99 additions & 49 deletions libraries/state_history/create_deltas.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ bool include_delta(const chain::protocol_state_object& old, const chain::protoco
return old.activated_protocol_features != curr.activated_protocol_features;
}

std::vector<table_delta> create_deltas(const chainbase::database& db, bool full_snapshot) {
std::vector<table_delta> deltas;
void pack_deltas(boost::iostreams::filtering_ostreambuf& obuf, const chainbase::database& db, bool full_snapshot) {

fc::datastream<boost::iostreams::filtering_ostreambuf&> ds{obuf};

const auto& table_id_index = db.get_index<chain::table_id_multi_index>();
std::map<uint64_t, const chain::table_id_object*> removed_table_id;
for (auto& rem : table_id_index.last_undo_session().removed_values)
Expand All @@ -73,72 +75,120 @@ std::vector<table_delta> create_deltas(const chainbase::database& db, bool full_
return *it->second;
};

auto pack_row = [&](auto& row) { return fc::raw::pack(make_history_serial_wrapper(db, row)); };
auto pack_contract_row = [&](auto& row) {
return fc::raw::pack(make_history_context_wrapper(db, get_table_id(row.t_id._id), row));
auto pack_row = [&](auto& ds, auto& row) { fc::raw::pack(ds, make_history_serial_wrapper(db, row)); };
auto pack_contract_row = [&](auto& ds, auto& row) {
fc::raw::pack(ds, make_history_context_wrapper(db, get_table_id(row.t_id._id), row));
};

auto process_table = [&](auto* name, auto& index, auto& pack_row) {
auto process_table = [&](auto& ds, auto* name, auto& index, auto& pack_row) {

auto pack_row_v0 = [&](auto& ds, bool present, auto& row) {
fc::raw::pack(ds, present);
fc::datastream<size_t> ps;
pack_row(ps, row);
fc::raw::pack(ds, fc::unsigned_int(ps.tellp()));
pack_row(ds, row);
};

if (full_snapshot) {
if (index.indices().empty())
return;
deltas.push_back({});
auto& delta = deltas.back();
delta.name = name;
for (auto& row : index.indices())
delta.rows.obj.emplace_back(true, pack_row(row));

fc::raw::pack(ds, fc::unsigned_int(0)); // table_delta = std::variant<table_delta_v0> and fc::unsigned_int struct_version
fc::raw::pack(ds, name);
fc::raw::pack(ds, fc::unsigned_int(index.indices().size()));
for (auto& row : index.indices()) {
pack_row_v0(ds, true, row);
}
} else {
auto undo = index.last_undo_session();
if (undo.old_values.empty() && undo.new_values.empty() && undo.removed_values.empty())
return;
deltas.push_back({});
auto& delta = deltas.back();
delta.name = name;
for (auto& old : undo.old_values) {
auto& row = index.get(old.id);
if (include_delta(old, row))
delta.rows.obj.emplace_back(true, pack_row(row));
}
for (auto& old : undo.removed_values)
delta.rows.obj.emplace_back(false, pack_row(old));
for (auto& row : undo.new_values) {
delta.rows.obj.emplace_back(true, pack_row(row));
}

if(delta.rows.obj.empty()) {
deltas.pop_back();
size_t num_entries =
std::count_if(undo.old_values.begin(), undo.old_values.end(),
[&index](const auto& old) { return include_delta(old, index.get(old.id)); }) +
std::distance(undo.removed_values.begin(), undo.removed_values.end()) +
std::distance(undo.new_values.begin(), undo.new_values.end());

if (num_entries) {
fc::raw::pack(ds, fc::unsigned_int(0)); // table_delta = std::variant<table_delta_v0> and fc::unsigned_int struct_version
fc::raw::pack(ds, name);
fc::raw::pack(ds, fc::unsigned_int((uint32_t)num_entries));

for (auto& old : undo.old_values) {
auto& row = index.get(old.id);
if (include_delta(old, row)) {
pack_row_v0(ds, true, row);
}
}

for (auto& old : undo.removed_values) {
pack_row_v0(ds, false, old);
}

for (auto& row : undo.new_values) {
pack_row_v0(ds, true, row);
}
}
}
};

process_table("account", db.get_index<chain::account_index>(), pack_row);
process_table("account_metadata", db.get_index<chain::account_metadata_index>(), pack_row);
process_table("code", db.get_index<chain::code_index>(), pack_row);

process_table("contract_table", db.get_index<chain::table_id_multi_index>(), pack_row);
process_table("contract_row", db.get_index<chain::key_value_index>(), pack_contract_row);
process_table("contract_index64", db.get_index<chain::index64_index>(), pack_contract_row);
process_table("contract_index128", db.get_index<chain::index128_index>(), pack_contract_row);
process_table("contract_index256", db.get_index<chain::index256_index>(), pack_contract_row);
process_table("contract_index_double", db.get_index<chain::index_double_index>(), pack_contract_row);
process_table("contract_index_long_double", db.get_index<chain::index_long_double_index>(), pack_contract_row);

process_table("global_property", db.get_index<chain::global_property_multi_index>(), pack_row);
process_table("generated_transaction", db.get_index<chain::generated_transaction_multi_index>(), pack_row);
process_table("protocol_state", db.get_index<chain::protocol_state_multi_index>(), pack_row);

process_table("permission", db.get_index<chain::permission_index>(), pack_row);
process_table("permission_link", db.get_index<chain::permission_link_index>(), pack_row);
auto has_table = [&](auto x) -> int {
auto& index = db.get_index<std::remove_pointer_t<decltype(x)>>();
if (full_snapshot) {
return !index.indices().empty();
} else {
auto undo = index.last_undo_session();
return std::find_if(undo.old_values.begin(), undo.old_values.end(),
[&index](const auto& old) { return include_delta(old, index.get(old.id)); }) != undo.old_values.end() ||
!undo.removed_values.empty() || !undo.new_values.empty();
}
};

process_table("resource_limits", db.get_index<chain::resource_limits::resource_limits_index>(), pack_row);
process_table("resource_usage", db.get_index<chain::resource_limits::resource_usage_index>(), pack_row);
process_table("resource_limits_state", db.get_index<chain::resource_limits::resource_limits_state_index>(),
int num_tables = std::apply(
[&has_table](auto... args) { return (has_table(args) + ... ); },
std::tuple<chain::account_index*, chain::account_metadata_index*, chain::code_index*,
chain::table_id_multi_index*, chain::key_value_index*, chain::index64_index*, chain::index128_index*,
chain::index256_index*, chain::index_double_index*, chain::index_long_double_index*,
chain::global_property_multi_index*, chain::generated_transaction_multi_index*,
chain::protocol_state_multi_index*, chain::permission_index*, chain::permission_link_index*,
chain::resource_limits::resource_limits_index*, chain::resource_limits::resource_usage_index*,
chain::resource_limits::resource_limits_state_index*,
chain::resource_limits::resource_limits_config_index*>());

fc::raw::pack(ds, fc::unsigned_int(num_tables));

process_table(ds, "account", db.get_index<chain::account_index>(), pack_row);
process_table(ds, "account_metadata", db.get_index<chain::account_metadata_index>(), pack_row);
process_table(ds, "code", db.get_index<chain::code_index>(), pack_row);

process_table(ds, "contract_table", db.get_index<chain::table_id_multi_index>(), pack_row);
process_table(ds, "contract_row", db.get_index<chain::key_value_index>(), pack_contract_row);
process_table(ds, "contract_index64", db.get_index<chain::index64_index>(), pack_contract_row);
process_table(ds, "contract_index128", db.get_index<chain::index128_index>(), pack_contract_row);
process_table(ds, "contract_index256", db.get_index<chain::index256_index>(), pack_contract_row);
process_table(ds, "contract_index_double", db.get_index<chain::index_double_index>(), pack_contract_row);
process_table(ds, "contract_index_long_double", db.get_index<chain::index_long_double_index>(), pack_contract_row);

process_table(ds, "global_property", db.get_index<chain::global_property_multi_index>(), pack_row);
process_table(ds, "generated_transaction", db.get_index<chain::generated_transaction_multi_index>(), pack_row);
process_table(ds, "protocol_state", db.get_index<chain::protocol_state_multi_index>(), pack_row);

process_table(ds, "permission", db.get_index<chain::permission_index>(), pack_row);
process_table(ds, "permission_link", db.get_index<chain::permission_link_index>(), pack_row);

process_table(ds, "resource_limits", db.get_index<chain::resource_limits::resource_limits_index>(), pack_row);
process_table(ds, "resource_usage", db.get_index<chain::resource_limits::resource_usage_index>(), pack_row);
process_table(ds, "resource_limits_state", db.get_index<chain::resource_limits::resource_limits_state_index>(),
pack_row);
process_table("resource_limits_config", db.get_index<chain::resource_limits::resource_limits_config_index>(),
process_table(ds, "resource_limits_config", db.get_index<chain::resource_limits::resource_limits_config_index>(),
pack_row);

return deltas;
obuf.pubsync();

}


} // namespace state_history
} // namespace eosio
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#pragma once

#include <eosio/state_history/types.hpp>
#include <boost/iostreams/filtering_streambuf.hpp>

namespace eosio {
namespace state_history {

std::vector<table_delta> create_deltas(const chainbase::database& db, bool full_snapshot);
void pack_deltas(boost::iostreams::filtering_ostreambuf& ds, const chainbase::database& db, bool full_snapshot);


} // namespace state_history
} // namespace eosio
Loading