Skip to content

Commit

Permalink
Make sure that the feature flag is transfer to container (#14314)
Browse files Browse the repository at this point in the history
* Make sure that the feature flag is transfer to container

* propagate the feature flags

* Avoid propagating the feature flags

* Fix tests
  • Loading branch information
benmoriceau authored Jun 30, 2022
1 parent 6dadd1b commit c6ff5ab
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.WorkerEnvConstants;
import io.airbyte.workers.exception.WorkerException;
Expand Down Expand Up @@ -44,6 +46,7 @@ public class AirbyteIntegrationLauncher implements IntegrationLauncher {
private final String imageName;
private final ProcessFactory processFactory;
private final ResourceRequirements resourceRequirement;
private final FeatureFlags featureFlags;

public AirbyteIntegrationLauncher(final String jobId,
final int attempt,
Expand All @@ -55,6 +58,7 @@ public AirbyteIntegrationLauncher(final String jobId,
this.imageName = imageName;
this.processFactory = processFactory;
this.resourceRequirement = resourceRequirement;
this.featureFlags = new EnvVariableFeatureFlags();
}

@Override
Expand Down Expand Up @@ -188,7 +192,8 @@ private Map<String, String> getWorkerMetadata() {
return Map.of(
WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, imageName,
WorkerEnvConstants.WORKER_JOB_ID, jobId,
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(attempt));
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(attempt),
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public CheckConnectionActivityImpl(final WorkerConfigs workerConfigs,
this.airbyteVersion = airbyteVersion;
}

public StandardCheckConnectionOutput run(CheckConnectionInput args) {
public StandardCheckConnectionOutput run(final CheckConnectionInput args) {
final JsonNode fullConfig = secretsHydrator.hydrate(args.getConnectionConfiguration().getConnectionConfiguration());

final StandardCheckConnectionInput input = new StandardCheckConnectionInput()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public DiscoverCatalogActivityImpl(final WorkerConfigs workerConfigs,
this.logConfigs = logConfigs;
this.jobPersistence = jobPersistence;
this.airbyteVersion = airbyteVersion;

}

public AirbyteCatalog run(final JobRunConfig jobRunConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,19 @@

package io.airbyte.workers.process;

import static io.airbyte.workers.process.AirbyteIntegrationLauncher.*;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.CHECK_JOB;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.DISCOVER_JOB;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.JOB_TYPE;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.READ_STEP;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.SPEC_JOB;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.SYNC_JOB;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.SYNC_STEP;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.WRITE_STEP;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.WorkerEnvConstants;
import io.airbyte.workers.WorkerConfigs;
Expand All @@ -17,8 +26,12 @@
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class AirbyteIntegrationLauncherTest {

private static final String JOB_ID = "0";
Expand All @@ -37,16 +50,19 @@ class AirbyteIntegrationLauncherTest {
private static final Map<String, String> JOB_METADATA = Map.of(
WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, FAKE_IMAGE,
WorkerEnvConstants.WORKER_JOB_ID, JOB_ID,
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT));
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT),
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(false));

private WorkerConfigs workerConfigs;
@Mock
private ProcessFactory processFactory;
private AirbyteIntegrationLauncher launcher;
@Mock
private FeatureFlags featureFlags;

@BeforeEach
void setUp() {
workerConfigs = new WorkerConfigs(new EnvConfigs());
processFactory = Mockito.mock(ProcessFactory.class);
launcher = new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, FAKE_IMAGE, processFactory, workerConfigs.getResourceRequirements());
}

Expand Down

0 comments on commit c6ff5ab

Please sign in to comment.