Skip to content

Commit

Permalink
Adding logs
Browse files Browse the repository at this point in the history
  • Loading branch information
drmarro committed Sep 8, 2023
1 parent 839ccab commit d839492
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 22 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
},
"dependencies": {
"@pagopa/fp-ts-kafkajs": "^0.3.0",
"@pagopa/winston-ts": "^2.2.0",
"dotenv": "^16.3.1",
"fp-ts": "^2.16.1",
"io-ts-types": "^0.5.19",
"logging-ts": "^0.3.4",
"mongodb": "^6.0.0",
"monocle-ts": "^2.3.13",
"newtype-ts": "^0.3.5"
Expand Down
69 changes: 48 additions & 21 deletions src/main.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
/* eslint-disable no-console */
import {
AzureEventhubSasFromString,
KafkaProducerCompact,
fromSas,
sendMessages,
} from "@pagopa/fp-ts-kafkajs/dist/lib/KafkaProducerCompact";
import { defaultLog, useWinston, withConsole } from "@pagopa/winston-ts";
import dotenv from "dotenv";
import * as E from "fp-ts/Either";
import * as C from "fp-ts/lib/Console";
import * as IO from "fp-ts/lib/IO";
import * as TE from "fp-ts/lib/TaskEither";
import { pipe } from "fp-ts/lib/function";
import { withLogger } from "logging-ts/lib/IO";
import { ChangeStreamDocument, MongoClient } from "mongodb";
import { CosmosDBConfig, EH_CONFIG, MONGO_CONFIG } from "./config/config";
import { transform } from "./mapper/students";
Expand All @@ -26,6 +25,7 @@ import {
} from "./mongo/mongoOperation";

dotenv.config();
useWinston(withConsole());

const databaseName = "mongo-cdc-poc-mongodb";
const collectionName = "students";
Expand All @@ -35,9 +35,13 @@ const getCosmosConnectionURI = (): TE.TaskEither<Error, string> =>
CosmosDBConfig.decode(MONGO_CONFIG),
E.map((config) => config.CONNECTION_URI),
TE.fromEither,
TE.mapLeft(
(errors) =>
new Error(`Error during decoding Cosmos ConnectionURI - ${errors}`)
TE.mapLeft((errors) =>
pipe(
defaultLog.taskEither.error(
`Error during decoding Cosmos ConnectionURI - ${errors}`
),
() => new Error(`Error during decoding Cosmos ConnectionURI`)
)
)
);

Expand All @@ -49,8 +53,13 @@ const getEventHubProducer = (): TE.TaskEither<
AzureEventhubSasFromString.decode(EH_CONFIG.CONNECTION_STRING),
E.map((sas) => fromSas<Student>(sas)),
TE.fromEither,
TE.mapLeft(
(errors) => new Error(`Error during decoding Event Hub SAS - ${errors}`)
TE.mapLeft((errors) =>
pipe(
defaultLog.taskEither.error(
`Error during decoding Cosmos ConnectionURI - ${errors}`
),
() => new Error(`Error during decoding Event Hub SAS`)
)
)
);

Expand All @@ -59,28 +68,32 @@ const simulateAsyncPause: TE.TaskEither<Error, void> = TE.tryCatch(
new Promise<void>((resolve) => {
setTimeout(() => resolve(), 1000);
}),
() => new Error("An error occurred")
(error) =>
pipe(
defaultLog.taskEither.error(`Error during pause simulation - ${error}`),
() => new Error("An error occurred")
)
);

export const log = withLogger(IO.io)(C.log);

const waitForExit = (client: MongoClient): TE.TaskEither<Error, void> =>
TE.tryCatch(
async () => {
process.stdin.resume();
process.on("SIGINT", () => {
disconnectMongo(client);
process.exit(0);
pipe(disconnectMongo(client), process.exit(0));
});
},
(reason) => new Error(`Failed to set exit handler: ${String(reason)}`)
(reason) =>
pipe(
defaultLog.taskEither.error(
`Failed to set exit handler: ${String(reason)}`
),
() => new Error(`Failed to set exit handler`)
)
);

const exitFromProcess = (): TE.TaskEither<Error, void> =>
pipe(
log(() => "Application failed"),
process.exit(1)
);
pipe(defaultLog.taskEither.error(`Application failed`), process.exit(1));

const sendMessageEventHub =
(messagingClient: KafkaProducerCompact<Student>) =>
Expand All @@ -93,10 +106,21 @@ const main = () =>
pipe(
TE.Do,
getCosmosConnectionURI,
defaultLog.taskEither.info("Connecting to mongo..."),
TE.bind("client", (connectionUri) => mongoConnect(connectionUri)),
TE.bind("db", ({ client }) => getMongoDb(client, databaseName)),
TE.bind("collection", ({ db }) => getMongoCollection(db, collectionName)),
defaultLog.taskEither.info(
`Connected to DB ${databaseName} - Working on collection ${collectionName}`
),
defaultLog.taskEither.info(
"Trying to connect to the event hub instance..."
),
TE.bind("messagingClient", () => getEventHubProducer()),
defaultLog.taskEither.info("Connectied to event hub"),
defaultLog.taskEither.info(
`Trying to watch the collection ${collectionName}`
),
TE.chainFirst(({ collection, messagingClient }) =>
pipe(
watchMongoCollection(collection),
Expand All @@ -108,7 +132,9 @@ const main = () =>
)
)
),
defaultLog.taskEither.info(`Watching the collection ${collectionName}`),
TE.chainFirst((_) => simulateAsyncPause),
defaultLog.taskEither.info(`Inserting one document as example...`),
TE.chainFirst((collection) =>
mongoInsertOne(collection.collection, {
id: `${Math.floor(Math.random() * 1000)}`,
Expand All @@ -117,11 +143,12 @@ const main = () =>
dateOfBirth: new Date(),
})
),
defaultLog.taskEither.info(
`Document inserted - Press CTRL+C to exit...Waiting...`
),
TE.chainFirst((_) => simulateAsyncPause),
TE.chain(({ client }) => waitForExit(client)),
TE.orElse(exitFromProcess)
)();

main()
.catch((err) => log(() => "Error: " + String(err)))
.finally(() => log(() => "Exiting..."));
main().catch(console.error).finally(console.log);

0 comments on commit d839492

Please sign in to comment.