Skip to content

Commit

Permalink
feat(provider/kubernetes): Cache pods
Browse files Browse the repository at this point in the history
  • Loading branch information
Lars Wander committed Sep 6, 2017
1 parent b4109d4 commit 5587718
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2017 Google, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.netflix.spinnaker.clouddriver.kubernetes.v2.caching.agent;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.cats.agent.AgentDataType;
import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesNamedAccountCredentials;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.caching.Keys;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.KubernetesKind;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.security.KubernetesV2Credentials;
import io.kubernetes.client.models.V1Pod;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;

import static com.netflix.spinnaker.cats.agent.AgentDataType.Authority.AUTHORITATIVE;
import static com.netflix.spinnaker.cats.agent.AgentDataType.Authority.INFORMATIVE;

@Slf4j
public class KubernetesPodCachingAgent extends KubernetesV2CachingAgent<V1Pod> {
KubernetesPodCachingAgent(KubernetesNamedAccountCredentials<KubernetesV2Credentials> namedAccountCredentials,
ObjectMapper objectMapper,
Registry registry,
int agentIndex,
int agentCount) {
super(namedAccountCredentials, objectMapper, registry, agentIndex, agentCount);
}

@Getter
final private Collection<AgentDataType> providedDataTypes = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(
INFORMATIVE.forType(Keys.LogicalKind.APPLICATION.toString()),
INFORMATIVE.forType(Keys.LogicalKind.CLUSTER.toString()),
INFORMATIVE.forType(KubernetesKind.DEPLOYMENT.toString()),
INFORMATIVE.forType(KubernetesKind.REPLICA_SET.toString()),
AUTHORITATIVE.forType(KubernetesKind.POD.toString())
))
);

@Override
protected List<V1Pod> loadPrimaryResource() {
return namespaces.stream()
.map(credentials::listAllPods)
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.cats.agent.AgentDataType;
import com.netflix.spinnaker.cats.agent.CacheResult;
import com.netflix.spinnaker.cats.agent.DefaultCacheResult;
import com.netflix.spinnaker.cats.cache.CacheData;
import com.netflix.spinnaker.cats.provider.ProviderCache;
import com.netflix.spinnaker.clouddriver.kubernetes.caching.KubernetesCachingAgent;
import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesNamedAccountCredentials;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.caching.Keys;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.KubernetesKind;
Expand All @@ -38,16 +33,14 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import static com.netflix.spinnaker.cats.agent.AgentDataType.Authority.AUTHORITATIVE;
import static com.netflix.spinnaker.cats.agent.AgentDataType.Authority.INFORMATIVE;

@Slf4j
public class KubernetesReplicaSetCachingAgent extends KubernetesCachingAgent<KubernetesV2Credentials> {
KubernetesReplicaSetCachingAgent(KubernetesNamedAccountCredentials namedAccountCredentials,
public class KubernetesReplicaSetCachingAgent extends KubernetesV2CachingAgent<V1beta1ReplicaSet> {
KubernetesReplicaSetCachingAgent(KubernetesNamedAccountCredentials<KubernetesV2Credentials> namedAccountCredentials,
ObjectMapper objectMapper,
Registry registry,
int agentIndex,
Expand All @@ -66,28 +59,7 @@ public class KubernetesReplicaSetCachingAgent extends KubernetesCachingAgent<Kub
);

@Override
public CacheResult loadData(ProviderCache providerCache) {
reloadNamespaces();

List<CacheData> replicaSetData = loadReplicaSets().stream()
.map(rs -> KubernetesCacheDataConverter.fromResource(accountName, objectMapper, rs))
.filter(Objects::nonNull)
.collect(Collectors.toList());

List<CacheData> invertedRelationships = replicaSetData.stream()
.map(KubernetesCacheDataConverter::invertRelationships)
.flatMap(Collection::stream)
.collect(Collectors.toList());

replicaSetData.addAll(invertedRelationships);

Map<String, Collection<CacheData>> entries = KubernetesCacheDataConverter.stratifyCacheDataByGroup(replicaSetData);
KubernetesCacheDataConverter.logStratifiedCacheData(getAgentType(), entries);

return new DefaultCacheResult(entries);
}

private List<V1beta1ReplicaSet> loadReplicaSets() {
protected List<V1beta1ReplicaSet> loadPrimaryResource() {
return namespaces.stream()
.map(credentials::listAllReplicaSets)
.flatMap(Collection::stream)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2017 Google, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.netflix.spinnaker.clouddriver.kubernetes.v2.caching.agent;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.cats.agent.CacheResult;
import com.netflix.spinnaker.cats.agent.DefaultCacheResult;
import com.netflix.spinnaker.cats.cache.CacheData;
import com.netflix.spinnaker.cats.provider.ProviderCache;
import com.netflix.spinnaker.clouddriver.kubernetes.caching.KubernetesCachingAgent;
import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesNamedAccountCredentials;
import com.netflix.spinnaker.clouddriver.kubernetes.v2.security.KubernetesV2Credentials;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public abstract class KubernetesV2CachingAgent<T> extends KubernetesCachingAgent<KubernetesV2Credentials> {
protected KubernetesV2CachingAgent(KubernetesNamedAccountCredentials<KubernetesV2Credentials> namedAccountCredentials,
ObjectMapper objectMapper,
Registry registry,
int agentIndex,
int agentCount) {
super(namedAccountCredentials, objectMapper, registry, agentIndex, agentCount);
}

@Override
public CacheResult loadData(ProviderCache providerCache) {
reloadNamespaces();

List<CacheData> resourceData = loadPrimaryResource().stream()
.map(rs -> KubernetesCacheDataConverter.fromResource(accountName, objectMapper, rs))
.filter(Objects::nonNull)
.collect(Collectors.toList());

List<CacheData> invertedRelationships = resourceData.stream()
.map(KubernetesCacheDataConverter::invertRelationships)
.flatMap(Collection::stream)
.collect(Collectors.toList());

resourceData.addAll(invertedRelationships);

Map<String, Collection<CacheData>> entries = KubernetesCacheDataConverter.stratifyCacheDataByGroup(resourceData);
KubernetesCacheDataConverter.logStratifiedCacheData(getAgentType(), entries);

return new DefaultCacheResult(entries);
}

protected abstract List<T> loadPrimaryResource();
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public List<KubernetesCachingAgent> buildAllCachingAgents(KubernetesNamedAccount
return IntStream.range(0, credentials.getCacheThreads())
.boxed()
.map(i -> new ArrayList<KubernetesCachingAgent>(Arrays.asList(
new KubernetesPodCachingAgent(credentials, objectMapper, registry, i, credentials.getCacheThreads()),
new KubernetesReplicaSetCachingAgent(credentials, objectMapper, registry, i, credentials.getCacheThreads())
)))
.flatMap(Collection::stream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
public enum KubernetesKind {
DEPLOYMENT("deployment"),
INGRESS("ingress"),
POD("pod"),
REPLICA_SET("replicaSet"),
NETWORK_POLICY("networkPolicy"),
SERVICE("service");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.apis.ExtensionsV1beta1Api;
import io.kubernetes.client.models.AppsV1beta1Deployment;
import io.kubernetes.client.models.V1Pod;
import io.kubernetes.client.models.V1PodList;
import io.kubernetes.client.models.V1Service;
import io.kubernetes.client.models.V1beta1Ingress;
import io.kubernetes.client.models.V1beta1ReplicaSet;
Expand Down Expand Up @@ -167,6 +169,29 @@ public void createIngress(V1beta1Ingress ingress) {
});
}

public List<V1Pod> listAllPods(String namespace) {
return listPods(namespace, new KubernetesSelectorList(), new KubernetesSelectorList());
}

public List<V1Pod> listPods(String namespace, KubernetesSelectorList fieldSelectors, KubernetesSelectorList labelSelectors) {
final String methodName = "pods.list";
final String fieldSelectorString = fieldSelectors.toString();
final String labelSelectorString = labelSelectors.toString();
final KubernetesApiVersion apiVersion = KubernetesApiVersion.V1;
final KubernetesKind kind = KubernetesKind.POD;
return runAndRecordMetrics(methodName, namespace, () -> {
try {
V1PodList list = coreV1Api.listNamespacedPod(namespace, PRETTY, fieldSelectorString, labelSelectorString, DEFAULT_VERSION, TIMEOUT_SECONDS, WATCH);
return annotateMissingFields(list == null ? new ArrayList<>() : list.getItems(),
V1Pod.class,
apiVersion,
kind);
} catch (ApiException e) {
throw new KubernetesApiException(methodName, e);
}
});
}

public void createReplicaSet(V1beta1ReplicaSet replicaSet) {
final String methodName = "replicaSets.create";
final String namespace = replicaSet.getMetadata().getNamespace();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2017 Google, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.netflix.spinnaker.clouddriver.kubernetes.v2.caching.agent

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spinnaker.clouddriver.kubernetes.security.KubernetesNamedAccountCredentials
import com.netflix.spinnaker.clouddriver.kubernetes.v2.caching.Keys
import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.KubernetesApiVersion
import com.netflix.spinnaker.clouddriver.kubernetes.v2.description.KubernetesKind
import com.netflix.spinnaker.clouddriver.kubernetes.v2.security.KubernetesV2Credentials
import io.kubernetes.client.models.V1ObjectMeta
import io.kubernetes.client.models.V1Pod
import spock.lang.Specification

class KubernetesPodCachingAgentSpec extends Specification {
def ACCOUNT = "my-account"
def CLUSTER = "my-cluster"
def APPLICATION = "my-application"
def NAME = "the-name"
def NAMESPACE = "your-namespace"

void "invokes caching agent on output pod"() {
setup:
def pod = new V1Pod()
def annotations = [
'relationships.spinnaker.io/cluster': '"' + CLUSTER + '"',
'relationships.spinnaker.io/application': '"' + APPLICATION + '"'
]

def metadata = new V1ObjectMeta()
metadata.setAnnotations(annotations)
metadata.setName(NAME)
metadata.setNamespace(NAMESPACE)
pod.setMetadata(metadata)
pod.setKind(KubernetesKind.REPLICA_SET.name)
pod.setApiVersion(KubernetesApiVersion.EXTENSIONS_V1BETA1.name)

def credentials = Mock(KubernetesV2Credentials)
credentials.getDeclaredNamespaces() >> [NAMESPACE]
credentials.listAllPods(NAMESPACE) >> [pod]

def namedAccountCredentials = Mock(KubernetesNamedAccountCredentials)
namedAccountCredentials.getCredentials() >> credentials
namedAccountCredentials.getName() >> ACCOUNT

def cachingAgent = new KubernetesPodCachingAgent(namedAccountCredentials, new ObjectMapper(), null, 0, 1)

when:
def result = cachingAgent.loadData(null)

then:
result.cacheResults[KubernetesKind.REPLICA_SET.name].size() == 1
def cacheData = result.cacheResults[KubernetesKind.REPLICA_SET.name].iterator().next()
cacheData.relationships.get(Keys.LogicalKind.CLUSTER.toString()) == [Keys.cluster(ACCOUNT, CLUSTER)]
cacheData.relationships.get(Keys.LogicalKind.APPLICATION.toString()) == [Keys.application(APPLICATION)]
cacheData.attributes.get("name") == NAME
cacheData.attributes.get("namespace") == NAMESPACE
}
}

0 comments on commit 5587718

Please sign in to comment.