Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Miner (hopefully) faster sync, faster startup if submission made #1015

Merged
merged 4 commits into from
Feb 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ reputations.json
reputationStates.sqlite
reputationStates.sqlite-shm
reputationStates.sqlite-wal
justificationTreeCache.json
truffle-security-output.json
.coverage_contracts
etherrouter-address.json
Expand Down
101 changes: 86 additions & 15 deletions packages/reputation-miner/ReputationMiner.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import Database from "better-sqlite3";
import PatriciaTreeNoHash from "./patriciaNoHashKey";
import PatriciaTree from "./patricia";

const fs = require('fs').promises;
const path = require('path');

// We don't need the account address right now for this secret key, but I'm leaving it in in case we
// do in the future.
// const accountAddress = "0xbb46703786c2049d4d6dd43f5b4edf52a20fefe4";
Expand All @@ -24,6 +27,8 @@ class ReputationMiner {
constructor({ loader, minerAddress, privateKey, provider, realProviderPort = 8545, useJsTree = false, dbPath = "./reputationStates.sqlite" }) {
this.loader = loader;
this.dbPath = dbPath;
this.justificationCachePath = `${path.dirname(dbPath)}/justificationTreeCache.json`
this.justificationHashes = {}

this.useJsTree = useJsTree;
if (!this.useJsTree) {
Expand Down Expand Up @@ -192,25 +197,24 @@ class ReputationMiner {
}

/**
* When called, adds the entire contents of the current (active) log to its reputation tree. It also builds a Justification Tree as it does so
* in case a dispute is called which would require it.
* When called, adds the entire contents of the current (active) log to its reputation tree. It also optionally
* builds a Justification Tree as it does so in case a dispute is called which would require it.
* @return {Promise}
*/
async addLogContentsToReputationTree(blockNumber = "latest") {
if (this.useJsTree) {
this.justificationTree = new PatriciaTreeNoHash();
async addLogContentsToReputationTree(blockNumber = "latest", buildJustificationTree = true) {
let jtType;
if (buildJustificationTree){
if (this.useJsTree) {
kronosapiens marked this conversation as resolved.
Show resolved Hide resolved
jtType = "js";
} else {
jtType = "solidity";
}
} else {
const contractFactory = new ethers.ContractFactory(
this.patriciaTreeNoHashContractDef.abi,
this.patriciaTreeNoHashContractDef.bytecode,
this.ganacheWallet
);
jtType = "noop"
}

const contract = await contractFactory.deploy();
await contract.deployed();
await this.instantiateJustificationTree(jtType);

this.justificationTree = new ethers.Contract(contract.address, this.patriciaTreeNoHashContractDef.abi, this.ganacheWallet);
}
this.justificationHashes = {};
this.reverseReputationHashLookup = {};
const repCycle = await this.getActiveRepCycle(blockNumber);
Expand Down Expand Up @@ -1289,7 +1293,7 @@ class ReputationMiner {
if (applyLogs) {
const nLeaves = ethers.BigNumber.from(`0x${event.data.slice(66, 130)}`);
const previousBlock = event.blockNumber - 1;
await this.addLogContentsToReputationTree(previousBlock);
await this.addLogContentsToReputationTree(previousBlock, false);
localHash = await this.reputationTree.getRootHash();
const localNLeaves = this.nReputations;
if (localHash !== hash || !localNLeaves.eq(nLeaves)) {
Expand Down Expand Up @@ -1388,6 +1392,73 @@ class ReputationMiner {
}
}

async loadJustificationTree(justificationRootHash) {
this.justificationHashes = {};
let jtType;
if (this.useJsTree) {
jtType = "js"
} else {
jtType = "solidity"
}

await this.instantiateJustificationTree(jtType);

try {

const justificationHashFile = await fs.readFile(this.justificationCachePath, 'utf8')
this.justificationHashes = JSON.parse(justificationHashFile);

for (let i = 0; i < Object.keys(this.justificationHashes).length; i += 1) {
const hash = Object.keys(this.justificationHashes)[i];
const tx = await this.justificationTree.insert(
hash,
this.justificationHashes[hash].jhLeafValue,
{ gasLimit: 4000000 }
);
if (!this.useJsTree) {
await tx.wait();
}
}
} catch (err) {
console.log(err);
}

const currentJRH = await this.justificationTree.getRootHash();
if (justificationRootHash && currentJRH !== justificationRootHash) {
console.log("WARNING: The supplied JRH failed to be recreated successfully. Are you sure it was saved?");
}
}

async instantiateJustificationTree(type = "js") {
if (type === "js") {
this.justificationTree = new PatriciaTreeNoHash();
} else if (type === "solidity") {
const contractFactory = new ethers.ContractFactory(
this.patriciaTreeNoHashContractDef.abi,
this.patriciaTreeNoHashContractDef.bytecode,
this.ganacheWallet
);

const contract = await contractFactory.deploy();
await contract.deployed();

this.justificationTree = new ethers.Contract(contract.address, this.patriciaTreeNoHashContractDef.abi, this.ganacheWallet);
} else if (type === "noop") {
this.justificationTree = {
insert: () => {return { wait: () => {}}},
getRootHash: () => {},
getImpliedRoot: () => {},
getProof: () => {},
}
} else {
console.log(`UNKNOWN TYPE for justification tree instantiation: ${type}`)
}
}

async saveJustificationTree(){
await fs.writeFile(this.justificationCachePath, JSON.stringify(this.justificationHashes));
}

async getAddressesWithReputation(reputationRootHash, colonyAddress, skillId) {
const res = await this.queries.getAddressesWithReputation.all(reputationRootHash, colonyAddress.toLowerCase(), skillId);
const addresses = res.map(x => x.user_address)
Expand Down
60 changes: 50 additions & 10 deletions packages/reputation-miner/ReputationMinerClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -224,26 +224,64 @@ class ReputationMinerClient {
this.resolveBlockChecksFinished = undefined;
await this._miner.initialise(colonyNetworkAddress);

// Get latest state from database if available, otherwise sync to current state on-chain
const latestReputationHash = await this._miner.colonyNetwork.getReputationRootHash();
await this._miner.createDB();
await this._miner.loadState(latestReputationHash);
if (this._miner.nReputations.eq(0)) {
this._adapter.log("Latest state not found - need to sync");
await this._miner.sync(startingBlock, true);
let resumedSuccessfully = false;
// If we have a JRH saved, and it goes from the current (on chain) state to
// a state that we know, then let's assume it's correct
const latestConfirmedReputationHash = await this._miner.colonyNetwork.getReputationRootHash();
const repCycle = await this._miner.getActiveRepCycle();

await this._miner.loadJustificationTree();
const jhKeys = Object.keys(this._miner.justificationHashes)
const firstLeaf = jhKeys[0]
const lastLeaf = jhKeys[jhKeys.length - 1]

if (firstLeaf && lastLeaf) { // lastLeaf will never be undefined if firstLeaf isn't, but this is more semantic
const firstStateHash = this._miner.justificationHashes[firstLeaf].jhLeafValue.slice(0, 66)
const lastStateHash = this._miner.justificationHashes[lastLeaf].jhLeafValue.slice(0, 66)

if (firstStateHash === latestConfirmedReputationHash){
await this._miner.loadState(lastStateHash);
const currentStateHash = await this._miner.reputationTree.getRootHash();
if (currentStateHash === lastStateHash){
const submittedState = await repCycle.getReputationHashSubmission(this._miner.minerAddress);
if (submittedState.proposedNewRootHash === ethers.utils.hexZeroPad(0, 32)) {
resumedSuccessfully = true;
this._adapter.log("Successfully resumed pre-submission");
} else {
const jrh = await this._miner.justificationTree.getRootHash();
if (submittedState.proposedNewRootHash === currentStateHash && submittedState.jrh === jrh){
resumedSuccessfully = true;
this._adapter.log("Successfully resumed mid-submission");
}
}
}
}
}

if (!resumedSuccessfully) {
// Reset any partial loading we did trying to resume.
await this._miner.initialise(colonyNetworkAddress);

// Get latest state from database if available, otherwise sync to current state on-chain
await this._miner.createDB();
await this._miner.loadState(latestConfirmedReputationHash);
if (this._miner.nReputations.eq(0)) {
this._adapter.log("Latest state not found - need to sync");
await this._miner.sync(startingBlock, true);
}

// Initial call to process the existing log from the cycle we're currently in
await this.processReputationLog();
}

this.gasBlockAverages = [];

// Initial call to process the existing log from the cycle we're currently in
await this.processReputationLog();
this._miner.realProvider.polling = true;
this._miner.realProvider.pollingInterval = 1000;

this.blockTimeoutCheck = setTimeout(this.reportBlockTimeout.bind(this), 300000);

// Work out when the confirm timeout should be.
const repCycle = await this._miner.getActiveRepCycle();
await this._miner.updatePeriodLength(repCycle);

await this.setMiningCycleTimeout(repCycle);
Expand Down Expand Up @@ -639,6 +677,8 @@ class ReputationMinerClient {
await this._miner.addLogContentsToReputationTree();
this._adapter.log("💾 Writing new reputation state to database");
await this._miner.saveCurrentState();
this._adapter.log("💾 Caching justification tree to disk");
await this._miner.saveJustificationTree();
}

async getTwelveBestSubmissions() {
Expand Down
16 changes: 12 additions & 4 deletions packages/reputation-miner/bin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ const {
} = argv;

class RetryProvider extends ethers.providers.StaticJsonRpcProvider {
constructor(url, adapterObject){
super(url);
constructor(connectionInfo, adapterObject){
super(connectionInfo);
this.adapter = adapterObject;
}

static attemptCheck(err, attemptNumber){
console.log("Retrying RPC request #", attemptNumber);
if (attemptNumber === 10){
if (attemptNumber === 5){
return false;
}
return true;
Expand Down Expand Up @@ -93,7 +93,15 @@ if (network) {
const rpcEndpoint = `${localProviderAddress || "http://localhost"}:${localPort || "8545"}`;
provider = new ethers.providers.JsonRpcProvider(rpcEndpoint);
} else {
const providers = providerAddress.map(endpoint => new RetryProvider(endpoint, adapterObject));
const providers = providerAddress.map(endpoint => {
const {protocol, username, password, host, pathname} = new URL(endpoint);
const connectionInfo = {
url: `${protocol}//${host}${pathname}`,
user: decodeURI(username),
password: decodeURI(password.replace(/%23/, '#')),
}
return new RetryProvider(connectionInfo, adapterObject);
})
// This is, at best, a huge hack...
providers.forEach(x => x.getNetwork());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,61 @@ process.env.SOLIDITY_COVERAGE
assert.equal(acceptedRootHash, rootHash);
});

it("should load the reputation state and JRH from disk if available", async function () {
const rootHash = await reputationMinerClient._miner.getRootHash();
const nLeaves = await reputationMinerClient._miner.getRootHashNLeaves();
const jrh = await reputationMinerClient._miner.justificationTree.getRootHash();

const repCycleEthers = await reputationMinerClient._miner.getActiveRepCycle();

class TestAdapter {
constructor() {
this.outputs = [];
}

log(line) {
this.outputs.push(line);
}
}

// start up another one - does it quick-load pre submission?
let adapter = new TestAdapter();

const reputationMinerClient2 = new ReputationMinerClient({
loader,
realProviderPort,
minerAddress: MINER1,
useJsTree: true,
auto: true,
adapter,
});
await reputationMinerClient2.initialise(colonyNetwork.address, startingBlockNumber);
expect(adapter.outputs[0]).to.equal("Successfully resumed pre-submission", "The client didn't resume pre-submission");
await reputationMinerClient2.close();

const receive2Submissions = getWaitForNSubmissionsPromise(repCycleEthers, rootHash, nLeaves, jrh, 2);

// Forward through half of the cycle duration and wait for the client to submit some entries
await forwardTime(MINING_CYCLE_DURATION / 2, this);
await receive2Submissions; // It might submit a couple more, but that's fine for the purposes of this test.
await reputationMinerClient.close();

adapter = new TestAdapter();

// start up another one.
const reputationMinerClient3 = new ReputationMinerClient({
loader,
realProviderPort,
minerAddress: MINER1,
useJsTree: true,
auto: true,
adapter,
});
await reputationMinerClient3.initialise(colonyNetwork.address, startingBlockNumber);
expect(adapter.outputs[0]).to.equal("Successfully resumed mid-submission", "The client didn't resume mid-submission");
await reputationMinerClient2.close();
});

function noEventSeen(contract, event) {
return new Promise(function (resolve, reject) {
contract.on(event, async () => {
Expand Down