Skip to content
This repository has been archived by the owner on Jan 9, 2024. It is now read-only.

fix(mem): removing kinesis batcher as possible memory leaker #826

Merged
merged 4 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"build-schema": "node dist/server/buildSchema.js",
"test-ci": "npm test",
"test": "jest \"\\.spec\\.ts\"",
"test-integrations": "jest \"\\.integration\\.ts\" --runInBand --forceExit",
"test-integrations": "jest \"\\.integration\\.ts\" --runInBand",
"lint-check": "eslint --fix-dry-run \"src/**/*.ts\"",
"lint-fix": "eslint --fix \"src/**/*.ts\""
},
Expand Down
8 changes: 4 additions & 4 deletions src/aws/eventBridgeBase.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ describe('EventBridgeBase.putEvent', () => {
await client.putEvents(command);
expect(sentryStub.callCount).toBe(1);
expect(sentryStub.getCall(0).firstArg.message).toContain(
`Failed to send event to event bus`
`Failed to send event to event bus`,
);
expect(consoleSpy.callCount).toBe(1);
expect(consoleSpy.getCall(0).firstArg.message).toContain(
`Failed to send event to event bus`
`Failed to send event to event bus`,
);
});

Expand All @@ -41,14 +41,14 @@ describe('EventBridgeBase.putEvent', () => {
await client.putEvents(command);
expect(sentryStub.callCount).toBe(1);
expect(sentryStub.getCall(0).firstArg.message).toContain(
`Failed to send event to event bus`
`Failed to send event to event bus`,
);
expect(sentryStub.getCall(0).args[1]).toMatchObject({
extra: { originalError: 'boo!' },
});
expect(consoleSpy.callCount).toBe(1);
expect(consoleSpy.getCall(0).firstArg).toContain(
`Failed to send event to event bus`
`Failed to send event to event bus`,
);
expect(consoleSpy.getCall(0).firstArg).toContain(`boo!`);
});
Expand Down
6 changes: 3 additions & 3 deletions src/aws/eventBridgeBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ export class EventBridgeBase {
async putEvents(command: PutEventsCommand) {
const failedEventError = new Error(
`Failed to send event to event bus. Event Body:\n ${JSON.stringify(
command.input['Entries']
)}`
command.input['Entries'],
)}`,
);
try {
const output: PutEventsCommandOutput = await this.client.send(command);
Expand All @@ -34,7 +34,7 @@ export class EventBridgeBase {
}
} catch (error) {
serverLogger.error(
failedEventError.message + ` OriginalError: ${error.message}`
failedEventError.message + ` OriginalError: ${error.message}`,
);
// Capture full client send failure in Sentry and Cloudwatch
Sentry.captureException(failedEventError, {
Expand Down
103 changes: 0 additions & 103 deletions src/businessEvents/eventBatchProcessor.spec.ts

This file was deleted.

117 changes: 0 additions & 117 deletions src/businessEvents/eventBatchProcessor.ts

This file was deleted.

6 changes: 3 additions & 3 deletions src/businessEvents/eventBridgeHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import { EventBridgeBase } from '../aws/eventBridgeBase';
export class EventBridgeHandler extends EventBridgeBase {
constructor(
emitter: ItemsEventEmitter,
events: Array<keyof typeof EventType>
events: Array<keyof typeof EventType>,
) {
super(eventBridgeClient);
// register handler for item events
events.forEach((event) =>
emitter.on(
EventType[event],
async (data: ItemEventPayload) => await this.process(data)
)
async (data: ItemEventPayload) => await this.process(data),
),
);
}
/**
Expand Down
22 changes: 7 additions & 15 deletions src/businessEvents/eventHandlers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { EventBatchProcessor } from './eventBatchProcessor';
import { unifiedEventKinesisHandler } from './unifiedEventKinesisHandler';
import { UnifiedEventKinesHandler } from './unifiedEventKinesisHandler';
import { SqsListener } from './sqs/sqsListener';
import { ItemsEventEmitter } from './itemsEventEmitter';
import { SnowplowHandler } from './snowplowHandler';
Expand All @@ -17,17 +16,10 @@ export type ItemEventHandlerFn = (emitter: ItemsEventEmitter) => void;
export function unifiedEventHandler(emitter: ItemsEventEmitter): void {
// Create a list of event names (as strings) to register
// batch kinesis listener for unified event stream
const unifiedEventsToListen = Object.values(
config.aws.kinesis.unifiedEvents.events
) as string[];
// Start event batch handler for unified events to kinesis
new EventBatchProcessor( // eslint-disable-line
emitter,
unifiedEventsToListen,
unifiedEventKinesisHandler,
config.aws.kinesis.interval,
config.aws.kinesis.maxBatch
);
const unifiedEventsToListen = Object.keys(
config.aws.kinesis.unifiedEvents.events,
) as Array<keyof typeof EventType>;
new UnifiedEventKinesHandler(emitter, unifiedEventsToListen);
}

/**
Expand All @@ -43,7 +35,7 @@ export function sqsEventHandler(emitter: ItemsEventEmitter): void {
*/
export function snowplowEventHandler(emitter: ItemsEventEmitter): void {
const snowplowEventsToListen = Object.values(
config.snowplow.events
config.snowplow.events,
) as string[];
new SnowplowHandler(emitter, tracker, snowplowEventsToListen);
}
Expand All @@ -52,6 +44,6 @@ export function eventBridgeEventHandler(emitter: ItemsEventEmitter): void {
const eventsToListen = Object.keys(EventType);
new EventBridgeHandler(
emitter,
eventsToListen as Array<keyof typeof EventType>
eventsToListen as Array<keyof typeof EventType>,
);
}
1 change: 0 additions & 1 deletion src/businessEvents/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
export * from './types';
export * from './eventBatchProcessor';
export * from './sqs/sqsListener';
export * from './unifiedEventKinesisHandler';
export * from './itemsEventEmitter';
Expand Down
2 changes: 1 addition & 1 deletion src/businessEvents/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export const itemsEventEmitter = new ItemsEventEmitter();

export function initItemEventHandlers(
emitter: ItemsEventEmitter,
handlers: ItemEventHandlerFn[]
handlers: ItemEventHandlerFn[],
) {
handlers.forEach((handler) => handler(emitter));
}
2 changes: 1 addition & 1 deletion src/businessEvents/itemsEventEmitter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ describe('ItemsEventEmitter', () => {
let clock;
const handler = sinon.fake();
Object.values(EventType).forEach((event: string) =>
emitter.on(event, handler)
emitter.on(event, handler),
);

const testSavedItem: SavedItem = {
Expand Down
Loading