Skip to content

Commit

Permalink
Feat xdsroute parse (#10956)
Browse files Browse the repository at this point in the history
Co-authored-by: Albumen Kevin <jhq0812@gmail.com>
  • Loading branch information
haoyann and AlbumenJ authored Dec 9, 2022
1 parent 5c082ea commit f95e659
Show file tree
Hide file tree
Showing 24 changed files with 2,374 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster.router.xds;

import org.apache.dubbo.registry.xds.util.protocol.message.Endpoint;

import java.util.Set;


public interface EdsEndpointListener {

void onEndPointChange(String cluster, Set<Endpoint> endpoints);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster.router.xds;

import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.registry.xds.util.protocol.message.Endpoint;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class EdsEndpointManager {

private static final ConcurrentHashMap<String, Set<EdsEndpointListener>> ENDPOINT_LISTENERS = new ConcurrentHashMap<>();

private static final ConcurrentHashMap<String, Set<Endpoint>> ENDPOINT_DATA_CACHE = new ConcurrentHashMap<>();

private static final ConcurrentHashMap<String, Consumer<Set<Endpoint>>> EDS_LISTENERS = new ConcurrentHashMap<>();


public EdsEndpointManager() {
}

public synchronized void subscribeEds(String cluster, EdsEndpointListener listener) {

Set<EdsEndpointListener> listeners = ENDPOINT_LISTENERS.computeIfAbsent(cluster, key ->
new ConcurrentHashSet<>()
);
if (CollectionUtils.isEmpty(listeners)) {
doSubscribeEds(cluster);
}
listeners.add(listener);

if (ENDPOINT_DATA_CACHE.containsKey(cluster)) {
listener.onEndPointChange(cluster, ENDPOINT_DATA_CACHE.get(cluster));
}
}

private void doSubscribeEds(String cluster) {
EDS_LISTENERS.computeIfAbsent(cluster, key -> endpoints -> {
notifyEndpointChange(cluster, endpoints);
});
Consumer<Set<Endpoint>> consumer = EDS_LISTENERS.get(cluster);

//todo control plane subscribe eds
}

public synchronized void unSubscribeEds(String cluster, EdsEndpointListener listener) {
Set<EdsEndpointListener> listeners = ENDPOINT_LISTENERS.get(cluster);
if (CollectionUtils.isEmpty(listeners)) {
return;
}
listeners.remove(listener);
if (listeners.isEmpty()) {
ENDPOINT_LISTENERS.remove(cluster);
doUnsubscribeEds(cluster);
}
}

private void doUnsubscribeEds(String cluster) {
Consumer<Set<Endpoint>> consumer = EDS_LISTENERS.remove(cluster);

if (consumer != null) {

//todo control plane unsubscribe eds
}
ENDPOINT_DATA_CACHE.remove(cluster);
}


public void notifyEndpointChange(String cluster, Set<Endpoint> endpoints) {

ENDPOINT_DATA_CACHE.put(cluster, endpoints);

Set<EdsEndpointListener> listeners = ENDPOINT_LISTENERS.get(cluster);
if (CollectionUtils.isEmpty(listeners)) {
return;
}
for (EdsEndpointListener listener : listeners) {
listener.onEndPointChange(cluster, endpoints);
}
}

// for test
static ConcurrentHashMap<String, Set<EdsEndpointListener>> getEndpointListeners() {
return ENDPOINT_LISTENERS;
}

// for test
static ConcurrentHashMap<String, Set<Endpoint>> getEndpointDataCache() {
return ENDPOINT_DATA_CACHE;
}

// for test
static ConcurrentHashMap<String, Consumer<Set<Endpoint>>> getEdsListeners() {
return EDS_LISTENERS;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster.router.xds;

import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.rpc.cluster.router.xds.rule.XdsRouteRule;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class RdsRouteRuleManager {


private static final ConcurrentHashMap<String, Set<XdsRouteRuleListener>> RULE_LISTENERS = new ConcurrentHashMap<>();

private static final ConcurrentHashMap<String, List<XdsRouteRule>> ROUTE_DATA_CACHE = new ConcurrentHashMap<>();

private static final ConcurrentHashMap<String, RdsVirtualHostListener> RDS_LISTENERS = new ConcurrentHashMap<>();

public RdsRouteRuleManager() {
}

public synchronized void subscribeRds(String domain, XdsRouteRuleListener listener) {

Set<XdsRouteRuleListener> listeners = RULE_LISTENERS.computeIfAbsent(domain, key ->
new ConcurrentHashSet<>()
);
if (CollectionUtils.isEmpty(listeners)) {
doSubscribeRds(domain);
}
listeners.add(listener);

if (ROUTE_DATA_CACHE.containsKey(domain)) {
listener.onRuleChange(domain, ROUTE_DATA_CACHE.get(domain));
}
}

private void doSubscribeRds(String domain) {
RDS_LISTENERS.computeIfAbsent(domain, key -> new RdsVirtualHostListener(domain, this));
RdsVirtualHostListener rdsVirtualHostListener = RDS_LISTENERS.get(domain);
// todo request control plane subscribe rds
}

public synchronized void unSubscribeRds(String domain, XdsRouteRuleListener listener) {
Set<XdsRouteRuleListener> listeners = RULE_LISTENERS.get(domain);
if (CollectionUtils.isEmpty(listeners)) {
return;
}
listeners.remove(listener);
if (listeners.isEmpty()) {
RULE_LISTENERS.remove(domain);
doUnsubscribeRds(domain);
}
}

private void doUnsubscribeRds(String domain) {
RdsVirtualHostListener rdsVirtualHostListener = RDS_LISTENERS.remove(domain);

if (rdsVirtualHostListener != null) {

// todo request control plane unsubscribe rds
}
ROUTE_DATA_CACHE.remove(domain);
}


public void notifyRuleChange(String domain, List<XdsRouteRule> xdsRouteRules) {

ROUTE_DATA_CACHE.put(domain, xdsRouteRules);

Set<XdsRouteRuleListener> listeners = RULE_LISTENERS.get(domain);
if (CollectionUtils.isEmpty(listeners)) {
return;
}
boolean empty = CollectionUtils.isEmpty(xdsRouteRules);
for (XdsRouteRuleListener listener : listeners) {
if (empty) {
listener.clearRule(domain);
} else {
listener.onRuleChange(domain, xdsRouteRules);
}
}
}

// for test
static ConcurrentHashMap<String, Set<XdsRouteRuleListener>> getRuleListeners() {
return RULE_LISTENERS;
}

// for test
static ConcurrentHashMap<String, List<XdsRouteRule>> getRouteDataCache() {
return ROUTE_DATA_CACHE;
}

// for test
static ConcurrentHashMap<String, RdsVirtualHostListener> getRdsListeners() {
return RDS_LISTENERS;
}

}
Loading

0 comments on commit f95e659

Please sign in to comment.