Skip to content

Commit

Permalink
resync old convention business logic
Browse files Browse the repository at this point in the history
  • Loading branch information
bbohec authored and celineung committed Jun 22, 2023
1 parent eee7c7f commit 1b5a964
Show file tree
Hide file tree
Showing 10 changed files with 736 additions and 2 deletions.
1 change: 1 addition & 0 deletions back/src/adapters/primary/config/createUseCases.ts
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ export const createUseCases = (
uowPerformer,
gateways.poleEmploiGateway,
gateways.timeGateway,
false,
),
shareConventionByEmail: new ShareApplicationLinkByEmail(
uowPerformer,
Expand Down
4 changes: 4 additions & 0 deletions back/src/adapters/primary/config/uowConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { InMemoryAuthenticatedUserRepository } from "../../secondary/InMemoryAut
import { InMemoryConventionPoleEmploiAdvisorRepository } from "../../secondary/InMemoryConventionPoleEmploiAdvisorRepository";
import { InMemoryConventionQueries } from "../../secondary/InMemoryConventionQueries";
import { InMemoryConventionRepository } from "../../secondary/InMemoryConventionRepository";
import { InMemoryConventionToSyncRepository } from "../../secondary/InMemoryConventionToSyncRepository";
import { InMemoryFeatureFlagRepository } from "../../secondary/InMemoryFeatureFlagRepository";
import { InMemoryFormEstablishmentRepository } from "../../secondary/InMemoryFormEstablishmentRepository";
import { InMemoryImmersionAssessmentRepository } from "../../secondary/InMemoryImmersionAssessmentRepository";
Expand All @@ -31,6 +32,7 @@ import { PgAuthenticatedUserRepository } from "../../secondary/pg/PgAuthenticate
import { PgConventionPoleEmploiAdvisorRepository } from "../../secondary/pg/PgConventionPoleEmploiAdvisorRepository";
import { PgConventionQueries } from "../../secondary/pg/PgConventionQueries";
import { PgConventionRepository } from "../../secondary/pg/PgConventionRepository";
import { PgConventionToSyncRepository } from "../../secondary/pg/PgConventionToSyncRepository";
import { PgDiscussionAggregateRepository } from "../../secondary/pg/PgDiscussionAggregateRepository";
import { PgErrorRepository } from "../../secondary/pg/PgErrorRepository";
import { PgEstablishmentAggregateRepository } from "../../secondary/pg/PgEstablishmentAggregateRepository";
Expand Down Expand Up @@ -70,6 +72,7 @@ export const createInMemoryUow = () => {
conventionRepository,
conventionPoleEmploiAdvisorRepository:
new InMemoryConventionPoleEmploiAdvisorRepository(),
conventionToSyncRepository: new InMemoryConventionToSyncRepository(),
discussionAggregateRepository: new InMemoryDiscussionAggregateRepository(),
establishmentAggregateRepository:
new InMemoryEstablishmentAggregateRepository(),
Expand Down Expand Up @@ -101,6 +104,7 @@ export const createPgUow = (client: PoolClient): UnitOfWork => {
conventionQueries: new PgConventionQueries(client),
conventionPoleEmploiAdvisorRepository:
new PgConventionPoleEmploiAdvisorRepository(client),
conventionToSyncRepository: new PgConventionToSyncRepository(),
discussionAggregateRepository: new PgDiscussionAggregateRepository(client),
establishmentAggregateRepository: new PgEstablishmentAggregateRepository(
client,
Expand Down
39 changes: 39 additions & 0 deletions back/src/adapters/secondary/InMemoryConventionToSyncRepository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { ConventionId } from "shared";
import {
ConventionToSync,
ConventionToSyncRepository,
} from "../../domain/convention/ports/ConventionToSyncRepository";

export class InMemoryConventionToSyncRepository
implements ConventionToSyncRepository
{
public async getById(
id: ConventionId,
): Promise<ConventionToSync | undefined> {
return this.conventionsToSync.find((convention) => convention.id === id);
}

public async getNotProcessedAndErrored(
limit: number,
): Promise<ConventionToSync[]> {
return this.conventionsToSync
.filter(({ status }) => status === "ERROR" || status === "TO_PROCESS")
.slice(0, limit);
}

public async save(conventionToSync: ConventionToSync): Promise<void> {
const index = this.conventionsToSync.findIndex(
(convention) => convention.id === conventionToSync.id,
);
index === -1
? this.conventionsToSync.push(conventionToSync)
: (this.conventionsToSync[index] = conventionToSync);
}

// for testing purpose
public setForTesting(conventions: ConventionToSync[]) {
this.conventionsToSync = conventions;
}

public conventionsToSync: ConventionToSync[] = [];
}
21 changes: 21 additions & 0 deletions back/src/adapters/secondary/pg/PgConventionToSyncRepository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { ConventionId } from "shared";
import {
ConventionToSync,
ConventionToSyncRepository,
} from "../../../domain/convention/ports/ConventionToSyncRepository";

export class PgConventionToSyncRepository
implements ConventionToSyncRepository
{
getNotProcessedAndErrored(): Promise<ConventionToSync[]> {
return Promise.resolve([]);
}

save(_filledConvention: ConventionToSync): Promise<void> {
return Promise.resolve(undefined);
}

getById(_id: ConventionId): Promise<ConventionToSync | undefined> {
return Promise.resolve(undefined);
}
}
32 changes: 32 additions & 0 deletions back/src/domain/convention/ports/ConventionToSyncRepository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { ConventionId } from "shared";

export type ConventionToSync =
| {
id: ConventionId;
status: "TO_PROCESS";
}
| {
id: ConventionId;
status: "SUCCESS";
processDate: Date;
}
| {
id: ConventionId;
status: "ERROR";
processDate: Date;
reason: string;
}
| {
id: ConventionId;
status: "SKIP";
processDate: Date;
reason: string;
};

export interface ConventionToSyncRepository {
getById(id: ConventionId): Promise<ConventionToSync | undefined>;

getNotProcessedAndErrored(limit: number): Promise<ConventionToSync[]>;

save(filledConvention: ConventionToSync): Promise<void>;
}
140 changes: 140 additions & 0 deletions back/src/domain/convention/useCases/ResyncOldConventionsToPe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import { z } from "zod";
import { ConventionId } from "shared";
import { NotFoundError } from "../../../adapters/primary/helpers/httpErrors";
import { TimeGateway } from "../../core/ports/TimeGateway";
import { UnitOfWork, UnitOfWorkPerformer } from "../../core/ports/UnitOfWork";
import { TransactionalUseCase } from "../../core/UseCase";
import { ConventionToSync } from "../ports/ConventionToSyncRepository";
import { PoleEmploiGateway } from "../ports/PoleEmploiGateway";
import { BroadcastToPoleEmploiOnConventionUpdates } from "./broadcast/BroadcastToPoleEmploiOnConventionUpdates";

type ResyncOldConventionToPeReport = {
success: number;
skips: Record<ConventionId, string>;
errors: Record<ConventionId, Error>;
};

export class ResyncOldConventionToPe extends TransactionalUseCase<
void,
ResyncOldConventionToPeReport
> {
constructor(
private uowPerform: UnitOfWorkPerformer,
private poleEmploiGateway: PoleEmploiGateway,
private timeGateway: TimeGateway,
private limit: number,
) {
super(uowPerform);
}

protected override inputSchema = z.void();

public async _execute(
_: void,
uow: UnitOfWork,
): Promise<ResyncOldConventionToPeReport> {
const conventionsToSync =
await uow.conventionToSyncRepository.getNotProcessedAndErrored(
this.limit,
);
await Promise.all(
conventionsToSync.map((conventionToSync) =>
this.handleConventionToSync(uow, conventionToSync),
),
);

return this.report;
}

private async handleConventionToSync(
uow: UnitOfWork,
conventionToSync: ConventionToSync,
) {
try {
await this.resync(uow, conventionToSync);
const updatedConventionToSync =
await uow.conventionToSyncRepository.getById(conventionToSync.id);
if (updatedConventionToSync === undefined) {
this.report.errors[conventionToSync.id] = new Error(
"Convention not found or no status",
);
return;
}
if (updatedConventionToSync.status === "TO_PROCESS") {
this.report.errors[conventionToSync.id] = new Error(
"Convention still have status TO_PROCESS",
);
}
if (updatedConventionToSync.status === "SUCCESS")
this.report.success += 1;
if (updatedConventionToSync.status === "SKIP") {
this.report.skips[conventionToSync.id] = updatedConventionToSync.reason;
}
if (updatedConventionToSync.status === "ERROR") {
this.report.errors[conventionToSync.id] = new Error(
updatedConventionToSync.reason,
);
}
// const strategy: {
// [K in ConventionToSync["status"]]: (
// conventionCase: Extract<ConventionToSync, { status: K }>,
// ) => void;
// } = {
// SUCCESS: () => {
// this.report.success += 1;
// },
// TO_PROCESS: () => {
// this.report.errors[conventionToSync.id] = new Error(
// "Convention still have status TO_PROCESS",
// );
// },
// SKIP: (conventionCase) => {
// this.report.skips[conventionToSync.id] = conventionCase.reason;
// },
// ERROR: (conventionCase) => {
// this.report.errors[conventionToSync.id] = new Error(
// conventionCase.reason,
// );
// },
// };
// strategy[updatedConventionToSync.status](updatedConventionToSync);
} catch (error) {
const anError =
error instanceof Error
? error
: new Error("Not an Error: " + JSON.stringify(error));
await uow.conventionToSyncRepository.save({
id: conventionToSync.id,
status: "ERROR",
processDate: this.timeGateway.now(),
reason: anError.message,
});
this.report.errors[conventionToSync.id] = anError;
}
}

private async resync(
uow: UnitOfWork,
conventionToSync: ConventionToSync,
): Promise<void> {
const convention = await uow.conventionRepository.getById(
conventionToSync.id,
);
if (!convention)
throw new NotFoundError(
`Convention with id ${conventionToSync.id} missing in conventionRepository.`,
);
await new BroadcastToPoleEmploiOnConventionUpdates(
this.uowPerform,
this.poleEmploiGateway,
this.timeGateway,
true,
).execute(convention);
}

private report: ResyncOldConventionToPeReport = {
errors: {},
skips: {},
success: 0,
};
}
Loading

0 comments on commit 1b5a964

Please sign in to comment.