Skip to content

Commit

Permalink
Revert "Revert "Enricher fixes for archival mode"" (#45)
Browse files Browse the repository at this point in the history
* Revert "Revert "Enricher fixes for archival mode""

* fixed minor indentation and logical errors

* reverted batch size to 500

* created custom interface for returning era reward points

* created interface for active exposures

* removed session keys from new session event

* updated mockApi to preserve default function calls

* reverted back lint changes, created interface for individual exposure and casted keys to  storage tuple of variable length.

* minor fixes and code optimization

* Stylistic cleanup.

* Add more comments and cleanup.

Co-authored-by: Yameen Malik <yameen@seecode.io>
Co-authored-by: Yameen <yameen@seedcode.io>
Co-authored-by: Jake Naviasky <jake@commonwealth.im>
  • Loading branch information
4 people authored Feb 9, 2021
1 parent 4bff63b commit 5c14429
Show file tree
Hide file tree
Showing 12 changed files with 373 additions and 166 deletions.
9 changes: 9 additions & 0 deletions scripts/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ const argv = yargs.options({
type: 'boolean',
description: 'run listener in archival mode or not',
},
startBlock: {
alias: 'b',
type: 'number',
description: 'when running in archival mode, which block should we start from',
},

}).check((data) => {
if (!chainSupportedBy(data.network, SubstrateEvents.Types.EventChains) && data.spec) {
throw new Error('cannot pass spec on non-substrate network');
Expand All @@ -71,6 +77,8 @@ const argv = yargs.options({
return true;
}).argv;
const archival: boolean = argv.archival;
// if running in archival mode then which block shall we star from
const startBlock: number = argv.startBlock ?? 0;
const network = argv.network;
const url: string = argv.url || networkUrls[network];
const spec = networkSpecs[network] || {};
Expand Down Expand Up @@ -99,6 +107,7 @@ if (chainSupportedBy(network, SubstrateEvents.Types.EventChains)) {
handlers: [ new StandaloneEventHandler() ],
skipCatchup,
archival,
startBlock,
verbose: true,
});
});
Expand Down
1 change: 1 addition & 0 deletions src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export interface ISubscribeOptions<Api> {
handlers: IEventHandler<IChainEventData>[];
skipCatchup?: boolean;
archival?: boolean;
startBlock?: number;
discoverReconnectRange?: () => Promise<IDisconnectedRange>;
verbose?: boolean;
}
Expand Down
187 changes: 108 additions & 79 deletions src/substrate/filters/enricher.ts

Large diffs are not rendered by default.

40 changes: 28 additions & 12 deletions src/substrate/poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export class Poller extends IEventPoller<ApiPromise, Block> {
// fetch blocks from start to end
const blockNumbers = [ ...Array(range.endBlock - range.startBlock).keys()]
.map((i) => range.startBlock + i);
log.debug(`Fetching hashes for blocks: ${JSON.stringify(blockNumbers)}`);
log.info(`Fetching hashes for blocks: ${JSON.stringify(blockNumbers)}`);

// the hashes are pruned when using api.query.system.blockHash.multi
// therefore fetching hashes from chain. the downside is that for every
Expand All @@ -73,30 +73,46 @@ export class Poller extends IEventPoller<ApiPromise, Block> {

/**
* Connects to chain, fetches blocks specified in given range in provided batch size,
* prcoesses the blocks if a handler is provided else returns the blocks for
* further processing
* prcoesses the blocks if a handler is provided
* @param range IDisconnectedRange having startBlock and optional endBlock
* @param batchSize size of the batch in which blocks are to be fetched from chain
* @param processBlockFn an optional function to process the blocks
*/
public async archive(range: IDisconnectedRange, batchSize: number = 500, processBlockFn: (block: Block) => any = null): Promise<Block[]> {
if(!range.endBlock){
public async archive(
range: IDisconnectedRange,
batchSize: number = 500,
processBlockFn: (block: Block) => any = null,
) {
const syncWithHead = !range.endBlock;

// if the endBlock is not provided then we will run archival mode until we reach the head
if (syncWithHead) {
const header = await this._api.rpc.chain.getHeader();
range.endBlock = +header.number;
range.endBlock = +header.number;
}
const blocks = [];

for (let block = range.startBlock; block < range.endBlock; block = Math.min(block + batchSize, range.endBlock)) {
try {
let currentBlocks = await this.poll({startBlock: block, endBlock: Math.min(block + batchSize, range.endBlock)}, batchSize);
if(processBlockFn){
await Promise.all(currentBlocks.map(processBlockFn));
const currentBlocks = await this.poll({
startBlock: block,
endBlock: Math.min(block + batchSize, range.endBlock)
}, batchSize);

// process all blocks sequentially
if (processBlockFn) {
for (const block of currentBlocks) {
await processBlockFn(block);
}
}
blocks.push(...currentBlocks);
} catch (e) {
log.error(`Block polling failed after disconnect at block ${range.startBlock}`);
return;
}
// if sync with head then update the endBlock to current header
if (syncWithHead) {
const header = await this._api.rpc.chain.getHeader();
range.endBlock = +header.number;
}
}
return blocks;
}
}
1 change: 1 addition & 0 deletions src/substrate/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export class Processor extends IEventProcessor<ApiPromise, Block> {
return result;
} catch (e) {
log.error(`Event enriching failed for ${kind}`);
log.error(`Error: ${e}`);
return null;
}
} else {
Expand Down
53 changes: 24 additions & 29 deletions src/substrate/subscribeFunc.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { WsProvider, ApiPromise } from '@polkadot/api';
import { TypeRegistry } from '@polkadot/types';
import { RegisteredTypes } from '@polkadot/types/types';

import { IDisconnectedRange, CWEvent, SubscribeFunc, ISubscribeOptions } from '../interfaces';
Expand Down Expand Up @@ -46,7 +45,7 @@ export async function createApi(
* @returns An active block subscriber.
*/
export const subscribeEvents: SubscribeFunc<ApiPromise, Block, ISubscribeOptions<ApiPromise>> = async (options) => {
const { chain, api, handlers, skipCatchup, archival, discoverReconnectRange, verbose } = options;
const { chain, api, handlers, skipCatchup, archival, startBlock, discoverReconnectRange, verbose } = options;
// helper function that sends an event through event handlers
const handleEventFn = async (event: CWEvent<IEventData>) => {
let prevResult = null;
Expand All @@ -71,12 +70,23 @@ export const subscribeEvents: SubscribeFunc<ApiPromise, Block, ISubscribeOptions
const events: CWEvent<IEventData>[] = await processor.process(block);

// send all events through event-handlers in sequence
await Promise.all(events.map((event) => handleEventFn(event)));
for(const event of events) await handleEventFn(event);
};

const subscriber = new Subscriber(api, verbose);
const poller = new Poller(api);

// if running in archival mode: run poller.archive with batch_size 50
// then exit without subscribing.
// TODO: should we start subscription?
if (archival) {
// default to startBlock 0
const offlineRange: IDisconnectedRange = { startBlock : startBlock ?? 0 };
log.info(`Executing in archival mode, polling blocks starting from: ${offlineRange.startBlock}`);
await poller.archive(offlineRange, 50, processBlockFn);
return subscriber;
}

// helper function that runs after we've been offline/the server's been down,
// and attempts to fetch events from skipped blocks
const pollMissedBlocksFn = async () => {
Expand All @@ -101,37 +111,23 @@ export const subscribeEvents: SubscribeFunc<ApiPromise, Block, ISubscribeOptions
offlineRange = { startBlock: lastBlockNumber };
}


// if we can't figure out when the last block we saw was,
// and we are not running in archival mode, do nothing
// do nothing
// (i.e. don't try and fetch all events from block 0 onward)
if(!offlineRange || !offlineRange.startBlock){
if(archival){
offlineRange = {startBlock: 0}
}
else{
log.warn('Unable to determine offline time range.');
return;
}
if (!offlineRange || !offlineRange.startBlock) {
log.warn('Unable to determine offline time range.');
return;
}

// if running in archival mode then run poller.archive with
// batch_size 500
if(archival){
log.info(`Executing in archival mode, polling blocks starting from: ${offlineRange.startBlock}`);
await poller.archive(offlineRange,500,processBlockFn);
}
// else just run poller normally
else {
try {
const blocks = await poller.poll(offlineRange);
await Promise.all(blocks.map(processBlockFn));
} catch (e) {
log.error(`Block polling failed after disconnect at block ${offlineRange.startBlock}`);
}
try {
const blocks = await poller.poll(offlineRange);
await Promise.all(blocks.map(processBlockFn));
} catch (e) {
log.error(`Block polling failed after disconnect at block ${offlineRange.startBlock}`);
}
};

if (!skipCatchup || archival) {
if (!skipCatchup) {
await pollMissedBlocksFn();
} else {
log.info('Skipping event catchup on startup!');
Expand All @@ -146,6 +142,5 @@ export const subscribeEvents: SubscribeFunc<ApiPromise, Block, ISubscribeOptions
} catch (e) {
log.error(`Subscription error: ${e.message}`);
}

return subscriber;
};
24 changes: 21 additions & 3 deletions src/substrate/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ export interface IAllGood extends IEvent {
sessionIndex: number;
validators: Array<AccountId>;
}

export interface IHeartbeatReceived extends IEvent {
kind: EventKind.HeartbeatReceived;
authorityId: string;
Expand All @@ -182,20 +181,34 @@ export interface IOffence extends IEvent {
offenders: Array<string>
}

// Individual Exposure
export interface IndividualExposure {
who: AccountId,
value: string
}

// Active Exposure
export interface ActiveExposure {
[key: string]: {
own: number,
total: number,
others: IndividualExposure[],
}
}

/**
* Session Event
*/
export interface INewSession extends IEvent {
kind: EventKind.NewSession;
activeExposures: { [key: string]: any };
activeExposures: ActiveExposure;
active: Array<AccountId>;
waiting: Array<AccountId>;
sessionIndex: number;
currentEra?: number;
validatorInfo: { [key: string]: Validator },
}


/**
* Staking Events
*/
Expand Down Expand Up @@ -527,6 +540,11 @@ export interface IIdentityKilled extends IEvent {
who: AccountId;
}

// Interface for era reward points
export interface AccountPoints {
[key: string]: number;
}

export type IEventData =
ISlash
| IReward
Expand Down
26 changes: 26 additions & 0 deletions src/substrate/utils/currentPoint.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { AccountId, EraPoints, EraRewardPoints, RewardPoint, EraIndex, BlockHash } from '@polkadot/types/interfaces';
import { ApiPromise } from '@polkadot/api';
import { AccountPoints } from '../types';

export function currentPoints(api: ApiPromise, era: EraIndex, hash: BlockHash, validators: AccountId[]): Promise<AccountPoints> {
const points: AccountPoints = {};
// api call to retreive eraRewardPoints for version >= 38
if (api.query.staking.erasRewardPoints)
return api.query.staking.erasRewardPoints.at<EraRewardPoints>(hash, era).then((rewardPoints) => {
rewardPoints.individual.forEach((rewardPoint, accountKey) => {
points[accountKey.toString()] = +rewardPoint;
});
return points;
})
else {
// creating eraRewardPoints for for version = 31
return api.query.staking.currentEraPointsEarned.at<EraPoints>(hash, era).then((eraPoints) => {
const individual = eraPoints.individual;
individual.map((point, idx) => {
points[validators[idx].toString()] = +point
});
return points;
});

}
}
Loading

0 comments on commit 5c14429

Please sign in to comment.