Skip to content

Commit

Permalink
Merge pull request #1148 from brendandburns/cherry-pick
Browse files Browse the repository at this point in the history
Cherry pick some fixes to set the stage for 9.0.1
  • Loading branch information
k8s-ci-robot authored Aug 12, 2020
2 parents dd06710 + b6ac710 commit fe754c3
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.collections4.CollectionUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
* SharedProcessor class manages all the registered ProcessorListener and distributes notifications.
*/
public class SharedProcessor<ApiType extends KubernetesObject> {

private static final Logger log = LoggerFactory.getLogger(SharedProcessor.class);

private ReadWriteLock lock = new ReentrantReadWriteLock();

private List<ProcessorListener<ApiType>> listeners;
Expand Down Expand Up @@ -155,16 +159,16 @@ public void stop() {
} finally {
lock.writeLock().unlock();
}
// Disable new tasks from being submitted
executorService.shutdown();
// Interrupts running listeners by signalling InterruptedException
executorService.shutdownNow();
try {
// Wait a while for existing tasks to terminate
// Hold until all the listeners exits
if (!executorService.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
// Cancel currently executing tasks
executorService.shutdownNow();
log.warn(
"SharedProcessors wasn't gracefully terminated, there can be listener thread leakage");
}
} catch (InterruptedException e) {
executorService.shutdownNow();
log.error("Graceful shutdown process of SharedProcessors was interrupted");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -661,9 +661,10 @@ public Watchable<ApiType> watch(String namespace, final ListOptions listOptions)
throw new IllegalArgumentException("invalid namespace");
}
Call call =
customObjectsApi.listClusterCustomObjectCall(
customObjectsApi.listNamespacedCustomObjectCall(
this.apiGroup,
this.apiVersion,
namespace,
this.resourcePlural,
null,
listOptions.getContinue(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,6 @@ public void testListenerAddition() throws InterruptedException {
public void testShutdownGracefully() throws InterruptedException {
SharedProcessor<V1Pod> sharedProcessor =
new SharedProcessor<>(Executors.newCachedThreadPool(), Duration.ofSeconds(5));
TestWorker<V1Pod> quickWorker = new TestWorker<>(null, 0);
quickWorker.setTask(
() -> {
try {
// sleep 2s so that it could terminate within timeout(5s)
Thread.sleep(2000);
} catch (InterruptedException e) {
}
});
long before = System.currentTimeMillis();
sharedProcessor.addAndStartListener(quickWorker);
sharedProcessor.stop();
// the stopping worker properly blocks the processor's stop call
assertTrue(System.currentTimeMillis() - before >= 2000);

sharedProcessor = new SharedProcessor<>(Executors.newCachedThreadPool(), Duration.ofSeconds(5));
TestWorker<V1Pod> slowWorker = new TestWorker<>(null, 0);
final boolean[] interrupted = {false};
slowWorker.setTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
import com.google.gson.Gson;
import io.kubernetes.client.custom.V1Patch;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.*;
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.Watchable;
import io.kubernetes.client.util.generic.options.ListOptions;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -163,6 +166,20 @@ public void patchNamespacedJobReturningObject() {
verify(1, patchRequestedFor(urlPathEqualTo("/apis/batch/v1/namespaces/default/jobs/foo1")));
}

@Test
public void watchNamespacedJobReturningObject() throws ApiException {
V1JobList jobList = new V1JobList().kind("JobList").metadata(new V1ListMeta());

stubFor(
get(urlPathEqualTo("/apis/batch/v1/namespaces/default/jobs"))
.willReturn(aResponse().withStatus(200).withBody(new Gson().toJson(jobList))));
Watchable<V1Job> jobListWatch = jobClient.watch("default", new ListOptions());
verify(
1,
getRequestedFor(urlPathEqualTo("/apis/batch/v1/namespaces/default/jobs"))
.withQueryParam("watch", equalTo("true")));
}

@Test
public void testReadTimeoutShouldThrowException() {
ApiClient apiClient = new ClientBuilder().setBasePath("http://localhost:" + 8181).build();
Expand Down

0 comments on commit fe754c3

Please sign in to comment.