Skip to content

Commit

Permalink
fix: contacts come with several patches of N contacts, deleting those…
Browse files Browse the repository at this point in the history
… that are not in this patch ends up deleting those received in the previous patch
  • Loading branch information
Ctrl-Mota committed Jul 31, 2024
1 parent 308396e commit 0dcde8e
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 28 deletions.
1 change: 0 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ const app = express();
app.use(cors());
app.use(express.json());
app.use("/", routes);

app.all("*", (_: Request, res: Response) => res.status(404).json({ error: "URL not found" }));

const host = process.env.HOST || "0.0.0.0";
Expand Down
41 changes: 26 additions & 15 deletions src/store/handlers/contact.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { transformPrisma } from "@/store/utils";
import { prisma } from "@/db";
import { logger } from "@/shared";
import { PrismaClientKnownRequestError } from "@prisma/client/runtime/library";
import { PrismaClient } from "@prisma/client";

export default function contactHandler(sessionId: string, event: BaileysEventEmitter) {
let listening = false;
Expand Down Expand Up @@ -31,7 +32,8 @@ export default function contactHandler(sessionId: string, event: BaileysEventEmi

await Promise.any([
...upsertPromises,
prisma.contact.deleteMany({ where: { id: { in: deletedOldContactIds }, sessionId } }),
//danger: contacts come with several patches of N contacts, deleting those that are not in this patch ends up deleting those received in the previous patch
//prisma.contact.deleteMany({ where: { id: { in: deletedOldContactIds }, sessionId } }),
]);
logger.info(
{ deletedContacts: deletedOldContactIds.length, newContacts: contacts.length },
Expand All @@ -44,22 +46,31 @@ export default function contactHandler(sessionId: string, event: BaileysEventEmi

const upsert: BaileysEventHandler<"contacts.upsert"> = async (contacts) => {
try {
await Promise.any(
contacts
.map((c) => transformPrisma(c))
.map((data) =>
prisma.contact.upsert({
select: { pkId: true },
create: { ...data, sessionId },
update: data,
where: { sessionId_id: { id: data.id, sessionId } },
}),
),
);
} catch (e) {
logger.error(e, "An error occured during contacts upsert");
console.info(`Received ${contacts.length} contacts for upsert.`); // Informative message
console.info(contacts[0]); // Informative message

if (contacts.length === 0) {
return;
}

const transformedContacts = await Promise.all(
contacts.map((contact) => transformPrisma(contact))
);
console.log(transformedContacts)
await prisma.contact.createMany({
data: transformedContacts.map((data) => ({
...data,
sessionId,
})),
skipDuplicates: true, // Prevent duplicate inserts
});

} catch (error) {
logger.error("An unexpected error occurred during contacts upsert", error);
}
};



const update: BaileysEventHandler<"contacts.update"> = async (updates) => {
for (const update of updates) {
Expand Down
195 changes: 183 additions & 12 deletions src/whatsapp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,25 @@ import makeWASocket, {
import type { ConnectionState, SocketConfig, WASocket, proto } from "@whiskeysockets/baileys";
import { Store, useSession } from "./store";
import { prisma } from "./db";
import type { WebSocket } from "ws";
import type { WebSocket as WebSocketType } from "ws";
import WebSocket from "ws";
import { logger } from "./shared";
import type { Boom } from "@hapi/boom";
import type { Response } from "express";
import { toDataURL } from "qrcode";
import { delay } from "./utils";
import dotenv from "dotenv";

const channel = new WebSocket.Server({port: 3001})

dotenv.config();

type WaStatus = 'unknown' | 'wait_for_qrcode_auth' | 'authenticated' | 'pulling_wa_data' | 'connected' | 'disconected';

type Session = WASocket & {
destroy: () => Promise<void>;
store: Store;
waStatus?: WaStatus
};

const sessions = new Map<string, Session>();
Expand All @@ -29,16 +35,23 @@ const RECONNECT_INTERVAL = Number(process.env.RECONNECT_INTERVAL || 0);
const MAX_RECONNECT_RETRIES = Number(process.env.MAX_RECONNECT_RETRIES || 5);
const SSE_MAX_QR_GENERATION = Number(process.env.SSE_MAX_QR_GENERATION || 5);
const SESSION_CONFIG_ID = "session-config";

export async function init() {
const sessions = await prisma.session.findMany({
select: { sessionId: true, data: true },
where: { id: { startsWith: SESSION_CONFIG_ID } },
});

for (const { sessionId, data } of sessions) {
const { readIncomingMessages, ...socketConfig } = JSON.parse(data);
createSession({ sessionId, readIncomingMessages, socketConfig });
const { readIncomingMessages, ...socketConfig } = JSON.parse(data);
createSession({ sessionId, readIncomingMessages, socketConfig });

}
}

export function updateWaStatus(sessionId: string, waStatus: WaStatus){
if(sessions.has(sessionId)){
const _session = sessions.get(sessionId)!
console.warn(waStatus)
sessions.set(sessionId, {..._session, waStatus});
}
}

Expand Down Expand Up @@ -70,9 +83,9 @@ export async function createSession(options: createSessionOptions) {
try {
await Promise.all([
logout && socket.logout(),
prisma.chat.deleteMany({ where: { sessionId } }),
// prisma.chat.deleteMany({ where: { sessionId } }),
prisma.contact.deleteMany({ where: { sessionId } }),
prisma.message.deleteMany({ where: { sessionId } }),
prisma.message.deleteMany({ where: { sessionId } }),
prisma.groupMetadata.deleteMany({ where: { sessionId } }),
prisma.session.deleteMany({ where: { sessionId } }),
]);
Expand All @@ -89,6 +102,8 @@ export async function createSession(options: createSessionOptions) {
const restartRequired = code === DisconnectReason.restartRequired;
const doNotReconnect = !shouldReconnect(sessionId);

updateWaStatus(sessionId, "disconected");

if (code === DisconnectReason.loggedOut || doNotReconnect) {
if (res) {
!SSE && !res.headersSent && res.status(500).json({ error: "Unable to create session" });
Expand All @@ -109,6 +124,7 @@ export async function createSession(options: createSessionOptions) {
if (res && !res.headersSent) {
try {
const qr = await toDataURL(connectionState.qr);
updateWaStatus(sessionId, "wait_for_qrcode_auth");
res.status(200).json({ qr });
return;
} catch (e) {
Expand All @@ -124,6 +140,7 @@ export async function createSession(options: createSessionOptions) {
let qr: string | undefined = undefined;
if (connectionState.qr?.length) {
try {
updateWaStatus(sessionId, "wait_for_qrcode_auth")
qr = await toDataURL(connectionState.qr);
} catch (e) {
logger.error(e, "An error occured during QR generation");
Expand Down Expand Up @@ -165,18 +182,170 @@ export async function createSession(options: createSessionOptions) {
});

const store = new Store(sessionId, socket.ev);
sessions.set(sessionId, { ...socket, destroy, store });

sessions.set(sessionId, { ...socket, destroy, store, waStatus: 'unknown' });

socket.ev.on("creds.update", saveCreds);

socket.ev.on("connection.update", (update) => {
connectionState = update;
const { connection } = update;

if (connection === "open") {
updateWaStatus(sessionId, update.isNewLogin ? "authenticated" : "connected")
retries.delete(sessionId);
SSEQRGenerations.delete(sessionId);
}
if (connection === "close") handleConnectionClose();
handleConnectionUpdate();
});

if (readIncomingMessages) {
socket.ev.on("messages.upsert", async (m) => {
const message = m.messages[0];
if (message.key.fromMe || m.type !== "notify") return;

await delay(1000);
await socket.readMessages([message.key]);
});
}



// Debug events
// socket.ev.on("messaging-history.set", (data) => dump("messaging-history.set", data));
// socket.ev.on("chats.upsert", (data) => dump("chats.upsert", data));
// socket.ev.on("contacts.update", (data) => dump("contacts.update", data));
// socket.ev.on("groups.upsert", (data) => dump("groups.upsert", data));

await prisma.session.upsert({
create: {
id: configID,
sessionId,
data: JSON.stringify({ readIncomingMessages, ...socketConfig }),
},
update: {},
where: { sessionId_id: { id: configID, sessionId } },
});
}
export async function reloadDataFromSession(options: createSessionOptions) {
const { sessionId, res, SSE = false, readIncomingMessages = false, socketConfig } = options;
const configID = `${SESSION_CONFIG_ID}-${sessionId}`;
let connectionState: Partial<ConnectionState> = { connection: "close" };

const destroy = async (logout = true) => {
try {
await Promise.all([
logout && socket.logout(),
// prisma.chat.deleteMany({ where: { sessionId } }),
prisma.contact.deleteMany({ where: { sessionId } }),
// prisma.message.deleteMany({ where: { sessionId } }),
prisma.groupMetadata.deleteMany({ where: { sessionId } }),
prisma.session.deleteMany({ where: { sessionId } }),
]);
logger.info({ session: sessionId }, "Session destroyed");
} catch (e) {
logger.error(e, "An error occured during session destroy");
} finally {
sessions.delete(sessionId);
}
};

const handleConnectionClose = () => {
const code = (connectionState.lastDisconnect?.error as Boom)?.output?.statusCode;
const restartRequired = code === DisconnectReason.restartRequired;
const doNotReconnect = !shouldReconnect(sessionId);

if (code === DisconnectReason.loggedOut || doNotReconnect) {
if (res) {
!SSE && !res.headersSent && res.status(500).json({ error: "Unable to create session" });
res.end();
}
destroy(doNotReconnect);
return;
}

if (!restartRequired) {
logger.info({ attempts: retries.get(sessionId) ?? 1, sessionId }, "Reconnecting...");
}
setTimeout(() => createSession(options), restartRequired ? 0 : RECONNECT_INTERVAL);
};

const handleNormalConnectionUpdate = async () => {
if (connectionState.qr?.length) {
if (res && !res.headersSent) {
try {
const qr = await toDataURL(connectionState.qr);
res.status(200).json({ qr });
return;
} catch (e) {
logger.error(e, "An error occured during QR generation");
res.status(500).json({ error: "Unable to generate QR" });
}
}
destroy();
}
};

const handleSSEConnectionUpdate = async () => {
let qr: string | undefined = undefined;
if (connectionState.qr?.length) {
try {
qr = await toDataURL(connectionState.qr);
} catch (e) {
logger.error(e, "An error occured during QR generation");
}
}

const currentGenerations = SSEQRGenerations.get(sessionId) ?? 0;
if (!res || res.writableEnded || (qr && currentGenerations >= SSE_MAX_QR_GENERATION)) {
res && !res.writableEnded && res.end();
destroy();
return;
}

const data = { ...connectionState, qr };
if (qr) SSEQRGenerations.set(sessionId, currentGenerations + 1);
res.write(`data: ${JSON.stringify(data)}\n\n`);
};

const handleConnectionUpdate = SSE ? handleSSEConnectionUpdate : handleNormalConnectionUpdate;
const { state, saveCreds } = await useSession(sessionId);
const socket = makeWASocket({
printQRInTerminal: true,
browser: [process.env.NAME_BOT_BROWSER || "Whatsapp Bot", "Chrome", "3.0"],
generateHighQualityLinkPreview: true,
...socketConfig,
auth: {
creds: state.creds,
keys: makeCacheableSignalKeyStore(state.keys, logger),
},
version: [2, 2413, 1],
logger,
shouldIgnoreJid: (jid) => isJidBroadcast(jid),
getMessage: async (key) => {
const data = await prisma.message.findFirst({
where: { remoteJid: key.remoteJid!, id: key.id!, sessionId },
});
return (data?.message || undefined) as proto.IMessage | undefined;
},
});

const store = new Store(sessionId, socket.ev);
sessions.set(sessionId, { ...socket, destroy, store });

socket.ev.on("creds.update", saveCreds);
socket.ev.on("connection.update", (update) => {
connectionState = update;
const { connection } = update;
if (connection === "open") {
updateWaStatus(sessionId, update.isNewLogin ? 'authenticated' : 'connected');

retries.delete(sessionId);
SSEQRGenerations.delete(sessionId);
}
if (connection === "close") handleConnectionClose();
if (connection === "connecting") updateWaStatus(sessionId, 'pulling_wa_data');
handleConnectionUpdate();
});

Expand All @@ -190,6 +359,8 @@ export async function createSession(options: createSessionOptions) {
});
}



// Debug events
// socket.ev.on("messaging-history.set", (data) => dump("messaging-history.set", data));
// socket.ev.on("chats.upsert", (data) => dump("chats.upsert", data));
Expand All @@ -208,10 +379,10 @@ export async function createSession(options: createSessionOptions) {
}

export function getSessionStatus(session: Session) {
const state = ["CONNECTING", "CONNECTED", "DISCONNECTING", "DISCONNECTED"];
let status = state[(session.ws as WebSocket).readyState];
status = session.user ? "AUTHENTICATED" : status;
return status;
// const state = ["CONNECTING", "CONNECTED", "DISCONNECTING", "DISCONNECTED"];
// let status = state[(session.ws as WebSocketType).readyState];
// // status = session.user ? "AUTHENTICATED" : status;
return session.waStatus;
}

export function listSessions() {
Expand Down

0 comments on commit 0dcde8e

Please sign in to comment.