From aeded23afc4b51c9ce272b20e874220ca59ea6a8 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 17 Oct 2023 15:16:38 +0200 Subject: [PATCH 1/2] feat(NODE-5613): add awaited to hearbeat started events --- src/sdam/events.ts | 5 ++++- src/sdam/monitor.ts | 11 +++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/sdam/events.ts b/src/sdam/events.ts index c55eb09575..5a3198aee9 100644 --- a/src/sdam/events.ts +++ b/src/sdam/events.ts @@ -132,10 +132,13 @@ export class TopologyClosedEvent { export class ServerHeartbeatStartedEvent { /** The connection id for the command */ connectionId: string; + /** Is true when using the streaming protocol. */ + awaited: boolean; /** @internal */ - constructor(connectionId: string) { + constructor(connectionId: string, awaited: boolean) { this.connectionId = connectionId; + this.awaited = awaited; } } diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index bd5702b4af..2238b56271 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -209,7 +209,12 @@ function resetMonitorState(monitor: Monitor) { function checkServer(monitor: Monitor, callback: Callback) { let start = now(); - monitor.emit(Server.SERVER_HEARTBEAT_STARTED, new ServerHeartbeatStartedEvent(monitor.address)); + const topologyVersion = monitor[kServer].description.topologyVersion; + const isAwaitable = topologyVersion != null; + monitor.emit( + Server.SERVER_HEARTBEAT_STARTED, + new ServerHeartbeatStartedEvent(monitor.address, isAwaitable && topologyVersion != null) + ); function failureHandler(err: Error) { monitor[kConnection]?.destroy({ force: true }); @@ -237,8 +242,6 @@ function checkServer(monitor: Monitor, callback: Callback) { const { serverApi, helloOk } = connection; const connectTimeoutMS = monitor.options.connectTimeoutMS; const maxAwaitTimeMS = monitor.options.heartbeatFrequencyMS; - const topologyVersion = monitor[kServer].description.topologyVersion; - const isAwaitable = topologyVersion != null; const cmd = { [serverApi?.version || helloOk ? 'hello' : LEGACY_HELLO_COMMAND]: 1, @@ -288,7 +291,7 @@ function checkServer(monitor: Monitor, callback: Callback) { if (isAwaitable && hello.topologyVersion) { monitor.emit( Server.SERVER_HEARTBEAT_STARTED, - new ServerHeartbeatStartedEvent(monitor.address) + new ServerHeartbeatStartedEvent(monitor.address, true) ); start = now(); } else { From ad8174a063dd294d5f6dc1a7bc6c5bb07256fe2d Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 17 Oct 2023 16:30:07 +0200 Subject: [PATCH 2/2] feat(NODE-5613): add to success and failed --- src/sdam/events.ts | 10 ++++++++-- src/sdam/monitor.ts | 21 ++++++++++++++++----- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/src/sdam/events.ts b/src/sdam/events.ts index 5a3198aee9..4c8a1c1312 100644 --- a/src/sdam/events.ts +++ b/src/sdam/events.ts @@ -154,12 +154,15 @@ export class ServerHeartbeatSucceededEvent { duration: number; /** The command reply */ reply: Document; + /** Is true when using the streaming protocol. */ + awaited: boolean; /** @internal */ - constructor(connectionId: string, duration: number, reply: Document | null) { + constructor(connectionId: string, duration: number, reply: Document | null, awaited: boolean) { this.connectionId = connectionId; this.duration = duration; this.reply = reply ?? {}; + this.awaited = awaited; } } @@ -175,11 +178,14 @@ export class ServerHeartbeatFailedEvent { duration: number; /** The command failure */ failure: Error; + /** Is true when using the streaming protocol. */ + awaited: boolean; /** @internal */ - constructor(connectionId: string, duration: number, failure: Error) { + constructor(connectionId: string, duration: number, failure: Error, awaited: boolean) { this.connectionId = connectionId; this.duration = duration; this.failure = failure; + this.awaited = awaited; } } diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index 2238b56271..1e510d0dea 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -213,7 +213,7 @@ function checkServer(monitor: Monitor, callback: Callback) { const isAwaitable = topologyVersion != null; monitor.emit( Server.SERVER_HEARTBEAT_STARTED, - new ServerHeartbeatStartedEvent(monitor.address, isAwaitable && topologyVersion != null) + new ServerHeartbeatStartedEvent(monitor.address, isAwaitable) ); function failureHandler(err: Error) { @@ -222,7 +222,12 @@ function checkServer(monitor: Monitor, callback: Callback) { monitor.emit( Server.SERVER_HEARTBEAT_FAILED, - new ServerHeartbeatFailedEvent(monitor.address, calculateDurationInMs(start), err) + new ServerHeartbeatFailedEvent( + monitor.address, + calculateDurationInMs(start), + err, + isAwaitable + ) ); const error = !(err instanceof MongoError) @@ -281,14 +286,15 @@ function checkServer(monitor: Monitor, callback: Callback) { const duration = isAwaitable && rttPinger ? rttPinger.roundTripTime : calculateDurationInMs(start); + const awaited = isAwaitable && hello.topologyVersion != null; monitor.emit( Server.SERVER_HEARTBEAT_SUCCEEDED, - new ServerHeartbeatSucceededEvent(monitor.address, duration, hello) + new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, awaited) ); // if we are using the streaming protocol then we immediately issue another `started` // event, otherwise the "check" is complete and return to the main monitor loop - if (isAwaitable && hello.topologyVersion) { + if (awaited) { monitor.emit( Server.SERVER_HEARTBEAT_STARTED, new ServerHeartbeatStartedEvent(monitor.address, true) @@ -327,7 +333,12 @@ function checkServer(monitor: Monitor, callback: Callback) { monitor[kConnection] = conn; monitor.emit( Server.SERVER_HEARTBEAT_SUCCEEDED, - new ServerHeartbeatSucceededEvent(monitor.address, calculateDurationInMs(start), conn.hello) + new ServerHeartbeatSucceededEvent( + monitor.address, + calculateDurationInMs(start), + conn.hello, + false + ) ); callback(undefined, conn.hello);