Skip to content

Commit

Permalink
refactor: client
Browse files Browse the repository at this point in the history
  • Loading branch information
ngyewch committed Jul 24, 2024
1 parent 732cc4d commit 067f0c8
Showing 1 changed file with 48 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ protected AbstractService(String baseUri, MediaType contentType) {
}

protected void doRequest(String path, Message input, Message.Builder outputBuilder) {
final Single<Message.Builder> requester;
if (contentType.equals(MediaTypes.PROTOBUF_MEDIA_TYPE)) {
requester = doProtobufRequest(path, input, outputBuilder);
} else if (contentType.equals(MediaTypes.JSON_MEDIA_TYPE)) {
requester = doJsonRequest(path, input, outputBuilder);
} else {
throw new IllegalArgumentException("unsupported content type");
}
try {
if (contentType.equals(MediaTypes.PROTOBUF_MEDIA_TYPE)) {
doProtobufRequest(path, input, outputBuilder).get();
} else if (contentType.equals(MediaTypes.JSON_MEDIA_TYPE)) {
doJsonRequest(path, input, outputBuilder).get();
} else {
throw new IllegalArgumentException("unsupported content type");
}
requester.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
Expand All @@ -52,6 +54,45 @@ protected void doRequest(String path, Message input, Message.Builder outputBuild
}
}

private Single<Message.Builder> doProtobufRequest(
String path, Message input, Message.Builder outputBuilder) {
return webClient
.post()
.path(path)
.contentType(MediaTypes.PROTOBUF_MEDIA_TYPE)
.submit(input.toByteArray())
.flatMap(this::handleNonSuccessfulResponse)
.first()
.flatMap(
webClientResponse -> expectMediaType(webClientResponse, MediaTypes.PROTOBUF_MEDIA_TYPE))
.first()
.flatMap(webClientResponse -> webClientResponse.content().as(byte[].class))
.first()
.map(bytes -> mergeProtobuf(outputBuilder, bytes));
}

private Single<Message.Builder> doJsonRequest(
String path, Message input, Message.Builder outputBuilder) {
try {
final String requestJson = JsonFormat.printer().print(input);
return webClient
.post()
.path(path)
.contentType(MediaTypes.JSON_MEDIA_TYPE)
.submit(requestJson)
.flatMap(this::handleNonSuccessfulResponse)
.first()
.flatMap(
webClientResponse -> expectMediaType(webClientResponse, MediaTypes.JSON_MEDIA_TYPE))
.first()
.flatMap(webClientResponse -> webClientResponse.content().as(String.class))
.first()
.map(responseJson -> mergeProtobufJson(outputBuilder, responseJson));
} catch (InvalidProtocolBufferException e) {
throw new TwirpException(TwirpErrorCode.INTERNAL, e);
}
}

private Single<WebClientResponse> handleNonSuccessfulResponse(
WebClientResponse webClientResponse) {
if (webClientResponse.status().code() < Http.Status.MOVED_PERMANENTLY_301.code()) {
Expand Down Expand Up @@ -109,43 +150,4 @@ private Message.Builder mergeProtobuf(Message.Builder messageBuilder, byte[] dat
throw new TwirpException(TwirpErrorCode.MALFORMED, e, true);
}
}

private Single<Message.Builder> doProtobufRequest(
String path, Message input, Message.Builder outputBuilder) {
return webClient
.post()
.path(path)
.contentType(MediaTypes.PROTOBUF_MEDIA_TYPE)
.submit(input.toByteArray())
.flatMap(this::handleNonSuccessfulResponse)
.first()
.flatMap(
webClientResponse -> expectMediaType(webClientResponse, MediaTypes.PROTOBUF_MEDIA_TYPE))
.first()
.flatMap(webClientResponse -> webClientResponse.content().as(byte[].class))
.first()
.map(bytes -> mergeProtobuf(outputBuilder, bytes));
}

private Single<Message.Builder> doJsonRequest(
String path, Message input, Message.Builder outputBuilder) {
try {
final String requestJson = JsonFormat.printer().print(input);
return webClient
.post()
.path(path)
.contentType(MediaTypes.JSON_MEDIA_TYPE)
.submit(requestJson)
.flatMap(this::handleNonSuccessfulResponse)
.first()
.flatMap(
webClientResponse -> expectMediaType(webClientResponse, MediaTypes.JSON_MEDIA_TYPE))
.first()
.flatMap(webClientResponse -> webClientResponse.content().as(String.class))
.first()
.map(responseJson -> mergeProtobufJson(outputBuilder, responseJson));
} catch (InvalidProtocolBufferException e) {
throw new TwirpException(TwirpErrorCode.INTERNAL, e);
}
}
}

0 comments on commit 067f0c8

Please sign in to comment.