diff --git a/.eslintrc.js b/.eslintrc.js index 8884bf4..54029fc 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -3,19 +3,22 @@ require("@rushstack/eslint-patch/modern-module-resolution"); module.exports = { env: { es2021: true, - node: true, + node: true }, rules: { "max-classes-per-file": "off", + "no-console": "off" }, - parser: '@typescript-eslint/parser', + parser: "@typescript-eslint/parser", overrides: [ { - files: ['*.ts', '*.tsx'], - extends: ["@pagopa/eslint-config/recommended"], + files: ["*.ts", "*.tsx"], + extends: [ + "@pagopa/eslint-config/recommended" + ], parserOptions: { - project: ['./tsconfig.json'], - }, - }, + project: ["./tsconfig.json"] + } + } ] -}; \ No newline at end of file +}; diff --git a/.gitignore b/.gitignore index 4e6b780..d83c160 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,6 @@ /dist *.log *.lock -.env \ No newline at end of file +.env +.yarn/cache +.yarn/install-state.gz \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json index 5ef3cba..dda9018 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -3,6 +3,6 @@ "**/.yarn": true, "**/.pnp.*": true }, - "eslint.nodePath": ".yarn/sdks", - "prettier.prettierPath": ".yarn/sdks/prettier/index.js" + "appService.defaultWebAppToDeploy": "RESOURCE_ID_OF_YOUR_WEB_APP", + "appService.deploySubpath": "." } diff --git a/.yarnrc.yml b/.yarnrc.yml index 3186f3f..60e5903 100644 --- a/.yarnrc.yml +++ b/.yarnrc.yml @@ -1 +1,2 @@ nodeLinker: node-modules +network-timeout: 1000000 \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..63238d0 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,13 @@ +FROM node:16-alpine AS builder +WORKDIR /app +COPY ./ ./ +RUN yarn install +RUN yarn build + +FROM node:16-alpine AS final +WORKDIR /app +COPY --from=builder ./app/dist ./dist +COPY ./package.json . +COPY ./.env . +RUN yarn install --production +CMD [ "yarn", "start" ] \ No newline at end of file diff --git a/package.json b/package.json index a2854c4..db607e0 100644 --- a/package.json +++ b/package.json @@ -6,7 +6,9 @@ "scripts": { "format": "prettier --write .", "lint": "eslint \"src/**\"", - "lint:fix": "eslint --fix \"src/**\"" + "lint:fix": "eslint --fix \"src/**\"", + "start": "node dist/index.js", + "build": "tsc" }, "devDependencies": { "@pagopa/eslint-config": "^3.0.0", @@ -20,9 +22,12 @@ "dependencies": { "@pagopa/fp-ts-kafkajs": "^0.3.0", "@pagopa/winston-ts": "^2.2.0", + "@types/express": "^4.17.17", "dotenv": "^16.3.1", + "express": "^4.18.2", "fp-ts": "^2.16.1", "io-ts-types": "^0.5.19", + "kafkajs": "^2.2.4", "mongodb": "^6.0.0", "monocle-ts": "^2.3.13", "newtype-ts": "^0.3.5" diff --git a/src/main.ts b/src/index.ts similarity index 65% rename from src/main.ts rename to src/index.ts index e357848..d09ff30 100644 --- a/src/main.ts +++ b/src/index.ts @@ -1,4 +1,6 @@ /* eslint-disable no-console */ +import * as http from "http"; + import { AzureEventhubSasFromString, KafkaProducerCompact, @@ -7,15 +9,19 @@ import { } from "@pagopa/fp-ts-kafkajs/dist/lib/KafkaProducerCompact"; import { defaultLog, useWinston, withConsole } from "@pagopa/winston-ts"; import dotenv from "dotenv"; +import express, { Request, Response } from "express"; import * as E from "fp-ts/Either"; import * as TE from "fp-ts/lib/TaskEither"; import { pipe } from "fp-ts/lib/function"; -import { ChangeStreamDocument, MongoClient } from "mongodb"; +import { ChangeStreamDocument, Db, MongoClient } from "mongodb"; import { CosmosDBConfig, EH_CONFIG, MONGO_CONFIG } from "./config/config"; import { transform } from "./mapper/students"; import { Student } from "./model/student"; + +import { Id } from "./model/resume-token"; import { disconnectMongo, + findLastToken, getMongoCollection, getMongoDb, mongoConnect, @@ -29,6 +35,7 @@ useWinston(withConsole()); const databaseName = "mongo-cdc-poc-mongodb"; const collectionName = "students"; +export const resumeToken = "resumeToken"; const getCosmosConnectionURI = (): TE.TaskEither => pipe( @@ -96,12 +103,49 @@ const exitFromProcess = (): TE.TaskEither => pipe(defaultLog.taskEither.error(`Application failed`), process.exit(1)); const sendMessageEventHub = - (messagingClient: KafkaProducerCompact) => + (messagingClient: KafkaProducerCompact, db: Db) => (change: ChangeStreamDocument): void => - void pipe(change, transform, (students) => - sendMessages(messagingClient)(students)() + void pipe( + change, + transform, + (students) => { + console.log("Sending message to Event Hub", students); + return sendMessages(messagingClient)(students)(); + }, + mongoInsertOne(db.collection(resumeToken), { + // eslint-disable-next-line no-underscore-dangle + resumeToken: JSON.stringify((change._id as Id)._data), + }) ); +const app = express(); + +app.get( + "/", + (req: Request, res: Response) => async () => + // const config: ConsumerConfig = { + // groupId: "", + // }; + // const kafkaConfig: KafkaConfig = { + // brokers: [], + // }; + // const consumer = new Kafka(kafkaConfig).consumer(config); + // await consumer.connect(); + // // eslint-disable-next-line functional/no-let + // let messages: string[] = []; + // await consumer.run({ + // eachMessage: async ({ topic, partition, message }: EachMessagePayload) => { + // console.log({ + // value: message.value?.toString(), + // topic, + // partition, + // }); + // messages = [...messages, message.value?.toString()]; + // }, + // }); + // await consumer.disconnect(); + res.status(200).send("Messages Read") +); const main = () => pipe( TE.Do, @@ -117,38 +161,38 @@ const main = () => "Trying to connect to the event hub instance..." ), TE.bind("messagingClient", () => getEventHubProducer()), - defaultLog.taskEither.info("Connectied to event hub"), + defaultLog.taskEither.info("Connected to event hub"), defaultLog.taskEither.info( `Trying to watch the collection ${collectionName}` ), - TE.chainFirst(({ collection, messagingClient }) => + TE.chainFirst(({ db, collection, messagingClient }) => pipe( - watchMongoCollection(collection), + findLastToken(db.collection("resumeToken")), + TE.chain((resumeToken) => + watchMongoCollection(collection, resumeToken?.resumeToken) + ), TE.chain((watcher) => setMongoListenerOnEventChange( watcher, - sendMessageEventHub(messagingClient) + sendMessageEventHub(messagingClient, db) ) ) ) ), defaultLog.taskEither.info(`Watching the collection ${collectionName}`), TE.chainFirst((_) => simulateAsyncPause), - defaultLog.taskEither.info(`Inserting one document as example...`), - TE.chainFirst((collection) => - mongoInsertOne(collection.collection, { - id: `${Math.floor(Math.random() * 1000)}`, - firstName: `name${Math.floor(Math.random() * 1000)}`, - lastName: `surname${Math.floor(Math.random() * 1000)}`, - dateOfBirth: new Date(), - }) - ), - defaultLog.taskEither.info( - `Document inserted - Press CTRL+C to exit...Waiting...` - ), TE.chainFirst((_) => simulateAsyncPause), TE.chain(({ client }) => waitForExit(client)), TE.orElse(exitFromProcess) )(); -main().catch(console.error).finally(console.log); +main() + .catch(console.error) + .finally(() => { + const server = http.createServer(app).listen(8080, () => { + console.log("Listening on port %d", 8080); + }); + server.on("close", () => { + app.emit("server:stop"); + }); + }); diff --git a/src/mapper/students.ts b/src/mapper/students.ts index bb6f148..60f2ee7 100644 --- a/src/mapper/students.ts +++ b/src/mapper/students.ts @@ -27,7 +27,12 @@ const insertTransformer: Transformer = ( }), O.fold( () => [], - (data) => A.of(data as unknown as Student) + (data) => { + const student = data as unknown as Student; + // eslint-disable-next-line functional/immutable-data + student.timestamp = data.clusterTime?.toString(); + return A.of(student); + } ) ); // const deleteTransformer: Transformer = (data) => { ... }; diff --git a/src/model/resume-token.ts b/src/model/resume-token.ts new file mode 100644 index 0000000..4e8354c --- /dev/null +++ b/src/model/resume-token.ts @@ -0,0 +1,7 @@ +export interface ResumeToken { + resumeToken: string; +} +export interface Id { + _data: string; + _kind: number; +} diff --git a/src/model/student.ts b/src/model/student.ts index 077085e..289c6b4 100644 --- a/src/model/student.ts +++ b/src/model/student.ts @@ -1,11 +1,11 @@ import * as t from "io-ts"; import { DateFromISOString } from "io-ts-types/DateFromISOString"; - export const Student = t.type({ id: t.number, firstName: t.string, lastName: t.string, dateOfBirth: DateFromISOString, + timestamp: t.string, }); export type Student = t.TypeOf; diff --git a/src/mongo/mongoOperation.ts b/src/mongo/mongoOperation.ts index 8bb70b8..b6b93da 100644 --- a/src/mongo/mongoOperation.ts +++ b/src/mongo/mongoOperation.ts @@ -1,5 +1,7 @@ +/* eslint-disable no-console */ import * as TE from "fp-ts/TaskEither"; import { + Binary, ChangeStream, ChangeStreamDocument, Collection, @@ -9,6 +11,7 @@ import { MongoClient, OptionalUnlessRequiredId, } from "mongodb"; +import { ResumeToken } from "../model/resume-token"; export const mongoConnect = (uri: string): TE.TaskEither => TE.tryCatch( @@ -56,25 +59,72 @@ export const disconnectMongo = ( ); export const watchMongoCollection = ( - collection: Collection + collection: Collection, + resumeToken: string ): TE.TaskEither>> => TE.tryCatch( - async () => - collection.watch( + async () => { + // eslint-disable-next-line functional/no-let + let params: + | { fullDocument: string } + | { fullDocument: string; resumeAfter: { _data: unknown } } = { + fullDocument: "updateLookup", + }; + + if (resumeToken !== undefined) { + params = { + ...params, + resumeAfter: { + _data: new Binary(Buffer.from(resumeToken, "base64")), + }, + }; + } + return collection.watch( [ { - $match: { operationType: { $in: ["insert", "update", "replace"] } }, + $match: { + operationType: { $in: ["insert", "update", "replace"] }, + }, + }, + { + $project: { + _id: 1, + fullDocument: 1, + ns: 1, + documentKey: 1, + }, }, - { $project: { _id: 1, fullDocument: 1, ns: 1, documentKey: 1 } }, ], - { fullDocument: "updateLookup" } - ), + params + ); + }, (reason) => new Error( `Impossible to watch the ${collection.namespace} collection: " ${reason}` ) ); +export const findLastToken = ( + collection: Collection +): TE.TaskEither => + TE.tryCatch( + async () => { + const myDocument = await collection + .find({}) + .sort({ _id: -1 }) + .limit(1) + .tryNext(); + if (myDocument) { + return myDocument; + } + return null; + }, + (reason) => + new Error( + `Unable to get the last inserted document from collection: " ${reason}` + ) + ); + export const setMongoListenerOnEventChange = ( changeStream: ChangeStream>, listener: (change: ChangeStreamDocument) => void