Skip to content

Commit

Permalink
Make the Java GrpcStreamingTransport timeout configurable.
Browse files Browse the repository at this point in the history
Change-Id: I30c335f6ff9a51fcd525bf7f616eee8f54c9218b
  • Loading branch information
jblebrun committed Aug 9, 2024
1 parent 0d0cc7a commit 23b12cb
Showing 1 changed file with 44 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
public class GrpcStreamingTransport implements EvidenceProvider, Transport {
private static final Logger logger = Logger.getLogger(GrpcStreamingTransport.class.getName());

private static final Duration TIMEOUT = Duration.ofSeconds(10);
public static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(10);

private final Duration timeout;

/**
* QueueingStreamObserver with a queue containing responses received from the
Expand All @@ -48,13 +50,31 @@ public class GrpcStreamingTransport implements EvidenceProvider, Transport {
/**
* Creates an instance of {@code GrpcStreamingTransport}.
*
* The response timeout will be the value of {@code DEFAULT_TIMEOUT}.
*
* @param stream a method reference to a gRPC client streaming method with the
* appropriate request
* and response types.
*/
public GrpcStreamingTransport(
Function<StreamObserver<ResponseWrapper>, StreamObserver<RequestWrapper>> stream) {
this(stream, DEFAULT_TIMEOUT);
}

/**
* Creates an instance of {@code GrpcStreamingTransport}.
*
* @param stream a method reference to a gRPC client streaming method with the
* appropriate request
* and response types.
* @param timeout The length of time to wait for a {@code InvokeResponse}
* after sending an @{code InvokeRequest}.
*/
public GrpcStreamingTransport(
Function<StreamObserver<ResponseWrapper>, StreamObserver<RequestWrapper>> stream,
Duration timeout) {
this.requestObserver = stream.apply(responseObserver);
this.timeout = timeout;
}

/**
Expand All @@ -69,24 +89,11 @@ public Result<EndorsedEvidence, String> getEndorsedEvidence() {
.setGetEndorsedEvidenceRequest(GetEndorsedEvidenceRequest.newBuilder())
.build();
logger.log(Level.INFO, "sending get endorsed evidence request: " + requestWrapper);
this.requestObserver.onNext(requestWrapper);

ResponseWrapper responseWrapper;
try {
// TODO(#3644): Add retry for client messages.
responseWrapper = this.responseObserver.poll(TIMEOUT);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return Result.error("Thread interrupted while waiting for a response");
}
if (responseWrapper == null) {
return Result.error("No response message received");
}

logger.log(Level.INFO, "received get endorsed evidence response: " + responseWrapper);
GetEndorsedEvidenceResponse response = responseWrapper.getGetEndorsedEvidenceResponse();

return Result.success(response.getEndorsedEvidence());
return receiveResponse(requestWrapper).map(responseWrapper -> {
logger.log(Level.INFO, "received get endorsed evidence response: " + responseWrapper);
return responseWrapper.getGetEndorsedEvidenceResponse().getEndorsedEvidence();
});
}

/**
Expand All @@ -102,28 +109,31 @@ public Result<EncryptedResponse, String> invoke(EncryptedRequest encryptedReques
.setInvokeRequest(InvokeRequest.newBuilder().setEncryptedRequest(encryptedRequest))
.build();
logger.log(Level.INFO, "sending invoke request: " + requestWrapper);
this.requestObserver.onNext(requestWrapper);

ResponseWrapper responseWrapper;
return receiveResponse(requestWrapper).map(responseWrapper -> {
logger.log(Level.INFO, "received invoke response: " + responseWrapper);
return responseWrapper.getInvokeResponse().getEncryptedResponse();
});
}

@Override
public void close() throws Exception {
this.requestObserver.onCompleted();
}

private Result<ResponseWrapper, String> receiveResponse(RequestWrapper requestWrapper) {
this.requestObserver.onNext(requestWrapper);
try {
// TODO(#3644): Add retry for client messages.
responseWrapper = this.responseObserver.poll(TIMEOUT);
ResponseWrapper responseWrapper = this.responseObserver.poll(this.timeout);
if (responseWrapper == null) {
return Result.error("No response message received within the specified timeout.");
} else {
return Result.success(responseWrapper);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return Result.error("Thread interrupted while waiting for a response");
}
if (responseWrapper == null) {
return Result.error("No response message received");
}

logger.log(Level.INFO, "received invoke response: " + responseWrapper);
EncryptedResponse encryptedResponse =
responseWrapper.getInvokeResponse().getEncryptedResponse();
return Result.success(encryptedResponse);
}

@Override
public void close() throws Exception {
this.requestObserver.onCompleted();
}
}

0 comments on commit 23b12cb

Please sign in to comment.