Skip to content

Commit

Permalink
Merge pull request #534 from Joystream/runtime-upgrade-fix-1-new-conn…
Browse files Browse the repository at this point in the history
…ection

Reconnect to node on runtime upgrade
  • Loading branch information
mnaamani authored Oct 18, 2023
2 parents 0a91b97 + c96e04e commit 1309629
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 72 deletions.
133 changes: 81 additions & 52 deletions packages/hydra-indexer/src/substrate/SubstrateService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,26 @@ import Debug from 'debug'
import pProps from 'p-props'

import { getConfig } from '../'
import { getApiPromise, getBlockTimestamp, ISubstrateService } from '.'
import { createApiPromise, getBlockTimestamp, ISubstrateService } from '.'

// import pTimeout from 'p-timeout'
import pRetry from 'p-retry'

import BN from 'bn.js'
import { BlockData } from '../model'
import { eventEmitter, IndexerEvents } from '../node/event-emitter'
import { Subscription } from 'rxjs'

const debug = Debug('hydra-indexer:substrate-service')

export class SubstrateService implements ISubstrateService {
private shouldStop = false
private api: ApiPromise | undefined

async init(): Promise<void> {
debug(`Initializing SubstrateService`)
await getApiPromise()
await this.subscribeToHeads()
await this.connect()

eventEmitter.on(IndexerEvents.INDEXER_STOP, async () => await this.stop())
// eventEmitter.on(
// IndexerEvents.API_CONNECTED,
// async () => await this.subscribeToHeads()
// )

pForever(async () => {
if (this.shouldStop) {
Expand All @@ -52,6 +48,33 @@ export class SubstrateService implements ISubstrateService {
})
}

private async connect() {
this.api = await createApiPromise()

const subscriptions = this.subscribeToHeads(this.api)

this.api
.once('disconnected', () => {
debug('Api disconnected')
})
.on('error', async (e) => {
debug(`Api error: ${JSON.stringify(e)}`)
})
.once('decorated', async () => {
debug('Api decorated')
subscriptions.forEach((sub) => sub.unsubscribe())
const oldApi = this.api
await this.connect()
// allow short time for running queries to complete
await delay(1000)
try {
oldApi?.isConnected && (await oldApi?.disconnect())
} catch (err) {
debug(`Error trying to disconnection Api ${err}`)
}
})
}

async getHeader(hash: Hash | Uint8Array | string): Promise<Header> {
return this.apiCall(
(api) => api.rpc.chain.getHeader(hash),
Expand All @@ -66,40 +89,35 @@ export class SubstrateService implements ISubstrateService {
)
}

async subscribeToHeads(): Promise<void> {
subscribeToHeads(api: ApiPromise): Subscription[] {
debug(`Subscribing to new heads`)
const api = await getApiPromise()
api.rx.rpc.chain.subscribeFinalizedHeads().subscribe({
next: (header: Header) =>
eventEmitter.emit(IndexerEvents.NEW_FINALIZED_HEAD, {
header,
height: header.number.toNumber(),
}),
})

api.rx.rpc.chain.subscribeNewHeads().subscribe({
next: (header: Header) =>
eventEmitter.emit(IndexerEvents.NEW_BEST_HEAD, {
header,
height: header.number.toNumber(),
}),
})

api.rx.rpc.chain.subscribeAllHeads().subscribe({
next: (header: Header) =>
eventEmitter.emit(IndexerEvents.NEW_HEAD, {
header,
height: header.number.toNumber(),
}),
})
return [
api.rx.rpc.chain.subscribeFinalizedHeads().subscribe({
next: (header: Header) =>
eventEmitter.emit(IndexerEvents.NEW_FINALIZED_HEAD, {
header,
height: header.number.toNumber(),
}),
}),

api.rx.rpc.chain.subscribeNewHeads().subscribe({
next: (header: Header) =>
eventEmitter.emit(IndexerEvents.NEW_BEST_HEAD, {
header,
height: header.number.toNumber(),
}),
}),

api.rx.rpc.chain.subscribeAllHeads().subscribe({
next: (header: Header) =>
eventEmitter.emit(IndexerEvents.NEW_HEAD, {
header,
height: header.number.toNumber(),
}),
}),
]
}

// async subscribeFinalizedHeads(v: Callback<Header>): UnsubscribePromise {
// const api = await getApiPromise()
// api.rpc.chain.subscribeFinalizedHeads()
// return (await getApiPromise()).rpc.chain.subscribeFinalizedHeads(v)
// }

async getBlockHash(
blockNumber?: BlockNumber | Uint8Array | number | string
): Promise<Hash> {
Expand Down Expand Up @@ -139,15 +157,19 @@ export class SubstrateService implements ISubstrateService {
'The indexer is stopping, aborting all API calls'
)
}
const api = await getApiPromise()
return promiseFn(api)
if (!this.api || !this.api.isConnected) {
throw Error(`Api connection not ready`)
}
return promiseFn(this.api)
},
{
retries: getConfig().SUBSTRATE_API_CALL_RETRIES,
onFailedAttempt: (i) =>
onFailedAttempt: async (i) => {
debug(
`Failed to execute "${functionName}" after ${i.attemptNumber} attempts. Retries left: ${i.retriesLeft}`
),
)
await delay(200)
},
}
)
}
Expand All @@ -174,35 +196,42 @@ export class SubstrateService implements ISubstrateService {
}

async metadata(hash: Hash): Promise<MetadataLatest> {
const metadata = await this.apiCall((api) =>
api.rpc.state.getMetadata(hash)
const metadata = await this.apiCall(
(api) => api.rpc.state.getMetadata(hash),
'get metadata'
)
return metadata.asLatest
}

async runtimeVersion(hash: Hash): Promise<RuntimeVersion> {
return this.apiCall((api) => api.rpc.state.getRuntimeVersion(hash))
return this.apiCall(
(api) => api.rpc.state.getRuntimeVersion(hash),
'get runtime version'
)
}

async timestamp(hash: Hash): Promise<BN> {
return this.apiCall((api) => api.query.timestamp.now.at(hash))
return this.apiCall(
(api) => api.query.timestamp.now.at(hash),
'get timestamp'
)
}

async lastRuntimeUpgrade(
hash: Hash
): Promise<LastRuntimeUpgradeInfo | undefined> {
const info = await this.apiCall((api) =>
api.query.system.lastRuntimeUpgrade.at(hash)
const info = await this.apiCall(
(api) => api.query.system.lastRuntimeUpgrade.at(hash),
'get last runtime upgrade'
)
return info.unwrapOr(undefined)
}

async stop(): Promise<void> {
debug(`Stopping substrate service`)
this.shouldStop = true
const api = await getApiPromise()
if (api.isConnected) {
await api.disconnect()
if (this.api && this.api.isConnected) {
await this.api.disconnect()
debug(`Api disconnected`)
}
debug(`Done`)
Expand Down
22 changes: 2 additions & 20 deletions packages/hydra-indexer/src/substrate/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import Debug from 'debug'
import { getConfig } from '../node/config'
import { ISubstrateService } from './ISubstrateService'
import { SubstrateService } from './SubstrateService'
import { eventEmitter, IndexerEvents } from '../node/event-emitter'

export * from './ISubstrateService'
export * from './SubstrateService'
Expand All @@ -14,7 +13,6 @@ export { getBlockTimestamp } from './timestamp'
const debug = Debug('hydra-indexer:substrate-api')

let substrateService: ISubstrateService
let apiPromise: ApiPromise

export async function getSubstrateService(): Promise<ISubstrateService> {
if (substrateService) {
Expand All @@ -25,11 +23,7 @@ export async function getSubstrateService(): Promise<ISubstrateService> {
return substrateService
}

export async function getApiPromise(): Promise<ApiPromise> {
if (apiPromise) {
return apiPromise
}

export async function createApiPromise(): Promise<ApiPromise> {
debug(`Creating new Api Promise`)

const conf = getConfig()
Expand All @@ -39,7 +33,7 @@ export async function getApiPromise(): Promise<ApiPromise> {

names.length && debug(`Injected types: ${names.join(', ')}`)

apiPromise = await pRetry(
return pRetry(
async () =>
new ApiPromise({
provider,
Expand All @@ -54,16 +48,4 @@ export async function getApiPromise(): Promise<ApiPromise> {
debug(`API failed to connect: ${JSON.stringify(error)}`),
}
)

apiPromise.on('error', async (e) => {
debug(`Api error: ${JSON.stringify(e)}, reconnecting....`)
apiPromise = await getApiPromise()
})

apiPromise.on('connected', () => {
debug(`Api connected`)
eventEmitter.emit(IndexerEvents.API_CONNECTED)
})

return apiPromise
}

0 comments on commit 1309629

Please sign in to comment.