Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: resume client to server communication after web socket reconnection #20283

Merged
merged 3 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,7 @@ protected final native AtmosphereConfiguration createConfig()
fallbackTransport: 'long-polling',
contentType: 'application/json; charset=UTF-8',
reconnectInterval: 5000,
maxWebsocketErrorRetries: 12,
timeout: -1,
maxReconnectOnClose: 10000000,
trackMessageLength: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,18 @@ public void pushOk(PushConnection pushConnection) {
debug("pushOk()");
if (isReconnecting()) {
resolveTemporaryError(Type.PUSH);
if (registry.getRequestResponseTracker().hasActiveRequest()) {
debug("pushOk() Reset active request state when reconnecting PUSH because of a network error.");
endRequest();
// for bidirectional transport, the pending message is not sent
// as reconnection payload, so immediately push the pending
// changes on reconnect
if (pushConnection.isBidirectional()) {
Console.debug(
"Flush pending messages after PUSH reconnection.");
registry.getMessageSender().sendInvocationsToServer();
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public enum ResynchronizationState {

private ResynchronizationState resynchronizationState = ResynchronizationState.NOT_ACTIVE;

private JsonObject pushPendingMessage;

/**
* Creates a new instance connected to the given registry.
*
Expand Down Expand Up @@ -104,6 +106,17 @@ public void sendInvocationsToServer() {
*
*/
private void doSendInvocationsToServer() {
// If there's a stored message, resend it and postpone processing the
// rest of the queued messages to prevent resynchronization issues.
if (pushPendingMessage != null) {
Console.log("Sending pending push message "
+ pushPendingMessage.toJson());
JsonObject payload = pushPendingMessage;
pushPendingMessage = null;
registry.getRequestResponseTracker().startRequest();
send(payload);
return;
}

ServerRpcQueue serverRpcQueue = registry.getServerRpcQueue();
if (serverRpcQueue.isEmpty()
Expand Down Expand Up @@ -181,6 +194,13 @@ private JsonObject preparePayload(final JsonArray reqInvocations,
*/
public void send(final JsonObject payload) {
if (push != null && push.isBidirectional()) {
// When using bidirectional transport, the payload is not resent
// to the server during reconnection attempts.
// Keep a copy of the message, so that it could be resent to the
// server after a reconnection.
// Reference will be cleaned up once the server confirms it has
// seen this message
pushPendingMessage = payload;
push.push(payload);
} else {
registry.getXhrConnection().send(payload);
Expand Down Expand Up @@ -260,7 +280,14 @@ public void resynchronize() {
*/
public void setClientToServerMessageId(int nextExpectedId, boolean force) {
if (nextExpectedId == clientToServerMessageId) {
// No op as everything matches they way it should
// Everything matches they way it should
// Remove potential pending PUSH message if it has already been seen
// by the server.
if (pushPendingMessage != null
&& (int) pushPendingMessage.getNumber(
ApplicationConstants.CLIENT_TO_SERVER_ID) < nextExpectedId) {
pushPendingMessage = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the message reaches the server and then the channel is disconnected? It looks like a duplicate of the last message will be sent - what happens after that?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the message is the same as the last processed one (both client message id and hash must match) then it is ignored on the server side; the response should then have a next expected message id greater than one of the cached message, and pushPendingMessage should be nullified.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, if client id is too old, then the "Unexpected message id from the client ..." error will be raised.

}
return;
}
if (force) {
Expand Down
Loading