Skip to content

Commit

Permalink
feat(server): switch ping-pong latency aggregation to per-socket (#23856
Browse files Browse the repository at this point in the history
)

## Description

Switches the R11s Socket latency telemetry from global aggregation
(one average across all sockets, emitted on a fixed time interval) to
per-socket aggregation (one average per socket connection, emitted each
time that socket accumulates a configurable threshold of ping-pong
events). This allows for better telemetry granularity (percentiles,
tenant/document narrowing, etc.) at the cost of more generated telemetry.
The amount of telemetry can be tuned with the
`pingPongLatencyTrackingAggregationThreshold` config, where 1 ping-pong
event should [happen every ~25
seconds](https://socket.io/docs/v4/server-options/#pinginterval). For
example, with the default threshold of 3, each connected socket emits
roughly one latency event every ~75 seconds.

## Breaking Changes

The internal `apiCounters` constructor param of `SocketIoServer`, which
was never used, was removed, and the `pingPongLatencyTrackingIntervalMs`
config, which was never set, was removed.
  • Loading branch information
znewton authored Feb 14, 2025
1 parent 370f849 commit 6ab8453
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 53 deletions.
7 changes: 7 additions & 0 deletions server/routerlicious/.changeset/old-melons-tease.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@fluidframework/server-routerlicious": major
---

Socket Latency Telemetry strategy changed to per-socket-connection

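The socket latency tracking strategy was changed from global, time-interval-based aggregation to per-socket-connection aggregation for better telemetry granularity. As part of this change, a new config was added (`nexus.socketIo.pingPongLatencyTrackingAggregationThreshold`), which sets how many ping-pong events are averaged into each telemetry event, and an old config was removed (`nexus.socketIo.pingPongLatencyTrackingIntervalMs`).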
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ data:
"socketIo" : {
"perMessageDeflate": {{ .Values.nexus.socketIo.perMessageDeflate}},
"gracefulShutdownEnabled": {{ .Values.nexus.socketIo.gracefulShutdownEnabled}},
"pingPongLatencyTrackingEnabled": {{ .Values.nexus.socketIo.pingPongLatencyTrackingEnabled}}
"pingPongLatencyTrackingEnabled": {{ .Values.nexus.socketIo.pingPongLatencyTrackingEnabled}},
"pingPongLatencyTrackingAggregationThreshold": {{ .Values.nexus.socketIo.pingPongLatencyTrackingAggregationThreshold}}
},
"enableCollaborationSessionTracking": {{ .Values.nexus.enableCollaborationSessionTracking }},
"enableCollaborationSessionPruning": {{ .Values.nexus.enableCollaborationSessionPruning }},
Expand Down
1 change: 1 addition & 0 deletions server/routerlicious/kubernetes/routerlicious/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ nexus:
perMessageDeflate: true
gracefulShutdownEnabled: false
pingPongLatencyTrackingEnabled: false
pingPongLatencyTrackingAggregationThreshold: 3
enableCollaborationSessionTracking: false
enableCollaborationSessionPruning: false
redisCollaborationSessionManagerOptions:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@
"socketIo": {
"perMessageDeflate": false,
"gracefulShutdownEnabled": false,
"pingPongLatencyTrackingEnabled": false
"pingPongLatencyTrackingEnabled": false,
"pingPongLatencyTrackingAggregationThreshold": 3
},
"jwtTokenCache": {
"enable": true
Expand Down
96 changes: 45 additions & 51 deletions server/routerlicious/packages/services-shared/src/socketIoServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ import {
Lumberjack,
LumberEventName,
} from "@fluidframework/server-services-telemetry";
import {
IRedisClientConnectionManager,
InMemoryApiCounters,
type IApiCounters,
} from "@fluidframework/server-services-utils";
import { IRedisClientConnectionManager } from "@fluidframework/server-services-utils";
import { Namespace, Server, Socket, RemoteSocket, type DisconnectReason } from "socket.io";
import { createAdapter as createRedisAdapter } from "@socket.io/redis-adapter";
import type { Adapter } from "socket.io-adapter";
Expand Down Expand Up @@ -122,10 +118,11 @@ export interface ISocketIoServerConfig {
*/
pingPongLatencyTrackingEnabled: boolean;
/**
* The time in milliseconds to wait between each aggregated ping-pong latency telemetry event.
* Default is 1 minute.
* The number of ping-pong events to aggregate for each ping-pong latency telemetry event.
* This is tracked on a per socket connection level.
* Default is 3.
*/
pingPongLatencyTrackingIntervalMs: number;
pingPongLatencyTrackingAggregationThreshold: number;
/**
* Whether to enable Socket.io [perMessageDeflate](https://socket.io/docs/v4/server-options/#permessagedeflate) option.
* Default is `true`.
Expand All @@ -135,15 +132,13 @@ export interface ISocketIoServerConfig {

class SocketIoServer implements core.IWebSocketServer {
private readonly events = new EventEmitter();
private readonly pingPongLatencyInterval: ReturnType<typeof setInterval> | undefined;
private readonly pingPongLatencyTrackingIntervalMs: number | undefined;
private readonly pingPongLatencyTrackingAggregationThreshold: number = 3;

constructor(
private readonly io: Server,
private readonly redisClientConnectionManagerForPub: IRedisClientConnectionManager,
private readonly redisClientConnectionManagerForSub: IRedisClientConnectionManager,
private readonly socketIoConfig?: Partial<ISocketIoServerConfig>,
private readonly apiCounters: IApiCounters = new InMemoryApiCounters(),
) {
this.io.on("connection", (socket: Socket) => {
/**
Expand All @@ -155,8 +150,8 @@ class SocketIoServer implements core.IWebSocketServer {
* for real logic and access purposes without validating against the JWT access token.
*/
const telemetryProperties = {
[BaseTelemetryProperties.tenantId]: socket.handshake.query.tenantId,
[BaseTelemetryProperties.documentId]: socket.handshake.query.documentId,
[BaseTelemetryProperties.tenantId]: `${socket.handshake.query.tenantId}`,
[BaseTelemetryProperties.documentId]: `${socket.handshake.query.documentId}`,
};
const socketConnectionMetric = Lumberjack.newLumberMetric(
LumberEventName.SocketConnection,
Expand All @@ -165,7 +160,7 @@ class SocketIoServer implements core.IWebSocketServer {
const webSocket = new SocketIoSocket(socket);
this.events.emit("connection", webSocket);

this.initPingPongLatencyTracking(socket);
this.initPingPongLatencyTracking(socket, telemetryProperties);

webSocket.on("disconnect", (reason: DisconnectReason) => {
// The following should be considered as normal disconnects and not logged as errors.
Expand Down Expand Up @@ -231,13 +226,9 @@ class SocketIoServer implements core.IWebSocketServer {
}
});

if (this.socketIoConfig?.pingPongLatencyTrackingEnabled) {
this.pingPongLatencyTrackingIntervalMs =
this.socketIoConfig?.pingPongLatencyTrackingIntervalMs ?? 60000;
this.pingPongLatencyInterval = setInterval(
this.flushPingPongLatencyTracking.bind(this),
this.pingPongLatencyTrackingIntervalMs,
);
if (this.socketIoConfig?.pingPongLatencyTrackingAggregationThreshold !== undefined) {
this.pingPongLatencyTrackingAggregationThreshold =
this.socketIoConfig?.pingPongLatencyTrackingAggregationThreshold;
}
}

Expand Down Expand Up @@ -331,17 +322,33 @@ class SocketIoServer implements core.IWebSocketServer {
this.redisClientConnectionManagerForPub.getRedisClient().quit(),
this.redisClientConnectionManagerForSub.getRedisClient().quit(),
]);
if (this.socketIoConfig?.pingPongLatencyTrackingEnabled) {
clearInterval(this.pingPongLatencyInterval);
this.flushPingPongLatencyTracking();
}
}

private initPingPongLatencyTracking(socket: Socket) {
private initPingPongLatencyTracking(
socket: Socket,
{ tenantId, documentId }: { tenantId: string; documentId: string },
) {
if (!this.socketIoConfig?.pingPongLatencyTrackingEnabled) {
return;
}

const pingPongDurationsMs: number[] = [];
const outputPingPongLatencyEvent = () => {
const aggregateAverageLatencyMs = Math.ceil(
pingPongDurationsMs.reduce((a, b) => a + b) / pingPongDurationsMs.length,
);
pingPongDurationsMs.length = 0;
const latencyMetric = Lumberjack.newLumberMetric(
LumberEventName.SocketConnectionLatency,
{
[BaseTelemetryProperties.tenantId]: tenantId,
[BaseTelemetryProperties.documentId]: documentId,
durationInMs: aggregateAverageLatencyMs,
},
);
// Always successful
latencyMetric.success("Socket.io Ping-Pong Latency");
};
let lastPingStartTime: number | undefined;
const packetCreateHandler = (packet: any) => {
if (packet.type === "ping") {
Expand All @@ -350,40 +357,27 @@ class SocketIoServer implements core.IWebSocketServer {
};
socket.conn.on("packetCreate", packetCreateHandler);
const packetReceivedHandler = (packet: any) => {
if (packet.type === "pong") {
if (lastPingStartTime !== undefined) {
this.apiCounters.incrementCounter("pingPongCount", 1);
this.apiCounters.incrementCounter(
"pingPongLatency",
performance.now() - lastPingStartTime,
);
lastPingStartTime = undefined;
if (packet.type === "pong" && lastPingStartTime !== undefined) {
const latency = performance.now() - lastPingStartTime;
lastPingStartTime = undefined;
pingPongDurationsMs.push(latency);
// Output telemetry when threshold is reached
if (
pingPongDurationsMs.length >= this.pingPongLatencyTrackingAggregationThreshold
) {
outputPingPongLatencyEvent();
}
}
};
socket.conn.on("packet", packetReceivedHandler);
socket.conn.on("close", () => {
if (pingPongDurationsMs.length > 0) {
outputPingPongLatencyEvent();
}
socket.conn.off("packetCreate", packetCreateHandler);
socket.conn.off("packet", packetReceivedHandler);
});
}

private flushPingPongLatencyTracking() {
if (!this.apiCounters.countersAreActive) {
return;
}
const pingPongCount = this.apiCounters.getCounter("pingPongCount") ?? 0;
const pingPongLatency = this.apiCounters.getCounter("pingPongLatency") ?? 0;
if (pingPongCount > 0) {
Lumberjack.info("Average Socket.io Ping-Pong Latency", {
durationInMs: Math.ceil(pingPongLatency / pingPongCount),
aggregateCount: pingPongCount,
aggregateLatencyMs: pingPongLatency,
aggregateLatencyIntervalMs: this.pingPongLatencyTrackingIntervalMs,
});
}
this.apiCounters.resetAllCounters();
}
}

type SocketIoAdapter = typeof Adapter | ((nsp: Namespace) => Adapter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export enum LumberEventName {
HttpRequest = "HttpRequest",
SocketConnection = "SocketConnection",
SocketConnectionCount = "SocketConnectionCount",
SocketConnectionLatency = "SocketConnectionLatency",
TotalConnectionCount = "TotalConnectionCount",
ConnectionCountPerNode = "ConnectionCountPerNode",
RestoreFromCheckpoint = "RestoreFromCheckpoint",
Expand Down

0 comments on commit 6ab8453

Please sign in to comment.