Skip to content

Commit

Permalink
Chain events Listener: Archival Mode (#33)
Browse files Browse the repository at this point in the history
* initial commit archival mode for listener

* fixed indentation

* added comment

* bumped polkadot api dependency

* updated tests for poller

* removed unused import

* downgraded polkadot to 2.5.1

* updated lock file

* Resolved issues

* removed unused imports

* added description for archive method

* resolved logical error with blocks processing

* Bump version and fix build.

Co-authored-by: Jake Naviasky <jake@commonwealth.im>
  • Loading branch information
YameenMalik and jnaviask authored Dec 2, 2020
1 parent 588e610 commit 73eb9d9
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 33 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"integration-test": "nyc ts-mocha --config ./.mocharc.json ./test/integration/*.spec.ts",
"test": "nyc ts-mocha --config ./.mocharc.json ./test/integration/*.spec.ts ./test/unit/**/*.spec.ts",
"listen": "ts-node -T ./scripts/listener.ts",
"listen-archival": "ts-node -T ./scripts/listener.ts -n edgeware-local -a true",
"scrape": "ts-node -T ./scripts/scraper.ts",
"batch-poll": "ts-node -T ./scripts/batchPoller.ts"
},
Expand Down
13 changes: 10 additions & 3 deletions scripts/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ const argv = yargs.options({
alias: 'c',
type: 'string',
description: 'eth contract address',
}
},
archival: {
alias: 'a',
type: 'boolean',
description: 'run listener in archival mode or not',
},
}).check((data) => {
if (!chainSupportedBy(data.network, SubstrateEvents.Types.EventChains) && data.spec) {
throw new Error('cannot pass spec on non-substrate network');
Expand All @@ -63,9 +68,10 @@ const argv = yargs.options({
}
return true;
}).argv;

const archival: boolean = argv.archival;
const network = argv.network;
const spec = specs[argv.spec] || specs[network] || {};
// if running archival mode, the archival node requires mainnet specs
const spec = specs[argv.spec] || archival == true? specs['mainnet']:specs[network] || {};
const url: string = argv.url || networks[network];
const contract: string | undefined = argv.contractAddress || contracts[network];

Expand All @@ -91,6 +97,7 @@ if (chainSupportedBy(network, SubstrateEvents.Types.EventChains)) {
api,
handlers: [ new StandaloneEventHandler() ],
skipCatchup,
archival,
verbose: true,
});
});
Expand Down
4 changes: 3 additions & 1 deletion src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,15 @@ export interface ISubscribeOptions<Api> {
api: Api;
handlers: IEventHandler<IChainEventData>[];
skipCatchup?: boolean;
archival?: boolean;
discoverReconnectRange?: () => Promise<IDisconnectedRange>;
verbose?: boolean;
}


export type SubscribeFunc<
Api, RawEvent, Options extends ISubscribeOptions<Api>
> = (options: Options) => Promise<IEventSubscriber<Api, RawEvent>>;
> = (options: Options) => Promise<IEventSubscriber<Api, RawEvent>>;

export interface IDisconnectedRange {
startBlock: number;
Expand Down
36 changes: 35 additions & 1 deletion src/substrate/poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,13 @@ export class Poller extends IEventPoller<ApiPromise, Block> {
const blockNumbers = [ ...Array(range.endBlock - range.startBlock).keys()]
.map((i) => range.startBlock + i);
log.debug(`Fetching hashes for blocks: ${JSON.stringify(blockNumbers)}`);
const hashes: Hash[] = await this._api.query.system.blockHash.multi(blockNumbers);

// the hashes are pruned when using api.query.system.blockHash.multi
// therefore fetching hashes from chain. the downside is that for every
// single hash a separate request is made
const hashes = await Promise.all(
blockNumbers.map( (number)=> (this._api.rpc.chain.getBlockHash(number))))

// remove all-0 block hashes -- those blocks have been pruned & we cannot fetch their data
const nonZeroHashes = hashes.filter((hash) => !hash.isEmpty);
log.info(`${nonZeroHashes.length} active and ${hashes.length - nonZeroHashes.length} pruned hashes fetched!`);
Expand All @@ -65,4 +70,33 @@ export class Poller extends IEventPoller<ApiPromise, Block> {

return blocks;
}

/**
* Connects to chain, fetches blocks specified in given range in provided batch size,
* prcoesses the blocks if a handler is provided else returns the blocks for
* further processing
* @param range IDisconnectedRange having startBlock and optional endBlock
* @param batchSize size of the batch in which blocks are to be fetched from chain
* @param processBlockFn an optional function to process the blocks
*/
public async archive(range: IDisconnectedRange, batchSize: number = 500, processBlockFn: (block: Block) => any = null): Promise<Block[]> {
if(!range.endBlock){
const header = await this._api.rpc.chain.getHeader();
range.endBlock = +header.number;
}
const blocks = [];
for (let block = range.startBlock; block < range.endBlock; block = Math.min(block + batchSize, range.endBlock)) {
try {
let currentBlocks = await this.poll({startBlock: block, endBlock: Math.min(block + batchSize, range.endBlock)}, batchSize);
if(processBlockFn){
await Promise.all(currentBlocks.map(processBlockFn));
}
blocks.push(...currentBlocks);
} catch (e) {
log.error(`Block polling failed after disconnect at block ${range.startBlock}`);
return;
}
}
return blocks;
}
}
41 changes: 27 additions & 14 deletions src/substrate/subscribeFunc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export async function createApi(
* @returns An active block subscriber.
*/
export const subscribeEvents: SubscribeFunc<ApiPromise, Block, ISubscribeOptions<ApiPromise>> = async (options) => {
const { chain, api, handlers, skipCatchup, discoverReconnectRange, verbose } = options;
const { chain, api, handlers, skipCatchup, archival, discoverReconnectRange, verbose } = options;
// helper function that sends an event through event handlers
const handleEventFn = async (event: CWEvent<IEventData>) => {
let prevResult = null;
Expand Down Expand Up @@ -87,7 +87,6 @@ export const subscribeEvents: SubscribeFunc<ApiPromise, Block, ISubscribeOptions
// grab the cached block immediately to avoid a new block appearing before the
// server can do its thing...
const lastBlockNumber = processor.lastBlockNumber;

// determine how large of a reconnect we dealt with
let offlineRange: IDisconnectedRange;

Expand All @@ -105,23 +104,37 @@ export const subscribeEvents: SubscribeFunc<ApiPromise, Block, ISubscribeOptions
offlineRange = { startBlock: lastBlockNumber };
}

// if we can't figure out when the last block we saw was, do nothing
// if we can't figure out when the last block we saw was,
// and we are not running in archival mode, do nothing
// (i.e. don't try and fetch all events from block 0 onward)
if (!offlineRange || !offlineRange.startBlock) {
log.warn('Unable to determine offline time range.');
return;
if(!offlineRange || !offlineRange.startBlock){
if(archival){
offlineRange = {startBlock: 0}
}
else{
log.warn('Unable to determine offline time range.');
return;
}
}

// poll the missed blocks for events
try {
const blocks = await poller.poll(offlineRange);
await Promise.all(blocks.map(processBlockFn));
} catch (e) {
log.error(`Block polling failed after disconnect at block ${offlineRange.startBlock}`);

// if running in archival mode then run poller.archive with
// batch_size 500
if(archival){
log.info(`Executing in archival mode, polling blocks starting from: ${offlineRange.startBlock}`);
await poller.archive(offlineRange,500,processBlockFn);
}
// else just run poller normally
else {
try {
const blocks = await poller.poll(offlineRange);
await Promise.all(blocks.map(processBlockFn));
} catch (e) {
log.error(`Block polling failed after disconnect at block ${offlineRange.startBlock}`);
}
}
};

if (!skipCatchup) {
if (!skipCatchup || archival) {
await pollMissedBlocksFn();
} else {
log.info('Skipping event catchup on startup!');
Expand Down
40 changes: 27 additions & 13 deletions test/unit/edgeware/poller.spec.ts
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,15 @@ const getMockApi = () => {
'events.at': (hash: Hash) => {
return events[hash as unknown as number] || [];
},
'blockHash.multi': (blockNumbers: number[]) => {
return blockNumbers.map((n) => {
// fake a few values to test the size reduction actually works
if (n === 2600 || n === 2400) {
return hashes[5];
}
if (n >= 100 && n <= 110) {
return hashes[n - 100];
} else {
return new IMockHash(0);
}
});
getBlockHash: (blockNumber: number) => {
if (blockNumber === 2600 || blockNumber === 2400) {
return hashes[5];
}
if (blockNumber >= 100 && blockNumber <= 110) {
return hashes[blockNumber - 100];
} else {
return new IMockHash(0);
}
},
getBlock: (hash) => {
return {
Expand All @@ -78,7 +75,6 @@ describe('Edgeware Event Poller Tests', () => {
it('should return block data', async () => {
// setup mock data
const api = getMockApi();

// setup test class
const poller = new Poller(api);

Expand Down Expand Up @@ -167,4 +163,22 @@ describe('Edgeware Event Poller Tests', () => {
assert.equal(blocks[0].versionNumber, 10);
assert.equal(blocks[0].versionName, 'edgeware');
});

it('should return all blocks with non-zeroed hashes', async () => {
// setup mock data
const api = getMockApi();

// setup test class
const poller = new Poller(api);

// run test
const blocks = await poller.archive({ startBlock: 0});
assert.lengthOf(blocks, 4);
assert.equal(+blocks[0].header.number, 105);
assert.deepEqual(blocks[0].events, []);
assert.equal(blocks[0].versionNumber, 10);
assert.equal(blocks[0].versionName, 'edgeware');

});

});
1 change: 1 addition & 0 deletions test/unit/edgeware/testUtil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ export function constructFakeApi(
subscribeNewHeads: callOverrides['subscribeNewHeads'],
getHeader: callOverrides['getHeader'],
getBlock: callOverrides['getBlock'],
getBlockHash: callOverrides['getBlockHash'],
},
state: {
getRuntimeVersion: callOverrides['getRuntimeVersion'],
Expand Down
2 changes: 1 addition & 1 deletion yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5276,4 +5276,4 @@ yn@3.1.1:
yn@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/yn/-/yn-2.0.0.tgz#e5adabc8acf408f6385fc76495684c88e6af689a"
integrity sha1-5a2ryKz0CPY4X8dklWhMiOavaJo=
integrity sha1-5a2ryKz0CPY4X8dklWhMiOavaJo=

0 comments on commit 73eb9d9

Please sign in to comment.