Skip to content

Commit

Permalink
Preallocate Request Buffers in Buffer Pool (#89)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexw91 authored and Justin Boswell committed Sep 13, 2019
1 parent cf08d44 commit fde0b0f
Show file tree
Hide file tree
Showing 13 changed files with 437 additions and 103 deletions.
43 changes: 27 additions & 16 deletions src/main/java/software/amazon/awssdk/crt/http/HttpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

package software.amazon.awssdk.crt.http;

import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.CrtRuntimeException;
import software.amazon.awssdk.crt.io.CrtByteBuffer;


/**
Expand All @@ -39,33 +41,42 @@ protected HttpConnection(HttpConnectionPoolManager manager, long connection) {
* Schedules an HttpRequest on the Native EventLoop for this HttpConnection.
*
* @param request The Request to make to the Server.
* @param reqOptions The Http Request Options
* @param streamHandler The Stream Handler to be called from the Native EventLoop
* @throws CrtRuntimeException
* @return The HttpStream that represents this Request/Response Pair. It can be closed at any time during the
* request/response, but must be closed by the user thread making this request when it's done.
*/
public HttpStream makeRequest(HttpRequest request, HttpRequestOptions reqOptions, CrtHttpStreamHandler streamHandler) throws CrtRuntimeException {
public CompletableFuture<HttpStream> makeRequest(HttpRequest request, CrtHttpStreamHandler streamHandler) throws CrtRuntimeException {
if (isNull()) {
throw new IllegalStateException("HttpConnection has been closed, can't make requests on it.");
}

if (reqOptions.getBodyBufferSize() > manager.getWindowSize()) {
throw new IllegalArgumentException("Response Body Buffer can't be > than Window Size");
}
CompletableFuture<CrtByteBuffer> bufferFuture = manager.acquireBuffer();

HttpStream stream = httpConnectionMakeRequest(native_ptr(),
reqOptions.getBodyBufferSize(),
request.getMethod(),
request.getEncodedPath(),
request.getHeaders(),
streamHandler);
CompletableFuture<HttpStream> streamFuture = new CompletableFuture<>();

if (stream == null || stream.isNull()) {
throw new IllegalStateException("HttpStream is null");
}
bufferFuture.whenComplete((crtBuffer, throwable) -> {
if (throwable != null) {
streamFuture.completeExceptionally(throwable);
return;
}
try {
HttpStream stream = httpConnectionMakeRequest(native_ptr(),
crtBuffer,
request.getMethod(),
request.getEncodedPath(),
request.getHeaders(),
streamHandler);
if (stream == null || stream.isNull()) {
streamFuture.completeExceptionally(new RuntimeException("HttpStream creation failed"));
}
streamFuture.complete(stream);
} catch (Exception e) {
streamFuture.completeExceptionally(e);
}
});

return stream;
return streamFuture;
}

/**
Expand All @@ -84,7 +95,7 @@ public void close() {
* Native methods
******************************************************************************/
private static native HttpStream httpConnectionMakeRequest(long connection,
int respBodyBufSize,
CrtByteBuffer crtBuffer,
String method,
String uri,
HttpHeader[] headers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,21 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.CrtRuntimeException;
import software.amazon.awssdk.crt.io.ClientBootstrap;
import software.amazon.awssdk.crt.io.CrtBufferPool;
import software.amazon.awssdk.crt.io.CrtByteBuffer;
import software.amazon.awssdk.crt.io.SocketOptions;
import software.amazon.awssdk.crt.io.TlsContext;

/**
* Manages a Pool of Http Connections
*/
public class HttpConnectionPoolManager extends CrtResource {
public static final int DEFAULT_MAX_BUFFER_SIZE = 16 * 1024;
public static final int DEFAULT_MAX_WINDOW_SIZE = Integer.MAX_VALUE;
public static final int DEFAULT_MAX_CONNECTIONS = 2;
private static final String HTTP = "http";
Expand All @@ -49,18 +51,19 @@ public class HttpConnectionPoolManager extends CrtResource {
private final int maxConnections;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final CompletableFuture<Void> shutdownComplete = new CompletableFuture<>();
private final CrtBufferPool bufferPool;

/**
* The queue of Connection Acquisition requests.
*/
private final Queue<CompletableFuture<HttpConnection>> connectionAcquisitionRequests = new ConcurrentLinkedQueue<>();

public HttpConnectionPoolManager(ClientBootstrap clientBootstrap, SocketOptions socketOptions, TlsContext tlsContext, URI uri) {
this(clientBootstrap, socketOptions, tlsContext, uri, DEFAULT_MAX_WINDOW_SIZE, DEFAULT_MAX_CONNECTIONS);
this(clientBootstrap, socketOptions, tlsContext, uri, DEFAULT_MAX_BUFFER_SIZE, DEFAULT_MAX_WINDOW_SIZE, DEFAULT_MAX_CONNECTIONS);
}

public HttpConnectionPoolManager(ClientBootstrap clientBootstrap, SocketOptions socketOptions, TlsContext tlsContext,
URI uri, int windowSize, int maxConnections) {
URI uri, int bufferSize, int windowSize, int maxConnections) {

if (uri == null) { throw new IllegalArgumentException("URI must not be null"); }
if (uri.getScheme() == null) { throw new IllegalArgumentException("URI does not have a Scheme"); }
Expand All @@ -69,6 +72,7 @@ public HttpConnectionPoolManager(ClientBootstrap clientBootstrap, SocketOptions
if (clientBootstrap == null || clientBootstrap.isNull()) { throw new IllegalArgumentException("ClientBootstrap must not be null"); }
if (socketOptions == null || socketOptions.isNull()) { throw new IllegalArgumentException("SocketOptions must not be null"); }
if (HTTPS.equals(uri.getScheme()) && tlsContext == null) { throw new IllegalArgumentException("TlsContext must not be null if https is used"); }
if (bufferSize <= 0) { throw new IllegalArgumentException("Buffer Size must be greater than zero."); }
if (windowSize <= 0) { throw new IllegalArgumentException("Window Size must be greater than zero."); }
if (maxConnections <= 0) { throw new IllegalArgumentException("Max Connections must be greater than zero."); }

Expand All @@ -88,6 +92,8 @@ public HttpConnectionPoolManager(ClientBootstrap clientBootstrap, SocketOptions
this.port = port;
this.useTls = HTTPS.equals(uri.getScheme());
this.maxConnections = maxConnections;
// TODO: We will need to create more buffers once we allow >1 HttpStream per HttpConnection (such as for Http/2)
this.bufferPool = own(new CrtBufferPool(maxConnections, bufferSize));

acquire(httpConnectionManagerNew(this,
clientBootstrap.native_ptr(),
Expand Down Expand Up @@ -116,6 +122,10 @@ private void onConnectionAcquired(long connection, int errorCode) {
connectionRequest.complete(conn);
}

protected CompletableFuture<CrtByteBuffer> acquireBuffer() {
return this.bufferPool.acquireBuffer();
}

/**
* Request a HttpConnection from the Connection Pool.
* @return A Future for a HttpConnection that will be completed when a connection is acquired.
Expand Down Expand Up @@ -167,6 +177,7 @@ private void onShutdownComplete() {
/**
* Closes this Connection Pool and any pending Connection Acquisitions
*/
@Override
public void close() {
isClosed.set(true);
closePendingAcquisitions(new RuntimeException("Connection Manager Closing. Closing Pending Connection Acquisitions."));
Expand All @@ -189,6 +200,8 @@ public void close() {
} catch (Exception e) {
throw new RuntimeException(e);
}

super.close();
}

/*******************************************************************************
Expand Down

This file was deleted.

22 changes: 17 additions & 5 deletions src/main/java/software/amazon/awssdk/crt/http/HttpStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package software.amazon.awssdk.crt.http;

import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.io.CrtByteBuffer;

/**
* An HttpStream represents a single Http Request/Response pair within a HttpConnection, and wraps the native resources
Expand All @@ -24,15 +25,21 @@
* Can be used to update the Window size, or to abort the stream early in the middle of sending/receiving Http Bodies.
*/
public class HttpStream extends CrtResource {
private CrtByteBuffer streamBuffer;

/* Native code can call this constructor */
protected HttpStream(long ptr) {
/* Native code will call this constructor during HttpConnection.makeRequest() */
protected HttpStream(CrtByteBuffer streamBuffer, long ptr) {
this.streamBuffer = streamBuffer;
acquire(ptr);
}

@Override
public void close() {
httpStreamRelease(release());
if (!isNull()) {
streamBuffer.releaseBackToPool();
streamBuffer = null;
httpStreamRelease(release());
}
}

/**
Expand All @@ -48,15 +55,20 @@ public void incrementWindow(int windowSize) {
if (windowSize < 0) {
throw new IllegalArgumentException("windowSize must be >= 0. Actual value: " + windowSize);
}
httpStreamIncrementWindow(native_ptr(), windowSize);
if (!isNull()) {
httpStreamIncrementWindow(native_ptr(), windowSize);
}
}

/**
* Retrieves the Http Response Status Code
* @return The Http Response Status Code
*/
public int getResponseStatusCode() {
return httpStreamGetResponseStatusCode(native_ptr());
if (!isNull()) {
return httpStreamGetResponseStatusCode(native_ptr());
}
throw new IllegalStateException("Can't get Status Code on Closed Stream");
}

private static native void httpStreamRelease(long http_stream);
Expand Down
98 changes: 98 additions & 0 deletions src/main/java/software/amazon/awssdk/crt/io/CrtBufferPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package software.amazon.awssdk.crt.io;

import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import software.amazon.awssdk.crt.CrtResource;


/**
* A Pool of CrtByteBuffers
*/
public class CrtBufferPool extends CrtResource {
private final int numBuffers;
private final int bufferSize;

private final Queue<CrtByteBuffer> idleBuffers = new ConcurrentLinkedQueue<>();
private final Queue<CompletableFuture<CrtByteBuffer>> bufferAcquisitionRequests = new ConcurrentLinkedQueue<>();
private final AtomicBoolean isClosed = new AtomicBoolean(false);

public CrtBufferPool(int numBuffers, int bufferSize) {
this.numBuffers = numBuffers;
this.bufferSize = bufferSize;

for (int i = 0; i < numBuffers; i++) {
idleBuffers.add(own(CrtByteBuffer.alloc(bufferSize).withPool(this)));
}
}

private synchronized void completeFutureIfPossible() {
// Peek at the front of each Queue
CompletableFuture<CrtByteBuffer> bufferRequest = bufferAcquisitionRequests.peek();
CrtByteBuffer idleBuffer = idleBuffers.peek();

// If either is null, do nothing.
if (bufferRequest == null || idleBuffer == null) {
return;
}

// If both are present, remove each from the front of the queue
bufferAcquisitionRequests.remove();
idleBuffers.remove();

// And complete the request.
bufferRequest.complete(idleBuffer);
}

/**
* Acquires a CrtByteBuffer from this Buffer Pool. When the object using this buffer is done with it,
* the buffer should be released back into this Pool.
* @return A Future for a CrtByteBuffer
*/
public CompletableFuture<CrtByteBuffer> acquireBuffer() {
if (isClosed.get()) {
throw new IllegalStateException("CrtBufferPool has been closed, can't acquire new Buffers");
}

CompletableFuture<CrtByteBuffer> bufferRequest = new CompletableFuture<>();

// Add bufferRequest to end of the queue
bufferAcquisitionRequests.add(bufferRequest);


completeFutureIfPossible();
return bufferRequest;
}

public void releaseBuffer(CrtByteBuffer idleBuffer) {
idleBuffer.wipe();
idleBuffers.add(idleBuffer);

// Add idleBuffer to end of the queue
completeFutureIfPossible();
}

private void closePendingAcquisitions(Throwable throwable) {
while (bufferAcquisitionRequests.size() > 0) {
// Remove and complete future from connectionAcquisitionRequests Queue
CompletableFuture<CrtByteBuffer> future = bufferAcquisitionRequests.poll();
if (future != null) {
future.completeExceptionally(throwable);
}
}
}

@Override
public void close() {
isClosed.set(true);

closePendingAcquisitions(new RuntimeException("CrtBufferPool is Closing. Closing Pending Buffer Acquisitions."));

if (idleBuffers.size() != numBuffers) {
throw new IllegalStateException("Can't close CrtBufferPool yet since some buffers are still in use");
}

super.close();
}
}
Loading

0 comments on commit fde0b0f

Please sign in to comment.