Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix more syncing issues #650

Merged
merged 11 commits into from
May 11, 2022
32 changes: 31 additions & 1 deletion common-files/utils/event-queue.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,28 @@ async function enqueueEvent(callback, priority, args) {
});
}

/**
These functions pause the queue once the current process at the head of the queue has
completed. It will then wait until we tell it to start again via unpause.
While paused, it will still accept incoming items.
*/
function pauseQueue(priority) {
return new Promise(resolve => {
// put an event at the head of the queue which will cleanly pause it.
queues[priority].unshift(async () => {
queues[priority].autostart = false;
queues[priority].stop();
logger.info(`queue ${priority} has been paused`);
resolve();
});
});
}

function unpauseQueue(priority) {
queues[priority].autostart = true;
queues[priority].unshift(async () => logger.info(`queue ${priority} has been unpaused`));
}

/**
This function will return when the event has been on chain for <confirmations> blocks
It's useful to call this if you want to be sure that your event has been confirmed
Expand Down Expand Up @@ -135,4 +157,12 @@ async function queueManager(eventObject, eventArgs) {
}

/* ignore unused exports */
export { flushQueue, enqueueEvent, queueManager, dequeueEvent, waitForConfirmation };
export {
flushQueue,
enqueueEvent,
queueManager,
dequeueEvent,
waitForConfirmation,
pauseQueue,
unpauseQueue,
};
7 changes: 6 additions & 1 deletion nightfall-optimist/src/event-handlers/block-proposed.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ async function blockProposedEventHandler(data) {
logger.debug(
`Processing external offchain transaction with L2 hash ${tx.transactionHash}`,
);
return transactionSubmittedEventHandler({ offchain: true, ...tx }); // must be offchain or we'll have seen them
return transactionSubmittedEventHandler({
blocknumberL2: block.blockNumberL2,
mempool: false,
offchain: true,
...tx,
}); // must be offchain or we'll have seen them, mempool = false
}
return true;
}),
Expand Down
7 changes: 5 additions & 2 deletions nightfall-optimist/src/event-handlers/subscribe.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export async function waitForContract(contractName) {
* @param arg - List of arguments to be passed to callback, the first element must be the event-handler functions
* @returns = List of emitters from each contract.
*/
export async function startEventQueue(callback, ...arg) {
export async function startEventQueue(lastSyncedBlock, callback, ...arg) {
const contractNames = [
STATE_CONTRACT_NAME,
SHIELD_CONTRACT_NAME,
Expand All @@ -96,7 +96,10 @@ export async function startEventQueue(callback, ...arg) {
];
const contracts = await Promise.all(contractNames.map(c => waitForContract(c)));
const emitters = contracts.map(e => {
const emitterC = e.events.allEvents();
let emitterC;
// if we've just resynced, start from the end of the sync:
if (lastSyncedBlock) emitterC = e.events.allEvents({ fromBlock: lastSyncedBlock + 1 });
else emitterC = e.events.allEvents();
emitterC.on('changed', event => callback(event, arg));
emitterC.on('data', event => callback(event, arg));
return emitterC;
Expand Down
4 changes: 2 additions & 2 deletions nightfall-optimist/src/index.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ const main = async () => {
await subscribeToProposedBlockWebSocketConnection(setBlockProposedWebSocketConnection);
// try to sync any missing blockchain state
// only then start making blocks and listening to new proposers
initialBlockSync(proposer).then(async () => {
await startEventQueue(queueManager, eventHandlers, proposer);
initialBlockSync(proposer).then(async lastSyncedBlock => {
await startEventQueue(lastSyncedBlock, queueManager, eventHandlers, proposer);
queues[0].on('end', () => {
// We do the proposer isMe check here to fail fast instead of re-enqueing.
// We check if the queue[2] is empty, this is safe it is manually enqueued/dequeued.
Expand Down
8 changes: 8 additions & 0 deletions nightfall-optimist/src/routes/proposer.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -374,11 +374,19 @@ router.post('/offchain-transaction', async (req, res) => {
case 3: {
// When comparing this with getTransactionSubmittedCalldata,
// note we dont need to decompressProof as proofs are only compressed if they go on-chain.
// let's not directly call transactionSubmittedEventHandler, instead, we'll queue it
await enqueueEvent(transactionSubmittedEventHandler, 1, {
offchain: true,
...transaction,
fee: Number(fee),
});
/*
await transactionSubmittedEventHandler({
offchain: true,
...transaction,
fee: Number(fee),
});
*/
res.sendStatus(200);
break;
}
Expand Down
6 changes: 3 additions & 3 deletions nightfall-optimist/src/services/database.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ export async function getLatestBlockInfo() {
const db = connection.db(OPTIMIST_DB);
const [blockInfo] = await db
.collection(SUBMITTED_BLOCKS_COLLECTION)
.find({}, { blockNumberL2: 1, blockHash: 1 })
.find({}, { blockNumberL2: 1, blockHash: 1, blockNumber: 1 })
.sort({ blockNumberL2: -1 })
.limit(1)
.toArray();
Expand Down Expand Up @@ -269,12 +269,12 @@ export async function getMostProfitableTransactions(number) {
Function to save a (unprocessed) Transaction
*/
export async function saveTransaction(_transaction) {
const { mempool = true } = _transaction; // mempool may not exist
const { mempool = true, blockNumberL2 = -1 } = _transaction;
const transaction = {
_id: _transaction.transactionHash,
..._transaction,
mempool,
blockNumberL2: -1,
blockNumberL2,
};
logger.debug(
`saving transaction ${transaction.transactionHash}, with layer 1 block number ${_transaction.blockNumber}`,
Expand Down
16 changes: 12 additions & 4 deletions nightfall-optimist/src/services/state-sync.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

import config from 'config';
import { getContractInstance } from 'common-files/utils/contract.mjs';
import { pauseQueue, unpauseQueue } from 'common-files/utils/event-queue.mjs';
import blockProposedEventHandler from '../event-handlers/block-proposed.mjs';
import transactionSubmittedEventHandler from '../event-handlers/transaction-submitted.mjs';
import newCurrentProposerEventHandler from '../event-handlers/new-current-proposer.mjs';
import committedToChallengeEventHandler from '../event-handlers/challenge-commit.mjs';
import rollbackEventHandler from '../event-handlers/rollback.mjs';
import { getBlockByBlockNumberL2, getBlocks } from './database.mjs';
import { getBlockByBlockNumberL2, getBlocks, getLatestBlockInfo } from './database.mjs';
import { stopMakingChallenges, startMakingChallenges } from './challenges.mjs';
import { waitForContract } from '../event-handlers/subscribe.mjs';

Expand Down Expand Up @@ -102,8 +103,9 @@ export default async proposer => {
const lastBlockNumberL2 = Number(
(await stateContractInstance.methods.getNumberOfL2Blocks().call()) - 1,
);
if (lastBlockNumberL2 === -1) return; // The blockchain is empty

if (lastBlockNumberL2 === -1) return null; // The blockchain is empty
// pause the queues so we stop processing incoming events while we sync
await Promise.all([pauseQueue(0), pauseQueue(1)]);
const missingBlocks = await checkBlocks(); // Stores any gaps of missing blocks
// const [fromBlock] = missingBlocks[0];
const latestBlockLocally = (await getBlockByBlockNumberL2(lastBlockNumberL2)) ?? undefined;
Expand All @@ -121,5 +123,11 @@ export default async proposer => {
}
const currentProposer = (await stateContractInstance.methods.currentProposer().call())
.thisAddress;
await newCurrentProposerEventHandler({ returnValues: { proposer: currentProposer } }, [proposer]);
if (currentProposer !== proposer.address)
await newCurrentProposerEventHandler({ returnValues: { proposer: currentProposer } }, [
proposer,
]);
unpauseQueue(0);
unpauseQueue(1);
return (await getLatestBlockInfo()).blockNumber;
};
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"test-e2e-protocol": "LOG_LEVEL=error mocha --timeout 0 --bail --exit test/e2e/protocol/*.test.mjs ",
"test-gas": "mocha --timeout 0 --bail --exit test/e2e/gas.test.mjs ",
"test-e2e-tokens": "LOG_LEVEL=error mocha --timeout 0 --bail --exit test/e2e/tokens/*.test.mjs ",
"test-erc20-tokens": "LOG_LEVEL=error mocha --timeout 0 --bail --exit test/e2e/tokens/erc20.test.mjs ",
"lint": "eslint . --ext js,mjs,jsx,ts,tsx && find-unused-exports",
"prepare": "husky install",
"doc:build:sdk": "jsdoc -c jsdoc.json cli/lib/nf3.mjs",
Expand Down