Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

amqp_trace_plugin SHiP protocol for transaction_trace #10057

Merged
merged 3 commits into from
Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions libraries/state_history/include/eosio/state_history/type_convert.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#pragma once

#include <eosio/ship_protocol.hpp>
#include <eosio/state_history/types.hpp>
#include <eosio/chain_types.hpp>

namespace eosio {
namespace state_history {

template <typename T>
auto to_uint64_t(T n) -> std::enable_if_t<std::is_same_v<T, eosio::chain::name>, decltype(n.value)> {
return n.value;
}
template <typename T>
auto to_uint64_t(T n) -> std::enable_if_t<std::is_same_v<T, eosio::chain::name>, decltype(n.to_uint64_t())> {
return n.to_uint64_t();
}

eosio::checksum256 convert(const eosio::chain::checksum_type& obj) {
static_assert( sizeof(eosio::checksum256) == sizeof(eosio::chain::checksum_type), "convert may need updated" );
std::array<uint8_t, 32> bytes;
static_assert(bytes.size() == sizeof(obj));
memcpy(bytes.data(), &obj, bytes.size());
return eosio::checksum256(bytes);
}

eosio::ship_protocol::account_delta convert(const eosio::chain::account_delta& obj) {
static_assert( sizeof(eosio::ship_protocol::account_delta) == sizeof(eosio::chain::account_delta), "convert may need updated" );
static_assert( fc::reflector<eosio::chain::account_delta>::total_member_count == 2, "convert may need updated" );
eosio::ship_protocol::account_delta result;
result.account.value = to_uint64_t(obj.account);
result.delta = obj.delta;
return result;
}

eosio::ship_protocol::action_receipt_v0 convert(const eosio::chain::action_receipt& obj) {
static_assert( fc::reflector<eosio::chain::action_receipt>::total_member_count == 7, "convert may need updated" );
eosio::ship_protocol::action_receipt_v0 result;
result.receiver.value = to_uint64_t(obj.receiver);
result.act_digest = convert(obj.act_digest);
result.global_sequence = obj.global_sequence;
result.recv_sequence = obj.recv_sequence;
for (auto& auth : obj.auth_sequence)
result.auth_sequence.push_back({ eosio::name{ to_uint64_t(auth.first) }, auth.second });
result.code_sequence.value = obj.code_sequence.value;
result.abi_sequence.value = obj.abi_sequence.value;
return result;
}

eosio::ship_protocol::action convert(const eosio::chain::action& obj) {
static_assert( sizeof(eosio::ship_protocol::action) == sizeof(std::tuple<eosio::name,eosio::name,std::vector<permission_level>,eosio::input_stream>), "convert may need updated" );
static_assert( fc::reflector<eosio::chain::action>::total_member_count == 4, "convert may need updated" );
eosio::ship_protocol::action result;
result.account.value = to_uint64_t(obj.account);
result.name.value = to_uint64_t(obj.name);
for (auto& auth : obj.authorization)
result.authorization.push_back(
{ eosio::name{ to_uint64_t(auth.actor) }, eosio::name{ to_uint64_t(auth.permission) } });
result.data = { obj.data.data(), obj.data.data() + obj.data.size() };
return result;
}

eosio::ship_protocol::action_trace_v1 convert(const eosio::chain::action_trace& obj) {
static_assert( fc::reflector<eosio::chain::action_trace>::total_member_count == 18, "convert may need updated" );
eosio::ship_protocol::action_trace_v1 result;
result.action_ordinal.value = obj.action_ordinal.value;
result.creator_action_ordinal.value = obj.creator_action_ordinal.value;
if (obj.receipt)
result.receipt = convert(*obj.receipt);
result.receiver.value = to_uint64_t(obj.receiver);
result.act = convert(obj.act);
result.context_free = obj.context_free;
result.elapsed = obj.elapsed.count();
result.console = obj.console;
for (auto& delta : obj.account_ram_deltas) result.account_ram_deltas.push_back(convert(delta));
for (auto& delta : obj.account_disk_deltas) result.account_disk_deltas.push_back(convert(delta));
if (obj.except)
result.except = obj.except->to_string();
if (obj.error_code)
result.error_code = *obj.error_code;
result.return_value = { obj.return_value.data(), obj.return_value.size() };
return result;
}

eosio::ship_protocol::transaction_trace_v0 convert(const eosio::chain::transaction_trace& obj) {
static_assert( fc::reflector<eosio::chain::transaction_trace>::total_member_count == 13, "convert may need updated" );
eosio::ship_protocol::transaction_trace_v0 result{};
result.id = convert(obj.id);
if (obj.receipt) {
result.status = (eosio::ship_protocol::transaction_status)obj.receipt->status.value;
result.cpu_usage_us = obj.receipt->cpu_usage_us;
result.net_usage_words = obj.receipt->net_usage_words.value;
} else {
result.status = eosio::ship_protocol::transaction_status::hard_fail;
}
result.elapsed = obj.elapsed.count();
result.net_usage = obj.net_usage;
result.scheduled = obj.scheduled;
for (auto& at : obj.action_traces) result.action_traces.push_back(convert(at));
if (obj.account_ram_delta)
result.account_ram_delta = convert(*obj.account_ram_delta);
if (obj.except)
result.except = obj.except->to_string();
if (obj.error_code)
result.error_code = *obj.error_code;
if (obj.failed_dtrx_trace)
result.failed_dtrx_trace.push_back({ convert(*obj.failed_dtrx_trace) });
return result;
}

}}
3 changes: 2 additions & 1 deletion plugins/amqp_trace_plugin/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
file(GLOB HEADERS "include/eosio/amqp_trace_plugin/*.hpp")
add_library( amqp_trace_plugin
amqp_trace_plugin.cpp
amqp_trace_plugin_impl.cpp
${HEADERS} )

target_link_libraries( amqp_trace_plugin chain_plugin amqp appbase fc amqpcpp )
target_link_libraries( amqp_trace_plugin state_history chain_plugin amqp abieos appbase fc amqpcpp )
target_include_directories( amqp_trace_plugin PUBLIC include )
60 changes: 3 additions & 57 deletions plugins/amqp_trace_plugin/amqp_trace_plugin.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <eosio/amqp_trace_plugin/amqp_trace_plugin.hpp>
#include <eosio/amqp_trace_plugin/amqp_trace_plugin_impl.hpp>
#include <eosio/amqp/amqp_handler.hpp>
#include <eosio/chain_plugin/chain_plugin.hpp>

Expand All @@ -16,61 +17,6 @@ static appbase::abstract_plugin& amqp_trace_plugin_ = appbase::app().register_pl

namespace eosio {

using boost::signals2::scoped_connection;

struct amqp_trace_plugin_impl : std::enable_shared_from_this<amqp_trace_plugin_impl> {

std::optional<amqp> amqp_trace;
std::optional<scoped_connection> applied_transaction_connection;

std::string amqp_trace_address;
std::string amqp_trace_exchange;
bool started = false;

public:

// called from any thread
void publish_error( std::string tid, int64_t error_code, std::string error_message ) {
try {
transaction_trace_msg msg{transaction_trace_exception{error_code}};
std::get<transaction_trace_exception>(msg).error_message = std::move( error_message );
auto buf = fc::raw::pack( msg );
amqp_trace->publish( amqp_trace_exchange, tid, std::move( buf ) );
} FC_LOG_AND_DROP()
}

// called on application thread
void on_applied_transaction(const chain::transaction_trace_ptr& trace, const chain::packed_transaction_ptr& t) {
try {
publish_result( t, trace );
} FC_LOG_AND_DROP()
}

private:

// called from application thread
void publish_result( const chain::packed_transaction_ptr& trx, const chain::transaction_trace_ptr& trace ) {
try {
if( !trace->except ) {
dlog( "chain accepted transaction, bcast ${id}", ("id", trace->id) );
} else {
dlog( "trace except : ${m}", ("m", trace->except->to_string()) );
}
amqp_trace->publish( amqp_trace_exchange, trx->id(), [trace]() {
fc::unsigned_int which = fc::get_index<transaction_trace_msg, chain::transaction_trace>();
uint32_t payload_size = fc::raw::pack_size( which );
payload_size += fc::raw::pack_size( *trace );
std::vector<char> buf( payload_size );
fc::datastream<char*> ds( buf.data(), payload_size );
fc::raw::pack( ds, which );
fc::raw::pack( ds, *trace );
return buf;
});
} FC_LOG_AND_DROP()
}

};

amqp_trace_plugin::amqp_trace_plugin()
: my(std::make_shared<amqp_trace_plugin_impl>()) {}

Expand Down Expand Up @@ -114,7 +60,7 @@ void amqp_trace_plugin::plugin_startup() {
auto chain_plug = app().find_plugin<chain_plugin>();
EOS_ASSERT( chain_plug, chain::missing_chain_plugin_exception, "chain_plugin required" );

my->applied_transaction_connection.emplace(
applied_transaction_connection.emplace(
chain_plug->chain().applied_transaction.connect(
[me = my]( std::tuple<const chain::transaction_trace_ptr&, const chain::packed_transaction_ptr&> t ) {
me->on_applied_transaction( std::get<0>( t ), std::get<1>( t ) );
Expand All @@ -133,7 +79,7 @@ void amqp_trace_plugin::plugin_shutdown() {
try {
dlog( "shutdown.." );

my->applied_transaction_connection.reset();
applied_transaction_connection.reset();
if( my->amqp_trace ) {
// use stop() instead of destroying amqp_trace as amqp_trx_plugin keeps a pointer to amqp_trace
// and needs to live until amqp_trx_plugin shutdown.
Expand Down
49 changes: 49 additions & 0 deletions plugins/amqp_trace_plugin/amqp_trace_plugin_impl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#include <eosio/amqp_trace_plugin/amqp_trace_types.hpp>
#include <eosio/amqp_trace_plugin/amqp_trace_plugin_impl.hpp>
#include <eosio/state_history/type_convert.hpp>
#include <eosio/for_each_field.hpp>
#include <eosio/to_bin.hpp>

#include <eosio/chain/exceptions.hpp>
#include <eosio/chain/transaction.hpp>

namespace eosio {

// called from any thread
void amqp_trace_plugin_impl::publish_error( std::string tid, int64_t error_code, std::string error_message ) {
try {
transaction_trace_msg msg{transaction_trace_exception{error_code}};
std::get<transaction_trace_exception>( msg ).error_message = std::move( error_message );
auto buf = convert_to_bin( msg );
amqp_trace->publish( amqp_trace_exchange, tid, std::move( buf ) );
}
FC_LOG_AND_DROP()
}

// called on application thread
void amqp_trace_plugin_impl::on_applied_transaction( const chain::transaction_trace_ptr& trace,
const chain::packed_transaction_ptr& t ) {
try {
publish_result( t, trace );
}
FC_LOG_AND_DROP()
}

// called from application thread
void amqp_trace_plugin_impl::publish_result( const chain::packed_transaction_ptr& trx,
const chain::transaction_trace_ptr& trace ) {
try {
if( !trace->except ) {
dlog( "chain accepted transaction, bcast ${id}", ("id", trace->id) );
} else {
dlog( "trace except : ${m}", ("m", trace->except->to_string()) );
}
amqp_trace->publish( amqp_trace_exchange, trx->id(), [trace]() {
transaction_trace_msg msg{ eosio::state_history::convert( *trace ) };
return convert_to_bin( msg );
} );
}
FC_LOG_AND_DROP()
}

} // namespace eosio
Original file line number Diff line number Diff line change
@@ -1,26 +1,20 @@
#pragma once
#include <eosio/chain_plugin/chain_plugin.hpp>
#include <appbase/application.hpp>
#include <appbase/plugin.hpp>
#include <boost/signals2/connection.hpp>

namespace eosio {

// publish message types
struct transaction_trace_exception {
int64_t error_code; ///< fc::exception code()
std::string error_message;
};
using transaction_trace_msg = std::variant<transaction_trace_exception, chain::transaction_trace>;

class amqp_trace_plugin : public appbase::plugin<amqp_trace_plugin> {

public:
APPBASE_PLUGIN_REQUIRES((chain_plugin))
APPBASE_PLUGIN_REQUIRES()

amqp_trace_plugin();
virtual ~amqp_trace_plugin();

virtual void set_program_options(options_description& cli, options_description& cfg) override;
void plugin_initialize(const variables_map& options);
virtual void set_program_options(appbase::options_description& cli, appbase::options_description& cfg) override;
void plugin_initialize(const appbase::variables_map& options);
void plugin_startup();
void plugin_shutdown();
void handle_sighup() override;
Expand All @@ -30,8 +24,7 @@ class amqp_trace_plugin : public appbase::plugin<amqp_trace_plugin> {

private:
std::shared_ptr<struct amqp_trace_plugin_impl> my;
std::optional<boost::signals2::scoped_connection> applied_transaction_connection;
};

} // namespace eosio

FC_REFLECT( eosio::transaction_trace_exception, (error_code)(error_message) );
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#pragma once
#include <eosio/amqp/amqp_handler.hpp>
#include <eosio/chain/trace.hpp>
#include <eosio/chain/transaction.hpp>

namespace eosio {

struct amqp_trace_plugin_impl : std::enable_shared_from_this<amqp_trace_plugin_impl> {

std::optional<amqp> amqp_trace;

std::string amqp_trace_address;
std::string amqp_trace_exchange;
bool started = false;

public:

// called from any thread
void publish_error( std::string tid, int64_t error_code, std::string error_message );

// called on application thread
void on_applied_transaction(const chain::transaction_trace_ptr& trace, const chain::packed_transaction_ptr& t);

private:

// called from application thread
void publish_result( const chain::packed_transaction_ptr& trx, const chain::transaction_trace_ptr& trace );
};

} // namespace eosio
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#include <eosio/ship_protocol.hpp>
#include <eosio/to_bin.hpp>
#include <variant>

namespace eosio {

// publish message types
struct transaction_trace_exception {
std::int64_t error_code; ///< fc::exception code()
std::string error_message;
};

using transaction_trace_msg = std::variant<eosio::transaction_trace_exception, eosio::ship_protocol::transaction_trace>;

EOSIO_REFLECT( eosio::transaction_trace_exception, error_code, error_message );

} // namespace eosio

Loading