From 73eb9d9190a2f7806876c516469517c95d28cca7 Mon Sep 17 00:00:00 2001 From: Yameen Malik <65854545+YameenMalik@users.noreply.github.com> Date: Thu, 3 Dec 2020 01:26:43 +0500 Subject: [PATCH] Chain events Listener: Archival Mode (#33) * initial commit archival mode for listener * fixed indentation * added comment * bumped polkadot api dependency * updated tests for poller * removed unused import * downgraded polkadot to 2.5.1 * updated lock file * Resolved issues * removed unused imports * added description for archive method * resolved logical error with blocks processing * Bump version and fix build. Co-authored-by: Jake Naviasky --- package.json | 1 + scripts/listener.ts | 13 +++++++--- src/interfaces.ts | 4 ++- src/substrate/poller.ts | 36 ++++++++++++++++++++++++++- src/substrate/subscribeFunc.ts | 41 ++++++++++++++++++++----------- test/unit/edgeware/poller.spec.ts | 40 ++++++++++++++++++++---------- test/unit/edgeware/testUtil.ts | 1 + yarn.lock | 2 +- 8 files changed, 105 insertions(+), 33 deletions(-) mode change 100644 => 100755 test/unit/edgeware/poller.spec.ts diff --git a/package.json b/package.json index 575d52cb070..e2b13eda9c2 100644 --- a/package.json +++ b/package.json @@ -14,6 +14,7 @@ "integration-test": "nyc ts-mocha --config ./.mocharc.json ./test/integration/*.spec.ts", "test": "nyc ts-mocha --config ./.mocharc.json ./test/integration/*.spec.ts ./test/unit/**/*.spec.ts", "listen": "ts-node -T ./scripts/listener.ts", + "listen-archival": "ts-node -T ./scripts/listener.ts -n edgeware-local -a true", "scrape": "ts-node -T ./scripts/scraper.ts", "batch-poll": "ts-node -T ./scripts/batchPoller.ts" }, diff --git a/scripts/listener.ts b/scripts/listener.ts index a06a957a13c..209824cfbd5 100644 --- a/scripts/listener.ts +++ b/scripts/listener.ts @@ -53,7 +53,12 @@ const argv = yargs.options({ alias: 'c', type: 'string', description: 'eth contract address', - } + }, + archival: { + alias: 'a', + type: 'boolean', + description: 'run listener in archival mode or not', + }, }).check((data) => { if (!chainSupportedBy(data.network, SubstrateEvents.Types.EventChains) && data.spec) { throw new Error('cannot pass spec on non-substrate network'); @@ -63,9 +68,10 @@ const argv = yargs.options({ } return true; }).argv; - +const archival: boolean = argv.archival; const network = argv.network; -const spec = specs[argv.spec] || specs[network] || {}; +// if running archival mode, the archival node requires mainnet specs +const spec = specs[argv.spec] || archival == true? specs['mainnet']:specs[network] || {}; const url: string = argv.url || networks[network]; const contract: string | undefined = argv.contractAddress || contracts[network]; @@ -91,6 +97,7 @@ if (chainSupportedBy(network, SubstrateEvents.Types.EventChains)) { api, handlers: [ new StandaloneEventHandler() ], skipCatchup, + archival, verbose: true, }); }); diff --git a/src/interfaces.ts b/src/interfaces.ts index 26933b7ed0a..bd5ae090626 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -72,13 +72,15 @@ export interface ISubscribeOptions { api: Api; handlers: IEventHandler[]; skipCatchup?: boolean; + archival?: boolean; discoverReconnectRange?: () => Promise; verbose?: boolean; } + export type SubscribeFunc< Api, RawEvent, Options extends ISubscribeOptions -> = (options: Options) => Promise>; + > = (options: Options) => Promise>; export interface IDisconnectedRange { startBlock: number; diff --git a/src/substrate/poller.ts b/src/substrate/poller.ts index 23784d84f6a..eb920ab8430 100644 --- a/src/substrate/poller.ts +++ b/src/substrate/poller.ts @@ -47,8 +47,13 @@ export class Poller extends IEventPoller { const blockNumbers = [ ...Array(range.endBlock - range.startBlock).keys()] .map((i) => range.startBlock + i); log.debug(`Fetching hashes for blocks: ${JSON.stringify(blockNumbers)}`); - const hashes: Hash[] = await this._api.query.system.blockHash.multi(blockNumbers); + // the hashes are pruned when using api.query.system.blockHash.multi + // therefore fetching hashes from chain. the downside is that for every + // single hash a separate request is made + const hashes = await Promise.all( + blockNumbers.map( (number)=> (this._api.rpc.chain.getBlockHash(number)))) + // remove all-0 block hashes -- those blocks have been pruned & we cannot fetch their data const nonZeroHashes = hashes.filter((hash) => !hash.isEmpty); log.info(`${nonZeroHashes.length} active and ${hashes.length - nonZeroHashes.length} pruned hashes fetched!`); @@ -65,4 +70,33 @@ export class Poller extends IEventPoller { return blocks; } + + /** + * Connects to chain, fetches blocks specified in given range in provided batch size, + * prcoesses the blocks if a handler is provided else returns the blocks for + * further processing + * @param range IDisconnectedRange having startBlock and optional endBlock + * @param batchSize size of the batch in which blocks are to be fetched from chain + * @param processBlockFn an optional function to process the blocks + */ + public async archive(range: IDisconnectedRange, batchSize: number = 500, processBlockFn: (block: Block) => any = null): Promise { + if(!range.endBlock){ + const header = await this._api.rpc.chain.getHeader(); + range.endBlock = +header.number; + } + const blocks = []; + for (let block = range.startBlock; block < range.endBlock; block = Math.min(block + batchSize, range.endBlock)) { + try { + let currentBlocks = await this.poll({startBlock: block, endBlock: Math.min(block + batchSize, range.endBlock)}, batchSize); + if(processBlockFn){ + await Promise.all(currentBlocks.map(processBlockFn)); + } + blocks.push(...currentBlocks); + } catch (e) { + log.error(`Block polling failed after disconnect at block ${range.startBlock}`); + return; + } + } + return blocks; + } } diff --git a/src/substrate/subscribeFunc.ts b/src/substrate/subscribeFunc.ts index 6db92a741a0..94fea871007 100644 --- a/src/substrate/subscribeFunc.ts +++ b/src/substrate/subscribeFunc.ts @@ -49,7 +49,7 @@ export async function createApi( * @returns An active block subscriber. */ export const subscribeEvents: SubscribeFunc> = async (options) => { - const { chain, api, handlers, skipCatchup, discoverReconnectRange, verbose } = options; + const { chain, api, handlers, skipCatchup, archival, discoverReconnectRange, verbose } = options; // helper function that sends an event through event handlers const handleEventFn = async (event: CWEvent) => { let prevResult = null; @@ -87,7 +87,6 @@ export const subscribeEvents: SubscribeFunc { 'events.at': (hash: Hash) => { return events[hash as unknown as number] || []; }, - 'blockHash.multi': (blockNumbers: number[]) => { - return blockNumbers.map((n) => { - // fake a few values to test the size reduction actually works - if (n === 2600 || n === 2400) { - return hashes[5]; - } - if (n >= 100 && n <= 110) { - return hashes[n - 100]; - } else { - return new IMockHash(0); - } - }); + getBlockHash: (blockNumber: number) => { + if (blockNumber === 2600 || blockNumber === 2400) { + return hashes[5]; + } + if (blockNumber >= 100 && blockNumber <= 110) { + return hashes[blockNumber - 100]; + } else { + return new IMockHash(0); + } }, getBlock: (hash) => { return { @@ -78,7 +75,6 @@ describe('Edgeware Event Poller Tests', () => { it('should return block data', async () => { // setup mock data const api = getMockApi(); - // setup test class const poller = new Poller(api); @@ -167,4 +163,22 @@ describe('Edgeware Event Poller Tests', () => { assert.equal(blocks[0].versionNumber, 10); assert.equal(blocks[0].versionName, 'edgeware'); }); + + it('should return all blocks with non-zeroed hashes', async () => { + // setup mock data + const api = getMockApi(); + + // setup test class + const poller = new Poller(api); + + // run test + const blocks = await poller.archive({ startBlock: 0}); + assert.lengthOf(blocks, 4); + assert.equal(+blocks[0].header.number, 105); + assert.deepEqual(blocks[0].events, []); + assert.equal(blocks[0].versionNumber, 10); + assert.equal(blocks[0].versionName, 'edgeware'); + + }); + }); diff --git a/test/unit/edgeware/testUtil.ts b/test/unit/edgeware/testUtil.ts index 0af4ec6ae63..30188eddfb1 100644 --- a/test/unit/edgeware/testUtil.ts +++ b/test/unit/edgeware/testUtil.ts @@ -82,6 +82,7 @@ export function constructFakeApi( subscribeNewHeads: callOverrides['subscribeNewHeads'], getHeader: callOverrides['getHeader'], getBlock: callOverrides['getBlock'], + getBlockHash: callOverrides['getBlockHash'], }, state: { getRuntimeVersion: callOverrides['getRuntimeVersion'], diff --git a/yarn.lock b/yarn.lock index e547645b858..5c1e1bb8b4f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5276,4 +5276,4 @@ yn@3.1.1: yn@^2.0.0: version "2.0.0" resolved "https://registry.yarnpkg.com/yn/-/yn-2.0.0.tgz#e5adabc8acf408f6385fc76495684c88e6af689a" - integrity sha1-5a2ryKz0CPY4X8dklWhMiOavaJo= + integrity sha1-5a2ryKz0CPY4X8dklWhMiOavaJo= \ No newline at end of file