Skip to content

Commit

Permalink
wip(server): refactor socket.io to native websocket implmentation
Browse files Browse the repository at this point in the history
  • Loading branch information
brunocroh committed May 15, 2024
1 parent 6837958 commit b8cddbb
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 122 deletions.
1 change: 1 addition & 0 deletions apps/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"dependencies": {
"@types/node-cron": "^3.0.11",
"@types/uuid": "^9.0.8",
"@types/ws": "^8.5.10",
"fastify": "^4.24.3",
"node-cron": "^3.0.3",
"socket.io": "^4.7.2",
Expand Down
201 changes: 111 additions & 90 deletions apps/server/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,68 +1,99 @@
import { Server, Socket } from "socket.io";
import WebSocket, { WebSocketServer } from "ws";
import Http from "http";
import { v4 as uuid } from "uuid";
import cron from "node-cron";

const server = Http.createServer();
const wss = new WebSocketServer({ server });

type Room = {
host?: string;
users: string[];
};

type SocketEvents = {
queueJoin: (props: { id: string }) => void;
queueUpdated: (props: { size: number }) => void;
newUserConnect: (props: { size: number }) => void;
queueExit: (props: { id: string }) => void;
roomFound: (props: { room: Room; roomId: string }) => void;
roomEnter: (props: { roomId: string; id: string }) => void;
sendOffer: (props: {
to: string | undefined;
signal: any;
from: string;
}) => void;
receiveOffer: (props: { to: string; from: string; signal: any }) => void;
sendAnswer: (props: { to: string; signal: any }) => void;
receiveAnswer: (props: { signal: any }) => void;
me: (id: string) => void;
callAccepted: (signal: any) => void;
hostCall: (payload: { to: string }) => void;
type onQueueUpdate = {
userId: string;
};

type QueueMe = {
type: "me";
};

type SocketEventCurrier<T extends SocketEvents[keyof SocketEvents]> = (
socket: Socket,
) => T;
type QueueUpdateEvent = {
type: "queueJoin" | "queueExit";
id: string;
};

type SocketEvents = QueueUpdateEvent | QueueMe;

function broadcastMessage(json: any) {
const data = JSON.stringify(json);
for (let user of users.values()) {
if (user.readyState === WebSocket.OPEN) {
user.send(data);
}
}
}

const queue = new Set<string>();
const users = new Map();
const users = new Map<string, any>();
const rooms = new Map<string, Room>();

const io = new Server<SocketEvents, SocketEvents>({
cors: {
origin: "*",
},
});
wss.on("connection", (ws) => {
console.log("user Connected");
const userId = uuid();
users.set(userId, ws);

ws.on("message", (data) => {
const event = JSON.parse(data.toString()) as SocketEvents;

switch (event.type) {
case "me":
const mee = JSON.stringify({ id: userId });
console.log({ size: users.size });
ws.send(mee);
broadcastMessage({ type: "usersOnline", size: users.size });
break;
case "queueJoin":
onQueueJoin({ userId: event.id });
break;
case "queueExit":
onQueueExit({ userId: event.id });
break;
default:
break;
}
});

io.on("connection", (socket) => {
socket.emit("me", socket.id);
users.set(socket.id, socket);
io.emit("newUserConnect", { size: io.engine.clientsCount });

socket.on("disconnect", () => handleDisconnect(socket)());
socket.on("queueJoin", onQueueJoin(socket));
socket.on("queueExit", onQueueExit(socket));
socket.on("sendOffer", onSendOffer(socket));
socket.on("sendAnswer", onSendAnswer(socket));
socket.on("roomEnter", onRoomEnter(socket));
ws.on("close", () => handleDisconnect(userId));
});

const handleDisconnect = (socket: Socket) => () => {
queue.delete(socket.id);
users.delete(socket.id);
io.emit("newUserConnect", { size: users.size });
// ws.on("connection", (socket) => {
// socket.emit("me", socket.id);
// users.set(socket.id, socket);
// ws.emit("newUserConnect", { size: ws.engine.clientsCount });
//
// socket.on("disconnect", () => handleDisconnect(socket)());
// socket.on("queueJoin", onQueueJoin(socket));
// socket.on("queueExit", onQueueExit(socket));
// socket.on("sendOffer", onSendOffer(socket));
// socket.on("sendAnswer", onSendAnswer(socket));
// socket.on("roomEnter", onRoomEnter(socket));
// });

const handleDisconnect = (userId: string) => {
console.log("user disconnected");
queue.delete(userId);
users.delete(userId);
broadcastMessage({
type: "teste",
size: users.size,
});
};

const onRoomEnter: SocketEventCurrier<SocketEvents["roomEnter"]> =
(socket) =>
({ roomId, id }) => {
const onRoomEnter =
(socket: any) =>
({ roomId, id }: any) => {
const room = rooms.get(roomId);

if (!room) return;
Expand All @@ -74,56 +105,46 @@ const onRoomEnter: SocketEventCurrier<SocketEvents["roomEnter"]> =
}
};

const onSendOffer: SocketEventCurrier<SocketEvents["sendOffer"]> =
() =>
({ to, signal, from }) => {
if (!to) return;

console.log(`user ${from} call to ${to}`);
const onSendOffer = ({ to, signal, from }: any) => {
if (!to) return;

io.to(to).emit("receiveOffer", { signal, from, to });
};

const onSendAnswer: SocketEventCurrier<SocketEvents["sendAnswer"]> =
(socket) =>
({ to, signal }) => {
console.log(`user ${socket.id} accept call of ${to}`);
io.to(to).emit("receiveAnswer", { signal });
};
console.log(`user ${from} call to ${to} on ${signal}`);

const onQueueJoin: SocketEventCurrier<SocketEvents["queueJoin"]> =
() =>
({ id }) => {
queue.add(id);
io.emit("queueUpdated", { size: queue.size });
};
//ws.to(to).emit("receiveOffer", { signal, from, to });
};

const onQueueExit: SocketEventCurrier<SocketEvents["queueExit"]> =
() =>
({ id }) => {
queue.delete(id);
io.emit("queueUpdated", { size: queue.size });
const onSendAnswer =
(socket: any) =>
({ to, signal }: any) => {
console.log(`user ${socket.id} accept call of ${to} ${signal}`);
// ws.to(to).emit("receiveAnswer", { signal });
};

io.listen(4000);

cron.schedule("*/5 * * * * *", () => {
const _queue = Array.from(queue);

console.log({ clients: io.engine.clientsCount });

for (; _queue.length >= 2; ) {
const roomId = uuid();
const _users = Array.from(_queue.splice(0, 2));
const room: Room = { users: _users, host: _users[0] };
const onQueueJoin = ({ userId }: onQueueUpdate) => {
queue.add(userId);
};

rooms.set(roomId, { host: undefined, users: [] });
const onQueueExit = ({ userId }: onQueueUpdate) => {
queue.delete(userId);
};

_users.forEach((user) => {
queue.delete(user);
users.get(user).emit("roomFound", { room, roomId });
});
}
server.listen(4000, () => {
console.log("Server up");
});

console.log("Server up");
// cron.schedule("*/5 * * * * *", () => {
// const _queue = Array.from(queue);
//
// for (; _queue.length >= 2; ) {
// const roomId = uuid();
// const _users = Array.from(_queue.splice(0, 2));
// const room: Room = { users: _users, host: _users[0] };
//
// rooms.set(roomId, { host: undefined, users: [] });
//
// _users.forEach((user) => {
// queue.delete(user);
// users.get(user).emit("roomFound", { room, roomId });
// });
// }
// });
79 changes: 49 additions & 30 deletions apps/web/app/queue/page.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"use client";

import { useRef, ReactElement, useEffect, useState, useCallback } from "react";
import { socket as Socket } from "@/lib/socket";
import useWebSocket from "react-use-websocket";
import {
Select,
SelectContent,
Expand All @@ -13,9 +13,6 @@ import { Button } from "@/components/ui/button";
import { useRouter } from "next/navigation";
import { useUserMedia } from "@/hooks/useUserMedia";
import { Header } from "@/components/header";
import type { Socket as SocketType } from "socket.io-client";

let socket: SocketType;

export default function Page(): JSX.Element {
const router = useRouter();
Expand All @@ -35,37 +32,59 @@ export default function Page(): JSX.Element {
const [usersOnline, setUsersOnline] = useState(null);
const [inQueue, setInQueue] = useState(false);

useEffect(() => {
let me = null;
if (!socket) {
socket = Socket();
}

socket.on("me", (_me) => {
me = _me;
setMe(me);
});

socket.on("newUserConnect", ({ size }) => {
setUsersOnline(size);
});
const { sendJsonMessage } = useWebSocket(process.env.NEXT_PUBLIC_SOCKET_URL, {
onOpen: () => {
console.log("connected");
sendJsonMessage({
type: "me",
});
},
onMessage: (event) => {
const data = JSON.parse(event.data);

socket.on("roomFound", ({ room, roomId }) => {
console.log({ room, roomId });
router.push(`/room/${roomId}?host=${room.host}`);
});
switch (data.type) {
case "usersOnline":
setUsersOnline(data.size);
break;
default:
break;
}
},
});

return () => {
socket.off("queueUpdated");
socket.off("newUserConnect");
socket.off("roomFound");
socket.close();
};
}, []);
// useEffect(() => {
// let me = null;
//
//
// if (!socket) {
// socket = Socket();
// }
//
// socket.on("me", (_me) => {
// me = _me;
// setMe(me);
// });
//
// socket.on("newUserConnect", ({ size }) => {
// setUsersOnline(size);
// });
//
// socket.on("roomFound", ({ room, roomId }) => {
// console.log({ room, roomId });
// router.push(`/room/${roomId}?host=${room.host}`);
// });
//
// return () => {
// socket.off("queueUpdated");
// socket.off("newUserConnect");
// socket.off("roomFound");
// socket.close();
// };
// }, []);

const onConnect = useCallback(() => {
setInQueue(!inQueue);
socket.emit(inQueue ? "queueExit" : "queueJoin", { id: me });
// socket.emit(inQueue ? "queueExit" : "queueJoin", { id: me });
}, [inQueue, me]);

if (!ready) return <div>loading</div>;
Expand Down
4 changes: 2 additions & 2 deletions apps/web/lib/socket/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { io, Socket } from "socket.io-client";
import { Socket } from "socket.io-client";

const URL = process.env.NEXT_PUBLIC_SOCKET_URL;

export const socket = (): Socket => {
return io(URL!);
return new WebSocket(URL!);
};
1 change: 1 addition & 0 deletions apps/web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"next": "^14.0.3",
"react": "^18.2.0",
"react-dom": "^18.2.0",
"react-use-websocket": "^4.8.1",
"simple-peer": "^9.11.1",
"tailwind-merge": "^2.1.0",
"tailwindcss-animate": "^1.0.7",
Expand Down
Loading

0 comments on commit b8cddbb

Please sign in to comment.