From 5a938959804d35528f2e3c27fc7c013bdc01ffbb Mon Sep 17 00:00:00 2001 From: Marco Collovati Date: Fri, 18 Oct 2024 12:38:26 +0200 Subject: [PATCH 1/2] fix: resume client to server communication after web socket reconnection When a websocket PUSH connection is closed and re-established because of a network failure, the RequestResponseTracker.hasActiveRequest is not reset, prenvint the Flow client to send additional messages to the server. This change will reset the flag on reconnection. It also will track unsent PUSH message over websocket, to retry the delivery once the connection is re-established, preventing client resynchronization. In addition, it sets a default value of 12 for the Atmospehere maxWebsocketErrorRetries setting, to ensure that the Flow client will attempt to reconnect with web socket transport several times, instead of immediately downgrade to long-polling after first failed connection. Fixes #20213 --- .../AtmospherePushConnection.java | 1 + .../DefaultConnectionStateHandler.java | 12 ++++++++ .../client/communication/MessageSender.java | 29 ++++++++++++++++++- 3 files changed, 41 insertions(+), 1 deletion(-) diff --git a/flow-client/src/main/java/com/vaadin/client/communication/AtmospherePushConnection.java b/flow-client/src/main/java/com/vaadin/client/communication/AtmospherePushConnection.java index 8ed029ec347..46a512efd35 100644 --- a/flow-client/src/main/java/com/vaadin/client/communication/AtmospherePushConnection.java +++ b/flow-client/src/main/java/com/vaadin/client/communication/AtmospherePushConnection.java @@ -686,6 +686,7 @@ protected final native AtmosphereConfiguration createConfig() fallbackTransport: 'long-polling', contentType: 'application/json; charset=UTF-8', reconnectInterval: 5000, + maxWebsocketErrorRetries: 12, timeout: -1, maxReconnectOnClose: 10000000, trackMessageLength: true, diff --git a/flow-client/src/main/java/com/vaadin/client/communication/DefaultConnectionStateHandler.java b/flow-client/src/main/java/com/vaadin/client/communication/DefaultConnectionStateHandler.java index ec98867090f..5be7f20f33d 100644 --- a/flow-client/src/main/java/com/vaadin/client/communication/DefaultConnectionStateHandler.java +++ b/flow-client/src/main/java/com/vaadin/client/communication/DefaultConnectionStateHandler.java @@ -458,6 +458,18 @@ public void pushOk(PushConnection pushConnection) { debug("pushOk()"); if (isReconnecting()) { resolveTemporaryError(Type.PUSH); + if (registry.getRequestResponseTracker().hasActiveRequest()) { + debug("pushOk() Reset active request state when reconnecting PUSH because of a network error."); + endRequest(); + // for bidirectional transport, the pending message is not sent + // as reconnection payload, so immediately push the pending + // changes on reconnect + if (pushConnection.isBidirectional()) { + Console.debug( + "Flush pending messages after PUSH reconnection."); + registry.getMessageSender().sendInvocationsToServer(); + } + } } } diff --git a/flow-client/src/main/java/com/vaadin/client/communication/MessageSender.java b/flow-client/src/main/java/com/vaadin/client/communication/MessageSender.java index 2cf6ec6c2b7..5f1c311d6ba 100644 --- a/flow-client/src/main/java/com/vaadin/client/communication/MessageSender.java +++ b/flow-client/src/main/java/com/vaadin/client/communication/MessageSender.java @@ -64,6 +64,8 @@ public enum ResynchronizationState { private ResynchronizationState resynchronizationState = ResynchronizationState.NOT_ACTIVE; + private JsonObject pushPendingMessage; + /** * Creates a new instance connected to the given registry. * @@ -104,6 +106,17 @@ public void sendInvocationsToServer() { * */ private void doSendInvocationsToServer() { + // If there's a stored message, resend it and postpone processing the + // rest of the queued messages to prevent resynchronization issues. + if (pushPendingMessage != null) { + Console.log("Sending pending push message " + + pushPendingMessage.toJson()); + JsonObject payload = pushPendingMessage; + pushPendingMessage = null; + registry.getRequestResponseTracker().startRequest(); + send(payload); + return; + } ServerRpcQueue serverRpcQueue = registry.getServerRpcQueue(); if (serverRpcQueue.isEmpty() @@ -181,6 +194,13 @@ private JsonObject preparePayload(final JsonArray reqInvocations, */ public void send(final JsonObject payload) { if (push != null && push.isBidirectional()) { + // When using bidirectional transport, the payload is not resent + // to the server during reconnection attempts. + // Keep a copy of the message, so that it could be resent to the + // server after a reconnection. + // Reference will be cleaned up once the server confirms it has + // seen this message + pushPendingMessage = payload; push.push(payload); } else { registry.getXhrConnection().send(payload); @@ -260,7 +280,14 @@ public void resynchronize() { */ public void setClientToServerMessageId(int nextExpectedId, boolean force) { if (nextExpectedId == clientToServerMessageId) { - // No op as everything matches they way it should + // Everything matches they way it should + // Remove potential pending PUSH message if it has already been seen + // by the server. + if (pushPendingMessage != null + && (int) pushPendingMessage.getNumber( + ApplicationConstants.CLIENT_TO_SERVER_ID) < nextExpectedId) { + pushPendingMessage = null; + } return; } if (force) { From a804f8575d07f22340073a54bb8cbd36ec4019cd Mon Sep 17 00:00:00 2001 From: Marco Collovati Date: Tue, 22 Oct 2024 09:05:06 +0200 Subject: [PATCH 2/2] upgrade to atmosphere javascript 4.0.1 with reconnection fixes --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 928abf6b7d7..38b6f3981e0 100644 --- a/pom.xml +++ b/pom.xml @@ -120,7 +120,7 @@ 1.18.1 3.0.5.slf4jvaadin1 - 4.0.0 + 4.0.1