Skip to content

Commit

Permalink
Merge pull request #1051 from rohanKanojia/issue1035
Browse files Browse the repository at this point in the history
  • Loading branch information
fusesource-ci authored Apr 18, 2018
2 parents c04e9a4 + d3add2b commit 5b1a57b
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 63 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Bugs
* Fix #1013 : Kubernetes connection is not getting closed.
* Fix #1004: Multiple document handling breaks if "---" found anywhere in the document
* Fix #1035 : RejectedExecutionException in WatchHTTPManager

Bugs
* Impersonation parameters not set in withRequestConfig - https://github.com/fabric8io/kubernetes-client/pull/1037
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,40 +302,43 @@ public void onClosed(WebSocket webSocket, int code, String reason) {
private void scheduleReconnect() {

logger.debug("Submitting reconnect task to the executor");
// make sure that whichever thread calls this method, the tasks are
// performed serially in the executor
executor.submit(new NamedRunnable("scheduleReconnect") {
@Override
public void execute() {
if (!reconnectPending.compareAndSet(false, true)) {
logger.debug("Reconnect already scheduled");
return;
}
webSocketRef.set(null);
try {
// actual reconnect only after the back-off time has passed, without
// blocking the thread
logger.debug("Scheduling reconnect task");
executor.schedule(new NamedRunnable("reconnectAttempt") {
@Override
public void execute() {
try {
runWatch();
reconnectPending.set(false);
} catch (Exception e) {
// An unexpected error occurred and we didn't even get an onFailure callback.
logger.error("Exception in reconnect", e);
webSocketRef.set(null);
closeEvent(new KubernetesClientException("Unhandled exception in reconnect attempt", e));
close();
// Don't submit new tasks after having called shutdown() on executor
if(!executor.isShutdown()) {
// make sure that whichever thread calls this method, the tasks are
// performed serially in the executor
executor.submit(new NamedRunnable("scheduleReconnect") {
@Override
public void execute() {
if (!reconnectPending.compareAndSet(false, true)) {
logger.debug("Reconnect already scheduled");
return;
}
webSocketRef.set(null);
try {
// actual reconnect only after the back-off time has passed, without
// blocking the thread
logger.debug("Scheduling reconnect task");
executor.schedule(new NamedRunnable("reconnectAttempt") {
@Override
public void execute() {
try {
runWatch();
reconnectPending.set(false);
} catch (Exception e) {
// An unexpected error occurred and we didn't even get an onFailure callback.
logger.error("Exception in reconnect", e);
webSocketRef.set(null);
closeEvent(new KubernetesClientException("Unhandled exception in reconnect attempt", e));
close();
}
}
}
}, nextReconnectInterval(), TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
reconnectPending.set(false);
}, nextReconnectInterval(), TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
reconnectPending.set(false);
}
}
}
});
});
}
}

public void waitUntilReady() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,39 +201,42 @@ private void scheduleReconnect() {
}

logger.debug("Submitting reconnect task to the executor");
// make sure that whichever thread calls this method, the tasks are
// performed serially in the executor.
executor.submit(new Runnable() {
@Override
public void run() {
if (!reconnectPending.compareAndSet(false, true)) {
logger.debug("Reconnect already scheduled");
return;
}
try {
// actual reconnect only after the back-off time has passed, without
// blocking the thread
logger.debug("Scheduling reconnect task");
executor.schedule(new Runnable() {
@Override
public void run() {
try {
WatchHTTPManager.this.runWatch();
reconnectPending.set(false);
} catch (Exception e) {
// An unexpected error occurred and we didn't even get an onFailure callback.
logger.error("Exception in reconnect", e);
close();
watcher.onClose(new KubernetesClientException("Unhandled exception in reconnect attempt", e));
// Don't submit new tasks after having called shutdown() on executor
if(!executor.isShutdown()) {
// make sure that whichever thread calls this method, the tasks are
// performed serially in the executor.
executor.submit(new Runnable() {
@Override
public void run() {
if (!reconnectPending.compareAndSet(false, true)) {
logger.debug("Reconnect already scheduled");
return;
}
try {
// actual reconnect only after the back-off time has passed, without
// blocking the thread
logger.debug("Scheduling reconnect task");
executor.schedule(new Runnable() {
@Override
public void run() {
try {
WatchHTTPManager.this.runWatch();
reconnectPending.set(false);
} catch (Exception e) {
// An unexpected error occurred and we didn't even get an onFailure callback.
logger.error("Exception in reconnect", e);
close();
watcher.onClose(new KubernetesClientException("Unhandled exception in reconnect attempt", e));
}
}
}
}, nextReconnectInterval(), TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
logger.error("Exception in reconnect", e);
reconnectPending.set(false);
}, nextReconnectInterval(), TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
logger.error("Exception in reconnect", e);
reconnectPending.set(false);
}
}
}
});
});
}
}

public void onMessage(String messageSource) throws IOException {
Expand Down

0 comments on commit 5b1a57b

Please sign in to comment.