From c6ff5abaa3427341644eaba3226d051898b78125 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Thu, 30 Jun 2022 13:29:22 -0700 Subject: [PATCH] Make sure that the feature flag is transfer to container (#14314) * Make sure that the feature flag is transfer to container * propagate the feature flags * Avoid propagating the feature flags * Fix tests --- .../process/AirbyteIntegrationLauncher.java | 7 +++++- .../CheckConnectionActivityImpl.java | 2 +- .../catalog/DiscoverCatalogActivityImpl.java | 1 - .../AirbyteIntegrationLauncherTest.java | 22 ++++++++++++++++--- 4 files changed, 26 insertions(+), 6 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java index 01cfe7c9976f..b8c1935a279a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java @@ -7,6 +7,8 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import io.airbyte.commons.features.EnvVariableFeatureFlags; +import io.airbyte.commons.features.FeatureFlags; import io.airbyte.config.ResourceRequirements; import io.airbyte.config.WorkerEnvConstants; import io.airbyte.workers.exception.WorkerException; @@ -44,6 +46,7 @@ public class AirbyteIntegrationLauncher implements IntegrationLauncher { private final String imageName; private final ProcessFactory processFactory; private final ResourceRequirements resourceRequirement; + private final FeatureFlags featureFlags; public AirbyteIntegrationLauncher(final String jobId, final int attempt, @@ -55,6 +58,7 @@ public AirbyteIntegrationLauncher(final String jobId, this.imageName = imageName; this.processFactory = processFactory; this.resourceRequirement = resourceRequirement; + this.featureFlags = new EnvVariableFeatureFlags(); } @Override @@ -188,7 +192,8 @@ private Map getWorkerMetadata() { return Map.of( WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, imageName, WorkerEnvConstants.WORKER_JOB_ID, jobId, - WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(attempt)); + WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(attempt), + EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState())); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java index 840e8f5d9967..74d3e3336216 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java @@ -54,7 +54,7 @@ public CheckConnectionActivityImpl(final WorkerConfigs workerConfigs, this.airbyteVersion = airbyteVersion; } - public StandardCheckConnectionOutput run(CheckConnectionInput args) { + public StandardCheckConnectionOutput run(final CheckConnectionInput args) { final JsonNode fullConfig = secretsHydrator.hydrate(args.getConnectionConfiguration().getConnectionConfiguration()); final StandardCheckConnectionInput input = new StandardCheckConnectionInput() diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java index 220382b7c301..ebe914bb8807 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java @@ -55,7 +55,6 @@ public DiscoverCatalogActivityImpl(final WorkerConfigs workerConfigs, this.logConfigs = logConfigs; this.jobPersistence = jobPersistence; this.airbyteVersion = airbyteVersion; - } public AirbyteCatalog run(final JobRunConfig jobRunConfig, diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java index 89e6b4d58ed1..fceedd214535 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java @@ -4,10 +4,19 @@ package io.airbyte.workers.process; -import static io.airbyte.workers.process.AirbyteIntegrationLauncher.*; +import static io.airbyte.workers.process.AirbyteIntegrationLauncher.CHECK_JOB; +import static io.airbyte.workers.process.AirbyteIntegrationLauncher.DISCOVER_JOB; +import static io.airbyte.workers.process.AirbyteIntegrationLauncher.JOB_TYPE; +import static io.airbyte.workers.process.AirbyteIntegrationLauncher.READ_STEP; +import static io.airbyte.workers.process.AirbyteIntegrationLauncher.SPEC_JOB; +import static io.airbyte.workers.process.AirbyteIntegrationLauncher.SYNC_JOB; +import static io.airbyte.workers.process.AirbyteIntegrationLauncher.SYNC_STEP; +import static io.airbyte.workers.process.AirbyteIntegrationLauncher.WRITE_STEP; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import io.airbyte.commons.features.EnvVariableFeatureFlags; +import io.airbyte.commons.features.FeatureFlags; import io.airbyte.config.EnvConfigs; import io.airbyte.config.WorkerEnvConstants; import io.airbyte.workers.WorkerConfigs; @@ -17,8 +26,12 @@ import java.util.Map; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +@ExtendWith(MockitoExtension.class) class AirbyteIntegrationLauncherTest { private static final String JOB_ID = "0"; @@ -37,16 +50,19 @@ class AirbyteIntegrationLauncherTest { private static final Map JOB_METADATA = Map.of( WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, FAKE_IMAGE, WorkerEnvConstants.WORKER_JOB_ID, JOB_ID, - WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT)); + WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT), + EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(false)); private WorkerConfigs workerConfigs; + @Mock private ProcessFactory processFactory; private AirbyteIntegrationLauncher launcher; + @Mock + private FeatureFlags featureFlags; @BeforeEach void setUp() { workerConfigs = new WorkerConfigs(new EnvConfigs()); - processFactory = Mockito.mock(ProcessFactory.class); launcher = new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, FAKE_IMAGE, processFactory, workerConfigs.getResourceRequirements()); }