Skip to content

Commit

Permalink
revert commit for threaded write
Browse files Browse the repository at this point in the history
  • Loading branch information
huangminghuang committed Dec 29, 2022
1 parent 7b2efb8 commit a9cea07
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 296 deletions.
6 changes: 3 additions & 3 deletions libraries/chain/include/eosio/chain/log_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,11 @@ struct log_catalog {
return {fc::datastream<const char*>(nullptr, 0), static_cast<uint32_t>(0)};
}

template <typename Result>
auto ro_stream_for_block(uint32_t block_num, Result& result) -> std::optional<decltype( std::declval<LogData>().ro_stream_at(0, result))> {
template <typename ...Rest>
auto ro_stream_for_block(uint32_t block_num, Rest&& ...rest) -> std::optional<decltype( std::declval<LogData>().ro_stream_at(0, std::forward<Rest&&>(rest)...))> {
auto pos = get_block_position(block_num, mapmode::readonly);
if (pos) {
return log_data.ro_stream_at(*pos, result);
return log_data.ro_stream_at(*pos, std::forward<Rest&&>(rest)...);
}
return {};
}
Expand Down
335 changes: 122 additions & 213 deletions libraries/state_history/include/eosio/state_history/log.hpp

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ template <typename Session>
class blocks_result_send_queue_entry : public send_queue_entry_base<Session> {
eosio::state_history::get_blocks_result_v0 r;
std::vector<char> data;
eosio::log_entry_type buf;
std::variant<std::vector<char>, locked_decompress_stream> buf;

template <typename Next>
void async_send(Session* s, bool fin, const std::vector<char>& d, Next&& next) {
Expand All @@ -55,7 +55,7 @@ class blocks_result_send_queue_entry : public send_queue_entry_base<Session> {
}

template <typename Next>
void async_send(Session* s, bool fin, maybe_locked_decompress_stream& locked_strm, Next&& next) {
void async_send(Session* s, bool fin, locked_decompress_stream& locked_strm, Next&& next) {
data.resize(s->default_frame_size);
auto size = bio::read(locked_strm.buf, data.data(), s->default_frame_size);
data.resize(size);
Expand All @@ -77,15 +77,6 @@ class blocks_result_send_queue_entry : public send_queue_entry_base<Session> {
});
}

template <typename Next>
void async_send(Session* s, bool fin, const std::shared_ptr<std::vector<char>>& d, Next&& next) {
s->socket_stream.async_write_some(
fin, boost::asio::buffer(*d),
[s = s->shared_from_this(), next = std::forward<Next>(next)](boost::system::error_code ec, size_t) mutable {
s->callback(ec, "async_write", [s, next = std::move(next)]() mutable { next(s.get()); });
});
}

template <typename Next>
void async_send_buf(Session* s, bool fin, Next&& next) {
std::visit([this, s, fin,
Expand Down Expand Up @@ -244,13 +235,15 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
send();
}

uint64_t get_trace_log_entry(const eosio::state_history::get_blocks_result_v0& result, eosio::log_entry_type& buf) {
uint64_t get_trace_log_entry(const eosio::state_history::get_blocks_result_v0& result,
std::variant<std::vector<char>, locked_decompress_stream>& buf) {
if (result.traces.has_value())
return plugin->get_trace_log()->get_unpacked_entry(result.this_block->block_num, buf);
return 0;
}

uint64_t get_delta_log_entry(const eosio::state_history::get_blocks_result_v0& result, eosio::log_entry_type& buf) {
uint64_t get_delta_log_entry(const eosio::state_history::get_blocks_result_v0& result,
std::variant<std::vector<char>, locked_decompress_stream>& buf) {
if (result.deltas.has_value())
return plugin->get_chain_state_log()->get_unpacked_entry(result.this_block->block_num, buf);
return 0;
Expand Down
14 changes: 4 additions & 10 deletions plugins/state_history_plugin/state_history_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
state_history_log_header header{.magic = ship_magic(ship_current_version, 0),
.block_id = block_state->id,
.payload_size = 0};
trace_log->pack_and_async_write_entry(header, block_state->block->previous, [this, &block_state](auto&& buf) {
trace_log->pack_and_write_entry(header, block_state->block->previous, [this, &block_state](auto&& buf) {
trace_converter.pack(buf, chain_plug->chain().db(), trace_debug_mode, block_state);
});
}
Expand All @@ -315,15 +315,9 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl

state_history_log_header header{
.magic = ship_magic(ship_current_version, 0), .block_id = block_state->id, .payload_size = 0};
if (fresh) {
chain_state_log->pack_and_write_entry(header, block_state->header.previous, [this](auto&& buf) {
pack_deltas(buf, chain_plug->chain().db(), true);
});
} else {
chain_state_log->pack_and_async_write_entry(header, block_state->header.previous, [this](auto&& buf) {
pack_deltas(buf, chain_plug->chain().db(), false);
});
}
chain_state_log->pack_and_write_entry(header, block_state->header.previous, [this, fresh](auto&& buf) {
pack_deltas(buf, chain_plug->chain().db(), fresh);
});
} // store_chain_state
}; // state_history_plugin_impl

Expand Down
51 changes: 7 additions & 44 deletions plugins/state_history_plugin/tests/session_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,53 +364,16 @@ void store_read_test_case(uint64_t data_size, eosio::state_history_log_config co
BOOST_REQUIRE_EQUAL(log.get_log_file().tellp(), pos);


auto same_data = [](const std::vector<int32_t>& lhs, std::vector<char>& rhs) -> bool {
return (lhs.size() * sizeof(int32_t) == rhs.size()) && std::equal(rhs.begin(), rhs.end(), (const char*)lhs.data());
};


eosio::log_entry_type buf;
std::variant<std::vector<char>, eosio::locked_decompress_stream> buf;
log.get_unpacked_entry(1, buf);

{
std::vector<char> decompressed;
auto locked_strm_ptr = std::get_if<eosio::maybe_locked_decompress_stream>(&buf);
BOOST_REQUIRE(locked_strm_ptr);
BOOST_CHECK(locked_strm_ptr->lock.has_value() == std::holds_alternative<eosio::state_history::prune_config>(config));
bio::copy(locked_strm_ptr->buf, bio::back_inserter(decompressed));
BOOST_CHECK(same_data(data, decompressed));
}

data = generate_data(data_size);

header.block_id = block_id_for(2);
log.pack_and_async_write_entry(header, block_id_for(1),
[&](auto&& buf) { bio::write(buf, (const char*)data.data(), data.size() * sizeof(data[0])); });

// in this case, we should get the cached value
{
log.get_unpacked_entry(2, buf);
auto decompressed = std::get_if<std::shared_ptr<eosio::chain::bytes>>(&buf);
BOOST_REQUIRE(decompressed);
BOOST_CHECK( same_data(data, *decompressed->get()));
}

{
// let's wait until the index has been writeen
while (boost::filesystem::file_size( log_dir.path()/"ship.index" ) < 2*sizeof(uint64_t)) {
using namespace std::chrono_literals;
std::this_thread::sleep_for(200ms);
};

log.clear_cache(); // clear the cache so we can read the entry directly from disk
log.get_unpacked_entry(2, buf);
auto locked_strm_ptr = std::get_if<eosio::maybe_locked_decompress_stream>(&buf);
BOOST_REQUIRE(locked_strm_ptr);
std::vector<char> decompressed;
bio::copy(locked_strm_ptr->buf, bio::back_inserter(decompressed));
BOOST_CHECK(same_data(data, decompressed));
}
std::vector<char> decompressed;
auto& locked_strm = std::get<eosio::locked_decompress_stream>(buf);
BOOST_CHECK(locked_strm.lock.has_value());
bio::copy(locked_strm.buf, bio::back_inserter(decompressed));

BOOST_CHECK_EQUAL(data.size() * sizeof(data[0]), decompressed.size());
BOOST_CHECK(std::equal(decompressed.begin(), decompressed.end(), (const char*)data.data()));
}

BOOST_AUTO_TEST_CASE(store_read_entry_no_prune) {
Expand Down
7 changes: 3 additions & 4 deletions tests/ship_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,11 @@ struct ship_log_fixture {
BOOST_REQUIRE_EQUAL(log->end_block()-1, last);
if(enable_read) {
for(auto i = first; i <= last; i++) {
eosio::log_entry_type result;
std::variant<std::vector<char>, eosio::locked_decompress_stream> result;
log->get_unpacked_entry(i, result);
std::visit(eosio::chain::overloaded{
[&](std::vector<char>& buff) { BOOST_REQUIRE(buff == written_data.at(i)); },
[&](std::shared_ptr<std::vector<char>>& buff) { BOOST_REQUIRE(*buff == written_data.at(i)); },
[&](eosio::maybe_locked_decompress_stream& strm) {
[&](eosio::locked_decompress_stream& strm) {
std::vector<char> buff;
boost::iostreams::copy(strm.buf, boost::iostreams::back_inserter(buff));
BOOST_REQUIRE(buff == written_data.at(i));
Expand All @@ -65,7 +64,7 @@ struct ship_log_fixture {
}

void check_not_present(uint32_t index) {
eosio::log_entry_type result;
std::variant<std::vector<char>, eosio::locked_decompress_stream> result;
BOOST_REQUIRE_EQUAL(log->get_unpacked_entry(index, result), 0);
}

Expand Down
4 changes: 0 additions & 4 deletions tests/ship_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,6 @@ int main(int argc, char* argv[]) {
boost::beast::flat_buffer buffer;
stream.read(buffer);

FILE* f = fopen("last_result.bin", "wb");
fwrite((const char*)buffer.data().data(), buffer.data().size(), 1, f);
fclose(f);

eosio::input_stream is((const char*)buffer.data().data(), buffer.data().size());
rapidjson::Document result_doucment;
result_doucment.Parse(result_type.bin_to_json(is).c_str());
Expand Down
7 changes: 2 additions & 5 deletions unittests/state_history_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -641,16 +641,13 @@ struct state_history_tester : state_history_tester_logs, tester {
};

static std::vector<char> get_decompressed_entry(eosio::state_history_log& log, block_num_type block_num) {
eosio::log_entry_type buf;
std::variant<std::vector<char>, eosio::locked_decompress_stream> buf;
log.get_unpacked_entry(block_num, buf);
namespace bio = boost::iostreams;
return std::visit(eosio::chain::overloaded{ [](std::vector<char>& bytes) {
return bytes;
},
[](std::shared_ptr<std::vector<char>>& bytes) {
return *bytes;
},
[](eosio::maybe_locked_decompress_stream& strm) {
[](eosio::locked_decompress_stream& strm) {
std::vector<char> bytes;
bio::copy(strm.buf, bio::back_inserter(bytes));
return bytes;
Expand Down

0 comments on commit a9cea07

Please sign in to comment.