Skip to content
This repository has been archived by the owner on Jan 9, 2024. It is now read-only.

Commit

Permalink
fix(mem): removing kinesis batcher as possible memory leaker (#826)
Browse files Browse the repository at this point in the history
* fix(mem): remove batching for kinesis

* fix(test): updating test to destroy read client too

* fix(force): removing force exit

* fix(review): feedback
  • Loading branch information
bassrock authored Nov 30, 2023
1 parent f85a08d commit bf7d49e
Show file tree
Hide file tree
Showing 78 changed files with 512 additions and 779 deletions.
4 changes: 1 addition & 3 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ jobs:
name: run tests
command: |
export $(egrep -v '^#' .docker/local.env | xargs -0)
npm run test-integrations -- --ci --watchAll=false --forceExit
npm run test-integrations -- --ci --watchAll=false
test_specs:
description: Run spec tests
Expand Down Expand Up @@ -339,7 +339,6 @@ workflows:
sentry_org: pocket
workspace-path: /tmp/workspace
source-maps-path: /tmp/workspace/app_prod/dist
inject-source-maps: true
requires:
- deploy_prod

Expand All @@ -354,6 +353,5 @@ workflows:
sentry_org: pocket
workspace-path: /tmp/workspace
source-maps-path: /tmp/workspace/app_prod/dist
inject-source-maps: true
requires:
- deploy_dev
4 changes: 3 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"build-schema": "node dist/server/buildSchema.js",
"test-ci": "npm test",
"test": "jest \"\\.spec\\.ts\"",
"test-integrations": "jest \"\\.integration\\.ts\" --runInBand --forceExit",
"test-integrations": "jest \"\\.integration\\.ts\" --runInBand",
"lint-check": "eslint --fix-dry-run \"src/**/*.ts\"",
"lint-fix": "eslint --fix \"src/**/*.ts\""
},
Expand Down
8 changes: 4 additions & 4 deletions src/aws/eventBridgeBase.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ describe('EventBridgeBase.putEvent', () => {
await client.putEvents(command);
expect(sentryStub.callCount).toBe(1);
expect(sentryStub.getCall(0).firstArg.message).toContain(
`Failed to send event to event bus`
`Failed to send event to event bus`,
);
expect(consoleSpy.callCount).toBe(1);
expect(consoleSpy.getCall(0).firstArg.message).toContain(
`Failed to send event to event bus`
`Failed to send event to event bus`,
);
});

Expand All @@ -41,14 +41,14 @@ describe('EventBridgeBase.putEvent', () => {
await client.putEvents(command);
expect(sentryStub.callCount).toBe(1);
expect(sentryStub.getCall(0).firstArg.message).toContain(
`Failed to send event to event bus`
`Failed to send event to event bus`,
);
expect(sentryStub.getCall(0).args[1]).toMatchObject({
extra: { originalError: 'boo!' },
});
expect(consoleSpy.callCount).toBe(1);
expect(consoleSpy.getCall(0).firstArg).toContain(
`Failed to send event to event bus`
`Failed to send event to event bus`,
);
expect(consoleSpy.getCall(0).firstArg).toContain(`boo!`);
});
Expand Down
6 changes: 3 additions & 3 deletions src/aws/eventBridgeBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ export class EventBridgeBase {
async putEvents(command: PutEventsCommand) {
const failedEventError = new Error(
`Failed to send event to event bus. Event Body:\n ${JSON.stringify(
command.input['Entries']
)}`
command.input['Entries'],
)}`,
);
try {
const output: PutEventsCommandOutput = await this.client.send(command);
Expand All @@ -34,7 +34,7 @@ export class EventBridgeBase {
}
} catch (error) {
serverLogger.error(
failedEventError.message + ` OriginalError: ${error.message}`
failedEventError.message + ` OriginalError: ${error.message}`,
);
// Capture full client send failure in Sentry and Cloudwatch
Sentry.captureException(failedEventError, {
Expand Down
103 changes: 0 additions & 103 deletions src/businessEvents/eventBatchProcessor.spec.ts

This file was deleted.

117 changes: 0 additions & 117 deletions src/businessEvents/eventBatchProcessor.ts

This file was deleted.

6 changes: 3 additions & 3 deletions src/businessEvents/eventBridgeHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import { EventBridgeBase } from '../aws/eventBridgeBase';
export class EventBridgeHandler extends EventBridgeBase {
constructor(
emitter: ItemsEventEmitter,
events: Array<keyof typeof EventType>
events: Array<keyof typeof EventType>,
) {
super(eventBridgeClient);
// register handler for item events
events.forEach((event) =>
emitter.on(
EventType[event],
async (data: ItemEventPayload) => await this.process(data)
)
async (data: ItemEventPayload) => await this.process(data),
),
);
}
/**
Expand Down
22 changes: 7 additions & 15 deletions src/businessEvents/eventHandlers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { EventBatchProcessor } from './eventBatchProcessor';
import { unifiedEventKinesisHandler } from './unifiedEventKinesisHandler';
import { UnifiedEventKinesisHandler } from './unifiedEventKinesisHandler';
import { SqsListener } from './sqs/sqsListener';
import { ItemsEventEmitter } from './itemsEventEmitter';
import { SnowplowHandler } from './snowplowHandler';
Expand All @@ -17,17 +16,10 @@ export type ItemEventHandlerFn = (emitter: ItemsEventEmitter) => void;
export function unifiedEventHandler(emitter: ItemsEventEmitter): void {
// Create a list of event names (as strings) to register
// batch kinesis listener for unified event stream
const unifiedEventsToListen = Object.values(
config.aws.kinesis.unifiedEvents.events
) as string[];
// Start event batch handler for unified events to kinesis
new EventBatchProcessor( // eslint-disable-line
emitter,
unifiedEventsToListen,
unifiedEventKinesisHandler,
config.aws.kinesis.interval,
config.aws.kinesis.maxBatch
);
const unifiedEventsToListen = Object.keys(
config.aws.kinesis.unifiedEvents.events,
) as Array<keyof typeof EventType>;
new UnifiedEventKinesisHandler(emitter, unifiedEventsToListen);
}

/**
Expand All @@ -43,7 +35,7 @@ export function sqsEventHandler(emitter: ItemsEventEmitter): void {
*/
export function snowplowEventHandler(emitter: ItemsEventEmitter): void {
const snowplowEventsToListen = Object.values(
config.snowplow.events
config.snowplow.events,
) as string[];
new SnowplowHandler(emitter, tracker, snowplowEventsToListen);
}
Expand All @@ -52,6 +44,6 @@ export function eventBridgeEventHandler(emitter: ItemsEventEmitter): void {
const eventsToListen = Object.keys(EventType);
new EventBridgeHandler(
emitter,
eventsToListen as Array<keyof typeof EventType>
eventsToListen as Array<keyof typeof EventType>,
);
}
1 change: 0 additions & 1 deletion src/businessEvents/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
export * from './types';
export * from './eventBatchProcessor';
export * from './sqs/sqsListener';
export * from './unifiedEventKinesisHandler';
export * from './itemsEventEmitter';
Expand Down
Loading

0 comments on commit bf7d49e

Please sign in to comment.