Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

syncro PE #342

Merged
merged 5 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions back/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"trigger-sending-emails-with-assessment-creation-link": "ts-node src/adapters/primary/scripts/triggerSendingEmailsWithAssessmentCreationLink.ts",
"trigger-refresh-materialized-views": "ts-node src/adapters/primary/scripts/triggerRefreshMaterializedViews.ts",
"trigger-convention-reminder": "ts-node src/adapters/primary/scripts/triggerConventionReminder.ts",
"trigger-resync-old-conventions-to-pe": "ts-node src/adapters/primary/scripts/triggerResyncOldConventionsToPe.ts",
"seed": "NODE_ENV=local ts-node src/adapters/primary/scripts/seed.ts",
"generate-api-key": "ts-node src/adapters/primary/scripts/generateApiKey.ts",
"test:unit": "jest --watchAll --testRegex=.unit.test.ts",
Expand Down Expand Up @@ -61,6 +62,7 @@
"ramda": "^0.28.0",
"shared": "workspace:*",
"ts-node": "^10.9.1",
"ts-pattern": "4.2.2",
"uuid": "^8.3.2",
"zod": "3.21.4"
},
Expand Down
4 changes: 4 additions & 0 deletions back/scalingo/cron.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
"command": "03 2 * * * pnpm back trigger-update-establishments-from-sirene",
"size": "M"
},
{
"command": "02 3 * * * pnpm back run trigger-resync-old-conventions-to-pe",
"size": "M"
},
{
"command": "15 05 * * * pnpm back run trigger-convention-reminder",
"size": "M"
Expand Down
1 change: 1 addition & 0 deletions back/src/adapters/primary/config/createUseCases.ts
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ export const createUseCases = (
uowPerformer,
gateways.poleEmploiGateway,
gateways.timeGateway,
{ resyncMode: false },
),
shareConventionByEmail: new ShareApplicationLinkByEmail(
uowPerformer,
Expand Down
4 changes: 4 additions & 0 deletions back/src/adapters/primary/config/uowConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { InMemoryAuthenticatedUserRepository } from "../../secondary/InMemoryAut
import { InMemoryConventionPoleEmploiAdvisorRepository } from "../../secondary/InMemoryConventionPoleEmploiAdvisorRepository";
import { InMemoryConventionQueries } from "../../secondary/InMemoryConventionQueries";
import { InMemoryConventionRepository } from "../../secondary/InMemoryConventionRepository";
import { InMemoryConventionsToSyncRepository } from "../../secondary/InMemoryConventionsToSyncRepository";
import { InMemoryFeatureFlagRepository } from "../../secondary/InMemoryFeatureFlagRepository";
import { InMemoryFormEstablishmentRepository } from "../../secondary/InMemoryFormEstablishmentRepository";
import { InMemoryImmersionAssessmentRepository } from "../../secondary/InMemoryImmersionAssessmentRepository";
Expand All @@ -31,6 +32,7 @@ import { PgAuthenticatedUserRepository } from "../../secondary/pg/PgAuthenticate
import { PgConventionPoleEmploiAdvisorRepository } from "../../secondary/pg/PgConventionPoleEmploiAdvisorRepository";
import { PgConventionQueries } from "../../secondary/pg/PgConventionQueries";
import { PgConventionRepository } from "../../secondary/pg/PgConventionRepository";
import { PgConventionsToSyncRepository } from "../../secondary/pg/PgConventionsToSyncRepository";
import { PgDiscussionAggregateRepository } from "../../secondary/pg/PgDiscussionAggregateRepository";
import { PgErrorRepository } from "../../secondary/pg/PgErrorRepository";
import { PgEstablishmentAggregateRepository } from "../../secondary/pg/PgEstablishmentAggregateRepository";
Expand Down Expand Up @@ -70,6 +72,7 @@ export const createInMemoryUow = () => {
conventionRepository,
conventionPoleEmploiAdvisorRepository:
new InMemoryConventionPoleEmploiAdvisorRepository(),
conventionsToSyncRepository: new InMemoryConventionsToSyncRepository(),
discussionAggregateRepository: new InMemoryDiscussionAggregateRepository(),
establishmentAggregateRepository:
new InMemoryEstablishmentAggregateRepository(),
Expand Down Expand Up @@ -101,6 +104,7 @@ export const createPgUow = (client: PoolClient): UnitOfWork => {
conventionQueries: new PgConventionQueries(client),
conventionPoleEmploiAdvisorRepository:
new PgConventionPoleEmploiAdvisorRepository(client),
conventionsToSyncRepository: new PgConventionsToSyncRepository(client),
discussionAggregateRepository: new PgDiscussionAggregateRepository(client),
establishmentAggregateRepository: new PgEstablishmentAggregateRepository(
client,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import axios from "axios";
import { Pool } from "pg";
import { GetAccessTokenResponse } from "../../../domain/convention/ports/PoleEmploiGateway";
import { ResyncOldConventionsToPe } from "../../../domain/convention/useCases/ResyncOldConventionsToPe";
import { noRetries } from "../../../domain/core/ports/RetryStrategy";
import { createLogger } from "../../../utils/logger";
import { InMemoryCachingGateway } from "../../secondary/core/InMemoryCachingGateway";
import { RealTimeGateway } from "../../secondary/core/TimeGateway/RealTimeGateway";
import { HttpPoleEmploiGateway } from "../../secondary/poleEmploi/HttpPoleEmploiGateway";
import { createPoleEmploiTargets } from "../../secondary/poleEmploi/PoleEmploi.targets";
import { AppConfig } from "../config/appConfig";
import { configureCreateHttpClientForExternalApi } from "../config/createHttpClientForExternalApi";
import { createUowPerformer } from "../config/uowConfig";
import { handleEndOfScriptNotification } from "./handleEndOfScriptNotification";

const logger = createLogger(__filename);

const config = AppConfig.createFromEnv();

const executeUsecase = async () => {
const timeGateway = new RealTimeGateway();

const httpPoleEmploiGateway = new HttpPoleEmploiGateway(
configureCreateHttpClientForExternalApi(
axios.create({ timeout: config.externalAxiosTimeout }),
)(createPoleEmploiTargets(config.peApiUrl)),
new InMemoryCachingGateway<GetAccessTokenResponse>(
timeGateway,
"expires_in",
),
config.peApiUrl,
config.poleEmploiAccessTokenConfig,
noRetries,
);

const { uowPerformer } = createUowPerformer(
config,
() =>
new Pool({
connectionString: config.pgImmersionDbUrl,
}),
);

const resyncOldConventionsToPeUsecase = new ResyncOldConventionsToPe(
uowPerformer,
httpPoleEmploiGateway,
timeGateway,
50,
);

return resyncOldConventionsToPeUsecase.execute();
};

/* eslint-disable @typescript-eslint/no-floating-promises */
handleEndOfScriptNotification(
"resyncOldConventionToPE",
config,
executeUsecase,
(report) => {
const errors = Object.entries(report.errors).map(
([key, error]) => `${key}: ${error.message} `,
);

const skips = Object.entries(report.skips).map(
([key, reason]) => `${key}: ${reason} `,
);

return [
`Total of convention to sync : ${
report.success + errors.length + errors.length
}`,
`Number of successfully sync convention : ${report.success}`,
`Number of failures : ${errors.length}`,
`Number of skips : ${skips.length}`,
...(errors.length > 0 ? [`Failures : ${errors.join("\n")}`] : []),
...(skips.length > 0 ? [`Skips : ${skips.join("\n")}`] : []),
].join("\n");
},
logger,
);
37 changes: 37 additions & 0 deletions back/src/adapters/secondary/InMemoryConventionsToSyncRepository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { ConventionId } from "shared";
import {
ConventionsToSyncRepository,
ConventionToSync,
} from "../../domain/convention/ports/ConventionsToSyncRepository";

export class InMemoryConventionsToSyncRepository
implements ConventionsToSyncRepository
{
public async getById(
id: ConventionId,
): Promise<ConventionToSync | undefined> {
return this.conventionsToSync.find((convention) => convention.id === id);
}

public async getToProcessOrError(limit: number): Promise<ConventionToSync[]> {
return this.conventionsToSync
.filter(({ status }) => status === "ERROR" || status === "TO_PROCESS")
.slice(0, limit);
}

public async save(conventionToSync: ConventionToSync): Promise<void> {
const index = this.conventionsToSync.findIndex(
(convention) => convention.id === conventionToSync.id,
);
index === -1
? this.conventionsToSync.push(conventionToSync)
: (this.conventionsToSync[index] = conventionToSync);
}

// for testing purpose
public setForTesting(conventions: ConventionToSync[]) {
this.conventionsToSync = conventions;
}

public conventionsToSync: ConventionToSync[] = [];
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import { Pool, PoolClient } from "pg";
import { expectToEqual } from "shared";
import { getTestPgPool } from "../../../_testBuilders/getTestPgPool";
import { ConventionToSync } from "../../../domain/convention/ports/ConventionsToSyncRepository";
import {
conventionsToSyncTableName,
PgConventionsToSyncRepository,
} from "./PgConventionsToSyncRepository";

describe("PgConventionsRepository", () => {
const conventionsToSync: ConventionToSync[] = [
{
id: "aaaaac99-9c0b-1bbb-bb6d-6bb9bd38aaa1",
status: "TO_PROCESS",
},
{
id: "aaaaac99-9c0b-1bbb-bb6d-6bb9bd38aaa2",
status: "SUCCESS",
processDate: new Date(),
},
{
id: "aaaaac99-9c0b-1bbb-bb6d-6bb9bd38aaa3",
status: "ERROR",
processDate: new Date(),
reason: "An error",
},
{
id: "aaaaac99-9c0b-1bbb-bb6d-6bb9bd38aaa4",
status: "SKIP",
processDate: new Date(),
reason: "Skipped reason",
},
];

let pool: Pool;
let client: PoolClient;
let conventionsToSyncRepository: PgConventionsToSyncRepository;

beforeAll(async () => {
pool = getTestPgPool();
client = await pool.connect();
});

afterAll(async () => {
client.release();
await pool.end();
});

beforeEach(async () => {
await client.query(`DELETE
FROM ${conventionsToSyncTableName}`);
conventionsToSyncRepository = new PgConventionsToSyncRepository(client);
});

it.each(conventionsToSync)(
`save and getById convention with status '$status'`,
async (conventionToSync) => {
expectToEqual(
await conventionsToSyncRepository.getById(conventionToSync.id),
undefined,
);

await conventionsToSyncRepository.save(conventionToSync);

const syncedConvention = await conventionsToSyncRepository.getById(
conventionToSync.id,
);
expectToEqual(syncedConvention, conventionToSync);
},
);

describe("getNotProcessedAndErrored", () => {
beforeEach(() =>
Promise.all(
conventionsToSync.map((conventionToSync) =>
conventionsToSyncRepository.save(conventionToSync),
),
),
);

it("only TO_PROCESS and ERROR", async () => {
const conventionsToSyncNotProcessedAndErrored =
await conventionsToSyncRepository.getToProcessOrError(10000);

expectToEqual(conventionsToSyncNotProcessedAndErrored, [
conventionsToSync[0],
conventionsToSync[2],
]);
});

it("with limit 1", async () => {
const conventionsToSyncNotProcessedAndErrored =
await conventionsToSyncRepository.getToProcessOrError(1);

expectToEqual(conventionsToSyncNotProcessedAndErrored, [
conventionsToSync[0],
]);
});
});
});
86 changes: 86 additions & 0 deletions back/src/adapters/secondary/pg/PgConventionsToSyncRepository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { PoolClient } from "pg";
import { ConventionId } from "shared";
import {
ConventionsToSyncRepository,
ConventionToSync,
} from "../../../domain/convention/ports/ConventionsToSyncRepository";

export const conventionsToSyncTableName = "conventions_to_sync_with_pe";

export class PgConventionsToSyncRepository
implements ConventionsToSyncRepository
{
constructor(private client: PoolClient) {}

async getToProcessOrError(limit: number): Promise<ConventionToSync[]> {
const queryResult = await this.client.query<PgConventionToSync>(
`
SELECT id, status, process_date, reason
FROM ${conventionsToSyncTableName}
WHERE status = 'TO_PROCESS'
OR status = 'ERROR'
LIMIT $1
`,
[limit],
);
return queryResult.rows.map((pgConventionToSync) =>
pgResultToConventionToSync(pgConventionToSync),
);
}

async save(conventionToSync: ConventionToSync): Promise<void> {
await this.client.query(
`
INSERT INTO ${conventionsToSyncTableName} (id,
status,
process_date,
reason)
VALUES ($1, $2, $3, $4)
`,
[
conventionToSync.id,
conventionToSync.status,
conventionToSync.status !== "TO_PROCESS"
? conventionToSync.processDate
: null,
conventionToSync.status === "ERROR" ||
conventionToSync.status === "SKIP"
? conventionToSync.reason
: null,
],
);
}

async getById(id: ConventionId): Promise<ConventionToSync | undefined> {
const queryResult = await this.client.query<PgConventionToSync>(
`
SELECT id, status, process_date, reason
FROM ${conventionsToSyncTableName}
WHERE id = $1
`,
[id],
);
const pgConventionToSync = queryResult.rows.at(0);
return pgConventionToSync
? pgResultToConventionToSync(pgConventionToSync)
: undefined;
}
}

type PgConventionToSync = {
id: string;
status: string;
process_date: Date | null;
reason: string | null;
};

function pgResultToConventionToSync(
pgConventionToSync: PgConventionToSync,
): ConventionToSync {
return {
id: pgConventionToSync.id,
status: pgConventionToSync.status,
processDate: pgConventionToSync.process_date ?? undefined,
reason: pgConventionToSync.reason ?? undefined,
} as ConventionToSync;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/* eslint-disable @typescript-eslint/naming-convention */
import { MigrationBuilder } from "node-pg-migrate";

const tableName = "conventions_to_sync_with_pe";

export async function up(pgm: MigrationBuilder): Promise<void> {
pgm.createTable(tableName, {
id: { type: "uuid", primaryKey: true },
status: { type: "text", notNull: true },
process_date: { type: "timestamptz", notNull: false },
reason: { type: "text", notNull: false },
});
}

export async function down(pgm: MigrationBuilder): Promise<void> {
pgm.dropTable(tableName);
}
Loading