Skip to content

Commit

Permalink
Set a timeout on kube pod process integration tests to prevent a hang (#7087)
Browse files Browse the repository at this point in the history

* Improved logging for troubleshooting.

* Limiting the number of tests for faster iteration testing.

* Re-enable tests

* Removed unnecessary logging from troubleshooting

* Formatting

* whitespace

* Eliminated duplicate timeout.

* Whitespace/formatting

* Show logs when a sync job fails during testing.

* Formatting

* Use alphanumeric random string, not bytes, to test paths
  • Loading branch information
airbyte-jenny authored Oct 26, 2021
1 parent 3327bf8 commit c2adf9b
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void testWriteFileToRandomDir() throws IOException {

@Test
public void testGetTailDoesNotExist() throws IOException {
  // Use an alphanumeric random string: RandomStringUtils.random(100) draws from the full
  // Unicode range and can produce characters that are invalid in filesystem paths, making
  // Path.of throw instead of exercising the "file does not exist" case under test.
  final List<String> tail = IOs.getTail(100, Path.of(RandomStringUtils.randomAlphanumeric(100)));
  // Tailing a nonexistent file must yield an empty list rather than throwing.
  assertEquals(Collections.emptyList(), tail);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ public void testCancelSync() throws Exception {
@Test
@Order(9)
public void testIncrementalSync() throws Exception {
LOGGER.info("Starting testIncrementalSync()");
final String connectionName = "test-connection";
final UUID sourceId = createPostgresSource().getSourceId();
final UUID destinationId = createDestination().getDestinationId();
Expand All @@ -514,6 +515,7 @@ public void testIncrementalSync() throws Exception {
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

LOGGER.info("Beginning testIncrementalSync() sync 1");
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());
Expand All @@ -534,6 +536,7 @@ public void testIncrementalSync() throws Exception {
source.query(ctx -> ctx.execute("UPDATE id_and_name SET name='yennefer' WHERE id=2"));
source.close();

LOGGER.info("Starting testIncrementalSync() sync 2");
final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob());
Expand All @@ -542,13 +545,15 @@ public void testIncrementalSync() throws Exception {
assertRawDestinationContains(expectedRecords, new SchemaTableNamePair("public", STREAM_NAME));

// reset back to no data.
LOGGER.info("Starting testIncrementalSync() reset");
final JobInfoRead jobInfoRead = apiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), jobInfoRead.getJob());
LOGGER.info("state after reset: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

assertRawDestinationContains(Collections.emptyList(), new SchemaTableNamePair("public", STREAM_NAME));

// sync one more time. verify it is the equivalent of a full refresh.
LOGGER.info("Starting testIncrementalSync() sync 3");
final JobInfoRead connectionSyncRead3 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob());
Expand Down Expand Up @@ -1175,6 +1180,16 @@ private void deleteOperation(final UUID destinationId) throws ApiException {

/**
 * Waits for the given job to leave the PENDING/RUNNING states, then asserts it SUCCEEDED.
 * If the job did not succeed, the log lines of every attempt are dumped at WARN level first
 * so a failing integration test shows why the sync failed.
 *
 * @param jobsApi API client used to poll job status and fetch attempt logs.
 * @param originalJob the job to wait on.
 * @throws InterruptedException if interrupted while polling.
 * @throws ApiException if a jobs-API call fails.
 */
private static void waitForSuccessfulJob(final JobsApi jobsApi, final JobRead originalJob) throws InterruptedException, ApiException {
  final JobRead job = waitForJob(jobsApi, originalJob, Sets.newHashSet(JobStatus.PENDING, JobStatus.RUNNING));

  if (!JobStatus.SUCCEEDED.equals(job.getStatus())) {
    // If a job failed during testing, show us why.
    final JobIdRequestBody id = new JobIdRequestBody();
    id.setId(originalJob.getId());
    for (final AttemptInfoRead attemptInfo : jobsApi.getJobInfo(id).getAttempts()) {
      // Parameterized SLF4J logging instead of string concatenation: lazier and idiomatic.
      LOGGER.warn("Unsuccessful job attempt {} with status {} produced log output as follows: {}",
          attemptInfo.getAttempt().getId(), job.getStatus(), attemptInfo.getLogs().getLogLines());
    }
  }
  assertEquals(JobStatus.SUCCEEDED, job.getStatus());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,19 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.RandomStringUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

// requires kube running locally to run. If using Minikube it requires MINIKUBE=true
// Must have a timeout on this class because it tests child processes that may misbehave; otherwise
// this can hang forever during failures.
@Timeout(value = 5,
unit = TimeUnit.MINUTES)
public class KubePodProcessIntegrationTest {

private static final boolean IS_MINIKUBE = Boolean.parseBoolean(Optional.ofNullable(System.getenv("IS_MINIKUBE")).orElse("false"));
Expand Down Expand Up @@ -210,6 +216,6 @@ private static String getHost() {
} catch (final UnknownHostException e) {
throw new RuntimeException(e);
}
};
}

}

0 comments on commit c2adf9b

Please sign in to comment.