Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reduce ship memory usage #577

Closed
wants to merge 14 commits into from
Closed
8 changes: 5 additions & 3 deletions libraries/libfc/include/fc/io/datastream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,12 @@ class datastream<size_t, void> {
};

template <typename Streambuf>
class datastream<Streambuf, typename std::enable_if_t<std::is_base_of_v<std::streambuf, Streambuf>>> {
class datastream<Streambuf, typename std::enable_if_t<std::is_base_of_v<std::streambuf, std::remove_reference_t<Streambuf>>>> {
private:
Streambuf buf;

using reference_type = std::add_lvalue_reference_t<Streambuf>;

public:
template <typename... Args>
datastream(Args&&... args)
Expand All @@ -120,8 +122,8 @@ class datastream<Streambuf, typename std::enable_if_t<std::is_base_of_v<std::str
}
bool remaining() { return buf.in_avail(); }

Streambuf& storage() { return buf; }
const Streambuf& storage() const { return buf; }
reference_type storage() { return buf; }
const reference_type storage() const { return buf; }
};

template <typename Container>
Expand Down
144 changes: 98 additions & 46 deletions libraries/state_history/create_deltas.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include <eosio/state_history/create_deltas.hpp>
#include <eosio/state_history/serialization.hpp>
#include <boost/hana/count_if.hpp>
#include <boost/hana/tuple.hpp>

namespace eosio {
namespace state_history {
Expand Down Expand Up @@ -57,8 +59,13 @@ bool include_delta(const chain::protocol_state_object& old, const chain::protoco
return old.activated_protocol_features != curr.activated_protocol_features;
}

std::vector<table_delta> create_deltas(const chainbase::database& db, bool full_snapshot) {
std::vector<table_delta> deltas;

// std::vector<table_delta> create_deltas(const chainbase::database& db, bool full_snapshot) {
heifner marked this conversation as resolved.
Show resolved Hide resolved
void pack_deltas(boost::iostreams::filtering_ostreambuf& obuf, const chainbase::database& db, bool full_snapshot) {

// std::vector<table_delta> deltas;
heifner marked this conversation as resolved.
Show resolved Hide resolved
fc::datastream<boost::iostreams::filtering_ostreambuf&> ds{obuf};

const auto& table_id_index = db.get_index<chain::table_id_multi_index>();
std::map<uint64_t, const chain::table_id_object*> removed_table_id;
for (auto& rem : table_id_index.last_undo_session().removed_values)
Expand All @@ -73,72 +80,117 @@ std::vector<table_delta> create_deltas(const chainbase::database& db, bool full_
return *it->second;
};

auto pack_row = [&](auto& row) { return fc::raw::pack(make_history_serial_wrapper(db, row)); };
auto pack_contract_row = [&](auto& row) {
return fc::raw::pack(make_history_context_wrapper(db, get_table_id(row.t_id._id), row));
auto pack_row = [&](auto& ds, auto& row) { fc::raw::pack(ds, make_history_serial_wrapper(db, row)); };
auto pack_contract_row = [&](auto& ds, auto& row) {
fc::raw::pack(ds, make_history_context_wrapper(db, get_table_id(row.t_id._id), row));
};

auto process_table = [&](auto* name, auto& index, auto& pack_row) {
auto process_table = [&](auto& ds, auto* name, auto& index, auto& pack_row) {

auto pack_row_v0 = [&](auto& ds, bool present, auto& row) {
fc::raw::pack(ds, present);
fc::datastream<size_t> ps;
pack_row(ps, row);
fc::raw::pack(ds, fc::unsigned_int(ps.tellp()));
pack_row(ds, row);
};

if (full_snapshot) {
if (index.indices().empty())
return;
deltas.push_back({});
auto& delta = deltas.back();
delta.name = name;
for (auto& row : index.indices())
delta.rows.obj.emplace_back(true, pack_row(row));

fc::raw::pack(ds, fc::unsigned_int(0));
heifner marked this conversation as resolved.
Show resolved Hide resolved
fc::raw::pack(ds, name);
fc::raw::pack(ds, fc::unsigned_int(index.indices().size()));
for (auto& row : index.indices()) {
pack_row_v0(ds, true, row);
}
} else {
auto undo = index.last_undo_session();
if (undo.old_values.empty() && undo.new_values.empty() && undo.removed_values.empty())
return;
deltas.push_back({});
auto& delta = deltas.back();
delta.name = name;
for (auto& old : undo.old_values) {
auto& row = index.get(old.id);
if (include_delta(old, row))
delta.rows.obj.emplace_back(true, pack_row(row));
}
for (auto& old : undo.removed_values)
delta.rows.obj.emplace_back(false, pack_row(old));
for (auto& row : undo.new_values) {
delta.rows.obj.emplace_back(true, pack_row(row));
}

if(delta.rows.obj.empty()) {
deltas.pop_back();
size_t num_entries =
std::count_if(undo.old_values.begin(), undo.old_values.end(),
[&index](const auto& old) { return include_delta(old, index.get(old.id)); }) +
std::distance(undo.removed_values.begin(), undo.removed_values.end()) +
std::distance(undo.new_values.begin(), undo.new_values.end());

if (num_entries) {
fc::raw::pack(ds, fc::unsigned_int(0));
heifner marked this conversation as resolved.
Show resolved Hide resolved
fc::raw::pack(ds, name);
fc::raw::pack(ds, fc::unsigned_int((uint32_t)num_entries));

for (auto& old : undo.old_values) {
auto& row = index.get(old.id);
if (include_delta(old, row)) {
pack_row_v0(ds, true, row);
}
}

for (auto& old : undo.removed_values) {
pack_row_v0(ds, false, old);
}

for (auto& row : undo.new_values) {
pack_row_v0(ds, true, row);
}
}
}
};

process_table("account", db.get_index<chain::account_index>(), pack_row);
process_table("account_metadata", db.get_index<chain::account_metadata_index>(), pack_row);
process_table("code", db.get_index<chain::code_index>(), pack_row);
constexpr auto types = boost::hana::tuple_t<chain::account_index, chain::account_metadata_index, chain::code_index, chain::table_id_multi_index,
chain::key_value_index, chain::index64_index, chain::index128_index, chain::index256_index,
chain::index_double_index, chain::index_long_double_index, chain::global_property_multi_index,
chain::generated_transaction_multi_index, chain::protocol_state_multi_index, chain::permission_index,
chain::permission_link_index, chain::resource_limits::resource_limits_index,
chain::resource_limits::resource_usage_index, chain::resource_limits::resource_limits_state_index,
chain::resource_limits::resource_limits_config_index>;

process_table("contract_table", db.get_index<chain::table_id_multi_index>(), pack_row);
process_table("contract_row", db.get_index<chain::key_value_index>(), pack_contract_row);
process_table("contract_index64", db.get_index<chain::index64_index>(), pack_contract_row);
process_table("contract_index128", db.get_index<chain::index128_index>(), pack_contract_row);
process_table("contract_index256", db.get_index<chain::index256_index>(), pack_contract_row);
process_table("contract_index_double", db.get_index<chain::index_double_index>(), pack_contract_row);
process_table("contract_index_long_double", db.get_index<chain::index_long_double_index>(), pack_contract_row);
auto num_tables = boost::hana::count_if(types, [full_snapshot, &db](auto x) {
auto& index = db.get_index<typename decltype(x)::type>();
if (full_snapshot) {
return !index.indices().empty();
} else {
auto undo = index.last_undo_session();
return std::find_if(undo.old_values.begin(), undo.old_values.end(),
[&index](const auto& old) { return include_delta(old, index.get(old.id)); }) != undo.old_values.end() ||
!undo.removed_values.empty() || !undo.new_values.empty();
}
});
heifner marked this conversation as resolved.
Show resolved Hide resolved

fc::raw::pack(ds, fc::unsigned_int(num_tables));

process_table("global_property", db.get_index<chain::global_property_multi_index>(), pack_row);
process_table("generated_transaction", db.get_index<chain::generated_transaction_multi_index>(), pack_row);
process_table("protocol_state", db.get_index<chain::protocol_state_multi_index>(), pack_row);
process_table(ds, "account", db.get_index<chain::account_index>(), pack_row);
process_table(ds, "account_metadata", db.get_index<chain::account_metadata_index>(), pack_row);
process_table(ds, "code", db.get_index<chain::code_index>(), pack_row);

process_table("permission", db.get_index<chain::permission_index>(), pack_row);
process_table("permission_link", db.get_index<chain::permission_link_index>(), pack_row);
process_table(ds, "contract_table", db.get_index<chain::table_id_multi_index>(), pack_row);
process_table(ds, "contract_row", db.get_index<chain::key_value_index>(), pack_contract_row);
process_table(ds, "contract_index64", db.get_index<chain::index64_index>(), pack_contract_row);
process_table(ds, "contract_index128", db.get_index<chain::index128_index>(), pack_contract_row);
process_table(ds, "contract_index256", db.get_index<chain::index256_index>(), pack_contract_row);
process_table(ds, "contract_index_double", db.get_index<chain::index_double_index>(), pack_contract_row);
process_table(ds, "contract_index_long_double", db.get_index<chain::index_long_double_index>(), pack_contract_row);

process_table("resource_limits", db.get_index<chain::resource_limits::resource_limits_index>(), pack_row);
process_table("resource_usage", db.get_index<chain::resource_limits::resource_usage_index>(), pack_row);
process_table("resource_limits_state", db.get_index<chain::resource_limits::resource_limits_state_index>(),
process_table(ds, "global_property", db.get_index<chain::global_property_multi_index>(), pack_row);
process_table(ds, "generated_transaction", db.get_index<chain::generated_transaction_multi_index>(), pack_row);
process_table(ds, "protocol_state", db.get_index<chain::protocol_state_multi_index>(), pack_row);

process_table(ds, "permission", db.get_index<chain::permission_index>(), pack_row);
process_table(ds, "permission_link", db.get_index<chain::permission_link_index>(), pack_row);

process_table(ds, "resource_limits", db.get_index<chain::resource_limits::resource_limits_index>(), pack_row);
process_table(ds, "resource_usage", db.get_index<chain::resource_limits::resource_usage_index>(), pack_row);
process_table(ds, "resource_limits_state", db.get_index<chain::resource_limits::resource_limits_state_index>(),
pack_row);
process_table("resource_limits_config", db.get_index<chain::resource_limits::resource_limits_config_index>(),
process_table(ds, "resource_limits_config", db.get_index<chain::resource_limits::resource_limits_config_index>(),
pack_row);

return deltas;
obuf.pubsync();

}


} // namespace state_history
} // namespace eosio
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#pragma once

#include <eosio/state_history/types.hpp>
#include <boost/iostreams/filtering_streambuf.hpp>

namespace eosio {
namespace state_history {

std::vector<table_delta> create_deltas(const chainbase::database& db, bool full_snapshot);
void pack_deltas(boost::iostreams::filtering_ostreambuf& ds, const chainbase::database& db, bool full_snapshot);


} // namespace state_history
} // namespace eosio
47 changes: 35 additions & 12 deletions libraries/state_history/include/eosio/state_history/log.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class state_history_log {
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard =
boost::asio::make_work_guard(ctx);
std::recursive_mutex mx;
std::optional<std::mutex> prune_mx;

public:
state_history_log(const char* const name, std::string log_filename, std::string index_filename,
Expand All @@ -99,12 +100,13 @@ class state_history_log {
, prune_config(prune_conf) {
open_log();
open_index();

if(prune_config) {
EOS_ASSERT(prune_config->prune_blocks, chain::plugin_exception, "state history log prune configuration requires at least one block");
EOS_ASSERT(__builtin_popcount(prune_config->prune_threshold) == 1, chain::plugin_exception, "state history prune threshold must be power of 2");
//switch this over to the mask that will be used
prune_config->prune_threshold = ~(prune_config->prune_threshold-1);
prune_mx.emplace();
}

//check for conversions to/from pruned log, as long as log contains something
Expand Down Expand Up @@ -144,6 +146,17 @@ class state_history_log {
});
}

bool is_pruned() const {
return prune_config.has_value();
}

void acquire_prune_lock(std::unique_lock<std::mutex>& lock) {
if (prune_mx) {
std::unique_lock<std::mutex> prune_lock(*prune_mx);
lock.swap(prune_lock);
heifner marked this conversation as resolved.
Show resolved Hide resolved
}
}

void stop() {
if (thr.joinable()) {
work_guard.reset();
Expand All @@ -156,7 +169,7 @@ class state_history_log {
if (thr.joinable()) {
work_guard.reset();
thr.join();
}
}

//nothing to do if log is empty or we aren't pruning
if(_begin_block == _end_block)
Expand Down Expand Up @@ -199,7 +212,7 @@ class state_history_log {
}

std::unique_lock<std::recursive_mutex> lock(mx);

auto block_num = chain::block_header::num_from_id(header.block_id);
EOS_ASSERT(_begin_block == _end_block || block_num <= _end_block, chain::plugin_exception,
"missed a block in ${name}.log", ("name", name));
Expand Down Expand Up @@ -228,12 +241,13 @@ class state_history_log {
header.magic = ship_magic(get_ship_version(header.magic), ship_feature_pruned_log);

uint64_t pos = log.tellp();

write_header(header);
write_payload(log);

EOS_ASSERT(log.tellp() == pos + state_history_log_header_serial_size + header.payload_size, chain::plugin_exception,
"wrote payload with incorrect size to ${name}.log", ("name", name));
if (header.payload_size != 0)
EOS_ASSERT(log.tellp() == pos + state_history_log_header_serial_size + header.payload_size, chain::plugin_exception,
"wrote payload with incorrect size to ${name}.log", ("name", name));
fc::raw::pack(log, pos);

fc::raw::pack(index, pos);
Expand Down Expand Up @@ -270,7 +284,7 @@ class state_history_log {

private:
//file position must be at start of last block's suffix (back pointer)
//called from open_log / ctor
//called from open_log / ctor
bool get_last_block() {
state_history_log_header header;
uint64_t suffix;
Expand Down Expand Up @@ -303,13 +317,22 @@ class state_history_log {
if(_end_block - _begin_block <= prune_config->prune_blocks)
return;

const uint32_t prune_to_num = _end_block - prune_config->prune_blocks;
uint64_t prune_to_pos = get_pos(prune_to_num);
auto do_prune = [&]() {
const uint32_t prune_to_num = _end_block - prune_config->prune_blocks;
uint64_t prune_to_pos = get_pos(prune_to_num);

log.punch_hole(state_history_log_header_serial_size, prune_to_pos);
log.punch_hole(state_history_log_header_serial_size, prune_to_pos);

_begin_block = prune_to_num;
log.flush();
_begin_block = prune_to_num;
log.flush();
};

if (prune_mx) {
std::unique_lock<std::mutex> lock(*prune_mx);
do_prune();
} else {
do_prune();
}

if(auto l = fc::logger::get(); l.is_enabled(loglevel))
l.log(fc::log_message(fc::log_context(loglevel, __FILE__, __LINE__, __func__),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ datastream<ST>& operator<<(datastream<ST>& ds, const eosio::state_history::big_v
return ds;
}

template <typename ST, typename T>
datastream<ST>& operator>>(datastream<ST>& ds, eosio::state_history::big_vector_wrapper<T>& obj) {
fc::unsigned_int sz;
fc::raw::unpack(ds, sz);
obj.obj.resize(sz);
for (auto& x : obj.obj)
fc::raw::unpack(ds, x);
return ds;
}
heifner marked this conversation as resolved.
Show resolved Hide resolved

template <typename ST>
inline void history_pack_varuint64(datastream<ST>& ds, uint64_t val) {
do {
Expand Down Expand Up @@ -715,4 +725,15 @@ datastream<ST>& operator<<(datastream<ST>& ds, const eosio::state_history::get_b
return ds;
}

template <typename ST>
datastream<ST>& operator<<(datastream<ST>& ds, const eosio::state_history::get_blocks_result_base& obj) {
fc::raw::pack(ds, obj.head);
fc::raw::pack(ds, obj.last_irreversible);
fc::raw::pack(ds, obj.this_block);
fc::raw::pack(ds, obj.prev_block);
history_pack_big_bytes(ds, obj.block);
return ds;
}


} // namespace fc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <eosio/chain/block_state.hpp>
#include <eosio/state_history/types.hpp>
#include <boost/iostreams/filtering_streambuf.hpp>

namespace eosio {
namespace state_history {
Expand All @@ -13,8 +14,8 @@ struct trace_converter {
std::map<transaction_id_type, augmented_transaction_trace> cached_traces;
std::optional<augmented_transaction_trace> onblock_trace;

void add_transaction(const transaction_trace_ptr& trace, const chain::packed_transaction_ptr& transaction);
bytes pack(const chainbase::database& db, bool trace_debug_mode, const block_state_ptr& block_state);
void add_transaction(const transaction_trace_ptr& trace, const chain::packed_transaction_ptr& transaction);
void pack(boost::iostreams::filtering_ostreambuf& ds, const chainbase::database& db, bool trace_debug_mode, const block_state_ptr& block_state);
};

} // namespace state_history
Expand Down
Loading