diff --git a/CHANGELOG.md b/CHANGELOG.md index 3dacae43f02..2395e30fd7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ Bugs * Fix #1013 : Kubernetes connection is not getting closed. * Fix #1004: Multiple document handling breaks if "---" found anywhere in the document + * Fix #1035 : RejectedExecutionException in WatchHTTPManager Bugs * Impersonation parameters not set in withRequestConfig - https://github.com/fabric8io/kubernetes-client/pull/1037 diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java index 701ca7c9b9d..9b2cd092204 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java @@ -302,40 +302,43 @@ public void onClosed(WebSocket webSocket, int code, String reason) { private void scheduleReconnect() { logger.debug("Submitting reconnect task to the executor"); - // make sure that whichever thread calls this method, the tasks are - // performed serially in the executor - executor.submit(new NamedRunnable("scheduleReconnect") { - @Override - public void execute() { - if (!reconnectPending.compareAndSet(false, true)) { - logger.debug("Reconnect already scheduled"); - return; - } - webSocketRef.set(null); - try { - // actual reconnect only after the back-off time has passed, without - // blocking the thread - logger.debug("Scheduling reconnect task"); - executor.schedule(new NamedRunnable("reconnectAttempt") { - @Override - public void execute() { - try { - runWatch(); - reconnectPending.set(false); - } catch (Exception e) { - // An unexpected error occurred and we didn't even get an onFailure callback. - logger.error("Exception in reconnect", e); - webSocketRef.set(null); - closeEvent(new KubernetesClientException("Unhandled exception in reconnect attempt", e)); - close(); + // Don't submit new tasks after having called shutdown() on executor + if(!executor.isShutdown()) { + // make sure that whichever thread calls this method, the tasks are + // performed serially in the executor + executor.submit(new NamedRunnable("scheduleReconnect") { + @Override + public void execute() { + if (!reconnectPending.compareAndSet(false, true)) { + logger.debug("Reconnect already scheduled"); + return; + } + webSocketRef.set(null); + try { + // actual reconnect only after the back-off time has passed, without + // blocking the thread + logger.debug("Scheduling reconnect task"); + executor.schedule(new NamedRunnable("reconnectAttempt") { + @Override + public void execute() { + try { + runWatch(); + reconnectPending.set(false); + } catch (Exception e) { + // An unexpected error occurred and we didn't even get an onFailure callback. + logger.error("Exception in reconnect", e); + webSocketRef.set(null); + closeEvent(new KubernetesClientException("Unhandled exception in reconnect attempt", e)); + close(); + } } - } - }, nextReconnectInterval(), TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException e) { - reconnectPending.set(false); + }, nextReconnectInterval(), TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException e) { + reconnectPending.set(false); + } } - } - }); + }); + } } public void waitUntilReady() { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index 24230f66c9b..98f349b57fa 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -201,39 +201,42 @@ private void scheduleReconnect() { } logger.debug("Submitting reconnect task to the executor"); - // make sure that whichever thread calls this method, the tasks are - // performed serially in the executor. - executor.submit(new Runnable() { - @Override - public void run() { - if (!reconnectPending.compareAndSet(false, true)) { - logger.debug("Reconnect already scheduled"); - return; - } - try { - // actual reconnect only after the back-off time has passed, without - // blocking the thread - logger.debug("Scheduling reconnect task"); - executor.schedule(new Runnable() { - @Override - public void run() { - try { - WatchHTTPManager.this.runWatch(); - reconnectPending.set(false); - } catch (Exception e) { - // An unexpected error occurred and we didn't even get an onFailure callback. - logger.error("Exception in reconnect", e); - close(); - watcher.onClose(new KubernetesClientException("Unhandled exception in reconnect attempt", e)); + // Don't submit new tasks after having called shutdown() on executor + if(!executor.isShutdown()) { + // make sure that whichever thread calls this method, the tasks are + // performed serially in the executor. + executor.submit(new Runnable() { + @Override + public void run() { + if (!reconnectPending.compareAndSet(false, true)) { + logger.debug("Reconnect already scheduled"); + return; + } + try { + // actual reconnect only after the back-off time has passed, without + // blocking the thread + logger.debug("Scheduling reconnect task"); + executor.schedule(new Runnable() { + @Override + public void run() { + try { + WatchHTTPManager.this.runWatch(); + reconnectPending.set(false); + } catch (Exception e) { + // An unexpected error occurred and we didn't even get an onFailure callback. + logger.error("Exception in reconnect", e); + close(); + watcher.onClose(new KubernetesClientException("Unhandled exception in reconnect attempt", e)); + } } - } - }, nextReconnectInterval(), TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException e) { - logger.error("Exception in reconnect", e); - reconnectPending.set(false); + }, nextReconnectInterval(), TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException e) { + logger.error("Exception in reconnect", e); + reconnectPending.set(false); + } } - } - }); + }); + } } public void onMessage(String messageSource) throws IOException {