Skip to content

Commit

Permalink
feat: Surface provisioning failures to user (#1002)
Browse files Browse the repository at this point in the history
This PR updates provisioning such that user-related errors are written
to the user-owned logs table, allowing them to debug issues with their
schema.

Logging to the user table is tricky, since this table is created
_during_ the provisioning step itself. Therefore, I have split
provisioning into two phases:
1. System Resources - Sets up system-related entities: database,
schema, logs table/jobs etc.
2. User Resources - Applies user schema, configures Hasura etc.

This separation allows us to isolate the tasks which are likely to fail
due to user error, and therefore only surface errors which are relevant.

The creation of the logs table _should always succeed_; if it doesn't,
there is something wrong with the system, i.e. some form of bug has been
introduced. Errors thrown during the System portion of provisioning will
be error logged to the machine, and I will tune the existing alert so
that we are notified of these errors.

Additionally, I have converted all non-critical error logs to warnings,
so that we don't get alerted on non-issues.

closes: #901
  • Loading branch information
morgsmccauley authored Aug 9, 2024
1 parent 5e30e61 commit 44bc2eb
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 44 deletions.
5 changes: 3 additions & 2 deletions runner/src/indexer-config/indexer-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ export class ProvisioningConfig extends BaseConfig {
constructor (
public readonly accountId: string,
public readonly functionName: string,
public readonly schema: string
public readonly schema: string,
public readonly logLevel: LogLevel = LogLevel.INFO
) {
super(accountId, functionName);
}
Expand Down Expand Up @@ -101,7 +102,7 @@ export default class IndexerConfig extends ProvisioningConfig {
public readonly schema: string,
public readonly logLevel: LogLevel
) {
super(accountId, functionName, schema);
super(accountId, functionName, schema, logLevel);
const hash = crypto.createHash('sha256');
hash.update(`${accountId}/${functionName}`);
this.executorId = hash.digest('hex');
Expand Down
6 changes: 3 additions & 3 deletions runner/src/indexer-meta/indexer-meta.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import PgClient, { type PostgresConnectionParams } from '../pg-client';
import { trace } from '@opentelemetry/api';
import type LogEntry from './log-entry';
import { LogLevel } from './log-entry';
import type IndexerConfig from '../indexer-config';
import { type ProvisioningConfig } from '../indexer-config/indexer-config';

export enum IndexerStatus {
PROVISIONING = 'PROVISIONING',
Expand All @@ -29,11 +29,11 @@ export default class IndexerMeta implements IndexerMetaInterface {
tracer = trace.getTracer('queryapi-runner-indexer-logger');

private readonly pgClient: PgClient;
private readonly indexerConfig: IndexerConfig;
private readonly indexerConfig: ProvisioningConfig;
private readonly logInsertQueryTemplate: string = 'INSERT INTO %I.sys_logs (block_height, date, timestamp, type, level, message) VALUES %L';

constructor (
indexerConfig: IndexerConfig,
indexerConfig: ProvisioningConfig,
databaseConnectionParameters: PostgresConnectionParams,
pgClientInstance: PgClient | undefined = undefined,
) {
Expand Down
8 changes: 7 additions & 1 deletion runner/src/provisioner/provisioner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,13 @@ describe('Provisioner', () => {
};
});

provisioner = new Provisioner(hasuraClient, adminPgClient, cronPgClient, undefined, crypto, pgFormat, PgClient as any, testingRetryConfig);
const IndexerMeta = jest.fn().mockImplementation(() => {
return {
writeLogs: jest.fn()
};
});

provisioner = new Provisioner(hasuraClient, adminPgClient, cronPgClient, undefined, crypto, pgFormat, PgClient as any, testingRetryConfig, IndexerMeta);

indexerConfig = new IndexerConfig('', accountId, functionName, 0, '', databaseSchema, LogLevel.INFO);
});
Expand Down
103 changes: 75 additions & 28 deletions runner/src/provisioner/provisioner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import { logsTableDDL } from './schemas/logs-table';
import { metadataTableDDL } from './schemas/metadata-table';
import PgClientClass, { type PostgresConnectionParams } from '../pg-client';
import { type ProvisioningConfig } from '../indexer-config/indexer-config';
import { METADATA_TABLE_UPSERT, MetadataFields, IndexerStatus } from '../indexer-meta';
import IndexerMetaClass, { METADATA_TABLE_UPSERT, MetadataFields, IndexerStatus, LogEntry } from '../indexer-meta';
import logger from '../logger';

const DEFAULT_PASSWORD_LENGTH = 16;

Expand Down Expand Up @@ -60,6 +61,9 @@ const defaultRetryConfig: RetryConfig = {
export default class Provisioner {
tracer: Tracer = trace.getTracer('queryapi-runner-provisioner');

private readonly SYSTEM_TABLES = ['sys_logs', 'sys_metadata'];
private readonly logger: typeof logger;

constructor (
private readonly hasuraClient: HasuraClient = new HasuraClient(),
private readonly adminDefaultPgClient: PgClientClass = adminDefaultPgClientGlobal,
Expand All @@ -69,7 +73,10 @@ export default class Provisioner {
private readonly pgFormat: typeof pgFormatLib = pgFormatLib,
private readonly PgClient: typeof PgClientClass = PgClientClass,
private readonly retryConfig: RetryConfig = defaultRetryConfig,
) {}
private readonly IndexerMeta: typeof IndexerMetaClass = IndexerMetaClass
) {
this.logger = logger.child({ service: 'Provisioner' });
}

generatePassword (length: number = DEFAULT_PASSWORD_LENGTH): string {
return this.crypto
Expand Down Expand Up @@ -322,42 +329,82 @@ export default class Provisioner {
}, 'Failed to deprovision');
}

async provisionUserApi (indexerConfig: ProvisioningConfig): Promise<void> { // replace any with actual type
/**
 * Provisions an indexer in two phases: system resources first, then user resources.
 *
 * - A failure in the system phase is logged at error level and rethrown.
 * - A failure in the user phase is first written to the user's logs via
 *   `writeFailureToUserLogs` (best effort: if that write itself fails, the
 *   write failure is logged at error level and otherwise ignored), then
 *   logged at warn level and rethrown.
 *
 * Both phases run inside `wrapError` (message 'Failed to provision endpoint')
 * and `wrapSpan` (span name 'provision indexer resources').
 *
 * @param indexerConfig - Configuration of the indexer being provisioned.
 */
async provisionUserApi (indexerConfig: ProvisioningConfig): Promise<void> {
  const { accountId, functionName } = indexerConfig;
  const scopedLogger = this.logger.child({ accountId, functionName });

  const provisionBothPhases = async (): Promise<void> => {
    // Phase 1: system resources.
    try {
      await this.provisionSystemResources(indexerConfig);
    } catch (systemFailure) {
      scopedLogger.error('Failed to provision system resources', systemFailure);
      throw systemFailure;
    }

    // Phase 2: user resources.
    try {
      await this.provisionUserResources(indexerConfig);
    } catch (caught) {
      const userFailure = caught as Error;

      // Surfacing the failure to the user is best effort; the original
      // provisioning error is always the one that propagates.
      try {
        await this.writeFailureToUserLogs(indexerConfig, userFailure);
      } catch (logWriteFailure) {
        scopedLogger.error('Failed to log provisioning failure', logWriteFailure);
      }

      scopedLogger.warn('Failed to provision user resources', userFailure);
      throw userFailure;
    }
  };

  await wrapSpan(async () => {
    await wrapError(provisionBothPhases, 'Failed to provision endpoint');
  }, this.tracer, 'provision indexer resources');
}

/**
 * Writes a single system-error log entry carrying `error.message` through an
 * `IndexerMeta` instance built from the given config and the Postgres
 * connection parameters looked up for the indexer's user name.
 *
 * @param indexerConfig - Configuration identifying the indexer whose logs are written.
 * @param error - The provisioning failure; only its `message` is recorded.
 */
async writeFailureToUserLogs (indexerConfig: ProvisioningConfig, error: Error): Promise<void> {
  const connectionParams = await this.getPostgresConnectionParameters(indexerConfig.userName());
  const userLogWriter = new this.IndexerMeta(indexerConfig, connectionParams);
  const failureEntry = LogEntry.systemError(error.message);

  await userLogWriter.writeLogs([failureEntry]);
}

async provisionSystemResources (indexerConfig: ProvisioningConfig): Promise<void> {
const userName = indexerConfig.userName();
const databaseName = indexerConfig.databaseName();
const schemaName = indexerConfig.schemaName();

await wrapSpan(async () => {
await wrapError(
async () => {
if (!await this.hasuraClient.doesSourceExist(databaseName)) {
const password = this.generatePassword();
await this.createUserDb(userName, password, databaseName);
await this.addDatasource(userName, password, databaseName);
}
if (!await this.hasuraClient.doesSourceExist(databaseName)) {
const password = this.generatePassword();
await this.createUserDb(userName, password, databaseName);
await this.addDatasource(userName, password, databaseName);
}

await this.createSchema(databaseName, schemaName);
await this.createSchema(databaseName, schemaName);

await this.createMetadataTable(databaseName, schemaName);
await this.setProvisioningStatus(userName, schemaName);
await this.setupPartitionedLogsTable(userName, databaseName, schemaName);
await this.runIndexerSql(databaseName, schemaName, indexerConfig.schema);
await this.createMetadataTable(databaseName, schemaName);
await this.setProvisioningStatus(userName, schemaName);
await this.setupPartitionedLogsTable(userName, databaseName, schemaName);

const updatedTableNames = await this.getTableNames(schemaName, databaseName);
await this.trackTables(schemaName, this.SYSTEM_TABLES, databaseName);

await this.trackTables(schemaName, updatedTableNames, databaseName);
await this.exponentialRetry(async () => {
await this.addPermissionsToTables(indexerConfig, this.SYSTEM_TABLES, ['select', 'insert', 'update', 'delete']);
});
}

await this.exponentialRetry(async () => {
await this.trackForeignKeyRelationships(schemaName, databaseName);
});
async provisionUserResources (indexerConfig: ProvisioningConfig): Promise<void> {
const databaseName = indexerConfig.databaseName();
const schemaName = indexerConfig.schemaName();

await this.exponentialRetry(async () => {
await this.addPermissionsToTables(indexerConfig, updatedTableNames, ['select', 'insert', 'update', 'delete']);
});
},
'Failed to provision endpoint'
);
}, this.tracer, 'provision indexer resources');
await this.runIndexerSql(databaseName, schemaName, indexerConfig.schema);

const userTableNames = (await this.getTableNames(schemaName, databaseName)).filter((tableName) => !this.SYSTEM_TABLES.includes(tableName));

await this.trackTables(schemaName, userTableNames, databaseName);

await this.exponentialRetry(async () => {
await this.trackForeignKeyRelationships(schemaName, databaseName);
});

await this.exponentialRetry(async () => {
await this.addPermissionsToTables(indexerConfig, userTableNames, ['select', 'insert', 'update', 'delete']);
});
}

async exponentialRetry (fn: () => Promise<void>): Promise<void> {
Expand Down
8 changes: 2 additions & 6 deletions runner/src/server/services/data-layer/data-layer-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,12 @@ export function createDataLayerService (
.then(() => {
logger.info('Successfully provisioned Data Layer');
})
.catch((err) => {
logger.error('Failed to provision Data Layer', err);
throw err;
})
);

callback(null, { taskId });
})
.catch((err) => {
logger.error('Failed to check if Data Layer is provisioned', err);
logger.warn('Failed to check if Data Layer is provisioned', err);

const internal = new StatusBuilder()
.withCode(status.INTERNAL)
Expand Down Expand Up @@ -148,7 +144,7 @@ export function createDataLayerService (
logger.info('Successfully deprovisioned Data Layer');
})
.catch((err) => {
logger.error('Failed to deprovision Data Layer', err);
logger.warn('Failed to deprovision Data Layer', err);
throw err;
})
);
Expand Down
2 changes: 1 addition & 1 deletion runner/src/server/services/runner/runner-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export function getRunnerService (
executors.set(indexerConfig.executorId, streamHandler);
callback(null, { executorId: indexerConfig.executorId });
streamHandler.start().catch((error: Error) => {
logger.error('Failed to start executor', error);
logger.warn('Failed to start executor', error);
});
} catch (e) {
const error = e as Error;
Expand Down
4 changes: 2 additions & 2 deletions runner/src/stream-handler/stream-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export default class StreamHandler {
this.executorContext.executionState = ExecutionState.RUNNING;
} catch (error: any) {
const errorContent = error instanceof Error ? error.toString() : JSON.stringify(error);
this.logger.error('Terminating thread', error);
this.logger.warn('Terminating thread', error);
this.executorContext.executionState = ExecutionState.STALLED;
throw new Error(`Failed to start Indexer: ${errorContent}`);
}
Expand All @@ -92,7 +92,7 @@ export default class StreamHandler {
}

private handleError (error: Error): void {
this.logger.error('Terminating thread', error);
this.logger.warn('Terminating thread', error);
this.executorContext.executionState = ExecutionState.STALLED;

if (this.indexerMeta) {
Expand Down
2 changes: 1 addition & 1 deletion runner/src/stream-handler/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise<void>
const error = err as Error;
if (previousError !== error.message) {
previousError = error.message;
workerContext.logger.error(`Failed on block ${currBlockHeight}`, err);
workerContext.logger.warn(`Failed on block ${currBlockHeight}`, err);
}
const sleepSpan = tracer.startSpan('Sleep for 10 seconds after failing', {}, context.active());
await sleep(10000);
Expand Down
17 changes: 17 additions & 0 deletions runner/tests/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,23 @@ describe('Indexer integration', () => {
await expect(pgClient.query('SELECT * FROM cron.job WHERE jobname like $1', ['provisioning_near_test_provisioning%']).then(({ rows }) => rows)).resolves.toHaveLength(0);
await expect(hasuraClient.doesSourceExist(testConfig1.databaseName())).resolves.toBe(false);
});

// Provisioning with a schema string that is expected to fail must reject,
// and the failure must then be readable from the indexer's logs.
it('Writes provisioning errors to user logs table', async () => {
  const uniqueAccountId = 'user-failures.near'; // must be unique to prevent conflicts with other tests
  const invalidSchema = 'broken schema';
  const failingConfig = new IndexerConfig('test:stream', uniqueAccountId, 'test', 0, '', invalidSchema, LogLevel.INFO);

  const provisioning = provisioner.provisionUserApi(failingConfig);
  await expect(provisioning).rejects.toThrow();

  const userLogs: any = await indexerLogsQuery(failingConfig.schemaName(), graphqlClient);
  expect(userLogs[0].message).toContain('Failed to run user script');
});
});

async function prepareIndexer (indexerConfig: IndexerConfig, provisioner: Provisioner, hasuraContainer: StartedHasuraGraphQLContainer): Promise<Indexer> {
Expand Down

0 comments on commit 44bc2eb

Please sign in to comment.