Skip to content

Commit

Permalink
Support TLS Grpc communication between clusters. (alibaba#11549)
Browse files Browse the repository at this point in the history
* Fix exception code error.(alibaba#10925)

* [ISSUE alibaba#11456]Add RpcClusterClientTlsConfig.java.

* [ISSUE alibaba#11456]Add cluster rpc tls config.

* [ISSUE alibaba#11456]Add RpcClusterClientTlsConfig UT.

* [ISSUE alibaba#11456]Add cluster server tls.

* [ISSUE alibaba#11456]Remove supportCommunicationTypes.

* [ISSUE alibaba#11456]Fix unit testing and indentation handling

* [ISSUE alibaba#11456]Indentation handling

* [ISSUE alibaba#11456]Fix unit test and rpc constants.

* [ISSUE alibaba#11456]Fix unit test.

* [ISSUE alibaba#11456]Optimize code.

* [ISSUE alibaba#11456]Fix check style.

* [ISSUE alibaba#11456]Add unit test.

* [ISSUE alibaba#11456]Fix check style.

* [ISSUE alibaba#11456]Update unit test.

* [ISSUE alibaba#11456]Fix unit test.

* [ISSUE alibaba#11456]Add License.

* [ISSUE alibaba#11456]Fix unit test.

* [ISSUE alibaba#11456]Fix unit test.

* [ISSUE alibaba#11456]Rename class.

* [ISSUE alibaba#11456]Optimize code.

* [ISSUE alibaba#11456]Handling indentation issues.

* [ISSUE alibaba#11456]Handling indentation issues.

* [ISSUE alibaba#11456]Handling indentation issues.

* [ISSUE alibaba#11456]Optimize code.

* [ISSUE alibaba#11456]Fix unit test.

* [ISSUE alibaba#11456]Fix unit testing and compatibility handling.

* [ISSUE alibaba#11456]Support TLS GRPC communication between clusters.

* [ISSUE alibaba#11456] Fix bugs.

* [ISSUE alibaba#11456]Fix bugs.

* [ISSUE alibaba#11456]Adjusting parameter names (compatibility considerations).

* [ISSUE alibaba#11456]Resolve conflict.

* [ISSUE alibaba#11456]Remove ProtocolNegotiatorBuilderManager and abstract ProtocolNegotiatorBuilderSingleton.

* [ISSUE alibaba#11456]Remove CommunicationType.java.

* [ISSUE alibaba#11456]Optimize code.

* [ISSUE alibaba#11456]Revert author.

* Splitting RpcTlsConfigFactory.

* Split RpcConstants.

* Divided RpcTlsConfigFactory, adjusted cluster parameters to "nacos.remote.peer.rpc.tls".

* check style.

* check style.

* unit test.
  • Loading branch information
stone-98 authored May 15, 2024
1 parent 6fe4363 commit 5169f06
Show file tree
Hide file tree
Showing 59 changed files with 2,174 additions and 969 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,29 @@

import org.junit.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class RpcScheduledExecutorTest {

private static final String NAME = "test.rpc.thread";

Map<String, String> threadNameMap = new HashMap<>();
Map<String, String> threadNameMap = new ConcurrentHashMap<>();

@Test
public void testRpcScheduledExecutor() throws InterruptedException {
RpcScheduledExecutor executor = new RpcScheduledExecutor(2, NAME);
CountDownLatch latch = new CountDownLatch(2);
executor.submit(new TestRunner(1, latch));
executor.submit(new TestRunner(2, latch));
latch.await(1, TimeUnit.SECONDS);
boolean await = latch.await(1, TimeUnit.SECONDS);
assertTrue(await);
assertEquals(2, threadNameMap.size());
assertEquals(NAME + ".0", threadNameMap.get("1"));
assertEquals(NAME + ".1", threadNameMap.get("2"));
}

private class TestRunner implements Runnable {
Expand All @@ -56,13 +56,8 @@ public TestRunner(int id, CountDownLatch latch) {

@Override
public void run() {
try {
threadNameMap.put(String.valueOf(id), Thread.currentThread().getName());
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException ignored) {
} finally {
latch.countDown();
}
threadNameMap.put(String.valueOf(id), Thread.currentThread().getName());
latch.countDown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientFactory;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfigFactory;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.utils.ConnLabelsUtils;
import com.alibaba.nacos.common.utils.ConvertUtils;
Expand Down Expand Up @@ -128,6 +129,8 @@ public class ClientWorker implements Closeable {
*/
private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<>(new HashMap<>());

private final DefaultLabelsCollectorManager defaultLabelsCollectorManager = new DefaultLabelsCollectorManager();

private Map<String, String> appLables = new HashMap<>();

private final ConfigFilterChainManager configFilterChainManager;
Expand Down Expand Up @@ -579,8 +582,6 @@ public boolean isHealthServer() {
return agent.isHealthServer();
}

private static DefaultLabelsCollectorManager defaultLabelsCollectorManager = new DefaultLabelsCollectorManager();

public class ConfigRpcTransportClient extends ConfigTransportClient {

Map<String, ExecutorService> multiTaskExecutor = new HashMap<>();
Expand Down Expand Up @@ -1088,18 +1089,19 @@ private boolean checkListenCache(Map<String, List<CacheData>> listenCachesMap) t

private RpcClient ensureRpcClient(String taskId) throws NacosException {
synchronized (ClientWorker.this) {

Map<String, String> labels = getLabels();
Map<String, String> newLabels = new HashMap<>(labels);
newLabels.put("taskId", taskId);
RpcClientTlsConfig clientTlsConfig = RpcClientTlsConfigFactory.getInstance()
.createSdkConfig(properties);
RpcClient rpcClient = RpcClientFactory.createClient(uuid + "_config-" + taskId, getConnectionType(),
newLabels, this.properties, RpcClientTlsConfig.properties(this.properties));
newLabels, clientTlsConfig);
if (rpcClient.isWaitInitiated()) {
initRpcClientHandler(rpcClient);
rpcClient.setTenant(getTenant());
rpcClient.start();
}

return rpcClient;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientFactory;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfigFactory;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.JacksonUtils;
Expand Down Expand Up @@ -104,7 +104,7 @@ public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, Se
labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
labels.put(Constants.APPNAME, AppNameUtils.getAppName());
this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels,
RpcClientTlsConfig.properties(properties.asProperties()));
RpcClientTlsConfigFactory.getInstance().createSdkConfig(properties.asProperties()));
this.redoService = new NamingGrpcRedoService(this, properties);
NAMING_LOGGER.info("Create naming rpc client for uuid->{}", uuid);
start(serverListFactory, serviceInfoHolder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void before() {
any(RpcClientTlsConfig.class))).thenReturn(rpcClient);
rpcClientFactoryMockedStatic.when(
() -> RpcClientFactory.createClient(anyString(), any(ConnectionType.class), any(Map.class),
any(Properties.class), any(RpcClientTlsConfig.class))).thenReturn(rpcClient);
any(RpcClientTlsConfig.class))).thenReturn(rpcClient);
localConfigInfoProcessorMockedStatic = Mockito.mockStatic(LocalConfigInfoProcessor.class);
Properties properties = new Properties();
properties.put(PropertyKeyConst.NAMESPACE, TEST_NAMESPACE);
Expand Down Expand Up @@ -149,8 +149,8 @@ public void testAddListenerWithoutTenant() throws NacosException {
public void receiveConfigInfo(String configInfo) {
}
};
clientWorker.addListeners(dataId, group, Arrays.asList(listener));

clientWorker.addListeners(dataId, group, Collections.singletonList(listener));
List<Listener> listeners = clientWorker.getCache(dataId, group).getListeners();
Assert.assertEquals(1, listeners.size());
Assert.assertEquals(listener, listeners.get(0));
Expand Down Expand Up @@ -180,8 +180,8 @@ public void receiveConfigInfo(String configInfo) {

String dataId = "a";
String group = "b";
clientWorker.addTenantListeners(dataId, group, Arrays.asList(listener));

clientWorker.addTenantListeners(dataId, group, Collections.singletonList(listener));
List<Listener> listeners = clientWorker.getCache(dataId, group).getListeners();
Assert.assertEquals(1, listeners.size());
Assert.assertEquals(listener, listeners.get(0));
Expand All @@ -191,7 +191,7 @@ public void receiveConfigInfo(String configInfo) {
Assert.assertEquals(0, listeners.size());

String content = "d";
clientWorker.addTenantListenersWithContent(dataId, group, content, null, Arrays.asList(listener));
clientWorker.addTenantListenersWithContent(dataId, group, content, null, Collections.singletonList(listener));
listeners = clientWorker.getCache(dataId, group).getListeners();
Assert.assertEquals(1, listeners.size());
Assert.assertEquals(listener, listeners.get(0));
Expand Down Expand Up @@ -418,10 +418,10 @@ public void testHandleClientMetricsReqeust() throws Exception {
String metricValues = jsonNode.get("metricValues")
.get(ClientConfigMetricRequest.MetricsKey.build(ClientConfigMetricRequest.MetricsKey.CACHE_DATA,
GroupKey.getKeyTenant(dataId, group, tenant)).toString()).textValue();
int colonIndex = metricValues.toString().lastIndexOf(":");

int colonIndex = metricValues.lastIndexOf(":");
Assert.assertEquals(content, metricValues.substring(0, colonIndex));
Assert.assertEquals(md5, metricValues.substring(colonIndex + 1, metricValues.toString().length()));
Assert.assertEquals(md5, metricValues.substring(colonIndex + 1, metricValues.length()));

}

Expand All @@ -441,7 +441,7 @@ public void testGeConfigConfigNotFound() throws NacosException {
Mockito.when(rpcClient.request(any(ConfigQueryRequest.class), anyLong())).thenReturn(configQueryResponse);

ConfigResponse configResponse = clientWorker.getServerConfig(dataId, group, tenant, 100, true);
Assert.assertEquals(null, configResponse.getContent());
Assert.assertNull(configResponse.getContent());
localConfigInfoProcessorMockedStatic.verify(
() -> LocalConfigInfoProcessor.saveSnapshot(eq(clientWorker.getAgentName()), eq(dataId), eq(group),
eq(tenant), eq(null)), times(1));
Expand Down Expand Up @@ -476,7 +476,7 @@ public void testShutdown() throws NacosException, NoSuchFieldException, IllegalA
Properties prop = new Properties();
ConfigFilterChainManager filter = new ConfigFilterChainManager(new Properties());
ServerListManager agent = Mockito.mock(ServerListManager.class);

final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
ClientWorker clientWorker = new ClientWorker(filter, agent, nacosClientProperties);
clientWorker.shutdown();
Expand All @@ -485,8 +485,8 @@ public void testShutdown() throws NacosException, NoSuchFieldException, IllegalA
ConfigTransportClient o = (ConfigTransportClient) agent1.get(clientWorker);
Assert.assertTrue(o.executor.isShutdown());
agent1.setAccessible(false);
Assert.assertEquals(null, clientWorker.getAgentName());

Assert.assertNull(clientWorker.getAgentName());
}

@Test
Expand Down Expand Up @@ -552,13 +552,13 @@ public void receiveConfigInfo(String configInfo) {
configContext.setGroup(group);
configContext.setTenant(tenant);
ConfigChangeBatchListenResponse response = new ConfigChangeBatchListenResponse();
response.setChangedConfigs(Arrays.asList(configContext));
response.setChangedConfigs(Collections.singletonList(configContext));

RpcClient rpcClientInner = Mockito.mock(RpcClient.class);
Mockito.when(rpcClientInner.isWaitInitiated()).thenReturn(true, false);
rpcClientFactoryMockedStatic.when(
() -> RpcClientFactory.createClient(anyString(), any(ConnectionType.class), any(Map.class),
any(Properties.class), any(RpcClientTlsConfig.class))).thenReturn(rpcClientInner);
any(RpcClientTlsConfig.class))).thenReturn(rpcClientInner);
// mock listen and remove listen request
Mockito.when(rpcClientInner.request(any(ConfigBatchListenRequest.class), anyLong()))
.thenReturn(response, response);
Expand Down Expand Up @@ -620,20 +620,20 @@ public void testIsHealthServer() throws NacosException, NoSuchFieldException, Il
Properties prop = new Properties();
ConfigFilterChainManager filter = new ConfigFilterChainManager(new Properties());
ServerListManager agent = Mockito.mock(ServerListManager.class);

final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
ClientWorker clientWorker = new ClientWorker(filter, agent, nacosClientProperties);
ClientWorker.ConfigRpcTransportClient client = Mockito.mock(ClientWorker.ConfigRpcTransportClient.class);
Mockito.when(client.isHealthServer()).thenReturn(Boolean.TRUE);

Field declaredField = ClientWorker.class.getDeclaredField("agent");
declaredField.setAccessible(true);
declaredField.set(clientWorker, client);
Assert.assertEquals(true, clientWorker.isHealthServer());

Assert.assertTrue(clientWorker.isHealthServer());

Mockito.when(client.isHealthServer()).thenReturn(Boolean.FALSE);
Assert.assertEquals(false, clientWorker.isHealthServer());
assertFalse(clientWorker.isHealthServer());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,30 +121,41 @@ public static RpcClient createClient(String clientName, ConnectionType connectio
}

/**
* create a rpc client.
* Creates an RPC client for cluster communication with default thread pool settings.
*
* @param clientName client name.
* @param connectionType client type.
* @return rpc client.
* @param clientName The name of the client.
* @param connectionType The type of client connection.
* @param labels Additional labels for RPC-related attributes.
* @return An RPC client for cluster communication.
*/
public static RpcClient createClusterClient(String clientName, ConnectionType connectionType,
Map<String, String> labels) {
return createClusterClient(clientName, connectionType, null, null, labels);
}

/**
* Creates an RPC client for cluster communication with TLS configuration.
*
* @param clientName The name of the client.
* @param connectionType The type of client connection.
* @param labels Additional labels for RPC-related attributes.
* @param tlsConfig TLS configuration for secure communication.
* @return An RPC client for cluster communication with TLS configuration.
*/
public static RpcClient createClusterClient(String clientName, ConnectionType connectionType,
Map<String, String> labels, RpcClientTlsConfig tlsConfig) {
return createClusterClient(clientName, connectionType, null, null, labels, tlsConfig);
}

/**
* create a rpc client.
* Creates an RPC client for cluster communication with custom thread pool settings.
*
* @param clientName client name.
* @param connectionType client type.
* @param threadPoolCoreSize grpc thread pool core size
* @param threadPoolMaxSize grpc thread pool max size
* @return rpc client.
* @param clientName The name of the client.
* @param connectionType The type of client connection.
* @param threadPoolCoreSize The core size of the gRPC thread pool.
* @param threadPoolMaxSize The maximum size of the gRPC thread pool.
* @param labels Additional labels for RPC-related attributes.
* @return An RPC client for cluster communication with custom thread pool settings.
*/
public static RpcClient createClusterClient(String clientName, ConnectionType connectionType,
Integer threadPoolCoreSize, Integer threadPoolMaxSize, Map<String, String> labels) {
Expand All @@ -162,7 +173,6 @@ public static RpcClient createClusterClient(String clientName, ConnectionType co
* @param tlsConfig tlsConfig.
* @return
*/

public static RpcClient createClusterClient(String clientName, ConnectionType connectionType,
Integer threadPoolCoreSize, Integer threadPoolMaxSize, Map<String, String> labels,
RpcClientTlsConfig tlsConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,68 +18,10 @@

import com.alibaba.nacos.common.remote.TlsConfig;

import java.util.Properties;

/**
* gRPC config for sdk.
*
* @author githubcheng2978
*/
public class RpcClientTlsConfig extends TlsConfig {

/**
* get tls config from properties.
* @param properties Properties.
* @return tls of config.
*/
public static RpcClientTlsConfig properties(Properties properties) {
RpcClientTlsConfig tlsConfig = new RpcClientTlsConfig();
if (properties.containsKey(RpcConstants.RPC_CLIENT_TLS_ENABLE)) {
tlsConfig.setEnableTls(Boolean.parseBoolean(
properties.getProperty(RpcConstants.RPC_CLIENT_TLS_ENABLE)));
}

if (properties.containsKey(RpcConstants.RPC_CLIENT_TLS_PROVIDER)) {
tlsConfig.setSslProvider(properties.getProperty(RpcConstants.RPC_CLIENT_TLS_PROVIDER));
}

if (properties.containsKey(RpcConstants.RPC_CLIENT_MUTUAL_AUTH)) {
tlsConfig.setMutualAuthEnable(Boolean.parseBoolean(
properties.getProperty(RpcConstants.RPC_CLIENT_MUTUAL_AUTH)));
}

if (properties.containsKey(RpcConstants.RPC_CLIENT_TLS_PROTOCOLS)) {
tlsConfig.setProtocols(properties.getProperty(RpcConstants.RPC_CLIENT_TLS_PROTOCOLS));
}

if (properties.containsKey(RpcConstants.RPC_CLIENT_TLS_CIPHERS)) {
tlsConfig.setCiphers(properties.getProperty(RpcConstants.RPC_CLIENT_TLS_CIPHERS));
}

if (properties.containsKey(RpcConstants.RPC_CLIENT_TLS_TRUST_COLLECTION_CHAIN_PATH)) {
tlsConfig.setTrustCollectionCertFile(properties.getProperty(RpcConstants.RPC_CLIENT_TLS_TRUST_COLLECTION_CHAIN_PATH));
}

if (properties.containsKey(RpcConstants.RPC_CLIENT_TLS_CERT_CHAIN_PATH)) {
tlsConfig.setCertChainFile(properties.getProperty(RpcConstants.RPC_CLIENT_TLS_CERT_CHAIN_PATH));
}

if (properties.containsKey(RpcConstants.RPC_CLIENT_TLS_CERT_KEY)) {
tlsConfig.setCertPrivateKey(properties.getProperty(RpcConstants.RPC_CLIENT_TLS_CERT_KEY));
}

if (properties.containsKey(RpcConstants.RPC_CLIENT_TLS_TRUST_ALL)) {
tlsConfig.setTrustAll(Boolean.parseBoolean(properties.getProperty(RpcConstants.RPC_CLIENT_TLS_TRUST_ALL)));
}

if (properties.containsKey(RpcConstants.RPC_CLIENT_TLS_TRUST_PWD)) {
tlsConfig.setCertPrivateKeyPassword(properties.getProperty(RpcConstants.RPC_CLIENT_TLS_TRUST_PWD));
}

if (properties.containsKey(RpcConstants.RPC_CLIENT_TLS_PROVIDER)) {
tlsConfig.setSslProvider(properties.getProperty(RpcConstants.RPC_CLIENT_TLS_PROVIDER));
}
return tlsConfig;
}

}
Loading

0 comments on commit 5169f06

Please sign in to comment.