Skip to content

Commit

Permalink
Merge pull request #148 from PermanentOrg/per-9646_fire_sns_events_fr…
Browse files Browse the repository at this point in the history
…om_event_endpoint

Per 9646 fire sns events from event endpoint
  • Loading branch information
liam-lloyd authored Nov 27, 2024
2 parents d05ee38 + 0760fd9 commit 3510017
Show file tree
Hide file tree
Showing 16 changed files with 249 additions and 35 deletions.
2 changes: 2 additions & 0 deletions .env.template
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ DEV_NAME=<your given name here (lowercase)>
AWS_ACCESS_KEY_ID=<YOUR AWS ACCESS KEY>
AWS_SECRET_ACCESS_KEY=<YOUR AWS SECRET>
AWS_REGION="us-west-2"

# Set to your local low priority SNS topic ARN (you may have to create this topic) if testing
# thumbnail regeneration
LOW_PRIORITY_TOPIC_ARN=test
EVENT_TOPIC_ARN=test

ARCHIVEMATICA_BASE_URL=<URL here>
ARCHIVEMATICA_API_KEY=<Archivematica username>:<Archivematica API key>
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ For these, simply fill in any fake value to prevent `require-env-variable` from
| AWS_ACCESS_KEY_ID | none | The same one you use in `devenv` |
| AWS_SECRET_ACCESS_KEY | none | The same one you use in `devenv` |
| LOW_PRIORITY_TOPIC_ARN | test | Doesn't need to be set to a real ARN unless your work touches it specifically |
| EVENT_TOPIC_ARN | test | Doesn't need to be set to a real ARN unless your working with events specifically |
| MIXPANEL_TOKEN | none | Found in Mixpanel at Settings > Project Settings > Project Token |
| ARCHIVEMATICA_BASE_URL | none | It is the url of the EC2 instance on which archivematica is running |
| ARCHIVEMATICA_API_KEY | none | Found in Bitwarden, not needed unless you're running the cleanup cron |
Expand Down
21 changes: 4 additions & 17 deletions packages/api/src/account/controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import { db } from "../database";
import { EVENT_ACTION, EVENT_ACTOR, EVENT_ENTITY } from "../constants";
import { verifyUserAuthentication, extractIp } from "../middleware";
import { app } from "../app";
import { createEvent } from "../event/service";

jest.mock("../database");
jest.mock("@stela/logger");
jest.mock("../middleware");
jest.mock("../event/service");

const loadFixtures = async (): Promise<void> => {
await db.sql("fixtures.create_test_accounts");
Expand Down Expand Up @@ -100,27 +102,12 @@ describe("leaveArchive", () => {
await agent.delete("/api/v2/account/archive/2").expect(400);
});

test("should log an action in the database events table", async () => {
test("should log an event", async () => {
const eventsBeforeLeave = await db.query(selectEventRow);
expect(eventsBeforeLeave.rows.length).toBe(0);

await agent.delete("/api/v2/account/archive/1").expect(204);

const eventsAfterLeave = await db.query(selectEventRow);
expect(eventsAfterLeave.rows.length).toBe(1);
});

test("logged event contains expected body values", async () => {
await agent.delete("/api/v2/account/archive/1").expect(204);

const eventResult = await db.query<{ body: object }>(selectEventRow);
expect(eventResult.rows.length).toBe(1);

const eventBody = eventResult.rows[0]?.body;
expect(eventBody).toEqual({
archiveId: "1",
accountId: "3",
accountPrimaryEmail: "test+1@permanent.org",
});
expect(createEvent).toHaveBeenCalled();
});
});
1 change: 1 addition & 0 deletions packages/api/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ requireEnv("SENTRY_DSN");
requireEnv("AWS_REGION");
requireEnv("LOW_PRIORITY_TOPIC_ARN");
requireEnv("MIXPANEL_TOKEN");
requireEnv("EVENT_TOPIC_ARN");
49 changes: 49 additions & 0 deletions packages/api/src/event/controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { NextFunction } from "express";
import createError from "http-errors";
import { db } from "../database";
import { mixpanelClient } from "../mixpanel";
import { publisherClient } from "../publisher_client";
import { app } from "../app";
import {
verifyUserAuthentication,
Expand All @@ -14,6 +15,7 @@ import type { CreateEventRequest, ChecklistItem } from "./models";
jest.mock("../database");
jest.mock("../middleware");
jest.mock("../mixpanel");
jest.mock("../publisher_client");

const testSubject = "fcb2b59b-df07-4e79-ad20-bf7f067a965e";
const testEmail = "test+1@permanent.org";
Expand Down Expand Up @@ -379,6 +381,21 @@ describe("POST /event", () => {
expect(result.rows).toHaveLength(1);
});

test("should forward the event to SNS", async () => {
await agent
.post("/api/v2/event")
.send({
entity: "account",
action: "create",
version: 1,
entityId: "123",
body: {},
})
.expect(200);

expect(publisherClient.publishMessage).toHaveBeenCalled();
});

test("should send Mixpanel event if body includes analytics", async () => {
await agent
.post("/api/v2/event")
Expand Down Expand Up @@ -501,6 +518,38 @@ describe("POST /event", () => {
.expect(500);
});

test("should return 500 error if database call returns an empty result", async () => {
jest.spyOn(db, "sql").mockImplementation((() => ({
rows: [],
})) as unknown as typeof db.sql);
await agent
.post("/api/v2/event")
.send({
entity: "account",
action: "create",
version: 1,
entityId: "123",
body: {},
})
.expect(500);
});

test("should return 500 error if SNS publish fails", async () => {
jest.spyOn(publisherClient, "publishMessage").mockImplementation(() => {
throw new Error("SNS error");
});
await agent
.post("/api/v2/event")
.send({
entity: "account",
action: "create",
version: 1,
entityId: "123",
body: {},
})
.expect(500);
});

test("should return 400 error if entity value is invalid", async () => {
await agent
.post("/api/v2/event")
Expand Down
4 changes: 3 additions & 1 deletion packages/api/src/event/queries/create_event.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ event (
:ip,
:userAgent,
:body
);
)
RETURNING
id;
45 changes: 33 additions & 12 deletions packages/api/src/event/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import createError from "http-errors";
import { UAParser } from "ua-parser-js";
import { logger } from "@stela/logger";
import { db } from "../database";
import { publisherClient } from "../publisher_client";
import { mixpanelClient } from "../mixpanel";
import type { CreateEventRequest, ChecklistItem } from "./models";
import {
Expand Down Expand Up @@ -30,19 +31,22 @@ export const createEvent = async (data: CreateEventRequest): Promise<void> => {
);
}
}

const actorType = data.userSubjectFromAuthToken ?? "" ? "user" : "admin";
await db
.sql("event.queries.create_event", {
entity: data.entity,
action: data.action,
version: data.version,
actorType,
actorId: data.userSubjectFromAuthToken ?? data.adminSubjectFromAuthToken,
entityId: data.entityId,
ip: data.ip,
userAgent: data.userAgent,
body: data.body,
})
const event = {
entity: data.entity,
action: data.action,
version: data.version,
actorType,
actorId: data.userSubjectFromAuthToken ?? data.adminSubjectFromAuthToken,
entityId: data.entityId,
ip: data.ip,
userAgent: data.userAgent,
body: data.body,
};

const result = await db
.sql<{ id: string }>("event.queries.create_event", event)
.catch((err) => {
if (isInvalidEnumError(err)) {
const badValue = getInvalidValueFromInvalidEnumMessage(err.message);
Expand All @@ -54,6 +58,23 @@ export const createEvent = async (data: CreateEventRequest): Promise<void> => {
logger.error(err);
throw new createError.InternalServerError(`Failed to create event`);
});

if (result.rows[0] === undefined) {
throw new createError.InternalServerError(`Failed to create event`);
}

try {
await publisherClient.publishMessage(process.env["EVENT_TOPIC_ARN"] ?? "", {
id: result.rows[0].id,
body: JSON.stringify(event),
attributes: { Entity: event.entity, Action: event.action },
});
} catch (err) {
logger.error(err);
throw new createError.InternalServerError(
`Failed to publish message to topic`
);
}
};

const checklistEvents: Record<string, string> = {
Expand Down
78 changes: 77 additions & 1 deletion packages/api/src/publisher_client.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { SNSClient } from "@aws-sdk/client-sns";
import { PublishBatchCommand, SNSClient } from "@aws-sdk/client-sns";
import { publisherClient } from "./publisher_client";

const mockSend = jest.fn();
Expand Down Expand Up @@ -65,3 +65,79 @@ describe("batchPublishMessages", () => {
expect(result.messagesSent).toBe(9);
});
});

describe("publishMessage", () => {
beforeEach(() => {
jest.clearAllMocks();
});

test("should publish a message", async () => {
(SNSClient as jest.Mock).mockImplementation(() => ({
send: mockSend.mockResolvedValue({ Failed: [] }),
}));

await publisherClient.publishMessage("topic", { id: "1", body: "message" });
expect(mockSend).toHaveBeenCalledTimes(1);
});

test("should include message attributes if provided", async () => {
(SNSClient as jest.Mock).mockImplementation(() => ({
send: mockSend.mockResolvedValue({ Failed: [] }),
}));

await publisherClient.publishMessage("topic", {
id: "1",
body: "message",
attributes: {
Entity: "account",
Action: "login",
},
});
expect(
(
(
(mockSend.mock.calls as unknown[])[0] as unknown[]
)[0] as PublishBatchCommand
).input
).toEqual(
new PublishBatchCommand({
TopicArn: "topic",
PublishBatchRequestEntries: [
{
Id: "1",
Message: "message",
MessageAttributes: {
Entity: {
DataType: "String",
StringValue: "account",
},
Action: {
DataType: "String",
StringValue: "login",
},
},
},
],
}).input
);
});

test("should throw an error if the message fails to publish", async () => {
(SNSClient as jest.Mock).mockImplementation(() => ({
send: mockSend.mockResolvedValue({ Failed: [{ Id: "1" }] }),
}));

let error = null;
try {
await publisherClient.publishMessage("topic", {
id: "1",
body: "message",
});
} catch (err) {
error = err;
} finally {
expect(mockSend).toHaveBeenCalledTimes(1);
expect(error).not.toBeNull();
}
});
});
32 changes: 28 additions & 4 deletions packages/api/src/publisher_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ import {
SNSClient,
PublishBatchCommand,
type BatchResultErrorEntry,
type PublishBatchRequestEntry,
} from "@aws-sdk/client-sns";

export const lowPriorityTopicArn = process.env["LOW_PRIORITY_TOPIC_ARN"] ?? "";

export interface Message {
id: string;
body: string;
attributes?: Record<string, string>;
}

const batchPublishMessages = async (
Expand All @@ -24,10 +26,21 @@ const batchPublishMessages = async (
groupsOfTenMessages.map(async (groupOfTenMessages) => {
const command = new PublishBatchCommand({
TopicArn: topicArn,
PublishBatchRequestEntries: groupOfTenMessages.map((message) => ({
Id: message.id,
Message: message.body,
})),
PublishBatchRequestEntries: groupOfTenMessages.map((message) => {
const entry: PublishBatchRequestEntry = {
Id: message.id,
Message: message.body,
};
if (message.attributes !== undefined) {
entry.MessageAttributes = Object.fromEntries(
Object.entries(message.attributes).map(([key, value]) => [
key,
{ DataType: "String", StringValue: value },
])
);
}
return entry;
}),
});

const response = await snsClient.send(command);
Expand All @@ -51,6 +64,17 @@ const batchPublishMessages = async (
};
};

const publishMessage = async (
topicArn: string,
message: Message
): Promise<void> => {
const result = await batchPublishMessages(topicArn, [message]);
if (result.failedMessages.length !== 0) {
throw new Error(`Failed to send message ${JSON.stringify(message)}`);
}
};

export const publisherClient = {
batchPublishMessages,
publishMessage,
};
1 change: 1 addition & 0 deletions terraform/prod_cluster/secrets.tf
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ resource "kubernetes_secret" "prod-secrets" {
"AWS_ACCESS_KEY_ID" = var.aws_access_key_id
"AWS_SECRET_ACCESS_KEY" = var.aws_secret_access_key
"LOW_PRIORITY_TOPIC_ARN" = var.low_priority_topic_arn
"EVENT_TOPIC_ARN" = var.event_topic_arn
"MIXPANEL_TOKEN" = var.mixpanel_token
"ARCHIVEMATICA_BASE_URL" = var.archivematica_base_url
"ARCHIVEMATICA_API_KEY" = var.archivematica_api_key
Expand Down
11 changes: 11 additions & 0 deletions terraform/prod_cluster/stela_prod_deployment.tf
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,17 @@ resource "kubernetes_deployment" "stela_prod" {
}
}

env {
name = "EVENT_TOPIC_ARN"
value_from {
secret_key_ref {
name = "prod-secrets"
key = "EVENT_TOPIC_ARN"
optional = false
}
}
}

env {
name = "MIXPANEL_TOKEN"
value_from {
Expand Down
Loading

0 comments on commit 3510017

Please sign in to comment.