From e98abb4f83130c55ca89fcce1bdb14fa9562487c Mon Sep 17 00:00:00 2001 From: Lars Wander Date: Thu, 7 Sep 2017 17:48:25 -0400 Subject: [PATCH] feat(provider/kubernetes): v2 generic on demand caching --- .../clouddriver/cache/OnDemandAgent.groovy | 59 ---- .../clouddriver/cache/OnDemandAgent.java | 70 ++++ .../caching/KubernetesCachingAgent.java | 2 +- .../kubernetes/v2/caching/Keys.java | 20 ++ .../agent/KubernetesPodCachingAgent.java | 2 +- .../KubernetesReplicaSetCachingAgent.java | 42 ++- .../agent/KubernetesV2CachingAgent.java | 13 +- .../KubernetesV2OnDemandCachingAgent.java | 309 ++++++++++++++++++ .../v2/security/KubernetesV2Credentials.java | 18 + ...ubernetesReplicaSetCachingAgentSpec.groovy | 41 ++- 10 files changed, 509 insertions(+), 67 deletions(-) delete mode 100644 clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/cache/OnDemandAgent.groovy create mode 100644 clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/cache/OnDemandAgent.java create mode 100644 clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesV2OnDemandCachingAgent.java diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/cache/OnDemandAgent.groovy b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/cache/OnDemandAgent.groovy deleted file mode 100644 index 1f4f4381df2..00000000000 --- a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/cache/OnDemandAgent.groovy +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.spinnaker.clouddriver.cache - -import com.netflix.spinnaker.cats.agent.CacheResult -import com.netflix.spinnaker.cats.provider.ProviderCache - -interface OnDemandAgent { - - String getProviderName() - - String getOnDemandAgentType() - - // TODO(ttomsu): This seems like it should go in a different interface. - OnDemandMetricsSupport getMetricsSupport() - - enum OnDemandType { - ServerGroup, - SecurityGroup, - LoadBalancer, - Job, - TargetGroup - - static OnDemandType fromString(String s) { - OnDemandType t = values().find { it.toString().equalsIgnoreCase(s) } - if (!t) { - throw new IllegalArgumentException("Cannot create OnDemandType from String '${s}'") - } - return t - } - } - - boolean handles(OnDemandType type, String cloudProvider) - - static class OnDemandResult { - String sourceAgentType - Collection authoritativeTypes = [] - CacheResult cacheResult - Map> evictions = [:] - } - - OnDemandResult handle(ProviderCache providerCache, Map data) - - Collection pendingOnDemandRequests(ProviderCache providerCache) -} diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/cache/OnDemandAgent.java b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/cache/OnDemandAgent.java new file mode 100644 index 00000000000..bd04bb1792c --- /dev/null +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/cache/OnDemandAgent.java @@ -0,0 +1,70 @@ +/* + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.clouddriver.cache; + +import com.netflix.spinnaker.cats.agent.CacheResult; +import com.netflix.spinnaker.cats.provider.ProviderCache; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +public interface OnDemandAgent { + String getProviderName(); + + String getOnDemandAgentType(); + + // TODO(ttomsu): This seems like it should go in a different interface. + OnDemandMetricsSupport getMetricsSupport(); + + enum OnDemandType { + ServerGroup, + SecurityGroup, + LoadBalancer, + Job, + TargetGroup; + + static OnDemandType fromString(String s) { + return Arrays.stream(values()) + .filter(v -> v.toString().equalsIgnoreCase(s)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Cannot create OnDemandType from '" + s + "'")); + } + } + + boolean handles(OnDemandType type, String cloudProvider); + + static class OnDemandResult { + String sourceAgentType; + Collection authoritativeTypes = new ArrayList<>(); + CacheResult cacheResult; + Map> evictions = new HashMap<>(); + + public OnDemandResult() {} + + public OnDemandResult(String sourceAgentType, CacheResult cacheResult, Map> evictions) { + this.sourceAgentType = sourceAgentType; + this.cacheResult = cacheResult; + this.evictions = evictions; + } + } + + OnDemandResult handle(ProviderCache providerCache, Map data); + Collection pendingOnDemandRequests(ProviderCache providerCache); +} diff --git a/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/caching/KubernetesCachingAgent.java b/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/caching/KubernetesCachingAgent.java index 4ab85561401..b52d157acb4 100644 --- a/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/caching/KubernetesCachingAgent.java +++ b/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/caching/KubernetesCachingAgent.java @@ -59,7 +59,7 @@ protected KubernetesCachingAgent(KubernetesNamedAccountCredentials namedAccou @Override public String getAgentType() { - return String.format("%s/%s[%d/%d]", accountName, this.getClass().getSimpleName(), agentIndex, agentCount); + return String.format("%s/%s[%d/%d]", accountName, this.getClass().getSimpleName(), agentIndex + 1, agentCount); } protected void reloadNamespaces() { diff --git a/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/Keys.java b/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/Keys.java index 02c0ed4841c..3732e7c16bb 100644 --- a/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/Keys.java +++ b/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/Keys.java @@ -19,6 +19,7 @@ import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.KubernetesApiVersion; import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.KubernetesKind; +import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.KubernetesManifest; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; @@ -91,6 +92,10 @@ public static String infrastructure(KubernetesKind kind, KubernetesApiVersion ve return createKey(Kind.INFRASTRUCTURE, kind, version, account, namespace, name); } + public static String infrastructure(KubernetesManifest manifest, String account) { + return infrastructure(manifest.getKind(), manifest.getApiVersion(), account, manifest.getNamespace(), manifest.getName()); + } + public static Optional parseKey(String key) { String[] parts = key.split(":", -1); @@ -150,6 +155,11 @@ public ApplicationCacheKey(String[] parts) { name = parts[3]; } + @Override + public String toString() { + return createKey(kind, logicalKind, name); + } + @Override public String getGroup() { return logicalKind.toString(); @@ -173,6 +183,11 @@ public ClusterCacheKey(String[] parts) { name = parts[4]; } + @Override + public String toString() { + return createKey(kind, logicalKind, account, name); + } + @Override public String getGroup() { return logicalKind.toString(); @@ -201,6 +216,11 @@ public InfrastructureCacheKey(String[] parts) { name = parts[6]; } + @Override + public String toString() { + return createKey(kind, kubernetesKind, kubernetesApiVersion, account, namespace, name); + } + @Override public String getGroup() { return kubernetesKind.toString(); diff --git a/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesPodCachingAgent.java b/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesPodCachingAgent.java index 785ed05e92c..5e2b840cd8b 100644 --- a/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesPodCachingAgent.java +++ b/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesPodCachingAgent.java @@ -60,7 +60,7 @@ public class KubernetesPodCachingAgent extends KubernetesV2CachingAgent { ); @Override - protected List loadPrimaryResource() { + protected List loadPrimaryResourceList() { return namespaces.stream() .map(credentials::listAllPods) .flatMap(Collection::stream) diff --git a/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesReplicaSetCachingAgent.java b/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesReplicaSetCachingAgent.java index 8b1f0ea6117..d415531b773 100644 --- a/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesReplicaSetCachingAgent.java +++ b/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesReplicaSetCachingAgent.java @@ -18,10 +18,12 @@ package com.netflix.spinnaker.clouddriver.kubernetes.v2.caching.agent; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; import com.netflix.spectator.api.Registry; import com.netflix.spinnaker.cats.agent.AgentDataType; import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesNamedAccountCredentials; import com.netflix.spinnaker.clouddriver.kubernetes.v2.caching.Keys; +import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.KubernetesApiVersion; import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.KubernetesKind; import com.netflix.spinnaker.clouddriver.kubernetes.v2.security.KubernetesV2Credentials; import io.kubernetes.client.models.V1beta1ReplicaSet; @@ -33,13 +35,15 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import static com.netflix.spinnaker.cats.agent.AgentDataType.Authority.AUTHORITATIVE; import static com.netflix.spinnaker.cats.agent.AgentDataType.Authority.INFORMATIVE; @Slf4j -public class KubernetesReplicaSetCachingAgent extends KubernetesV2CachingAgent { +public class KubernetesReplicaSetCachingAgent extends KubernetesV2OnDemandCachingAgent { KubernetesReplicaSetCachingAgent(KubernetesNamedAccountCredentials namedAccountCredentials, ObjectMapper objectMapper, Registry registry, @@ -59,10 +63,44 @@ public class KubernetesReplicaSetCachingAgent extends KubernetesV2CachingAgent loadPrimaryResource() { + protected List loadPrimaryResourceList() { return namespaces.stream() .map(credentials::listAllReplicaSets) .flatMap(Collection::stream) .collect(Collectors.toList()); } + + @Override + protected V1beta1ReplicaSet loadPrimaryResource(String namespace, String name) { + return credentials.readReplicaSet(namespace, name); + } + + @Override + protected OnDemandType onDemandType() { + return OnDemandType.ServerGroup; + } + + @Override + protected KubernetesKind primaryKind() { + return KubernetesKind.REPLICA_SET; + } + + @Override + protected KubernetesApiVersion primaryApiVersion() { + return KubernetesApiVersion.EXTENSIONS_V1BETA1; + } + + @Override + protected Map mapKeyToOnDemandResult(Keys.InfrastructureCacheKey key) { + return new ImmutableMap.Builder() + .put("serverGroup", key.getName()) + .put("account", key.getAccount()) + .put("region", key.getNamespace()) + .build(); + } + + @Override + protected Optional getResourceNameFromOnDemandRequest(Map request) { + return request.containsKey("serverGroup") ? Optional.of((String) request.get("serverGroup")) : Optional.empty(); + } } diff --git a/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesV2CachingAgent.java b/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesV2CachingAgent.java index 57c5e1d1148..91a868f6eeb 100644 --- a/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesV2CachingAgent.java +++ b/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesV2CachingAgent.java @@ -28,6 +28,7 @@ import com.netflix.spinnaker.clouddriver.kubernetes.v2.security.KubernetesV2Credentials; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -45,8 +46,15 @@ protected KubernetesV2CachingAgent(KubernetesNamedAccountCredentials resourceData = loadPrimaryResource().stream() + protected CacheResult buildCacheResult(List resources) { + List resourceData = resources.stream() .map(rs -> KubernetesCacheDataConverter.fromResource(accountName, objectMapper, rs)) .filter(Objects::nonNull) .collect(Collectors.toList()); @@ -62,7 +70,8 @@ public CacheResult loadData(ProviderCache providerCache) { KubernetesCacheDataConverter.logStratifiedCacheData(getAgentType(), entries); return new DefaultCacheResult(entries); + } - protected abstract List loadPrimaryResource(); + protected abstract List loadPrimaryResourceList(); } diff --git a/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesV2OnDemandCachingAgent.java b/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesV2OnDemandCachingAgent.java new file mode 100644 index 00000000000..5bd40bcc055 --- /dev/null +++ b/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesV2OnDemandCachingAgent.java @@ -0,0 +1,309 @@ +/* + * Copyright 2017 Google, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.netflix.spinnaker.clouddriver.kubernetes.v2.caching.agent; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import com.netflix.spectator.api.Registry; +import com.netflix.spinnaker.cats.agent.CacheResult; +import com.netflix.spinnaker.cats.agent.DefaultCacheResult; +import com.netflix.spinnaker.cats.cache.CacheData; +import com.netflix.spinnaker.cats.cache.DefaultCacheData; +import com.netflix.spinnaker.cats.provider.ProviderCache; +import com.netflix.spinnaker.clouddriver.cache.OnDemandAgent; +import com.netflix.spinnaker.clouddriver.cache.OnDemandMetricsSupport; +import com.netflix.spinnaker.clouddriver.kubernetes.KubernetesCloudProvider; +import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesNamedAccountCredentials; +import com.netflix.spinnaker.clouddriver.kubernetes.v2.caching.Keys; +import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.KubernetesApiVersion; +import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.KubernetesKind; +import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.KubernetesManifest; +import com.netflix.spinnaker.clouddriver.kubernetes.v2.security.KubernetesV2Credentials; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Slf4j +public abstract class KubernetesV2OnDemandCachingAgent extends KubernetesV2CachingAgent implements OnDemandAgent { + @Getter + protected final OnDemandMetricsSupport metricsSupport; + + protected final static String ON_DEMAND_TYPE = "onDemand"; + private final static String CACHE_TIME_KEY = "cacheTime"; + private final static String PROCESSED_COUNT_KEY = "processedCount"; + private final static String PROCESSED_TIME_KEY = "processedTime"; + private final static String CACHE_RESULTS_KEY = "cacheResults"; + + protected abstract List loadPrimaryResourceList(); + protected abstract T loadPrimaryResource(String namespace, String name); + protected abstract OnDemandType onDemandType(); + protected abstract KubernetesKind primaryKind(); + protected abstract KubernetesApiVersion primaryApiVersion(); + + /* Kind-of ugly... required for the on demand madness in orca that expects various fields like region, serverGroup, etc... */ + protected abstract Map mapKeyToOnDemandResult(Keys.InfrastructureCacheKey key); + protected abstract Optional getResourceNameFromOnDemandRequest(Map request); + + protected KubernetesV2OnDemandCachingAgent(KubernetesNamedAccountCredentials namedAccountCredentials, + ObjectMapper objectMapper, + Registry registry, + int agentIndex, + int agentCount) { + super(namedAccountCredentials, objectMapper, registry, agentIndex, agentCount); + + metricsSupport = new OnDemandMetricsSupport(registry, this, KubernetesCloudProvider.getID() + ":" + onDemandType()); + } + + @Override + public CacheResult loadData(ProviderCache providerCache) { + reloadNamespaces(); + + Long start = System.currentTimeMillis(); + List primaryResource = loadPrimaryResourceList(); + + List primaryKeys = primaryResource.stream() + .map(rs -> objectMapper.convertValue(rs, KubernetesManifest.class)) + .map(mf -> Keys.infrastructure(mf, accountName)) + .collect(Collectors.toList()); + + List keepInOnDemand = new ArrayList<>(); + List evictFromOnDemand = new ArrayList<>(); + + providerCache.getAll(ON_DEMAND_TYPE, primaryKeys).forEach(cd -> { + // can't be a ternary op due to restrictions on non-statement expressions in lambdas + if (shouldKeepInOnDemand(start, cd)) { + keepInOnDemand.add(cd); + } else { + evictFromOnDemand.add(cd); + } + processOnDemandEntry(cd); + }); + + // sort by increasing cache time to ensure newest entries are first + keepInOnDemand.sort(Comparator.comparing(a -> ((Long) a.getAttributes().get(CACHE_TIME_KEY)))); + + // first build the cache result, then decide which entries to overwrite with on demand data + CacheResult result = buildCacheResult(primaryResource); + Map> cacheResults = result.getCacheResults(); + + for (CacheData onDemandData : keepInOnDemand) { + String onDemandKey = onDemandData.getId(); + log.info("On demand entry '{}' is overwriting load data entry", onDemandKey); + + String onDemandResultsJson = (String) onDemandData.getAttributes().get(CACHE_RESULTS_KEY); + Map> onDemandResults; + try { + onDemandResults = objectMapper.readValue(onDemandResultsJson, new TypeReference>>() { }); + } catch (IOException e) { + log.error("Failure parsing stored on demand data for '{}'", onDemandKey, e); + continue; + } + + mergeCacheResults(cacheResults, onDemandResults); + } + + cacheResults.put(ON_DEMAND_TYPE, keepInOnDemand); + Map> evictionResults = new ImmutableMap.Builder>() + .put(ON_DEMAND_TYPE, evictFromOnDemand.stream().map(CacheData::getId).collect(Collectors.toList())) + .build(); + + return new DefaultCacheResult(cacheResults, evictionResults); + } + + protected void mergeCacheResults(Map> current, Map> added) { + for (String group : added.keySet()) { + Collection currentByGroup = current.get(group); + Collection addedByGroup = added.get(group); + + currentByGroup = currentByGroup == null ? new ArrayList<>() : currentByGroup; + addedByGroup = addedByGroup == null ? new ArrayList<>() : addedByGroup; + + for (CacheData addedCacheData : addedByGroup) { + CacheData mergedEntry = currentByGroup.stream() + .filter(cd -> cd.getId().equals(addedCacheData.getId())) + .findFirst() + .flatMap(cd -> Optional.of(mergeCacheData(cd, addedCacheData))) + .orElse(addedCacheData); + + currentByGroup.removeIf(cd -> cd.getId().equals(addedCacheData.getId())); + currentByGroup.add(mergedEntry); + } + + current.put(group, currentByGroup); + } + } + + protected CacheData mergeCacheData(CacheData current, CacheData added) { + String id = current.getId(); + Map attributes = current.getAttributes(); + Map> relationships = current.getRelationships(); + attributes.putAll(added.getAttributes()); + added.getRelationships() + .entrySet() + .forEach(entry -> relationships.merge(entry.getKey(), entry.getValue(), + (a, b) -> { + a.addAll(b); + return a; + })); + + return new DefaultCacheData(id, attributes, relationships); + } + + private void processOnDemandEntry(CacheData onDemandEntry) { + Map attributes = onDemandEntry.getAttributes(); + Integer processedCount = (Integer) attributes.get(PROCESSED_COUNT_KEY); + Long processedTime = System.currentTimeMillis(); + + processedCount = processedCount == null ? 0 : processedCount; + processedCount += 1; + + attributes.put(PROCESSED_TIME_KEY, processedTime); + attributes.put(PROCESSED_COUNT_KEY, processedCount); + } + + private boolean shouldKeepInOnDemand(Long lastFullRefresh, CacheData onDemandEntry) { + Map attributes = onDemandEntry.getAttributes(); + Long cacheTime = (Long) attributes.get(CACHE_TIME_KEY); + Integer processedCount = (Integer) attributes.get(PROCESSED_COUNT_KEY); + + cacheTime = cacheTime == null ? 0L : cacheTime; + processedCount = processedCount == null ? 0 : processedCount; + + return cacheTime >= lastFullRefresh || processedCount == 0; + } + + private OnDemandAgent.OnDemandResult evictEntry(ProviderCache providerCache, String key) { + Map> evictions = new HashMap<>(); + CacheResult cacheResult = new DefaultCacheResult(new HashMap<>()); + + log.info("Evicting on demand '{}'", key); + providerCache.evictDeletedItems(ON_DEMAND_TYPE, Collections.singletonList(key)); + evictions.put(primaryKind().toString(), Collections.singletonList(key)); + + return new OnDemandAgent.OnDemandResult(getOnDemandAgentType(), cacheResult, evictions); + } + + private OnDemandAgent.OnDemandResult addEntry(ProviderCache providerCache, String key, T resource) throws JsonProcessingException { + Map> evictions = new HashMap<>(); + CacheResult cacheResult; + + log.info("Storing on demand '{}'", key); + cacheResult = buildCacheResult(resource); + String jsonResult = objectMapper.writeValueAsString(cacheResult.getCacheResults()); + + Map attributes = new ImmutableMap.Builder() + .put(CACHE_TIME_KEY, System.currentTimeMillis()) + .put(CACHE_RESULTS_KEY, jsonResult) + .put(PROCESSED_COUNT_KEY, 0) + .put(PROCESSED_TIME_KEY, null) + .build(); + + Map> relationships = new HashMap<>(); + CacheData onDemandData = new DefaultCacheData(key, attributes, relationships); + providerCache.putCacheData(ON_DEMAND_TYPE, onDemandData); + + return new OnDemandAgent.OnDemandResult(getOnDemandAgentType(), cacheResult, evictions); + } + + @Override + public OnDemandAgent.OnDemandResult handle(ProviderCache providerCache, Map data) { + String account = (String) data.get("accountName"); + String name = getResourceNameFromOnDemandRequest(data).orElse(""); + String namespace = (String) data.get("namespace"); + if (StringUtils.isEmpty(namespace)) { + namespace = (String) data.get("region"); // sigh, namespace == region in k8s <-> spinnaker + } + + reloadNamespaces(); + if (StringUtils.isEmpty(account) + || StringUtils.isEmpty(name) + || StringUtils.isEmpty(namespace) + || !namespaces.contains(namespace)) { + return null; + } + + OnDemandAgent.OnDemandResult result; + T resource = loadPrimaryResource(namespace, name); + String resourceKey = Keys.infrastructure(primaryKind(), primaryApiVersion(), account, namespace, name); + try { + result = resource == null ? evictEntry(providerCache, resourceKey) : addEntry(providerCache, resourceKey, resource); + } catch (Exception e) { + log.error("Failed to process update of '{}'", resourceKey, e); + return null; + } + + log.info("On demand cache refresh of (data: {}) succeeded", data); + return result; + } + + @Override + public String getOnDemandAgentType() { + return getAgentType() + "-OnDemand"; + } + + @Override + public boolean handles(OnDemandType type, String cloudProvider) { + return type == onDemandType() && cloudProvider.equals(KubernetesCloudProvider.getID()); + } + + @Override + public Collection pendingOnDemandRequests(ProviderCache providerCache) { + Collection keys = providerCache.getIdentifiers(ON_DEMAND_TYPE); + List infraKeys = keys.stream() + .map(Keys::parseKey) + .flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) + .filter(k -> k instanceof Keys.InfrastructureCacheKey) + .map(i -> (Keys.InfrastructureCacheKey) i) + .collect(Collectors.toList()); + + List matchingKeys = infraKeys.stream() + .filter(i -> i.getAccount().equals(getAccountName()) + && namespaces.contains(i.getNamespace()) + && i.getKubernetesKind().equals(primaryKind())) + .map(Keys.InfrastructureCacheKey::toString) + .collect(Collectors.toList()); + + return providerCache.getAll(ON_DEMAND_TYPE, matchingKeys).stream() + .map(cd -> { + Keys.InfrastructureCacheKey parsedKey = (Keys.InfrastructureCacheKey) Keys.parseKey(cd.getId()).get(); + Map details = mapKeyToOnDemandResult(parsedKey); + Map attributes = cd.getAttributes(); + return new ImmutableMap.Builder() + .put("details", details) + .put(CACHE_TIME_KEY, attributes.get(CACHE_TIME_KEY)) + .put(PROCESSED_COUNT_KEY, attributes.get(PROCESSED_COUNT_KEY)) + .put(PROCESSED_TIME_KEY, attributes.get(PROCESSED_TIME_KEY)) + .build(); + }) + .collect(Collectors.toList()); + } +} diff --git a/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/security/KubernetesV2Credentials.java b/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/security/KubernetesV2Credentials.java index 36c75bf35c1..3dcfb3ff661 100644 --- a/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/security/KubernetesV2Credentials.java +++ b/clouddriver-kubernetes/src/main/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/security/KubernetesV2Credentials.java @@ -227,6 +227,24 @@ public List listReplicaSets(String namespace, KubernetesSelec }); } + public V1beta1ReplicaSet readReplicaSet(String namespace, String name) { + final String methodName = "replicaSets.read"; + final KubernetesApiVersion apiVersion = KubernetesApiVersion.EXTENSIONS_V1BETA1; + final KubernetesKind kind = KubernetesKind.REPLICA_SET; + return runAndRecordMetrics(methodName, namespace, () -> { + try { + V1beta1ReplicaSet result = extensionsV1beta1Api.readNamespacedReplicaSet(name, namespace, PRETTY, EXACT, EXPORT); + return annotateMissingFields(result, V1beta1ReplicaSet.class, apiVersion, kind); + } catch (ApiException e) { + if (notFound(e)) { + return null; + } + + throw new KubernetesApiException(methodName, e); + } + }); + } + public void createService(V1Service service) { final String methodName = "services.create"; final String namespace = service.getMetadata().getNamespace(); diff --git a/clouddriver-kubernetes/src/test/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesReplicaSetCachingAgentSpec.groovy b/clouddriver-kubernetes/src/test/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesReplicaSetCachingAgentSpec.groovy index d1ab6483d24..91960a41331 100644 --- a/clouddriver-kubernetes/src/test/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesReplicaSetCachingAgentSpec.groovy +++ b/clouddriver-kubernetes/src/test/groovy/com/netflix/spinnaker/clouddriver/kubernetes/v2/caching/agent/KubernetesReplicaSetCachingAgentSpec.groovy @@ -18,6 +18,9 @@ package com.netflix.spinnaker.clouddriver.kubernetes.v2.caching.agent import com.fasterxml.jackson.databind.ObjectMapper +import com.netflix.spectator.api.Registry +import com.netflix.spinnaker.cats.cache.DefaultCacheData +import com.netflix.spinnaker.cats.provider.ProviderCache import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesNamedAccountCredentials import com.netflix.spinnaker.clouddriver.kubernetes.v2.caching.Keys import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.KubernetesApiVersion @@ -26,6 +29,7 @@ import com.netflix.spinnaker.clouddriver.kubernetes.v2.security.KubernetesV2Cred import io.kubernetes.client.models.V1ObjectMeta import io.kubernetes.client.models.V1beta1ReplicaSet import spock.lang.Specification +import spock.lang.Unroll class KubernetesReplicaSetCachingAgentSpec extends Specification { def ACCOUNT = "my-account" @@ -58,10 +62,14 @@ class KubernetesReplicaSetCachingAgentSpec extends Specification { namedAccountCredentials.getCredentials() >> credentials namedAccountCredentials.getName() >> ACCOUNT - def cachingAgent = new KubernetesReplicaSetCachingAgent(namedAccountCredentials, new ObjectMapper(), null, 0, 1) + def registryMock = Mock(Registry) + registryMock.timer(_) >> null + def cachingAgent = new KubernetesReplicaSetCachingAgent(namedAccountCredentials, new ObjectMapper(), registryMock, 0, 1) + def providerCacheMock = Mock(ProviderCache) + providerCacheMock.getAll(_, _) >> [] when: - def result = cachingAgent.loadData(null) + def result = cachingAgent.loadData(providerCacheMock) then: result.cacheResults[KubernetesKind.REPLICA_SET.name].size() == 1 @@ -71,4 +79,33 @@ class KubernetesReplicaSetCachingAgentSpec extends Specification { cacheData.attributes.get("name") == NAME cacheData.attributes.get("namespace") == NAMESPACE } + + @Unroll + void "merges two cache data"() { + when: + def credentials = Mock(KubernetesV2Credentials) + credentials.getDeclaredNamespaces() >> [NAMESPACE] + + def namedAccountCredentials = Mock(KubernetesNamedAccountCredentials) + namedAccountCredentials.getCredentials() >> credentials + namedAccountCredentials.getName() >> ACCOUNT + + def registryMock = Mock(Registry) + registryMock.timer(_) >> null + def cachingAgent = new KubernetesReplicaSetCachingAgent(namedAccountCredentials, new ObjectMapper(), registryMock, 0, 1) + def a = new DefaultCacheData("id", attrA, relA) + def b = new DefaultCacheData("id", attrB, relB) + cachingAgent.mergeCacheData(a, b) + + then: + b.getAttributes().collect { k, v -> a.getAttributes().get(k) == v }.every() + b.getRelationships().collect { k, v -> v.collect { r -> b.getRelationships().get(k).contains(r) }.every() }.every() + + where: + attrA | attrB | relA | relB + ["a": "b"] | ["c": "d"] | ["a": ["1", "2"]] | ["b": ["3", "4"]] + [:] | ["c": "d"] | [:] | ["b": ["3", "4"]] + [:] | [:] | [:] | [:] + ["a": "b"] | [:] | ["a": ["1", "2"]] | [:] + } }