Skip to content

Commit

Permalink
Fix #1035 : RejectedExecutionException in WatchHTTPManager
Browse files Browse the repository at this point in the history
+ Since RejectedExecutionException is a 'normal' situation that could
  happen in a healthy system. Move exception in reconnect stacktraces
  to logger.debug levels.
+ Added a check to avoid submitting task to executor if it's shutdown.
+ Updated CHANGELOG.md
  • Loading branch information
rohanKanojia committed Apr 13, 2018
1 parent 79286a2 commit d3add2b
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 63 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Bugs
* Fix #1013 : Kubernetes connection is not getting closed.
* Fix #1004: Multiple document handling breaks if "---" found anywhere in the document
* Fix #1035 : RejectedExecutionException in WatchHTTPManager

Bugs
* Impersonation parameters not set in withRequestConfig - https://github.com/fabric8io/kubernetes-client/pull/1037
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,40 +302,43 @@ public void onClosed(WebSocket webSocket, int code, String reason) {
private void scheduleReconnect() {

logger.debug("Submitting reconnect task to the executor");
// make sure that whichever thread calls this method, the tasks are
// performed serially in the executor
executor.submit(new NamedRunnable("scheduleReconnect") {
@Override
public void execute() {
if (!reconnectPending.compareAndSet(false, true)) {
logger.debug("Reconnect already scheduled");
return;
}
webSocketRef.set(null);
try {
// actual reconnect only after the back-off time has passed, without
// blocking the thread
logger.debug("Scheduling reconnect task");
executor.schedule(new NamedRunnable("reconnectAttempt") {
@Override
public void execute() {
try {
runWatch();
reconnectPending.set(false);
} catch (Exception e) {
// An unexpected error occurred and we didn't even get an onFailure callback.
logger.error("Exception in reconnect", e);
webSocketRef.set(null);
closeEvent(new KubernetesClientException("Unhandled exception in reconnect attempt", e));
close();
// Don't submit new tasks after having called shutdown() on executor
if(!executor.isShutdown()) {
// make sure that whichever thread calls this method, the tasks are
// performed serially in the executor
executor.submit(new NamedRunnable("scheduleReconnect") {
@Override
public void execute() {
if (!reconnectPending.compareAndSet(false, true)) {
logger.debug("Reconnect already scheduled");
return;
}
webSocketRef.set(null);
try {
// actual reconnect only after the back-off time has passed, without
// blocking the thread
logger.debug("Scheduling reconnect task");
executor.schedule(new NamedRunnable("reconnectAttempt") {
@Override
public void execute() {
try {
runWatch();
reconnectPending.set(false);
} catch (Exception e) {
// An unexpected error occurred and we didn't even get an onFailure callback.
logger.error("Exception in reconnect", e);
webSocketRef.set(null);
closeEvent(new KubernetesClientException("Unhandled exception in reconnect attempt", e));
close();
}
}
}
}, nextReconnectInterval(), TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
reconnectPending.set(false);
}, nextReconnectInterval(), TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
reconnectPending.set(false);
}
}
}
});
});
}
}

public void waitUntilReady() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,39 +201,42 @@ private void scheduleReconnect() {
}

logger.debug("Submitting reconnect task to the executor");
// make sure that whichever thread calls this method, the tasks are
// performed serially in the executor.
executor.submit(new Runnable() {
@Override
public void run() {
if (!reconnectPending.compareAndSet(false, true)) {
logger.debug("Reconnect already scheduled");
return;
}
try {
// actual reconnect only after the back-off time has passed, without
// blocking the thread
logger.debug("Scheduling reconnect task");
executor.schedule(new Runnable() {
@Override
public void run() {
try {
WatchHTTPManager.this.runWatch();
reconnectPending.set(false);
} catch (Exception e) {
// An unexpected error occurred and we didn't even get an onFailure callback.
logger.error("Exception in reconnect", e);
close();
watcher.onClose(new KubernetesClientException("Unhandled exception in reconnect attempt", e));
// Don't submit new tasks after having called shutdown() on executor
if(!executor.isShutdown()) {
// make sure that whichever thread calls this method, the tasks are
// performed serially in the executor.
executor.submit(new Runnable() {
@Override
public void run() {
if (!reconnectPending.compareAndSet(false, true)) {
logger.debug("Reconnect already scheduled");
return;
}
try {
// actual reconnect only after the back-off time has passed, without
// blocking the thread
logger.debug("Scheduling reconnect task");
executor.schedule(new Runnable() {
@Override
public void run() {
try {
WatchHTTPManager.this.runWatch();
reconnectPending.set(false);
} catch (Exception e) {
// An unexpected error occurred and we didn't even get an onFailure callback.
logger.error("Exception in reconnect", e);
close();
watcher.onClose(new KubernetesClientException("Unhandled exception in reconnect attempt", e));
}
}
}
}, nextReconnectInterval(), TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
logger.error("Exception in reconnect", e);
reconnectPending.set(false);
}, nextReconnectInterval(), TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
logger.error("Exception in reconnect", e);
reconnectPending.set(false);
}
}
}
});
});
}
}

public void onMessage(String messageSource) throws IOException {
Expand Down

0 comments on commit d3add2b

Please sign in to comment.