Skip to content

Commit

Permalink
feat: add a buffer after reading blocks from ogmios
Browse files Browse the repository at this point in the history
  • Loading branch information
iccicci committed Aug 14, 2023
1 parent 0c988e3 commit 0095c80
Show file tree
Hide file tree
Showing 20 changed files with 109 additions and 60 deletions.
1 change: 1 addition & 0 deletions compose/common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ x-with-postgres: &with-postgres

x-projector-environment: &projector-environment
API_URL: http://0.0.0.0:3000
BLOCKS_BUFFER_LENGTH: ${BLOCKS_BUFFER_LENGTH:-10}
DROP_SCHEMA: ${DROP_PROJECTOR_SCHEMA:-false}
POSTGRES_HOST: postgres
POSTGRES_POOL_MAX: ${POSTGRES_POOL_MAX:-10}
Expand Down
1 change: 1 addition & 0 deletions packages/cardano-services/.env.test
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
POSTGRES_CONNECTION_STRING_DB_SYNC=postgresql://postgres:doNoUseThisSecret!@127.0.0.1:5433/cexplorer
POSTGRES_CONNECTION_STRING_HANDLE=postgresql://postgres:doNoUseThisSecret!@127.0.0.1:5433/handle
POSTGRES_CONNECTION_STRING_PROJECTION=postgresql://postgres:doNoUseThisSecret!@127.0.0.1:5433/projection
POSTGRES_CONNECTION_STRING_STAKE_POOL=postgresql://postgres:doNoUseThisSecret!@127.0.0.1:5433/stake_pool
CARDANO_NODE_CONFIG_PATH=./config/network/testnet/cardano-node/config.json
DB_CACHE_TTL=120
Expand Down
6 changes: 5 additions & 1 deletion packages/cardano-services/src/Program/programs/projector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ import { URL } from 'url';
import { createLogger } from 'bunyan';
import { getConnectionConfig, getOgmiosObservableCardanoNode } from '../services';

export const BLOCKS_BUFFER_LENGTH_DEFAULT = 10;
export const PROJECTOR_API_URL_DEFAULT = new URL('http://localhost:3002');

export type ProjectorArgs = CommonProgramOptions &
PosgresProgramOptions<''> &
HandlePolicyIdsProgramOptions &
OgmiosProgramOptions & {
blocksBufferLength: number;
dropSchema: boolean;
dryRun: boolean;
exitAtBlockNo: Cardano.BlockNo;
Expand Down Expand Up @@ -50,8 +52,9 @@ const createProjectionHttpService = async (options: ProjectionMapFactoryOptions)
});
const connectionConfig$ = getConnectionConfig(dnsResolver, 'projector', '', args);
const buffer = new TypeormStabilityWindowBuffer({ logger });
const { dropSchema, dryRun, exitAtBlockNo, projectionNames, synchronize, handlePolicyIds } = args;
const { blocksBufferLength, dropSchema, dryRun, exitAtBlockNo, handlePolicyIds, projectionNames, synchronize } = args;
const projection$ = createTypeormProjection({
blocksBufferLength,
buffer,
connectionConfig$,
devOptions: { dropSchema, synchronize },
Expand All @@ -61,6 +64,7 @@ const createProjectionHttpService = async (options: ProjectionMapFactoryOptions)
handlePolicyIds
},
projectionSource$: Bootstrap.fromCardanoNode({
blocksBufferLength,
buffer,
cardanoNode,
logger
Expand Down
1 change: 1 addition & 0 deletions packages/cardano-services/src/Program/programs/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export enum ServiceNames {
export const POOLS_METRICS_INTERVAL_DEFAULT = 1000;

export enum ProjectorOptionDescriptions {
BlocksBufferLength = 'Chain sync event (blocks) buffer length',
DropSchema = 'Drop and recreate database schema to project from origin',
DryRun = 'Initialize the projection, but do not start it',
ExitAtBlockNo = 'Exit after processing this block. Intended for benchmark testing',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { passthrough, shareRetryBackoff } from '@cardano-sdk/util-rxjs';

export interface CreateTypeormProjectionProps {
projections: ProjectionName[];
blocksBufferLength: number;
buffer?: TypeormStabilityWindowBuffer;
projectionSource$: Observable<ProjectionEvent>;
connectionConfig$: Observable<PgConnectionConfig>;
Expand All @@ -51,7 +52,10 @@ export const createObservableDataSource = ({
entities,
extensions,
migrationsRun
}: Omit<CreateTypeormProjectionProps, 'exitAtBlockNo' | 'projections' | 'projectionSource$' | 'projectionOptions'> &
}: Omit<
CreateTypeormProjectionProps,
'blocksBufferLength' | 'exitAtBlockNo' | 'projections' | 'projectionSource$' | 'projectionOptions'
> &
Pick<PreparedProjection, 'entities' | 'extensions'> & { migrationsRun: boolean }) =>
connectionConfig$.pipe(
switchMap((connectionConfig) =>
Expand Down Expand Up @@ -89,6 +93,7 @@ export const createObservableDataSource = ({
* Dependencies of each projection are defined in ./prepareTypeormProjection.ts
*/
export const createTypeormProjection = ({
blocksBufferLength,
projections,
projectionSource$,
connectionConfig$,
Expand All @@ -101,6 +106,7 @@ export const createTypeormProjection = ({
const { handlePolicyIds } = { handlePolicyIds: [], ...projectionOptions };

logger.debug(`Creating projection with policyIds ${JSON.stringify(handlePolicyIds)}`);
logger.debug(`Using a ${blocksBufferLength} blocks buffer`);

const { mappers, entities, stores, extensions } = prepareTypeormProjection({
buffer,
Expand Down
13 changes: 12 additions & 1 deletion packages/cardano-services/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ import {
loadProviderServer,
stringOptionToBoolean
} from './Program';
import {
BLOCKS_BUFFER_LENGTH_DEFAULT,
PROJECTOR_API_URL_DEFAULT,
ProjectorArgs,
loadProjector
} from './Program/programs/projector';
import { Command, Option } from 'commander';
import { DB_CACHE_TTL_DEFAULT } from './InMemoryCache';
import {
Expand All @@ -58,7 +64,6 @@ import {
PgBossWorkerOptionDescriptions,
loadPgBossWorker
} from './Program/programs/pgBossWorker';
import { PROJECTOR_API_URL_DEFAULT, ProjectorArgs, loadProjector } from './Program/programs/projector';
import { PgBossQueue, isValidQueue } from './PgBoss';
import { ProjectionName } from './Projection';
import { URL } from 'url';
Expand Down Expand Up @@ -124,6 +129,12 @@ withCommonOptions(
),
{ apiUrl: PROJECTOR_API_URL_DEFAULT }
)
.addOption(
new Option('--blocks-buffer-length <blocksBufferLength>', ProjectorOptionDescriptions.BlocksBufferLength)
.default(BLOCKS_BUFFER_LENGTH_DEFAULT)
.env('BLOCKS_BUFFER_LENGTH')
.argParser((blocksBufferLength) => Number.parseInt(blocksBufferLength, 10))
)
.addOption(
new Option('--drop-schema <true/false>', ProjectorOptionDescriptions.DropSchema)
.default(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ describe('createTypeormProjection', () => {
const buffer = new TypeormStabilityWindowBuffer({ allowNonSequentialBlockHeights: true, logger });
const projections = [ProjectionName.UTXO];
const projection$ = createTypeormProjection({
blocksBufferLength: 10,
buffer,
connectionConfig$: projectorConnectionConfig$,
devOptions: { dropSchema: true, synchronize: true },
logger,
projectionSource$: Bootstrap.fromCardanoNode({
blocksBufferLength: 10,
buffer,
cardanoNode: data.cardanoNode,
logger
Expand Down
46 changes: 35 additions & 11 deletions packages/cardano-services/test/cli.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,13 @@ describe('CLI', () => {
let lastBlock: LedgerTipModel;
let postgresConnectionString: string;
let postgresConnectionStringHandle: string;
let postgresConnectionStringProjection: string;
let postgresConnectionStringStakePool: string;

beforeAll(() => {
postgresConnectionString = process.env.POSTGRES_CONNECTION_STRING_DB_SYNC!;
postgresConnectionStringHandle = process.env.POSTGRES_CONNECTION_STRING_HANDLE!;
postgresConnectionStringProjection = process.env.POSTGRES_CONNECTION_STRING_PROJECTION!;
postgresConnectionStringStakePool = process.env.POSTGRES_CONNECTION_STRING_STAKE_POOL!;
});

Expand Down Expand Up @@ -2176,7 +2178,7 @@ describe('CLI', () => {
'--ogmios-url',
'ws://localhost:1234',
'--postgres-connection-string',
'postgresql://postgres:doNoUseThisSecret!@127.0.0.1:5432/projection',
postgresConnectionStringProjection,
ProjectionName.UTXO
]);
await assertServerAlive();
Expand All @@ -2187,7 +2189,7 @@ describe('CLI', () => {
'--ogmios-url',
'ws://localhost:1234',
'--postgres-connection-string',
'postgresql://postgres:doNoUseThisSecret!@127.0.0.1:5432/projection',
postgresConnectionStringProjection,
`${ProjectionName.UTXO},${ProjectionName.StakePool}`
]);
await assertServerAlive();
Expand All @@ -2200,7 +2202,7 @@ describe('CLI', () => {
'--ogmios-url',
'ws://localhost:1234',
'--postgres-connection-string',
'postgresql://postgres:doNoUseThisSecret!@127.0.0.1:5432/projection'
postgresConnectionStringProjection
]);
await assertServerAlive();
});
Expand All @@ -2210,7 +2212,7 @@ describe('CLI', () => {
'--ogmios-url',
'ws://localhost:1234',
'--postgres-connection-string',
'postgresql://postgres:doNoUseThisSecret!@127.0.0.1:5432/projection',
postgresConnectionStringProjection,
'--drop-schema',
'true',
ProjectionName.UTXO
Expand Down Expand Up @@ -2243,7 +2245,7 @@ describe('CLI', () => {
'--ogmios-url',
'ws://localhost:1234',
'--postgres-connection-string',
'postgresql://postgres:doNoUseThisSecret!@127.0.0.1:5432/projection',
postgresConnectionStringProjection,
'--handle-policy-ids',
'f0ff48bbb7bbe9d59a40f1ce90e9e9d0ff5002ec48f232b49ca0fb9a',
ProjectionName.Handle
Expand All @@ -2258,7 +2260,7 @@ describe('CLI', () => {
'--ogmios-url',
'ws://localhost:1234',
'--postgres-connection-string',
'postgresql://postgres:doNoUseThisSecret!@127.0.0.1:5432/projection',
postgresConnectionStringProjection,
'--logger-min-severity',
'debug',
'--handle-policy-ids-file',
Expand All @@ -2285,7 +2287,7 @@ describe('CLI', () => {
'--ogmios-url',
'ws://localhost:1234',
'--postgres-connection-string',
'postgresql://postgres:doNoUseThisSecret!@127.0.0.1:5432/projection',
postgresConnectionStringProjection,
ProjectionName.Handle
],
{ env: {}, stdio: 'pipe' }
Expand Down Expand Up @@ -2313,7 +2315,7 @@ describe('CLI', () => {
'--ogmios-url',
'ws://localhost:1234',
'--postgres-connection-string',
'postgresql://postgres:doNoUseThisSecret!@127.0.0.1:5432/projection',
postgresConnectionStringProjection,
'--handle-policy-ids',
'policyId',
ProjectionName.Handle
Expand All @@ -2330,6 +2332,28 @@ describe('CLI', () => {
done();
});
});

test('uses the configured blocks buffer length', async () => {
const chunks: string[] = [];

startProjector([
'--ogmios-url',
'ws://localhost:1234',
'--postgres-connection-string',
postgresConnectionStringProjection,
'--logger-min-severity',
'debug',
'--blocks-buffer-length',
'23',
ProjectionName.StakePool
]);

proc.stdout?.on('data', (data: Buffer) => chunks.push(data.toString('utf8')));

await assertServerAlive();

expect(chunks.join('')).toMatch('Using a 23 blocks buffer');
});
});

describe('with environment variables', () => {
Expand All @@ -2355,7 +2379,7 @@ describe('CLI', () => {
startProjector(
{
OGMIOS_URL: 'ws://localhost:1234',
POSTGRES_CONNECTION_STRING: 'postgresql://postgres:doNoUseThisSecret!@127.0.0.1:5432/projection'
POSTGRES_CONNECTION_STRING: postgresConnectionStringProjection
},
[ProjectionName.UTXO]
);
Expand All @@ -2366,7 +2390,7 @@ describe('CLI', () => {
startProjector(
{
OGMIOS_URL: 'ws://localhost:1234',
POSTGRES_CONNECTION_STRING: 'postgresql://postgres:doNoUseThisSecret!@127.0.0.1:5432/projection'
POSTGRES_CONNECTION_STRING: postgresConnectionStringProjection
},
[`${ProjectionName.UTXO},${ProjectionName.StakePool}`]
);
Expand All @@ -2376,7 +2400,7 @@ describe('CLI', () => {
test('with multiple projections as --projection-names argument', async () => {
startProjector({
OGMIOS_URL: 'ws://localhost:1234',
POSTGRES_CONNECTION_STRING: 'postgresql://postgres:doNoUseThisSecret!@127.0.0.1:5432/projection',
POSTGRES_CONNECTION_STRING: postgresConnectionStringProjection,
PROJECTION_NAMES: `${ProjectionName.UTXO},${ProjectionName.StakePool}`
});
await assertServerAlive();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ describe('bufferChainSyncEvent', () => {
consumer.consumeTill(3);
await sleep(10);
producer.produceTill(16);
await sleep(10);
await sleep(20);
consumer.consumeTill(4);
await sleep(10);
};
Expand Down
2 changes: 1 addition & 1 deletion packages/e2e/test/projection/offline-fork.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ describe('resuming projection when intersection is not local tip', () => {
buffer: StabilityWindowBuffer,
into: ProjectionOperator<Mappers.WithStakeKeys>
) =>
Bootstrap.fromCardanoNode({ buffer, cardanoNode, logger }).pipe(
Bootstrap.fromCardanoNode({ blocksBufferLength: 10, buffer, cardanoNode, logger }).pipe(
Mappers.withCertificates(),
Mappers.withStakeKeys(),
into,
Expand Down
4 changes: 2 additions & 2 deletions packages/e2e/test/projection/single-tenant-utxo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ describe('single-tenant utxo projection', () => {
afterEach(async () => cleanup());

const projectMultiTenant = () =>
Bootstrap.fromCardanoNode({ buffer, cardanoNode, logger }).pipe(
Bootstrap.fromCardanoNode({ blocksBufferLength: 10, buffer, cardanoNode, logger }).pipe(
Mappers.withMint(),
Mappers.withUtxo(),
Postgres.withTypeormTransaction({ dataSource$: of(dataSource), logger }),
Expand All @@ -123,7 +123,7 @@ describe('single-tenant utxo projection', () => {
);

const projectSingleTenant = (addresses: Cardano.PaymentAddress[]) =>
Bootstrap.fromCardanoNode({ buffer, cardanoNode, logger }).pipe(
Bootstrap.fromCardanoNode({ blocksBufferLength: 10, buffer, cardanoNode, logger }).pipe(
Mappers.withMint(),
Mappers.withUtxo(),
Mappers.filterProducedUtxoByAddresses({ addresses }),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ describe('TypeormStabilityWindowBuffer', () => {
await buffer.initialize(queryRunner);
project$ = defer(() =>
Bootstrap.fromCardanoNode({
blocksBufferLength: 10,
buffer,
cardanoNode: {
...cardanoNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@ describe('storeAssets', () => {
const entities = [BlockEntity, BlockDataEntity, AssetEntity];

const project$ = () =>
Bootstrap.fromCardanoNode({
buffer,
cardanoNode: stubEvents.cardanoNode,
logger
}).pipe(
Bootstrap.fromCardanoNode({ blocksBufferLength: 10, buffer, cardanoNode: stubEvents.cardanoNode, logger }).pipe(
Mappers.withMint(),
withTypeormTransaction({
dataSource$,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,9 @@ describe('storeHandles', () => {
);

const project$ = () =>
Bootstrap.fromCardanoNode({
buffer,
cardanoNode: stubEvents.cardanoNode,
logger
}).pipe(applyOperators());
Bootstrap.fromCardanoNode({ blocksBufferLength: 10, buffer, cardanoNode: stubEvents.cardanoNode, logger }).pipe(
applyOperators()
);

const projectTilFirst = createProjectorTilFirst(project$);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,7 @@ describe('storeStakePoolMetadataJob', () => {
let queryRunner: QueryRunner;
let buffer: TypeormStabilityWindowBuffer;
const project$ = () =>
Bootstrap.fromCardanoNode({
buffer,
cardanoNode: stubEvents.cardanoNode,
logger
}).pipe(
Bootstrap.fromCardanoNode({ blocksBufferLength: 10, buffer, cardanoNode: stubEvents.cardanoNode, logger }).pipe(
// skipping 1st event because it's not rolled back
filter((evt) => {
const SKIP = 32_159;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,7 @@ describe('storeStakePools', () => {
let queryRunner: QueryRunner;
let buffer: TypeormStabilityWindowBuffer;
const project = () =>
Bootstrap.fromCardanoNode({
buffer,
cardanoNode: data.cardanoNode,
logger
}).pipe(
Bootstrap.fromCardanoNode({ blocksBufferLength: 10, buffer, cardanoNode: data.cardanoNode, logger }).pipe(
Mappers.withCertificates(),
Mappers.withStakePools(),
withTypeormTransaction({ dataSource$: of(dataSource), logger }),
Expand Down
6 changes: 1 addition & 5 deletions packages/projection-typeorm/test/operators/storeUtxo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@ describe('storeUtxo', () => {
const entities = [BlockEntity, BlockDataEntity, AssetEntity, TokensEntity, OutputEntity];

const project$ = () =>
Bootstrap.fromCardanoNode({
buffer,
cardanoNode: stubEvents.cardanoNode,
logger
}).pipe(
Bootstrap.fromCardanoNode({ blocksBufferLength: 10, buffer, cardanoNode: stubEvents.cardanoNode, logger }).pipe(
Mappers.withMint(),
Mappers.withUtxo(),
withTypeormTransaction({
Expand Down
Loading

0 comments on commit 0095c80

Please sign in to comment.