Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delta XDS - improved #166

Merged
merged 50 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
9338246
delta xds
sschepens Oct 22, 2020
417efe7
fixes
sschepens Oct 22, 2020
a63a066
method naming
sschepens Oct 23, 2020
dcc5468
Building version of non-breaking changes
Apr 2, 2021
3f59306
Get integration tests passing
Apr 2, 2021
583c213
Add delta tests back
Apr 2, 2021
530814e
Rewrite V3DiscoveryServerAdsDeltaResourcesIT to have a 2 second poll …
Apr 6, 2021
4437256
Add comment to trigger PR build
Apr 6, 2021
cb43523
Get V3DiscoveryServerXdsDeltaResourcesIT passing
Apr 7, 2021
5838a4f
Update protobuf to envoy 1.16.0 (#155)
wookieJ Dec 7, 2020
9f8ca58
release: prepare release v0.1.25
envoy-bot Dec 7, 2020
a30683a
release: prepare for next development iteration
envoy-bot Dec 7, 2020
17c710a
release: prepare release v0.1.26
envoy-bot Dec 8, 2020
471fa83
release: prepare for next development iteration
envoy-bot Dec 8, 2020
5715e16
Bump nexus release plugin timeout to 20 minutes (#156)
slonka Dec 11, 2020
4d9b2c7
release: prepare release v0.1.27
envoy-bot Dec 11, 2020
443becf
release: prepare for next development iteration
envoy-bot Dec 11, 2020
6197daa
ci: fixes javadoc and jacoco plugin issues (#158)
listaction Dec 25, 2020
5af4b8f
Update api to v1 17 (#159)
Mar 1, 2021
3f0a633
Fix references to main branch after rename (#160)
slonka Mar 1, 2021
ab63a8c
release: prepare release v0.1.28
envoy-bot Mar 1, 2021
00d4d88
release: prepare for next development iteration
envoy-bot Mar 1, 2021
888dbee
Change version to 0.1.29-delta-xds-slonka-SNAPSHOT
slonka Apr 7, 2021
28d2481
Revert ads configs to main
slonka Apr 7, 2021
d31054b
Hash bytes array not string
slonka Apr 8, 2021
7941164
Remove unused import
slonka Apr 8, 2021
bac2a72
Revert snapshot name
slonka May 19, 2021
ea87f9f
Merge branch 'main' of github.com:envoyproxy/java-control-plane into …
slonka May 25, 2021
e6e22eb
Remove respondDeltaTracked since it's not used anywhere
slonka May 25, 2021
93293e3
Delta xds non breaking hash bytes refactor (#181)
Ferdudas97 Oct 21, 2021
4321240
Change version to 0.1.29-delta-xds-slonka-SNAPSHOT
Ferdudas97 Nov 19, 2021
15a4247
DEPLOY_BRANCH added for snapshot deploy
Ferdudas97 Nov 22, 2021
63eb59f
Merge remote-tracking branch 'origin/main' into delta-xds-non-breakin…
Ferdudas97 Nov 24, 2021
ae47beb
Merge remote-tracking branch 'origin/main' into delta-xds-non-breakin…
Ferdudas97 Jan 31, 2022
11680aa
changes after merge master with new envoy api
Ferdudas97 Jan 31, 2022
18957d4
Merge remote-tracking branch 'origin/main' into delta-xds-non-breakin…
Ferdudas97 Feb 4, 2022
2405bd1
make CacheStatusInfoAggregator public
Ferdudas97 Feb 4, 2022
5c734c2
Merge remote-tracking branch 'origin/main' into delta-xds-non-breakin…
Ferdudas97 Feb 4, 2022
2ca8d94
make GroupCacheStatusInfo and MutableStatusInfo public
Ferdudas97 Feb 10, 2022
3bb76ee
refactor SimpleCache
Ferdudas97 Feb 10, 2022
1696cd5
Merge remote-tracking branch 'origin/main' into delta-xds-non-breakin…
Ferdudas97 Feb 10, 2022
3087413
create resources map only once
Ferdudas97 Feb 18, 2022
68d2764
build snapshot with improved performance
Ferdudas97 Feb 22, 2022
608c21c
refactor creating snapshot resources due to performance improvements
Ferdudas97 Feb 22, 2022
6136bf2
create hash version from string
Ferdudas97 Feb 28, 2022
ed414d6
Merge remote-tracking branch 'origin/main' into delta-xds-non-breakin…
Ferdudas97 Mar 1, 2022
5239443
remove custom snapshot version and deploy branch env
Ferdudas97 Mar 1, 2022
0292929
Merge remote-tracking branch 'origin/main' into delta-xds-non-breakin…
Ferdudas97 Jul 25, 2022
fc131d5
fix test after merge
Ferdudas97 Jul 25, 2022
fdc7425
fix V3DeltaDiscoveryServerCallbacks description
Ferdudas97 Aug 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io.envoyproxy.controlplane.cache;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Consumer;

public abstract class AbstractWatch<V, T> {

private static final AtomicIntegerFieldUpdater<AbstractWatch> isCancelledUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractWatch.class, "isCancelled");
private final V request;
private final Consumer<T> responseConsumer;
private volatile int isCancelled = 0;
private Runnable stop;

/**
* Construct a watch.
*
* @param request the original request for the watch
* @param responseConsumer handler for outgoing response messages
*/
public AbstractWatch(V request, Consumer<T> responseConsumer) {
this.request = request;
this.responseConsumer = responseConsumer;
}

/**
* Cancel the watch. A watch must be cancelled in order to complete its resource stream and free resources. Cancel
* may be called multiple times, with each subsequent call being a no-op.
*/
public void cancel() {
if (isCancelledUpdater.compareAndSet(this, 0, 1)) {
if (stop != null) {
stop.run();
}
}
}

/**
* Returns boolean indicating whether or not the watch has been cancelled.
*/
public boolean isCancelled() {
return isCancelledUpdater.get(this) == 1;
}

/**
* Returns the original request for the watch.
*/
public V request() {
return request;
}

/**
* Sends the given response to the watch's response handler.
*
* @param response the response to be handled
* @throws WatchCancelledException if the watch has already been cancelled
*/
public void respond(T response) throws WatchCancelledException {
if (isCancelled()) {
throw new WatchCancelledException();
}

responseConsumer.accept(response);
}

/**
* Sets the callback method to be executed when the watch is cancelled. Even if cancel is executed multiple times, it
* ensures that this stop callback is only executed once.
*/
public void setStop(Runnable stop) {
this.stop = stop;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ public interface Cache<T> extends ConfigWatcher {
*
* @param group the node group whose status is being fetched
*/
StatusInfo statusInfo(T group);
StatusInfo<T> statusInfo(T group);
}
Original file line number Diff line number Diff line change
@@ -1,94 +1,14 @@
package io.envoyproxy.controlplane.cache;

import com.google.common.collect.ImmutableSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;
import javax.annotation.concurrent.ThreadSafe;

/**
* {@code CacheStatusInfo} provides a default implementation of {@link StatusInfo} for use in {@link Cache}
* implementations.
*/
@ThreadSafe
public class CacheStatusInfo<T> implements StatusInfo<T> {

private final T nodeGroup;

private final ConcurrentMap<Long, Watch> watches = new ConcurrentHashMap<>();
private volatile long lastWatchRequestTime;

public class CacheStatusInfo<T> extends MutableStatusInfo<T, Watch> {
public CacheStatusInfo(T nodeGroup) {
this.nodeGroup = nodeGroup;
}

/**
* {@inheritDoc}
*/
@Override
public long lastWatchRequestTime() {
return lastWatchRequestTime;
}

/**
* {@inheritDoc}
*/
@Override
public T nodeGroup() {
return nodeGroup;
}

/**
* {@inheritDoc}
*/
@Override
public int numWatches() {
return watches.size();
}

/**
* Removes the given watch from the tracked collection of watches.
*
* @param watchId the ID for the watch that should be removed
*/
public void removeWatch(long watchId) {
watches.remove(watchId);
}

/**
* Sets the timestamp of the last discovery watch request.
*
* @param lastWatchRequestTime the latest watch request timestamp
*/
public void setLastWatchRequestTime(long lastWatchRequestTime) {
this.lastWatchRequestTime = lastWatchRequestTime;
}

/**
* Adds the given watch to the tracked collection of watches.
*
* @param watchId the ID for the watch that should be added
* @param watch the watch that should be added
*/
public void setWatch(long watchId, Watch watch) {
watches.put(watchId, watch);
}

/**
* Returns the set of IDs for all watched currently being tracked.
*/
public Set<Long> watchIds() {
return ImmutableSet.copyOf(watches.keySet());
}

/**
* Iterate over all tracked watches and execute the given function. If it returns {@code true}, then the watch is
* removed from the tracked collection. If it returns {@code false}, then the watch is not removed.
*
* @param filter the function to execute on each watch
*/
public void watchesRemoveIf(BiFunction<Long, Watch, Boolean> filter) {
watches.entrySet().removeIf(entry -> filter.apply(entry.getKey(), entry.getValue()));
super(nodeGroup);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package io.envoyproxy.controlplane.cache;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CacheStatusInfoAggregator<T> {
private final ConcurrentMap<T, ConcurrentMap<Resources.ResourceType, CacheStatusInfo<T>>> statuses =
new ConcurrentHashMap<>();
private final ConcurrentMap<T, ConcurrentMap<Resources.ResourceType, DeltaCacheStatusInfo<T>>> deltaStatuses =
new ConcurrentHashMap<>();

public Collection<T> groups() {
return Stream.concat(statuses.keySet().stream(), deltaStatuses.keySet().stream()).collect(Collectors.toSet());
}

public void remove(T group) {
statuses.remove(group);
deltaStatuses.remove(group);
}

/**
* Returns map of delta status infos for group identifier.
*
* @param group group identifier.
*/
public Map<Resources.ResourceType, DeltaCacheStatusInfo<T>> getDeltaStatus(T group) {
return deltaStatuses.getOrDefault(group, new ConcurrentHashMap<>());
}

/**
* Returns map of status infos for group identifier.
*
* @param group group identifier.
*/
public Map<Resources.ResourceType, CacheStatusInfo<T>> getStatus(T group) {
return statuses.getOrDefault(group, new ConcurrentHashMap<>());
}

/**
* Check if statuses for specific group have any watcher.
*
* @param group group identifier.
* @return true if statuses for specific group have any watcher.
*/
public boolean hasStatuses(T group) {
Map<Resources.ResourceType, CacheStatusInfo<T>> status = getStatus(group);
Map<Resources.ResourceType, DeltaCacheStatusInfo<T>> deltaStatus = getDeltaStatus(group);
return status.values().stream().mapToLong(CacheStatusInfo::numWatches).sum()
+ deltaStatus.values().stream().mapToLong(DeltaCacheStatusInfo::numWatches).sum() > 0;
}

/**
* Returns delta status info for group identifier and creates new one if it doesn't exist.
*
* @param group group identifier.
* @param resourceType resource type.
*/
public DeltaCacheStatusInfo<T> getOrAddDeltaStatusInfo(T group, Resources.ResourceType resourceType) {
return deltaStatuses.computeIfAbsent(group, g -> new ConcurrentHashMap<>())
.computeIfAbsent(resourceType, s -> new DeltaCacheStatusInfo<>(group));
}

/**
* Returns status info for group identifier and creates new one if it doesn't exist.
*
* @param group group identifier.
* @param resourceType resource type.
*/
public CacheStatusInfo<T> getOrAddStatusInfo(T group, Resources.ResourceType resourceType) {
return statuses.computeIfAbsent(group, g -> new ConcurrentHashMap<>())
.computeIfAbsent(resourceType, s -> new CacheStatusInfo<>(group));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.envoyproxy.controlplane.cache;

import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import javax.annotation.concurrent.ThreadSafe;
Expand Down Expand Up @@ -28,4 +29,25 @@ Watch createWatch(
Set<String> knownResourceNames,
Consumer<Response> responseConsumer,
boolean hasClusterChanged);

/**
* Returns a new configuration resource {@link Watch} for the given discovery request.
*
* @param request the discovery request (node, names, etc.) to use to generate the watch
* @param requesterVersion the last version applied by the requester
* @param resourceVersions resources that are already known to the requester
* @param pendingResources resources that the caller is waiting for
* @param isWildcard indicates if the stream is in wildcard mode
* @param responseConsumer the response handler, used to process outgoing response messages
* @param hasClusterChanged indicates if EDS should be sent immediately, even if version has not been changed.
* Supported in ADS mode.
*/
DeltaWatch createDeltaWatch(
DeltaXdsRequest request,
String requesterVersion,
Map<String, String> resourceVersions,
Set<String> pendingResources,
boolean isWildcard,
Consumer<DeltaResponse> responseConsumer,
boolean hasClusterChanged);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.envoyproxy.controlplane.cache;

public class DeltaCacheStatusInfo<T> extends MutableStatusInfo<T, DeltaWatch> {

public DeltaCacheStatusInfo(T nodeGroup) {
super(nodeGroup);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.envoyproxy.controlplane.cache;

import com.google.auto.value.AutoValue;
import com.google.protobuf.Message;
import java.util.List;
import java.util.Map;

/**
* {@code Response} is a data class that contains the response for an assumed configuration type.
*/
@AutoValue
public abstract class DeltaResponse {

public static DeltaResponse create(DeltaXdsRequest request,
Map<String, VersionedResource<?>> resources,
List<String> removedResources,
String version) {
return new AutoValue_DeltaResponse(request, resources, removedResources, version);
}

/**
* Returns the original request associated with the response.
*/
public abstract DeltaXdsRequest request();

/**
* Returns the resources to include in the response.
*/
public abstract Map<String, VersionedResource<? extends Message>> resources();

/**
* Returns the removed resources to include in the response.
*/
public abstract List<String> removedResources();

/**
* Returns the version of the resources as tracked by the cache for the given type. Envoy responds with this version
* as an acknowledgement.
*/
public abstract String version();
}
Loading