diff --git a/package.json b/package.json index 43458bd..a2854c4 100644 --- a/package.json +++ b/package.json @@ -19,10 +19,10 @@ }, "dependencies": { "@pagopa/fp-ts-kafkajs": "^0.3.0", + "@pagopa/winston-ts": "^2.2.0", "dotenv": "^16.3.1", "fp-ts": "^2.16.1", "io-ts-types": "^0.5.19", - "logging-ts": "^0.3.4", "mongodb": "^6.0.0", "monocle-ts": "^2.3.13", "newtype-ts": "^0.3.5" diff --git a/src/main.ts b/src/main.ts index 2df6bea..e357848 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,16 +1,15 @@ +/* eslint-disable no-console */ import { AzureEventhubSasFromString, KafkaProducerCompact, fromSas, sendMessages, } from "@pagopa/fp-ts-kafkajs/dist/lib/KafkaProducerCompact"; +import { defaultLog, useWinston, withConsole } from "@pagopa/winston-ts"; import dotenv from "dotenv"; import * as E from "fp-ts/Either"; -import * as C from "fp-ts/lib/Console"; -import * as IO from "fp-ts/lib/IO"; import * as TE from "fp-ts/lib/TaskEither"; import { pipe } from "fp-ts/lib/function"; -import { withLogger } from "logging-ts/lib/IO"; import { ChangeStreamDocument, MongoClient } from "mongodb"; import { CosmosDBConfig, EH_CONFIG, MONGO_CONFIG } from "./config/config"; import { transform } from "./mapper/students"; @@ -26,6 +25,7 @@ import { } from "./mongo/mongoOperation"; dotenv.config(); +useWinston(withConsole()); const databaseName = "mongo-cdc-poc-mongodb"; const collectionName = "students"; @@ -35,9 +35,13 @@ const getCosmosConnectionURI = (): TE.TaskEither => CosmosDBConfig.decode(MONGO_CONFIG), E.map((config) => config.CONNECTION_URI), TE.fromEither, - TE.mapLeft( - (errors) => - new Error(`Error during decoding Cosmos ConnectionURI - ${errors}`) + TE.mapLeft((errors) => + pipe( + defaultLog.taskEither.error( + `Error during decoding Cosmos ConnectionURI - ${errors}` + ), + () => new Error(`Error during decoding Cosmos ConnectionURI`) + ) ) ); @@ -49,8 +53,13 @@ const getEventHubProducer = (): TE.TaskEither< AzureEventhubSasFromString.decode(EH_CONFIG.CONNECTION_STRING), E.map((sas) => fromSas(sas)), TE.fromEither, - TE.mapLeft( - (errors) => new Error(`Error during decoding Event Hub SAS - ${errors}`) + TE.mapLeft((errors) => + pipe( + defaultLog.taskEither.error( + `Error during decoding Cosmos ConnectionURI - ${errors}` + ), + () => new Error(`Error during decoding Event Hub SAS`) + ) ) ); @@ -59,28 +68,32 @@ const simulateAsyncPause: TE.TaskEither = TE.tryCatch( new Promise((resolve) => { setTimeout(() => resolve(), 1000); }), - () => new Error("An error occurred") + (error) => + pipe( + defaultLog.taskEither.error(`Error during pause simulation - ${error}`), + () => new Error("An error occurred") + ) ); -export const log = withLogger(IO.io)(C.log); - const waitForExit = (client: MongoClient): TE.TaskEither => TE.tryCatch( async () => { process.stdin.resume(); process.on("SIGINT", () => { - disconnectMongo(client); - process.exit(0); + pipe(disconnectMongo(client), process.exit(0)); }); }, - (reason) => new Error(`Failed to set exit handler: ${String(reason)}`) + (reason) => + pipe( + defaultLog.taskEither.error( + `Failed to set exit handler: ${String(reason)}` + ), + () => new Error(`Failed to set exit handler`) + ) ); const exitFromProcess = (): TE.TaskEither => - pipe( - log(() => "Application failed"), - process.exit(1) - ); + pipe(defaultLog.taskEither.error(`Application failed`), process.exit(1)); const sendMessageEventHub = (messagingClient: KafkaProducerCompact) => @@ -93,10 +106,21 @@ const main = () => pipe( TE.Do, getCosmosConnectionURI, + defaultLog.taskEither.info("Connecting to mongo..."), TE.bind("client", (connectionUri) => mongoConnect(connectionUri)), TE.bind("db", ({ client }) => getMongoDb(client, databaseName)), TE.bind("collection", ({ db }) => getMongoCollection(db, collectionName)), + defaultLog.taskEither.info( + `Connected to DB ${databaseName} - Working on collection ${collectionName}` + ), + defaultLog.taskEither.info( + "Trying to connect to the event hub instance..." + ), TE.bind("messagingClient", () => getEventHubProducer()), + defaultLog.taskEither.info("Connectied to event hub"), + defaultLog.taskEither.info( + `Trying to watch the collection ${collectionName}` + ), TE.chainFirst(({ collection, messagingClient }) => pipe( watchMongoCollection(collection), @@ -108,7 +132,9 @@ const main = () => ) ) ), + defaultLog.taskEither.info(`Watching the collection ${collectionName}`), TE.chainFirst((_) => simulateAsyncPause), + defaultLog.taskEither.info(`Inserting one document as example...`), TE.chainFirst((collection) => mongoInsertOne(collection.collection, { id: `${Math.floor(Math.random() * 1000)}`, @@ -117,11 +143,12 @@ const main = () => dateOfBirth: new Date(), }) ), + defaultLog.taskEither.info( + `Document inserted - Press CTRL+C to exit...Waiting...` + ), TE.chainFirst((_) => simulateAsyncPause), TE.chain(({ client }) => waitForExit(client)), TE.orElse(exitFromProcess) )(); -main() - .catch((err) => log(() => "Error: " + String(err))) - .finally(() => log(() => "Exiting...")); +main().catch(console.error).finally(console.log);