Commit 414349a
Merge branch 'GH-986-retry-with-oc' into GH-1039-interrupt-block-validation
heifner committed Nov 22, 2024
2 parents f3665ba + 84f9886 commit 414349a
Showing 18 changed files with 148 additions and 154 deletions.
2 changes: 0 additions & 2 deletions libraries/chain/controller.cpp
@@ -5325,7 +5325,6 @@ transaction_trace_ptr controller::push_transaction( const transaction_metadata_p
uint32_t billed_cpu_time_us, bool explicit_billed_cpu_time,
int64_t subjective_cpu_bill_us ) {
validate_db_available_size();
EOS_ASSERT( get_read_mode() != db_read_mode::IRREVERSIBLE, transaction_type_exception, "push transaction not allowed in irreversible mode" );
EOS_ASSERT( trx && !trx->implicit() && !trx->scheduled(), transaction_type_exception, "Implicit/Scheduled transaction not allowed" );
return my->push_transaction(trx, block_deadline, max_transaction_time, billed_cpu_time_us, explicit_billed_cpu_time, subjective_cpu_bill_us );
}
@@ -5334,7 +5333,6 @@ transaction_trace_ptr controller::push_scheduled_transaction( const transaction_
fc::time_point block_deadline, fc::microseconds max_transaction_time,
uint32_t billed_cpu_time_us, bool explicit_billed_cpu_time )
{
EOS_ASSERT( get_read_mode() != db_read_mode::IRREVERSIBLE, transaction_type_exception, "push scheduled transaction not allowed in irreversible mode" );
validate_db_available_size();
return my->push_scheduled_transaction( trxid, block_deadline, max_transaction_time, billed_cpu_time_us, explicit_billed_cpu_time );
}
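With these asserts gone, controller::push_transaction and controller::push_scheduled_transaction no longer reject calls while the node is in irreversible read mode. A hypothetical caller-side guard is sketched below; get_read_mode() and db_read_mode are existing controller API taken from the removed lines, but whether or where such a check is re-added is an assumption, not something shown in this commit:

    // Hypothetical caller-side check (not part of this diff).
    if (chain.get_read_mode() == eosio::chain::db_read_mode::IRREVERSIBLE) {
       elog("skipping push_transaction: node is running in irreversible read mode");
    } else {
       auto trace = chain.push_transaction(trx, block_deadline, max_transaction_time,
                                           billed_cpu_time_us, explicit_billed_cpu_time,
                                           subjective_cpu_bill_us);
    }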
3 changes: 2 additions & 1 deletion libraries/chain/fork_database.cpp
@@ -192,10 +192,11 @@ namespace eosio::chain {

template<class BSP>
void fork_database_impl<BSP>::reset_root_impl( const bsp_t& root_bsp ) {
index.clear();
assert(root_bsp);
root = root_bsp;
root->set_valid(true);
pending_savanna_lib_id = block_id_type{};
index.clear();
}

template<class BSP>
4 changes: 2 additions & 2 deletions libraries/chain/include/eosio/chain/wasm_interface.hpp
@@ -57,8 +57,8 @@ namespace eosio { namespace chain {
// returns true if EOS VM OC is enabled
bool is_eos_vm_oc_enabled() const;

// return internal executing action id, used for testing
uint64_t get_executing_action_id() const;
// return number of wasm execution interrupted by eos vm oc compile completing, used for testing
uint64_t get_eos_vm_oc_compile_interrupt_count() const;
#endif

//call before dtor to skip what can be minutes of dtor overhead with some runtimes; can cause leaks
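The per-action id accessor is replaced by a counter of wasm executions interrupted by a completing EOS VM OC compile. A hypothetical test-side usage follows; the accessor comes from this diff, while the fixture wiring and access to the controller's wasm_interface are assumptions:

    // Hypothetical test snippet, assuming a tester fixture exposing the controller.
    const auto&    wasmif = chain.control->get_wasm_interface();
    const uint64_t before = wasmif.get_eos_vm_oc_compile_interrupt_count();
    // ... run an action long enough that the OC compile finishes while it is still executing ...
    BOOST_TEST(wasmif.get_eos_vm_oc_compile_interrupt_count() == before + 1);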
52 changes: 27 additions & 25 deletions libraries/chain/include/eosio/chain/wasm_interface_private.hpp
@@ -12,6 +12,7 @@
#include <eosio/chain/code_object.hpp>
#include <eosio/chain/global_property_object.hpp>
#include <eosio/chain/exceptions.hpp>
#include <eosio/chain/thread_utils.hpp>
#include <fc/scoped_exit.hpp>

#include "IR/Module.h"
@@ -100,8 +101,8 @@ struct eosvmoc_tier {
#ifdef EOSIO_EOS_VM_OC_RUNTIME_ENABLED
if(eosvmoc_tierup != wasm_interface::vm_oc_enable::oc_none) {
EOS_ASSERT(vm != wasm_interface::vm_type::eos_vm_oc, wasm_exception, "You can't use EOS VM OC as the base runtime when tier up is activated");
eosvmoc = std::make_unique<eosvmoc_tier>(data_dir, eosvmoc_config, d, [this](boost::asio::io_context& ctx, uint64_t executing_action_id, fc::time_point queued_time) {
async_compile_complete(ctx, executing_action_id, queued_time);
eosvmoc = std::make_unique<eosvmoc_tier>(data_dir, eosvmoc_config, d, [this](boost::asio::io_context& ctx, const digest_type& code_id, fc::time_point queued_time) {
async_compile_complete(ctx, code_id, queued_time);
});
}
#endif
@@ -111,19 +112,18 @@ struct eosvmoc_tier {

#ifdef EOSIO_EOS_VM_OC_RUNTIME_ENABLED
// called from async thread
void async_compile_complete(boost::asio::io_context& ctx, uint64_t exec_action_id, fc::time_point queued_time) {
if (exec_action_id == executing_action_id) { // is action still executing?
void async_compile_complete(boost::asio::io_context& ctx, const digest_type& code_id, fc::time_point queued_time) {
if (executing_code_hash.load() == code_id) { // is action still executing?
auto elapsed = fc::time_point::now() - queued_time;
ilog("EOS VM OC tier up for ${id} compile complete ${t}ms",
("id", exec_action_id)("t", elapsed.count()/1000));
ilog("EOS VM OC tier up for ${id} compile complete ${t}ms", ("id", code_id)("t", elapsed.count()/1000));
auto expire_in = std::max(fc::microseconds(0), fc::milliseconds(500) - elapsed);
std::shared_ptr<boost::asio::steady_timer> timer = std::make_shared<boost::asio::steady_timer>(ctx);
timer->expires_from_now(std::chrono::microseconds(expire_in.count()));
timer->async_wait([timer, this, exec_action_id](const boost::system::error_code& ec) {
timer->async_wait([timer, this, code_id](const boost::system::error_code& ec) {
if (ec)
return;
if (exec_action_id == executing_action_id) {
ilog("EOS VM OC tier up interrupting ${id}", ("id", exec_action_id));
if (executing_code_hash.load() == code_id) {
ilog("EOS VM OC tier up interrupting ${id}", ("id", code_id));
eos_vm_oc_compile_interrupt = true;
main_thread_timer.expire_now();
}
@@ -135,14 +135,9 @@
void apply( const digest_type& code_hash, const uint8_t& vm_type, const uint8_t& vm_version, apply_context& context ) {
bool attempt_tierup = false;
#ifdef EOSIO_EOS_VM_OC_RUNTIME_ENABLED
auto ex = fc::make_scoped_exit([&]() {
eos_vm_oc_compile_interrupt = false;
++executing_action_id; // indicate no longer executing
});
attempt_tierup = eosvmoc && (eosvmoc_tierup == wasm_interface::vm_oc_enable::oc_all || context.should_use_eos_vm_oc());
const bool allow_oc_interrupt = attempt_tierup && context.is_applying_block() && context.trx_context.has_undo();
if (attempt_tierup) {
const bool allow_oc_interrupt = context.is_applying_block() && context.trx_context.has_undo();
const uint32_t exec_action_id = allow_oc_interrupt ? executing_action_id.load() : 0;
const chain::eosvmoc::code_descriptor* cd = nullptr;
chain::eosvmoc::code_cache_base::get_cd_failure failure = chain::eosvmoc::code_cache_base::get_cd_failure::temporary;
try {
@@ -158,7 +153,7 @@ struct eosvmoc_tier {
context.trx_context.resume_billing_timer();
});
context.trx_context.pause_billing_timer();
cd = eosvmoc->cc.get_descriptor_for_code(m, exec_action_id, code_hash, vm_version, failure);
cd = eosvmoc->cc.get_descriptor_for_code(m, code_hash, vm_version, failure);
} catch (...) {
// swallow errors here, if EOS VM OC has gone in to the weeds we shouldn't bail: continue to try and run baseline
// In the future, consider moving bits of EOS VM that can fire exceptions and such out of this call path
@@ -175,23 +170,29 @@
}
}
#endif
auto ex = fc::make_scoped_exit([&]() {
eos_vm_oc_compile_interrupt = false;
executing_code_hash.store({}); // indicate no longer executing
});
executing_code_hash.store(code_hash);
try {
get_instantiated_module(code_hash, vm_type, vm_version, context.trx_context)->apply(context);
} catch (const interrupt_exception& e) {
if (eos_vm_oc_compile_interrupt) {
wlog("EOS VM OC compile complete id: ${id}, interrupt of ${r} <= ${a}::${act} code ${h}",
("id", executing_action_id.load())("r", context.get_receiver())("a", context.get_action().account)
("act", context.get_action().name)("h", code_hash));
EOS_THROW(interrupt_oc_exception, "EOS VM OC compile complete id: ${id}, interrupt of ${r} <= ${a}::${act} code ${h}",
("id", executing_action_id.load())("r", context.get_receiver())("a", context.get_action().account)
("act", context.get_action().name)("h", code_hash));
if (allow_oc_interrupt && eos_vm_oc_compile_interrupt) {
++eos_vm_oc_compile_interrupt_count;
wlog("EOS VM OC compile complete interrupt of ${r} <= ${a}::${act} code ${h}, interrupt #${c}",
("r", context.get_receiver())("a", context.get_action().account)
("act", context.get_action().name)("h", code_hash)("c", eos_vm_oc_compile_interrupt_count));
EOS_THROW(interrupt_oc_exception, "EOS VM OC compile complete interrupt of ${r} <= ${a}::${act} code ${h}, interrupt #${c}",
("r", context.get_receiver())("a", context.get_action().account)
("act", context.get_action().name)("h", code_hash)("c", eos_vm_oc_compile_interrupt_count));
}
throw;
}
}

// used for testing
uint64_t get_executing_action_id() const { return executing_action_id; }
uint64_t get_eos_vm_oc_compile_interrupt_count() const { return eos_vm_oc_compile_interrupt_count; }

bool is_code_cached(const digest_type& code_hash, const uint8_t& vm_type, const uint8_t& vm_version) const {
// This method is only called from tests; performance is not critical.
@@ -305,8 +306,9 @@ struct eosvmoc_tier {
platform_timer& main_thread_timer;
const wasm_interface::vm_type wasm_runtime_time;
const wasm_interface::vm_oc_enable eosvmoc_tierup;
std::atomic<uint64_t> executing_action_id{1}; // monotonic increasing for each action apply
large_atomic<digest_type> executing_code_hash{};
std::atomic<bool> eos_vm_oc_compile_interrupt{false};
uint32_t eos_vm_oc_compile_interrupt_count{0}; // for testing

#ifdef EOSIO_EOS_VM_OC_RUNTIME_ENABLED
std::unique_ptr<struct eosvmoc_tier> eosvmoc{nullptr}; // used by all threads
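Condensed, the new interrupt path in the hunks above works like this (a restatement of the diff with the same names, not additional code in the commit): the async compile-complete callback compares the finished code_id against the atomically tracked executing_code_hash, and if that code is still running it arms a timer so that at least ~500ms pass from the time the compile was queued before interrupting:

    // Restatement of the scheduling logic above (simplified; see the diff for full context).
    fc::microseconds elapsed   = fc::time_point::now() - queued_time;
    fc::microseconds expire_in = std::max(fc::microseconds(0), fc::milliseconds(500) - elapsed);
    auto timer = std::make_shared<boost::asio::steady_timer>(ctx);
    timer->expires_from_now(std::chrono::microseconds(expire_in.count()));
    timer->async_wait([timer, this, code_id](const boost::system::error_code& ec) {
       if (!ec && executing_code_hash.load() == code_id) {
          eos_vm_oc_compile_interrupt = true; // observed in apply() once the interrupt fires
          main_thread_timer.expire_now();     // interrupts the wasm currently executing
       }
    });

apply() then catches the resulting interrupt_exception; only when allow_oc_interrupt and eos_vm_oc_compile_interrupt are both set does it bump eos_vm_oc_compile_interrupt_count and rethrow as interrupt_oc_exception, otherwise the original interrupt propagates unchanged.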
libraries/chain/include/eosio/chain/webassembly/eos-vm-oc/code_cache.hpp
@@ -1,7 +1,5 @@
#pragma once

#include <boost/lockfree/spsc_queue.hpp>

#include <eosio/chain/webassembly/eos-vm-oc/eos-vm-oc.hpp>
#include <eosio/chain/webassembly/eos-vm-oc/ipc_helpers.hpp>
#include <boost/multi_index_container.hpp>
@@ -10,29 +8,19 @@
#include <boost/multi_index/composite_key.hpp>
#include <boost/multi_index/key_extractors.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/lockfree/spsc_queue.hpp>

#include <boost/interprocess/mem_algo/rbtree_best_fit.hpp>
#include <boost/asio/local/datagram_protocol.hpp>

#include <thread>
#include <fc/crypto/sha256.hpp>

namespace std {
template<> struct hash<eosio::chain::eosvmoc::code_tuple> {
size_t operator()(const eosio::chain::eosvmoc::code_tuple& ct) const noexcept {
return ct.code_id._hash[0];
}
};
}
#include <thread>

namespace boost {
template<> struct hash<eosio::chain::eosvmoc::code_tuple> {
size_t operator()(const eosio::chain::eosvmoc::code_tuple& ct) const {
std::size_t seed = 0;
boost::hash_combine(seed, ct.code_id._hash[0]);
boost::hash_combine(seed, ct.vm_version);
return seed;
}
};
namespace fc {
inline size_t hash_value(const fc::sha256& code_id) {
return boost::hash<fc::sha256>()(code_id);
}
}

namespace eosio { namespace chain { namespace eosvmoc {
Expand All @@ -46,10 +34,6 @@ using allocator_t = bip::rbtree_best_fit<bip::null_mutex_family, bip::offset_ptr

struct config;

inline size_t hash_value(const eosio::chain::eosvmoc::code_tuple& ct) {
return boost::hash<eosio::chain::eosvmoc::code_tuple>()(ct);
}

class code_cache_base {
public:
code_cache_base(const std::filesystem::path& data_dir, const eosvmoc::config& eosvmoc_config, const chainbase::database& db);
@@ -80,10 +64,7 @@ class code_cache_base {
indexed_by<
sequenced<>,
hashed_unique<tag<by_hash>,
composite_key< code_descriptor,
member<code_descriptor, digest_type, &code_descriptor::code_hash>,
member<code_descriptor, uint8_t, &code_descriptor::vm_version>
>
member<code_descriptor, digest_type, &code_descriptor::code_hash>
>
>
> code_cache_index;
@@ -104,20 +85,20 @@ class code_cache_base {
compile_wasm_message msg;
std::vector<wrapped_fd> fds_to_pass;

const code_tuple& code() const { return msg.code; }
const digest_type& code_id() const { return msg.code.code_id; }
};
//these are really only useful to the async code cache, but keep them here so free_code can be shared
using queued_compilies_t = boost::multi_index_container<
queued_compile_entry,
indexed_by<
sequenced<>,
hashed_unique<tag<by_hash>,
const_mem_fun<queued_compile_entry, const code_tuple&, &queued_compile_entry::code>>
const_mem_fun<queued_compile_entry, const digest_type&, &queued_compile_entry::code_id>>
>
>;
std::mutex _mtx;
queued_compilies_t _queued_compiles; // protected by _mtx
std::unordered_map<code_tuple, bool> _outstanding_compiles_and_poison; // protected by _mtx
std::unordered_map<digest_type, bool> _outstanding_compiles_and_poison; // protected by _mtx
std::atomic<size_t> _outstanding_compiles{0};

size_t _free_bytes_eviction_threshold;
@@ -132,8 +113,8 @@

class code_cache_async : public code_cache_base {
public:
// called from async thread, provides executing_action_id of any compiles spawned by get_descriptor_for_code
using compile_complete_callback = std::function<void(boost::asio::io_context&, uint64_t, fc::time_point)>;
// called from async thread, provides code_id of any compiles spawned by get_descriptor_for_code
using compile_complete_callback = std::function<void(boost::asio::io_context&, const digest_type&, fc::time_point)>;

code_cache_async(const std::filesystem::path& data_dir, const eosvmoc::config& eosvmoc_config,
const chainbase::database& db, compile_complete_callback cb);
@@ -142,21 +123,20 @@ class code_cache_async : public code_cache_base {
//If code is in cache: returns pointer & bumps to front of MRU list
//If code is not in cache, and not blacklisted, and not currently compiling: return nullptr and kick off compile
//otherwise: return nullptr
const code_descriptor* const
get_descriptor_for_code(mode m, uint64_t executing_action_id,
const digest_type& code_id, const uint8_t& vm_version, get_cd_failure& failure);
const code_descriptor* const get_descriptor_for_code(mode m, const digest_type& code_id, const uint8_t& vm_version,
get_cd_failure& failure);

private:
compile_complete_callback _compile_complete_func; // called from async thread, provides executing_action_id
std::thread _monitor_reply_thread;
boost::lockfree::spsc_queue<wasm_compilation_result_message> _result_queue;
std::unordered_set<code_tuple> _blacklist;
std::unordered_set<digest_type> _blacklist;
size_t _threads;

void wait_on_compile_monitor_message();
std::tuple<size_t, size_t> consume_compile_thread_queue();
void process_queued_compiles();
void write_message(const code_tuple& ct, const eosvmoc_message& message, const std::vector<wrapped_fd>& fds);
void write_message(const digest_type& code_id, const eosvmoc_message& message, const std::vector<wrapped_fd>& fds);

};

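The removed std::hash/boost::hash specializations for code_tuple are no longer needed now that the cache containers are keyed on digest_type; the single fc::hash_value overload above supplies what the boost-based containers need, because boost::hash picks up a free hash_value() function in the argument's namespace via argument-dependent lookup. A generic illustration of that customization point (hypothetical type, not code from this repository):

    #include <boost/container_hash/hash.hpp>
    #include <cstddef>
    #include <cstdint>

    namespace myns {
       struct key { std::uint64_t id = 0; std::uint8_t version = 0; };
       // Found by boost::hash<myns::key> through argument-dependent lookup.
       inline std::size_t hash_value(const key& k) {
          std::size_t seed = 0;
          boost::hash_combine(seed, k.id);
          boost::hash_combine(seed, k.version);
          return seed;
       }
    }
    // boost::hash<myns::key>{}(k) now works, e.g. as the hasher behind a hashed_unique index.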
libraries/chain/include/eosio/chain/webassembly/eos-vm-oc/eos-vm-oc.hpp
@@ -22,7 +22,6 @@ struct code_tuple {

struct compile_wasm_message {
code_tuple code;
uint64_t executing_action_id{0}; // action id that initiated the compilation
fc::time_point queued_time; // when compilation was queued to begin
std::optional<eosvmoc::subjective_compile_limits> limits;
//Two sent fd: 1) communication socket for result, 2) the wasm to compile
@@ -37,7 +36,6 @@ struct code_compilation_result_message {
unsigned apply_offset;
int starting_memory_pages;
unsigned initdata_prologue_size;
uint64_t executing_action_id{0}; // action id that initiated the compilation
fc::time_point queued_time; // when compilation was queued to begin
//Two sent fds: 1) wasm code, 2) initial memory snapshot
};
@@ -54,7 +52,6 @@ struct wasm_compilation_result_message {
code_tuple code;
wasm_compilation_result result;
size_t cache_free_bytes;
uint64_t executing_action_id{0}; // action id that initiated the compilation, copied from compile_wasm_message
fc::time_point queued_time; // when compilation was queued to begin, copied from compile_wasm_message
};

@@ -69,9 +66,9 @@ using eosvmoc_message = std::variant<initialize_message,
FC_REFLECT(eosio::chain::eosvmoc::initialize_message, )
FC_REFLECT(eosio::chain::eosvmoc::initalize_response_message, (error_message))
FC_REFLECT(eosio::chain::eosvmoc::code_tuple, (code_id)(vm_version))
FC_REFLECT(eosio::chain::eosvmoc::compile_wasm_message, (code)(executing_action_id)(queued_time)(limits))
FC_REFLECT(eosio::chain::eosvmoc::compile_wasm_message, (code)(queued_time)(limits))
FC_REFLECT(eosio::chain::eosvmoc::evict_wasms_message, (codes))
FC_REFLECT(eosio::chain::eosvmoc::code_compilation_result_message, (start)(apply_offset)(starting_memory_pages)(initdata_prologue_size)(executing_action_id)(queued_time))
FC_REFLECT(eosio::chain::eosvmoc::code_compilation_result_message, (start)(apply_offset)(starting_memory_pages)(initdata_prologue_size)(queued_time))
FC_REFLECT(eosio::chain::eosvmoc::compilation_result_unknownfailure, )
FC_REFLECT(eosio::chain::eosvmoc::compilation_result_toofull, )
FC_REFLECT(eosio::chain::eosvmoc::wasm_compilation_result_message, (code)(result)(cache_free_bytes)(executing_action_id)(queued_time))
FC_REFLECT(eosio::chain::eosvmoc::wasm_compilation_result_message, (code)(result)(cache_free_bytes)(queued_time))
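Because FC_REFLECT defines the member list fc uses to (de)serialize these IPC messages, dropping executing_action_id from the structs also requires dropping it from the matching FC_REFLECT lines, as done above. A generic illustration of the pairing (hypothetical struct, not from this repository):

    #include <fc/reflect/reflect.hpp>
    #include <fc/time.hpp>

    struct example_msg {
       uint32_t       attempts = 0;
       fc::time_point queued_time;
    };
    // Only the members listed here are packed/unpacked; the list must track the struct.
    FC_REFLECT(example_msg, (attempts)(queued_time))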
4 changes: 2 additions & 2 deletions libraries/chain/wasm_interface.cpp
@@ -100,8 +100,8 @@ namespace eosio { namespace chain {
return my->is_eos_vm_oc_enabled();
}

uint64_t wasm_interface::get_executing_action_id() const {
return my->get_executing_action_id();
uint64_t wasm_interface::get_eos_vm_oc_compile_interrupt_count() const {
return my->get_eos_vm_oc_compile_interrupt_count();
}
#endif
