From df3503597832939e6dc9c7ec953d24b3d709c723 Mon Sep 17 00:00:00 2001 From: Rhys Bartels-Waller Date: Tue, 7 Mar 2023 15:00:48 +0100 Subject: [PATCH] feat: add healthCheck$ to ObservableCardanoNode --- .../types/ObservableCardanoNode.ts | 3 +- .../e2e/test/projection/offline-fork.test.ts | 3 +- .../OgmiosObservableCardanoNode.ts | 52 ++++++++++++- .../ObservableOgmiosCardanoNode.test.ts | 76 ++++++++++++++++++- .../ogmios/test/mocks/mockOgmiosServer.ts | 3 +- packages/util-dev/src/chainSync/index.ts | 3 +- 6 files changed, 130 insertions(+), 10 deletions(-) diff --git a/packages/core/src/CardanoNode/types/ObservableCardanoNode.ts b/packages/core/src/CardanoNode/types/ObservableCardanoNode.ts index ff9453ce98e..4e6a17e6def 100644 --- a/packages/core/src/CardanoNode/types/ObservableCardanoNode.ts +++ b/packages/core/src/CardanoNode/types/ObservableCardanoNode.ts @@ -1,4 +1,4 @@ -import type { Cardano } from '../..'; +import type { Cardano, HealthCheckResponse } from '../..'; import type { EraSummary } from './CardanoNode'; import type { Observable } from 'rxjs'; @@ -46,6 +46,7 @@ export interface ObservableChainSync { export interface ObservableCardanoNode { eraSummaries$: Observable; genesisParameters$: Observable; + healthCheck$: Observable; /** * Find a common point between your local state and the node. * diff --git a/packages/e2e/test/projection/offline-fork.test.ts b/packages/e2e/test/projection/offline-fork.test.ts index e16f2eed941..298e5fc6f69 100644 --- a/packages/e2e/test/projection/offline-fork.test.ts +++ b/packages/e2e/test/projection/offline-fork.test.ts @@ -106,7 +106,8 @@ const createForkProjectionSource = ( tip: someEventsWithStakeKeyRegistration[someEventsWithStakeKeyRegistration.length - 1].tip } }); - } + }, + healthCheck$: new Observable() }); describe('resuming projection when intersection is not local tip', () => { diff --git a/packages/ogmios/src/CardanoNode/OgmiosObservableCardanoNode/OgmiosObservableCardanoNode.ts b/packages/ogmios/src/CardanoNode/OgmiosObservableCardanoNode/OgmiosObservableCardanoNode.ts index c1f68dbfb1f..907379f92a5 100644 --- a/packages/ogmios/src/CardanoNode/OgmiosObservableCardanoNode/OgmiosObservableCardanoNode.ts +++ b/packages/ogmios/src/CardanoNode/OgmiosObservableCardanoNode/OgmiosObservableCardanoNode.ts @@ -3,17 +3,36 @@ import { Cardano, CardanoNodeErrors, EraSummary, + HealthCheckResponse, + Milliseconds, ObservableCardanoNode, ObservableChainSync, PointOrOrigin } from '@cardano-sdk/core'; +import { + ConnectionConfig, + createConnectionObject, + createStateQueryClient, + getServerHealth +} from '@cardano-ogmios/client'; import { InteractionContextProps, createObservableInteractionContext } from './createObservableInteractionContext'; import { Intersection, findIntersect } from '@cardano-ogmios/client/dist/ChainSync'; import { Logger } from 'ts-log'; -import { Observable, distinctUntilChanged, from, shareReplay, switchMap } from 'rxjs'; +import { + Observable, + catchError, + distinctUntilChanged, + from, + map, + of, + shareReplay, + switchMap, + throwError, + timeout +} from 'rxjs'; import { WithLogger, contextLogger } from '@cardano-sdk/util'; import { createObservableChainSyncClient } from './createObservableChainSyncClient'; -import { createStateQueryClient } from '@cardano-ogmios/client'; +import { ogmiosServerHealthToHealthCheckResponse } from '../../util'; import { ogmiosToCorePointOrOrigin, ogmiosToCoreTipOrOrigin, pointOrOriginToOgmios } from './util'; import { queryEraSummaries, queryGenesisParameters } from '../queries'; import isEqual from 'lodash/isEqual'; @@ -23,16 +42,25 @@ const ogmiosToCoreIntersection = (intersection: Intersection) => ({ tip: ogmiosToCoreTipOrOrigin(intersection.tip) }); -export type OgmiosObservableCardanoNodeProps = Omit; +const DEFAULT_HEALTH_CHECK_TIMEOUT = 2000; +export type OgmiosObservableCardanoNodeProps = Omit & { + /** + * Default: 2000ms + */ + healthCheckTimeout?: Milliseconds; +}; export class OgmiosObservableCardanoNode implements ObservableCardanoNode { + readonly #connectionConfig$: Observable; readonly #logger: Logger; readonly #interactionContext$; readonly eraSummaries$: Observable; readonly genesisParameters$: Observable; + readonly healthCheck$: Observable; constructor(props: OgmiosObservableCardanoNodeProps, { logger }: WithLogger) { + this.#connectionConfig$ = props.connectionConfig$; this.#logger = contextLogger(logger, 'ObservableOgmiosCardanoNode'); this.#interactionContext$ = createObservableInteractionContext( { @@ -43,7 +71,7 @@ export class OgmiosObservableCardanoNode implements ObservableCardanoNode { ).pipe(shareReplay({ bufferSize: 1, refCount: true })); const stateQueryClient$ = this.#interactionContext$.pipe( switchMap((interactionContext) => from(createStateQueryClient(interactionContext))), - distinctUntilChanged(isEqual), + distinctUntilChanged((a, b) => isEqual(a, b)), shareReplay({ bufferSize: 1, refCount: true }) ); this.eraSummaries$ = stateQueryClient$.pipe(switchMap((client) => from(queryEraSummaries(client, this.#logger)))); @@ -52,6 +80,22 @@ export class OgmiosObservableCardanoNode implements ObservableCardanoNode { distinctUntilChanged(isEqual), shareReplay({ bufferSize: 1, refCount: true }) ); + this.healthCheck$ = this.#connectionConfig$.pipe( + switchMap((connectionConfig) => from(getServerHealth({ connection: createConnectionObject(connectionConfig) }))), + map(ogmiosServerHealthToHealthCheckResponse), + timeout({ + first: props.healthCheckTimeout || DEFAULT_HEALTH_CHECK_TIMEOUT, + with: () => { + logger.error('healthCheck$ didnt emit within healthCheckTimeout'); + return throwError(() => new CardanoNodeErrors.CardanoClientErrors.ConnectionError()); + } + }), + catchError((error) => { + this.#logger.error(error); + return of({ ok: false }); + }), + shareReplay({ bufferSize: 1, refCount: true }) + ); } /** diff --git a/packages/ogmios/test/CardanoNode/ObservableOgmiosCardanoNode.test.ts b/packages/ogmios/test/CardanoNode/ObservableOgmiosCardanoNode.test.ts index 2a72a65036a..a54cdf25fbd 100644 --- a/packages/ogmios/test/CardanoNode/ObservableOgmiosCardanoNode.test.ts +++ b/packages/ogmios/test/CardanoNode/ObservableOgmiosCardanoNode.test.ts @@ -1,6 +1,6 @@ /* eslint-disable sonarjs/no-duplicate-string */ +import { CardanoNodeErrors, Milliseconds } from '@cardano-sdk/core'; import { Connection, createConnectionObject } from '@cardano-ogmios/client'; -import { Milliseconds } from '@cardano-sdk/core'; import { MockOgmiosServerConfig, createMockOgmiosServer, @@ -10,7 +10,7 @@ import { waitForWsClientsDisconnect } from '../mocks/mockOgmiosServer'; import { OgmiosObservableCardanoNode } from '../../src'; -import { combineLatest, firstValueFrom, mergeMap, of, take, toArray } from 'rxjs'; +import { combineLatest, delay as delayEmission, firstValueFrom, mergeMap, of, take, toArray } from 'rxjs'; import { getRandomPort } from 'get-port-please'; import { logger } from '@cardano-sdk/util-dev'; import delay from 'delay'; @@ -128,4 +128,76 @@ describe('ObservableOgmiosCardanoNode', () => { expect(epochLength2).toEqual(mockGenesisConfig.epochLength + 1); await serverClosePromise(server); }); + describe('healthCheck', () => { + it('is ok if node is close to the network tip', async () => { + const server = await startMockOgmiosServer(connection.port, { + healthCheck: { response: { networkSynchronization: 0.999, success: true } } + }); + const node = new OgmiosObservableCardanoNode({ connectionConfig$: of(connection) }, { logger }); + const res = await firstValueFrom(node.healthCheck$); + expect(res.ok).toBe(true); + await serverClosePromise(server); + }); + it('simultaneous healthChecks share a single health request to ogmios', async () => { + const server = await startMockOgmiosServer(connection.port, { + healthCheck: { response: { networkSynchronization: 0.999, success: true } } + }); + const node = new OgmiosObservableCardanoNode({ connectionConfig$: of(connection) }, { logger }); + const [res1, res2] = await firstValueFrom(combineLatest([node.healthCheck$, node.healthCheck$])); + expect(res1.ok).toBe(true); + expect(res1).toEqual(res2); + expect(server.invocations.health).toBe(1); + await serverClosePromise(server); + }); + it('is not ok if node is not close to the network tip', async () => { + const server = await startMockOgmiosServer(connection.port, { + healthCheck: { response: { networkSynchronization: 0.8, success: true } } + }); + const node = new OgmiosObservableCardanoNode({ connectionConfig$: of(connection) }, { logger }); + const res = await firstValueFrom(node.healthCheck$); + expect(res.ok).toBe(false); + await serverClosePromise(server); + }); + it('is not ok when ogmios responds with an unknown result', async () => { + const server = await startMockOgmiosServer(connection.port, { + healthCheck: { + response: { failWith: new CardanoNodeErrors.CardanoClientErrors.UnknownResultError(''), success: false } + } + }); + const node = new OgmiosObservableCardanoNode({ connectionConfig$: of(connection) }, { logger }); + const res = await firstValueFrom(node.healthCheck$); + expect(res.ok).toBe(false); + await serverClosePromise(server); + }); + it('is not ok when connection is refused', async () => { + const node = new OgmiosObservableCardanoNode({ connectionConfig$: of(connection) }, { logger }); + // Server is not started + const result = await firstValueFrom(node.healthCheck$); + expect(result.ok).toBe(false); + }); + it('is ok when connectionConfig$ emits within "healthCheckTimeout" duration', async () => { + const server = await startMockOgmiosServer(connection.port, { + healthCheck: { response: { networkSynchronization: 0.999, success: true } } + }); + const node = new OgmiosObservableCardanoNode( + { connectionConfig$: of(connection).pipe(delayEmission(25)), healthCheckTimeout: Milliseconds(50) }, + { logger } + ); + const result = await firstValueFrom(node.healthCheck$); + expect(result.ok).toBe(true); + await serverClosePromise(server); + }); + it('is not ok when connectionConfig$ takes longer than "healthCheckTimeout" to emit', async () => { + const server = await startMockOgmiosServer(connection.port, { + healthCheck: { response: { networkSynchronization: 0.999, success: true } } + }); + const node = new OgmiosObservableCardanoNode( + { connectionConfig$: of(connection).pipe(delayEmission(50)), healthCheckTimeout: Milliseconds(25) }, + { logger } + ); + const result = await firstValueFrom(node.healthCheck$); + expect(result.ok).toBe(false); + await serverClosePromise(server); + }); + }); }); diff --git a/packages/ogmios/test/mocks/mockOgmiosServer.ts b/packages/ogmios/test/mocks/mockOgmiosServer.ts index ed69e9ea418..e124e47357a 100644 --- a/packages/ogmios/test/mocks/mockOgmiosServer.ts +++ b/packages/ogmios/test/mocks/mockOgmiosServer.ts @@ -228,7 +228,7 @@ const handleQuery = async (query: string, config: MockOgmiosServerConfig, send: send(result); }; -export type MockServer = Server & { wss: WebSocket.Server }; +export type MockServer = Server & { wss: WebSocket.Server; invocations: InvocationState }; // eslint-disable-next-line sonarjs/cognitive-complexity export const createMockOgmiosServer = (config: MockOgmiosServerConfig): MockServer => { @@ -306,6 +306,7 @@ export const createMockOgmiosServer = (config: MockOgmiosServerConfig): MockServ }); }); (server as any).wss = wss; + (server as any).invocations = invocations; return server as MockServer; }; diff --git a/packages/util-dev/src/chainSync/index.ts b/packages/util-dev/src/chainSync/index.ts index 192e73b1f02..1cc17e9c4b5 100644 --- a/packages/util-dev/src/chainSync/index.ts +++ b/packages/util-dev/src/chainSync/index.ts @@ -124,7 +124,8 @@ export const chainSyncData = memoize((dataSet: ChainSyncDataSet) => { intersection }); }, - genesisParameters$: of(compactGenesis) + genesisParameters$: of(compactGenesis), + healthCheck$: new Observable() }; return { allEvents,