Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #4637: adding a generalized ready timeout to pod operations #4695

Merged
merged 2 commits into from
Dec 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#### Bugs

#### Improvements
* Fix #4637: all pod operations that require a ready / succeeded pod may use withReadyWaitTimeout, which supersedes withLogWaitTimeout.
* Fix #4633: provided inline access to all RunConfig builder methods via run().withNewRunConfig()
* Fix #4670: the initial informer listing will use a resourceVersion of 0 to utilize the watch cache if possible. This means that the initial cache state when the informer is returned, or the start future is completed, may not be as fresh as the previous behavior which forced the latest version. It will of course become more consistent as the watch will already have been established.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public interface ContainerResource

/**
* Will send the given input stream via a polling mechanism.
*
*
* @deprecated use redirectingInput and the resulting {@link ExecWatch#getOutput()} with
* InputStream#transferTo(java.io.OutputStream) on JDK 9+
* or
Expand All @@ -45,12 +45,21 @@ public interface ContainerResource
/**
* Will provide an {@link OutputStream} via {@link ExecWatch#getInput()} with the
* given buffer size.
*
*
* @param bufferSize if null will use the default
*/
TtyExecOutputErrorable redirectingInput(Integer bufferSize);

CopyOrReadable file(String path);

CopyOrReadable dir(String path);

/**
* How long to wait for the pod to be ready before performing an operation, such as
* getting the logs, exec, attach, copy, etc.
*
* @param timeout in milliseconds
*/
@Override
ContainerResource withReadyWaitTimeout(Integer timeout);
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,11 @@ public interface CopyOrReadable {

boolean copy(Path destination);

/**
* How long to wait for a ready pod before performing the copy or read operation.
*
* @param timeout in milliseconds
*/
CopyOrReadable withReadyWaitTimeout(Integer timeout);

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public interface Execable {

/**
* Execute a command in a container
*
*
* @param input the command to run
* @return container with stdin, stdout, stderr streams
* (if redirectingInput(), redirectingOutput(), redirectingError() were called respectively)
Expand All @@ -29,10 +29,18 @@ public interface Execable {

/**
* Attach to the main process of a container
*
*
* @return container with stdin, stdout, stderr streams
* (if redirectingInput(), redirectingOutput(), redirectingError() were called respectively)
*/
ExecWatch attach();

/**
* How long shall we wait until a Pod is ready before attaching or execing
*
* @param timeout in milliseconds
* @return {@link Loggable} for fetching logs
*/
Execable withReadyWaitTimeout(Integer timeout);

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,19 @@ public interface Loggable {
*
* @param logWaitTimeout timeout in milliseconds
* @return {@link Loggable} for fetching logs
*
* @deprecated use {@link #withReadyWaitTimeout(Integer)}
*/
@Deprecated
Loggable withLogWaitTimeout(Integer logWaitTimeout);

/**
* While waiting for Pod logs, how long shall we wait until a Pod
* becomes ready and starts producing logs
*
* @param timeout in milliseconds
* @return {@link Loggable} for fetching logs
*/
Loggable withReadyWaitTimeout(Integer timeout);

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.fabric8.kubernetes.client.dsl.internal;

import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.dsl.ExecListener;
import io.fabric8.kubernetes.client.utils.URLUtils.URLBuilder;
import lombok.AllArgsConstructor;
Expand Down Expand Up @@ -58,7 +59,7 @@ public StreamContext() {
private String sinceTimestamp;
private Integer sinceSeconds;
private Integer tailingLines;
private Integer logWaitTimeout;
private Integer readyWaitTimeout;
private boolean prettyOutput;
private ExecListener execListener;
private Integer limitBytes;
Expand Down Expand Up @@ -125,8 +126,8 @@ public PodOperationContext withDir(String dir) {
return this.toBuilder().dir(dir).build();
}

public PodOperationContext withLogWaitTimeout(Integer logWaitTimeout) {
return this.toBuilder().logWaitTimeout(logWaitTimeout).build();
public PodOperationContext withReadyWaitTimeout(Integer logWaitTimeout) {
return this.toBuilder().readyWaitTimeout(logWaitTimeout).build();
}

public String getLogParameters() {
Expand Down Expand Up @@ -163,15 +164,22 @@ public void addQueryParameters(URLBuilder httpUrlBuilder) {
if (tty) {
httpUrlBuilder.addQueryParameter("tty", "true");
}
boolean usingStream = false;
if (in != null || redirectingIn) {
httpUrlBuilder.addQueryParameter("stdin", "true");
usingStream = true;
}
boolean debug = ExecWebSocketListener.LOGGER.isDebugEnabled();
if (output != null || debug) {
httpUrlBuilder.addQueryParameter("stdout", "true");
usingStream = true;
}
if (error != null || terminateOnError || debug) {
httpUrlBuilder.addQueryParameter("stderr", "true");
usingStream = true;
}
if (!usingStream) {
throw new KubernetesClientException("Pod operation is not valid unless an in, out, or error stream is used.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,12 @@ public abstract RollableScalableResourceOperation<T, L, R> newInstance(PodOperat

@Override
public Loggable withLogWaitTimeout(Integer logWaitTimeout) {
return newInstance(rollingOperationContext.withLogWaitTimeout(logWaitTimeout), context);
return withReadyWaitTimeout(logWaitTimeout);
}

@Override
public Loggable withReadyWaitTimeout(Integer timeout) {
return newInstance(rollingOperationContext.withReadyWaitTimeout(timeout), context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,12 @@ public LogWatch watchLog(OutputStream out) {

@Override
public Loggable withLogWaitTimeout(Integer logWaitTimeout) {
return new JobOperationsImpl(podControllerOperationContext.withLogWaitTimeout(logWaitTimeout), context);
return withReadyWaitTimeout(logWaitTimeout);
}

@Override
public Loggable withReadyWaitTimeout(Integer timeout) {
return new JobOperationsImpl(podControllerOperationContext.withReadyWaitTimeout(timeout), context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public class PodOperationsImpl extends HasMetadataOperation<Pod, PodList, PodRes
implements PodResource, CopyOrReadable {

public static final int HTTP_TOO_MANY_REQUESTS = 429;
private static final Integer DEFAULT_POD_LOG_WAIT_TIMEOUT = 5;
private static final Integer DEFAULT_POD_READY_WAIT_TIMEOUT = 5;
private static final String[] EMPTY_COMMAND = { "/bin/sh", "-i" };
public static final String DEFAULT_CONTAINER_ANNOTATION_NAME = "kubectl.kubernetes.io/default-container";

Expand Down Expand Up @@ -172,8 +172,8 @@ private void checkForPiped(Object object) {
public LogWatch watchLog(OutputStream out) {
checkForPiped(out);
try {
PodOperationUtil.waitUntilReadyBeforeFetchingLogs(this,
getContext().getLogWaitTimeout() != null ? getContext().getLogWaitTimeout() : DEFAULT_POD_LOG_WAIT_TIMEOUT);
PodOperationUtil.waitUntilReadyOrSucceded(this,
getContext().getReadyWaitTimeout() != null ? getContext().getReadyWaitTimeout() : DEFAULT_POD_READY_WAIT_TIMEOUT);
// Issue Pod Logs HTTP request
URL url = new URL(URLUtils.join(getResourceUrl().toString(), getContext().getLogParameters() + "&follow=true"));
final LogWatchCallback callback = new LogWatchCallback(out, this.context.getExecutor());
Expand All @@ -183,9 +183,14 @@ public LogWatch watchLog(OutputStream out) {
}
}

@Override
public PodOperationsImpl withReadyWaitTimeout(Integer logWaitTimeout) {
return new PodOperationsImpl(getContext().withReadyWaitTimeout(logWaitTimeout), context);
}

@Override
public Loggable withLogWaitTimeout(Integer logWaitTimeout) {
return new PodOperationsImpl(getContext().withLogWaitTimeout(logWaitTimeout), context);
return withReadyWaitTimeout(logWaitTimeout);
}

@Override
Expand Down Expand Up @@ -280,6 +285,7 @@ public ExecWatch exec(String... command) {
String[] actualCommands = command.length >= 1 ? command : EMPTY_COMMAND;
try {
URL url = getURL("exec", actualCommands);

return setupConnectionToPod(url.toURI());
} catch (Exception e) {
throw KubernetesClientException.launderThrowable(forOperationType("exec"), e);
Expand All @@ -290,13 +296,17 @@ public ExecWatch exec(String... command) {
public ExecWatch attach() {
try {
URL url = getURL("attach", null);

return setupConnectionToPod(url.toURI());
} catch (Exception e) {
throw KubernetesClientException.launderThrowable(forOperationType("attach"), e);
}
}

private URL getURL(String operation, String[] commands) throws MalformedURLException {
Pod fromServer = PodOperationUtil.waitUntilReadyOrSucceded(this,
getContext().getReadyWaitTimeout() != null ? getContext().getReadyWaitTimeout() : DEFAULT_POD_READY_WAIT_TIMEOUT);

String url = URLUtils.join(getResourceUrl().toString(), operation);
URLBuilder httpUrlBuilder = new URLBuilder(url);
if (commands != null) {
Expand All @@ -305,16 +315,18 @@ private URL getURL(String operation, String[] commands) throws MalformedURLExcep
}
}
PodOperationContext contextToUse = getContext();
contextToUse = contextToUse.withContainerId(validateOrDefaultContainerId(contextToUse.getContainerId()));
contextToUse = contextToUse.withContainerId(validateOrDefaultContainerId(contextToUse.getContainerId(), fromServer));
contextToUse.addQueryParameters(httpUrlBuilder);
return httpUrlBuilder.build();
}

/**
* If not specified, choose an appropriate default container id
*/
String validateOrDefaultContainerId(String name) {
Pod pod = this.requireFromServer();
String validateOrDefaultContainerId(String name, Pod pod) {
if (pod == null) {
pod = this.getItemOrRequireFromServer();
}
// spec and container null-checks are not necessary for real k8s clusters, added them to simplify some tests running in the mockserver
if (pod.getSpec() == null || pod.getSpec().getContainers() == null || pod.getSpec().getContainers().isEmpty()) {
throw new KubernetesClientException("Pod has no containers!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class PodOperationUtil {
private static final Logger LOG = LoggerFactory.getLogger(PodOperationUtil.class);
Expand Down Expand Up @@ -114,14 +115,19 @@ public static List<PodResource> getPodOperationsForController(PodOperationsImpl
return PodOperationUtil.getFilteredPodsForLogs(podOperations, controllerPodList, controllerUid);
}

public static void waitUntilReadyBeforeFetchingLogs(PodResource podOperation, Integer logWaitTimeout) {
public static Pod waitUntilReadyOrSucceded(PodResource podOperation, Integer logWaitTimeout) {
AtomicReference<Pod> podRef = new AtomicReference<>();
try {
// Wait for Pod to become ready or succeeded
podOperation.waitUntilCondition(p -> p != null && (Readiness.isPodReady(p) || Readiness.isPodSucceeded(p)),
podOperation.waitUntilCondition(p -> {
podRef.set(p);
return p != null && (Readiness.isPodReady(p) || Readiness.isPodSucceeded(p));
},
logWaitTimeout,
TimeUnit.SECONDS);
} catch (Exception otherException) {
LOG.debug("Error while waiting for Pod to become Ready: {}", otherException.getMessage());
}
return podRef.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.file.Path;
Expand All @@ -49,11 +51,11 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -195,11 +197,14 @@ void uploadFileAndVerify(PodUploadTester<Boolean> fileUploadMethodToTest) throws
final boolean result = fileUploadMethodToTest.apply();

assertThat(result).isTrue();
verify(builder, times(1)).uri(argThat(request -> {
assertThat(request).hasToString(
"https://openshift.com:8443/api/v1/namespaces/default/pods/pod/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20base64%20-d%20-%20%3E%20%27%2Fmock%2Fdir%2Ffile%27&container=container&stdin=true&stderr=true");
return true;
}));
ArgumentCaptor<URI> captor = ArgumentCaptor.forClass(URI.class);
verify(builder, times(2)).uri(captor.capture());
assertEquals(
"https://openshift.com:8443/api/v1/namespaces/default/pods?fieldSelector=metadata.name%3Dpod&allowWatchBookmarks=true&watch=true",
captor.getAllValues().get(0).toString());
assertEquals(
"https://openshift.com:8443/api/v1/namespaces/default/pods/pod/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20base64%20-d%20-%20%3E%20%27%2Fmock%2Fdir%2Ffile%27&container=container&stdin=true&stderr=true",
captor.getAllValues().get(1).toString());
verify(mockWebSocket, atLeast(1)).send(any(ByteBuffer.class));
}

Expand All @@ -224,11 +229,14 @@ private void uploadDirectoryAndVerify(PodUploadTester<Boolean> directoryUpload)
final boolean result = directoryUpload.apply();

assertThat(result).isTrue();
verify(builder, times(1)).uri(argThat(request -> {
assertThat(request).hasToString(
"https://openshift.com:8443/api/v1/namespaces/default/pods/pod/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20base64%20-d%20-%20%7C%20tar%20-C%20%27%2Fmock%2Fdir%27%20-xzf%20-&container=container&stdin=true&stderr=true");
return true;
}));
ArgumentCaptor<URI> captor = ArgumentCaptor.forClass(URI.class);
verify(builder, times(2)).uri(captor.capture());
assertEquals(
"https://openshift.com:8443/api/v1/namespaces/default/pods?fieldSelector=metadata.name%3Dpod&allowWatchBookmarks=true&watch=true",
captor.getAllValues().get(0).toString());
assertEquals(
"https://openshift.com:8443/api/v1/namespaces/default/pods/pod/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20base64%20-d%20-%20%7C%20tar%20-C%20%27%2Fmock%2Fdir%27%20-xzf%20-&container=container&stdin=true&stderr=true",
captor.getAllValues().get(1).toString());
verify(mockWebSocket, atLeast(1)).send(any(ByteBuffer.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void testGetFilteredPodsForLogs() {
void testGetGenericPodOperations() {
// When
PodOperationsImpl podOperations = PodOperationUtil.getGenericPodOperations(operationContext,
new PodOperationContext().withPrettyOutput(false).withLogWaitTimeout(5).withContainerId("container1"));
new PodOperationContext().withPrettyOutput(false).withReadyWaitTimeout(5).withContainerId("container1"));

// Then
assertThat(podOperations).isNotNull();
Expand All @@ -91,7 +91,7 @@ void testGetGenericPodOperations() {
@Test
void testWaitUntilReadyBeforeFetchingLogs() {
// When
PodOperationUtil.waitUntilReadyBeforeFetchingLogs(podOperations, 5);
PodOperationUtil.waitUntilReadyOrSucceded(podOperations, 5);
// Then
verify(podOperations, times(1)).waitUntilCondition(any(), eq(5L), eq(TimeUnit.SECONDS));
}
Expand Down
Loading