Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move abi serialization of transaction trace off main thread #999

Merged
merged 21 commits into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
51d2b5b
Move abi serialization of transaction trace off main thread for send_…
greg7mdp Apr 6, 2023
0b3cc3d
Merge branch 'gh-698' of github.com:AntelopeIO/leap into gh-698-part2
greg7mdp Apr 6, 2023
66c1731
Move serialization off main thread for compute_transaction
greg7mdp Apr 7, 2023
e616649
Merge branch 'gh-698' of github.com:AntelopeIO/leap into gh-698-part2
greg7mdp Apr 7, 2023
c2aab35
correct abi caching, avoid unnecessary serializer copies
greg7mdp Apr 10, 2023
9fd4dff
Update get_block() API to use new architecture.
greg7mdp Apr 10, 2023
28b03d2
Make a few lambdas `mutable` to avoid copying abi_serializers
greg7mdp Apr 10, 2023
01a0dc5
Merge branch 'main' of github.com:AntelopeIO/leap into gh-698-part2
greg7mdp Apr 10, 2023
ac8c37c
First batch of changes from PR review comments
greg7mdp Apr 11, 2023
eb49677
2nd batch of changes from PR review comments
greg7mdp Apr 11, 2023
841a59e
remove unnecesary `mutable` qualifiers.
greg7mdp Apr 11, 2023
52e25cd
Avoid unnecessary copies, restore `mutable` on lambdas capturing the …
greg7mdp Apr 11, 2023
9aa412b
Whitespace cleanup
greg7mdp Apr 11, 2023
6457040
Merge branch 'main' of github.com:AntelopeIO/leap into gh-698-part2
greg7mdp Apr 11, 2023
52f0a2a
have `send_transaction` call `send_transaction2`
greg7mdp Apr 12, 2023
8fabe5d
Commonize to a single send_transaction function.
greg7mdp Apr 12, 2023
0c04150
Change `abi_cache` name as per PR review comment.
greg7mdp Apr 12, 2023
6e47c71
Add comment as suggested in PR review.
greg7mdp Apr 12, 2023
380373b
Address PR comments (mostly allow parameters to be moved by accepting…
greg7mdp Apr 13, 2023
8d830dc
Keep track of whether the serializer cache contains a setabi action.
greg7mdp Apr 13, 2023
e0edbcd
Revert "Keep track of whether the serializer cache contains a setabi …
greg7mdp Apr 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 87 additions & 17 deletions libraries/chain/include/eosio/chain/abi_serializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <fc/variant_object.hpp>
#include <fc/scoped_exit.hpp>

namespace eosio { namespace chain {
namespace eosio::chain {

using std::map;
using std::string;
Expand Down Expand Up @@ -485,14 +485,15 @@ namespace impl {
};

try {
auto abi = resolver(act.account);
if (abi) {
auto type = abi->get_action_type(act.name);
auto abi_optional = resolver(act.account);
if (abi_optional) {
abi_serializer& abi = *abi_optional;
auto type = abi.get_action_type(act.name);
if (!type.empty()) {
try {
binary_to_variant_context _ctx(*abi, ctx, type);
binary_to_variant_context _ctx(abi, ctx, type);
_ctx.short_path = true; // Just to be safe while avoiding the complexity of threading an override boolean all over the place
mvo( "data", abi->_binary_to_variant( type, act.data, _ctx ));
mvo( "data", abi._binary_to_variant( type, act.data, _ctx ));
} catch(...) {
// any failure to serialize data, then leave as not serailzed
set_hex_data(mvo, "data", act.data);
Expand Down Expand Up @@ -546,13 +547,14 @@ namespace impl {
mvo("return_value_hex_data", act_trace.return_value);
auto act = act_trace.act;
try {
auto abi = resolver(act.account);
if (abi) {
auto type = abi->get_action_result_type(act.name);
auto abi_optional = resolver(act.account);
if (abi_optional) {
abi_serializer& abi = *abi_optional;
auto type = abi.get_action_result_type(act.name);
if (!type.empty()) {
binary_to_variant_context _ctx(*abi, ctx, type);
binary_to_variant_context _ctx(abi, ctx, type);
_ctx.short_path = true; // Just to be safe while avoiding the complexity of threading an override boolean all over the place
mvo( "return_value_data", abi->_binary_to_variant( type, act_trace.return_value, _ctx ));
mvo( "return_value_data", abi._binary_to_variant( type, act_trace.return_value, _ctx ));
}
}
} catch(...) {}
Expand Down Expand Up @@ -678,13 +680,13 @@ namespace impl {
* this will degrade to the common fc::to_variant as soon as the type no longer contains
* ABI related info
*
* @tparam Reslover - callable with the signature (const name& code_account) -> std::optional<abi_def>
* @tparam Resolver - callable with the signature (const name& code_account) -> std::optional<abi_def>
*/
template<typename T, typename Resolver>
class abi_to_variant_visitor
{
public:
abi_to_variant_visitor( mutable_variant_object& _mvo, const T& _val, Resolver _resolver, abi_traverse_context& _ctx )
abi_to_variant_visitor( mutable_variant_object& _mvo, const T& _val, Resolver& _resolver, abi_traverse_context& _ctx )
:_vo(_mvo)
,_val(_val)
,_resolver(_resolver)
Expand All @@ -707,7 +709,7 @@ namespace impl {
private:
mutable_variant_object& _vo;
const T& _val;
Resolver _resolver;
Resolver& _resolver;
abi_traverse_context& _ctx;
};

Expand Down Expand Up @@ -890,7 +892,7 @@ namespace impl {
class abi_from_variant_visitor : public reflector_init_visitor<T>
{
public:
abi_from_variant_visitor( const variant_object& _vo, T& v, Resolver _resolver, abi_traverse_context& _ctx )
abi_from_variant_visitor( const variant_object& _vo, T& v, Resolver& _resolver, abi_traverse_context& _ctx )
: reflector_init_visitor<T>(v)
,_vo(_vo)
,_resolver(_resolver)
Expand All @@ -914,7 +916,7 @@ namespace impl {

private:
const variant_object& _vo;
Resolver _resolver;
Resolver& _resolver;
abi_traverse_context& _ctx;
};

Expand Down Expand Up @@ -970,5 +972,73 @@ void abi_serializer::from_variant( const fc::variant& v, T& o, Resolver resolver
from_variant( v, o, resolver, create_yield_function(max_serialization_time) );
}

using abi_serializer_cache_t = std::unordered_map<account_name, std::optional<abi_serializer>>;

class abi_resolver {
public:
abi_resolver(abi_serializer_cache_t&& abi_serializers) :
abi_serializers(std::move(abi_serializers))
{}

std::optional<std::reference_wrapper<abi_serializer>> operator()(const account_name& account) {
auto it = abi_serializers.find(account);
if (it != abi_serializers.end() && it->second)
return std::reference_wrapper<abi_serializer>(*it->second);
return {};
};

private:
abi_serializer_cache_t abi_serializers;
};

class abi_serializer_cache_builder {
public:
abi_serializer_cache_builder(std::function<std::optional<abi_serializer>(const account_name& name)> resolver) :
resolver_(std::move(resolver))
{
}

abi_serializer_cache_builder(const abi_serializer_cache_builder&) = delete;

abi_serializer_cache_builder&& add_serializers(const chain::signed_block_ptr& block) && {
for( const auto& receipt: block->transactions ) {
if( std::holds_alternative<chain::packed_transaction>( receipt.trx ) ) {
const auto& pt = std::get<chain::packed_transaction>( receipt.trx );
const auto& t = pt.get_transaction();
for( const auto& a: t.actions )
add_to_cache( a );
for( const auto& a: t.context_free_actions )
add_to_cache( a );
}
}
return std::move(*this);
}

abi_serializer_cache_builder&& add_serializers(const transaction_trace_ptr& trace_ptr) && {
for( const auto& trace: trace_ptr->action_traces ) {
add_to_cache(trace.act);
}
return std::move(*this);
}

abi_serializer_cache_t&& get() && {
return std::move(abi_serializers);
}

private:
void add_to_cache(const chain::action& a) {
auto it = abi_serializers.find( a.account );
if( it == abi_serializers.end() ) {
try {
abi_serializers.emplace_hint( it, a.account, resolver_( a.account ) );
} catch( ... ) {
// keep behavior of not throwing on invalid abi, will result in hex data
}
}
}

std::function<std::optional<abi_serializer>(const account_name& name)> resolver_;
abi_serializer_cache_t abi_serializers;
};

} } // eosio::chain
} // eosio::chain
20 changes: 20 additions & 0 deletions libraries/chain/include/eosio/chain/exceptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,26 @@
NEXT(e.dynamic_copy_exception());\
}

/**
* Capture all exceptions and return return_type which is constructible from a fc::exception_ptr
*/
#define CATCH_AND_RETURN(return_type)\
catch ( const fc::exception& err ) {\
return return_type(err.dynamic_copy_exception());\
} catch ( const std::exception& e ) {\
fc::exception fce( \
FC_LOG_MESSAGE( warn, "rethrow ${what}: ", ("what",e.what())),\
fc::std_exception_code,\
BOOST_CORE_TYPEID(e).name(),\
e.what() ) ;\
return return_type(fce.dynamic_copy_exception());\
} catch( ... ) {\
fc::unhandled_exception e(\
FC_LOG_MESSAGE(warn, "rethrow"),\
std::current_exception());\
return return_type(e.dynamic_copy_exception());\
}

#define EOS_RECODE_EXC( cause_type, effect_type ) \
catch( const cause_type& e ) \
{ throw( effect_type( e.what(), e.get_log() ) ); }
Expand Down
5 changes: 4 additions & 1 deletion libraries/chain/include/eosio/chain/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,10 @@ namespace eosio::chain {
// http_plugin thread pool) and which completes the API processing and returns the result T.
// -------------------------------------------------------------------------------------------------------
template<typename T>
using next_function_variant = std::variant<fc::exception_ptr, T, std::function<T()>>;
using t_or_exception = std::variant<T, fc::exception_ptr>;
linh2931 marked this conversation as resolved.
Show resolved Hide resolved

template<typename T>
using next_function_variant = std::variant<fc::exception_ptr, T, std::function<t_or_exception<T>()>>;

template<typename T>
using next_function = std::function<void(const next_function_variant<T>&)>;
Expand Down
53 changes: 2 additions & 51 deletions plugins/chain_api_plugin/chain_api_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,37 +57,12 @@ parse_params<chain_apis::read_only::get_transaction_status_params, http_params_t

#define CHAIN_RO_CALL(call_name, http_response_code, params_type) CALL_WITH_400(chain, ro_api, chain_apis::read_only, call_name, http_response_code, params_type)
#define CHAIN_RW_CALL(call_name, http_response_code, params_type) CALL_WITH_400(chain, rw_api, chain_apis::read_write, call_name, http_response_code, params_type)
#define CHAIN_RO_CALL_POST(call_name, http_response_code, params_type) CALL_WITH_400_POST(chain, ro_api, chain_apis::read_only, call_name, http_response_code, params_type)
#define CHAIN_RO_CALL_ASYNC(call_name, call_result, http_response_code, params_type) CALL_ASYNC_WITH_400(chain, ro_api, chain_apis::read_only, call_name, call_result, http_response_code, params_type)
#define CHAIN_RW_CALL_ASYNC(call_name, call_result, http_response_code, params_type) CALL_ASYNC_WITH_400(chain, rw_api, chain_apis::read_write, call_name, call_result, http_response_code, params_type)

#define CHAIN_RO_CALL_WITH_400(call_name, http_response_code, params_type) CALL_WITH_400(chain, ro_api, chain_apis::read_only, call_name, http_response_code, params_type)

template<class API, class PARAMS_PARSER, class HANDLER>
static api_entry make_api_entry(http_plugin& _http_plugin, API& api, const char* api_name,
const char* call_name, PARAMS_PARSER params_parser, HANDLER handler) {
return api_entry(
std::string("/v1/") + api_name + "/" + call_name,
[&_http_plugin, api, api_name, call_name,
params_parser = std::move(params_parser), handler = std::move(handler)](string&&, string&& body, url_response_callback&& cb) {
auto deadline = api.start();
try {
auto start = fc::time_point::now();
auto params = params_parser(body);
FC_CHECK_DEADLINE(deadline);

// call first handler on main thread (likely because it accesses non thread-safe data)
// returns a thread-safe lambda that can be enqueued on the http thread pool to complete the request
auto completion_handler = handler(api, start, deadline, params, cb);
FC_CHECK_DEADLINE(deadline); // make sure remaining time is > 0

// execute thread-safe http_handler on _http_plugin's thread pool
_http_plugin.post_http_thread_pool(std::move(completion_handler));
} catch (...) {
http_plugin::handle_exception(api_name, call_name, body, cb);
}
});
}

void chain_api_plugin::plugin_startup() {
ilog( "starting chain_api_plugin" );
my.reset(new chain_api_plugin_impl(app().get_plugin<chain_plugin>().chain()));
Expand All @@ -104,6 +79,7 @@ void chain_api_plugin::plugin_startup() {
CHAIN_RO_CALL(get_info, 200, http_params_types::no_params)}, appbase::exec_queue::read_only, appbase::priority::medium_high);
_http_plugin.add_api({
CHAIN_RO_CALL(get_activated_protocol_features, 200, http_params_types::possible_no_params),
CHAIN_RO_CALL_POST(get_block, 200, http_params_types::params_required), // _POST because get_block() returns a lambda to be executed on the http thread pool
CHAIN_RO_CALL(get_block_info, 200, http_params_types::params_required),
CHAIN_RO_CALL(get_block_header_state, 200, http_params_types::params_required),
CHAIN_RO_CALL(get_account, 200, http_params_types::params_required),
Expand Down Expand Up @@ -153,31 +129,6 @@ void chain_api_plugin::plugin_startup() {
}, appbase::exec_queue::read_only);
}

_http_plugin.add_api({
make_api_entry(
_http_plugin, ro_api, "chain", "get_block",
[](const string& body) {
return parse_params<chain_apis::read_only::get_raw_block_params, http_params_types::params_required>(body);
},
[max_response_time, &chain](auto& api, fc::time_point start, fc::time_point deadline, auto &params, const url_response_callback &cb) {

chain::signed_block_ptr block = api.get_raw_block(params, deadline);
auto max_time = std::min(chain.get_abi_serializer_max_time(), max_response_time);
auto abi_cache = api.get_block_serializers(block, max_time);
auto post_time = fc::time_point::now();
auto remaining_time = max_time - (post_time - start);

return [api, cb, deadline, post_time, remaining_time, abi_cache=std::move(abi_cache), block=std::move(block)]() mutable {
try {
auto new_deadline = deadline + (fc::time_point::now() - post_time);
fc::variant result = api.convert_block(block, std::move(abi_cache), remaining_time);
cb(200, new_deadline, std::move(result));
} catch( ... ) {
http_plugin::handle_exception("chain", "get_block", "", cb);
}
};
})},
appbase::exec_queue::read_only);
}

void chain_api_plugin::plugin_shutdown() {}
Expand Down
Loading