Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New unified stream manager #509

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 165 additions & 0 deletions src/main/java/software/amazon/awssdk/crt/http/Http1StreamManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
package software.amazon.awssdk.crt.http;

import software.amazon.awssdk.crt.CrtRuntimeException;

import java.util.concurrent.CompletableFuture;

/**
* Manages a Pool of HTTP/1.1 Streams. Creates and manages HTTP/1.1 connections
* under the hood. Will grab a connection from HttpClientConnectionManager to
* make request on it, and will return it back until the request finishes.
*/
public class Http1StreamManager implements AutoCloseable {

private HttpClientConnectionManager connectionManager = null;

/**
* Factory function for Http1StreamManager instances
*
* @param options the connection manager options configure to connection manager under the hood
* @return a new instance of an Http1StreamManager
*/
public static Http1StreamManager create(HttpClientConnectionManagerOptions options) {
return new Http1StreamManager(options);
}

private Http1StreamManager(HttpClientConnectionManagerOptions options) {
this.connectionManager = HttpClientConnectionManager.create(options);
}

public CompletableFuture<Void> getShutdownCompleteFuture() {
return this.connectionManager.getShutdownCompleteFuture();
}

/**
* Request an HTTP/1.1 HttpStream from StreamManager.
*
* @param request HttpRequest. The Request to make to the Server.
* @param streamHandler HttpStreamResponseHandler. The Stream Handler to be called from the Native EventLoop
* @return A future for a HttpStream that will be completed when the stream is
* acquired.
* @throws CrtRuntimeException Exception happens from acquiring stream.
*/
public CompletableFuture<HttpStream> acquireStream(HttpRequest request,
HttpStreamResponseHandler streamHandler) {
CompletableFuture<HttpStream> completionFuture = new CompletableFuture<>();
HttpClientConnectionManager connManager = this.connectionManager;
this.connectionManager.acquireConnection().whenComplete((conn, throwable) -> {
if (throwable != null) {
completionFuture.completeExceptionally(throwable);
} else {
try {
HttpStream stream = conn.makeRequest(request, new HttpStreamResponseHandler() {
@Override
public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType,
HttpHeader[] nextHeaders) {
streamHandler.onResponseHeaders(stream, responseStatusCode, blockType, nextHeaders);
}

@Override
public void onResponseHeadersDone(HttpStream stream, int blockType) {
streamHandler.onResponseHeadersDone(stream, blockType);
}

@Override
public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
return streamHandler.onResponseBody(stream, bodyBytesIn);
}

@Override
public void onResponseComplete(HttpStream stream, int errorCode) {
streamHandler.onResponseComplete(stream, errorCode);
/* Release the connection back */
connManager.releaseConnection(conn);
}
});
completionFuture.complete(stream);
/* Active the stream for user */
try {
stream.activate();
} catch (CrtRuntimeException e) {
/* If activate failed, complete callback will not be invoked */
streamHandler.onResponseComplete(stream, e.errorCode);
/* Release the connection back */
connManager.releaseConnection(conn);
}
} catch (Exception ex) {
connManager.releaseConnection(conn);
completionFuture.completeExceptionally(ex);
}
}
});
return completionFuture;
}


/**
* Request an HTTP/1.1 HttpStream from StreamManager.
*
* @param request HttpRequestBase. The Request to make to the Server.
* @param streamHandler HttpStreamBaseResponseHandler. The Stream Handler to be called from the Native EventLoop
* @return A future for a HttpStreamBase that will be completed when the stream is
* acquired.
* @throws CrtRuntimeException Exception happens from acquiring stream.
*/
public CompletableFuture<HttpStreamBase> acquireStream(HttpRequestBase request,
HttpStreamBaseResponseHandler streamHandler) {
CompletableFuture<HttpStreamBase> completionFuture = new CompletableFuture<>();
HttpClientConnectionManager connManager = this.connectionManager;
this.connectionManager.acquireConnection().whenComplete((conn, throwable) -> {
if (throwable != null) {
completionFuture.completeExceptionally(throwable);
} else {
try {
HttpStreamBase stream = conn.makeRequest(request, new HttpStreamBaseResponseHandler() {
@Override
public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
HttpHeader[] nextHeaders) {
streamHandler.onResponseHeaders(stream, responseStatusCode, blockType, nextHeaders);
}

@Override
public void onResponseHeadersDone(HttpStreamBase stream, int blockType) {
streamHandler.onResponseHeadersDone(stream, blockType);
}

@Override
public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
return streamHandler.onResponseBody(stream, bodyBytesIn);
}

@Override
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
streamHandler.onResponseComplete(stream, errorCode);
/* Release the connection back */
connManager.releaseConnection(conn);
}
});
completionFuture.complete(stream);
/* Active the stream for user */
try {
stream.activate();
} catch (CrtRuntimeException e) {
/* If activate failed, complete callback will not be invoked */
streamHandler.onResponseComplete(stream, e.errorCode);
/* Release the connection back */
connManager.releaseConnection(conn);
}
} catch (Exception ex) {
connManager.releaseConnection(conn);
completionFuture.completeExceptionally(ex);
}
}
});
return completionFuture;
}

@Override
public void close() {
this.connectionManager.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,8 @@ public CompletableFuture<Http2Stream> acquireStream(HttpRequest request,
return this.acquireStream((HttpRequestBase) request, streamHandler);
}

private CompletableFuture<Http2Stream> acquireStream(HttpRequestBase request,
HttpStreamBaseResponseHandler streamHandler) {

public CompletableFuture<Http2Stream> acquireStream(HttpRequestBase request,
HttpStreamBaseResponseHandler streamHandler) {
CompletableFuture<Http2Stream> completionFuture = new CompletableFuture<>();
AsyncCallback acquireStreamCompleted = AsyncCallback.wrapFuture(completionFuture, null);
if (isNull()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.List;
import java.util.ArrayList;
import java.net.URI;

/**
* Contains all the configuration options for a Http2StreamManager
Expand All @@ -12,6 +13,7 @@ public class Http2StreamManagerOptions {
public static final int DEFAULT_MAX = Integer.MAX_VALUE;
public static final int DEFAULT_MAX_CONNECTIONS = 2;
public static final int DEFAULT_CONNECTION_PING_TIMEOUT_MS = 3000;
private static final String HTTPS = "https";

private HttpClientConnectionManagerOptions connectionManagerOptions;

Expand Down Expand Up @@ -249,8 +251,9 @@ public void validateOptions() {
throw new IllegalArgumentException("Connection manager options are required.");
}
connectionManagerOptions.validateOptions();
if ((connectionManagerOptions.getTlsConnectionOptions() != null
|| connectionManagerOptions.getTlsContext() != null) && priorKnowledge) {
URI uri = connectionManagerOptions.getUri();
boolean useTls = HTTPS.equals(uri.getScheme());
if (useTls && priorKnowledge) {
throw new IllegalArgumentException("HTTP/2 prior knowledge cannot be set when TLS is used.");
}
if ((connectionManagerOptions.getTlsConnectionOptions() == null
Expand Down
144 changes: 144 additions & 0 deletions src/main/java/software/amazon/awssdk/crt/http/HttpStreamManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

package software.amazon.awssdk.crt.http;

import software.amazon.awssdk.crt.CrtRuntimeException;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

/**
* Manages a pool for either HTTP/1.1 or HTTP/2 connection.
*
* Contains two stream manager for two protocols under the hood.
*/
public class HttpStreamManager implements AutoCloseable {

private Http1StreamManager h1StreamManager = null;
private Http2StreamManager h2StreamManager = null;
private CompletableFuture<Void> shutdownComplete = null;
private AtomicLong shutdownNum = new AtomicLong(0);
private Throwable shutdownCompleteException = null;

/**
* Factory function for HttpStreamManager instances
*
* @param options configuration options
* @return a new instance of an HttpStreamManager
*/
public static HttpStreamManager create(HttpStreamManagerOptions options) {
return new HttpStreamManager(options);
}

private HttpStreamManager(HttpStreamManagerOptions options) {
this.shutdownComplete = new CompletableFuture<Void>();
if (options.getExpectedProtocol() == HttpVersion.UNKNOWN) {
this.h1StreamManager = Http1StreamManager.create(options.getHTTP1ConnectionManagerOptions());
this.h2StreamManager = Http2StreamManager.create(options.getHTTP2StreamManagerOptions());
} else {
if (options.getExpectedProtocol() == HttpVersion.HTTP_2) {
this.h2StreamManager = Http2StreamManager.create(options.getHTTP2StreamManagerOptions());
} else {
this.h1StreamManager = Http1StreamManager.create(options.getHTTP1ConnectionManagerOptions());
}
/* Only one manager created. */
this.shutdownNum.addAndGet(1);
}
if (this.h1StreamManager != null) {
this.h1StreamManager.getShutdownCompleteFuture().whenComplete((v, throwable) -> {
if (throwable != null) {
this.shutdownCompleteException = throwable;
}
long shutdownNum = this.shutdownNum.addAndGet(1);
if (shutdownNum == 2) {
/* both connectionManager and the h2StreamManager has been shutdown. */
if (this.shutdownCompleteException != null) {
this.shutdownComplete.completeExceptionally(this.shutdownCompleteException);
} else {
this.shutdownComplete.complete(null);
}
}
});
}
if (this.h2StreamManager != null) {
this.h2StreamManager.getShutdownCompleteFuture().whenComplete((v, throwable) -> {
if (throwable != null) {
this.shutdownCompleteException = throwable;
}
long shutdownNum = this.shutdownNum.addAndGet(1);
if (shutdownNum == 2) {
/* both connectionManager and the h2StreamManager has been shutdown. */
if (this.shutdownCompleteException != null) {
this.shutdownComplete.completeExceptionally(this.shutdownCompleteException);
} else {
this.shutdownComplete.complete(null);
}
}
});
}
}

private void h1AcquireStream(HttpRequestBase request,
HttpStreamBaseResponseHandler streamHandler, CompletableFuture<HttpStreamBase> completionFuture) {

this.h1StreamManager.acquireStream(request, streamHandler).whenComplete((stream, throwable) -> {
if (throwable != null) {
completionFuture.completeExceptionally(throwable);
} else {
completionFuture.complete(stream);
}
});
}

/**
* Request a HttpStream from StreamManager. If the streamManager is made with
* HTTP/2 connection under the hood, it will be Http2Stream.
*
* @param request HttpRequestBase. The Request to make to the Server.
* @param streamHandler HttpStreamBaseResponseHandler. The Stream Handler to be called from the Native EventLoop
* @return A future for a Http2Stream that will be completed when the stream is
* acquired.
*/
public CompletableFuture<HttpStreamBase> acquireStream(HttpRequestBase request,
HttpStreamBaseResponseHandler streamHandler) {
CompletableFuture<HttpStreamBase> completionFuture = new CompletableFuture<>();
if (this.h2StreamManager != null) {
this.h2StreamManager.acquireStream(request, streamHandler).whenComplete((stream, throwable) -> {
if (throwable != null) {
if (throwable instanceof CrtRuntimeException) {
CrtRuntimeException exception = (CrtRuntimeException) throwable;
if (exception.errorName.equals("AWS_ERROR_HTTP_STREAM_MANAGER_UNEXPECTED_HTTP_VERSION") && this.h1StreamManager != null) {
this.h1AcquireStream(request, streamHandler, completionFuture);
} else {
completionFuture.completeExceptionally(throwable);
}
} else {
completionFuture.completeExceptionally(throwable);
}
} else {
completionFuture.complete((Http2Stream) stream);
}
});
return completionFuture;
}
this.h1AcquireStream(request, streamHandler, completionFuture);
return completionFuture;
}

public CompletableFuture<Void> getShutdownCompleteFuture() {
return shutdownComplete;
}

@Override
public void close() {
if (this.h1StreamManager != null) {
this.h1StreamManager.close();
}
if (this.h2StreamManager != null) {
this.h2StreamManager.close();
}
}
}
Loading