Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): switch ping-pong latency aggregation to per-socket #23856

Merged
merged 2 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions server/routerlicious/.changeset/old-melons-tease.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@fluidframework/server-routerlicious": major
---

Socket Latency Telemetry strategy changed to per-socket-connection

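The socket latency tracking strategy now aggregates ping-pong latency per socket connection, instead of aggregating across all connections on a fixed timer, for better telemetry granularity. As part of this change, a new config was added (`nexus.socketIo.pingPongLatencyTrackingAggregationThreshold`, the number of ping-pong events to aggregate per telemetry event on each socket connection; default `3`) and an old config was removed (`nexus.socketIo.pingPongLatencyTrackingIntervalMs`).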
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ data:
"socketIo" : {
"perMessageDeflate": {{ .Values.nexus.socketIo.perMessageDeflate}},
"gracefulShutdownEnabled": {{ .Values.nexus.socketIo.gracefulShutdownEnabled}},
"pingPongLatencyTrackingEnabled": {{ .Values.nexus.socketIo.pingPongLatencyTrackingEnabled}}
"pingPongLatencyTrackingEnabled": {{ .Values.nexus.socketIo.pingPongLatencyTrackingEnabled}},
"pingPongLatencyTrackingAggregationThreshold": {{ .Values.nexus.socketIo.pingPongLatencyTrackingAggregationThreshold}}
},
"enableCollaborationSessionTracking": {{ .Values.nexus.enableCollaborationSessionTracking }},
"enableCollaborationSessionPruning": {{ .Values.nexus.enableCollaborationSessionPruning }},
Expand Down
1 change: 1 addition & 0 deletions server/routerlicious/kubernetes/routerlicious/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ nexus:
perMessageDeflate: true
gracefulShutdownEnabled: false
pingPongLatencyTrackingEnabled: false
pingPongLatencyTrackingAggregationThreshold: 3
enableCollaborationSessionTracking: false
enableCollaborationSessionPruning: false
redisCollaborationSessionManagerOptions:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@
"socketIo": {
"perMessageDeflate": false,
"gracefulShutdownEnabled": false,
"pingPongLatencyTrackingEnabled": false
"pingPongLatencyTrackingEnabled": false,
"pingPongLatencyTrackingAggregationThreshold": 3
},
"jwtTokenCache": {
"enable": true
Expand Down
96 changes: 45 additions & 51 deletions server/routerlicious/packages/services-shared/src/socketIoServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ import {
Lumberjack,
LumberEventName,
} from "@fluidframework/server-services-telemetry";
import {
IRedisClientConnectionManager,
InMemoryApiCounters,
type IApiCounters,
} from "@fluidframework/server-services-utils";
import { IRedisClientConnectionManager } from "@fluidframework/server-services-utils";
import { Namespace, Server, Socket, RemoteSocket, type DisconnectReason } from "socket.io";
import { createAdapter as createRedisAdapter } from "@socket.io/redis-adapter";
import type { Adapter } from "socket.io-adapter";
Expand Down Expand Up @@ -122,10 +118,11 @@ export interface ISocketIoServerConfig {
*/
pingPongLatencyTrackingEnabled: boolean;
/**
* The time in milliseconds to wait between each aggregated ping-pong latency telemetry event.
* Default is 1 minute.
* The number of ping-pong events to aggregate into each ping-pong latency telemetry event.
* Aggregation is tracked per socket connection.
* Default is 3.
*/
pingPongLatencyTrackingIntervalMs: number;
pingPongLatencyTrackingAggregationThreshold: number;
/**
* Whether to enable Socket.io [perMessageDeflate](https://socket.io/docs/v4/server-options/#permessagedeflate) option.
* Default is `true`.
Expand All @@ -135,15 +132,13 @@ export interface ISocketIoServerConfig {

class SocketIoServer implements core.IWebSocketServer {
private readonly events = new EventEmitter();
private readonly pingPongLatencyInterval: ReturnType<typeof setInterval> | undefined;
private readonly pingPongLatencyTrackingIntervalMs: number | undefined;
private readonly pingPongLatencyTrackingAggregationThreshold: number = 3;

constructor(
private readonly io: Server,
private readonly redisClientConnectionManagerForPub: IRedisClientConnectionManager,
private readonly redisClientConnectionManagerForSub: IRedisClientConnectionManager,
private readonly socketIoConfig?: Partial<ISocketIoServerConfig>,
private readonly apiCounters: IApiCounters = new InMemoryApiCounters(),
) {
this.io.on("connection", (socket: Socket) => {
/**
Expand All @@ -155,8 +150,8 @@ class SocketIoServer implements core.IWebSocketServer {
* for real logic and access purposes without validating against the JWT access token.
*/
const telemetryProperties = {
[BaseTelemetryProperties.tenantId]: socket.handshake.query.tenantId,
[BaseTelemetryProperties.documentId]: socket.handshake.query.documentId,
[BaseTelemetryProperties.tenantId]: `${socket.handshake.query.tenantId}`,
[BaseTelemetryProperties.documentId]: `${socket.handshake.query.documentId}`,
};
const socketConnectionMetric = Lumberjack.newLumberMetric(
LumberEventName.SocketConnection,
Expand All @@ -165,7 +160,7 @@ class SocketIoServer implements core.IWebSocketServer {
const webSocket = new SocketIoSocket(socket);
this.events.emit("connection", webSocket);

this.initPingPongLatencyTracking(socket);
this.initPingPongLatencyTracking(socket, telemetryProperties);

webSocket.on("disconnect", (reason: DisconnectReason) => {
// The following should be considered as normal disconnects and not logged as errors.
Expand Down Expand Up @@ -231,13 +226,9 @@ class SocketIoServer implements core.IWebSocketServer {
}
});

if (this.socketIoConfig?.pingPongLatencyTrackingEnabled) {
this.pingPongLatencyTrackingIntervalMs =
this.socketIoConfig?.pingPongLatencyTrackingIntervalMs ?? 60000;
this.pingPongLatencyInterval = setInterval(
this.flushPingPongLatencyTracking.bind(this),
this.pingPongLatencyTrackingIntervalMs,
);
if (this.socketIoConfig?.pingPongLatencyTrackingAggregationThreshold !== undefined) {
this.pingPongLatencyTrackingAggregationThreshold =
this.socketIoConfig?.pingPongLatencyTrackingAggregationThreshold;
}
}

Expand Down Expand Up @@ -331,17 +322,33 @@ class SocketIoServer implements core.IWebSocketServer {
this.redisClientConnectionManagerForPub.getRedisClient().quit(),
this.redisClientConnectionManagerForSub.getRedisClient().quit(),
]);
if (this.socketIoConfig?.pingPongLatencyTrackingEnabled) {
clearInterval(this.pingPongLatencyInterval);
this.flushPingPongLatencyTracking();
}
}

private initPingPongLatencyTracking(socket: Socket) {
private initPingPongLatencyTracking(
socket: Socket,
{ tenantId, documentId }: { tenantId: string; documentId: string },
) {
if (!this.socketIoConfig?.pingPongLatencyTrackingEnabled) {
return;
}

const pingPongDurationsMs: number[] = [];
/**
 * Emits one `SocketConnectionLatency` metric carrying the rounded-up mean of the
 * ping-pong durations currently buffered in `pingPongDurationsMs`, then clears
 * the buffer so the next aggregation window starts empty.
 *
 * Does nothing when no durations have been buffered.
 */
const outputPingPongLatencyEvent = () => {
	// Guard against an empty buffer: averaging zero samples is meaningless, and
	// `reduce` without an initial value throws a TypeError on an empty array.
	if (pingPongDurationsMs.length === 0) {
		return;
	}
	const totalLatencyMs = pingPongDurationsMs.reduce(
		(sum, durationMs) => sum + durationMs,
		0,
	);
	const aggregateAverageLatencyMs = Math.ceil(totalLatencyMs / pingPongDurationsMs.length);
	// Truncate in place (rather than reassigning) so the packet handlers keep
	// pushing into the same array instance.
	pingPongDurationsMs.length = 0;
	const latencyMetric = Lumberjack.newLumberMetric(LumberEventName.SocketConnectionLatency, {
		[BaseTelemetryProperties.tenantId]: tenantId,
		[BaseTelemetryProperties.documentId]: documentId,
		durationInMs: aggregateAverageLatencyMs,
	});
	// Always successful
	latencyMetric.success("Socket.io Ping-Pong Latency");
};
let lastPingStartTime: number | undefined;
const packetCreateHandler = (packet: any) => {
if (packet.type === "ping") {
Expand All @@ -350,40 +357,27 @@ class SocketIoServer implements core.IWebSocketServer {
};
socket.conn.on("packetCreate", packetCreateHandler);
const packetReceivedHandler = (packet: any) => {
if (packet.type === "pong") {
if (lastPingStartTime !== undefined) {
this.apiCounters.incrementCounter("pingPongCount", 1);
this.apiCounters.incrementCounter(
"pingPongLatency",
performance.now() - lastPingStartTime,
);
lastPingStartTime = undefined;
if (packet.type === "pong" && lastPingStartTime !== undefined) {
const latency = performance.now() - lastPingStartTime;
lastPingStartTime = undefined;
pingPongDurationsMs.push(latency);
// Output telemetry when threshold is reached
if (
pingPongDurationsMs.length >= this.pingPongLatencyTrackingAggregationThreshold
) {
outputPingPongLatencyEvent();
}
}
};
socket.conn.on("packet", packetReceivedHandler);
socket.conn.on("close", () => {
if (pingPongDurationsMs.length > 0) {
outputPingPongLatencyEvent();
}
socket.conn.off("packetCreate", packetCreateHandler);
socket.conn.off("packet", packetReceivedHandler);
});
}

private flushPingPongLatencyTracking() {
if (!this.apiCounters.countersAreActive) {
return;
}
const pingPongCount = this.apiCounters.getCounter("pingPongCount") ?? 0;
const pingPongLatency = this.apiCounters.getCounter("pingPongLatency") ?? 0;
if (pingPongCount > 0) {
Lumberjack.info("Average Socket.io Ping-Pong Latency", {
durationInMs: Math.ceil(pingPongLatency / pingPongCount),
aggregateCount: pingPongCount,
aggregateLatencyMs: pingPongLatency,
aggregateLatencyIntervalMs: this.pingPongLatencyTrackingIntervalMs,
});
}
this.apiCounters.resetAllCounters();
}
}

type SocketIoAdapter = typeof Adapter | ((nsp: Namespace) => Adapter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export enum LumberEventName {
HttpRequest = "HttpRequest",
SocketConnection = "SocketConnection",
SocketConnectionCount = "SocketConnectionCount",
SocketConnectionLatency = "SocketConnectionLatency",
TotalConnectionCount = "TotalConnectionCount",
ConnectionCountPerNode = "ConnectionCountPerNode",
RestoreFromCheckpoint = "RestoreFromCheckpoint",
Expand Down