From c59623f1fa1362ea5f2a638245bcb097c1f61dee Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Fri, 7 Aug 2020 21:49:07 -0700 Subject: [PATCH 1/3] Fix generic watches. --- .../util/generic/GenericKubernetesApi.java | 3 ++- .../util/generic/GenericKubernetesApiTest.java | 17 +++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/util/src/main/java/io/kubernetes/client/util/generic/GenericKubernetesApi.java b/util/src/main/java/io/kubernetes/client/util/generic/GenericKubernetesApi.java index 078c14afd6..6c8cf04146 100644 --- a/util/src/main/java/io/kubernetes/client/util/generic/GenericKubernetesApi.java +++ b/util/src/main/java/io/kubernetes/client/util/generic/GenericKubernetesApi.java @@ -661,9 +661,10 @@ public Watchable watch(String namespace, final ListOptions listOptions) throw new IllegalArgumentException("invalid namespace"); } Call call = - customObjectsApi.listClusterCustomObjectCall( + customObjectsApi.listNamespacedCustomObjectCall( this.apiGroup, this.apiVersion, + namespace, this.resourcePlural, null, listOptions.getContinue(), diff --git a/util/src/test/java/io/kubernetes/client/util/generic/GenericKubernetesApiTest.java b/util/src/test/java/io/kubernetes/client/util/generic/GenericKubernetesApiTest.java index 7355247ac7..a75bc7fe73 100644 --- a/util/src/test/java/io/kubernetes/client/util/generic/GenericKubernetesApiTest.java +++ b/util/src/test/java/io/kubernetes/client/util/generic/GenericKubernetesApiTest.java @@ -19,8 +19,11 @@ import com.google.gson.Gson; import io.kubernetes.client.custom.V1Patch; import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.models.*; import io.kubernetes.client.util.ClientBuilder; +import io.kubernetes.client.util.Watchable; +import io.kubernetes.client.util.generic.options.ListOptions; import java.io.IOException; import java.net.SocketTimeoutException; import java.util.concurrent.TimeUnit; @@ -163,6 +166,20 @@ public void patchNamespacedJobReturningObject() { verify(1, patchRequestedFor(urlPathEqualTo("/apis/batch/v1/namespaces/default/jobs/foo1"))); } + @Test + public void watchNamespacedJobReturningObject() throws ApiException { + V1JobList jobList = new V1JobList().kind("JobList").metadata(new V1ListMeta()); + + stubFor( + get(urlPathEqualTo("/apis/batch/v1/namespaces/default/jobs")) + .willReturn(aResponse().withStatus(200).withBody(new Gson().toJson(jobList)))); + Watchable jobListWatch = jobClient.watch("default", new ListOptions()); + verify( + 1, + getRequestedFor(urlPathEqualTo("/apis/batch/v1/namespaces/default/jobs")) + .withQueryParam("watch", equalTo("true"))); + } + @Test public void testReadTimeoutShouldThrowException() { ApiClient apiClient = new ClientBuilder().setBasePath("http://localhost:" + 8181).build(); From fdccec20d3603f49a48909655e61300184182da6 Mon Sep 17 00:00:00 2001 From: yue9944882 <291271447@qq.com> Date: Thu, 6 Aug 2020 18:09:01 +0800 Subject: [PATCH 2/3] graceful shutdown sharedprocessor --- .../client/informer/cache/SharedProcessor.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/util/src/main/java/io/kubernetes/client/informer/cache/SharedProcessor.java b/util/src/main/java/io/kubernetes/client/informer/cache/SharedProcessor.java index d87b2a6c09..e28b73d933 100644 --- a/util/src/main/java/io/kubernetes/client/informer/cache/SharedProcessor.java +++ b/util/src/main/java/io/kubernetes/client/informer/cache/SharedProcessor.java @@ -23,12 +23,16 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.collections4.CollectionUtils; import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /* * SharedProcessor class manages all the registered ProcessorListener and distributes notifications. */ public class SharedProcessor { + private static final Logger log = LoggerFactory.getLogger(SharedProcessor.class); + private ReadWriteLock lock = new ReentrantReadWriteLock(); private List> listeners; @@ -155,16 +159,16 @@ public void stop() { } finally { lock.writeLock().unlock(); } - // Disable new tasks from being submitted - executorService.shutdown(); + // Interrupts running listeners by signalling InterruptedException + executorService.shutdownNow(); try { - // Wait a while for existing tasks to terminate + // Hold until all the listeners exits if (!executorService.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) { - // Cancel currently executing tasks - executorService.shutdownNow(); + log.warn( + "SharedProcessors wasn't gracefully terminated, there can be listener thread leakage"); } } catch (InterruptedException e) { - executorService.shutdownNow(); + log.error("Graceful shutdown process of SharedProcessors was interrupted"); } } } From b6ac710b5489e3a43000eba72e9540dc7f5f1b18 Mon Sep 17 00:00:00 2001 From: yue9944882 <291271447@qq.com> Date: Thu, 6 Aug 2020 20:06:12 +0800 Subject: [PATCH 3/3] removing failing tests --- .../informer/cache/SharedProcessorTest.java | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/util/src/test/java/io/kubernetes/client/informer/cache/SharedProcessorTest.java b/util/src/test/java/io/kubernetes/client/informer/cache/SharedProcessorTest.java index fcd65e745f..56d185d66b 100644 --- a/util/src/test/java/io/kubernetes/client/informer/cache/SharedProcessorTest.java +++ b/util/src/test/java/io/kubernetes/client/informer/cache/SharedProcessorTest.java @@ -67,22 +67,6 @@ public void testListenerAddition() throws InterruptedException { public void testShutdownGracefully() throws InterruptedException { SharedProcessor sharedProcessor = new SharedProcessor<>(Executors.newCachedThreadPool(), Duration.ofSeconds(5)); - TestWorker quickWorker = new TestWorker<>(null, 0); - quickWorker.setTask( - () -> { - try { - // sleep 2s so that it could terminate within timeout(5s) - Thread.sleep(2000); - } catch (InterruptedException e) { - } - }); - long before = System.currentTimeMillis(); - sharedProcessor.addAndStartListener(quickWorker); - sharedProcessor.stop(); - // the stopping worker properly blocks the processor's stop call - assertTrue(System.currentTimeMillis() - before >= 2000); - - sharedProcessor = new SharedProcessor<>(Executors.newCachedThreadPool(), Duration.ofSeconds(5)); TestWorker slowWorker = new TestWorker<>(null, 0); final boolean[] interrupted = {false}; slowWorker.setTask(