Skip to content

Commit

Permalink
feat(payout): read epoch start block from contract
Browse files Browse the repository at this point in the history
REFERENCE #4
  • Loading branch information
hobofan committed Jul 18, 2018
1 parent 6dc35e8 commit 708894a
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 25 deletions.
45 changes: 39 additions & 6 deletions src/payout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@ use sync_proposition_ledger::PropositionLedger;

/// Number of host blockchain blocks that make up a epoch
// TODO: should be taken from smart contract
pub const EPOCH_LENGTH: u64 = 20;
/// This block is the start of the first epoch
// TODO: should be taken from smart contract
pub const EPOCH_START_BLOCK: u64 = 0;
pub const EPOCH_LENGTH: u64 = 100;

pub type PayoutEpochs = HashMap<u64, Vec<Payout>>;

Expand Down Expand Up @@ -88,12 +85,41 @@ impl<H: Hasher> Hashable<H> for Payout {
}
}

pub fn retrieve_epoch_start_block(
eloop_handle: &tokio_core::reactor::Handle,
config: &Config,
) -> impl Future<Item = U256, Error = ()> {
let web3 = web3::Web3::new(
web3::transports::WebSocket::with_event_loop(
config.network_address.as_ref().unwrap(),
&eloop_handle,
).unwrap(),
);

let contract = rlay_token_contract(&config, &web3);

contract
.query(
"epochs_start",
(),
None,
web3::contract::Options::default(),
None,
)
.map_err(|err| {
error!("{:?}", err);
()
})
}

/// Fill the epoch payouts map with the payouts for all completed epochs.
///
/// See also [`payouts_for_epoch`].
///
/// [`payouts_for_epoch`]: ./fn.payouts_for_epoch.html
pub fn fill_epoch_payouts(
epoch_start_block: U256,
epoch_length: U256,
ledger_block_highwatermark_mtx: &Mutex<u64>,
ledger_mtx: &Mutex<PropositionLedger>,
payout_epochs_mtx: &Mutex<PayoutEpochs>,
Expand All @@ -102,15 +128,22 @@ pub fn fill_epoch_payouts(
let ledger_block_highwatermark = ledger_block_highwatermark_mtx.lock().unwrap();
let mut payout_epochs = payout_epochs_mtx.lock().unwrap();

let latest_completed_epoch = (*ledger_block_highwatermark - EPOCH_START_BLOCK) / EPOCH_LENGTH;
let latest_completed_epoch =
(*ledger_block_highwatermark - epoch_start_block.as_u64()) / epoch_length.as_u64();
debug!("Ledger sync highwatermark: {}", ledger_block_highwatermark);
debug!("Latest completed epoch: {}", latest_completed_epoch);
for epoch in 0..=latest_completed_epoch {
if payout_epochs.contains_key(&epoch) {
continue;
}

let payouts = payouts_for_epoch(epoch, ledger_mtx, entity_map_mtx);
let payouts = payouts_for_epoch(
epoch,
epoch_start_block,
epoch_length,
ledger_mtx,
entity_map_mtx,
);
debug!("Calculated payouts for epoch {}: {:?}", epoch, payouts);
payout_epochs.insert(epoch, payouts);
}
Expand Down
6 changes: 4 additions & 2 deletions src/payout_calculation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use cid::ToCid;
use tiny_keccak::keccak256;
use rquantiles::*;

use payout::{Payout, EPOCH_LENGTH, EPOCH_START_BLOCK};
use payout::Payout;
use sync_proposition_ledger::{Proposition, PropositionLedger};
use sync_ontology::{entity_map_individuals, EntityMap};

Expand All @@ -19,6 +19,8 @@ const TOKENS_PER_BLOCK: f64 = 25000000000000000000f64;
/// that the local mirror of the ledger has been synced accordingly.
pub fn payouts_for_epoch(
epoch: u64,
epoch_start_block: U256,
epoch_length: U256,
ledger_mtx: &Mutex<PropositionLedger>,
entity_map_mtx: &Mutex<EntityMap>,
) -> Vec<Payout> {
Expand All @@ -28,7 +30,7 @@ pub fn payouts_for_epoch(
let entity_map = entity_map_mtx
.lock()
.expect("Could not gain lock for entity_map mutex");
let epoch_end = (epoch * EPOCH_LENGTH) + EPOCH_START_BLOCK;
let epoch_end = (epoch * epoch_length.as_u64()) + epoch_start_block.as_u64();

let relevant_propositions: Vec<_> = ledger
.iter()
Expand Down
46 changes: 29 additions & 17 deletions src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio_core;
use web3::futures::{self, prelude::*};
use web3::types::{Filter, Log};
use web3::types::{Filter, Log, U256};
use web3;
use rustc_hex::ToHex;

use config::Config;
use sync_ontology::{sync_ontology, Entity};
use sync_proposition_ledger::{sync_ledger, PropositionLedger};
use payout::{fill_epoch_payouts, fill_epoch_payouts_cumulative, load_epoch_payouts,
submit_epoch_payouts, Payout, PayoutEpochs};
retrieve_epoch_start_block, submit_epoch_payouts, Payout, PayoutEpochs, EPOCH_LENGTH};

// TODO: possibly contribute to rust-web3
/// Subscribe on a filter, but also get all historic logs that fit the filter
Expand Down Expand Up @@ -53,28 +53,39 @@ pub fn run_sync(config: &Config) {
let payout_epochs_cum: PayoutEpochs = HashMap::new();
let payout_epochs_cum_mutex = Arc::new(Mutex::new(payout_epochs_cum));

// Sync ontology concepts from smart contract to local state
let sync_ontology_fut = sync_ontology(eloop.handle(), config.clone(), entity_map_mutex.clone());
// Sync proposition ledger from smart contract to local state
let sync_proposition_ledger_fut = sync_ledger(
eloop.handle(),
config.clone(),
proposition_ledger_mutex.clone(),
proposition_ledger_block_highwatermark_mutex.clone(),
);
let calculate_payouts_fut = Interval::new(Duration::from_secs(15))
.for_each(|_| {
fill_epoch_payouts(
&proposition_ledger_block_highwatermark_mutex.clone(),
&proposition_ledger_mutex.clone(),
&payout_epochs_mutex.clone(),
&entity_map_mutex.clone(),
);
fill_epoch_payouts_cumulative(
&payout_epochs_mutex.clone(),
&payout_epochs_cum_mutex.clone(),
);
Ok(())
})
.map_err(|_| ());
// Calculate the payouts based on proposition ledger
let epoch_length: U256 = EPOCH_LENGTH.into();
let calculate_payouts_fut = retrieve_epoch_start_block(&eloop.handle().clone(), config)
.and_then(|epoch_start_block| {
Interval::new(Duration::from_secs(15))
.and_then(move |_| Ok(epoch_start_block))
.for_each(|epoch_start_block| {
fill_epoch_payouts(
epoch_start_block,
epoch_length,
&proposition_ledger_block_highwatermark_mutex.clone(),
&proposition_ledger_mutex.clone(),
&payout_epochs_mutex.clone(),
&entity_map_mutex.clone(),
);
fill_epoch_payouts_cumulative(
&payout_epochs_mutex.clone(),
&payout_epochs_cum_mutex.clone(),
);
Ok(())
})
.map_err(|_| ())
});
// Print some statistics about the local state
let counter_stream = Interval::new(Duration::from_secs(5))
.for_each(|_| {
let entity_map_lock = entity_map_mutex.lock().unwrap();
Expand Down Expand Up @@ -118,6 +129,7 @@ pub fn run_sync(config: &Config) {
})
.map_err(|_| ());

// Submit calculated payout roots to smart contract
let submit_handle = eloop.handle().clone();
let submit_payout_epochs_mutex = payout_epochs_mutex.clone();
let submit_payout_epochs_cum_mutex = payout_epochs_cum_mutex.clone();
Expand Down

0 comments on commit 708894a

Please sign in to comment.