Skip to content

Commit

Permalink
GH-1690 Cleanup to be more readable
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Nov 6, 2023
1 parent acf9e95 commit 8d428eb
Showing 1 changed file with 52 additions and 50 deletions.
102 changes: 52 additions & 50 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,6 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
return;
}

chain::controller& chain = chain_plug->chain();
const auto max_trx_time_ms = (trx_type == transaction_metadata::trx_type::read_only) ? -1 : _max_transaction_time_ms.load();
fc::microseconds max_trx_cpu_usage = max_trx_time_ms < 0 ? fc::microseconds::maximum() : fc::milliseconds(max_trx_time_ms);

Expand All @@ -815,57 +814,60 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
};
}

boost::asio::post( chain.get_thread_pool(),
[this, trx{trx}, &chain, time_limit{max_trx_cpu_usage}, trx_type,
api_trx, is_transient, next{std::move(next)}, return_failure_traces]() mutable {

transaction_metadata_ptr trx_meta;
boost::asio::post(
chain_plug->chain().get_thread_pool(), // use chain thread pool for key recovery
[this, trx{trx}, time_limit{max_trx_cpu_usage}, trx_type, is_transient, next{std::move(next)}, api_trx, return_failure_traces]() mutable {

chain::controller& chain = chain_plug->chain();
transaction_metadata_ptr trx_meta;
try {
trx_meta = transaction_metadata::recover_keys(trx, chain.get_chain_id(), time_limit, trx_type,
chain.configured_subjective_signature_length_limit());
} catch (...) {
// use read_write when read is likely fine; maintains previous behavior of next() always being called from the main thread
app().executor().post(
priority::low, exec_queue::read_write,
[this, ex_ptr{std::current_exception()}, trx{std::move(trx)}, is_transient, next{std::move(next)}]() {
auto start = fc::time_point::now();
auto idle_time = _time_tracker.add_idle_time(start);
auto trx_tracker = _time_tracker.start_trx(is_transient, start);
fc_tlog(_log, "Time since last trx: ${t}us", ("t", idle_time));
auto ex_handler = [this, is_transient, &next, &trx](fc::exception_ptr ex) {
log_trx_results(trx, nullptr, ex, 0, is_transient);
next(std::move(ex));
};
try {
std::rethrow_exception(ex_ptr);
} CATCH_AND_CALL(ex_handler)
});
return;
}

// key recovery complete, continue execution on the main thread
app().executor().post(
priority::low, exec_queue::read_write,
[this, trx_meta{std::move(trx_meta)}, is_transient, next{std::move(next)}, api_trx, return_failure_traces]() {
auto start = fc::time_point::now();
auto idle_time = _time_tracker.add_idle_time(start);
auto trx_tracker = _time_tracker.start_trx(is_transient, start);
fc_tlog(_log, "Time since last trx: ${t}us", ("t", idle_time));

auto exception_handler = [this, is_transient, &next, &trx_meta](fc::exception_ptr ex) {
log_trx_results(trx_meta->packed_trx(), nullptr, ex, 0, is_transient);
next(std::move(ex));
};
try {
trx_meta = transaction_metadata::recover_keys( trx, chain.get_chain_id(), time_limit, trx_type, chain.configured_subjective_signature_length_limit() );
} catch(...) {
// use read_write when read is likely fine; this maintains previous behavior of next() always being called from the main thread
app().executor().post(priority::low, exec_queue::read_write,
[ex_ptr = std::current_exception(), this, trx{std::move(trx)}, is_transient, next{std::move(next)}]() {
auto start = fc::time_point::now();
auto idle_time = _time_tracker.add_idle_time(start);
auto trx_tracker = _time_tracker.start_trx(is_transient, start);
fc_tlog(_log, "Time since last trx: ${t}us", ("t", idle_time));
auto ex_handler =
[this, is_transient, &next, &trx](fc::exception_ptr ex) {
log_trx_results(trx, nullptr, ex, 0, is_transient);
next(std::move(ex));
};
try {
std::rethrow_exception(ex_ptr);
} CATCH_AND_CALL(ex_handler)
});
return;
if (!process_incoming_transaction_async(trx_meta, api_trx, return_failure_traces, trx_tracker, next)) {
if (in_producing_mode()) {
schedule_maybe_produce_block(true);
} else {
restart_speculative_block();
}
}
}

app().executor().post(priority::low, exec_queue::read_write,
[this, trx_meta{std::move(trx_meta)}, api_trx, is_transient, next{std::move(next)}, return_failure_traces]() mutable {
auto start = fc::time_point::now();
auto idle_time = _time_tracker.add_idle_time(start);
auto trx_tracker = _time_tracker.start_trx(is_transient, start);
fc_tlog(_log, "Time since last trx: ${t}us", ("t", idle_time));

auto exception_handler =
[this, is_transient, &next, &trx_meta](fc::exception_ptr ex) {
log_trx_results(trx_meta->packed_trx(), nullptr, ex, 0, is_transient);
next(std::move(ex));
};
try {
if (!process_incoming_transaction_async(trx_meta, api_trx, return_failure_traces, trx_tracker, next)) {
if (in_producing_mode()) {
schedule_maybe_produce_block(true);
} else {
restart_speculative_block();
}
}
}
CATCH_AND_CALL(exception_handler);
});
} );
CATCH_AND_CALL(exception_handler);
});
});
}

bool process_incoming_transaction_async(const transaction_metadata_ptr& trx,
Expand Down

0 comments on commit 8d428eb

Please sign in to comment.