Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#IC-349] Add fiscalCode to message_status in CreateMessage process #195

Merged
Merged
137 changes: 137 additions & 0 deletions OnFailedProcessMessage/__tests__/handler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import * as TE from "fp-ts/lib/TaskEither";
import * as E from "fp-ts/lib/Either";
import * as O from "fp-ts/lib/Option";
import * as MS from "@pagopa/io-functions-commons/dist/src/models/message_status";
import { initTelemetryClient } from "../../utils/appinsights";
import { MessageStatusValueEnum } from "@pagopa/io-functions-commons/dist/generated/definitions/MessageStatusValue";
import { NonNegativeNumber } from "@pagopa/ts-commons/lib/numbers";
import { FiscalCode, NonEmptyString } from "@pagopa/ts-commons/lib/strings";
import { getOnFailedProcessMessageHandler } from "../handler";
import { MessageModel } from "@pagopa/io-functions-commons/dist/src/models/message";
import { Context } from "@azure/functions";
import { CreatedMessageEvent } from "../../utils/events/message";
import {
aNewMessageWithoutContent,
aRetrievedMessage,
aRetrievedMessageStatus
} from "../../__mocks__/mocks";

const contextMock = ({
bindings: {},
executionContext: { functionName: "funcname" },
// eslint-disable no-console
log: { ...console, verbose: console.log }
} as unknown) as Context;

const mockTelemetryClient = ({
trackEvent: jest.fn()
} as unknown) as ReturnType<typeof initTelemetryClient>;

const getQueryIteratorMock = jest.fn();
const lMessageModel = ({
getQueryIterator: getQueryIteratorMock
} as unknown) as MessageModel;

const lMessageStatusModel = ({
upsert: (...args) => TE.of({} /* anything */),
findLastVersionByModelId: (...args) => TE.right(O.none)
} as unknown) as MS.MessageStatusModel;
const getMessageStatusUpdaterMock = jest.spyOn(MS, "getMessageStatusUpdater");

const aCreatedMessageEvent: CreatedMessageEvent = {
messageId: aNewMessageWithoutContent.id,
serviceVersion: 1 as NonNegativeNumber
};

beforeEach(() => {
jest.clearAllMocks();
// Mock getMessageStatusUpdater
getMessageStatusUpdaterMock.mockImplementation(
(
_messageStatusModel: MS.MessageStatusModel,
messageId: NonEmptyString,
fiscalCode: FiscalCode
) => (status: MessageStatusValueEnum) =>
TE.right({
...aRetrievedMessageStatus,
id: messageId,
messageId,
status,
fiscalCode
})
);
getQueryIteratorMock.mockImplementation(() => {
const asyncIterable = {
[Symbol.asyncIterator]() {
return {
i: 0,
async next() {
if (this.i++ < 1) {
return await Promise.resolve({
value: [E.right(aRetrievedMessage)],
done: false
});
}

return { done: true };
}
};
}
};
return asyncIterable;
});
});

describe("getOnFailedProcessMessageHandler", () => {
it("GIVEN a created message event with an existing messageId WHEN the failed handler is called THEN the message status is created with input messageId and retreived fiscalCode", async () => {
await getOnFailedProcessMessageHandler({
lMessageStatusModel,
lMessageModel,
telemetryClient: mockTelemetryClient
})(contextMock, aCreatedMessageEvent);

expect(getMessageStatusUpdaterMock).toBeCalledWith(
lMessageStatusModel,
aCreatedMessageEvent.messageId,
aRetrievedMessage.fiscalCode
);

expect(getQueryIteratorMock).toBeCalledWith(
expect.objectContaining({
parameters: expect.arrayContaining([
expect.objectContaining({ value: aCreatedMessageEvent.messageId })
])
})
fabriziopapi marked this conversation as resolved.
Show resolved Hide resolved
);

expect(mockTelemetryClient.trackEvent).toBeCalledWith(
expect.objectContaining({
name: "api.messages.create.failedprocessing",
properties: expect.objectContaining({ messageId: "A_MESSAGE_ID" })
})
);
});

it("GIVEN a created message event with an not existing messageId WHEN the failed handler is called THEN a cosmos exception is thrown", async () => {
getQueryIteratorMock.mockImplementationOnce(() => ({
[Symbol.asyncIterator]() {
return {
i: 0,
async next() {
return { done: true };
}
};
}
}));

await expect(
getOnFailedProcessMessageHandler({
lMessageStatusModel,
lMessageModel,
telemetryClient: mockTelemetryClient
})(contextMock, aCreatedMessageEvent)
).rejects.toEqual(
expect.objectContaining({ kind: "COSMOS_ERROR_RESPONSE" })
);
});
});
80 changes: 63 additions & 17 deletions OnFailedProcessMessage/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,26 @@ import {
MessageStatusModel
} from "@pagopa/io-functions-commons/dist/src/models/message_status";
import { MessageStatusValueEnum } from "@pagopa/io-functions-commons/dist/generated/definitions/MessageStatusValue";
import { initTelemetryClient } from "../utils/appinsights";
import { withJsonInput } from "../utils/with-json-input";
import { withDecodedInput } from "../utils/with-decoded-input";
import { MessageModel } from "@pagopa/io-functions-commons/dist/src/models/message";
import { constant, pipe } from "fp-ts/lib/function";
import * as E from "fp-ts/Either";
import * as TE from "fp-ts/TaskEither";
import {
CosmosDecodingError,
CosmosErrorResponse
} from "@pagopa/io-functions-commons/dist/src/utils/cosmosdb_model";
import {
asyncIterableToArray,
flattenAsyncIterable
} from "@pagopa/io-functions-commons/dist/src/utils/async";
import { CreatedMessageEvent } from "../utils/events/message";
import { withDecodedInput } from "../utils/with-decoded-input";
import { withJsonInput } from "../utils/with-json-input";
import { initTelemetryClient } from "../utils/appinsights";

export interface IOnFailedProcessMessageHandlerInput {
readonly lMessageStatusModel: MessageStatusModel;
readonly lMessageModel: MessageModel;
readonly telemetryClient: ReturnType<typeof initTelemetryClient>;
}

Expand All @@ -23,21 +36,54 @@ type Handler = (c: Context, i: unknown) => Promise<void>;
*/
export const getOnFailedProcessMessageHandler = ({
lMessageStatusModel,
lMessageModel,
telemetryClient
}: IOnFailedProcessMessageHandlerInput): Handler =>
withJsonInput(
withDecodedInput(CreatedMessageEvent, async (_, { messageId }) => {
await getMessageStatusUpdater(
lMessageStatusModel,
messageId
)(MessageStatusValueEnum.FAILED)();

telemetryClient.trackEvent({
name: "api.messages.create.failedprocessing",
properties: {
messageId
},
tagOverrides: { samplingEnabled: "false" }
});
})
withDecodedInput(CreatedMessageEvent, async (_, { messageId }) =>
pipe(
// query for message with input messageId in order to retrieve the fiscalCode
lMessageModel.getQueryIterator({
parameters: [{ name: "@messageId", value: messageId }],
query: `SELECT TOP 1 * FROM m WHERE m.id = @messageId`
}),
flattenAsyncIterable,
asyncIterableToArray,
constant,
TE.fromTask,
TE.filterOrElse(
messages => messages.length === 1,
() =>
CosmosErrorResponse({
code: 404,
message: "Missing message",
name: "Not Found"
})
),
TE.chainEitherKW(messages =>
pipe(messages[0], E.mapLeft(CosmosDecodingError))
),
// create the message status for the failed message
TE.chain(message =>
getMessageStatusUpdater(
lMessageStatusModel,
messageId,
message.fiscalCode
)(MessageStatusValueEnum.FAILED)
),
TE.map(() => {
telemetryClient.trackEvent({
name: "api.messages.create.failedprocessing",
properties: {
messageId
},
tagOverrides: { samplingEnabled: "false" }
});
}),
// throw error to trigger retry
TE.getOrElse(e => {
throw e;
})
)()
)
);
10 changes: 10 additions & 0 deletions OnFailedProcessMessage/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import {
MESSAGE_STATUS_COLLECTION_NAME,
MessageStatusModel
} from "@pagopa/io-functions-commons/dist/src/models/message_status";
import {
MessageModel,
MESSAGE_COLLECTION_NAME
} from "@pagopa/io-functions-commons/dist/src/models/message";
import { cosmosdbInstance } from "../utils/cosmosdb";
import { getConfigOrThrow } from "../utils/config";
import { initTelemetryClient } from "../utils/appinsights";
Expand All @@ -15,12 +19,18 @@ const messageStatusModel = new MessageStatusModel(
cosmosdbInstance.container(MESSAGE_STATUS_COLLECTION_NAME)
);

const messageModel = new MessageModel(
cosmosdbInstance.container(MESSAGE_COLLECTION_NAME),
config.MESSAGE_CONTAINER_NAME
);

const telemetryClient = initTelemetryClient(
config.APPINSIGHTS_INSTRUMENTATIONKEY
);

const activityFunctionHandler: AzureFunction = getOnFailedProcessMessageHandler(
{
lMessageModel: messageModel,
lMessageStatusModel: messageStatusModel,
telemetryClient
}
Expand Down
42 changes: 40 additions & 2 deletions ProcessMessage/__tests__/handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ import {
} from "../../__mocks__/mocks";
import { getProcessMessageHandler } from "../handler";
import {
FiscalCode,
NonEmptyString,
OrganizationFiscalCode
} from "@pagopa/ts-commons/lib/strings";
import { Context } from "@azure/functions";
import { MessageStatusModel } from "@pagopa/io-functions-commons/dist/src/models/message_status";
import { pipe } from "fp-ts/lib/function";
import { readableReport } from "@pagopa/ts-commons/lib/reporters";
import {
Expand All @@ -56,6 +56,8 @@ import { Second } from "@pagopa/ts-commons/lib/units";
import * as lolex from "lolex";
import { subSeconds } from "date-fns";
import { DEFAULT_PENDING_ACTIVATION_GRACE_PERIOD_SECONDS } from "../../utils/config";
import * as MS from "@pagopa/io-functions-commons/dist/src/models/message_status";
import { MessageStatusValueEnum } from "@pagopa/io-functions-commons/dist/generated/definitions/MessageStatusValue";

const createContext = (): Context =>
(({
Expand Down Expand Up @@ -97,7 +99,8 @@ const lServicePreferencesModel = ({
const lMessageStatusModel = ({
upsert: (...args) => TE.of({} /* anything */),
findLastVersionByModelId: (...args) => TE.right(O.none)
} as unknown) as MessageStatusModel;
} as unknown) as MS.MessageStatusModel;
const getMessageStatusUpdaterMock = jest.spyOn(MS, "getMessageStatusUpdater");

const activationFindLastVersionMock = jest.fn();
const lActivation = ({
Expand Down Expand Up @@ -227,6 +230,29 @@ beforeEach(() => {
// we should refactor them to have them independent, however for now we keep the workaround
jest.resetAllMocks();
clock = lolex.install({ now: ExecutionDateContext });
// Mock getMessageStatusUpdater
getMessageStatusUpdaterMock.mockImplementation(
(
_messageStatusModel: MS.MessageStatusModel,
messageId: NonEmptyString,
fiscalCode: FiscalCode
) => (status: MessageStatusValueEnum) =>
TE.right({
_etag: "a",
_rid: "a",
_self: "self",
_ts: 0,
kind: "IRetrievedMessageStatus",
id: messageId,
version: 0 as NonNegativeInteger,
messageId,
status,
updatedAt: new Date(),
isRead: false,
isArchived: false,
fiscalCode
})
);
});

afterEach(() => {
Expand Down Expand Up @@ -310,6 +336,12 @@ describe("getprocessMessageHandler", () => {

await processMessageHandler(context, JSON.stringify(messageEvent));

expect(getMessageStatusUpdaterMock).toHaveBeenCalledWith(
lMessageStatusModel,
messageEvent.messageId,
profileResult.fiscalCode
);

pipe(
context.bindings.processedMessage,
ProcessedMessageEvent.decode,
Expand Down Expand Up @@ -398,6 +430,12 @@ describe("getprocessMessageHandler", () => {

await processMessageHandler(context, JSON.stringify(messageEvent));

expect(getMessageStatusUpdaterMock).toHaveBeenCalledWith(
lMessageStatusModel,
messageEvent.messageId,
profileResult.fiscalCode
);

pipe(
context.bindings.processedMessage,
ProcessedMessageEvent.decode,
Expand Down
12 changes: 8 additions & 4 deletions ProcessMessage/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,8 @@ export const getProcessMessageHandler = ({
context.log.warn(`${logPrefix}|RESULT=PROFILE_NOT_FOUND`);
await getMessageStatusUpdater(
lMessageStatusModel,
createdMessageEvent.message.id
createdMessageEvent.message.id,
newMessageWithoutContent.fiscalCode
)(MessageStatusValueEnum.REJECTED)();

return;
Expand All @@ -425,7 +426,8 @@ export const getProcessMessageHandler = ({
context.log.warn(`${logPrefix}|RESULT=MASTER_INBOX_DISABLED`);
await getMessageStatusUpdater(
lMessageStatusModel,
createdMessageEvent.message.id
createdMessageEvent.message.id,
newMessageWithoutContent.fiscalCode
)(MessageStatusValueEnum.REJECTED)();
return;
}
Expand Down Expand Up @@ -510,7 +512,8 @@ export const getProcessMessageHandler = ({
context.log.warn(`${logPrefix}|RESULT=SENDER_BLOCKED`);
await getMessageStatusUpdater(
lMessageStatusModel,
createdMessageEvent.message.id
createdMessageEvent.message.id,
profile.fiscalCode
)(MessageStatusValueEnum.REJECTED)();
return;
}
Expand All @@ -526,7 +529,8 @@ export const getProcessMessageHandler = ({

await getMessageStatusUpdater(
lMessageStatusModel,
createdMessageEvent.message.id
createdMessageEvent.message.id,
profile.fiscalCode
)(MessageStatusValueEnum.PROCESSED)();

telemetryClient.trackEvent({
Expand Down
Loading