Skip to content

Commit

Permalink
Merge pull request #253 from Uniswap/parallelize-read
Browse files Browse the repository at this point in the history
feat: update firehose client to use aws v3 sdk
  • Loading branch information
ConjunctiveNormalForm authored Jan 12, 2024
2 parents 33ac28c + 6e0d97d commit b994d02
Show file tree
Hide file tree
Showing 5 changed files with 729 additions and 205 deletions.
14 changes: 9 additions & 5 deletions lib/handlers/quote/injector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import {
FADE_RATE_BUCKET,
FADE_RATE_S3_KEY,
INTEGRATION_S3_KEY,
PROD_COMPLIANCE_S3_KEY,
PRODUCTION_S3_KEY,
PROD_COMPLIANCE_S3_KEY,
WEBHOOK_CONFIG_BUCKET,
} from '../../constants';
import {
Expand All @@ -20,13 +20,13 @@ import {
UniswapXParamServiceMetricDimension,
} from '../../entities/aws-metrics-logger';
import { S3WebhookConfigurationProvider } from '../../providers';
import { FirehoseLogger } from '../../providers/analytics';
import { S3CircuitBreakerConfigurationProvider } from '../../providers/circuit-breaker/s3';
import { S3FillerComplianceConfigurationProvider } from '../../providers/compliance/s3';
import { Quoter, WebhookQuoter } from '../../quoters';
import { STAGE } from '../../util/stage';
import { ApiInjector, ApiRInj } from '../base/api-handler';
import { PostQuoteRequestBody } from './schema';
import { FirehoseLogger } from '../../providers/analytics';

export interface ContainerInjected {
quoters: Quoter[];
Expand Down Expand Up @@ -54,7 +54,7 @@ export class QuoteInjector extends ApiInjector<ContainerInjected, RequestInjecte
`${FADE_RATE_BUCKET}-${stage}-1`,
FADE_RATE_S3_KEY
);

const complianceKey = stage === STAGE.BETA ? BETA_COMPLIANCE_S3_KEY : PROD_COMPLIANCE_S3_KEY;
const fillerComplianceProvider = new S3FillerComplianceConfigurationProvider(
log,
Expand All @@ -64,7 +64,9 @@ export class QuoteInjector extends ApiInjector<ContainerInjected, RequestInjecte

const firehose = new FirehoseLogger(log, process.env.ANALYTICS_STREAM_ARN!);

const quoters: Quoter[] = [new WebhookQuoter(log, firehose, webhookProvider, circuitBreakerProvider, fillerComplianceProvider)];
const quoters: Quoter[] = [
new WebhookQuoter(log, firehose, webhookProvider, circuitBreakerProvider, fillerComplianceProvider),
];
return {
quoters: quoters,
firehose: firehose,
Expand Down Expand Up @@ -127,7 +129,9 @@ export class MockQuoteInjector extends ApiInjector<ContainerInjected, RequestInj
PROD_COMPLIANCE_S3_KEY
);
const firehose = new FirehoseLogger(log, process.env.ANALYTICS_STREAM_ARN!);
const quoters: Quoter[] = [new WebhookQuoter(log, firehose, webhookProvider, circuitBreakerProvider, fillerComplianceProvider)];
const quoters: Quoter[] = [
new WebhookQuoter(log, firehose, webhookProvider, circuitBreakerProvider, fillerComplianceProvider),
];

return {
quoters: quoters,
Expand Down
25 changes: 10 additions & 15 deletions lib/providers/analytics/firehose.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,40 @@
import { Firehose } from 'aws-sdk';
import { AnalyticsEvent } from '../../entities/analytics-events';
import { FirehoseClient, PutRecordCommand } from '@aws-sdk/client-firehose';
import { default as Logger } from 'bunyan';

import { IAnalyticsLogger } from '.';
import { AnalyticsEvent } from '../../entities/analytics-events';

export class FirehoseLogger implements IAnalyticsLogger {
private log: Logger;
private readonly streamName: string;
private readonly firehose: Firehose;
private readonly firehose: FirehoseClient;

constructor(_log: Logger, streamArn: string) {
this.log = _log;
// Split the streamArn to extract the streamName
const streamArnParts = streamArn.split('/');

if (streamArnParts.length !== 2) {
this.log.error(
{ streamArn: streamArn },
`Firehose client error parsing stream from ${streamArn}.`
);
this.log.error({ streamArn: streamArn }, `Firehose client error parsing stream from ${streamArn}.`);
}

this.streamName = streamArnParts[1];
this.firehose = new Firehose();
this.firehose = new FirehoseClient();
}

async sendAnalyticsEvent(analyticsEvent: AnalyticsEvent): Promise<void> {
const jsonString = JSON.stringify(analyticsEvent) + '\n';
const params = {
DeliveryStreamName: this.streamName,
Record: {
Data: jsonString,
Data: Buffer.from(jsonString, 'base64'),
},
};

try {
await this.firehose.putRecord(params).promise();
await this.firehose.send(new PutRecordCommand(params));
} catch (error) {
this.log.error(
{ streamName: this.streamName },
`Firehose client error putting record. ${error}`
);
this.log.error({ streamName: this.streamName }, `Firehose client error putting record. ${error}`);
}
}
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"dependencies": {
"@aws-cdk/aws-redshift-alpha": "^2.54.0-alpha.0",
"@aws-sdk/client-dynamodb": "^3.386.0",
"@aws-sdk/client-firehose": "^3.490.0",
"@aws-sdk/client-redshift-data": "^3.382.0",
"@aws-sdk/client-s3": "^3.304.0",
"@aws-sdk/lib-dynamodb": "^3.386.0",
Expand All @@ -71,7 +72,6 @@
"@uniswap/v3-sdk": "^3.9.0",
"aws-cdk-lib": "2.85.0",
"aws-embedded-metrics": "^4.1.0",
"aws-sdk": "^2.1238.0",
"axios": "^1.2.1",
"axios-retry": "^3.4.0",
"bunyan": "^1.8.15",
Expand Down
29 changes: 15 additions & 14 deletions test/providers/analytics/firehose.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { FirehoseLogger } from '../../../lib/providers/analytics';
import { AnalyticsEvent, AnalyticsEventType } from '../../../lib/entities/analytics-events';
import { Firehose } from 'aws-sdk';
import { FirehoseClient } from '@aws-sdk/client-firehose';

jest.mock('aws-sdk');
import { AnalyticsEvent, AnalyticsEventType } from '../../../lib/entities/analytics-events';
import { FirehoseLogger } from '../../../lib/providers/analytics';

const mockedFirehose = Firehose as jest.Mocked<typeof Firehose>;
const mockedFirehose = FirehoseClient as jest.Mocked<typeof FirehoseClient>;

const logger = { error: jest.fn() } as any;

Expand Down Expand Up @@ -36,22 +35,24 @@ describe('FirehoseLogger', () => {

it('should send analytics event to Firehose', async () => {
const firehose = new FirehoseLogger(logger, validStreamArn);
const analyticsEvent = new AnalyticsEvent(AnalyticsEventType.WEBHOOK_RESPONSE,{ status: 200 });
const analyticsEvent = new AnalyticsEvent(AnalyticsEventType.WEBHOOK_RESPONSE, { status: 200 });

const putRecordMock = jest.fn();
mockedFirehose.prototype.putRecord = putRecordMock;

putRecordMock.mockImplementationOnce((_params, callback) => {
callback(null, { "Encrypted": true, "RecordId": "123" });
});
mockedFirehose.prototype.send = putRecordMock;

await firehose.sendAnalyticsEvent(analyticsEvent);

expect(putRecordMock).toHaveBeenCalledWith({
const input = {
DeliveryStreamName: 'stream-name',
Record: {
Data: JSON.stringify(analyticsEvent) + '\n',
Data: Buffer.from(JSON.stringify(analyticsEvent) + '\n', 'base64'),
},
});
};

expect(putRecordMock).toHaveBeenCalledWith(
expect.objectContaining({
input: input,
})
);
});
});
Loading

0 comments on commit b994d02

Please sign in to comment.