feat: Automated CB based on timestamps, Cron and DB part #246

Merged · 17 commits · Jan 12, 2024
Changes from 13 commits
10 changes: 10 additions & 0 deletions jest-dynamodb-config.js
@@ -10,6 +10,16 @@ module.exports = {
],
ProvisionedThroughput: { ReadCapacityUnits: 10, WriteCapacityUnits: 10 },
},
{
TableName: 'Timestamp',
KeySchema: [
{ AttributeName: 'hash', KeyType: 'HASH' },
],
AttributeDefinitions: [
{ AttributeName: 'hash', AttributeType: 'S' },
],
ProvisionedThroughput: { ReadCapacityUnits: 10, WriteCapacityUnits: 10 },
}
],
port: 8000,
};
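The new Timestamp table mirrors the production schema (a single hash partition key) so jest-dynamodb can spin it up locally for tests. A minimal test sketch against that local table; the endpoint, region, and dummy credentials are assumptions about the local setup, not code from this PR:

import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, GetCommand, PutCommand } from '@aws-sdk/lib-dynamodb';

// DynamoDB Local (started by jest-dynamodb on the port configured above)
// accepts any credentials.
const client = DynamoDBDocumentClient.from(
  new DynamoDBClient({
    endpoint: 'http://localhost:8000',
    region: 'local',
    credentials: { accessKeyId: 'fake', secretAccessKey: 'fake' },
  })
);

it('round-trips a Timestamp row', async () => {
  await client.send(
    new PutCommand({
      TableName: 'Timestamp',
      Item: { hash: 'foo', lastPostTimestamp: 1300000, blockUntilTimestamp: 1300300 },
    })
  );
  const res = await client.send(new GetCommand({ TableName: 'Timestamp', Key: { hash: 'foo' } }));
  expect(res.Item?.blockUntilTimestamp).toBe(1300300);
});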
4 changes: 4 additions & 0 deletions lib/constants.ts
@@ -12,6 +12,7 @@ export const BETA_COMPLIANCE_S3_KEY = 'beta.json';
export const DYNAMO_TABLE_NAME = {
FADES: 'Fades',
SYNTHETIC_SWITCH_TABLE: 'SyntheticSwitchTable',
TIMESTAMP: 'Timestamp',
};

export const DYNAMO_TABLE_KEY = {
@@ -23,4 +24,7 @@ export const DYNAMO_TABLE_KEY = {
TRADE_TYPE: 'type',
LOWER: 'lower',
ENABLED: 'enabled',
BLOCK_UNTIL_TIMESTAMP: 'blockUntilTimestamp',
LAST_POST_TIMESTAMP: 'lastPostTimestamp',
FADED: 'faded',
};
122 changes: 88 additions & 34 deletions lib/cron/fade-rate.ts
@@ -1,22 +1,23 @@
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient } from '@aws-sdk/lib-dynamodb';
import { metricScope, MetricsLogger } from 'aws-embedded-metrics';
import { ScheduledHandler } from 'aws-lambda/trigger/cloudwatch-events';
import { EventBridgeEvent } from 'aws-lambda/trigger/eventbridge';
import Logger from 'bunyan';

import {
BETA_S3_KEY,
FADE_RATE_BUCKET,
FADE_RATE_S3_KEY,
PRODUCTION_S3_KEY,
WEBHOOK_CONFIG_BUCKET,
} from '../constants';
import { BETA_S3_KEY, PRODUCTION_S3_KEY, WEBHOOK_CONFIG_BUCKET } from '../constants';
import { CircuitBreakerMetricDimension } from '../entities';
import { checkDefined } from '../preconditions/preconditions';
import { S3WebhookConfigurationProvider } from '../providers';
import { S3CircuitBreakerConfigurationProvider } from '../providers/circuit-breaker/s3';
import { FadesRepository, FadesRowType, SharedConfigs } from '../repositories';
import { FadesRepository, FadesRowType, SharedConfigs, TimestampRepoRow } from '../repositories';
import { TimestampRepository } from '../repositories/timestamp-repository';
import { STAGE } from '../util/stage';

export type FillerFades = Record<string, number>;
export type FillerTimestamps = Map<string, Omit<TimestampRepoRow, 'hash'>>;

export const BLOCK_PER_FADE_SECS = 60 * 5; // 5 minutes

export const handler: ScheduledHandler = metricScope((metrics) => async (_event: EventBridgeEvent<string, void>) => {
await main(metrics);
});
@@ -32,6 +33,16 @@ async function main(metrics: MetricsLogger) {
const stage = process.env['stage'];
const s3Key = stage === STAGE.BETA ? BETA_S3_KEY : PRODUCTION_S3_KEY;
const webhookProvider = new S3WebhookConfigurationProvider(log, `${WEBHOOK_CONFIG_BUCKET}-${stage}-1`, s3Key);
await webhookProvider.fetchEndpoints();
const documentClient = DynamoDBDocumentClient.from(new DynamoDBClient({}), {
marshallOptions: {
convertEmptyValues: true,
},
unmarshallOptions: {
wrapNumbers: true,
},
});
const timestampDB = TimestampRepository.create(documentClient);

const sharedConfig: SharedConfigs = {
Database: checkDefined(process.env.REDSHIFT_DATABASE),
@@ -40,49 +51,92 @@ };
};
const fadesRepository = FadesRepository.create(sharedConfig);
await fadesRepository.createFadesView();
/*
query redshift for recent orders:
| fillerAddress | faded | postTimestamp |
|----- 0x1 -----|-- 0 --|-- 12222222 ---|
|----- 0x2 -----|-- 1 --|-- 12345679 ---|
|----- 0x1 -----|-- 0 --|-- 12345678 ---|
*/
const result = await fadesRepository.getFades();

if (result) {
const fillerHashes = webhookProvider.fillers();
const fillerTimestamps = await timestampDB.getFillerTimestampsMap(fillerHashes);
const addressToFillerHash = await webhookProvider.addressToFillerHash();
const fillerFadeRate = calculateFillerFadeRates(result, addressToFillerHash, log);
log.info({ fadeRates: [...fillerFadeRate.entries()] }, 'filler fade rate');

const configProvider = new S3CircuitBreakerConfigurationProvider(
log,
`${FADE_RATE_BUCKET}-prod-1`,
FADE_RATE_S3_KEY
// aggregated # of fades by filler entity (not address)
// | hash | faded |
// |-- foo --|--- 3 ---|
// |-- bar --|--- 1 ---|
const fillersNewFades = getFillersNewFades(result, addressToFillerHash, fillerTimestamps, log);

// | hash | lastPostTimestamp | blockUntilTimestamp |
// |---- foo ----|---- 1300000 ----|---- now + fades * block_per_fade ----|
// |---- bar ----|---- 1300000 ----|---- 13500000 ----|
const updatedTimestamps = calculateNewTimestamps(
fillerTimestamps,
fillersNewFades,
Math.floor(Date.now() / 1000),
log
);
await configProvider.putConfigurations(fillerFadeRate, metrics);
log.info({ updatedTimestamps }, 'filler for which to update timestamp');
await timestampDB.updateTimestampsBatch(updatedTimestamps);
}
}

// aggregates potentially multiple filler addresses into filler name
// and calculates fade rate for each
export function calculateFillerFadeRates(
/* compute the new blockUntilTimestamp for each filler:
   - blockUntilTimestamp > current timestamp: filler is still blocked, skip
   - otherwise: block for (# of new fades) * BLOCK_PER_FADE_SECS from now
*/
export function calculateNewTimestamps(
fillerTimestamps: FillerTimestamps,
fillersNewFades: FillerFades,
newPostTimestamp: number,
log?: Logger
): [string, number, number][] {
const updatedTimestamps: [string, number, number][] = [];
Object.entries(fillersNewFades).forEach((row) => {
const hash = row[0];
const fades = row[1];
if (fillerTimestamps.has(hash) && fillerTimestamps.get(hash)!.blockUntilTimestamp > newPostTimestamp) {
return;
}
if (fades) {
const blockUntilTimestamp = newPostTimestamp + fades * BLOCK_PER_FADE_SECS;
updatedTimestamps.push([hash, newPostTimestamp, blockUntilTimestamp]);
}
});
log?.info({ updatedTimestamps }, 'updated timestamps');
return updatedTimestamps;
}

/* find the number of new fades, for each filler entity, since
   the last time this cron was run
*/
export function getFillersNewFades(
rows: FadesRowType[],
addressToFillerHash: Map<string, string>,
fillerTimestamps: FillerTimestamps,
log?: Logger
): Map<string, number> {
const fadeRateMap = new Map<string, number>();
const fillerToQuotesMap = new Map<string, [number, number]>();
): FillerFades {
const newFadesMap: FillerFades = {}; // filler hash -> # of new fades
rows.forEach((row) => {
const fillerAddr = row.fillerAddress.toLowerCase();
const fillerHash = addressToFillerHash.get(fillerAddr);
if (!fillerHash) {
log?.info({ addressToFillerHash, fillerAddress: fillerAddr }, 'filler address not found in webhook config');
} else {
if (!fillerToQuotesMap.has(fillerHash)) {
fillerToQuotesMap.set(fillerHash, [row.fadedQuotes, row.totalQuotes]);
log?.info({ fillerAddr }, 'filler address not found in webhook config');
} else if (
(fillerTimestamps.has(fillerHash) && row.postTimestamp > fillerTimestamps.get(fillerHash)!.lastPostTimestamp) ||
!fillerTimestamps.has(fillerHash)
) {
if (!newFadesMap[fillerHash]) {
newFadesMap[fillerHash] = row.faded;
} else {
const [fadedQuotes, totalQuotes] = fillerToQuotesMap.get(fillerHash) as [number, number];
fillerToQuotesMap.set(fillerHash, [fadedQuotes + row.fadedQuotes, totalQuotes + row.totalQuotes]);
newFadesMap[fillerHash] += row.faded;
}
}
});

fillerToQuotesMap.forEach((value, key) => {
const fadeRate = value[0] / value[1];
fadeRateMap.set(key, fadeRate);
});
return fadeRateMap;
log?.info({ newFadesMap }, '# of new fades by filler');
return newFadesMap;
}
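Taken together, the cron reduces to a two-step pipeline: count new fades per filler hash, then turn counts into block windows. A worked sketch using the sample values from the comments above; the data and import path are illustrative assumptions, not part of this PR:

import { calculateNewTimestamps, getFillersNewFades, FillerTimestamps } from '../cron/fade-rate';

const addressToFillerHash = new Map([
  ['0x1', 'foo'],
  ['0x2', 'bar'],
]);
// foo has been seen by a previous run; bar has no row yet
const fillerTimestamps: FillerTimestamps = new Map([
  ['foo', { lastPostTimestamp: 12000000, blockUntilTimestamp: 0 }],
]);
const rows = [
  { fillerAddress: '0x1', faded: 0, postTimestamp: 12222222 },
  { fillerAddress: '0x2', faded: 1, postTimestamp: 12345679 },
  { fillerAddress: '0x1', faded: 1, postTimestamp: 12345678 },
];

// only rows newer than a filler's lastPostTimestamp are counted:
// => { foo: 1, bar: 1 }
const newFades = getFillersNewFades(rows, addressToFillerHash, fillerTimestamps);

// fillers still inside their blockUntilTimestamp are skipped; the rest are
// blocked for fades * BLOCK_PER_FADE_SECS (5 minutes per fade) from now:
// => [['foo', now, now + 300], ['bar', now, now + 300]]
const now = Math.floor(Date.now() / 1000);
const updatedTimestamps = calculateNewTimestamps(fillerTimestamps, newFades, now);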
6 changes: 2 additions & 4 deletions lib/providers/circuit-breaker/s3.ts
@@ -3,18 +3,16 @@ import { NodeHttpHandler } from '@smithy/node-http-handler';
import { MetricsLogger, Unit } from 'aws-embedded-metrics';
import Logger from 'bunyan';

import { CircuitBreakerConfiguration, CircuitBreakerConfigurationProvider } from '.';
import { Metric } from '../../entities';
import { checkDefined } from '../../preconditions/preconditions';
import { CircuitBreakerConfiguration, CircuitBreakerConfigurationProvider } from '.';

export class S3CircuitBreakerConfigurationProvider implements CircuitBreakerConfigurationProvider {
private log: Logger;
private fillers: CircuitBreakerConfiguration[];
private lastUpdatedTimestamp: number;
private client: S3Client;

// try to refetch endpoints every minute
private static UPDATE_PERIOD_MS = 5 * 60000;
private static UPDATE_PERIOD_MS = 1 * 60000;
private static FILL_RATE_THRESHOLD = 0.75;

constructor(_log: Logger, private bucket: string, private key: string) {
2 changes: 1 addition & 1 deletion lib/quoters/WebhookQuoter.ts
@@ -4,6 +4,7 @@ import axios, { AxiosError, AxiosResponse } from 'axios';
import Logger from 'bunyan';
import { v4 as uuidv4 } from 'uuid';

import { Quoter, QuoterType } from '.';
import {
AnalyticsEvent,
AnalyticsEventType,
@@ -18,7 +19,6 @@ import { FirehoseLogger } from '../providers/analytics';
import { CircuitBreakerConfigurationProvider } from '../providers/circuit-breaker';
import { FillerComplianceConfigurationProvider } from '../providers/compliance';
import { timestampInMstoISOString } from '../util/time';
import { Quoter, QuoterType } from '.';

// TODO: shorten, maybe take from env config
const WEBHOOK_TIMEOUT_MS = 500;
18 changes: 18 additions & 0 deletions lib/repositories/base.ts
@@ -28,6 +28,17 @@ export enum TimestampThreshold {
TWO_MONTHS = "'2 MONTHS'",
}

export type TimestampRepoRow = {
hash: string;
lastPostTimestamp: number;
blockUntilTimestamp: number;
};

export type DynamoTimestampRepoRow = Omit<TimestampRepoRow, 'lastPostTimestamp' | 'blockUntilTimestamp'> & {
lastPostTimestamp: string;
blockUntilTimestamp: string;
};

export abstract class BaseRedshiftRepository {
constructor(readonly client: RedshiftDataClient, private readonly configs: SharedConfigs) {}

@@ -65,3 +76,10 @@ export interface BaseSwitchRepository {
putSynthSwitch(trade: SynthSwitchTrade, lower: string, enabled: boolean): Promise<void>;
syntheticQuoteForTradeEnabled(trade: SynthSwitchQueryParams): Promise<boolean>;
}

export interface BaseTimestampRepository {
updateTimestampsBatch(toUpdate: [string, number, number?][]): Promise<void>;
getFillerTimestamps(hash: string): Promise<TimestampRepoRow>;
getFillerTimestampsMap(hashes: string[]): Promise<Map<string, Omit<TimestampRepoRow, 'hash'>>>;
getTimestampsBatch(hashes: string[]): Promise<TimestampRepoRow[]>;
}
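The concrete TimestampRepository implementation lives in lib/repositories/timestamp-repository.ts and is not part of this diff. As an illustration of the contract only, a hypothetical in-memory stand-in could satisfy the interface like this:

// In-memory stand-in for tests; the real repository is backed by DynamoDB.
// Defaulting a missing blockUntilTimestamp to 0 is a simplification here.
class InMemoryTimestampRepository implements BaseTimestampRepository {
  private rows = new Map<string, TimestampRepoRow>();

  async updateTimestampsBatch(toUpdate: [string, number, number?][]): Promise<void> {
    for (const [hash, lastPostTimestamp, blockUntilTimestamp] of toUpdate) {
      this.rows.set(hash, { hash, lastPostTimestamp, blockUntilTimestamp: blockUntilTimestamp ?? 0 });
    }
  }

  async getFillerTimestamps(hash: string): Promise<TimestampRepoRow> {
    return this.rows.get(hash) ?? { hash, lastPostTimestamp: 0, blockUntilTimestamp: 0 };
  }

  async getFillerTimestampsMap(hashes: string[]): Promise<Map<string, Omit<TimestampRepoRow, 'hash'>>> {
    const map = new Map<string, Omit<TimestampRepoRow, 'hash'>>();
    for (const { hash, ...rest } of await this.getTimestampsBatch(hashes)) {
      map.set(hash, rest);
    }
    return map;
  }

  async getTimestampsBatch(hashes: string[]): Promise<TimestampRepoRow[]> {
    return hashes.flatMap((h) => this.rows.get(h) ?? []);
  }
}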
43 changes: 20 additions & 23 deletions lib/repositories/fades-repository.ts
@@ -5,8 +5,8 @@ import { BaseRedshiftRepository, SharedConfigs } from './base';

export type FadesRowType = {
fillerAddress: string;
totalQuotes: number;
fadedQuotes: number;
faded: number;
postTimestamp: number;
};

export class FadesRepository extends BaseRedshiftRepository {
@@ -29,13 +29,15 @@
await this.executeStatement(CREATE_VIEW_SQL, FadesRepository.log, { waitTimeMs: 2_000 });
}

// get the latest 20 orders for each filler address, and whether each order faded
async getFades(): Promise<FadesRowType[]> {
const stmtId = await this.executeStatement(FADE_RATE_SQL, FadesRepository.log, { waitTimeMs: 2_000 });
const response = await this.client.send(new GetStatementResultCommand({ Id: stmtId }));
/* result should be in the following format
| rfqFiller | fade_rate |
|---- foo ------|---- 0.05 ------|
|---- bar ------|---- 0.01 ------|
| rfqFiller | faded | postTimestamp |
|---- bar ------|---- 0 ----|---- 12222222 ----|
|---- foo ------|---- 1 ----|---- 12345679 ----|
|---- foo ------|---- 0 ----|---- 12345678 ----|
*/
const result = response.Records;
if (!result) {
@@ -45,8 +47,8 @@
const formattedResult = result.map((row) => {
const formattedRow: FadesRowType = {
fillerAddress: row[0].stringValue as string,
totalQuotes: Number(row[1].longValue as number),
fadedQuotes: Number(row[2].longValue as number),
faded: Number(row[1].longValue as number),
postTimestamp: Number(row[2].longValue as number),
};
return formattedRow;
});
@@ -56,14 +58,15 @@
}

const CREATE_VIEW_SQL = `
CREATE OR REPLACE VIEW rfqOrdersTimestamp
CREATE OR REPLACE VIEW latestRfqs
Collaborator comment:
This gets the latest 20 RFQs for each filler right? The limit of 1k would prevent us from going over 50 fillers. Is this 50 filler limit expected for the foreseeable future?

AS (
WITH latestOrders AS (
SELECT * FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY filler ORDER BY createdat DESC) AS row_num FROM postedorders
)
WHERE row_num <= 30
WHERE row_num <= 20
AND deadline < EXTRACT(EPOCH FROM GETDATE()) -- exclude orders that can still be filled
LIMIT 1000
)
SELECT
latestOrders.chainid as chainId, latestOrders.filler as rfqFiller, latestOrders.startTime as decayStartTime, latestOrders.quoteid, archivedorders.filler as actualFiller, latestOrders.createdat as postTimestamp, archivedorders.txhash as txHash, archivedOrders.fillTimestamp as fillTimestamp,
@@ -80,23 +83,17 @@ AND rfqFiller != '0x0000000000000000000000000000000000000000'
AND chainId NOT IN (5,8001,420,421613) -- exclude mainnet goerli, polygon goerli, optimism goerli and arbitrum goerli testnets
AND
postTimestamp >= extract(epoch from (GETDATE() - INTERVAL '168 HOURS')) -- 7 days rolling window
);
)
ORDER BY rfqFiller, postTimestamp DESC
LIMIT 1000
`;

const FADE_RATE_SQL = `
WITH ORDERS_CTE AS (
SELECT
rfqFiller,
COUNT(*) AS totalQuotes,
SUM(CASE WHEN (decayStartTime < fillTimestamp) THEN 1 ELSE 0 END) AS fadedQuotes
FROM rfqOrdersTimestamp
GROUP BY rfqFiller
)
SELECT
rfqFiller,
totalQuotes,
fadedQuotes,
fadedQuotes / totalQuotes as fadeRate
FROM ORDERS_CTE
WHERE totalQuotes >= 10;
postTimestamp,
CASE WHEN (decayStartTime < fillTimestamp) THEN 1 ELSE 0 END AS faded
FROM latestRfqs
ORDER BY rfqFiller, postTimestamp DESC
LIMIT 1000
`;
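With the view returning one row per order instead of pre-aggregated rates, fade detection reduces to a per-row comparison. The CASE expression above is equivalent to this sketch (epoch-second timestamps assumed; the interpretation in the comment is mine, not the PR's):

// An order counts as faded when it was filled after its decay start,
// i.e. the filler let the quoted price decay instead of honoring it.
const faded = (decayStartTime: number, fillTimestamp: number): 0 | 1 =>
  decayStartTime < fillTimestamp ? 1 : 0;

faded(12345678, 12345680); // 1: filled after decay began, a fade
faded(12345678, 12345670); // 0: filled at or before decay start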
2 changes: 0 additions & 2 deletions lib/repositories/switch-repository.ts
@@ -61,7 +61,6 @@ export class SwitchRepository implements BaseSwitchRepository {
{ execute: true, consistent: true }
);

SwitchRepository.log.info({ res: result.Item }, 'get result');
if (result.Item && BigNumber.from(result.Item.lower).lte(amount)) {
return result.Item.enabled;
} else {
@@ -71,7 +70,6 @@
}

public async putSynthSwitch(trade: SynthSwitchTrade, lower: string, enabled: boolean): Promise<void> {
SwitchRepository.log.info({ pk: `${SwitchRepository.getKey(trade)}` }, 'put pk');
await this.switchEntity.put(
{
[PARTITION_KEY]: `${SwitchRepository.getKey(trade)}`,