From dd111ed50d73789133c7d418d6912a03e1edb290 Mon Sep 17 00:00:00 2001 From: terencecho Date: Tue, 29 Mar 2022 21:18:33 -0700 Subject: [PATCH 01/16] Add acceptance test for deleting connetion in bad temporal state --- .../test/acceptance/AcceptanceTests.java | 42 ++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 7fb338c6109c..3a875f6e7c6e 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -80,8 +80,11 @@ import io.airbyte.db.Databases; import io.airbyte.test.airbyte_test_container.AirbyteTestContainer; import io.airbyte.test.utils.PostgreSQLContainerHelper; +import io.airbyte.workers.temporal.TemporalUtils; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; +import io.temporal.client.WorkflowClient; +import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.File; import java.io.IOException; import java.net.Inet4Address; @@ -106,6 +109,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.assertj.core.api.Assertions; import org.jooq.JSONB; import org.jooq.Record; import org.jooq.Result; @@ -784,7 +788,7 @@ public void testCheckpointing() throws Exception { // now cancel it so that we freeze state! try { apiClient.getJobsApi().cancelJob(new JobIdRequestBody().id(connectionSyncRead1.getJob().getId())); - } catch (Exception e) {} + } catch (final Exception e) {} final ConnectionState connectionState = waitForConnectionState(apiClient, connectionId); @@ -1147,6 +1151,42 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception assertEquals(JobStatus.CANCELLED, resp.get().getJob().getStatus()); } + @Test + @Order(22) + public void testActionsWhenTemporalIsInTerminalState() throws Exception { + final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService("localhost:7233"); + final WorkflowClient workflowCLient = WorkflowClient.newInstance(temporalService); + + final String connectionName = "test-connection"; + final UUID sourceId = createPostgresSource().getSourceId(); + final UUID destinationId = createDestination().getDestinationId(); + final UUID operationId = createOperation().getOperationId(); + final AirbyteCatalog catalog = discoverSourceSchema(sourceId); + final SyncMode syncMode = SyncMode.INCREMENTAL; + final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP; + catalog.getStreams().forEach(s -> s.getConfig() + .syncMode(syncMode) + .cursorField(List.of(COLUMN_ID)) + .destinationSyncMode(destinationSyncMode) + .primaryKey(List.of(List.of(COLUMN_NAME)))); + final UUID connectionId = + createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + + // Terminate workflow + LOGGER.info("Terminating temporal workflow..."); + workflowCLient.newUntypedWorkflowStub("connection_manager_" + connectionId).terminate(""); + + // expect an exception because the temporal workflow is not running + Assertions.assertThatExceptionOfType(ApiException.class) + .isThrownBy(() -> apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId))); + + // we should still be able to delete the connection when the temporal workflow is in this state + apiClient.getConnectionApi().deleteConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + + // remove connection so we don't try to delete connection again during tear down + connectionIds.remove(connectionId); + } + private AirbyteCatalog discoverSourceSchema(final UUID sourceId) throws ApiException { return apiClient.getSourceApi().discoverSchemaForSource(new SourceDiscoverSchemaRequestBody().sourceId(sourceId)).getCatalog(); } From 04f7f9fa8c352ae6bdde5ad3541ec26b39c3146e Mon Sep 17 00:00:00 2001 From: terencecho Date: Tue, 29 Mar 2022 23:21:37 -0700 Subject: [PATCH 02/16] disable new test on kube --- .../java/io/airbyte/test/acceptance/AcceptanceTests.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 3a875f6e7c6e..a7fca4df7a40 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -1153,6 +1153,8 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception @Test @Order(22) + @DisabledIfEnvironmentVariable(named = "KUBE", + matches = "true") public void testActionsWhenTemporalIsInTerminalState() throws Exception { final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService("localhost:7233"); final WorkflowClient workflowCLient = WorkflowClient.newInstance(temporalService); From dbcd83223518df9b7d19dd9ccd42095d00713f58 Mon Sep 17 00:00:00 2001 From: terencecho Date: Wed, 30 Mar 2022 00:27:08 -0700 Subject: [PATCH 03/16] try using different temproalHost --- .../java/io/airbyte/test/acceptance/AcceptanceTests.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index a7fca4df7a40..ed7da1b3114f 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -1156,7 +1156,11 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception @DisabledIfEnvironmentVariable(named = "KUBE", matches = "true") public void testActionsWhenTemporalIsInTerminalState() throws Exception { - final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService("localhost:7233"); + String temporalHost = "localhost:7233"; + if (!USE_EXTERNAL_DEPLOYMENT) { + temporalHost = "airbyte-temporal:7233"; + } + final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost); final WorkflowClient workflowCLient = WorkflowClient.newInstance(temporalService); final String connectionName = "test-connection"; From 09227584b5bd92b167fa61f471d3dffd402c3c4c Mon Sep 17 00:00:00 2001 From: terencecho Date: Wed, 30 Mar 2022 00:57:37 -0700 Subject: [PATCH 04/16] add log info line to give time for temporal client to spin up --- .../java/io/airbyte/test/acceptance/AcceptanceTests.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index ed7da1b3114f..fc67b9b3733d 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -1156,11 +1156,7 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception @DisabledIfEnvironmentVariable(named = "KUBE", matches = "true") public void testActionsWhenTemporalIsInTerminalState() throws Exception { - String temporalHost = "localhost:7233"; - if (!USE_EXTERNAL_DEPLOYMENT) { - temporalHost = "airbyte-temporal:7233"; - } - final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost); + final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService("localhost:7233"); final WorkflowClient workflowCLient = WorkflowClient.newInstance(temporalService); final String connectionName = "test-connection"; @@ -1178,6 +1174,8 @@ public void testActionsWhenTemporalIsInTerminalState() throws Exception { final UUID connectionId = createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + LOGGER.info("Waiting for connection to be available in Temporal..."); + // Terminate workflow LOGGER.info("Terminating temporal workflow..."); workflowCLient.newUntypedWorkflowStub("connection_manager_" + connectionId).terminate(""); From fecc56e232d8b58c8f9afb3e989597e68d8bc124 Mon Sep 17 00:00:00 2001 From: terencecho Date: Wed, 30 Mar 2022 01:36:40 -0700 Subject: [PATCH 05/16] try avoiding missing row in temporal db --- .../java/io/airbyte/test/acceptance/AcceptanceTests.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index fc67b9b3733d..ec025b977f01 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -1176,6 +1176,12 @@ public void testActionsWhenTemporalIsInTerminalState() throws Exception { LOGGER.info("Waiting for connection to be available in Temporal..."); + LOGGER.info("Run manual sync..."); + final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + + LOGGER.info("Waiting for job to run..."); + waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.RUNNING)); + // Terminate workflow LOGGER.info("Terminating temporal workflow..."); workflowCLient.newUntypedWorkflowStub("connection_manager_" + connectionId).terminate(""); From 7b30eeec5952a58adbb3d094deaf2fdeb6427df2 Mon Sep 17 00:00:00 2001 From: terencecho Date: Wed, 30 Mar 2022 11:35:23 -0700 Subject: [PATCH 06/16] check if temporal workflow is reachable --- .../io/airbyte/test/acceptance/AcceptanceTests.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index ec025b977f01..eb72ad7ed9fb 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -81,6 +81,7 @@ import io.airbyte.test.airbyte_test_container.AirbyteTestContainer; import io.airbyte.test.utils.PostgreSQLContainerHelper; import io.airbyte.workers.temporal.TemporalUtils; +import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; import io.temporal.client.WorkflowClient; @@ -1173,14 +1174,10 @@ public void testActionsWhenTemporalIsInTerminalState() throws Exception { .primaryKey(List.of(List.of(COLUMN_NAME)))); final UUID connectionId = createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); - - LOGGER.info("Waiting for connection to be available in Temporal..."); - - LOGGER.info("Run manual sync..."); - final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - - LOGGER.info("Waiting for job to run..."); - waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.RUNNING)); + + // check if temporal workflow is reachable + final ConnectionManagerWorkflow connectionManagerWorkflow = workflowCLient.newWorkflowStub(ConnectionManagerWorkflow.class, "connection_manager_" + connectionId); + connectionManagerWorkflow.getState(); // Terminate workflow LOGGER.info("Terminating temporal workflow..."); From 770a88563ff47948aed6618e0c147f07cd2b2506 Mon Sep 17 00:00:00 2001 From: terencecho Date: Wed, 30 Mar 2022 11:35:23 -0700 Subject: [PATCH 07/16] check if temporal workflow is reachable --- .../java/io/airbyte/test/acceptance/AcceptanceTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index eb72ad7ed9fb..d28d4a171f4e 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -1176,7 +1176,8 @@ public void testActionsWhenTemporalIsInTerminalState() throws Exception { createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); // check if temporal workflow is reachable - final ConnectionManagerWorkflow connectionManagerWorkflow = workflowCLient.newWorkflowStub(ConnectionManagerWorkflow.class, "connection_manager_" + connectionId); + final ConnectionManagerWorkflow connectionManagerWorkflow = + workflowCLient.newWorkflowStub(ConnectionManagerWorkflow.class, "connection_manager_" + connectionId); connectionManagerWorkflow.getState(); // Terminate workflow From 2e90a219ff531b5622207f55e293433787904fd2 Mon Sep 17 00:00:00 2001 From: terencecho Date: Wed, 30 Mar 2022 11:35:23 -0700 Subject: [PATCH 08/16] check if temporal workflow is reachable --- .../java/io/airbyte/test/acceptance/AcceptanceTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index d28d4a171f4e..6a878fdcaa86 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -1174,7 +1174,6 @@ public void testActionsWhenTemporalIsInTerminalState() throws Exception { .primaryKey(List.of(List.of(COLUMN_NAME)))); final UUID connectionId = createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); - // check if temporal workflow is reachable final ConnectionManagerWorkflow connectionManagerWorkflow = workflowCLient.newWorkflowStub(ConnectionManagerWorkflow.class, "connection_manager_" + connectionId); From 6cb3181e1082b337abb47c48b1119bf62ec7c780 Mon Sep 17 00:00:00 2001 From: terencecho Date: Wed, 30 Mar 2022 12:43:30 -0700 Subject: [PATCH 09/16] try waiting for connection state --- .../io/airbyte/test/acceptance/AcceptanceTests.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 6a878fdcaa86..5b2a0ec75819 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -1153,11 +1153,15 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception } @Test - @Order(22) + @Order(-3) @DisabledIfEnvironmentVariable(named = "KUBE", matches = "true") public void testActionsWhenTemporalIsInTerminalState() throws Exception { - final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService("localhost:7233"); + String temporalHost = "localhost:7233"; + if (!USE_EXTERNAL_DEPLOYMENT) { + temporalHost = "airbyte-temporal:7233"; + } + final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost); final WorkflowClient workflowCLient = WorkflowClient.newInstance(temporalService); final String connectionName = "test-connection"; @@ -1172,8 +1176,11 @@ public void testActionsWhenTemporalIsInTerminalState() throws Exception { .cursorField(List.of(COLUMN_ID)) .destinationSyncMode(destinationSyncMode) .primaryKey(List.of(List.of(COLUMN_NAME)))); + final UUID connectionId = createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + waitForConnectionState(apiClient, connectionId); + // check if temporal workflow is reachable final ConnectionManagerWorkflow connectionManagerWorkflow = workflowCLient.newWorkflowStub(ConnectionManagerWorkflow.class, "connection_manager_" + connectionId); From 0e53a27622262c04aac45f518578dc3068edf625 Mon Sep 17 00:00:00 2001 From: terencecho Date: Wed, 30 Mar 2022 13:15:15 -0700 Subject: [PATCH 10/16] try using airbyte-temporal hostname --- .../java/io/airbyte/test/acceptance/AcceptanceTests.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 5b2a0ec75819..6021db00350b 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -1157,11 +1157,7 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception @DisabledIfEnvironmentVariable(named = "KUBE", matches = "true") public void testActionsWhenTemporalIsInTerminalState() throws Exception { - String temporalHost = "localhost:7233"; - if (!USE_EXTERNAL_DEPLOYMENT) { - temporalHost = "airbyte-temporal:7233"; - } - final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost); + final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService("airbyte-temporal:7233"); final WorkflowClient workflowCLient = WorkflowClient.newInstance(temporalService); final String connectionName = "test-connection"; From 95438ef0294089fc20d685a76397ca2e1bf2bcef Mon Sep 17 00:00:00 2001 From: terencecho Date: Wed, 30 Mar 2022 13:54:05 -0700 Subject: [PATCH 11/16] Revert "try using airbyte-temporal hostname" This reverts commit 0e53a27622262c04aac45f518578dc3068edf625. --- .../java/io/airbyte/test/acceptance/AcceptanceTests.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 6021db00350b..5b2a0ec75819 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -1157,7 +1157,11 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception @DisabledIfEnvironmentVariable(named = "KUBE", matches = "true") public void testActionsWhenTemporalIsInTerminalState() throws Exception { - final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService("airbyte-temporal:7233"); + String temporalHost = "localhost:7233"; + if (!USE_EXTERNAL_DEPLOYMENT) { + temporalHost = "airbyte-temporal:7233"; + } + final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost); final WorkflowClient workflowCLient = WorkflowClient.newInstance(temporalService); final String connectionName = "test-connection"; From 2828cc9912b90825efcf0fac225d6950edd33411 Mon Sep 17 00:00:00 2001 From: terencecho Date: Wed, 30 Mar 2022 13:57:39 -0700 Subject: [PATCH 12/16] Revert back to using localhost --- .../java/io/airbyte/test/acceptance/AcceptanceTests.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 5b2a0ec75819..fc0d660253a7 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -1157,11 +1157,7 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception @DisabledIfEnvironmentVariable(named = "KUBE", matches = "true") public void testActionsWhenTemporalIsInTerminalState() throws Exception { - String temporalHost = "localhost:7233"; - if (!USE_EXTERNAL_DEPLOYMENT) { - temporalHost = "airbyte-temporal:7233"; - } - final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost); + final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService("localhost:7233"); final WorkflowClient workflowCLient = WorkflowClient.newInstance(temporalService); final String connectionName = "test-connection"; From e33be17e2500d69f84c5394afe3c65bf98053293 Mon Sep 17 00:00:00 2001 From: terencecho Date: Wed, 30 Mar 2022 14:04:46 -0700 Subject: [PATCH 13/16] Add 5 second wait --- .../java/io/airbyte/test/acceptance/AcceptanceTests.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index fc0d660253a7..a2c1202bdc86 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -1177,6 +1177,8 @@ public void testActionsWhenTemporalIsInTerminalState() throws Exception { createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); waitForConnectionState(apiClient, connectionId); + Thread.sleep(5000); + // check if temporal workflow is reachable final ConnectionManagerWorkflow connectionManagerWorkflow = workflowCLient.newWorkflowStub(ConnectionManagerWorkflow.class, "connection_manager_" + connectionId); From 6fb466e97bbc0322a529be856f46ce92142d4c16 Mon Sep 17 00:00:00 2001 From: terencecho Date: Wed, 30 Mar 2022 15:21:02 -0700 Subject: [PATCH 14/16] only enable test for new scheduler --- .../java/io/airbyte/test/acceptance/AcceptanceTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index a2c1202bdc86..5b3ecb490ae3 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -1156,6 +1156,8 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception @Order(-3) @DisabledIfEnvironmentVariable(named = "KUBE", matches = "true") + @EnabledIfEnvironmentVariable(named = "NEW_SCHEDULER", + matches = "true") public void testActionsWhenTemporalIsInTerminalState() throws Exception { final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService("localhost:7233"); final WorkflowClient workflowCLient = WorkflowClient.newInstance(temporalService); @@ -1177,8 +1179,6 @@ public void testActionsWhenTemporalIsInTerminalState() throws Exception { createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); waitForConnectionState(apiClient, connectionId); - Thread.sleep(5000); - // check if temporal workflow is reachable final ConnectionManagerWorkflow connectionManagerWorkflow = workflowCLient.newWorkflowStub(ConnectionManagerWorkflow.class, "connection_manager_" + connectionId); From 7cb16f6e0299128240566c3d6fb4ac76f9956122 Mon Sep 17 00:00:00 2001 From: terencecho Date: Wed, 30 Mar 2022 16:17:17 -0700 Subject: [PATCH 15/16] only enable test for new scheduler 2 --- .../io/airbyte/test/acceptance/AcceptanceTests.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 5b3ecb490ae3..01190c2d9cfe 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -1153,12 +1153,16 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception } @Test - @Order(-3) + @Order(22) @DisabledIfEnvironmentVariable(named = "KUBE", matches = "true") - @EnabledIfEnvironmentVariable(named = "NEW_SCHEDULER", - matches = "true") public void testActionsWhenTemporalIsInTerminalState() throws Exception { + final FeatureFlags featureFlags = new EnvVariableFeatureFlags(); + if (!featureFlags.usesNewScheduler()) { + LOGGER.info("Skipping test since not using new temporal scheduler"); + return; + } + final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService("localhost:7233"); final WorkflowClient workflowCLient = WorkflowClient.newInstance(temporalService); From 1b3d7bb6b8c1bc857d5ab840d63d97b82ca8ae53 Mon Sep 17 00:00:00 2001 From: terencecho Date: Wed, 30 Mar 2022 20:46:09 -0700 Subject: [PATCH 16/16] refactor test to cover normal and unexpected temporal state --- .../test/acceptance/AcceptanceTests.java | 66 ++++++++++++------- 1 file changed, 43 insertions(+), 23 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 01190c2d9cfe..d9efce9e907b 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -110,7 +110,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.assertj.core.api.Assertions; import org.jooq.JSONB; import org.jooq.Record; import org.jooq.Result; @@ -1154,18 +1153,7 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception @Test @Order(22) - @DisabledIfEnvironmentVariable(named = "KUBE", - matches = "true") - public void testActionsWhenTemporalIsInTerminalState() throws Exception { - final FeatureFlags featureFlags = new EnvVariableFeatureFlags(); - if (!featureFlags.usesNewScheduler()) { - LOGGER.info("Skipping test since not using new temporal scheduler"); - return; - } - - final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService("localhost:7233"); - final WorkflowClient workflowCLient = WorkflowClient.newInstance(temporalService); - + public void testDeleteConnection() throws Exception { final String connectionName = "test-connection"; final UUID sourceId = createPostgresSource().getSourceId(); final UUID destinationId = createDestination().getDestinationId(); @@ -1179,9 +1167,48 @@ public void testActionsWhenTemporalIsInTerminalState() throws Exception { .destinationSyncMode(destinationSyncMode) .primaryKey(List.of(List.of(COLUMN_NAME)))); - final UUID connectionId = + UUID connectionId = createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); - waitForConnectionState(apiClient, connectionId); + + final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.RUNNING)); + + // test normal deletion of connection + apiClient.getConnectionApi().deleteConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + + // remove connection to avoid exception during tear down + connectionIds.remove(connectionId); + + LOGGER.info("Waiting for connection to be deleted..."); + Thread.sleep(500); + + ConnectionStatus connectionStatus = + apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId)).getStatus(); + assertEquals(ConnectionStatus.DEPRECATED, connectionStatus); + + // test deletion of connection when temporal workflow is in a bad state, only when using new + // scheduler + final FeatureFlags featureFlags = new EnvVariableFeatureFlags(); + if (featureFlags.usesNewScheduler()) { + LOGGER.info("Testing connection deletion when temporal is in a terminal state"); + connectionId = createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + + terminateTemporalWorkflow(connectionId); + + // we should still be able to delete the connection when the temporal workflow is in this state + apiClient.getConnectionApi().deleteConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + + LOGGER.info("Waiting for connection to be deleted..."); + Thread.sleep(500); + + connectionStatus = apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId)).getStatus(); + assertEquals(ConnectionStatus.DEPRECATED, connectionStatus); + } + } + + private void terminateTemporalWorkflow(final UUID connectionId) { + final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService("localhost:7233"); + final WorkflowClient workflowCLient = WorkflowClient.newInstance(temporalService); // check if temporal workflow is reachable final ConnectionManagerWorkflow connectionManagerWorkflow = @@ -1192,14 +1219,7 @@ public void testActionsWhenTemporalIsInTerminalState() throws Exception { LOGGER.info("Terminating temporal workflow..."); workflowCLient.newUntypedWorkflowStub("connection_manager_" + connectionId).terminate(""); - // expect an exception because the temporal workflow is not running - Assertions.assertThatExceptionOfType(ApiException.class) - .isThrownBy(() -> apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId))); - - // we should still be able to delete the connection when the temporal workflow is in this state - apiClient.getConnectionApi().deleteConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - - // remove connection so we don't try to delete connection again during tear down + // remove connection to avoid exception during tear down connectionIds.remove(connectionId); }