Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes around exceptional cases in watchers #2671

Merged
merged 3 commits into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 5.0-SNAPSHOT

#### Bugs
* Fix: Reliability improvements to watchers
* Fix #2592: ConcurrentModificationException in CRUD KubernetesMockServer
* Fix #2519: Generated schemas contains a valid meta-schema URI reference (`http://json-schema.org/draft-05/schema#`)
* Fix #2631: Handle null values when getting current context on OIDC interceptors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public void eventReceived(Action action, T resource) {
case DELETED:
if (condition.test(null)) {
future.complete(null);
} else {
future.completeExceptionally(new WatcherException("Unexpected deletion of watched resource, will never satisfy condition"));
}
break;
case ERROR:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,25 +98,16 @@ private void runWatch() {
String fieldQueryString = baseOperation.getFieldQueryParam();
String name = baseOperation.getName();

// for API groups we can use the name in the path rather than a fieldSelector
// which is more likely to work well for API Groups
if (name != null && name.length() > 0) {
if (baseOperation.isApiGroup()) {
httpUrlBuilder.addPathSegment(name);
} else {
if (fieldQueryString.length() > 0) {
fieldQueryString += ",";
}
fieldQueryString += "metadata.name=" + name;
if (fieldQueryString.length() > 0) {
fieldQueryString += ",";
}
fieldQueryString += "metadata.name=" + name;
}
if (Utils.isNotNullOrEmpty(fieldQueryString)) {
if (baseOperation.isApiGroup()) {
logger.warn("Ignoring field selector " + fieldQueryString + " on watch URI " + requestUrl + " as fieldSelector is not yet supported on API Groups APIs");
} else {
httpUrlBuilder.addQueryParameter("fieldSelector", fieldQueryString);
}
httpUrlBuilder.addQueryParameter("fieldSelector", fieldQueryString);
}

listOptions.setResourceVersion(resourceVersion.get());
HttpClientUtils.appendListOptionParams(httpUrlBuilder, listOptions);

Expand Down Expand Up @@ -244,6 +235,8 @@ public void onMessage(WebSocket webSocket, String message) {
logger.error("Received wrong type of object for watch", e);
} catch (IllegalArgumentException e) {
logger.error("Invalid event type", e);
} catch (Throwable e) {
logger.error("Unhandled exception encountered in watcher event handler", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,19 @@

package io.fabric8.kubernetes.client.dsl.internal;

import static java.net.HttpURLConnection.HTTP_GONE;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResource;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
Expand All @@ -40,18 +53,6 @@
import okhttp3.Response;
import okhttp3.logging.HttpLoggingInterceptor;
import okio.BufferedSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.net.HttpURLConnection.HTTP_GONE;

public class WatchHTTPManager<T extends HasMetadata, L extends KubernetesResourceList<T>> extends AbstractWatchManager<T> {
private static final Logger logger = LoggerFactory.getLogger(WatchHTTPManager.class);
Expand Down Expand Up @@ -101,8 +102,6 @@ public WatchHTTPManager(final OkHttpClient client,
}

private void runWatch() {
logger.debug("Watching via HTTP GET ... {}", this);

HttpUrl.Builder httpUrlBuilder = HttpUrl.get(requestUrl).newBuilder();
String labelQueryParam = baseOperation.getLabelQueryParam();
if (Utils.isNotNullOrEmpty(labelQueryParam)) {
Expand All @@ -112,17 +111,11 @@ private void runWatch() {
String fieldQueryString = baseOperation.getFieldQueryParam();
String name = baseOperation.getName();

// for API groups we can use the name in the path rather than a fieldSelector
// which is more likely to work well for API Groups
if (name != null && name.length() > 0) {
if (baseOperation.isApiGroup()) {
httpUrlBuilder.addPathSegment(name);
} else {
if (fieldQueryString.length() > 0) {
fieldQueryString += ",";
}
fieldQueryString += "metadata.name=" + name;
if (fieldQueryString.length() > 0) {
fieldQueryString += ",";
}
fieldQueryString += "metadata.name=" + name;
}

if (Utils.isNotNullOrEmpty(fieldQueryString)) {
Expand All @@ -133,35 +126,43 @@ private void runWatch() {
HttpClientUtils.appendListOptionParams(httpUrlBuilder, listOptions);
String origin = requestUrl.getProtocol() + "://" + requestUrl.getHost();
if (requestUrl.getPort() != -1) {
origin += ":" + requestUrl.getPort();
origin += ":" + requestUrl.getPort();
}

HttpUrl url = httpUrlBuilder.build();

logger.debug("Watching via HTTP GET {}", url);

final Request request = new Request.Builder()
.get()
.url(httpUrlBuilder.build())
.url(url)
.addHeader("Origin", origin)
.build();

clonedClient.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
logger.info("Watch connection failed. reason: {}", e.getMessage());
scheduleReconnect();
scheduleReconnect(true);
}

@Override
public void onResponse(Call call, Response response) throws IOException {
if (!response.isSuccessful()) {
throw OperationSupport.requestFailure(request,
OperationSupport.createStatus(response.code(), response.message()));
onStatus(OperationSupport.createStatus(response.code(), response.message()));
}

boolean shouldBackoff = true;

try {
BufferedSource source = response.body().source();
while (!source.exhausted()) {
String message = source.readUtf8LineStrict();
onMessage(message);
}
// the normal operation of a long poll get is to return once a response is available.
// in that case we should reconnect immediately.
shouldBackoff = false;
} catch (Exception e) {
logger.info("Watch terminated unexpectedly. reason: {}", e.getMessage());
}
Expand All @@ -171,12 +172,13 @@ public void onResponse(Call call, Response response) throws IOException {
if (response != null) {
response.body().close();
}
scheduleReconnect();

scheduleReconnect(shouldBackoff);
}
});
}

private void scheduleReconnect() {
private void scheduleReconnect(boolean shouldBackoff) {
if (forceClosed.get()) {
logger.warn("Ignoring error for already closed/closing connection");
return;
Expand All @@ -188,6 +190,7 @@ private void scheduleReconnect() {
}

logger.debug("Submitting reconnect task to the executor");

// make sure that whichever thread calls this method, the tasks are
// performed serially in the executor.
submit(() -> {
Expand All @@ -199,6 +202,11 @@ private void scheduleReconnect() {
// actual reconnect only after the back-off time has passed, without
// blocking the thread
logger.debug("Scheduling reconnect task");

long delay = shouldBackoff
? nextReconnectInterval()
: 0;

schedule(() -> {
try {
WatchHTTPManager.this.runWatch();
Expand All @@ -209,7 +217,7 @@ private void scheduleReconnect() {
close();
watcher.onClose(new WatcherException("Unhandled exception in reconnect attempt", e));
}
}, nextReconnectInterval(), TimeUnit.MILLISECONDS);
}, delay, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
// This is a standard exception if we close the scheduler. We should not print it
if (!forceClosed.get()) {
Expand Down Expand Up @@ -243,18 +251,7 @@ public void onMessage(String messageSource) {
}
}
} else if (object instanceof Status) {
Status status = (Status) object;
// The resource version no longer exists - this has to be handled by the caller.
if (status.getCode() == HTTP_GONE) {
// exception
// shut down executor, etc.
close();
watcher.onClose(new WatcherException(status.getMessage(), new KubernetesClientException(status)));
return;
}

watcher.eventReceived(Action.ERROR, null);
logger.error("Error received: {}", status);
onStatus((Status) object);
} else {
logger.error("Unknown message received: {}", messageSource);
}
Expand All @@ -265,6 +262,21 @@ public void onMessage(String messageSource) {
}
}

private void onStatus(Status status) {
// The resource version no longer exists - this has to be handled by the caller.
if (status.getCode() == HTTP_GONE) {
// exception
// shut down executor, etc.
close();
watcher.onClose(new WatcherException(status.getMessage(), new KubernetesClientException(status)));
return;
}

watcher.eventReceived(Action.ERROR, null);
logger.error("Error received: {}", status.toString());
}


protected static WatchEvent readWatchEvent(String messageSource) {
WatchEvent event = Serialization.unmarshal(messageSource, WatchEvent.class);
KubernetesResource object = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void itCompletesOnMatchAdded() throws Exception {
watcher.eventReceived(Action.ADDED, configMap);
assertTrue(watcher.getFuture().isDone());
assertEquals(watcher.getFuture().get(), configMap);
condition.isCalledWith(configMap);
assertTrue(condition.isCalledWith(configMap));
}

@Test
Expand All @@ -61,7 +61,7 @@ void itCompletesOnMatchModified() throws Exception {
watcher.eventReceived(Action.MODIFIED, configMap);
assertTrue(watcher.getFuture().isDone());
assertEquals(watcher.getFuture().get(), configMap);
condition.isCalledWith(configMap);
assertTrue(condition.isCalledWith(configMap));
}

@Test
Expand All @@ -71,7 +71,7 @@ void itCompletesOnMatchDeleted() throws Exception {
watcher.eventReceived(Action.DELETED, configMap);
assertTrue(watcher.getFuture().isDone());
assertNull(watcher.getFuture().get());
condition.isCalledWith(null);
assertTrue(condition.isCalledWith(null));
}

@Test
Expand All @@ -80,7 +80,7 @@ void itDoesNotCompleteOnNoMatchAdded() {
WaitForConditionWatcher<ConfigMap> watcher = new WaitForConditionWatcher<>(condition);
watcher.eventReceived(Action.ADDED, configMap);
assertFalse(watcher.getFuture().isDone());
condition.isCalledWith(configMap);
assertTrue(condition.isCalledWith(configMap));
}

@Test
Expand All @@ -89,16 +89,23 @@ void itDoesNotCompleteOnNoMatchModified() {
WaitForConditionWatcher<ConfigMap> watcher = new WaitForConditionWatcher<>(condition);
watcher.eventReceived(Action.MODIFIED, configMap);
assertFalse(watcher.getFuture().isDone());
condition.isCalledWith(configMap);
assertTrue(condition.isCalledWith(configMap));
}

@Test
void itDoesNotCompleteOnNoMatchDeleted() {
void itCompletesExceptionallyOnUnexpectedDeletion() throws Exception {
TrackingPredicate condition = condition(Objects::nonNull);
WaitForConditionWatcher<ConfigMap> watcher = new WaitForConditionWatcher<>(condition);
watcher.eventReceived(Action.DELETED, configMap);
assertFalse(watcher.getFuture().isDone());
condition.isCalledWith(null);
assertTrue(watcher.getFuture().isDone());
try {
watcher.getFuture().get();
fail("should have thrown exception");
} catch (ExecutionException e) {
assertEquals(WatcherException.class, e.getCause().getClass());
assertEquals("Unexpected deletion of watched resource, will never satisfy condition", e.getCause().getMessage());
}
assertTrue(condition.isCalledWith(null));
}

@Test
Expand Down