Skip to content

Commit

Permalink
Merge pull request #1066 from JoinColony/maint/latestState
Browse files Browse the repository at this point in the history
Add endpoint to oracle to facilitate fast-sync
  • Loading branch information
kronosapiens authored Jul 11, 2022
2 parents 9353cec + 63ee3da commit a0dac33
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 29 deletions.
95 changes: 69 additions & 26 deletions packages/reputation-miner/ReputationMiner.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ class ReputationMiner {
await this.updatePeriodLength(repCycle);
this.db = new Database(this.dbPath, { });
// this.db = await sqlite.open({filename: this.dbPath, driver: sqlite3.Database});
await this.createDB();
await ReputationMiner.createDB(this.db);
this.prepareQueries()

// NB some technical debt here with names. The minerAddress arg passed in on the command line, which
// has been used for realWallet and is where transactions will be signed from may or may not be acting
Expand Down Expand Up @@ -1470,6 +1471,10 @@ class ReputationMiner {
}
}

async getAllReputationsInHash(reputationRootHash) {
return this.queries.getAllReputationsInHash.all(reputationRootHash);
}

async loadState(reputationRootHash) {
this.nReputations = ethers.constants.Zero;
this.reputations = {};
Expand Down Expand Up @@ -1618,13 +1623,53 @@ class ReputationMiner {
}});
}

async createDB() {
async saveLatestToFile() {
const latestConfirmedReputationHash = await this.colonyNetwork.getReputationRootHash();
const currentNLeaves = await this.colonyNetwork.getReputationRootHashNNodes();

const db = new Database("./latestConfirmed.sqlite", { });
await ReputationMiner.createDB(db);
const allReputations = await this.queries.getAllReputationsInHash.all(latestConfirmedReputationHash);

if (allReputations.length === 0) {
return new Error("No such reputation state");
}

const saveHashAndLeaves = db.prepare(`INSERT OR IGNORE INTO reputation_states (root_hash, n_leaves) VALUES (?, ?)`);
const saveColony = db.prepare(`INSERT OR IGNORE INTO colonies (address) VALUES (?)`);
const saveUser = db.prepare(`INSERT OR IGNORE INTO users (address) VALUES (?)`);
const saveSkill = db.prepare(`INSERT OR IGNORE INTO skills (skill_id) VALUES (?)`);

const insertReputation = db.prepare(
`INSERT OR IGNORE INTO reputations (reputation_rowid, colony_rowid, skill_id, user_rowid, value)
SELECT
(SELECT reputation_states.rowid FROM reputation_states WHERE reputation_states.root_hash=?),
(SELECT colonies.rowid FROM colonies WHERE colonies.address=?),
?,
(SELECT users.rowid FROM users WHERE users.address=?),
?`
);
saveHashAndLeaves.run(latestConfirmedReputationHash, currentNLeaves.toString());
for (let i = 0; i < Object.keys(allReputations).length; i += 1) {
const reputation = allReputations[i];
const { skill_id: skillId, value, colony_address: colonyAddress, user_address: userAddress } = reputation;
saveColony.run(colonyAddress);
saveUser.run(userAddress);
saveSkill.run(skillId);
insertReputation.run(latestConfirmedReputationHash, colonyAddress, skillId, userAddress, value);
}

await db.close()
return "./latestConfirmed.sqlite";
}

static async createDB(db) {
// Not regularly used, so not preparing them and saving the statement for reuse
await this.db.prepare("CREATE TABLE IF NOT EXISTS users ( address text NOT NULL UNIQUE )").run();
await this.db.prepare("CREATE TABLE IF NOT EXISTS reputation_states ( root_hash text NOT NULL UNIQUE, n_leaves INTEGER NOT NULL)").run();
await this.db.prepare("CREATE TABLE IF NOT EXISTS colonies ( address text NOT NULL UNIQUE )").run();
await this.db.prepare("CREATE TABLE IF NOT EXISTS skills ( skill_id INTEGER PRIMARY KEY )").run();
await this.db.prepare(
await db.prepare("CREATE TABLE IF NOT EXISTS users ( address text NOT NULL UNIQUE )").run();
await db.prepare("CREATE TABLE IF NOT EXISTS reputation_states ( root_hash text NOT NULL UNIQUE, n_leaves INTEGER NOT NULL)").run();
await db.prepare("CREATE TABLE IF NOT EXISTS colonies ( address text NOT NULL UNIQUE )").run();
await db.prepare("CREATE TABLE IF NOT EXISTS skills ( skill_id INTEGER PRIMARY KEY )").run();
await db.prepare(
`CREATE TABLE IF NOT EXISTS reputations (
reputation_rowid text NOT NULL,
colony_rowid INTEGER NOT NULL,
Expand All @@ -1636,27 +1681,27 @@ class ReputationMiner {
).run();

// Do we have to do a database upgrade, from when we renamed n_nodes to n_leaves?
const nNodesColumn = await this.db.prepare("SELECT * FROM PRAGMA_TABLE_INFO('reputation_states') WHERE name='n_nodes';").all()
const nNodesColumn = await db.prepare("SELECT * FROM PRAGMA_TABLE_INFO('reputation_states') WHERE name='n_nodes';").all()
if (nNodesColumn.length === 1) {
await this.db.prepare("ALTER TABLE 'reputation_states' RENAME COLUMN n_nodes to n_leaves").run();
const check1 = await this.db.prepare("SELECT * FROM PRAGMA_TABLE_INFO('reputation_states') where name='n_nodes'").all()
const check2 = await this.db.prepare("SELECT * FROM PRAGMA_TABLE_INFO('reputation_states') where name='n_leaves'").all()
await db.prepare("ALTER TABLE 'reputation_states' RENAME COLUMN n_nodes to n_leaves").run();
const check1 = await db.prepare("SELECT * FROM PRAGMA_TABLE_INFO('reputation_states') where name='n_nodes'").all()
const check2 = await db.prepare("SELECT * FROM PRAGMA_TABLE_INFO('reputation_states') where name='n_leaves'").all()
if (check1.length !== 0 || check2.length !== 1){
console.log('Unexpected result of DB upgrade');
process.exit();
}
console.log('n_nodes -> n_leaves database upgrade complete');
}
await this.db.pragma('journal_mode = WAL');
await this.db.prepare('CREATE INDEX IF NOT EXISTS reputation_states_root_hash ON reputation_states (root_hash)').run();
await this.db.prepare('CREATE INDEX IF NOT EXISTS users_address ON users (address)').run();
await this.db.prepare('CREATE INDEX IF NOT EXISTS reputation_skill_id ON reputations (skill_id)').run();
await this.db.prepare('CREATE INDEX IF NOT EXISTS colonies_address ON colonies (address)').run();
await db.pragma('journal_mode = WAL');
await db.prepare('CREATE INDEX IF NOT EXISTS reputation_states_root_hash ON reputation_states (root_hash)').run();
await db.prepare('CREATE INDEX IF NOT EXISTS users_address ON users (address)').run();
await db.prepare('CREATE INDEX IF NOT EXISTS reputation_skill_id ON reputations (skill_id)').run();
await db.prepare('CREATE INDEX IF NOT EXISTS colonies_address ON colonies (address)').run();

// We added a composite key to reputations - do we need to port it over?
let res = await this.db.prepare("SELECT COUNT(pk) AS c FROM PRAGMA_TABLE_INFO('reputations') WHERE pk <> 0").all();
let res = await db.prepare("SELECT COUNT(pk) AS c FROM PRAGMA_TABLE_INFO('reputations') WHERE pk <> 0").all();
if (res[0].c === 0){
await this.db.prepare(
await db.prepare(
`CREATE TABLE reputations2 (
reputation_rowid text NOT NULL,
colony_rowid INTEGER NOT NULL,
Expand All @@ -1667,23 +1712,21 @@ class ReputationMiner {
)`
).run();


await this.db.prepare(`INSERT INTO reputations2 SELECT * FROM reputations`).run()
await this.db.prepare(`DROP TABLE reputations`).run()
await this.db.prepare(`ALTER TABLE reputations2 RENAME TO reputations`).run()
await db.prepare(`INSERT INTO reputations2 SELECT * FROM reputations`).run()
await db.prepare(`DROP TABLE reputations`).run()
await db.prepare(`ALTER TABLE reputations2 RENAME TO reputations`).run()
console.log("Composite primary key added to reputations table")
}

// Do we need to add an index to the reputations table for the /all endpoint?
res = await this.db.prepare("SELECT COUNT(*) AS c FROM sqlite_master WHERE type='index' and name='allEndpoint'").all();
res = await db.prepare("SELECT COUNT(*) AS c FROM sqlite_master WHERE type='index' and name='allEndpoint'").all();
if (res[0].c === 0){
await this.db.prepare(
await db.prepare(
`CREATE INDEX allEndpoint ON reputations (colony_rowid, user_rowid, reputation_rowid)`
).run()
console.log("Index for /all endpoint added to reputations table")
}

this.prepareQueries()
}

async resetDB() {
Expand All @@ -1693,7 +1736,7 @@ class ReputationMiner {
await this.db.prepare(`DROP TABLE IF EXISTS skills`).run();
await this.db.prepare(`DROP TABLE IF EXISTS reputations`).run();
await this.db.prepare(`DROP TABLE IF EXISTS reputation_states`).run();
await this.createDB();
await ReputationMiner.createDB(this.db);
}
}

Expand Down
13 changes: 11 additions & 2 deletions packages/reputation-miner/ReputationMinerClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,16 @@ class ReputationMinerClient {
return res.status(500).send({ message: "An error occurred querying the reputation" });
}
});

this._app.get("/latestState", cache('1 hour'), async (req, res) => {
try {
const dbPathLatest = await this._miner.saveLatestToFile();
return res.download(dbPathLatest)
} catch (err) {
console.log(err)
return res.status(500).send({ message: "An error occurred generating the database of the state" });
}
});
}
}

Expand Down Expand Up @@ -263,13 +273,12 @@ class ReputationMinerClient {
await this._miner.initialise(colonyNetworkAddress);

// Get latest state from database if available, otherwise sync to current state on-chain
await this._miner.createDB();
await ReputationMiner.createDB(this._miner.db);
await this._miner.loadState(latestConfirmedReputationHash);
if (this._miner.nReputations.eq(0)) {
this._adapter.log("Latest state not found - need to sync");
await this._miner.sync(startingBlock, true);
}

// Initial call to process the existing log from the cycle we're currently in
await this.processReputationLog();
}
Expand Down
49 changes: 48 additions & 1 deletion test/reputation-system/client-sync-functionality.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
const path = require("path");
const chai = require("chai");
const bnChai = require("bn-chai");
const fs = require("fs");
const request = require("async-request");

const { TruffleLoader } = require("../../packages/package-utils");
const { DEFAULT_STAKE, INITIAL_FUNDING, UINT256_MAX } = require("../../helpers/constants");
const { forwardTime, currentBlock, advanceMiningCycleNoContest, getActiveRepCycle } = require("../../helpers/test-helper");
const { forwardTime, currentBlock, advanceMiningCycleNoContest, getActiveRepCycle, TestAdapter } = require("../../helpers/test-helper");
const { giveUserCLNYTokensAndStake, setupFinalizedTask, fundColonyWithTokens } = require("../../helpers/test-data-generator");
const ReputationMinerTestWrapper = require("../../packages/reputation-miner/test/ReputationMinerTestWrapper");
const ReputationMinerClient = require("../../packages/reputation-miner/ReputationMinerClient");

const { expect } = chai;
chai.use(bnChai(web3.utils.BN));
Expand Down Expand Up @@ -226,5 +229,49 @@ process.env.SOLIDITY_COVERAGE
const clientHash3 = await reputationMiner1.reputationTree.getRootHash();
expect(clientHash2).to.equal(clientHash3);
});

it("should be able to download a sqlite file containing the latest state", async () => {
const adapter = new TestAdapter();
const client = new ReputationMinerClient({ loader, realProviderPort, minerAddress: MINER1, useJsTree: true, auto: false, adapter });
await client.initialise(colonyNetwork.address, 1);

await fundColonyWithTokens(metaColony, clnyToken, INITIAL_FUNDING.muln(100));
await setupFinalizedTask({ colonyNetwork, colony: metaColony, token: clnyToken, worker: MINER1, manager: accounts[6] });

await advanceMiningCycleNoContest({ colonyNetwork, client: reputationMiner1, test: this });
await advanceMiningCycleNoContest({ colonyNetwork, client: reputationMiner1, test: this });
await reputationMiner1.saveCurrentState();

const currentState = await colonyNetwork.getReputationRootHash();
await colonyNetwork.getReputationRootHashNLeaves();

const url = `http://127.0.0.1:3000/latestState`;
const res = await request(url);
expect(res.statusCode).to.equal(200);

const fileName = "./latestConfirmed.sqlite";

// Does it exist?
expect(fs.existsSync(fileName)).to.equal(true);

// Does it contain a valid state?
const reputationMiner3 = new ReputationMinerTestWrapper({
loader,
minerAddress: MINER1,
realProviderPort,
useJsTree: true,
dbPath: fileName,
});
await reputationMiner3.initialise(colonyNetwork.address);
await reputationMiner3.sync("latest");

const loadedState = await reputationMiner3.getRootHash();
expect(loadedState).to.equal(currentState);
// delete it
fs.unlinkSync(fileName);
fs.unlinkSync(`${fileName}-shm`);
fs.unlinkSync(`${fileName}-wal`);
await client.close();
});
});
});

0 comments on commit a0dac33

Please sign in to comment.