Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change to using websocket hibernation API #32

Merged
merged 5 commits into from
Feb 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 125 additions & 103 deletions src/chat.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -207,17 +207,37 @@ async function handleApiRequest(path, request, env) {
// connect to the room using WebSockets, and the room broadcasts messages from each participant
// to all others.
export class ChatRoom {
constructor(controller, env) {
// `controller.storage` provides access to our durable storage. It provides a simple KV
constructor(state, env) {
this.state = state

// `state.storage` provides access to our durable storage. It provides a simple KV
// get()/put() interface.
this.storage = controller.storage;
this.storage = state.storage;

// `env` is our environment bindings (discussed earlier).
this.env = env;

// We will put the WebSocket objects for each client, along with some metadata, into
// `sessions`.
this.sessions = [];
// We will track metadata for each client WebSocket object in `sessions`.
this.sessions = new Map();
this.state.getWebSockets().forEach((webSocket) => {
// The constructor may have been called when waking up from hibernation,
// so get previously serialized metadata for any existing WebSockets.
let meta = webSocket.deserializeAttachment();

// Set up our rate limiter client.
// The client itself can't have been in the attachment, because structured clone doesn't work on functions.
// DO ids aren't cloneable, restore the ID from its hex string
let limiterId = this.env.limiters.idFromString(meta.limiterId);
let limiter = new RateLimiterClient(
() => this.env.limiters.get(limiterId),
err => webSocket.close(1011, err.stack));

// We don't send any messages to the client until it has sent us the initial user info
// message. Until then, we will queue messages in `session.blockedMessages`.
// This could have been arbitrarily large, so we won't put it in the attachment.
let blockedMessages = [];
this.sessions.set(webSocket, { ...meta, limiter, blockedMessages });
});

// We keep track of the last-seen message's timestamp just so that we can assign monotonically
// increasing timestamps even if multiple messages arrive simultaneously (see below). There's
Expand Down Expand Up @@ -269,26 +289,26 @@ export class ChatRoom {
async handleSession(webSocket, ip) {
// Accept our end of the WebSocket. This tells the runtime that we'll be terminating the
// WebSocket in JavaScript, not sending it elsewhere.
webSocket.accept();
this.state.acceptWebSocket(webSocket);

// Set up our rate limiter client.
let limiterId = this.env.limiters.idFromName(ip);
let limiter = new RateLimiterClient(
() => this.env.limiters.get(limiterId),
err => webSocket.close(1011, err.stack));

// Create our session and add it to the sessions list.
// We don't send any messages to the client until it has sent us the initial user info
// message. Until then, we will queue messages in `session.blockedMessages`.
let session = {webSocket, blockedMessages: []};
this.sessions.push(session);
// Create our session and add it to the sessions map.
let session = { limiterId, limiter, blockedMessages: [] };
// attach limiterId to the webSocket so it survives hibernation
webSocket.serializeAttachment({ ...webSocket.deserializeAttachment(), limiterId: limiterId.toString() });
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a dumb question but why are we deserializing webSockets attachment here? Considering it was only just accepted as hibernatable I would think deserializeAttachment() would be null, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If someone takes this example and adds more properties to the attachment, and this line was just overwriting whatever was already in the attachment with only the limiterId, then correctness would depend on whether they happened to add the new properties above or below this line.

Seems uglier but safer this way to always add another property to whatever was already in the attachment.

this.sessions.set(webSocket, session);

// Queue "join" messages for all online users, to populate the client's roster.
this.sessions.forEach(otherSession => {
for (let otherSession of this.sessions.values()) {
if (otherSession.name) {
session.blockedMessages.push(JSON.stringify({joined: otherSession.name}));
}
});
}

// Load the last 100 messages from the chat history stored on disk, and send them to the
// client.
Expand All @@ -298,103 +318,107 @@ export class ChatRoom {
backlog.forEach(value => {
session.blockedMessages.push(value);
});
}

// Set event handlers to receive messages.
let receivedUserInfo = false;
webSocket.addEventListener("message", async msg => {
try {
if (session.quit) {
// Whoops, when trying to send to this WebSocket in the past, it threw an exception and
// we marked it broken. But somehow we got another message? I guess try sending a
// close(), which might throw, in which case we'll try to send an error, which will also
// throw, and whatever, at least we won't accept the message. (This probably can't
// actually happen. This is defensive coding.)
webSocket.close(1011, "WebSocket broken.");
return;
}
async webSocketMessage(webSocket, msg) {
try {
let session = this.sessions.get(webSocket);
if (session.quit) {
// Whoops, when trying to send to this WebSocket in the past, it threw an exception and
// we marked it broken. But somehow we got another message? I guess try sending a
// close(), which might throw, in which case we'll try to send an error, which will also
// throw, and whatever, at least we won't accept the message. (This probably can't
// actually happen. This is defensive coding.)
webSocket.close(1011, "WebSocket broken.");
return;
}

// Check if the user is over their rate limit and reject the message if so.
if (!limiter.checkLimit()) {
webSocket.send(JSON.stringify({
error: "Your IP is being rate-limited, please try again later."
}));
// Check if the user is over their rate limit and reject the message if so.
if (!session.limiter.checkLimit()) {
webSocket.send(JSON.stringify({
error: "Your IP is being rate-limited, please try again later."
}));
return;
}

// I guess we'll use JSON.
let data = JSON.parse(msg);

if (!session.name) {
// The first message the client sends is the user info message with their name. Save it
// into their session object.
session.name = "" + (data.name || "anonymous");
// attach name to the webSocket so it survives hibernation
webSocket.serializeAttachment({ ...webSocket.deserializeAttachment(), name: session.name });

// Don't let people use ridiculously long names. (This is also enforced on the client,
// so if they get here they are not using the intended client.)
if (session.name.length > 32) {
webSocket.send(JSON.stringify({error: "Name too long."}));
webSocket.close(1009, "Name too long.");
return;
}

// I guess we'll use JSON.
let data = JSON.parse(msg.data);

if (!receivedUserInfo) {
// The first message the client sends is the user info message with their name. Save it
// into their session object.
session.name = "" + (data.name || "anonymous");

// Don't let people use ridiculously long names. (This is also enforced on the client,
// so if they get here they are not using the intended client.)
if (session.name.length > 32) {
webSocket.send(JSON.stringify({error: "Name too long."}));
webSocket.close(1009, "Name too long.");
return;
}

// Deliver all the messages we queued up since the user connected.
session.blockedMessages.forEach(queued => {
webSocket.send(queued);
});
delete session.blockedMessages;
// Deliver all the messages we queued up since the user connected.
session.blockedMessages.forEach(queued => {
webSocket.send(queued);
});
delete session.blockedMessages;

// Broadcast to all other connections that this user has joined.
this.broadcast({joined: session.name});
// Broadcast to all other connections that this user has joined.
this.broadcast({joined: session.name});

webSocket.send(JSON.stringify({ready: true}));
webSocket.send(JSON.stringify({ready: true}));
return;
}

// Note that we've now received the user info message.
receivedUserInfo = true;
// Construct sanitized message for storage and broadcast.
data = { name: session.name, message: "" + data.message };

return;
}
// Block people from sending overly long messages. This is also enforced on the client,
// so to trigger this the user must be bypassing the client code.
if (data.message.length > 256) {
webSocket.send(JSON.stringify({error: "Message too long."}));
return;
}

// Construct sanitized message for storage and broadcast.
data = { name: session.name, message: "" + data.message };
// Add timestamp. Here's where this.lastTimestamp comes in -- if we receive a bunch of
// messages at the same time (or if the clock somehow goes backwards????), we'll assign
// them sequential timestamps, so at least the ordering is maintained.
data.timestamp = Math.max(Date.now(), this.lastTimestamp + 1);
this.lastTimestamp = data.timestamp;

// Block people from sending overly long messages. This is also enforced on the client,
// so to trigger this the user must be bypassing the client code.
if (data.message.length > 256) {
webSocket.send(JSON.stringify({error: "Message too long."}));
return;
}
// Broadcast the message to all other WebSockets.
let dataStr = JSON.stringify(data);
this.broadcast(dataStr);

// Add timestamp. Here's where this.lastTimestamp comes in -- if we receive a bunch of
// messages at the same time (or if the clock somehow goes backwards????), we'll assign
// them sequential timestamps, so at least the ordering is maintained.
data.timestamp = Math.max(Date.now(), this.lastTimestamp + 1);
this.lastTimestamp = data.timestamp;
// Save message.
let key = new Date(data.timestamp).toISOString();
await this.storage.put(key, dataStr);
} catch (err) {
// Report any exceptions directly back to the client. As with our handleErrors() this
// probably isn't what you'd want to do in production, but it's convenient when testing.
webSocket.send(JSON.stringify({error: err.stack}));
}
}

// Broadcast the message to all other WebSockets.
let dataStr = JSON.stringify(data);
this.broadcast(dataStr);
// On "close" and "error" events, remove the WebSocket from the sessions list and broadcast
// a quit message.
async closeOrErrorHandler(webSocket) {
let session = this.sessions.get(webSocket) || {};
session.quit = true;
this.sessions.delete(webSocket);
if (session.name) {
this.broadcast({quit: session.name});
}
}

// Save message.
let key = new Date(data.timestamp).toISOString();
await this.storage.put(key, dataStr);
} catch (err) {
// Report any exceptions directly back to the client. As with our handleErrors() this
// probably isn't what you'd want to do in production, but it's convenient when testing.
webSocket.send(JSON.stringify({error: err.stack}));
}
});
async webSocketClose(webSocket, code, reason, wasClean) {
this.closeOrErrorHandler(webSocket)
}

// On "close" and "error" events, remove the WebSocket from the sessions list and broadcast
// a quit message.
let closeOrErrorHandler = evt => {
session.quit = true;
this.sessions = this.sessions.filter(member => member !== session);
if (session.name) {
this.broadcast({quit: session.name});
}
};
webSocket.addEventListener("close", closeOrErrorHandler);
webSocket.addEventListener("error", closeOrErrorHandler);
async webSocketError(webSocket, error) {
this.closeOrErrorHandler(webSocket)
}

// broadcast() broadcasts a message to all clients.
Expand All @@ -406,23 +430,21 @@ export class ChatRoom {

// Iterate over all the sessions sending them messages.
let quitters = [];
this.sessions = this.sessions.filter(session => {
this.sessions.forEach((session, webSocket) => {
if (session.name) {
try {
session.webSocket.send(message);
return true;
webSocket.send(message);
} catch (err) {
// Whoops, this connection is dead. Remove it from the list and arrange to notify
// Whoops, this connection is dead. Remove it from the map and arrange to notify
// everyone below.
session.quit = true;
quitters.push(session);
return false;
this.sessions.delete(webSocket);
}
} else {
// This session hasn't sent the initial user info message yet, so we're not sending them
// messages yet (no secret lurking!). Queue the message to be sent later.
session.blockedMessages.push(message);
return true;
}
});

Expand All @@ -445,7 +467,7 @@ export class ChatRoom {
// global, i.e. they apply across all chat rooms, so if a user spams one chat room, they will find
// themselves rate limited in all other chat rooms simultaneously.
export class RateLimiter {
constructor(controller, env) {
constructor(state, env) {
// Timestamp at which this IP will next be allowed to send a message. Start in the distant
// past, i.e. the IP can send a message now.
this.nextAllowedTime = 0;
Expand Down