Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: http/http2 support transparent info #15

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* Tencent is pleased to support the open source community by making tRPC available.
*
* Copyright (C) 2023 THL A29 Limited, a Tencent company.
* Copyright (C) 2023 THL A29 Limited, a Tencent company.
* All rights reserved.
*
* If you have downloaded a copy of the tRPC source code from Tencent,
Expand All @@ -15,6 +15,7 @@
import static com.tencent.trpc.core.common.Constants.DEFAULT_CLIENT_REQUEST_TIMEOUT_MS;
import static com.tencent.trpc.proto.http.common.HttpConstants.CONNECTION_REQUEST_TIMEOUT;

import autovalue.shaded.com.google.common.common.base.Objects;
import com.tencent.trpc.core.common.config.BackendConfig;
import com.tencent.trpc.core.common.config.ConsumerConfig;
import com.tencent.trpc.core.common.config.ProtocolConfig;
Expand All @@ -25,6 +26,9 @@
import com.tencent.trpc.core.rpc.Response;
import com.tencent.trpc.core.utils.RpcUtils;
import com.tencent.trpc.proto.http.common.HttpConstants;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
Expand Down Expand Up @@ -89,6 +93,15 @@ private Response handleResponse(Request request, SimpleHttpResponse simpleHttpRe
throw TRpcException
.newBizException(statusCode, simpleHttpResponse.getReasonPhrase());
}
// handle http header
// Parse the data passed through from the server to the client.
// In the tRPC protocol, the value of the attachment is stored and used as a byte array to maintain consistency.
Map<String, Object> respAttachments = new HashMap<>();
for (Header header : simpleHttpResponse.getHeaders()) {
String name = header.getName();
String value = header.getValue();
respAttachments.put(name, value.getBytes(StandardCharsets.UTF_8));
}

Header contentLengthHdr = simpleHttpResponse.getFirstHeader(HttpHeaders.CONTENT_LENGTH);
if (contentLengthHdr != null) {
Expand All @@ -97,7 +110,9 @@ private Response handleResponse(Request request, SimpleHttpResponse simpleHttpRe
// but other HTTP implementations may not return it. Therefore,
// no strict validation is performed here.
if (contentLength == 0) {
return RpcUtils.newResponse(request, null, null);
Response response = RpcUtils.newResponse(request, null, null);
response.setAttachments(respAttachments);
return response;
}
}

Expand All @@ -106,7 +121,9 @@ private Response handleResponse(Request request, SimpleHttpResponse simpleHttpRe
request.getInvocation().getRpcMethodInfo().getActualReturnType(),
simpleHttpResponse.getBodyText());

return RpcUtils.newResponse(request, value, null);
Response response = RpcUtils.newResponse(request, value, null);
response.setAttachments(respAttachments);
return response;
}

/**
Expand Down Expand Up @@ -187,6 +204,17 @@ private SimpleHttpRequest buildRequest(Request request, int requestTimeout) thro
if (jsonString != null) {
simpleHttpRequest.setBody(jsonString, ContentType.APPLICATION_JSON);
}
// Set custom business headers, consistent with the TRPC protocol, only process String and byte[]
request.getAttachments().forEach((k, v) -> {
if (Objects.equal(k, HttpHeaders.TRANSFER_ENCODING) || Objects.equal(k, HttpHeaders.CONTENT_LENGTH)) {
return;
}
if (v instanceof String) {
simpleHttpRequest.setHeader(k, String.valueOf(v));
} else if (v instanceof byte[]) {
simpleHttpRequest.setHeader(k, new String((byte[]) v));
}
});
return simpleHttpRequest;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* Tencent is pleased to support the open source community by making tRPC available.
*
* Copyright (C) 2023 THL A29 Limited, a Tencent company.
* Copyright (C) 2023 THL A29 Limited, a Tencent company.
* All rights reserved.
*
* If you have downloaded a copy of the tRPC source code from Tencent,
Expand All @@ -27,9 +27,12 @@
import com.tencent.trpc.proto.http.common.HttpConstants;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.io.IOUtils;
import org.apache.http.Header;
import org.apache.http.HeaderElement;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpRequest;
import org.apache.http.HttpStatus;
Expand Down Expand Up @@ -84,24 +87,39 @@ private Response handleResponse(Request request, CloseableHttpResponse httpRespo
throw TRpcException.newBizException(statusCode,
httpResponse.getStatusLine().getReasonPhrase());
}
// handle http header
// Parse the data passed through from the server to the client.
// In the tRPC protocol, the value of the attachment is stored and used as a byte array to maintain consistency.
Map<String, Object> respAttachments = new HashMap<>();
for (Header header : httpResponse.getAllHeaders()) {
String name = header.getName();
for (HeaderElement element : header.getElements()) {
String value = element.getName();
respAttachments.put(name, value.getBytes(StandardCharsets.UTF_8));
}
}

Header contentLengthHdr = httpResponse.getFirstHeader(HttpHeaders.CONTENT_LENGTH);

if (contentLengthHdr != null) {
int contentLength = Integer.parseInt(contentLengthHdr.getValue().trim());
// NOTE: By default, the HTTP implementation must return the content length.
// However, other HTTP implementations may not return the content length,
// so strong validation is not performed here.
if (contentLength == 0) {
return RpcUtils.newResponse(request, null, null);
Response response = RpcUtils.newResponse(request, null, null);
response.setAttachments(respAttachments);
return response;
}
}

// Decoded response result.
InputStream in = httpResponse.getEntity().getContent();
String decodeIn = IOUtils.toString(in, StandardCharsets.UTF_8);
Object value = decodeFromJson(
request.getInvocation().getRpcMethodInfo().getActualReturnType(), decodeIn);
return RpcUtils.newResponse(request, value, null);
Response response = RpcUtils.newResponse(request, value, null);
response.setAttachments(respAttachments);
return response;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* Tencent is pleased to support the open source community by making tRPC available.
*
* Copyright (C) 2023 THL A29 Limited, a Tencent company.
* Copyright (C) 2023 THL A29 Limited, a Tencent company.
* All rights reserved.
*
* If you have downloaded a copy of the tRPC source code from Tencent,
Expand All @@ -17,27 +17,18 @@
import com.tencent.trpc.core.serialization.spi.Serialization;
import com.tencent.trpc.core.serialization.support.JSONSerialization;
import com.tencent.trpc.core.serialization.support.PBSerialization;
import com.tencent.trpc.core.utils.JsonUtils;
import com.tencent.trpc.core.utils.ProtoJsonConverter;
import com.tencent.trpc.proto.http.util.StreamUtils;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.http.HttpHeaders;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.http.HttpHeaders;
import org.springframework.cglib.beans.BeanMap;

/**
* HTTP encoding and decoding utility class, also used in the Spring MVC module.
Expand Down Expand Up @@ -216,7 +207,8 @@ public void writeHttpResponse(HttpServletResponse response, Response result) thr
data = jsonSerialization.serialize(value);
}
}

// Encapsulate the data passed through from the server to the client in the HTTP response header.
fillResponseHeaderWithAttachments(response, result);
if (data != null) {
response.setHeader(HttpHeaders.CONTENT_TYPE, HttpConstants.CONTENT_TYPE_JSON);
response.setContentLength(data.length);
Expand All @@ -225,4 +217,19 @@ public void writeHttpResponse(HttpServletResponse response, Response result) thr
}
}

private void fillResponseHeaderWithAttachments(HttpServletResponse response, Response result) throws IOException {
Map<String, Object> attachments = result.getAttachments();
if (attachments == null || attachments.isEmpty()) {
return;
}
// Set custom business headers, consistent with the TRPC protocol, only process String and byte[]
attachments.forEach((name, value) -> {
if (value instanceof String) {
response.setHeader(name, String.valueOf(value));
} else if (value instanceof byte[]) {
response.setHeader(name, new String((byte[]) value));
}
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Tencent is pleased to support the open source community by making tRPC available.
*
* Copyright (C) 2023 THL A29 Limited, a Tencent company.
* All rights reserved.
*
* If you have downloaded a copy of the tRPC source code from Tencent,
* please note that tRPC source code is licensed under the Apache 2.0 License,
* A copy of the Apache 2.0 License can be found in the LICENSE file.
*/

package com.tencent.trpc.proto.http.common;

import static com.tencent.trpc.proto.http.common.HttpConstants.HTTP_SCHEME;
import static com.tencent.trpc.proto.http.constant.Constant.TEST_BYTES_REQ_KEY;
import static com.tencent.trpc.proto.http.constant.Constant.TEST_BYTES_REQ_VALUE;
import static com.tencent.trpc.proto.http.constant.Constant.TEST_BYTES_RSP_KEY;
import static com.tencent.trpc.proto.http.constant.Constant.TEST_BYTES_RSP_VALUE;
import static com.tencent.trpc.proto.http.constant.Constant.TEST_MESSAGE;
import static com.tencent.trpc.proto.http.constant.Constant.TEST_RSP_MESSAGE;
import static com.tencent.trpc.proto.http.constant.Constant.TEST_STRING_REQ_KEY;
import static com.tencent.trpc.proto.http.constant.Constant.TEST_STRING_REQ_VALUE;
import static com.tencent.trpc.proto.http.constant.Constant.TEST_STRING_RSP_KEY;
import static com.tencent.trpc.proto.http.constant.Constant.TEST_STRING_RSP_VALUE;
import static com.tencent.trpc.transport.http.common.Constants.HTTP2_SCHEME;

import com.tencent.trpc.core.common.ConfigManager;
import com.tencent.trpc.core.common.config.BackendConfig;
import com.tencent.trpc.core.common.config.ConsumerConfig;
import com.tencent.trpc.core.common.config.ProviderConfig;
import com.tencent.trpc.core.common.config.ServerConfig;
import com.tencent.trpc.core.common.config.ServiceConfig;
import com.tencent.trpc.core.rpc.RpcClientContext;
import com.tencent.trpc.core.utils.NetUtils;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import org.apache.http.HttpHeaders;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import tests.service.GreeterService;
import tests.service.HelloRequestProtocol.HelloRequest;
import tests.service.HelloRequestProtocol.HelloResponse;
import tests.service.impl1.GreeterServiceImpl2;

public class HttpTransparentInfoTest {

private static ServerConfig serverConfig;

private static int TRPC_JAVA_TEST_HTTP2_PORT;

private static int TRPC_JAVA_TEST_HTTP_PORT;

@BeforeClass
public static void startHttpServer() {
ConfigManager.stopTest();
ConfigManager.startTest();

ProviderConfig<GreeterService> providerConfig = new ProviderConfig<>();
providerConfig.setServiceInterface(GreeterService.class);
providerConfig.setRef(new GreeterServiceImpl2());
HashMap<String, ServiceConfig> providers = new HashMap<>();

TRPC_JAVA_TEST_HTTP2_PORT = NetUtils.getAvailablePort();
ServiceConfig serviceConfig1 = getServiceConfig(providerConfig, "trpc.java.test.http2",
NetUtils.LOCAL_HOST, TRPC_JAVA_TEST_HTTP2_PORT, HTTP2_SCHEME, "jetty");
providers.put(serviceConfig1.getName(), serviceConfig1);

TRPC_JAVA_TEST_HTTP_PORT = NetUtils.getAvailablePort();
ServiceConfig serviceConfig2 = getServiceConfig(providerConfig, "trpc.java.test.http",
NetUtils.LOCAL_HOST, TRPC_JAVA_TEST_HTTP_PORT, HTTP_SCHEME, "jetty");
providers.put(serviceConfig2.getName(), serviceConfig2);

ServerConfig sc = new ServerConfig();
sc.setServiceMap(providers);
sc.setApp("http-test-app");
sc.setLocalIp("127.0.0.1");
sc.init();

serverConfig = sc;
}

private static ServiceConfig getServiceConfig(ProviderConfig gspc, String name,
String ip, int port, String protocol, String transport) {
ServiceConfig serviceConfig = new ServiceConfig();
serviceConfig.setName(name);
serviceConfig.getProviderConfigs().add(gspc);
serviceConfig.setIp(ip);
serviceConfig.setPort(port);
serviceConfig.setProtocol(protocol);
serviceConfig.setTransporter(transport);
return serviceConfig;
}

@AfterClass
public static void stopHttpServer() {
ConfigManager.stopTest();
if (serverConfig != null) {
serverConfig.stop();
serverConfig = null;
}
}

private static HelloRequest createPbRequest(String msg) {
HelloRequest.Builder builder = HelloRequest.newBuilder();
builder.setMessage(msg);
return builder.build();
}

@Test
public void testHttp2RpcClient() {
BackendConfig backendConfig = new BackendConfig();
ConsumerConfig<GreeterService> consumerConfig = new ConsumerConfig<>();
backendConfig.setName("serviceId");
backendConfig.setRequestTimeout(10000);
consumerConfig.setServiceInterface(GreeterService.class);
consumerConfig.setBackendConfig(backendConfig);
backendConfig.setNamingUrl("ip://127.0.0.1:" + TRPC_JAVA_TEST_HTTP2_PORT);
backendConfig.setKeepAlive(false);
backendConfig.setConnsPerAddr(4);
backendConfig.setProtocol(HTTP2_SCHEME);
try {
GreeterService proxy = consumerConfig.getProxy();

RpcClientContext context = new RpcClientContext();
// client tran info to server
context.getReqAttachMap().put(TEST_STRING_REQ_KEY, TEST_STRING_REQ_VALUE);
context.getReqAttachMap().put(TEST_BYTES_REQ_KEY, TEST_BYTES_REQ_VALUE);
context.getReqAttachMap().put(HttpHeaders.CONTENT_LENGTH, TEST_MESSAGE.length());

HelloResponse helloResponse = proxy.sayHello(context, createPbRequest(TEST_MESSAGE));
// get server tran info
byte[] bytesRspValue = (byte[]) context.getRspAttachMap().get(TEST_BYTES_RSP_KEY);
Assert.assertArrayEquals(bytesRspValue, TEST_BYTES_RSP_VALUE);

byte[] stringRspValue = (byte[]) context.getRspAttachMap().get(TEST_STRING_RSP_KEY);
Assert.assertEquals(new String(stringRspValue, StandardCharsets.UTF_8), TEST_STRING_RSP_VALUE);

Assert.assertNotNull(helloResponse);
String rspMessage = helloResponse.getMessage();
Assert.assertEquals(rspMessage, TEST_RSP_MESSAGE);
} finally {
backendConfig.stop();
}
}

@Test
public void testHttpRpcClient() {
BackendConfig backendConfig = new BackendConfig();
ConsumerConfig<GreeterService> consumerConfig = new ConsumerConfig<>();
backendConfig.setName("serviceId");
backendConfig.setRequestTimeout(10000);
consumerConfig.setServiceInterface(GreeterService.class);
consumerConfig.setBackendConfig(backendConfig);
backendConfig.setNamingUrl("ip://127.0.0.1:" + TRPC_JAVA_TEST_HTTP_PORT);
backendConfig.setKeepAlive(false);
backendConfig.setConnsPerAddr(4);
backendConfig.setProtocol(HTTP_SCHEME);
try {
GreeterService proxy = consumerConfig.getProxy();

RpcClientContext context = new RpcClientContext();
context.getReqAttachMap().put(TEST_STRING_REQ_KEY, TEST_STRING_REQ_VALUE);
context.getReqAttachMap().put(TEST_BYTES_REQ_KEY, TEST_BYTES_REQ_VALUE);

HelloResponse helloResponse = proxy.sayHello(context, createPbRequest(TEST_MESSAGE));
// get server tran info
byte[] bytesRspValue = (byte[]) context.getRspAttachMap().get(TEST_BYTES_RSP_KEY);
Assert.assertArrayEquals(bytesRspValue, TEST_BYTES_RSP_VALUE);

byte[] stringRspValue = (byte[]) context.getRspAttachMap().get(TEST_STRING_RSP_KEY);
Assert.assertEquals(new String(stringRspValue, StandardCharsets.UTF_8), TEST_STRING_RSP_VALUE);

Assert.assertNotNull(helloResponse);
String rspMessage = helloResponse.getMessage();
Assert.assertEquals(rspMessage, TEST_RSP_MESSAGE);
} finally {
backendConfig.stop();
}
}
}
Loading