Skip to content

Commit

Permalink
fix(indexer): introduce asyncForEach to support async/await in foreach
Browse files Browse the repository at this point in the history
  • Loading branch information
metmirr committed Jun 15, 2020
1 parent df7570a commit d6f84e8
Showing 1 changed file with 20 additions and 6 deletions.
26 changes: 20 additions & 6 deletions substrate-query-framework/index-builder/src/IndexBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
ISubstrateQueryService,
SavedEntityEvent,
makeDatabaseManager,
QueryEvent,
} from '.';

const debug = require('debug')('index-builder:indexer');
Expand All @@ -18,6 +19,8 @@ export default class IndexBuilder {

private _processing_pack!: QueryEventProcessingPack;

private lastProcessedEvent!: SavedEntityEvent;

private constructor(producer: QueryBlockProducer, processing_pack: QueryEventProcessingPack) {
this._producer = producer;
this._processing_pack = processing_pack;
Expand All @@ -39,12 +42,11 @@ export default class IndexBuilder {

debug('Spawned worker.');

// Get the last processed event
// Should use db.get(SavedEntityEvent, {}) ???
const savedEntityEvent = await getRepository(SavedEntityEvent).findOne();
const lastProcessedEvent = await getRepository(SavedEntityEvent).findOne({ where: { id: 1 } });

if (savedEntityEvent !== undefined) {
await this._producer.start(savedEntityEvent.blockNumber);
if (lastProcessedEvent !== undefined) {
this.lastProcessedEvent = lastProcessedEvent;
await this._producer.start(this.lastProcessedEvent.blockNumber);
} else {
// Setup worker
await this._producer.start();
Expand All @@ -58,7 +60,13 @@ export default class IndexBuilder {
_onQueryEventBlock(query_event_block: QueryEventBlock): void {
debug(`Yay, block producer at height: #${query_event_block.block_number}`);

query_event_block.query_events.forEach(async (query_event, index) => {
let query_events = query_event_block.query_events;
// Filter processed events! Will run only once
if (this.lastProcessedEvent && query_event_block.block_number === this.lastProcessedEvent.blockNumber) {
query_events = query_events.filter((event) => event.index !== this.lastProcessedEvent.index);
}

asyncForEach(query_events, async (query_event: QueryEvent) => {
if (!this._processing_pack[query_event.event_method]) {
debug(`Unrecognized: ` + query_event.event_name);

Expand Down Expand Up @@ -93,3 +101,9 @@ export default class IndexBuilder {
});
}
}

async function asyncForEach(array: Array<any>, callback: Function) {
for (let index = 0; index < array.length; index++) {
await callback(array[index], index, array);
}
}

0 comments on commit d6f84e8

Please sign in to comment.