Skip to content

Commit

Permalink
Fix fabric8io#2746: SharedInformerFactory should use OperationContext…
Browse files Browse the repository at this point in the history
… as key

SharedInformerFactory has `informers` and `startedInformers` Map fields
which use `Type` as key. This limits us to use only one
SharedIndexInformer corresponding to a particular type. We should make a
key based on OperationContext's elements like apigroup, apiversion,
namespace, name, labels, fields etc.
  • Loading branch information
rohanKanojia authored and manusa committed Mar 11, 2021
1 parent db7d4eb commit 44b9722
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 8 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
* Fix #2857: Fix the log of an unexpected error from an Informer's EventHandler
* Fix #2853: Cannot change the type of the Service from ClusterIP to ExternalName with PATCH
* Fix #2783: OpenIDConnectionUtils#persistKubeConfigWithUpdatedToken persists access token instead of refresh token
* Fix #2871: Change longFileMode to LONGFILE_POSIX for creating tar in PodUpload, improve exception handling in PodUpload.
* Fix #2871: Change longFileMode to LONGFILE\_POSIX for creating tar in PodUpload, improve exception handling in PodUpload.
* Fix #2746: SharedInformerFactory should use key formed from OperationContext

#### Improvements
* Fix #2781: RawCustomResourceOperationsImpl#delete now returns a boolean value for deletion status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@
import io.fabric8.kubernetes.client.dsl.base.BaseOperation;
import io.fabric8.kubernetes.client.dsl.base.OperationContext;
import io.fabric8.kubernetes.client.informers.impl.DefaultSharedIndexInformer;
import io.fabric8.kubernetes.client.utils.Pluralize;
import io.fabric8.kubernetes.client.utils.Utils;
import io.fabric8.kubernetes.internal.KubernetesDeserializer;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -45,9 +46,9 @@
* which is ported from offical go client https://github.com/kubernetes/client-go/blob/master/informers/factory.go
*/
public class SharedInformerFactory extends BaseOperation {
private final Map<Type, SharedIndexInformer> informers = new HashMap<>();
private final Map<String, SharedIndexInformer> informers = new HashMap<>();

private final Map<Type, Future> startedInformers = new HashMap<>();
private final Map<String, Future> startedInformers = new HashMap<>();

private final ExecutorService informerExecutor;

Expand Down Expand Up @@ -198,7 +199,7 @@ private synchronized <T extends HasMetadata, L extends KubernetesResourceList<T>
}
}
SharedIndexInformer<T> informer = new DefaultSharedIndexInformer<>(apiTypeClass, listerWatcher, resyncPeriodInMillis, context, eventListeners);
this.informers.put(apiTypeClass, informer);
this.informers.put(getInformerKey(context), informer);
return informer;
}

Expand All @@ -223,14 +224,21 @@ public Watch watch(ListOptions params, String namespace, OperationContext contex

/**
* Gets existing shared index informer, return null if the requesting informer
* is never constructed.
* is never constructed. If there are multiple SharedIndexInformer objects corresponding
* to a Kubernetes resource, then it returns the first one
*
* @param apiTypeClass API type class
* @param <T> type of API type
* @return SharedIndexInformer object
*/
public synchronized <T> SharedIndexInformer<T> getExistingSharedIndexInformer(Class<T> apiTypeClass) {
return this.informers.get(apiTypeClass);
SharedIndexInformer<T> foundSharedIndexInformer = null;
for (Map.Entry<String, SharedIndexInformer> entry : this.informers.entrySet()) {
if (entry.getKey().contains(Pluralize.toPlural(apiTypeClass.getSimpleName().toLowerCase()))) {
foundSharedIndexInformer = (SharedIndexInformer<T>) entry.getValue();
}
}
return foundSharedIndexInformer;
}

/**
Expand Down Expand Up @@ -279,6 +287,33 @@ public void addSharedInformerEventListener(SharedInformerEventListener event) {
this.eventListeners.add(event);
}

Map<String, SharedIndexInformer> getInformers() {
return this.informers;
}

static String getInformerKey(OperationContext operationContext) {
StringBuilder keyBuilder = new StringBuilder();
if (operationContext.getApiGroupName() == null) {
keyBuilder.append(operationContext.getApiGroupVersion());
} else {
keyBuilder.append(operationContext.getApiGroupName()).append("/").append(operationContext.getApiGroupVersion());
}
keyBuilder.append(getKeyStrForField(operationContext.getPlural()));
keyBuilder.append(getKeyStrForField(operationContext.getNamespace()));
keyBuilder.append(getKeyStrForField(operationContext.getName()));

return keyBuilder.toString();
}

private static String getKeyStrForField(String str) {
StringBuilder keyBuilder = new StringBuilder();
if (Utils.isNotNullOrEmpty(str)) {
keyBuilder.append("/");
keyBuilder.append(str);
}
return keyBuilder.toString();
}

private <T extends HasMetadata, L extends KubernetesResourceList<T>> BaseOperation<T, L, ?> getConfiguredBaseOperation(String namespace, OperationContext context, Class<T> apiTypeClass, Class<L> apiListTypeClass) {
BaseOperation<T, L, ?> baseOperationWithContext;
// Avoid adding Namespace if it's picked from Global Configuration
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.informers;

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.dsl.base.OperationContext;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.Version;
import okhttp3.OkHttpClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.Collections;
import java.util.concurrent.ExecutorService;

import static io.fabric8.kubernetes.client.informers.SharedInformerFactory.getInformerKey;
import static org.assertj.core.api.Assertions.assertThat;

class SharedInformerFactoryTest {
private OkHttpClient mockClient;
private Config config;
private ExecutorService executorService;
private static class TestCustomResourceSpec { }
private static class TestCustomResourceStatus { }

@Group("io.fabric8")
@Version("v1")
private static class TestCustomResource extends CustomResource<TestCustomResourceSpec, TestCustomResourceStatus> { }

@BeforeEach
void init() {
this.mockClient = Mockito.mock(OkHttpClient.class, Mockito.RETURNS_DEEP_STUBS);
this.config = new ConfigBuilder().withMasterUrl("https://localhost:8443/").build();
this.executorService = Mockito.mock(ExecutorService.class, Mockito.RETURNS_DEEP_STUBS);
}

@Test
void testGetInformerKey() {
assertThat(getInformerKey(new OperationContext()
.withApiGroupVersion("v1")
.withPlural("pods"))).isEqualTo("v1/pods");
assertThat(getInformerKey(new OperationContext()
.withApiGroupVersion("v1")
.withNamespace("ns1")
.withPlural("pods"))).isEqualTo("v1/pods/ns1");
assertThat(getInformerKey(new OperationContext()
.withApiGroupVersion("v1")
.withApiGroupName("io.fabric8")
.withPlural("testcustomresources"))).isEqualTo("io.fabric8/v1/testcustomresources");
assertThat(getInformerKey(new OperationContext()
.withApiGroupVersion("v1beta1")
.withApiGroupName("io.fabric8")
.withPlural("testcustomresources"))).isEqualTo("io.fabric8/v1beta1/testcustomresources");
assertThat(getInformerKey(new OperationContext()
.withApiGroupVersion("v1beta1")
.withApiGroupName("extensions")
.withPlural("deployments"))).isEqualTo("extensions/v1beta1/deployments");
}

@Test
void testInformersCreatedWithSameNameButDifferentCRDContext() {
// Given
SharedInformerFactory sharedInformerFactory = new SharedInformerFactory(executorService, mockClient, config);

// When
sharedInformerFactory.sharedIndexInformerForCustomResource(TestCustomResource.class, new OperationContext()
.withApiGroupVersion("v1")
.withPlural("testcustomresources"), 10 * 1000L);
sharedInformerFactory.sharedIndexInformerForCustomResource(TestCustomResource.class, new OperationContext()
.withApiGroupVersion("v1beta1")
.withPlural("testcustomresources"), 10 * 1000L);

// Then
assertThat(sharedInformerFactory.getInformers())
.hasSize(2);
}

@Test
void testGetExistingSharedIndexInformer() {
// Given
SharedInformerFactory sharedInformerFactory = new SharedInformerFactory(executorService, mockClient, config);

// When
sharedInformerFactory.sharedIndexInformerFor(Deployment.class, 10 * 1000L);
sharedInformerFactory.sharedIndexInformerFor(Pod.class, 10 * 1000L);

// Then
assertThat(sharedInformerFactory.getExistingSharedIndexInformer(Deployment.class)).isNotNull();
assertThat(sharedInformerFactory.getExistingSharedIndexInformer(Pod.class)).isNotNull();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,58 @@ void testCustomResourceInformerWithNoListTypeInClassPath() throws InterruptedExc
assertEquals(0, foundExistingCronTab.getCount());
}

@Test
@DisplayName("Test Informers of Same Resource but watching different namespaces")
void testDifferentDeploymentInformersWatchingInDifferentNamespaces() throws InterruptedException {
// Given
setupMockServerExpectations(Deployment.class, "ns1", this::getList, r -> new WatchEvent(new DeploymentBuilder()
.withNewMetadata().withName("d1")
.withResourceVersion(r).endMetadata().build(), "ADDED"));
setupMockServerExpectations(Deployment.class, "ns2", this::getList, r -> new WatchEvent(new DeploymentBuilder()
.withNewMetadata().withName("d2")
.withResourceVersion(r).endMetadata().build(), "ADDED"));
CountDownLatch ns1FoundLatch = new CountDownLatch(1);
CountDownLatch ns2FoundLatch = new CountDownLatch(1);

// When
SharedIndexInformer<Deployment> deploymentSharedIndexInformerInNamespace1 = factory.inNamespace("ns1").sharedIndexInformerFor(Deployment.class, 60 * WATCH_EVENT_EMIT_TIME);
SharedIndexInformer<Deployment> deploymentSharedIndexInformerInNamespace2 = factory.inNamespace("ns2").sharedIndexInformerFor(Deployment.class, 60 * WATCH_EVENT_EMIT_TIME);
deploymentSharedIndexInformerInNamespace1.addEventHandler(new TestResourceHandler<>(ns1FoundLatch, "d1"));
deploymentSharedIndexInformerInNamespace2.addEventHandler(new TestResourceHandler<>(ns2FoundLatch, "d2"));
factory.startAllRegisteredInformers();
ns1FoundLatch.await(LATCH_AWAIT_PERIOD_IN_SECONDS, TimeUnit.SECONDS);
ns2FoundLatch.await(LATCH_AWAIT_PERIOD_IN_SECONDS, TimeUnit.SECONDS);

// Then
assertEquals(0, ns1FoundLatch.getCount());
assertEquals(0, ns2FoundLatch.getCount());
}

@Test
@DisplayName("Test CustomResource Informers with different versions")
void testCustomResourceInformerWithDifferentVersions() throws InterruptedException {
// Given
setupMockServerExpectationsWithVersion(CronTab.class, "v1", "default", this::getList, r -> new WatchEvent(getCronTab("v1-crontab", r), "ADDED"));
setupMockServerExpectationsWithVersion(CronTab.class, "v1beta1", "default", this::getList, r -> new WatchEvent(getCronTab("v1beta1-crontab", r), "ADDED"));
CountDownLatch v1CronTabFound = new CountDownLatch(1);
CountDownLatch v1beta1CronTabFound = new CountDownLatch(1);

// When
SharedIndexInformer<CronTab> v1CronTabSharedIndexInformer = factory.inNamespace("default")
.sharedIndexInformerForCustomResource(CronTab.class, 60 * WATCH_EVENT_EMIT_TIME);
SharedIndexInformer<CronTab> v1beta1CronTabSharedIndexInformer = factory.inNamespace("default")
.sharedIndexInformerForCustomResource(CronTab.class, new OperationContext().withApiGroupVersion("v1beta1"), 60 * WATCH_EVENT_EMIT_TIME);
v1CronTabSharedIndexInformer.addEventHandler(new TestResourceHandler<>(v1CronTabFound, "v1-crontab"));
v1beta1CronTabSharedIndexInformer.addEventHandler(new TestResourceHandler<>(v1beta1CronTabFound, "v1beta1-crontab"));
factory.startAllRegisteredInformers();
v1CronTabFound.await(LATCH_AWAIT_PERIOD_IN_SECONDS, TimeUnit.SECONDS);
v1beta1CronTabFound.await(LATCH_AWAIT_PERIOD_IN_SECONDS, TimeUnit.SECONDS);

// Then
assertEquals(0, v1CronTabFound.getCount());
assertEquals(0, v1beta1CronTabFound.getCount());
}

private KubernetesResource getAnimal(String name, String order, String resourceVersion) {
AnimalSpec animalSpec = new AnimalSpec();
animalSpec.setOrder(order);
Expand Down Expand Up @@ -741,8 +793,12 @@ private KubernetesResource getCronTab(String name, String resourceVersion) {
}

private <T extends HasMetadata> void setupMockServerExpectations(Class<T> resourceClass, String namespace, BiFunction<String, Class<T>, KubernetesResourceList<T>> listSupplier, Function<String, WatchEvent> watchEventSupplier) {
setupMockServerExpectationsWithVersion(resourceClass, HasMetadata.getVersion(resourceClass), namespace, listSupplier, watchEventSupplier);
}

private <T extends HasMetadata> void setupMockServerExpectationsWithVersion(Class<T> resourceClass, String version, String namespace, BiFunction<String, Class<T>, KubernetesResourceList<T>> listSupplier, Function<String, WatchEvent> watchEventSupplier) {
String startResourceVersion = "1000", endResourceVersion = "1001";
String url = "/apis/" + HasMetadata.getGroup(resourceClass) +"/" + HasMetadata.getVersion(resourceClass);
String url = "/apis/" + HasMetadata.getGroup(resourceClass) +"/" + version;
if (namespace != null) {
url += ("/namespaces/" + namespace);
}
Expand Down

0 comments on commit 44b9722

Please sign in to comment.