Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly release underlying buffer before passing it to WebSocket handler #2715

Merged
merged 4 commits into from
Feb 2, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2021 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,7 +19,6 @@
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.logging.Logger;

import javax.websocket.CloseReason;

Expand All @@ -34,7 +33,6 @@
* Class TyrusReaderSubscriber.
*/
public class TyrusReaderSubscriber implements Flow.Subscriber<DataChunk> {
private static final Logger LOGGER = Logger.getLogger(TyrusSupport.class.getName());

private static final int MAX_RETRIES = 5;
private static final CloseReason CONNECTION_CLOSED = new CloseReason(NORMAL_CLOSURE, "Connection closed");
Expand Down Expand Up @@ -63,34 +61,55 @@ public void onSubscribe(Flow.Subscription subscription) {

@Override
public void onNext(DataChunk item) {
if (executorService == null) {
if (subscription != null) {
if (executorService == null) {
submitDataChunk(item);
} else {
executorService.submit(() -> submitDataChunk(item));
}
} else {
item.release();
}
}

/**
* Submits all data in a chunk and requests one more if successful.
*
* @param item a data chunk
*/
private void submitDataChunk(DataChunk item) {
try {
for (ByteBuffer byteBuffer : item.data()) {
submitBuffer(byteBuffer);
}
} else {
executorService.submit(() -> {
for (ByteBuffer byteBuffer : item.data()) {
submitBuffer(byteBuffer);
}
});
} finally {
item.release();
}
if (subscription != null) {
subscription.request(1L);
}
}

/**
* Submits data buffer to Tyrus. Retries a few times to make sure the entire buffer
* is consumed or logs an error.
* Submits single buffer to Tyrus. Retries a few times to make sure the entire buffer
* is consumed.
*
* @param data Data buffer.
*/
private void submitBuffer(ByteBuffer data) {
// Pass all data to Tyrus spi
int retries = MAX_RETRIES;
while (data.remaining() > 0 && retries-- > 0) {
connection.getReadHandler().handle(data);
}

// If we can't push all data to Tyrus, cancel and report problem
if (retries == 0) {
LOGGER.warning("Tyrus did not consume all data buffer after " + MAX_RETRIES + " retries");
subscription.cancel();
subscription = null;
connection.close(new CloseReason(UNEXPECTED_CONDITION, "Tyrus did not "
+ "consume all data after " + MAX_RETRIES + " retries"));
}
subscription.request(1L);
}

@Override
Expand Down