Skip to content

Commit

Permalink
Merge pull request #1 from pagopa/IOPLT-165-improving-poc-resiliency
Browse files Browse the repository at this point in the history
IOPLT-165 Improving poc resiliency and adding configuration for deploying app on app service/app container
  • Loading branch information
drmarro authored Oct 6, 2023
2 parents d839492 + ae92b68 commit cda7e85
Show file tree
Hide file tree
Showing 11 changed files with 172 additions and 42 deletions.
19 changes: 11 additions & 8 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@ require("@rushstack/eslint-patch/modern-module-resolution");
module.exports = {
env: {
es2021: true,
node: true,
node: true
},
rules: {
"max-classes-per-file": "off",
"no-console": "off"
},
parser: '@typescript-eslint/parser',
parser: "@typescript-eslint/parser",
overrides: [
{
files: ['*.ts', '*.tsx'],
extends: ["@pagopa/eslint-config/recommended"],
files: ["*.ts", "*.tsx"],
extends: [
"@pagopa/eslint-config/recommended"
],
parserOptions: {
project: ['./tsconfig.json'],
},
},
project: ["./tsconfig.json"]
}
}
]
};
};
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@
/dist
*.log
*.lock
.env
.env
.yarn/cache
.yarn/install-state.gz
4 changes: 2 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
"**/.yarn": true,
"**/.pnp.*": true
},
"eslint.nodePath": ".yarn/sdks",
"prettier.prettierPath": ".yarn/sdks/prettier/index.js"
"appService.defaultWebAppToDeploy": "RESOURCE_ID_OF_YOUR_WEB_APP",
"appService.deploySubpath": "."
}
1 change: 1 addition & 0 deletions .yarnrc.yml
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
nodeLinker: node-modules
network-timeout: 1000000
13 changes: 13 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
FROM node:16-alpine AS builder
WORKDIR /app
COPY ./ ./
RUN yarn install
RUN yarn build

FROM node:16-alpine AS final
WORKDIR /app
COPY --from=builder ./app/dist ./dist
COPY ./package.json .
COPY ./.env .
RUN yarn install --production
CMD [ "yarn", "start" ]
7 changes: 6 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
"scripts": {
"format": "prettier --write .",
"lint": "eslint \"src/**\"",
"lint:fix": "eslint --fix \"src/**\""
"lint:fix": "eslint --fix \"src/**\"",
"start": "node dist/index.js",
"build": "tsc"
},
"devDependencies": {
"@pagopa/eslint-config": "^3.0.0",
Expand All @@ -20,9 +22,12 @@
"dependencies": {
"@pagopa/fp-ts-kafkajs": "^0.3.0",
"@pagopa/winston-ts": "^2.2.0",
"@types/express": "^4.17.17",
"dotenv": "^16.3.1",
"express": "^4.18.2",
"fp-ts": "^2.16.1",
"io-ts-types": "^0.5.19",
"kafkajs": "^2.2.4",
"mongodb": "^6.0.0",
"monocle-ts": "^2.3.13",
"newtype-ts": "^0.3.5"
Expand Down
86 changes: 65 additions & 21 deletions src/main.ts → src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
/* eslint-disable no-console */
import * as http from "http";

import {
AzureEventhubSasFromString,
KafkaProducerCompact,
Expand All @@ -7,15 +9,19 @@ import {
} from "@pagopa/fp-ts-kafkajs/dist/lib/KafkaProducerCompact";
import { defaultLog, useWinston, withConsole } from "@pagopa/winston-ts";
import dotenv from "dotenv";
import express, { Request, Response } from "express";
import * as E from "fp-ts/Either";
import * as TE from "fp-ts/lib/TaskEither";
import { pipe } from "fp-ts/lib/function";
import { ChangeStreamDocument, MongoClient } from "mongodb";
import { ChangeStreamDocument, Db, MongoClient } from "mongodb";
import { CosmosDBConfig, EH_CONFIG, MONGO_CONFIG } from "./config/config";
import { transform } from "./mapper/students";
import { Student } from "./model/student";

import { Id } from "./model/resume-token";
import {
disconnectMongo,
findLastToken,
getMongoCollection,
getMongoDb,
mongoConnect,
Expand All @@ -29,6 +35,7 @@ useWinston(withConsole());

const databaseName = "mongo-cdc-poc-mongodb";
const collectionName = "students";
export const resumeToken = "resumeToken";

const getCosmosConnectionURI = (): TE.TaskEither<Error, string> =>
pipe(
Expand Down Expand Up @@ -96,12 +103,49 @@ const exitFromProcess = (): TE.TaskEither<Error, void> =>
pipe(defaultLog.taskEither.error(`Application failed`), process.exit(1));

const sendMessageEventHub =
(messagingClient: KafkaProducerCompact<Student>) =>
(messagingClient: KafkaProducerCompact<Student>, db: Db) =>
<T = Document>(change: ChangeStreamDocument<T>): void =>
void pipe(change, transform, (students) =>
sendMessages(messagingClient)(students)()
void pipe(
change,
transform,
(students) => {
console.log("Sending message to Event Hub", students);
return sendMessages(messagingClient)(students)();
},
mongoInsertOne(db.collection(resumeToken), {
// eslint-disable-next-line no-underscore-dangle
resumeToken: JSON.stringify((change._id as Id)._data),
})
);

const app = express();

app.get(
"/",
(req: Request, res: Response) => async () =>
// const config: ConsumerConfig = {
// groupId: "",
// };
// const kafkaConfig: KafkaConfig = {
// brokers: [],
// };
// const consumer = new Kafka(kafkaConfig).consumer(config);
// await consumer.connect();
// // eslint-disable-next-line functional/no-let
// let messages: string[] = [];
// await consumer.run({
// eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
// console.log({
// value: message.value?.toString(),
// topic,
// partition,
// });
// messages = [...messages, message.value?.toString()];
// },
// });
// await consumer.disconnect();
res.status(200).send("Messages Read")
);
const main = () =>
pipe(
TE.Do,
Expand All @@ -117,38 +161,38 @@ const main = () =>
"Trying to connect to the event hub instance..."
),
TE.bind("messagingClient", () => getEventHubProducer()),
defaultLog.taskEither.info("Connectied to event hub"),
defaultLog.taskEither.info("Connected to event hub"),
defaultLog.taskEither.info(
`Trying to watch the collection ${collectionName}`
),
TE.chainFirst(({ collection, messagingClient }) =>
TE.chainFirst(({ db, collection, messagingClient }) =>
pipe(
watchMongoCollection(collection),
findLastToken(db.collection("resumeToken")),
TE.chain((resumeToken) =>
watchMongoCollection(collection, resumeToken?.resumeToken)
),
TE.chain((watcher) =>
setMongoListenerOnEventChange(
watcher,
sendMessageEventHub(messagingClient)
sendMessageEventHub(messagingClient, db)
)
)
)
),
defaultLog.taskEither.info(`Watching the collection ${collectionName}`),
TE.chainFirst((_) => simulateAsyncPause),
defaultLog.taskEither.info(`Inserting one document as example...`),
TE.chainFirst((collection) =>
mongoInsertOne(collection.collection, {
id: `${Math.floor(Math.random() * 1000)}`,
firstName: `name${Math.floor(Math.random() * 1000)}`,
lastName: `surname${Math.floor(Math.random() * 1000)}`,
dateOfBirth: new Date(),
})
),
defaultLog.taskEither.info(
`Document inserted - Press CTRL+C to exit...Waiting...`
),
TE.chainFirst((_) => simulateAsyncPause),
TE.chain(({ client }) => waitForExit(client)),
TE.orElse(exitFromProcess)
)();

main().catch(console.error).finally(console.log);
main()
.catch(console.error)
.finally(() => {
const server = http.createServer(app).listen(8080, () => {
console.log("Listening on port %d", 8080);
});
server.on("close", () => {
app.emit("server:stop");
});
});
7 changes: 6 additions & 1 deletion src/mapper/students.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ const insertTransformer: Transformer = <T = Document>(
}),
O.fold(
() => [],
(data) => A.of(data as unknown as Student)
(data) => {
const student = data as unknown as Student;
// eslint-disable-next-line functional/immutable-data
student.timestamp = data.clusterTime?.toString();
return A.of(student);
}
)
);
// const deleteTransformer: Transformer = (data) => { ... };
Expand Down
7 changes: 7 additions & 0 deletions src/model/resume-token.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export interface ResumeToken {
resumeToken: string;
}
export interface Id {
_data: string;
_kind: number;
}
2 changes: 1 addition & 1 deletion src/model/student.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import * as t from "io-ts";
import { DateFromISOString } from "io-ts-types/DateFromISOString";

export const Student = t.type({
id: t.number,
firstName: t.string,
lastName: t.string,
dateOfBirth: DateFromISOString,
timestamp: t.string,
});

export type Student = t.TypeOf<typeof Student>;
64 changes: 57 additions & 7 deletions src/mongo/mongoOperation.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
/* eslint-disable no-console */
import * as TE from "fp-ts/TaskEither";
import {
Binary,
ChangeStream,
ChangeStreamDocument,
Collection,
Expand All @@ -9,6 +11,7 @@ import {
MongoClient,
OptionalUnlessRequiredId,
} from "mongodb";
import { ResumeToken } from "../model/resume-token";

export const mongoConnect = (uri: string): TE.TaskEither<Error, MongoClient> =>
TE.tryCatch(
Expand Down Expand Up @@ -56,25 +59,72 @@ export const disconnectMongo = (
);

export const watchMongoCollection = <T = Document>(
collection: Collection<T>
collection: Collection<T>,
resumeToken: string
): TE.TaskEither<Error, ChangeStream<T, ChangeStreamDocument<T>>> =>
TE.tryCatch(
async () =>
collection.watch(
async () => {
// eslint-disable-next-line functional/no-let
let params:
| { fullDocument: string }
| { fullDocument: string; resumeAfter: { _data: unknown } } = {
fullDocument: "updateLookup",
};

if (resumeToken !== undefined) {
params = {
...params,
resumeAfter: {
_data: new Binary(Buffer.from(resumeToken, "base64")),
},
};
}
return collection.watch(
[
{
$match: { operationType: { $in: ["insert", "update", "replace"] } },
$match: {
operationType: { $in: ["insert", "update", "replace"] },
},
},
{
$project: {
_id: 1,
fullDocument: 1,
ns: 1,
documentKey: 1,
},
},
{ $project: { _id: 1, fullDocument: 1, ns: 1, documentKey: 1 } },
],
{ fullDocument: "updateLookup" }
),
params
);
},
(reason) =>
new Error(
`Impossible to watch the ${collection.namespace} collection: " ${reason}`
)
);

export const findLastToken = <T>(
collection: Collection<T>
): TE.TaskEither<Error, ResumeToken> =>
TE.tryCatch(
async () => {
const myDocument = await collection
.find<ResumeToken>({})
.sort({ _id: -1 })
.limit(1)
.tryNext();
if (myDocument) {
return myDocument;
}
return null;
},
(reason) =>
new Error(
`Unable to get the last inserted document from collection: " ${reason}`
)
);

export const setMongoListenerOnEventChange = <T = Document>(
changeStream: ChangeStream<T, ChangeStreamDocument<T>>,
listener: (change: ChangeStreamDocument<T>) => void
Expand Down

0 comments on commit cda7e85

Please sign in to comment.