diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java index 533abae74044..a3e35cce7f36 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java @@ -35,7 +35,8 @@ public static ImmutableMap generateSyncMetadata(final StandardSy metadata.put("connection_id", standardSync.getConnectionId()); final String frequencyString; - if (standardSync.getManual()) { + // TODO(https://github.com/airbytehq/airbyte/issues/2170): handle cron strings properly. + if (standardSync.getManual() || standardSync.getSchedule() == null) { frequencyString = "manual"; } else { final long intervalInMinutes = TimeUnit.SECONDS.toMinutes(ScheduleHelpers.getIntervalInSecond(standardSync.getSchedule())); diff --git a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java index 68b3f7071ffa..46e5106b7bf9 100644 --- a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java +++ b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java @@ -21,6 +21,8 @@ import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; import io.airbyte.api.client.model.generated.ConnectionRead; import io.airbyte.api.client.model.generated.ConnectionSchedule; +import io.airbyte.api.client.model.generated.ConnectionScheduleData; +import io.airbyte.api.client.model.generated.ConnectionScheduleType; import io.airbyte.api.client.model.generated.ConnectionState; import io.airbyte.api.client.model.generated.ConnectionStatus; import io.airbyte.api.client.model.generated.ConnectionUpdate; @@ -455,7 +457,8 @@ public ConnectionRead createConnection(final String name, final UUID destinationId, final List operationIds, final AirbyteCatalog catalog, - final ConnectionSchedule schedule) + final ConnectionScheduleType scheduleType, + final ConnectionScheduleData scheduleData) throws ApiException { final ConnectionRead connection = apiClient.getConnectionApi().createConnection( new ConnectionCreate() @@ -463,7 +466,8 @@ public ConnectionRead createConnection(final String name, .sourceId(sourceId) .destinationId(destinationId) .syncCatalog(catalog) - .schedule(schedule) + .scheduleType(scheduleType) + .scheduleData(scheduleData) .operationIds(operationIds) .name(name) .namespaceDefinition(NamespaceDefinitionType.CUSTOMFORMAT) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AdvancedAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AdvancedAcceptanceTests.java index 87a9f67a0e09..a78778e7eaae 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AdvancedAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AdvancedAcceptanceTests.java @@ -24,6 +24,7 @@ import io.airbyte.api.client.model.generated.AirbyteStream; import io.airbyte.api.client.model.generated.AttemptInfoRead; import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; +import io.airbyte.api.client.model.generated.ConnectionScheduleType; import io.airbyte.api.client.model.generated.ConnectionState; import io.airbyte.api.client.model.generated.DestinationDefinitionIdRequestBody; import io.airbyte.api.client.model.generated.DestinationDefinitionRead; @@ -142,7 +143,8 @@ void testManualSync() throws Exception { final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); final UUID connectionId = - testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.MANUAL, null) + .getConnectionId(); final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); testHarness.assertSourceAndDestinationDbInSync(false); @@ -187,7 +189,7 @@ void testCheckpointing() throws Exception { .cursorField(List.of(COLUMN_ID)) .destinationSyncMode(destinationSyncMode)); final UUID connectionId = - testHarness.createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null) + testHarness.createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, ConnectionScheduleType.MANUAL, null) .getConnectionId(); final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); @@ -269,7 +271,7 @@ void testBackpressure() throws Exception { final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); final UUID connectionId = - testHarness.createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null) + testHarness.createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, ConnectionScheduleType.MANUAL, null) .getConnectionId(); final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index 867e2804fe5d..b0e56e0065c5 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -4,7 +4,6 @@ package io.airbyte.test.acceptance; -import static io.airbyte.api.client.model.generated.ConnectionSchedule.TimeUnitEnum.MINUTES; import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.AWESOME_PEOPLE_TABLE_NAME; import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.COLUMN_ID; import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.COLUMN_NAME; @@ -40,6 +39,11 @@ import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; import io.airbyte.api.client.model.generated.ConnectionRead; import io.airbyte.api.client.model.generated.ConnectionSchedule; +import io.airbyte.api.client.model.generated.ConnectionScheduleData; +import io.airbyte.api.client.model.generated.ConnectionScheduleDataBasicSchedule; +import io.airbyte.api.client.model.generated.ConnectionScheduleDataBasicSchedule.TimeUnitEnum; +import io.airbyte.api.client.model.generated.ConnectionScheduleDataCron; +import io.airbyte.api.client.model.generated.ConnectionScheduleType; import io.airbyte.api.client.model.generated.ConnectionState; import io.airbyte.api.client.model.generated.ConnectionStatus; import io.airbyte.api.client.model.generated.DataType; @@ -80,7 +84,6 @@ import java.net.URISyntaxException; import java.sql.SQLException; import java.time.Duration; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -150,6 +153,9 @@ class BasicAcceptanceTests { private static final String FIELD = "field"; private static final String ID_AND_NAME = "id_and_name"; + private static final ConnectionScheduleData BASIC_SCHEDULE_DATA = new ConnectionScheduleData().basicSchedule( + new ConnectionScheduleDataBasicSchedule().units(1L).timeUnit(TimeUnitEnum.HOURS)); + @BeforeAll static void init() throws URISyntaxException, IOException, InterruptedException, ApiException { apiClient = new AirbyteApiClient( @@ -336,19 +342,19 @@ void testCreateConnection() throws ApiException { final UUID destinationId = testHarness.createPostgresDestination().getDestinationId(); final UUID operationId = testHarness.createOperation().getOperationId(); final String name = "test-connection-" + UUID.randomUUID(); - final ConnectionSchedule schedule = new ConnectionSchedule().timeUnit(MINUTES).units(100L); final SyncMode syncMode = SyncMode.FULL_REFRESH; final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); final ConnectionRead createdConnection = - testHarness.createConnection(name, sourceId, destinationId, List.of(operationId), catalog, schedule); + testHarness.createConnection(name, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.BASIC, BASIC_SCHEDULE_DATA); assertEquals(sourceId, createdConnection.getSourceId()); assertEquals(destinationId, createdConnection.getDestinationId()); assertEquals(1, createdConnection.getOperationIds().size()); assertEquals(operationId, createdConnection.getOperationIds().get(0)); assertEquals(catalog, createdConnection.getSyncCatalog()); - assertEquals(schedule, createdConnection.getSchedule()); + assertEquals(ConnectionScheduleType.BASIC, createdConnection.getScheduleType()); + assertEquals(BASIC_SCHEDULE_DATA, createdConnection.getScheduleData()); assertEquals(name, createdConnection.getName()); } @@ -375,7 +381,8 @@ void testCancelSync() throws Exception { final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); final UUID connectionId = - testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.MANUAL, null) + .getConnectionId(); final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); // wait to get out of PENDING @@ -394,25 +401,23 @@ void testScheduledSync() throws Exception { final UUID operationId = testHarness.createOperation().getOperationId(); final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); - final ConnectionSchedule connectionSchedule = new ConnectionSchedule().units(1L).timeUnit(MINUTES); final SyncMode syncMode = SyncMode.FULL_REFRESH; final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); - final var connection = - testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, connectionSchedule); - - // When a new connection is created, Airbyte might sync it immediately (before the sync interval). - // Then it will wait the sync interval. - // if the wait isn't long enough, failures say "Connection refused" because the assert kills the - // syncs in progress - List jobs = new ArrayList<>(); - while (jobs.size() < 2) { - final var listSyncJobsRequest = new io.airbyte.api.client.model.generated.JobListRequestBody().configTypes(List.of(JobConfigType.SYNC)) - .configId(connection.getConnectionId().toString()); - final var resp = apiClient.getJobsApi().listJobsFor(listSyncJobsRequest); - jobs = resp.getJobs(); + final UUID connectionId = + testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.BASIC, + BASIC_SCHEDULE_DATA).getConnectionId(); + + for (int i = 0; i < 10; i++) { sleep(Duration.ofSeconds(30).toMillis()); + try { + final JobRead jobInfo = testHarness.getMostRecentSyncJobId(connectionId); + waitForSuccessfulJob(apiClient.getJobsApi(), jobInfo); + break; + } catch (Exception e) { + // Retry. + } } testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE); @@ -420,6 +425,41 @@ void testScheduledSync() throws Exception { @Test @Order(9) + void testCronSync() throws Exception { + final UUID sourceId = testHarness.createPostgresSource().getSourceId(); + final UUID destinationId = testHarness.createPostgresDestination().getDestinationId(); + final UUID operationId = testHarness.createOperation().getOperationId(); + final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); + + final ConnectionScheduleData connectionScheduleData = new ConnectionScheduleData().cron( + new ConnectionScheduleDataCron().cronExpression("* */2 * * * ?").cronTimeZone("UTC")); + final SyncMode syncMode = SyncMode.FULL_REFRESH; + final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; + catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); + + final UUID connectionId = + testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.CRON, + connectionScheduleData).getConnectionId(); + + for (int i = 0; i < 10; i++) { + sleep(Duration.ofSeconds(30).toMillis()); + try { + final JobRead jobInfo = testHarness.getMostRecentSyncJobId(connectionId); + waitForSuccessfulJob(apiClient.getJobsApi(), jobInfo); + break; + } catch (Exception e) { + // Retry. + } + } + testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE); + apiClient.getConnectionApi().deleteConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + + // remove connection to avoid exception during tear down + testHarness.removeConnection(connectionId); + } + + @Test + @Order(10) void testMultipleSchemasAndTablesSync() throws Exception { // create tables in another schema PostgreSQLContainerHelper.runSqlScript(MountableFile.forClasspathResource("postgres_second_schema_multiple_tables.sql"), sourcePsql); @@ -433,14 +473,15 @@ void testMultipleSchemasAndTablesSync() throws Exception { final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); final UUID connectionId = - testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.MANUAL, null) + .getConnectionId(); final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); testHarness.assertSourceAndDestinationDbInSync(false); } @Test - @Order(10) + @Order(11) void testMultipleSchemasSameTablesSync() throws Exception { // create tables in another schema PostgreSQLContainerHelper.runSqlScript(MountableFile.forClasspathResource("postgres_separate_schema_same_table.sql"), sourcePsql); @@ -454,7 +495,8 @@ void testMultipleSchemasSameTablesSync() throws Exception { final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); final UUID connectionId = - testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.MANUAL, null) + .getConnectionId(); final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); @@ -462,7 +504,7 @@ void testMultipleSchemasSameTablesSync() throws Exception { } @Test - @Order(11) + @Order(12) void testIncrementalDedupeSync() throws Exception { final UUID sourceId = testHarness.createPostgresSource().getSourceId(); final UUID destinationId = testHarness.createPostgresDestination().getDestinationId(); @@ -476,7 +518,8 @@ void testIncrementalDedupeSync() throws Exception { .destinationSyncMode(destinationSyncMode) .primaryKey(List.of(List.of(COLUMN_NAME)))); final UUID connectionId = - testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.MANUAL, null) + .getConnectionId(); // sync from start final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() @@ -505,7 +548,7 @@ void testIncrementalDedupeSync() throws Exception { } @Test - @Order(12) + @Order(13) void testIncrementalSync() throws Exception { LOGGER.info("Starting testIncrementalSync()"); final UUID sourceId = testHarness.createPostgresSource().getSourceId(); @@ -527,7 +570,8 @@ void testIncrementalSync() throws Exception { .cursorField(List.of(COLUMN_ID)) .destinationSyncMode(destinationSyncMode)); final UUID connectionId = - testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.MANUAL, null) + .getConnectionId(); LOGGER.info("Beginning testIncrementalSync() sync 1"); final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() @@ -581,7 +625,7 @@ void testIncrementalSync() throws Exception { } @Test - @Order(13) + @Order(14) void testDeleteConnection() throws Exception { final UUID sourceId = testHarness.createPostgresSource().getSourceId(); final UUID destinationId = testHarness.createPostgresDestination().getDestinationId(); @@ -596,7 +640,8 @@ void testDeleteConnection() throws Exception { .primaryKey(List.of(List.of(COLUMN_NAME)))); UUID connectionId = - testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.MANUAL, null) + .getConnectionId(); final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.RUNNING)); @@ -623,7 +668,8 @@ void testDeleteConnection() throws Exception { // test deletion of connection when temporal workflow is in a bad state LOGGER.info("Testing connection deletion when temporal is in a terminal state"); connectionId = - testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.MANUAL, null) + .getConnectionId(); testHarness.terminateTemporalWorkflow(connectionId); @@ -638,7 +684,7 @@ void testDeleteConnection() throws Exception { } @Test - @Order(14) + @Order(15) void testUpdateConnectionWhenWorkflowUnreachable() throws Exception { // This test only covers the specific behavior of updating a connection that does not have an // underlying temporal workflow. @@ -659,7 +705,8 @@ void testUpdateConnectionWhenWorkflowUnreachable() throws Exception { LOGGER.info("Testing connection update when temporal is in a terminal state"); final UUID connectionId = - testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.MANUAL, null) + .getConnectionId(); testHarness.terminateTemporalWorkflow(connectionId); @@ -675,7 +722,7 @@ void testUpdateConnectionWhenWorkflowUnreachable() throws Exception { } @Test - @Order(15) + @Order(16) void testManualSyncRepairsWorkflowWhenWorkflowUnreachable() throws Exception { // This test only covers the specific behavior of updating a connection that does not have an // underlying temporal workflow. @@ -701,7 +748,8 @@ void testManualSyncRepairsWorkflowWhenWorkflowUnreachable() throws Exception { LOGGER.info("Testing manual sync when temporal is in a terminal state"); final UUID connectionId = - testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.MANUAL, null) + .getConnectionId(); LOGGER.info("Starting first manual sync"); final JobInfoRead firstJobInfo = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); @@ -724,7 +772,7 @@ void testManualSyncRepairsWorkflowWhenWorkflowUnreachable() throws Exception { } @Test - @Order(16) + @Order(17) void testResetConnectionRepairsWorkflowWhenWorkflowUnreachable() throws Exception { // This test only covers the specific behavior of updating a connection that does not have an // underlying temporal workflow. @@ -740,7 +788,8 @@ void testResetConnectionRepairsWorkflowWhenWorkflowUnreachable() throws Exceptio LOGGER.info("Testing reset connection when temporal is in a terminal state"); final UUID connectionId = - testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.MANUAL, null) + .getConnectionId(); testHarness.terminateTemporalWorkflow(connectionId); @@ -749,7 +798,7 @@ void testResetConnectionRepairsWorkflowWhenWorkflowUnreachable() throws Exceptio } @Test - @Order(17) + @Order(18) void testResetCancelsRunningSync() throws Exception { final SourceDefinitionRead sourceDefinition = testHarness.createE2eSourceDefinition(); @@ -771,7 +820,8 @@ void testResetCancelsRunningSync() throws Exception { final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); final UUID connectionId = - testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.MANUAL, null) + .getConnectionId(); final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); // wait to get out of PENDING @@ -813,7 +863,8 @@ void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws Except .cursorField(List.of(COLUMN_ID)) .destinationSyncMode(DestinationSyncMode.APPEND)); final UUID connectionId = - testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.MANUAL, null) + .getConnectionId(); LOGGER.info("Beginning {} sync 1", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() @@ -901,7 +952,8 @@ void testSyncAfterUpgradeToPerStreamStateWithNoNewData(final TestInfo testInfo) .cursorField(List.of(COLUMN_ID)) .destinationSyncMode(DestinationSyncMode.APPEND)); final UUID connectionId = - testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.MANUAL, null) + .getConnectionId(); LOGGER.info("Beginning {} sync 1", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() @@ -976,7 +1028,8 @@ void testResetAllWhenSchemaIsModifiedForLegacySource() throws Exception { LOGGER.info("Discovered catalog: {}", catalog); final ConnectionRead connection = - testHarness.createConnection(name, sourceId, destinationId, List.of(operation.getOperationId()), catalog, null); + testHarness.createConnection(name, sourceId, destinationId, List.of(operation.getOperationId()), catalog, ConnectionScheduleType.MANUAL, + null); LOGGER.info("Created Connection: {}", connection); sourceDb.query(ctx -> { @@ -1092,6 +1145,7 @@ private void prettyPrintTables(final DSLContext ctx, final String... tables) { } @Test + @Disabled void testIncrementalSyncMultipleStreams() throws Exception { LOGGER.info("Starting testIncrementalSyncMultipleStreams()"); @@ -1118,7 +1172,8 @@ void testIncrementalSyncMultipleStreams() throws Exception { .cursorField(List.of(COLUMN_ID)) .destinationSyncMode(destinationSyncMode)); final UUID connectionId = - testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.MANUAL, null) + .getConnectionId(); LOGGER.info("Beginning testIncrementalSync() sync 1"); final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() @@ -1197,7 +1252,8 @@ void testMultipleSchemasAndTablesSyncAndReset() throws Exception { final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); final UUID connectionId = - testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.MANUAL, null) + .getConnectionId(); final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); testHarness.assertSourceAndDestinationDbInSync(false); @@ -1231,7 +1287,7 @@ void testPartialResetResetAllWhenSchemaIsModified(final TestInfo testInfo) throw testHarness.setIncrementalAppendSyncMode(catalog, List.of(COLUMN_ID)); final ConnectionRead connection = - testHarness.createConnection(name, sourceId, destinationId, List.of(operationId), catalog, null); + testHarness.createConnection(name, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.MANUAL, null); // Run initial sync final JobInfoRead syncRead = @@ -1390,7 +1446,7 @@ void testFailureTimeout() throws Exception { final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); final UUID connectionId = - testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, Collections.emptyList(), catalog, null) + testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, Collections.emptyList(), catalog, ConnectionScheduleType.MANUAL, null) .getConnectionId(); final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index bb2bd5c349d8..1492b81f9060 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -23,6 +23,7 @@ import io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration; import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; import io.airbyte.api.client.model.generated.ConnectionRead; +import io.airbyte.api.client.model.generated.ConnectionScheduleType; import io.airbyte.api.client.model.generated.ConnectionState; import io.airbyte.api.client.model.generated.ConnectionStateType; import io.airbyte.api.client.model.generated.DestinationDefinitionIdRequestBody; @@ -481,7 +482,8 @@ private UUID createCdcConnection() throws ApiException { .cursorField(List.of(COLUMN_ID)) .destinationSyncMode(destinationSyncMode)); final UUID connectionId = - testHarness.createConnection(CONNECTION_NAME, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + testHarness.createConnection(CONNECTION_NAME, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.MANUAL, null) + .getConnectionId(); return connectionId; } diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/ContainerOrchestratorAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/ContainerOrchestratorAcceptanceTests.java index 38366d0a03a0..420e8ee17090 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/ContainerOrchestratorAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/ContainerOrchestratorAcceptanceTests.java @@ -13,6 +13,7 @@ import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.AirbyteCatalog; import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; +import io.airbyte.api.client.model.generated.ConnectionScheduleType; import io.airbyte.api.client.model.generated.DestinationDefinitionIdRequestBody; import io.airbyte.api.client.model.generated.DestinationDefinitionRead; import io.airbyte.api.client.model.generated.DestinationSyncMode; @@ -112,7 +113,8 @@ void testDowntimeDuringSync() throws Exception { LOGGER.info("Creating connection..."); final UUID connectionId = - testHarness.createConnection(connectionName, sourceId, destinationId, List.of(), catalog, null).getConnectionId(); + testHarness.createConnection(connectionName, sourceId, destinationId, List.of(), catalog, ConnectionScheduleType.MANUAL, null) + .getConnectionId(); LOGGER.info("Run manual sync..."); final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); @@ -153,7 +155,8 @@ void testCancelSyncWithInterruption() throws Exception { final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); final UUID connectionId = - testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.MANUAL, null) + .getConnectionId(); final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.PENDING)); @@ -178,7 +181,8 @@ void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception { LOGGER.info("Creating connection..."); final UUID connectionId = - testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.MANUAL, null) + .getConnectionId(); LOGGER.info("Waiting for connection to be available in Temporal...");