Skip to content

Commit

Permalink
Updating project
Browse files Browse the repository at this point in the history
  • Loading branch information
drmarro committed Sep 7, 2023
1 parent 5337760 commit 839ccab
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 13 deletions.
17 changes: 11 additions & 6 deletions src/main.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
/* eslint-disable no-console */
import {
AzureEventhubSasFromString,
KafkaProducerCompact,
Expand Down Expand Up @@ -28,6 +27,9 @@ import {

dotenv.config();

const databaseName = "mongo-cdc-poc-mongodb";
const collectionName = "students";

const getCosmosConnectionURI = (): TE.TaskEither<Error, string> =>
pipe(
CosmosDBConfig.decode(MONGO_CONFIG),
Expand Down Expand Up @@ -65,7 +67,6 @@ export const log = withLogger(IO.io)(C.log);
const waitForExit = (client: MongoClient): TE.TaskEither<Error, void> =>
TE.tryCatch(
async () => {
console.log("waiting");
process.stdin.resume();
process.on("SIGINT", () => {
disconnectMongo(client);
Expand All @@ -84,15 +85,17 @@ const exitFromProcess = (): TE.TaskEither<Error, void> =>
const sendMessageEventHub =
(messagingClient: KafkaProducerCompact<Student>) =>
<T = Document>(change: ChangeStreamDocument<T>): void =>
pipe(change, transform, void sendMessages(messagingClient));
void pipe(change, transform, (students) =>
sendMessages(messagingClient)(students)()
);

const main = () =>
pipe(
TE.Do,
getCosmosConnectionURI,
TE.bind("client", (connectionUri) => mongoConnect(connectionUri)),
TE.bind("db", ({ client }) => getMongoDb(client, "customer")),
TE.bind("collection", ({ db }) => getMongoCollection(db, "students")),
TE.bind("db", ({ client }) => getMongoDb(client, databaseName)),
TE.bind("collection", ({ db }) => getMongoCollection(db, collectionName)),
TE.bind("messagingClient", () => getEventHubProducer()),
TE.chainFirst(({ collection, messagingClient }) =>
pipe(
Expand All @@ -119,4 +122,6 @@ const main = () =>
TE.orElse(exitFromProcess)
)();

void main();
main()
.catch((err) => log(() => "Error: " + String(err)))
.finally(() => log(() => "Exiting..."));
11 changes: 6 additions & 5 deletions src/mapper/students.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable no-console */
import * as A from "fp-ts/lib/Array";
import * as O from "fp-ts/lib/Option";

Expand All @@ -7,26 +8,26 @@ import { Student } from "../model/student";

type Transformer = <T = Document>(data: ChangeStreamDocument<T>) => Student[];

const isHandled = <T = Document>(data: ChangeStreamDocument<T>) =>
const isInsert = <T = Document>(data: ChangeStreamDocument<T>) =>
"documentKey" in (data as ChangeStreamInsertDocument<T>);
const insertTransformer: Transformer = <T = Document>(
data: ChangeStreamDocument<T>
) =>
pipe(
O.fromNullable(data),
O.filter(isHandled),
O.filter(isInsert),
O.chain(() =>
O.fromNullable((data as ChangeStreamInsertDocument<T>).fullDocument)
),
O.map((newObject) => {
O.map((data) => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { _id, ...newObj } =
newObject as unknown as ChangeStreamInsertDocument<T>;
data as unknown as ChangeStreamInsertDocument<T>;
return newObj;
}),
O.fold(
() => [],
(newObject) => A.of(newObject as unknown as Student)
(data) => A.of(data as unknown as Student)
)
);
// const deleteTransformer: Transformer = (data) => { ... };
Expand Down
21 changes: 19 additions & 2 deletions src/mongo/mongoOperation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,15 @@ export const getMongoCollection = <T = Document>(
collectionName: string
): TE.TaskEither<Error, Collection<T>> =>
TE.tryCatch(
async () => db.collection<T>(collectionName),
async () => {
const collections = await db
.listCollections({ name: collectionName })
.toArray();
if (collections.length === 0) {
return db.createCollection<T>(collectionName);
}
return db.collection(collectionName);
},
(reason) =>
new Error(
`Impossible to connect to Get the ${collectionName} collection: " ${reason}`
Expand All @@ -51,7 +59,16 @@ export const watchMongoCollection = <T = Document>(
collection: Collection<T>
): TE.TaskEither<Error, ChangeStream<T, ChangeStreamDocument<T>>> =>
TE.tryCatch(
async () => collection.watch(),
async () =>
collection.watch(
[
{
$match: { operationType: { $in: ["insert", "update", "replace"] } },
},
{ $project: { _id: 1, fullDocument: 1, ns: 1, documentKey: 1 } },
],
{ fullDocument: "updateLookup" }
),
(reason) =>
new Error(
`Impossible to watch the ${collection.namespace} collection: " ${reason}`
Expand Down

0 comments on commit 839ccab

Please sign in to comment.