From a7194667a885e00cfaf867686b2f33bdc609ad4d Mon Sep 17 00:00:00 2001 From: alovew Date: Tue, 15 Nov 2022 11:46:20 -0800 Subject: [PATCH 01/10] remove from orchestrator --- .../workers/config/ContainerOrchestratorConfigBeanFactory.java | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java index d9259005b3ce..79fe622aeeb3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java @@ -76,7 +76,6 @@ public ContainerOrchestratorConfig kubernetesContainerOrchestratorConfig( environmentVariables.put(DD_DOGSTATSD_PORT_ENV_VAR, dataDogStatsdPort); environmentVariables.put(PUBLISH_METRICS_ENV_VAR, shouldPublishMetrics); environmentVariables.put(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, Boolean.toString(featureFlags.useStreamCapableState())); - environmentVariables.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, Boolean.toString(featureFlags.autoDetectSchema())); environmentVariables.put(JAVA_OPTS_ENV_VAR, containerOrchestratorJavaOpts); if (System.getenv(DD_ENV_ENV_VAR) != null) { From c2ab26bcc07e68e2f4e98086c4ba521452453983 Mon Sep 17 00:00:00 2001 From: alovew Date: Tue, 15 Nov 2022 14:56:56 -0800 Subject: [PATCH 02/10] tests --- .../airbyte/commons/temporal/sync/OrchestratorConstants.java | 3 +-- .../io/airbyte/workers/process/AirbyteIntegrationLauncher.java | 3 +-- .../workers/process/AirbyteIntegrationLauncherTest.java | 3 +-- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java index 21a0ff14b645..7a7c7806d7d0 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java @@ -65,8 +65,7 @@ public class OrchestratorConstants { EnvConfigs.STATE_STORAGE_S3_ACCESS_KEY, EnvConfigs.STATE_STORAGE_S3_SECRET_ACCESS_KEY, EnvConfigs.STATE_STORAGE_S3_REGION, - EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, - EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA)) + EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE)) .build(); public static final String INIT_FILE_ENV_MAP = "envMap.json"; diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java index 5183e643f13a..7f8542f9b116 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java @@ -212,8 +212,7 @@ private Map getWorkerMetadata() { WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, imageName, WorkerEnvConstants.WORKER_JOB_ID, jobId, WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(attempt), - EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()), - EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema())); + EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState())); } } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java index 3be0ab36723b..03bf563792b3 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java @@ -53,8 +53,7 @@ class AirbyteIntegrationLauncherTest { WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, FAKE_IMAGE, WorkerEnvConstants.WORKER_JOB_ID, JOB_ID, WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT), - EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(new EnvVariableFeatureFlags().useStreamCapableState()), - EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(new EnvVariableFeatureFlags().autoDetectSchema())); + EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(new EnvVariableFeatureFlags().useStreamCapableState())); private WorkerConfigs workerConfigs; @Mock From 09e97e62039a2fae713311cfd06731276a94faa1 Mon Sep 17 00:00:00 2001 From: alovew Date: Tue, 15 Nov 2022 16:50:16 -0800 Subject: [PATCH 03/10] Revert "remove from orchestrator" This reverts commit dc7a1d187f5adef9772d7c679166ddaff8101504. --- .../workers/config/ContainerOrchestratorConfigBeanFactory.java | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java index 79fe622aeeb3..d9259005b3ce 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java @@ -76,6 +76,7 @@ public ContainerOrchestratorConfig kubernetesContainerOrchestratorConfig( environmentVariables.put(DD_DOGSTATSD_PORT_ENV_VAR, dataDogStatsdPort); environmentVariables.put(PUBLISH_METRICS_ENV_VAR, shouldPublishMetrics); environmentVariables.put(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, Boolean.toString(featureFlags.useStreamCapableState())); + environmentVariables.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, Boolean.toString(featureFlags.autoDetectSchema())); environmentVariables.put(JAVA_OPTS_ENV_VAR, containerOrchestratorJavaOpts); if (System.getenv(DD_ENV_ENV_VAR) != null) { From 7eb58267fea2555cb61e345e8d3e3d5a7ae5c8f6 Mon Sep 17 00:00:00 2001 From: alovew Date: Tue, 15 Nov 2022 16:51:16 -0800 Subject: [PATCH 04/10] revert --- .../airbyte/commons/temporal/sync/OrchestratorConstants.java | 3 ++- .../io/airbyte/workers/process/AirbyteIntegrationLauncher.java | 3 ++- .../workers/process/AirbyteIntegrationLauncherTest.java | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java index 7a7c7806d7d0..21a0ff14b645 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java @@ -65,7 +65,8 @@ public class OrchestratorConstants { EnvConfigs.STATE_STORAGE_S3_ACCESS_KEY, EnvConfigs.STATE_STORAGE_S3_SECRET_ACCESS_KEY, EnvConfigs.STATE_STORAGE_S3_REGION, - EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE)) + EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, + EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA)) .build(); public static final String INIT_FILE_ENV_MAP = "envMap.json"; diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java index 7f8542f9b116..5183e643f13a 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java @@ -212,7 +212,8 @@ private Map getWorkerMetadata() { WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, imageName, WorkerEnvConstants.WORKER_JOB_ID, jobId, WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(attempt), - EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState())); + EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()), + EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema())); } } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java index 03bf563792b3..3be0ab36723b 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java @@ -53,7 +53,8 @@ class AirbyteIntegrationLauncherTest { WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, FAKE_IMAGE, WorkerEnvConstants.WORKER_JOB_ID, JOB_ID, WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT), - EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(new EnvVariableFeatureFlags().useStreamCapableState())); + EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(new EnvVariableFeatureFlags().useStreamCapableState()), + EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(new EnvVariableFeatureFlags().autoDetectSchema())); private WorkerConfigs workerConfigs; @Mock From 7d2b98d7703e1a295034e57322e50bdf5e9dd402 Mon Sep 17 00:00:00 2001 From: alovew Date: Tue, 22 Nov 2022 11:20:00 -0800 Subject: [PATCH 05/10] add connection status to sourceDiscoverSchemaRead --- airbyte-api/src/main/openapi/config.yaml | 2 ++ .../server/handlers/SchedulerHandler.java | 19 ++++++++++++++++--- .../WebBackendConnectionsHandler.java | 1 + .../server/handlers/SchedulerHandlerTest.java | 10 ++++++++-- .../WebBackendConnectionsHandlerTest.java | 14 ++++++++------ .../server/helpers/ConnectionHelpers.java | 17 ++++++++++++++--- .../api/generated-api-html/index.html | 1 + .../examples/airbyte.local/openapi.yaml | 2 ++ 8 files changed, 52 insertions(+), 14 deletions(-) diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index d1745349070b..3ca2408e707c 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -2935,6 +2935,8 @@ components: $ref: "#/components/schemas/CatalogDiff" breakingChange: type: boolean + connectionStatus: + $ref: "#/components/schemas/ConnectionStatus" SourceSearch: type: object properties: diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 49a4a75940e9..cd0b3744c06e 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -15,6 +15,7 @@ import io.airbyte.api.model.generated.CheckConnectionRead; import io.airbyte.api.model.generated.CheckConnectionRead.StatusEnum; import io.airbyte.api.model.generated.ConnectionIdRequestBody; +import io.airbyte.api.model.generated.ConnectionStatus; import io.airbyte.api.model.generated.ConnectionUpdate; import io.airbyte.api.model.generated.DestinationCoreConfig; import io.airbyte.api.model.generated.DestinationDefinitionIdWithWorkspaceId; @@ -39,6 +40,7 @@ import io.airbyte.api.model.generated.SynchronousJobRead; import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.enums.Enums; +import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.temporal.ErrorCode; import io.airbyte.commons.temporal.TemporalClient.ManualOperationResult; @@ -95,6 +97,7 @@ public class SchedulerHandler { private final JobPersistence jobPersistence; private final JobConverter jobConverter; private final EventRunner eventRunner; + private final EnvVariableFeatureFlags envVariableFeatureFlags; public SchedulerHandler(final ConfigRepository configRepository, final SecretsRepositoryReader secretsRepositoryReader, @@ -114,7 +117,8 @@ public SchedulerHandler(final ConfigRepository configRepository, jobPersistence, eventRunner, new JobConverter(workerEnvironment, logConfigs), - connectionsHandler); + connectionsHandler, + new EnvVariableFeatureFlags()); } @VisibleForTesting @@ -126,7 +130,8 @@ public SchedulerHandler(final ConfigRepository configRepository, final JobPersistence jobPersistence, final EventRunner eventRunner, final JobConverter jobConverter, - final ConnectionsHandler connectionsHandler) { + final ConnectionsHandler connectionsHandler, + final EnvVariableFeatureFlags envVariableFeatureFlags) { this.configRepository = configRepository; this.secretsRepositoryWriter = secretsRepositoryWriter; this.synchronousSchedulerClient = synchronousSchedulerClient; @@ -136,6 +141,7 @@ public SchedulerHandler(final ConfigRepository configRepository, this.eventRunner = eventRunner; this.jobConverter = jobConverter; this.connectionsHandler = connectionsHandler; + this.envVariableFeatureFlags = envVariableFeatureFlags; } public CheckConnectionRead checkSourceConnectionFromSourceId(final SourceIdRequestBody sourceIdRequestBody) @@ -367,8 +373,15 @@ private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discovered boolean containsBreakingChange = containsBreakingChange(diff); ConnectionUpdate updateObject = new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(discoverSchemaRequestBody.getConnectionId()); + ConnectionStatus connectionStatus; + if (envVariableFeatureFlags.autoDetectSchema() && containsBreakingChange) { + connectionStatus = ConnectionStatus.INACTIVE; + } else { + connectionStatus = ConnectionStatus.ACTIVE; + } + updateObject.status(connectionStatus); connectionsHandler.updateConnection(updateObject); - discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange); + discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange).connectionStatus(connectionStatus); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index 9931d3665f93..a261f79426aa 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -353,6 +353,7 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti */ diff = refreshedCatalog.get().getCatalogDiff(); connection.setBreakingChange(refreshedCatalog.get().getBreakingChange()); + connection.setStatus(refreshedCatalog.get().getConnectionStatus()); } else if (catalogUsedToMakeConfiguredCatalog.isPresent()) { // reconstructs a full picture of the full schema at the time the catalog was configured. syncCatalog = updateSchemaWithDiscovery(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get()); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index 2eb3e9be56d7..181f87fc713b 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -25,6 +25,7 @@ import io.airbyte.api.model.generated.CheckConnectionRead; import io.airbyte.api.model.generated.ConnectionIdRequestBody; import io.airbyte.api.model.generated.ConnectionRead; +import io.airbyte.api.model.generated.ConnectionStatus; import io.airbyte.api.model.generated.ConnectionUpdate; import io.airbyte.api.model.generated.DestinationCoreConfig; import io.airbyte.api.model.generated.DestinationDefinitionIdWithWorkspaceId; @@ -45,6 +46,7 @@ import io.airbyte.api.model.generated.StreamTransform.TransformTypeEnum; import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.enums.Enums; +import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; import io.airbyte.commons.temporal.ErrorCode; @@ -145,6 +147,7 @@ class SchedulerHandlerTest { private EventRunner eventRunner; private JobConverter jobConverter; private ConnectionsHandler connectionsHandler; + private EnvVariableFeatureFlags envVariableFeatureFlags; @BeforeEach void setup() { @@ -162,6 +165,7 @@ void setup() { jobPersistence = mock(JobPersistence.class); eventRunner = mock(EventRunner.class); connectionsHandler = mock(ConnectionsHandler.class); + envVariableFeatureFlags = mock(EnvVariableFeatureFlags.class); jobConverter = spy(new JobConverter(WorkerEnvironment.DOCKER, LogConfigs.EMPTY)); @@ -174,7 +178,8 @@ void setup() { jobPersistence, eventRunner, jobConverter, - connectionsHandler); + connectionsHandler, + envVariableFeatureFlags); } @Test @@ -619,7 +624,8 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdBreaking() throws IOException final AirbyteCatalog persistenceCatalog = Jsons.object(actorCatalog.getCatalog(), io.airbyte.protocol.models.AirbyteCatalog.class); final io.airbyte.api.model.generated.AirbyteCatalog expectedActorCatalog = CatalogConverter.toApi(persistenceCatalog); - final ConnectionUpdate expectedConnectionUpdate = new ConnectionUpdate().connectionId(connectionId).breakingChange(true); + final ConnectionUpdate expectedConnectionUpdate = + new ConnectionUpdate().connectionId(connectionId).breakingChange(true).status(ConnectionStatus.ACTIVE); final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); assertEquals(actual.getCatalogDiff(), catalogDiff); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index 04d78fe0273f..6f6506392321 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -76,6 +76,7 @@ import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSync; +import io.airbyte.config.StandardSync.Status; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.ConfigRepository.DestinationAndDefinition; @@ -173,9 +174,10 @@ void setup() throws IOException, JsonValidationException, ConfigNotFoundExceptio final DestinationRead destinationRead = DestinationHelpers.getDestinationRead(destination, destinationDefinition); final StandardSync standardSync = - ConnectionHelpers.generateSyncWithSourceAndDestinationId(source.getSourceId(), destination.getDestinationId(), false); + ConnectionHelpers.generateSyncWithSourceAndDestinationId(source.getSourceId(), destination.getDestinationId(), false, Status.ACTIVE); final StandardSync brokenStandardSync = - ConnectionHelpers.generateSyncWithSourceAndDestinationId(source.getSourceId(), destination.getDestinationId(), true); + ConnectionHelpers.generateSyncWithSourceAndDestinationId(source.getSourceId(), destination.getDestinationId(), true, Status.INACTIVE); + when(configRepository.listWorkspaceStandardSyncs(sourceRead.getWorkspaceId(), false)) .thenReturn(Collections.singletonList(standardSync)); when(configRepository.getSourceAndDefinitionsFromSourceIds(Collections.singletonList(source.getSourceId()))) @@ -277,7 +279,7 @@ void setup() throws IOException, JsonValidationException, ConfigNotFoundExceptio .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name("users-data1")) .updateStream(null)))); - expectedWithNewSchemaAndBreakingChange = expectedWebBackendConnectionReadObject(connectionRead, sourceRead, destinationRead, + expectedWithNewSchemaAndBreakingChange = expectedWebBackendConnectionReadObject(brokenConnectionRead, sourceRead, destinationRead, new OperationReadList().operations(expected.getOperations()), SchemaChange.BREAKING, now, modifiedCatalog, null) .catalogDiff(new CatalogDiff().transforms(List.of( new StreamTransform().transformType(TransformTypeEnum.ADD_STREAM) @@ -418,7 +420,7 @@ void testWebBackendGetConnectionWithDiscoveryAndNewSchema() throws ConfigNotFoun when(configRepository.getActorCatalogById(any())).thenReturn(new ActorCatalog().withId(UUID.randomUUID())); SourceDiscoverSchemaRead schemaRead = new SourceDiscoverSchemaRead().catalogDiff(expectedWithNewSchema.getCatalogDiff()).catalog(expectedWithNewSchema.getSyncCatalog()) - .breakingChange(false); + .breakingChange(false).connectionStatus(ConnectionStatus.ACTIVE); when(schedulerHandler.discoverSchemaForSourceFromSourceId(any())).thenReturn(schemaRead); final WebBackendConnectionRead result = testWebBackendGetConnection(true, connectionRead, @@ -435,10 +437,10 @@ void testWebBackendGetConnectionWithDiscoveryAndNewSchemaBreakingChange() throws when(configRepository.getActorCatalogById(any())).thenReturn(new ActorCatalog().withId(UUID.randomUUID())); SourceDiscoverSchemaRead schemaRead = new SourceDiscoverSchemaRead().catalogDiff(expectedWithNewSchema.getCatalogDiff()).catalog(expectedWithNewSchema.getSyncCatalog()) - .breakingChange(true); + .breakingChange(true).connectionStatus(ConnectionStatus.INACTIVE); when(schedulerHandler.discoverSchemaForSourceFromSourceId(any())).thenReturn(schemaRead); - final WebBackendConnectionRead result = testWebBackendGetConnection(true, connectionRead, + final WebBackendConnectionRead result = testWebBackendGetConnection(true, brokenConnectionRead, operationReadList); assertEquals(expectedWithNewSchemaAndBreakingChange, result); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java index ab742a079b36..908a5749e72f 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java +++ b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java @@ -33,6 +33,7 @@ import io.airbyte.config.Schedule.TimeUnit; import io.airbyte.config.ScheduleData; import io.airbyte.config.StandardSync; +import io.airbyte.config.StandardSync.Status; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; @@ -107,7 +108,10 @@ public static StandardSync generateSyncWithDestinationId(final UUID destinationI .withManual(true); } - public static StandardSync generateSyncWithSourceAndDestinationId(final UUID sourceId, final UUID destinationId, final boolean isBroken) { + public static StandardSync generateSyncWithSourceAndDestinationId(final UUID sourceId, + final UUID destinationId, + final boolean isBroken, + final Status status) { final UUID connectionId = UUID.randomUUID(); return new StandardSync() @@ -116,7 +120,7 @@ public static StandardSync generateSyncWithSourceAndDestinationId(final UUID sou .withNamespaceDefinition(NamespaceDefinitionType.SOURCE) .withNamespaceFormat(null) .withPrefix(STANDARD_SYNC_PREFIX) - .withStatus(StandardSync.Status.ACTIVE) + .withStatus(status) .withCatalog(generateBasicConfiguredAirbyteCatalog()) .withSourceCatalogId(UUID.randomUUID()) .withSourceId(sourceId) @@ -166,7 +170,6 @@ public static ConnectionRead generateExpectedConnectionRead(final UUID connectio .namespaceDefinition(io.airbyte.api.model.generated.NamespaceDefinitionType.SOURCE) .namespaceFormat(null) .prefix("presto_to_hudi") - .status(ConnectionStatus.ACTIVE) .schedule(generateBasicConnectionSchedule()) .scheduleType(ConnectionScheduleType.BASIC) .scheduleData(generateBasicConnectionScheduleData()) @@ -199,6 +202,14 @@ public static ConnectionRead generateExpectedConnectionRead(final StandardSync s .units(standardSync.getSchedule().getUnits())); } + if (standardSync.getStatus() == Status.INACTIVE) { + connectionRead.setStatus(ConnectionStatus.INACTIVE); + } else if (standardSync.getStatus() == Status.ACTIVE) { + connectionRead.setStatus(ConnectionStatus.ACTIVE); + } else if (standardSync.getStatus() == Status.DEPRECATED) { + connectionRead.setStatus(ConnectionStatus.DEPRECATED); + } + return connectionRead; } diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index 4b88a9f3fdff..9257c2920bbf 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -11891,6 +11891,7 @@

SourceDiscoverSchemaRead - <
catalogId (optional)
UUID format: uuid
catalogDiff (optional)
breakingChange (optional)
+
connectionStatus (optional)
diff --git a/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml b/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml index 30c32da65c21..6dd25eb599e6 100644 --- a/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml +++ b/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml @@ -2493,6 +2493,8 @@ components: $ref: "#/components/schemas/CatalogDiff" breakingChange: type: boolean + connectionStatus: + $ref: "#/components/schemas/ConnectionStatus" required: - jobInfo type: object From 11b7fc2298719478c3791ed7190c1516863af912 Mon Sep 17 00:00:00 2001 From: alovew Date: Mon, 28 Nov 2022 10:07:05 -0800 Subject: [PATCH 06/10] check connection preferences --- .../airbyte/server/handlers/SchedulerHandler.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index cd0b3744c06e..60034643a67e 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -15,6 +15,7 @@ import io.airbyte.api.model.generated.CheckConnectionRead; import io.airbyte.api.model.generated.CheckConnectionRead.StatusEnum; import io.airbyte.api.model.generated.ConnectionIdRequestBody; +import io.airbyte.api.model.generated.ConnectionRead; import io.airbyte.api.model.generated.ConnectionStatus; import io.airbyte.api.model.generated.ConnectionUpdate; import io.airbyte.api.model.generated.DestinationCoreConfig; @@ -28,6 +29,7 @@ import io.airbyte.api.model.generated.JobIdRequestBody; import io.airbyte.api.model.generated.JobInfoRead; import io.airbyte.api.model.generated.LogRead; +import io.airbyte.api.model.generated.NonBreakingChangesPreference; import io.airbyte.api.model.generated.SourceCoreConfig; import io.airbyte.api.model.generated.SourceDefinitionIdWithWorkspaceId; import io.airbyte.api.model.generated.SourceDefinitionSpecificationRead; @@ -366,15 +368,16 @@ private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discovered throws JsonValidationException, ConfigNotFoundException, IOException { final Optional catalogUsedToMakeConfiguredCatalog = connectionsHandler .getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId()); + final ConnectionRead connectionRead = connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()); final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog = - connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()).getSyncCatalog(); + connectionRead.getSyncCatalog(); CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(), CatalogConverter.toProtocol(currentAirbyteCatalog)); boolean containsBreakingChange = containsBreakingChange(diff); ConnectionUpdate updateObject = new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(discoverSchemaRequestBody.getConnectionId()); ConnectionStatus connectionStatus; - if (envVariableFeatureFlags.autoDetectSchema() && containsBreakingChange) { + if (shouldDisableConnection(containsBreakingChange, connectionRead.getNonBreakingChangesPreference())) { connectionStatus = ConnectionStatus.INACTIVE; } else { connectionStatus = ConnectionStatus.ACTIVE; @@ -385,6 +388,14 @@ private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discovered } + private boolean shouldDisableConnection(boolean containsBreakingChange, NonBreakingChangesPreference preference) { + if (!envVariableFeatureFlags.autoDetectSchema()) { + return false; + } + + return containsBreakingChange || preference == NonBreakingChangesPreference.DISABLE; + } + private CheckConnectionRead reportConnectionStatus(final SynchronousResponse response) { final CheckConnectionRead checkConnectionRead = new CheckConnectionRead() .jobInfo(jobConverter.getSynchronousJobRead(response)); From bc3aa28d52faf53e4e728c3cb873b49741433a95 Mon Sep 17 00:00:00 2001 From: alovew Date: Mon, 28 Nov 2022 11:47:20 -0800 Subject: [PATCH 07/10] tests --- .../server/handlers/SchedulerHandlerTest.java | 159 ++++++++++++++++++ 1 file changed, 159 insertions(+) diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index 181f87fc713b..679080aa16d2 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -35,6 +35,7 @@ import io.airbyte.api.model.generated.FieldTransform; import io.airbyte.api.model.generated.JobIdRequestBody; import io.airbyte.api.model.generated.JobInfoRead; +import io.airbyte.api.model.generated.NonBreakingChangesPreference; import io.airbyte.api.model.generated.SourceCoreConfig; import io.airbyte.api.model.generated.SourceDefinitionIdWithWorkspaceId; import io.airbyte.api.model.generated.SourceDefinitionSpecificationRead; @@ -582,6 +583,110 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreaking() throws IOExcept assertEquals(actual.getCatalog(), expectedActorCatalog); } + @Test + void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionPreferenceNoFeatureFlag() + throws IOException, JsonValidationException, ConfigNotFoundException { + final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); + final UUID connectionId = UUID.randomUUID(); + final UUID discoveredCatalogId = UUID.randomUUID(); + final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; + final SourceDiscoverSchemaRequestBody request = + new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); + final StreamTransform streamTransform = new StreamTransform().transformType(TransformTypeEnum.REMOVE_STREAM) + .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name(DOGS)); + final CatalogDiff catalogDiff = new CatalogDiff().addTransformsItem(streamTransform); + when(envVariableFeatureFlags.autoDetectSchema()).thenReturn(false); + when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId())) + .thenReturn(new StandardSourceDefinition() + .withDockerRepository(SOURCE_DOCKER_REPO) + .withDockerImageTag(SOURCE_DOCKER_TAG) + .withProtocolVersion(SOURCE_PROTOCOL_VERSION) + .withSourceDefinitionId(source.getSourceDefinitionId())); + when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source); + when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION))) + .thenReturn(discoverResponse); + + when(discoverResponse.isSuccess()).thenReturn(true); + when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); + + final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( + CatalogHelpers.createAirbyteStream("shoes", Field.of("sku", JsonSchemaType.STRING)), + CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); + + final ConnectionRead connectionRead = + new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)).nonBreakingChangesPreference( + NonBreakingChangesPreference.DISABLE); + when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); + when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff); + + final ActorCatalog actorCatalog = new ActorCatalog() + .withCatalog(Jsons.jsonNode(airbyteCatalog)) + .withCatalogHash("") + .withId(discoveredCatalogId); + when(configRepository.getActorCatalogById(discoveredCatalogId)).thenReturn(actorCatalog); + + final AirbyteCatalog persistenceCatalog = Jsons.object(actorCatalog.getCatalog(), + io.airbyte.protocol.models.AirbyteCatalog.class); + final io.airbyte.api.model.generated.AirbyteCatalog expectedActorCatalog = CatalogConverter.toApi(persistenceCatalog); + + final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); + assertEquals(actual.getCatalogDiff(), catalogDiff); + assertEquals(actual.getCatalog(), expectedActorCatalog); + assertEquals(actual.getConnectionStatus(), ConnectionStatus.ACTIVE); + } + + @Test + void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionPreferenceFeatureFlag() + throws IOException, JsonValidationException, ConfigNotFoundException { + final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); + final UUID connectionId = UUID.randomUUID(); + final UUID discoveredCatalogId = UUID.randomUUID(); + final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; + final SourceDiscoverSchemaRequestBody request = + new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); + final StreamTransform streamTransform = new StreamTransform().transformType(TransformTypeEnum.REMOVE_STREAM) + .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name(DOGS)); + final CatalogDiff catalogDiff = new CatalogDiff().addTransformsItem(streamTransform); + when(envVariableFeatureFlags.autoDetectSchema()).thenReturn(true); + when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId())) + .thenReturn(new StandardSourceDefinition() + .withDockerRepository(SOURCE_DOCKER_REPO) + .withDockerImageTag(SOURCE_DOCKER_TAG) + .withProtocolVersion(SOURCE_PROTOCOL_VERSION) + .withSourceDefinitionId(source.getSourceDefinitionId())); + when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source); + when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION))) + .thenReturn(discoverResponse); + + when(discoverResponse.isSuccess()).thenReturn(true); + when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); + + final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( + CatalogHelpers.createAirbyteStream("shoes", Field.of("sku", JsonSchemaType.STRING)), + CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); + + final ConnectionRead connectionRead = + new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)).nonBreakingChangesPreference( + NonBreakingChangesPreference.DISABLE); + when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); + when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff); + + final ActorCatalog actorCatalog = new ActorCatalog() + .withCatalog(Jsons.jsonNode(airbyteCatalog)) + .withCatalogHash("") + .withId(discoveredCatalogId); + when(configRepository.getActorCatalogById(discoveredCatalogId)).thenReturn(actorCatalog); + + final AirbyteCatalog persistenceCatalog = Jsons.object(actorCatalog.getCatalog(), + io.airbyte.protocol.models.AirbyteCatalog.class); + final io.airbyte.api.model.generated.AirbyteCatalog expectedActorCatalog = CatalogConverter.toApi(persistenceCatalog); + + final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); + assertEquals(actual.getCatalogDiff(), catalogDiff); + assertEquals(actual.getCatalog(), expectedActorCatalog); + assertEquals(actual.getConnectionStatus(), ConnectionStatus.INACTIVE); + } + @Test void testDiscoverSchemaFromSourceIdWithConnectionIdBreaking() throws IOException, JsonValidationException, ConfigNotFoundException { final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); @@ -630,6 +735,60 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdBreaking() throws IOException final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); assertEquals(actual.getCatalogDiff(), catalogDiff); assertEquals(actual.getCatalog(), expectedActorCatalog); + assertEquals(actual.getConnectionStatus(), ConnectionStatus.ACTIVE); + verify(connectionsHandler).updateConnection(expectedConnectionUpdate); + } + + @Test + void testDiscoverSchemaFromSourceIdWithConnectionIdBreakingFeatureFlagOn() throws IOException, JsonValidationException, ConfigNotFoundException { + final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); + final UUID connectionId = UUID.randomUUID(); + final UUID discoveredCatalogId = UUID.randomUUID(); + final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; + final SourceDiscoverSchemaRequestBody request = + new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); + final StreamTransform streamTransform = new StreamTransform().transformType(TransformTypeEnum.UPDATE_STREAM) + .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name(DOGS)).addUpdateStreamItem(new FieldTransform().transformType( + FieldTransform.TransformTypeEnum.REMOVE_FIELD).breaking(true)); + final CatalogDiff catalogDiff = new CatalogDiff().addTransformsItem(streamTransform); + when(envVariableFeatureFlags.autoDetectSchema()).thenReturn(true); + when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId())) + .thenReturn(new StandardSourceDefinition() + .withDockerRepository(SOURCE_DOCKER_REPO) + .withDockerImageTag(SOURCE_DOCKER_TAG) + .withProtocolVersion(SOURCE_PROTOCOL_VERSION) + .withSourceDefinitionId(source.getSourceDefinitionId())); + when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source); + when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION))) + .thenReturn(discoverResponse); + + when(discoverResponse.isSuccess()).thenReturn(true); + when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); + + final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( + CatalogHelpers.createAirbyteStream("shoes", Field.of("sku", JsonSchemaType.STRING)), + CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); + + final ConnectionRead connectionRead = new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)); + when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); + when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff); + + final ActorCatalog actorCatalog = new ActorCatalog() + .withCatalog(Jsons.jsonNode(airbyteCatalog)) + .withCatalogHash("") + .withId(discoveredCatalogId); + when(configRepository.getActorCatalogById(discoveredCatalogId)).thenReturn(actorCatalog); + + final AirbyteCatalog persistenceCatalog = Jsons.object(actorCatalog.getCatalog(), + io.airbyte.protocol.models.AirbyteCatalog.class); + final io.airbyte.api.model.generated.AirbyteCatalog expectedActorCatalog = CatalogConverter.toApi(persistenceCatalog); + final ConnectionUpdate expectedConnectionUpdate = + new ConnectionUpdate().connectionId(connectionId).breakingChange(true).status(ConnectionStatus.INACTIVE); + + final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); + assertEquals(actual.getCatalogDiff(), catalogDiff); + assertEquals(actual.getCatalog(), expectedActorCatalog); + assertEquals(actual.getConnectionStatus(), ConnectionStatus.INACTIVE); verify(connectionsHandler).updateConnection(expectedConnectionUpdate); } From e0c6fd4db0751c8c048c6bf771e8f40c8cf85d06 Mon Sep 17 00:00:00 2001 From: alovew Date: Mon, 28 Nov 2022 12:32:47 -0800 Subject: [PATCH 08/10] check for diff --- .../server/handlers/SchedulerHandler.java | 6 +++--- .../server/handlers/SchedulerHandlerTest.java | 16 +++++++++------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 60034643a67e..0352faab5dea 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -377,7 +377,7 @@ private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discovered ConnectionUpdate updateObject = new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(discoverSchemaRequestBody.getConnectionId()); ConnectionStatus connectionStatus; - if (shouldDisableConnection(containsBreakingChange, connectionRead.getNonBreakingChangesPreference())) { + if (shouldDisableConnection(containsBreakingChange, connectionRead.getNonBreakingChangesPreference(), diff)) { connectionStatus = ConnectionStatus.INACTIVE; } else { connectionStatus = ConnectionStatus.ACTIVE; @@ -388,12 +388,12 @@ private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discovered } - private boolean shouldDisableConnection(boolean containsBreakingChange, NonBreakingChangesPreference preference) { + private boolean shouldDisableConnection(boolean containsBreakingChange, NonBreakingChangesPreference preference, CatalogDiff diff) { if (!envVariableFeatureFlags.autoDetectSchema()) { return false; } - return containsBreakingChange || preference == NonBreakingChangesPreference.DISABLE; + return containsBreakingChange || (preference == NonBreakingChangesPreference.DISABLE && !diff.getTransforms().isEmpty()); } private CheckConnectionRead reportConnectionStatus(final SynchronousResponse response) { diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index 679080aa16d2..3e7c7d5bbb3e 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -109,9 +109,11 @@ class SchedulerHandlerTest { private static final String DESTINATION_PROTOCOL_VERSION = "0.7.9"; private static final String NAME = "name"; private static final String DOGS = "dogs"; + private static final String SHOES = "shoes"; + private static final String SKU = "sku"; - private static final AirbyteCatalog airbyteCatalog = CatalogHelpers.createAirbyteCatalog("shoes", - Field.of("sku", JsonSchemaType.STRING)); + private static final AirbyteCatalog airbyteCatalog = CatalogHelpers.createAirbyteCatalog(SHOES, + Field.of(SKU, JsonSchemaType.STRING)); private static final SourceConnection SOURCE = new SourceConnection() .withName("my postgres db") @@ -561,7 +563,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreaking() throws IOExcept when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( - CatalogHelpers.createAirbyteStream("shoes", Field.of("sku", JsonSchemaType.STRING)), + CatalogHelpers.createAirbyteStream(SHOES, Field.of(SKU, JsonSchemaType.STRING)), CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); final ConnectionRead connectionRead = new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)); @@ -610,7 +612,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionP when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( - CatalogHelpers.createAirbyteStream("shoes", Field.of("sku", JsonSchemaType.STRING)), + CatalogHelpers.createAirbyteStream(SHOES, Field.of(SKU, JsonSchemaType.STRING)), CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); final ConnectionRead connectionRead = @@ -662,7 +664,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionP when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( - CatalogHelpers.createAirbyteStream("shoes", Field.of("sku", JsonSchemaType.STRING)), + CatalogHelpers.createAirbyteStream(SHOES, Field.of(SKU, JsonSchemaType.STRING)), CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); final ConnectionRead connectionRead = @@ -713,7 +715,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdBreaking() throws IOException when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( - CatalogHelpers.createAirbyteStream("shoes", Field.of("sku", JsonSchemaType.STRING)), + CatalogHelpers.createAirbyteStream(SHOES, Field.of(SKU, JsonSchemaType.STRING)), CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); final ConnectionRead connectionRead = new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)); @@ -766,7 +768,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdBreakingFeatureFlagOn() throw when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( - CatalogHelpers.createAirbyteStream("shoes", Field.of("sku", JsonSchemaType.STRING)), + CatalogHelpers.createAirbyteStream(SHOES, Field.of(SKU, JsonSchemaType.STRING)), CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); final ConnectionRead connectionRead = new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)); From 59c63967f9c12b7147781c5cb750e542d3a0f2d4 Mon Sep 17 00:00:00 2001 From: alovew Date: Mon, 28 Nov 2022 12:38:12 -0800 Subject: [PATCH 09/10] tests --- .../server/handlers/SchedulerHandlerTest.java | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index 3e7c7d5bbb3e..9a1f657468b0 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -794,6 +794,57 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdBreakingFeatureFlagOn() throw verify(connectionsHandler).updateConnection(expectedConnectionUpdate); } + @Test + void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionPreferenceFeatureFlagNoDiff() + throws IOException, JsonValidationException, ConfigNotFoundException { + final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); + final UUID connectionId = UUID.randomUUID(); + final UUID discoveredCatalogId = UUID.randomUUID(); + final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; + final SourceDiscoverSchemaRequestBody request = + new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); + final CatalogDiff catalogDiff = new CatalogDiff(); + when(envVariableFeatureFlags.autoDetectSchema()).thenReturn(true); + when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId())) + .thenReturn(new StandardSourceDefinition() + .withDockerRepository(SOURCE_DOCKER_REPO) + .withDockerImageTag(SOURCE_DOCKER_TAG) + .withProtocolVersion(SOURCE_PROTOCOL_VERSION) + .withSourceDefinitionId(source.getSourceDefinitionId())); + when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source); + when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION))) + .thenReturn(discoverResponse); + + when(discoverResponse.isSuccess()).thenReturn(true); + when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); + + final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( + CatalogHelpers.createAirbyteStream(SHOES, Field.of(SKU, JsonSchemaType.STRING)), + CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); + + final ConnectionRead connectionRead = + new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)).nonBreakingChangesPreference( + NonBreakingChangesPreference.DISABLE); + when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); + when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff); + + final ActorCatalog actorCatalog = new ActorCatalog() + .withCatalog(Jsons.jsonNode(airbyteCatalog)) + .withCatalogHash("") + .withId(discoveredCatalogId); + when(configRepository.getActorCatalogById(discoveredCatalogId)).thenReturn(actorCatalog); + + final AirbyteCatalog persistenceCatalog = Jsons.object(actorCatalog.getCatalog(), + io.airbyte.protocol.models.AirbyteCatalog.class); + final io.airbyte.api.model.generated.AirbyteCatalog expectedActorCatalog = CatalogConverter.toApi(persistenceCatalog); + + final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); + assertEquals(actual.getCatalogDiff(), catalogDiff); + assertEquals(actual.getCatalog(), expectedActorCatalog); + assertEquals(actual.getConnectionStatus(), ConnectionStatus.ACTIVE); + } + + @Test void testDiscoverSchemaForSourceFromSourceCreate() throws JsonValidationException, IOException, ConfigNotFoundException { final SourceConnection source = new SourceConnection() From c5be1b02ed9a5a961c2fa53bf08b14c9b46afc2f Mon Sep 17 00:00:00 2001 From: alovew Date: Mon, 28 Nov 2022 12:43:16 -0800 Subject: [PATCH 10/10] inject env variable feature flags --- .../src/main/java/io/airbyte/server/ServerApp.java | 6 +++++- .../java/io/airbyte/server/handlers/SchedulerHandler.java | 5 +++-- .../io/airbyte/server/handlers/SchedulerHandlerTest.java | 1 - 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index e74c91094418..f4f3fd9eaf31 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -7,6 +7,7 @@ import io.airbyte.analytics.Deployment; import io.airbyte.analytics.TrackingClient; import io.airbyte.analytics.TrackingClientSingleton; +import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.lang.CloseableShutdownHook; import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.temporal.ConnectionManagerUtils; @@ -213,6 +214,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final TrackingClient trackingClient = TrackingClientSingleton.get(); final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence, trackingClient); + final EnvVariableFeatureFlags envVariableFeatureFlags = new EnvVariableFeatureFlags(); + final WebUrlHelper webUrlHelper = new WebUrlHelper(configs.getWebappUrl()); final JobErrorReportingClient jobErrorReportingClient = JobErrorReportingClientFactory.getClient(configs.getJobErrorReportingStrategy(), configs); final JobErrorReporter jobErrorReporter = @@ -286,7 +289,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, configs.getWorkerEnvironment(), configs.getLogConfigs(), eventRunner, - connectionsHandler); + connectionsHandler, + envVariableFeatureFlags); final DbMigrationHandler dbMigrationHandler = new DbMigrationHandler(configsDatabase, configsFlyway, jobsDatabase, jobsFlyway); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 0352faab5dea..7621cb4baf93 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -109,7 +109,8 @@ public SchedulerHandler(final ConfigRepository configRepository, final WorkerEnvironment workerEnvironment, final LogConfigs logConfigs, final EventRunner eventRunner, - final ConnectionsHandler connectionsHandler) { + final ConnectionsHandler connectionsHandler, + final EnvVariableFeatureFlags envVariableFeatureFlags) { this( configRepository, secretsRepositoryWriter, @@ -120,7 +121,7 @@ public SchedulerHandler(final ConfigRepository configRepository, eventRunner, new JobConverter(workerEnvironment, logConfigs), connectionsHandler, - new EnvVariableFeatureFlags()); + envVariableFeatureFlags); } @VisibleForTesting diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index 9a1f657468b0..f06b4d6a8d69 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -844,7 +844,6 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionP assertEquals(actual.getConnectionStatus(), ConnectionStatus.ACTIVE); } - @Test void testDiscoverSchemaForSourceFromSourceCreate() throws JsonValidationException, IOException, ConfigNotFoundException { final SourceConnection source = new SourceConnection()