Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

implement basic scheduler #145

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 34 additions & 34 deletions libraries/chain/block_schedule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,17 @@ typedef std::hash<decltype(AccountName::value)> account_hash;
static account_hash account_hasher;

struct schedule_entry {
schedule_entry(uint _cycle, uint _thread, pending_transaction const * _transaction)
schedule_entry(uint _cycle, uint _thread, const pending_transaction& _transaction)
: cycle(_cycle)
, thread(_thread)
, transaction(_transaction)
{}

uint cycle;
uint thread;
pending_transaction const *transaction;
std::reference_wrapper<const pending_transaction> transaction;

friend bool operator<( schedule_entry const &l, schedule_entry const &r ) {
friend bool operator<( const schedule_entry& l, const schedule_entry& r ) {
if (l.cycle < r.cycle) {
return true;
} else if (l.cycle == r.cycle) {
Expand All @@ -67,18 +67,18 @@ struct schedule_entry {
}
};

static block_schedule from_entries(vector<schedule_entry> &entries) {
static block_schedule from_entries(vector<schedule_entry>& entries) {
// sort in reverse to save allocations, this should put the highest thread index
// for the highest cycle index first meaning the naive resize in the loop below
// is usually the largest and only resize
auto reverse = [](schedule_entry const &l, schedule_entry const &r) {
auto reverse = [](const schedule_entry& l, const schedule_entry& r) {
return !(l < r);
};

std::sort(entries.begin(), entries.end(), reverse);

block_schedule result;
for(auto const & entry : entries) {
for(const auto& entry : entries) {
if (result.cycles.size() <= entry.cycle) {
result.cycles.resize(entry.cycle + 1);
}
Expand All @@ -93,18 +93,18 @@ static block_schedule from_entries(vector<schedule_entry> &entries) {
// allocations, we cannot emplace_back as that would reverse
// the transactions in a thread
auto &thread = cycle.at(entry.thread);
thread.transactions.emplace(thread.transactions.begin(), *entry.transaction);
thread.transactions.emplace(thread.transactions.begin(), entry.transaction.get());
}

return result;
}

template<typename CONTAINER>
auto initialize_pointer_vector(CONTAINER const &c) {
vector<pending_transaction const *> result;
template<typename Container>
auto initialize_ref_vector(const Container& c) {
vector<std::reference_wrapper<const pending_transaction>> result;
result.reserve(c.size());
for (auto const &t : c) {
result.emplace_back(&t);
for (const auto& t : c) {
result.emplace_back(t);
}

return result;
Expand All @@ -113,17 +113,17 @@ auto initialize_pointer_vector(CONTAINER const &c) {
struct transaction_size_visitor : public fc::visitor<size_t>
{
template <typename T>
size_t operator()(const T &trx_p) const {
return fc::raw::pack_size(*trx_p);
size_t operator()(std::reference_wrapper<const T> trx) const {
return fc::raw::pack_size(trx.get());
}
};

struct block_size_skipper {
size_t current_size;
size_t const max_size;

bool should_skip(pending_transaction const *t) const {
size_t transaction_size = t->visit(transaction_size_visitor());
bool should_skip(const pending_transaction& t) const {
size_t transaction_size = t.visit(transaction_size_visitor());
// postpone transaction if it would make block too big
if( transaction_size + current_size > max_size ) {
return true;
Expand All @@ -132,21 +132,21 @@ struct block_size_skipper {
}
}

void apply(pending_transaction const *t) {
size_t transaction_size = t->visit(transaction_size_visitor());
void apply(const pending_transaction& t) {
size_t transaction_size = t.visit(transaction_size_visitor());
current_size += transaction_size;
}
};

auto make_skipper(const global_property_object &properties) {
auto make_skipper(const global_property_object& properties) {
static const size_t max_block_header_size = fc::raw::pack_size( signed_block_header() ) + 4;
auto maximum_block_size = properties.configuration.maxBlockSize;
return block_size_skipper { max_block_header_size, (size_t)maximum_block_size };
}

block_schedule block_schedule::by_threading_conflicts(
vector<pending_transaction> const &transactions,
const global_property_object &properties
const vector<pending_transaction>& transactions,
const global_property_object& properties
)
{
static uint const MAX_TXS_PER_THREAD = 4;
Expand All @@ -158,8 +158,8 @@ block_schedule block_schedule::by_threading_conflicts(
vector<schedule_entry> schedule;
schedule.reserve(transactions.size());

auto current = initialize_pointer_vector(transactions);
vector<pending_transaction const *> postponed;
auto current = initialize_ref_vector(transactions);
decltype(current) postponed;
postponed.reserve(transactions.size());

vector<uint> txs_per_thread;
Expand All @@ -179,7 +179,7 @@ block_schedule block_schedule::by_threading_conflicts(

auto assigned_to = optional<uint>();
bool postpone = false;
auto scopes = t->visit(scope_extracting_visitor());
auto scopes = t.get().visit(scope_extracting_visitor());

for (const auto &a : scopes) {
uint hash_index = account_hasher(a) % HASH_SIZE;
Expand Down Expand Up @@ -218,9 +218,9 @@ block_schedule block_schedule::by_threading_conflicts(
}
}

current.resize(0);
txs_per_thread.resize(0);
assigned_threads.resize(0);
current.clear();
txs_per_thread.clear();
assigned_threads.clear();
assigned_threads.resize(HASH_SIZE);
std::swap(current, postponed);
++cycle;
Expand All @@ -230,8 +230,8 @@ block_schedule block_schedule::by_threading_conflicts(
}

block_schedule block_schedule::by_cycling_conflicts(
vector<pending_transaction> const &transactions,
const global_property_object &properties
const vector<pending_transaction>& transactions,
const global_property_object& properties
)
{
auto skipper = make_skipper(properties);
Expand All @@ -240,8 +240,8 @@ block_schedule block_schedule::by_cycling_conflicts(
vector<schedule_entry> schedule;
schedule.reserve(transactions.size());

auto current = initialize_pointer_vector(transactions);
vector<pending_transaction const *> postponed;
auto current = initialize_ref_vector(transactions);
decltype(current) postponed;
postponed.reserve(transactions.size());

int cycle = 0;
Expand All @@ -256,7 +256,7 @@ block_schedule block_schedule::by_cycling_conflicts(
continue;
}

auto scopes = t->visit(scope_extracting_visitor());
auto scopes = t.get().visit(scope_extracting_visitor());
bool u = false;
for (const auto &a : scopes) {
uint hash_index = account_hasher(a) % HASH_SIZE;
Expand All @@ -279,8 +279,8 @@ block_schedule block_schedule::by_cycling_conflicts(
}
}

current.resize(0);
used.resize(0);
current.clear();
used.clear();
used.resize(HASH_SIZE);
std::swap(current, postponed);
++cycle;
Expand Down
26 changes: 12 additions & 14 deletions libraries/chain/chain_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,12 +303,12 @@ signed_block chain_controller::_generate_block(

vector<pending_transaction> pending;
pending.reserve(generated.size() + _pending_transactions.size());
for (auto const &gt: generated) {
pending.emplace_back(pending_transaction {&gt.trx});
for (const auto& gt: generated) {
pending.emplace_back(std::reference_wrapper<const GeneratedTransaction> {gt.trx});
}

for(auto const &st: _pending_transactions) {
pending.emplace_back(pending_transaction {&st});
for(const auto& st: _pending_transactions) {
pending.emplace_back(std::reference_wrapper<const SignedTransaction> {st});
}

auto schedule = scheduler(pending, get_global_properties());
Expand Down Expand Up @@ -345,15 +345,14 @@ signed_block chain_controller::_generate_block(
try
{
auto temp_session = _db.start_undo_session(true);
if (trx.contains<SignedTransaction const *>()) {
auto const &t = *trx.get<SignedTransaction const *>();
if (trx.contains<std::reference_wrapper<const SignedTransaction>>()) {
const auto& t = trx.get<std::reference_wrapper<const SignedTransaction>>().get();
validate_referenced_accounts(t);
check_transaction_authorization(t);
auto processed = _apply_transaction(t);
block_thread.user_input.emplace_back(processed);
} else if (trx.contains<GeneratedTransaction const *>()) {
#warning TODO: Process generated transaction
// auto processed = _apply_transaction(*trx.get<GeneratedTransaction const *>());
} else if (trx.contains<std::reference_wrapper<const GeneratedTransaction>>()) {
// auto processed = _apply_transaction(trx.get<std::reference_wrapper<const GeneratedTransaction>>().get());
// block_thread.generated_input.emplace_back(processed);
} else {
FC_THROW_EXCEPTION(tx_scheduling_exception, "Unknown transaction type in block_schedule");
Expand All @@ -366,10 +365,10 @@ signed_block chain_controller::_generate_block(
{
// Do nothing, transaction will not be re-applied
wlog( "Transaction was not processed while generating block due to ${e}", ("e", e) );
if (trx.contains<SignedTransaction const *>()) {
wlog( "The transaction was ${t}", ("t", *trx.get<SignedTransaction const *>()) );
} else if (trx.contains<GeneratedTransaction const *>()) {
wlog( "The transaction was ${t}", ("t", *trx.get<GeneratedTransaction const *>()) );
if (trx.contains<std::reference_wrapper<const SignedTransaction>>()) {
wlog( "The transaction was ${t}", ("t", trx.get<std::reference_wrapper<const SignedTransaction>>().get()) );
} else if (trx.contains<std::reference_wrapper<const GeneratedTransaction>>()) {
wlog( "The transaction was ${t}", ("t", trx.get<std::reference_wrapper<const GeneratedTransaction>>().get()) );
}
invalid_transaction_count++;
}
Expand Down Expand Up @@ -503,7 +502,6 @@ void chain_controller::_apply_block(const signed_block& next_block)
for (const auto& cycle : next_block.cycles) {
for (const auto& thread : cycle) {
for(const auto& trx : thread.generated_input ) {
#warning TODO: Process generated transaction
}
for(const auto& trx : thread.user_input ) {
_apply_transaction(trx);
Expand Down
67 changes: 7 additions & 60 deletions libraries/chain/include/eos/chain/block_schedule.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include <set>

namespace eos { namespace chain {
using pending_transaction = static_variant<SignedTransaction const *, GeneratedTransaction const *>;
using pending_transaction = static_variant<std::reference_wrapper<const SignedTransaction>, std::reference_wrapper<const GeneratedTransaction>>;

struct thread_schedule {
vector<pending_transaction> transactions;
Expand All @@ -43,7 +43,7 @@ namespace eos { namespace chain {
*/
struct block_schedule
{
typedef block_schedule (*factory)(vector<pending_transaction> const &, const global_property_object&);
typedef block_schedule (*factory)(const vector<pending_transaction>&, const global_property_object&);
vector<cycle_schedule> cycles;

// Algorithms
Expand All @@ -53,73 +53,20 @@ namespace eos { namespace chain {
* falling back on cycles
* @return the block scheduler
*/
static block_schedule by_threading_conflicts(vector<pending_transaction> const &transactions, const global_property_object& properties);
static block_schedule by_threading_conflicts(const vector<pending_transaction>& transactions, const global_property_object& properties);

/**
* A greedy scheduler that attempts uses future cycles to resolve scope contention
* @return the block scheduler
*/
static block_schedule by_cycling_conflicts(vector<pending_transaction> const &transactions, const global_property_object& properties);

/*
* templated meta schedulers for fuzzing
*/
template<typename NEXT>
struct shuffled_functor {
shuffled_functor(NEXT &&_next) : next(_next){};

block_schedule operator()(vector<pending_transaction> const &transactions, const global_property_object& properties) {
std::random_device rd;
std::mt19937 rng(rd());
auto copy = std::vector<pending_transaction>(transactions);
std::shuffle(copy.begin(), copy.end(), rng);
return next(copy, properties);
}

NEXT &&next;
};

template<typename NEXT>
static shuffled_functor<NEXT> shuffled(NEXT &&next) {
return shuffled_functor<NEXT>(next);
}

template<int NUM, int DEN, typename NEXT>
struct lossy_functor {
lossy_functor(NEXT &&_next) : next(_next){};

block_schedule operator()(vector<pending_transaction> const &transactions, const global_property_object& properties) {
std::random_device rd;
std::mt19937 rng(rd());
std::uniform_real_distribution<> dist(0, 1);
double const cutoff = (double)NUM / (double)DEN;

auto copy = std::vector<pending_transaction>();
copy.reserve(transactions.size());
std::copy_if (transactions.begin(), transactions.end(), copy.begin(), [&](pending_transaction const& trx){
return dist(rng) >= cutoff;
});

return next(copy, properties);
}

NEXT &&next;
};

template<int NUM, int DEN, typename NEXT>
static lossy_functor<NUM, DEN, NEXT> lossy(NEXT &&next) {
return lossy_functor<NUM, DEN, NEXT>(next);
}
static block_schedule by_cycling_conflicts(const vector<pending_transaction>& transactions, const global_property_object& properties);
};

struct scope_extracting_visitor : public fc::visitor<std::set<AccountName>> {
template <typename T>
std::set<AccountName> operator()(const T &trx_p) const {
std::set<AccountName> unique_names(trx_p->scope.begin(), trx_p->scope.end());
for (auto const &m : trx_p->messages) {
unique_names.insert(m.code);
}

std::set<AccountName> operator()(std::reference_wrapper<const T> trx) const {
const auto& t = trx.get();
std::set<AccountName> unique_names(t.scope.begin(), t.scope.end());
return unique_names;
}
};
Expand Down
Loading