Skip to content

Commit

Permalink
feat: add healthCheck$ to ObservableCardanoNode
Browse files Browse the repository at this point in the history
  • Loading branch information
rhyslbw authored and mkazlauskas committed Apr 25, 2023
1 parent 5a3782a commit df35035
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 10 deletions.
3 changes: 2 additions & 1 deletion packages/core/src/CardanoNode/types/ObservableCardanoNode.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Cardano } from '../..';
import type { Cardano, HealthCheckResponse } from '../..';
import type { EraSummary } from './CardanoNode';
import type { Observable } from 'rxjs';

Expand Down Expand Up @@ -46,6 +46,7 @@ export interface ObservableChainSync {
export interface ObservableCardanoNode {
eraSummaries$: Observable<EraSummary[]>;
genesisParameters$: Observable<Cardano.CompactGenesis>;
healthCheck$: Observable<HealthCheckResponse>;
/**
* Find a common point between your local state and the node.
*
Expand Down
3 changes: 2 additions & 1 deletion packages/e2e/test/projection/offline-fork.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ const createForkProjectionSource = (
tip: someEventsWithStakeKeyRegistration[someEventsWithStakeKeyRegistration.length - 1].tip
}
});
}
},
healthCheck$: new Observable()
});

describe('resuming projection when intersection is not local tip', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,36 @@ import {
Cardano,
CardanoNodeErrors,
EraSummary,
HealthCheckResponse,
Milliseconds,
ObservableCardanoNode,
ObservableChainSync,
PointOrOrigin
} from '@cardano-sdk/core';
import {
ConnectionConfig,
createConnectionObject,
createStateQueryClient,
getServerHealth
} from '@cardano-ogmios/client';
import { InteractionContextProps, createObservableInteractionContext } from './createObservableInteractionContext';
import { Intersection, findIntersect } from '@cardano-ogmios/client/dist/ChainSync';
import { Logger } from 'ts-log';
import { Observable, distinctUntilChanged, from, shareReplay, switchMap } from 'rxjs';
import {
Observable,
catchError,
distinctUntilChanged,
from,
map,
of,
shareReplay,
switchMap,
throwError,
timeout
} from 'rxjs';
import { WithLogger, contextLogger } from '@cardano-sdk/util';
import { createObservableChainSyncClient } from './createObservableChainSyncClient';
import { createStateQueryClient } from '@cardano-ogmios/client';
import { ogmiosServerHealthToHealthCheckResponse } from '../../util';
import { ogmiosToCorePointOrOrigin, ogmiosToCoreTipOrOrigin, pointOrOriginToOgmios } from './util';
import { queryEraSummaries, queryGenesisParameters } from '../queries';
import isEqual from 'lodash/isEqual';
Expand All @@ -23,16 +42,25 @@ const ogmiosToCoreIntersection = (intersection: Intersection) => ({
tip: ogmiosToCoreTipOrOrigin(intersection.tip)
});

export type OgmiosObservableCardanoNodeProps = Omit<InteractionContextProps, 'interactionType'>;
const DEFAULT_HEALTH_CHECK_TIMEOUT = 2000;
export type OgmiosObservableCardanoNodeProps = Omit<InteractionContextProps, 'interactionType'> & {
/**
* Default: 2000ms
*/
healthCheckTimeout?: Milliseconds;
};

export class OgmiosObservableCardanoNode implements ObservableCardanoNode {
readonly #connectionConfig$: Observable<ConnectionConfig>;
readonly #logger: Logger;
readonly #interactionContext$;

readonly eraSummaries$: Observable<EraSummary[]>;
readonly genesisParameters$: Observable<Cardano.CompactGenesis>;
readonly healthCheck$: Observable<HealthCheckResponse>;

constructor(props: OgmiosObservableCardanoNodeProps, { logger }: WithLogger) {
this.#connectionConfig$ = props.connectionConfig$;
this.#logger = contextLogger(logger, 'ObservableOgmiosCardanoNode');
this.#interactionContext$ = createObservableInteractionContext(
{
Expand All @@ -43,7 +71,7 @@ export class OgmiosObservableCardanoNode implements ObservableCardanoNode {
).pipe(shareReplay({ bufferSize: 1, refCount: true }));
const stateQueryClient$ = this.#interactionContext$.pipe(
switchMap((interactionContext) => from(createStateQueryClient(interactionContext))),
distinctUntilChanged(isEqual),
distinctUntilChanged((a, b) => isEqual(a, b)),
shareReplay({ bufferSize: 1, refCount: true })
);
this.eraSummaries$ = stateQueryClient$.pipe(switchMap((client) => from(queryEraSummaries(client, this.#logger))));
Expand All @@ -52,6 +80,22 @@ export class OgmiosObservableCardanoNode implements ObservableCardanoNode {
distinctUntilChanged(isEqual),
shareReplay({ bufferSize: 1, refCount: true })
);
this.healthCheck$ = this.#connectionConfig$.pipe(
switchMap((connectionConfig) => from(getServerHealth({ connection: createConnectionObject(connectionConfig) }))),
map(ogmiosServerHealthToHealthCheckResponse),
timeout({
first: props.healthCheckTimeout || DEFAULT_HEALTH_CHECK_TIMEOUT,
with: () => {
logger.error('healthCheck$ didnt emit within healthCheckTimeout');
return throwError(() => new CardanoNodeErrors.CardanoClientErrors.ConnectionError());
}
}),
catchError((error) => {
this.#logger.error(error);
return of({ ok: false });
}),
shareReplay({ bufferSize: 1, refCount: true })
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* eslint-disable sonarjs/no-duplicate-string */
import { CardanoNodeErrors, Milliseconds } from '@cardano-sdk/core';
import { Connection, createConnectionObject } from '@cardano-ogmios/client';
import { Milliseconds } from '@cardano-sdk/core';
import {
MockOgmiosServerConfig,
createMockOgmiosServer,
Expand All @@ -10,7 +10,7 @@ import {
waitForWsClientsDisconnect
} from '../mocks/mockOgmiosServer';
import { OgmiosObservableCardanoNode } from '../../src';
import { combineLatest, firstValueFrom, mergeMap, of, take, toArray } from 'rxjs';
import { combineLatest, delay as delayEmission, firstValueFrom, mergeMap, of, take, toArray } from 'rxjs';
import { getRandomPort } from 'get-port-please';
import { logger } from '@cardano-sdk/util-dev';
import delay from 'delay';
Expand Down Expand Up @@ -128,4 +128,76 @@ describe('ObservableOgmiosCardanoNode', () => {
expect(epochLength2).toEqual(mockGenesisConfig.epochLength + 1);
await serverClosePromise(server);
});
describe('healthCheck', () => {
it('is ok if node is close to the network tip', async () => {
const server = await startMockOgmiosServer(connection.port, {
healthCheck: { response: { networkSynchronization: 0.999, success: true } }
});
const node = new OgmiosObservableCardanoNode({ connectionConfig$: of(connection) }, { logger });
const res = await firstValueFrom(node.healthCheck$);
expect(res.ok).toBe(true);
await serverClosePromise(server);
});
it('simultaneous healthChecks share a single health request to ogmios', async () => {
const server = await startMockOgmiosServer(connection.port, {
healthCheck: { response: { networkSynchronization: 0.999, success: true } }
});
const node = new OgmiosObservableCardanoNode({ connectionConfig$: of(connection) }, { logger });
const [res1, res2] = await firstValueFrom(combineLatest([node.healthCheck$, node.healthCheck$]));
expect(res1.ok).toBe(true);
expect(res1).toEqual(res2);
expect(server.invocations.health).toBe(1);
await serverClosePromise(server);
});
it('is not ok if node is not close to the network tip', async () => {
const server = await startMockOgmiosServer(connection.port, {
healthCheck: { response: { networkSynchronization: 0.8, success: true } }
});
const node = new OgmiosObservableCardanoNode({ connectionConfig$: of(connection) }, { logger });
const res = await firstValueFrom(node.healthCheck$);
expect(res.ok).toBe(false);
await serverClosePromise(server);
});
it('is not ok when ogmios responds with an unknown result', async () => {
const server = await startMockOgmiosServer(connection.port, {
healthCheck: {
response: { failWith: new CardanoNodeErrors.CardanoClientErrors.UnknownResultError(''), success: false }
}
});
const node = new OgmiosObservableCardanoNode({ connectionConfig$: of(connection) }, { logger });
const res = await firstValueFrom(node.healthCheck$);
expect(res.ok).toBe(false);
await serverClosePromise(server);
});
it('is not ok when connection is refused', async () => {
const node = new OgmiosObservableCardanoNode({ connectionConfig$: of(connection) }, { logger });
// Server is not started
const result = await firstValueFrom(node.healthCheck$);
expect(result.ok).toBe(false);
});
it('is ok when connectionConfig$ emits within "healthCheckTimeout" duration', async () => {
const server = await startMockOgmiosServer(connection.port, {
healthCheck: { response: { networkSynchronization: 0.999, success: true } }
});
const node = new OgmiosObservableCardanoNode(
{ connectionConfig$: of(connection).pipe(delayEmission(25)), healthCheckTimeout: Milliseconds(50) },
{ logger }
);
const result = await firstValueFrom(node.healthCheck$);
expect(result.ok).toBe(true);
await serverClosePromise(server);
});
it('is not ok when connectionConfig$ takes longer than "healthCheckTimeout" to emit', async () => {
const server = await startMockOgmiosServer(connection.port, {
healthCheck: { response: { networkSynchronization: 0.999, success: true } }
});
const node = new OgmiosObservableCardanoNode(
{ connectionConfig$: of(connection).pipe(delayEmission(50)), healthCheckTimeout: Milliseconds(25) },
{ logger }
);
const result = await firstValueFrom(node.healthCheck$);
expect(result.ok).toBe(false);
await serverClosePromise(server);
});
});
});
3 changes: 2 additions & 1 deletion packages/ogmios/test/mocks/mockOgmiosServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ const handleQuery = async (query: string, config: MockOgmiosServerConfig, send:
send(result);
};

export type MockServer = Server & { wss: WebSocket.Server };
export type MockServer = Server & { wss: WebSocket.Server; invocations: InvocationState };

// eslint-disable-next-line sonarjs/cognitive-complexity
export const createMockOgmiosServer = (config: MockOgmiosServerConfig): MockServer => {
Expand Down Expand Up @@ -306,6 +306,7 @@ export const createMockOgmiosServer = (config: MockOgmiosServerConfig): MockServ
});
});
(server as any).wss = wss;
(server as any).invocations = invocations;
return server as MockServer;
};

Expand Down
3 changes: 2 additions & 1 deletion packages/util-dev/src/chainSync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ export const chainSyncData = memoize((dataSet: ChainSyncDataSet) => {
intersection
});
},
genesisParameters$: of(compactGenesis)
genesisParameters$: of(compactGenesis),
healthCheck$: new Observable()
};
return {
allEvents,
Expand Down

0 comments on commit df35035

Please sign in to comment.