Skip to content

Commit

Permalink
Merge pull request #683 from numerique-gouv/import-coop-account
Browse files Browse the repository at this point in the history
feat: add script to import account from coop de la médiation numérique
  • Loading branch information
rdubigny authored Sep 20, 2024
2 parents fe13c77 + a6c1f7b commit 436aafa
Show file tree
Hide file tree
Showing 2 changed files with 276 additions and 0 deletions.
275 changes: 275 additions & 0 deletions scripts/import-accounts-coop.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
// src https://stackoverflow.com/questions/40994095/pipe-streams-to-edit-csv-file-in-node-js
import { AxiosError } from "axios";
import fs from "fs";
import { isEmpty, some, toInteger } from "lodash-es";
import {
getInseeAccessToken,
getOrganizationInfo,
} from "../src/connectors/api-sirene";
import {
linkUserToOrganization,
upsert,
} from "../src/repositories/organization/setters";
import { create, findByEmail, update } from "../src/repositories/user";
import { logger } from "../src/services/log";
import {
getNumberOfLineInFile,
humanReadableDuration,
isOrganizationInfo,
startDurationMesure,
throttleApiCall,
} from "../src/services/script-helpers";
import {
isEmailValid,
isNameValid,
isPhoneNumberValid,
isSiretValid,
} from "../src/services/security";

import { parse, stringify, transform } from "csv";
import { findByUserId } from "../src/repositories/organization/getters";

const INPUT_FILE = process.env.INPUT_FILE ?? "./input.csv";
const OUTPUT_FILE = process.env.OUTPUT_FILE ?? "./output.csv";

// ex: for public insee subscription the script can be run like so:
// npm run update-organization-info 2000
const rateInMsFromArgs = toInteger(process.argv[2]);
// we wait 125ms max which allow us to make 8 requests to insee api per seconds
// this makes 480 request per minute which is just under our 500 requests per minute quota
// for 30 000 organizations, this script will run for about 1 hour
const maxInseeCallRateInMs = rateInMsFromArgs !== 0 ? rateInMsFromArgs : 125;

(async () => {
const access_token = await getInseeAccessToken();

const readStream = fs.createReadStream(INPUT_FILE); // readStream is a read-only stream wit raw text content of the CSV file
const writeStream = fs.createWriteStream(OUTPUT_FILE); // writeStream is a write-only stream to write on the disk

const inputCsvStream = parse({
columns: true,
trim: true,
cast: false,
delimiter: ";",
}); // csv Stream is a read and write stream : it reads raw text in CSV and output untransformed records
const outputCsvStream = stringify({
quoted_empty: false,
quoted_string: false,
header: true,
});

type InputCsvData = {
prenom: string;
nom: string;
téléphone: string;
email: string;
coordinateur: string;
"Code Postal": string;
"Nom commune": string;
"SIRET structure": string;
"email professionnel secondaire": string;
};
type OutputCsvData = InputCsvData & { error: string };
const input_file_lines = await getNumberOfLineInFile(INPUT_FILE);
let i = 1;
let rejected_invalid_email_address_count = 0;
let rejected_invalid_siret_count = 0;
let rejected_invalid_names_count = 0;
let rejected_invalid_phone_number_count = 0;
let rejected_invalid_org_count = 0;
let unexpected_error_count = 0;
let success_count = 0;

// 100ms is the benchmarked response time from INSEE API
const estimatedExecutionTimeInMilliseconds =
Math.max(maxInseeCallRateInMs, 100) * input_file_lines;

logger.info("");
logger.info(
"\x1b[33m",
`Estimated execution time is ${humanReadableDuration(
estimatedExecutionTimeInMilliseconds,
)}`,
"\x1b[0m",
);
logger.info("");

const transformStream = transform(
async function (
data: InputCsvData,
done: (err: null | Error, data?: OutputCsvData) => void,
) {
const start = startDurationMesure();
try {
const {
prenom: given_name,
nom: family_name,
téléphone: phone_number,
coordinateur,
"email professionnel secondaire": professional_email,
"SIRET structure": siret,
} = data;
const email = professional_email;
logger.info(`${i}: processing ${email}...`);
// 0. params validation
if (!isEmailValid(email)) {
i++;
rejected_invalid_email_address_count++;
logger.error(`invalid email address ${email}`);
return done(null, {
...data,
error: "rejected_invalid_email_address",
});
}
if (!isNameValid(given_name) || !isNameValid(family_name)) {
i++;
rejected_invalid_names_count++;
logger.error(`invalid names ${given_name} ${family_name}`);
return done(null, { ...data, error: "rejected_invalid_names" });
}
if (!isEmpty(phone_number) && !isPhoneNumberValid(phone_number)) {
i++;
rejected_invalid_phone_number_count++;
logger.error(`invalid phone number ${phone_number}`);
return done(null, {
...data,
error: "rejected_invalid_phone_number",
});
}
const job =
coordinateur === "oui"
? "Coordinateur conseillers numérique"
: "Conseiller numérique";
if (!isSiretValid(siret)) {
i++;
rejected_invalid_siret_count++;
logger.error(`invalid siret ${siret}`);
return done(null, { ...data, error: "rejected_invalid_siret" });
}

// 1. add user if it does not exist
let user = await findByEmail(email);
if (isEmpty(user)) {
user = await create({ email });
await update(user.id, {
given_name,
family_name,
job,
phone_number,
});
}

// 2. get organizationInfo
try {
const organizationInfo = await getOrganizationInfo(
siret,
access_token,
);
if (!isOrganizationInfo(organizationInfo)) {
throw Error("empty organization info");
}
if (!organizationInfo.estActive) {
throw Error("organization not active");
}

// 3. update organizationInfo
const organization = await upsert({
siret: organizationInfo.siret,
organizationInfo,
});

// 4. create the user-organization link
const usersOrganizations = await findByUserId(user.id);
if (!some(usersOrganizations, ["id", organization.id])) {
await linkUserToOrganization({
organization_id: organization.id,
user_id: user.id,
verification_type: "imported_from_coop_mediation_numerique",
});
}
} catch (error) {
logger.error(`unexpected error for siret ${siret}`);
logger.error(
"\x1b[31m",
error instanceof AxiosError && !isEmpty(error.response)
? error.response.data
: error,
"\x1b[0m",
);
logger.error("");

await throttleApiCall(start, maxInseeCallRateInMs);
rejected_invalid_org_count++;
return done(null, { ...data, error: "rejected_invalid_org" });
}
await throttleApiCall(start, maxInseeCallRateInMs);

i++;
success_count++;
return done(null);
} catch (error) {
logger.error("unexpected error");
logger.error(
"\x1b[31m",
error instanceof AxiosError && !isEmpty(error.response)
? error.response.data
: error,
"\x1b[0m",
);
logger.error("");

await throttleApiCall(start, maxInseeCallRateInMs);
i++;
unexpected_error_count++;
return done(null, { ...data, error: "unexpected_error" });
}
},
{ parallel: 1 }, // avoid messing with line orders
).on("end", () => {
logger.info(`Import done! Import logs are recorded in ${OUTPUT_FILE}.`);
logger.info("");
logger.info(
"success_count: \x1b[32m",
success_count,
"\x1b[0m",
);
logger.info(
"rejected_invalid_email_address_count: \x1b[33m",
rejected_invalid_email_address_count,
"\x1b[0m",
);
logger.info(
"rejected_invalid_siret_count: \x1b[33m",
rejected_invalid_siret_count,
"\x1b[0m",
);
logger.info(
"rejected_invalid_names_count: \x1b[33m",
rejected_invalid_names_count,
"\x1b[0m",
);
logger.info(
"rejected_invalid_org_count: \x1b[33m",
rejected_invalid_org_count,
"\x1b[0m",
);
logger.info(
"unexpected_error_count: \x1b[31m",
unexpected_error_count,
"\x1b[0m",
);
logger.info(
"total: \x1b[1m",
i - 1,
"\x1b[21m",
);
});

logger.info(`Importing data from ${INPUT_FILE}`);

readStream
.pipe(inputCsvStream)
.pipe(transformStream)
.pipe(outputCsvStream)
.pipe(writeStream);
})();
1 change: 1 addition & 0 deletions src/types/user-organization-link.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ interface BaseUserOrganizationLink {
| "code_sent_to_official_contact_email"
| "domain"
| "imported_from_inclusion_connect"
| "imported_from_coop_mediation_numerique"
| "in_liste_dirigeants_rna"
| "no_validation_means_available"
| "no_verification_means_for_entreprise_unipersonnelle"
Expand Down

0 comments on commit 436aafa

Please sign in to comment.