diff --git a/back/package.json b/back/package.json index ccc13da556..fe057cb7d7 100644 --- a/back/package.json +++ b/back/package.json @@ -16,6 +16,7 @@ "trigger-sending-emails-with-assessment-creation-link": "ts-node src/adapters/primary/scripts/triggerSendingEmailsWithAssessmentCreationLink.ts", "trigger-refresh-materialized-views": "ts-node src/adapters/primary/scripts/triggerRefreshMaterializedViews.ts", "trigger-convention-reminder": "ts-node src/adapters/primary/scripts/triggerConventionReminder.ts", + "trigger-resync-old-conventions-to-pe": "ts-node src/adapters/primary/scripts/triggerResyncOldConventionsToPe.ts", "seed": "NODE_ENV=local ts-node src/adapters/primary/scripts/seed.ts", "generate-api-key": "ts-node src/adapters/primary/scripts/generateApiKey.ts", "test:unit": "jest --watchAll --testRegex=.unit.test.ts", @@ -61,6 +62,7 @@ "ramda": "^0.28.0", "shared": "workspace:*", "ts-node": "^10.9.1", + "ts-pattern": "4.2.2", "uuid": "^8.3.2", "zod": "3.21.4" }, diff --git a/back/scalingo/cron.json b/back/scalingo/cron.json index a1ae26edab..17654f44d6 100644 --- a/back/scalingo/cron.json +++ b/back/scalingo/cron.json @@ -20,6 +20,10 @@ "command": "03 2 * * * pnpm back trigger-update-establishments-from-sirene", "size": "M" }, + { + "command": "02 3 * * * pnpm back run trigger-resync-old-conventions-to-pe", + "size": "M" + }, { "command": "15 05 * * * pnpm back run trigger-convention-reminder", "size": "M" diff --git a/back/src/adapters/primary/config/createUseCases.ts b/back/src/adapters/primary/config/createUseCases.ts index 8b493e4742..63da480b61 100644 --- a/back/src/adapters/primary/config/createUseCases.ts +++ b/back/src/adapters/primary/config/createUseCases.ts @@ -414,6 +414,7 @@ export const createUseCases = ( uowPerformer, gateways.poleEmploiGateway, gateways.timeGateway, + { resyncMode: false }, ), shareConventionByEmail: new ShareApplicationLinkByEmail( uowPerformer, diff --git a/back/src/adapters/primary/config/uowConfig.ts b/back/src/adapters/primary/config/uowConfig.ts index 8f7a113e5c..ab26790b3a 100644 --- a/back/src/adapters/primary/config/uowConfig.ts +++ b/back/src/adapters/primary/config/uowConfig.ts @@ -16,6 +16,7 @@ import { InMemoryAuthenticatedUserRepository } from "../../secondary/InMemoryAut import { InMemoryConventionPoleEmploiAdvisorRepository } from "../../secondary/InMemoryConventionPoleEmploiAdvisorRepository"; import { InMemoryConventionQueries } from "../../secondary/InMemoryConventionQueries"; import { InMemoryConventionRepository } from "../../secondary/InMemoryConventionRepository"; +import { InMemoryConventionsToSyncRepository } from "../../secondary/InMemoryConventionsToSyncRepository"; import { InMemoryFeatureFlagRepository } from "../../secondary/InMemoryFeatureFlagRepository"; import { InMemoryFormEstablishmentRepository } from "../../secondary/InMemoryFormEstablishmentRepository"; import { InMemoryImmersionAssessmentRepository } from "../../secondary/InMemoryImmersionAssessmentRepository"; @@ -31,6 +32,7 @@ import { PgAuthenticatedUserRepository } from "../../secondary/pg/PgAuthenticate import { PgConventionPoleEmploiAdvisorRepository } from "../../secondary/pg/PgConventionPoleEmploiAdvisorRepository"; import { PgConventionQueries } from "../../secondary/pg/PgConventionQueries"; import { PgConventionRepository } from "../../secondary/pg/PgConventionRepository"; +import { PgConventionsToSyncRepository } from "../../secondary/pg/PgConventionsToSyncRepository"; import { PgDiscussionAggregateRepository } from "../../secondary/pg/PgDiscussionAggregateRepository"; import { PgErrorRepository } from "../../secondary/pg/PgErrorRepository"; import { PgEstablishmentAggregateRepository } from "../../secondary/pg/PgEstablishmentAggregateRepository"; @@ -70,6 +72,7 @@ export const createInMemoryUow = () => { conventionRepository, conventionPoleEmploiAdvisorRepository: new InMemoryConventionPoleEmploiAdvisorRepository(), + conventionsToSyncRepository: new InMemoryConventionsToSyncRepository(), discussionAggregateRepository: new InMemoryDiscussionAggregateRepository(), establishmentAggregateRepository: new InMemoryEstablishmentAggregateRepository(), @@ -101,6 +104,7 @@ export const createPgUow = (client: PoolClient): UnitOfWork => { conventionQueries: new PgConventionQueries(client), conventionPoleEmploiAdvisorRepository: new PgConventionPoleEmploiAdvisorRepository(client), + conventionsToSyncRepository: new PgConventionsToSyncRepository(client), discussionAggregateRepository: new PgDiscussionAggregateRepository(client), establishmentAggregateRepository: new PgEstablishmentAggregateRepository( client, diff --git a/back/src/adapters/primary/scripts/triggerResyncOldConventionsToPe.ts b/back/src/adapters/primary/scripts/triggerResyncOldConventionsToPe.ts new file mode 100644 index 0000000000..80ff6821ce --- /dev/null +++ b/back/src/adapters/primary/scripts/triggerResyncOldConventionsToPe.ts @@ -0,0 +1,80 @@ +import axios from "axios"; +import { Pool } from "pg"; +import { GetAccessTokenResponse } from "../../../domain/convention/ports/PoleEmploiGateway"; +import { ResyncOldConventionsToPe } from "../../../domain/convention/useCases/ResyncOldConventionsToPe"; +import { noRetries } from "../../../domain/core/ports/RetryStrategy"; +import { createLogger } from "../../../utils/logger"; +import { InMemoryCachingGateway } from "../../secondary/core/InMemoryCachingGateway"; +import { RealTimeGateway } from "../../secondary/core/TimeGateway/RealTimeGateway"; +import { HttpPoleEmploiGateway } from "../../secondary/poleEmploi/HttpPoleEmploiGateway"; +import { createPoleEmploiTargets } from "../../secondary/poleEmploi/PoleEmploi.targets"; +import { AppConfig } from "../config/appConfig"; +import { configureCreateHttpClientForExternalApi } from "../config/createHttpClientForExternalApi"; +import { createUowPerformer } from "../config/uowConfig"; +import { handleEndOfScriptNotification } from "./handleEndOfScriptNotification"; + +const logger = createLogger(__filename); + +const config = AppConfig.createFromEnv(); + +const executeUsecase = async () => { + const timeGateway = new RealTimeGateway(); + + const httpPoleEmploiGateway = new HttpPoleEmploiGateway( + configureCreateHttpClientForExternalApi( + axios.create({ timeout: config.externalAxiosTimeout }), + )(createPoleEmploiTargets(config.peApiUrl)), + new InMemoryCachingGateway( + timeGateway, + "expires_in", + ), + config.peApiUrl, + config.poleEmploiAccessTokenConfig, + noRetries, + ); + + const { uowPerformer } = createUowPerformer( + config, + () => + new Pool({ + connectionString: config.pgImmersionDbUrl, + }), + ); + + const resyncOldConventionsToPeUsecase = new ResyncOldConventionsToPe( + uowPerformer, + httpPoleEmploiGateway, + timeGateway, + 50, + ); + + return resyncOldConventionsToPeUsecase.execute(); +}; + +/* eslint-disable @typescript-eslint/no-floating-promises */ +handleEndOfScriptNotification( + "resyncOldConventionToPE", + config, + executeUsecase, + (report) => { + const errors = Object.entries(report.errors).map( + ([key, error]) => `${key}: ${error.message} `, + ); + + const skips = Object.entries(report.skips).map( + ([key, reason]) => `${key}: ${reason} `, + ); + + return [ + `Total of convention to sync : ${ + report.success + errors.length + errors.length + }`, + `Number of successfully sync convention : ${report.success}`, + `Number of failures : ${errors.length}`, + `Number of skips : ${skips.length}`, + ...(errors.length > 0 ? [`Failures : ${errors.join("\n")}`] : []), + ...(skips.length > 0 ? [`Skips : ${skips.join("\n")}`] : []), + ].join("\n"); + }, + logger, +); diff --git a/back/src/adapters/secondary/InMemoryConventionsToSyncRepository.ts b/back/src/adapters/secondary/InMemoryConventionsToSyncRepository.ts new file mode 100644 index 0000000000..361226d0e3 --- /dev/null +++ b/back/src/adapters/secondary/InMemoryConventionsToSyncRepository.ts @@ -0,0 +1,37 @@ +import { ConventionId } from "shared"; +import { + ConventionsToSyncRepository, + ConventionToSync, +} from "../../domain/convention/ports/ConventionsToSyncRepository"; + +export class InMemoryConventionsToSyncRepository + implements ConventionsToSyncRepository +{ + public async getById( + id: ConventionId, + ): Promise { + return this.conventionsToSync.find((convention) => convention.id === id); + } + + public async getToProcessOrError(limit: number): Promise { + return this.conventionsToSync + .filter(({ status }) => status === "ERROR" || status === "TO_PROCESS") + .slice(0, limit); + } + + public async save(conventionToSync: ConventionToSync): Promise { + const index = this.conventionsToSync.findIndex( + (convention) => convention.id === conventionToSync.id, + ); + index === -1 + ? this.conventionsToSync.push(conventionToSync) + : (this.conventionsToSync[index] = conventionToSync); + } + + // for testing purpose + public setForTesting(conventions: ConventionToSync[]) { + this.conventionsToSync = conventions; + } + + public conventionsToSync: ConventionToSync[] = []; +} diff --git a/back/src/adapters/secondary/pg/PgConventionsToSyncRepository.integration.test.ts b/back/src/adapters/secondary/pg/PgConventionsToSyncRepository.integration.test.ts new file mode 100644 index 0000000000..dd89b42f63 --- /dev/null +++ b/back/src/adapters/secondary/pg/PgConventionsToSyncRepository.integration.test.ts @@ -0,0 +1,100 @@ +import { Pool, PoolClient } from "pg"; +import { expectToEqual } from "shared"; +import { getTestPgPool } from "../../../_testBuilders/getTestPgPool"; +import { ConventionToSync } from "../../../domain/convention/ports/ConventionsToSyncRepository"; +import { + conventionsToSyncTableName, + PgConventionsToSyncRepository, +} from "./PgConventionsToSyncRepository"; + +describe("PgConventionsRepository", () => { + const conventionsToSync: ConventionToSync[] = [ + { + id: "aaaaac99-9c0b-1bbb-bb6d-6bb9bd38aaa1", + status: "TO_PROCESS", + }, + { + id: "aaaaac99-9c0b-1bbb-bb6d-6bb9bd38aaa2", + status: "SUCCESS", + processDate: new Date(), + }, + { + id: "aaaaac99-9c0b-1bbb-bb6d-6bb9bd38aaa3", + status: "ERROR", + processDate: new Date(), + reason: "An error", + }, + { + id: "aaaaac99-9c0b-1bbb-bb6d-6bb9bd38aaa4", + status: "SKIP", + processDate: new Date(), + reason: "Skipped reason", + }, + ]; + + let pool: Pool; + let client: PoolClient; + let conventionsToSyncRepository: PgConventionsToSyncRepository; + + beforeAll(async () => { + pool = getTestPgPool(); + client = await pool.connect(); + }); + + afterAll(async () => { + client.release(); + await pool.end(); + }); + + beforeEach(async () => { + await client.query(`DELETE + FROM ${conventionsToSyncTableName}`); + conventionsToSyncRepository = new PgConventionsToSyncRepository(client); + }); + + it.each(conventionsToSync)( + `save and getById convention with status '$status'`, + async (conventionToSync) => { + expectToEqual( + await conventionsToSyncRepository.getById(conventionToSync.id), + undefined, + ); + + await conventionsToSyncRepository.save(conventionToSync); + + const syncedConvention = await conventionsToSyncRepository.getById( + conventionToSync.id, + ); + expectToEqual(syncedConvention, conventionToSync); + }, + ); + + describe("getNotProcessedAndErrored", () => { + beforeEach(() => + Promise.all( + conventionsToSync.map((conventionToSync) => + conventionsToSyncRepository.save(conventionToSync), + ), + ), + ); + + it("only TO_PROCESS and ERROR", async () => { + const conventionsToSyncNotProcessedAndErrored = + await conventionsToSyncRepository.getToProcessOrError(10000); + + expectToEqual(conventionsToSyncNotProcessedAndErrored, [ + conventionsToSync[0], + conventionsToSync[2], + ]); + }); + + it("with limit 1", async () => { + const conventionsToSyncNotProcessedAndErrored = + await conventionsToSyncRepository.getToProcessOrError(1); + + expectToEqual(conventionsToSyncNotProcessedAndErrored, [ + conventionsToSync[0], + ]); + }); + }); +}); diff --git a/back/src/adapters/secondary/pg/PgConventionsToSyncRepository.ts b/back/src/adapters/secondary/pg/PgConventionsToSyncRepository.ts new file mode 100644 index 0000000000..b60bc5261d --- /dev/null +++ b/back/src/adapters/secondary/pg/PgConventionsToSyncRepository.ts @@ -0,0 +1,86 @@ +import { PoolClient } from "pg"; +import { ConventionId } from "shared"; +import { + ConventionsToSyncRepository, + ConventionToSync, +} from "../../../domain/convention/ports/ConventionsToSyncRepository"; + +export const conventionsToSyncTableName = "conventions_to_sync_with_pe"; + +export class PgConventionsToSyncRepository + implements ConventionsToSyncRepository +{ + constructor(private client: PoolClient) {} + + async getToProcessOrError(limit: number): Promise { + const queryResult = await this.client.query( + ` + SELECT id, status, process_date, reason + FROM ${conventionsToSyncTableName} + WHERE status = 'TO_PROCESS' + OR status = 'ERROR' + LIMIT $1 + `, + [limit], + ); + return queryResult.rows.map((pgConventionToSync) => + pgResultToConventionToSync(pgConventionToSync), + ); + } + + async save(conventionToSync: ConventionToSync): Promise { + await this.client.query( + ` + INSERT INTO ${conventionsToSyncTableName} (id, + status, + process_date, + reason) + VALUES ($1, $2, $3, $4) + `, + [ + conventionToSync.id, + conventionToSync.status, + conventionToSync.status !== "TO_PROCESS" + ? conventionToSync.processDate + : null, + conventionToSync.status === "ERROR" || + conventionToSync.status === "SKIP" + ? conventionToSync.reason + : null, + ], + ); + } + + async getById(id: ConventionId): Promise { + const queryResult = await this.client.query( + ` + SELECT id, status, process_date, reason + FROM ${conventionsToSyncTableName} + WHERE id = $1 + `, + [id], + ); + const pgConventionToSync = queryResult.rows.at(0); + return pgConventionToSync + ? pgResultToConventionToSync(pgConventionToSync) + : undefined; + } +} + +type PgConventionToSync = { + id: string; + status: string; + process_date: Date | null; + reason: string | null; +}; + +function pgResultToConventionToSync( + pgConventionToSync: PgConventionToSync, +): ConventionToSync { + return { + id: pgConventionToSync.id, + status: pgConventionToSync.status, + processDate: pgConventionToSync.process_date ?? undefined, + reason: pgConventionToSync.reason ?? undefined, + } as ConventionToSync; +} diff --git a/back/src/adapters/secondary/pg/migrations/1687438863530_create-conventions-to-sync-repository.ts b/back/src/adapters/secondary/pg/migrations/1687438863530_create-conventions-to-sync-repository.ts new file mode 100644 index 0000000000..ea31aa3165 --- /dev/null +++ b/back/src/adapters/secondary/pg/migrations/1687438863530_create-conventions-to-sync-repository.ts @@ -0,0 +1,17 @@ +/* eslint-disable @typescript-eslint/naming-convention */ +import { MigrationBuilder } from "node-pg-migrate"; + +const tableName = "conventions_to_sync_with_pe"; + +export async function up(pgm: MigrationBuilder): Promise { + pgm.createTable(tableName, { + id: { type: "uuid", primaryKey: true }, + status: { type: "text", notNull: true }, + process_date: { type: "timestamptz", notNull: false }, + reason: { type: "text", notNull: false }, + }); +} + +export async function down(pgm: MigrationBuilder): Promise { + pgm.dropTable(tableName); +} diff --git a/back/src/domain/convention/ports/ConventionsToSyncRepository.ts b/back/src/domain/convention/ports/ConventionsToSyncRepository.ts new file mode 100644 index 0000000000..a41093422e --- /dev/null +++ b/back/src/domain/convention/ports/ConventionsToSyncRepository.ts @@ -0,0 +1,31 @@ +import { ConventionId } from "shared"; + +export type ConventionToSync = { + id: ConventionId; +} & ( + | { + status: "TO_PROCESS"; + } + | { + status: "SUCCESS"; + processDate: Date; + } + | { + status: "ERROR"; + processDate: Date; + reason: string; + } + | { + status: "SKIP"; + processDate: Date; + reason: string; + } +); + +export interface ConventionsToSyncRepository { + getById(id: ConventionId): Promise; + + getToProcessOrError(limit: number): Promise; + + save(filledConvention: ConventionToSync): Promise; +} diff --git a/back/src/domain/convention/useCases/ResyncOldConventionsToPe.ts b/back/src/domain/convention/useCases/ResyncOldConventionsToPe.ts new file mode 100644 index 0000000000..fa6a6169da --- /dev/null +++ b/back/src/domain/convention/useCases/ResyncOldConventionsToPe.ts @@ -0,0 +1,121 @@ +import { match } from "ts-pattern"; +import { z } from "zod"; +import { ConventionId } from "shared"; +import { NotFoundError } from "../../../adapters/primary/helpers/httpErrors"; +import { TimeGateway } from "../../core/ports/TimeGateway"; +import { UnitOfWork, UnitOfWorkPerformer } from "../../core/ports/UnitOfWork"; +import { TransactionalUseCase } from "../../core/UseCase"; +import { PoleEmploiGateway } from "../ports/PoleEmploiGateway"; +import { BroadcastToPoleEmploiOnConventionUpdates } from "./broadcast/BroadcastToPoleEmploiOnConventionUpdates"; + +type ResyncOldConventionToPeReport = { + success: number; + skips: Record; + errors: Record; +}; + +export class ResyncOldConventionsToPe extends TransactionalUseCase< + void, + ResyncOldConventionToPeReport +> { + constructor( + private uowPerform: UnitOfWorkPerformer, + private poleEmploiGateway: PoleEmploiGateway, + private timeGateway: TimeGateway, + private limit: number, + ) { + super(uowPerform); + this.broadcastToPeUsecase = new BroadcastToPoleEmploiOnConventionUpdates( + this.uowPerform, + this.poleEmploiGateway, + this.timeGateway, + { resyncMode: true }, + ); + } + + protected override inputSchema = z.void(); + + public async _execute( + _: void, + uow: UnitOfWork, + ): Promise { + const conventionsToSync = + await uow.conventionsToSyncRepository.getToProcessOrError(this.limit); + await Promise.all( + conventionsToSync.map((conventionToSync) => + this.handleConventionToSync(uow, conventionToSync.id), + ), + ); + + return this.report; + } + + private async handleConventionToSync( + uow: UnitOfWork, + conventionToSyncId: ConventionId, + ) { + try { + await this.resync(uow, conventionToSyncId); + const updatedConventionToSync = + await uow.conventionsToSyncRepository.getById(conventionToSyncId); + + match(updatedConventionToSync) + .with(undefined, () => { + this.report.errors[conventionToSyncId] = new Error( + "Convention not found or no status", + ); + }) + .with({ status: "SUCCESS" }, () => { + this.report.success += 1; + }) + .with({ status: "TO_PROCESS" }, (toProcessConventionToSync) => { + this.report.errors[toProcessConventionToSync.id] = new Error( + "Convention still have status TO_PROCESS", + ); + }) + .with({ status: "ERROR" }, (errorConventionToSync) => { + this.report.errors[errorConventionToSync.id] = new Error( + errorConventionToSync.reason, + ); + }) + .with({ status: "SKIP" }, (skipConventionToSync) => { + this.report.skips[skipConventionToSync.id] = + skipConventionToSync.reason; + }) + .exhaustive(); + } catch (error) { + const anError = + error instanceof Error + ? error + : new Error("Not an Error: " + JSON.stringify(error)); + await uow.conventionsToSyncRepository.save({ + id: conventionToSyncId, + status: "ERROR", + processDate: this.timeGateway.now(), + reason: anError.message, + }); + this.report.errors[conventionToSyncId] = anError; + } + } + + private async resync( + uow: UnitOfWork, + conventionToSyncId: ConventionId, + ): Promise { + const convention = await uow.conventionRepository.getById( + conventionToSyncId, + ); + if (!convention) + throw new NotFoundError( + `Convention with id ${conventionToSyncId} missing in conventionRepository.`, + ); + return this.broadcastToPeUsecase.execute(convention); + } + + private broadcastToPeUsecase: BroadcastToPoleEmploiOnConventionUpdates; + private report: ResyncOldConventionToPeReport = { + errors: {}, + skips: {}, + success: 0, + }; +} diff --git a/back/src/domain/convention/useCases/ResyncOldConventionsToPe.unit.test.ts b/back/src/domain/convention/useCases/ResyncOldConventionsToPe.unit.test.ts new file mode 100644 index 0000000000..f71b80bb41 --- /dev/null +++ b/back/src/domain/convention/useCases/ResyncOldConventionsToPe.unit.test.ts @@ -0,0 +1,465 @@ +import subDays from "date-fns/subDays"; +import { + AgencyDtoBuilder, + ConventionDto, + ConventionDtoBuilder, + expectToEqual, +} from "shared"; +import { + createInMemoryUow, + InMemoryUnitOfWork, +} from "../../../adapters/primary/config/uowConfig"; +import { NotFoundError } from "../../../adapters/primary/helpers/httpErrors"; +import { CustomTimeGateway } from "../../../adapters/secondary/core/TimeGateway/CustomTimeGateway"; +import { InMemoryUowPerformer } from "../../../adapters/secondary/InMemoryUowPerformer"; +import { InMemoryPoleEmploiGateway } from "../../../adapters/secondary/poleEmploi/InMemoryPoleEmploiGateway"; +import { + conventionStatusToPoleEmploiStatus, + PoleEmploiConvention, +} from "../ports/PoleEmploiGateway"; +import { ResyncOldConventionsToPe } from "./ResyncOldConventionsToPe"; + +describe("ResyncOldConventionsToPe use case", () => { + const agencyPE = new AgencyDtoBuilder().withKind("pole-emploi").build(); + const conventionToSync1 = new ConventionDtoBuilder() + .withId("6f59c7b7-c2c9-4a31-a3eb-377ea83ae08b") + .withAgencyId(agencyPE.id) + .build(); + const conventionToSync2 = new ConventionDtoBuilder() + .withId("6f59c7b7-c2c9-4a31-a3eb-377ea83ae08a") + .withAgencyId(agencyPE.id) + .build(); + const conventionToSync3 = new ConventionDtoBuilder() + .withId("6f59c7b7-c2c9-4a31-a3eb-377ea83ae08d") + .withAgencyId(agencyPE.id) + .build(); + const conventionToSync4 = new ConventionDtoBuilder() + .withId("6f59c7b7-c2c9-4a31-a3eb-377ea83ae08e") + .withAgencyId(agencyPE.id) + .build(); + + let uow: InMemoryUnitOfWork; + let useCase: ResyncOldConventionsToPe; + let timeGateway: CustomTimeGateway; + let peGateway: InMemoryPoleEmploiGateway; + + beforeEach(() => { + uow = createInMemoryUow(); + + timeGateway = new CustomTimeGateway(); + peGateway = new InMemoryPoleEmploiGateway(); + useCase = new ResyncOldConventionsToPe( + new InMemoryUowPerformer(uow), + peGateway, + timeGateway, + 100, + ); + }); + + describe("Right paths", () => { + it("broadcast two conventions to pe", async () => { + uow.agencyRepository.setAgencies([agencyPE]); + uow.conventionRepository.setConventions({ + [conventionToSync1.id]: conventionToSync1, + [conventionToSync2.id]: conventionToSync2, + }); + uow.conventionsToSyncRepository.setForTesting([ + { + id: conventionToSync1.id, + status: "TO_PROCESS", + }, + { + id: conventionToSync2.id, + status: "TO_PROCESS", + }, + ]); + expectToEqual(peGateway.notifications, []); + + const report = await useCase.execute(); + + expectToEqual(uow.conventionRepository.conventions, [ + conventionToSync1, + conventionToSync2, + ]); + expectToEqual(uow.conventionsToSyncRepository.conventionsToSync, [ + { + id: conventionToSync1.id, + status: "SUCCESS", + processDate: timeGateway.now(), + }, + { + id: conventionToSync2.id, + status: "SUCCESS", + processDate: timeGateway.now(), + }, + ]); + expectToEqual(peGateway.notifications, [ + conventionToConventionNotification(conventionToSync1), + conventionToConventionNotification(conventionToSync2), + ]); + expectToEqual(report, { + success: 2, + skips: {}, + errors: {}, + }); + }); + it("broadcast one convention to pe", async () => { + uow.agencyRepository.setAgencies([agencyPE]); + uow.conventionRepository.setConventions({ + [conventionToSync1.id]: conventionToSync1, + }); + uow.conventionsToSyncRepository.setForTesting([ + { + id: conventionToSync1.id, + status: "TO_PROCESS", + }, + ]); + expectToEqual(peGateway.notifications, []); + + const report = await useCase.execute(); + + expectToEqual(uow.conventionRepository.conventions, [conventionToSync1]); + expectToEqual(uow.conventionsToSyncRepository.conventionsToSync, [ + { + id: conventionToSync1.id, + status: "SUCCESS", + processDate: timeGateway.now(), + }, + ]); + expectToEqual(peGateway.notifications, [ + conventionToConventionNotification(conventionToSync1), + ]); + expectToEqual(report, { + success: [conventionToSync1.id].length, + skips: {}, + errors: {}, + }); + }); + it("no convention to sync", async () => { + uow.conventionsToSyncRepository.setForTesting([]); + expectToEqual(peGateway.notifications, []); + + const report = await useCase.execute(); + + expectToEqual(uow.conventionRepository.conventions, []); + expectToEqual(uow.conventionsToSyncRepository.conventionsToSync, []); + expectToEqual(peGateway.notifications, []); + expectToEqual(report, { + success: 0, + skips: {}, + errors: {}, + }); + }); + it("when agency is not kind pole-emploi", async () => { + const agencyCCI = new AgencyDtoBuilder().withKind("cci").build(); + const conventionToSync = new ConventionDtoBuilder() + .withAgencyId(agencyCCI.id) + .build(); + uow.agencyRepository.setAgencies([agencyCCI]); + uow.conventionRepository.setConventions({ + [conventionToSync.id]: conventionToSync, + }); + uow.conventionsToSyncRepository.setForTesting([ + { + id: conventionToSync.id, + status: "TO_PROCESS", + }, + ]); + expectToEqual(peGateway.notifications, []); + + const report = await useCase.execute(); + + expectToEqual(uow.conventionRepository.conventions, [conventionToSync]); + expectToEqual(uow.conventionsToSyncRepository.conventionsToSync, [ + { + id: conventionToSync.id, + status: "SKIP", + processDate: timeGateway.now(), + reason: "Agency is not of kind pole-emploi", + }, + ]); + expectToEqual(peGateway.notifications, []); + expectToEqual(report, { + success: 0, + skips: { + [conventionToSync.id]: "Agency is not of kind pole-emploi", + }, + errors: {}, + }); + }); + it("when feature flag enablePeConventionBroadcast is disabled", async () => { + await uow.featureFlagRepository.set({ + flagName: "enablePeConventionBroadcast", + value: false, + }); + uow.agencyRepository.setAgencies([agencyPE]); + uow.conventionRepository.setConventions({ + [conventionToSync1.id]: conventionToSync1, + }); + uow.conventionsToSyncRepository.setForTesting([ + { + id: conventionToSync1.id, + status: "TO_PROCESS", + }, + ]); + expectToEqual(peGateway.notifications, []); + + const report = await useCase.execute(); + + expectToEqual(uow.conventionRepository.conventions, [conventionToSync1]); + expectToEqual(uow.conventionsToSyncRepository.conventionsToSync, [ + { + id: conventionToSync1.id, + status: "SKIP", + processDate: timeGateway.now(), + reason: "Feature flag enablePeConventionBroadcast not enabled", + }, + ]); + expectToEqual(peGateway.notifications, []); + expectToEqual(report, { + success: 0, + skips: { + [conventionToSync1.id]: + "Feature flag enablePeConventionBroadcast not enabled", + }, + errors: {}, + }); + }); + it("only process convention with status TO_PROCESS and ERROR", async () => { + uow.agencyRepository.setAgencies([agencyPE]); + uow.conventionRepository.setConventions({ + [conventionToSync1.id]: conventionToSync1, + [conventionToSync2.id]: conventionToSync2, + [conventionToSync3.id]: conventionToSync3, + [conventionToSync4.id]: conventionToSync4, + }); + uow.conventionsToSyncRepository.setForTesting([ + { + id: conventionToSync1.id, + status: "TO_PROCESS", + }, + { + id: conventionToSync2.id, + status: "ERROR", + processDate: subDays(timeGateway.now(), 1), + reason: "Random error", + }, + { + id: conventionToSync3.id, + status: "SKIP", + processDate: subDays(timeGateway.now(), 1), + reason: "Feature flag enablePeConventionBroadcast not enabled", + }, + { + id: conventionToSync4.id, + status: "SUCCESS", + processDate: subDays(timeGateway.now(), 1), + }, + ]); + expectToEqual(peGateway.notifications, []); + + const report = await useCase.execute(); + + expectToEqual(uow.conventionRepository.conventions, [ + conventionToSync1, + conventionToSync2, + conventionToSync3, + conventionToSync4, + ]); + expectToEqual(uow.conventionsToSyncRepository.conventionsToSync, [ + { + id: conventionToSync1.id, + status: "SUCCESS", + processDate: timeGateway.now(), + }, + { + id: conventionToSync2.id, + status: "SUCCESS", + processDate: timeGateway.now(), + }, + { + id: conventionToSync3.id, + status: "SKIP", + processDate: subDays(timeGateway.now(), 1), + reason: "Feature flag enablePeConventionBroadcast not enabled", + }, + { + id: conventionToSync4.id, + status: "SUCCESS", + processDate: subDays(timeGateway.now(), 1), + }, + ]); + expectToEqual(peGateway.notifications, [ + conventionToConventionNotification(conventionToSync1), + conventionToConventionNotification(conventionToSync2), + ]); + expectToEqual(report, { + success: 2, + skips: {}, + errors: {}, + }); + }); + it("should consider limit", async () => { + uow.agencyRepository.setAgencies([agencyPE]); + uow.conventionRepository.setConventions({ + [conventionToSync1.id]: conventionToSync1, + [conventionToSync2.id]: conventionToSync2, + }); + uow.conventionsToSyncRepository.setForTesting([ + { + id: conventionToSync1.id, + status: "TO_PROCESS", + }, + { + id: conventionToSync2.id, + status: "TO_PROCESS", + }, + ]); + expectToEqual(peGateway.notifications, []); + + const report = await new ResyncOldConventionsToPe( + new InMemoryUowPerformer(uow), + peGateway, + timeGateway, + 1, + ).execute(); + + expectToEqual(uow.conventionRepository.conventions, [ + conventionToSync1, + conventionToSync2, + ]); + expectToEqual(uow.conventionsToSyncRepository.conventionsToSync, [ + { + id: conventionToSync1.id, + status: "SUCCESS", + processDate: timeGateway.now(), + }, + { + id: conventionToSync2.id, + status: "TO_PROCESS", + }, + ]); + expectToEqual(peGateway.notifications, [ + conventionToConventionNotification(conventionToSync1), + ]); + expectToEqual(report, { + success: 1, + skips: {}, + errors: {}, + }); + }); + }); + + describe("Wrong paths", () => { + it("when no convention in conventionRepository should not sync convention", async () => { + uow.agencyRepository.setAgencies([agencyPE]); + uow.conventionsToSyncRepository.setForTesting([ + { + id: conventionToSync1.id, + status: "TO_PROCESS", + }, + ]); + expectToEqual(peGateway.notifications, []); + + const report = await useCase.execute(); + + expectToEqual(uow.conventionRepository.conventions, []); + expectToEqual(uow.conventionsToSyncRepository.conventionsToSync, [ + { + id: conventionToSync1.id, + status: "ERROR", + processDate: timeGateway.now(), + reason: `Convention with id ${conventionToSync1.id} missing in conventionRepository.`, + }, + ]); + expectToEqual(peGateway.notifications, []); + expectToEqual(report, { + success: 0, + skips: {}, + errors: { + [conventionToSync1.id]: new NotFoundError( + `Convention with id ${conventionToSync1.id} missing in conventionRepository.`, + ), + }, + }); + }); + it("when no agency", async () => { + uow.conventionRepository.setConventions({ + [conventionToSync1.id]: conventionToSync1, + }); + uow.conventionsToSyncRepository.setForTesting([ + { + id: conventionToSync1.id, + status: "TO_PROCESS", + }, + ]); + expectToEqual(peGateway.notifications, []); + + const report = await useCase.execute(); + + expectToEqual(uow.conventionRepository.conventions, [conventionToSync1]); + expectToEqual(uow.conventionsToSyncRepository.conventionsToSync, [ + { + id: conventionToSync1.id, + status: "ERROR", + processDate: timeGateway.now(), + reason: `Agency with id ${agencyPE.id} missing in agencyRepository`, + }, + ]); + expectToEqual(peGateway.notifications, []); + expectToEqual(report, { + success: 0, + skips: {}, + errors: { + [conventionToSync1.id]: new NotFoundError( + `Agency with id ${agencyPE.id} missing in agencyRepository`, + ), + }, + }); + }); + }); +}); + +function conventionToConventionNotification( + convention: ConventionDto, +): PoleEmploiConvention { + return { + id: convention.externalId + ? convention.externalId.padStart(11, "0") + : "no-external-id", + originalId: convention.id, + peConnectId: convention.signatories.beneficiary.federatedIdentity?.token, + statut: conventionStatusToPoleEmploiStatus[convention.status], + email: convention.signatories.beneficiary.email, + telephone: convention.signatories.beneficiary.phone, + prenom: convention.signatories.beneficiary.firstName, + nom: convention.signatories.beneficiary.lastName, + dateNaissance: new Date( + convention.signatories.beneficiary.birthdate, + ).toISOString(), + dateDemande: new Date(convention.dateSubmission).toISOString(), + dateDebut: new Date(convention.dateStart).toISOString(), + dateFin: new Date(convention.dateEnd).toISOString(), + dureeImmersion: convention.schedule.totalHours, + raisonSociale: convention.businessName, + siret: convention.siret, + nomPrenomFonctionTuteur: `${convention.establishmentTutor.firstName} ${convention.establishmentTutor.lastName} ${convention.establishmentTutor.job}`, + telephoneTuteur: convention.establishmentTutor.phone, + emailTuteur: convention.establishmentTutor.email, + adresseImmersion: convention.immersionAddress, + protectionIndividuelle: convention.individualProtection, + preventionSanitaire: convention.sanitaryPrevention, + descriptionPreventionSanitaire: convention.sanitaryPreventionDescription, + objectifDeImmersion: 2, + codeRome: convention.immersionAppellation.romeCode, + codeAppellation: convention.immersionAppellation.appellationCode.padStart( + 6, + "0", + ), + activitesObservees: convention.immersionActivities, + competencesObservees: convention.immersionSkills, + signatureBeneficiaire: !!convention.signatories.beneficiary.signedAt, + signatureEntreprise: + !!convention.signatories.establishmentRepresentative.signedAt, + }; +} diff --git a/back/src/domain/convention/useCases/broadcast/BroadcastToPoleEmploiOnConventionUpdates.ts b/back/src/domain/convention/useCases/broadcast/BroadcastToPoleEmploiOnConventionUpdates.ts index d555c44ef6..4d7486bdca 100644 --- a/back/src/domain/convention/useCases/broadcast/BroadcastToPoleEmploiOnConventionUpdates.ts +++ b/back/src/domain/convention/useCases/broadcast/BroadcastToPoleEmploiOnConventionUpdates.ts @@ -1,4 +1,5 @@ import { ConventionDto, conventionSchema, ImmersionObjective } from "shared"; +import { NotFoundError } from "../../../../adapters/primary/helpers/httpErrors"; import { TimeGateway } from "../../../core/ports/TimeGateway"; import { UnitOfWork, @@ -25,16 +26,17 @@ export class BroadcastToPoleEmploiOnConventionUpdates extends TransactionalUseCa ConventionDto, void > { + protected inputSchema = conventionSchema; + constructor( uowPerformer: UnitOfWorkPerformer, private poleEmploiGateway: PoleEmploiGateway, private timeGateway: TimeGateway, + private options: { resyncMode: boolean }, ) { super(uowPerformer); } - protected inputSchema = conventionSchema; - protected async _execute( convention: ConventionDto, uow: UnitOfWork, @@ -42,10 +44,30 @@ export class BroadcastToPoleEmploiOnConventionUpdates extends TransactionalUseCa const { enablePeConventionBroadcast } = await uow.featureFlagRepository.getAll(); - if (!enablePeConventionBroadcast) return; + if (!enablePeConventionBroadcast) + return this.options.resyncMode + ? uow.conventionsToSyncRepository.save({ + id: convention.id, + status: "SKIP", + processDate: this.timeGateway.now(), + reason: "Feature flag enablePeConventionBroadcast not enabled", + }) + : undefined; const [agency] = await uow.agencyRepository.getByIds([convention.agencyId]); - if (!agency || agency.kind !== "pole-emploi") return; + if (!agency) + throw new NotFoundError( + `Agency with id ${convention.agencyId} missing in agencyRepository`, + ); + if (agency.kind !== "pole-emploi") + return this.options.resyncMode + ? uow.conventionsToSyncRepository.save({ + id: convention.id, + status: "SKIP", + processDate: this.timeGateway.now(), + reason: "Agency is not of kind pole-emploi", + }) + : undefined; const { beneficiary, establishmentRepresentative } = convention.signatories; @@ -93,6 +115,13 @@ export class BroadcastToPoleEmploiOnConventionUpdates extends TransactionalUseCa poleEmploiConvention, ); + if (this.options.resyncMode) + await uow.conventionsToSyncRepository.save({ + id: convention.id, + status: "SUCCESS", + processDate: this.timeGateway.now(), + }); + if (!isBroadcastResponseOk(response)) { await uow.errorRepository.save({ serviceName: "PoleEmploiGateway.notifyOnConventionUpdated", diff --git a/back/src/domain/convention/useCases/broadcast/BroadcastToPoleEmploiOnConventionUpdates.unit.test.ts b/back/src/domain/convention/useCases/broadcast/BroadcastToPoleEmploiOnConventionUpdates.unit.test.ts index c533519bdc..3b6c8f152c 100644 --- a/back/src/domain/convention/useCases/broadcast/BroadcastToPoleEmploiOnConventionUpdates.unit.test.ts +++ b/back/src/domain/convention/useCases/broadcast/BroadcastToPoleEmploiOnConventionUpdates.unit.test.ts @@ -28,6 +28,7 @@ const prepareUseCase = async ({ new InMemoryUowPerformer(uow), poleEmploiGateWay, timeGateway, + { resyncMode: false }, ); const agencyRepository = uow.agencyRepository; diff --git a/back/src/domain/core/ports/UnitOfWork.ts b/back/src/domain/core/ports/UnitOfWork.ts index f706d1c67b..97d8e78351 100644 --- a/back/src/domain/core/ports/UnitOfWork.ts +++ b/back/src/domain/core/ports/UnitOfWork.ts @@ -2,6 +2,7 @@ import { ApiConsumerRepository } from "../../auth/ports/ApiConsumerRepository"; import { AgencyRepository } from "../../convention/ports/AgencyRepository"; import { ConventionQueries } from "../../convention/ports/ConventionQueries"; import { ConventionRepository } from "../../convention/ports/ConventionRepository"; +import { ConventionsToSyncRepository } from "../../convention/ports/ConventionsToSyncRepository"; import { ImmersionAssessmentRepository } from "../../convention/ports/ImmersionAssessmentRepository"; import { InclusionConnectedUserRepository } from "../../dashboard/port/InclusionConnectedUserRepository"; import { NotificationRepository } from "../../generic/notifications/ports/NotificationRepository"; @@ -44,6 +45,7 @@ export type UnitOfWork = { searchMadeRepository: SearchMadeRepository; shortLinkQuery: ShortLinkQuery; shortLinkRepository: ShortLinkRepository; + conventionsToSyncRepository: ConventionsToSyncRepository; }; export interface UnitOfWorkPerformer { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 01d9be32d1..5923985209 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -123,6 +123,9 @@ importers: ts-node: specifier: ^10.9.1 version: 10.9.1(@types/node@18.13.0)(typescript@5.1.3) + ts-pattern: + specifier: 4.2.2 + version: 4.2.2 uuid: specifier: ^8.3.2 version: 8.3.2 @@ -14586,7 +14589,6 @@ packages: /ts-pattern@4.2.2: resolution: {integrity: sha512-qzJMo2pbkUJWusRH5o8xR+xogn6RmvViyUgwBFTtRENLse470clCGjHDf6haWGZ1AOmk8XkEohUoBW8Uut6Scg==} - dev: true /ts-toolbelt@6.15.5: resolution: {integrity: sha512-FZIXf1ksVyLcfr7M317jbB67XFJhOO1YqdTcuGaq9q5jLUoTikukZ+98TPjKiP2jC5CgmYdWWYs0s2nLSU0/1A==}