Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pass workspace id to sync workflow and use it to selectively enable field selection #20589

Merged
merged 14 commits into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ public class OrchestratorConstants {
EnvConfigs.STATE_STORAGE_S3_SECRET_ACCESS_KEY,
EnvConfigs.STATE_STORAGE_S3_REGION,
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE,
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA))
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA,
EnvVariableFeatureFlags.APPLY_FIELD_SELECTION,
EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES))
.build();

public static final String INIT_FILE_ENV_MAP = "envMap.json";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path
.stream()
.collect(Collectors.toMap(s -> s.getStream().getNamespace() + "." + s.getStream().getName(),
s -> String.format("%s - %s", s.getSyncMode(), s.getDestinationSyncMode()))));
LOGGER.debug("field selection enabled: {}", fieldSelectionEnabled);
final WorkerSourceConfig sourceConfig = WorkerUtils.syncToWorkerSourceConfig(syncInput);

ApmTraceUtils.addTagsToTrace(generateTraceTags(destinationConfig, jobRoot));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,9 @@ private Map<String, String> getWorkerMetadata() {
WorkerEnvConstants.WORKER_JOB_ID, jobId,
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(attempt),
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()),
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()));
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()),
EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection()),
EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ class AirbyteIntegrationLauncherTest {
WorkerEnvConstants.WORKER_JOB_ID, JOB_ID,
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT),
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(new EnvVariableFeatureFlags().useStreamCapableState()),
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(new EnvVariableFeatureFlags().autoDetectSchema()));
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(new EnvVariableFeatureFlags().autoDetectSchema()),
EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(new EnvVariableFeatureFlags().applyFieldSelection()),
EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, new EnvVariableFeatureFlags().fieldSelectionWorkspaces());

private WorkerConfigs workerConfigs;
@Mock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public class EnvVariableFeatureFlags implements FeatureFlags {
public static final String NEED_STATE_VALIDATION = "NEED_STATE_VALIDATION";
public static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION";

public static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES";

@Override
public boolean autoDisablesFailingConnections() {
log.info("Auto Disable Failing Connections: " + Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS")));
Expand Down Expand Up @@ -55,6 +57,11 @@ public boolean applyFieldSelection() {
return getEnvOrDefault(APPLY_FIELD_SELECTION, false, Boolean::parseBoolean);
}

@Override
public String fieldSelectionWorkspaces() {
return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, "", (arg) -> arg);
}

// TODO: refactor in order to use the same method than the ones in EnvConfigs.java
public <T> T getEnvOrDefault(final String key, final T defaultValue, final Function<String, T> parser) {
final String value = System.getenv(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@ public interface FeatureFlags {

boolean needStateValidation();

/**
* Return true if field selection should be applied. See also fieldSelectionWorkspaces.
*
* @return whether field selection should be applied
*/
boolean applyFieldSelection();

/**
* Get the workspaces allow-listed for field selection. This should take precedence over
* applyFieldSelection.
*
* @return a comma-separated list of workspace ids where field selection should be enabled.
*/
String fieldSelectionWorkspaces();

}
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,10 @@ public interface Configs {

boolean getAutoDetectSchema();

boolean getApplyFieldSelection();

String getFieldSelectionWorkspaces();

enum TrackingStrategy {
SEGMENT,
LOGGING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ public class EnvConfigs implements Configs {
private static final long DEFAULT_MAX_NOTIFY_WORKERS = 5;
private static final String DEFAULT_NETWORK = "host";
private static final String AUTO_DETECT_SCHEMA = "AUTO_DETECT_SCHEMA";
private static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION";
private static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES";

public static final Map<String, Function<EnvConfigs, String>> JOB_SHARED_ENVS = Map.of(
AIRBYTE_VERSION, (instance) -> instance.getAirbyteVersion().serialize(),
Expand Down Expand Up @@ -1123,6 +1125,16 @@ public boolean getAutoDetectSchema() {
return getEnvOrDefault(AUTO_DETECT_SCHEMA, false);
}

@Override
public boolean getApplyFieldSelection() {
return getEnvOrDefault(APPLY_FIELD_SELECTION, false);
}

@Override
public String getFieldSelectionWorkspaces() {
return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, "");
}

@Override
public int getActivityNumberOfAttempt() {
return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "5"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,7 @@ properties:
isDestinationCustomConnector:
description: determine if the running image of the destination is a custom connector.
type: boolean
workspaceId:
description: The id of the workspace associated with the sync
type: string
format: uuid
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,7 @@ properties:
isDestinationCustomConnector:
description: determine if the destination running image is a custom connector.
type: boolean
workspaceId:
description: The id of the workspace associated with the sync
type: string
format: uuid
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,7 @@ properties:
description: optional resource requirements to use in dest container - this is used instead of `resourceRequirements` for the dest container
type: object
"$ref": ResourceRequirements.yaml
workspaceId:
description: The id of the workspace associated with this sync
type: string
format: uuid
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@
import io.airbyte.workers.sync.ReplicationLauncherWorker;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -148,7 +151,7 @@ public Optional<String> runJob() throws Exception {
new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion())),
new AirbyteMessageTracker(),
new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)),
metricReporter, featureFlags.applyFieldSelection());
metricReporter, enableFieldSelection(featureFlags, syncInput.getWorkspaceId()));

log.info("Running replication worker...");
final var jobRoot = TemporalUtils.getJobRoot(configs.getWorkspaceRoot(),
Expand All @@ -165,4 +168,20 @@ private AirbyteStreamFactory getStreamFactory(final Version protocolVersion, fin
: new DefaultAirbyteStreamFactory(mdcScope);
}

private boolean enableFieldSelection(final FeatureFlags featureFlags, final UUID workspaceId) {
final String workspaceIdsString = featureFlags.fieldSelectionWorkspaces();
final Set<UUID> workspaceIds = new HashSet<>();
for (final String id : workspaceIdsString.split(",")) {
workspaceIds.add(UUID.fromString(id));
}
for (final UUID workspace : workspaceIds) {
log.info("field selection workspace: {}", workspace);
}
if (workspaceId != null && workspaceIds.contains(workspaceId)) {
return true;
}

return featureFlags.applyFieldSelection();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public Optional<Long> createSyncJob(final SourceConnection source,
final List<StandardSyncOperation> standardSyncOperations,
@Nullable final JsonNode webhookOperationConfigs,
final StandardSourceDefinition sourceDefinition,
final StandardDestinationDefinition destinationDefinition)
final StandardDestinationDefinition destinationDefinition,
final UUID workspaceId)
throws IOException {
// reusing this isn't going to quite work.

Expand Down Expand Up @@ -96,7 +97,8 @@ public Optional<Long> createSyncJob(final SourceConnection source,
.withSourceResourceRequirements(mergedSrcResourceReq)
.withDestinationResourceRequirements(mergedDstResourceReq)
.withIsSourceCustomConnector(sourceDefinition.getCustom())
.withIsDestinationCustomConnector(destinationDefinition.getCustom());
.withIsDestinationCustomConnector(destinationDefinition.getCustom())
.withWorkspaceId(workspaceId);

getCurrentConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import javax.annotation.Nullable;

public interface JobCreator {

/**
*
* @param source db model representing where data comes from
* @param destination db model representing where data goes
* @param standardSync sync options
* @param sourceDockerImage docker image to use for the source
* @param destinationDockerImage docker image to use for the destination
* @param workspaceId
* @return the new job if no other conflicting job was running, otherwise empty
* @throws IOException if something wrong happens
*/
Expand All @@ -40,7 +41,8 @@ Optional<Long> createSyncJob(SourceConnection source,
List<StandardSyncOperation> standardSyncOperations,
@Nullable JsonNode webhookOperationConfigs,
StandardSourceDefinition sourceDefinition,
StandardDestinationDefinition destinationDefinition)
StandardDestinationDefinition destinationDefinition,
UUID workspaceId)
throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public Long create(final UUID connectionId) {
standardSyncOperations,
workspace.getWebhookOperationConfigs(),
sourceDefinition,
destinationDefinition)
destinationDefinition,
workspace.getWorkspaceId())
.orElseThrow(() -> new IllegalStateException("We shouldn't be trying to create a new sync job if there is one running already."));

} catch (final IOException | JsonValidationException | ConfigNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class DefaultJobCreatorTest {
private static final StandardSourceDefinition STANDARD_SOURCE_DEFINITION;
private static final StandardDestinationDefinition STANDARD_DESTINATION_DEFINITION;
private static final long JOB_ID = 12L;
private static final UUID WORKSPACE_ID = UUID.randomUUID();

private JobPersistence jobPersistence;
private StatePersistence statePersistence;
Expand Down Expand Up @@ -190,7 +191,8 @@ void testCreateSyncJob() throws IOException {
.withDestinationResourceRequirements(workerResourceRequirements)
.withWebhookOperationConfigs(PERSISTED_WEBHOOK_CONFIGS)
.withIsSourceCustomConnector(false)
.withIsDestinationCustomConnector(false);
.withIsDestinationCustomConnector(false)
.withWorkspaceId(WORKSPACE_ID);

final JobConfig jobConfig = new JobConfig()
.withConfigType(JobConfig.ConfigType.SYNC)
Expand All @@ -210,7 +212,7 @@ void testCreateSyncJob() throws IOException {
List.of(STANDARD_SYNC_OPERATION),
PERSISTED_WEBHOOK_CONFIGS,
STANDARD_SOURCE_DEFINITION,
STANDARD_DESTINATION_DEFINITION).orElseThrow();
STANDARD_DESTINATION_DEFINITION, WORKSPACE_ID).orElseThrow();
assertEquals(JOB_ID, jobId);
}

Expand Down Expand Up @@ -247,7 +249,7 @@ void testCreateSyncJobEnsureNoQueuing() throws IOException {
DESTINATION_PROTOCOL_VERSION,
List.of(STANDARD_SYNC_OPERATION),
null,
STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION).isEmpty());
STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION, UUID.randomUUID()).isEmpty());
}

@Test
Expand All @@ -262,7 +264,7 @@ void testCreateSyncJobDefaultWorkerResourceReqs() throws IOException {
DESTINATION_PROTOCOL_VERSION,
List.of(STANDARD_SYNC_OPERATION),
null,
STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION);
STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION, WORKSPACE_ID);

final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig()
.withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition())
Expand All @@ -280,7 +282,8 @@ void testCreateSyncJobDefaultWorkerResourceReqs() throws IOException {
.withSourceResourceRequirements(workerResourceRequirements)
.withDestinationResourceRequirements(workerResourceRequirements)
.withIsSourceCustomConnector(false)
.withIsDestinationCustomConnector(false);
.withIsDestinationCustomConnector(false)
.withWorkspaceId(WORKSPACE_ID);

final JobConfig expectedJobConfig = new JobConfig()
.withConfigType(JobConfig.ConfigType.SYNC)
Expand Down Expand Up @@ -310,7 +313,7 @@ void testCreateSyncJobConnectionResourceReqs() throws IOException {
DESTINATION_PROTOCOL_VERSION,
List.of(STANDARD_SYNC_OPERATION),
null,
STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION);
STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION, WORKSPACE_ID);

final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig()
.withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition())
Expand All @@ -328,7 +331,8 @@ void testCreateSyncJobConnectionResourceReqs() throws IOException {
.withSourceResourceRequirements(standardSyncResourceRequirements)
.withDestinationResourceRequirements(standardSyncResourceRequirements)
.withIsSourceCustomConnector(false)
.withIsDestinationCustomConnector(false);
.withIsDestinationCustomConnector(false)
.withWorkspaceId(WORKSPACE_ID);

final JobConfig expectedJobConfig = new JobConfig()
.withConfigType(JobConfig.ConfigType.SYNC)
Expand Down Expand Up @@ -364,7 +368,8 @@ void testCreateSyncJobSourceAndDestinationResourceReqs() throws IOException {
null,
new StandardSourceDefinition().withResourceRequirements(new ActorDefinitionResourceRequirements().withDefault(sourceResourceRequirements)),
new StandardDestinationDefinition().withResourceRequirements(new ActorDefinitionResourceRequirements().withJobSpecific(List.of(
new JobTypeResourceLimit().withJobType(JobType.SYNC).withResourceRequirements(destResourceRequirements)))));
new JobTypeResourceLimit().withJobType(JobType.SYNC).withResourceRequirements(destResourceRequirements)))),
WORKSPACE_ID);

final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig()
.withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition())
Expand All @@ -382,7 +387,8 @@ void testCreateSyncJobSourceAndDestinationResourceReqs() throws IOException {
.withSourceResourceRequirements(sourceResourceRequirements)
.withDestinationResourceRequirements(destResourceRequirements)
.withIsSourceCustomConnector(false)
.withIsDestinationCustomConnector(false);
.withIsDestinationCustomConnector(false)
.withWorkspaceId(WORKSPACE_ID);

final JobConfig expectedJobConfig = new JobConfig()
.withConfigType(JobConfig.ConfigType.SYNC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo
final UUID destinationId = UUID.randomUUID();
final UUID operationId = UUID.randomUUID();
final UUID workspaceWebhookConfigId = UUID.randomUUID();
final UUID workspaceId = UUID.randomUUID();
final String workspaceWebhookName = "test-webhook-name";
final JsonNode persistedWebhookConfigs = Jsons.deserialize(
String.format("{\"webhookConfigs\": [{\"id\": \"%s\", \"name\": \"%s\", \"authToken\": {\"_secret\": \"a-secret_v1\"}}]}",
Expand Down Expand Up @@ -87,7 +88,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo
when(
jobCreator.createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, srcProtocolVersion, dstDockerImage,
dstProtocolVersion, operations,
persistedWebhookConfigs, standardSourceDefinition, standardDestinationDefinition))
persistedWebhookConfigs, standardSourceDefinition, standardDestinationDefinition, workspaceId))
.thenReturn(Optional.of(jobId));
when(configRepository.getStandardSourceDefinition(sourceDefinitionId))
.thenReturn(standardSourceDefinition);
Expand All @@ -96,7 +97,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo
.thenReturn(standardDestinationDefinition);

when(configRepository.getStandardWorkspaceNoSecrets(any(), eq(true))).thenReturn(
new StandardWorkspace().withWebhookOperationConfigs(persistedWebhookConfigs));
new StandardWorkspace().withWorkspaceId(workspaceId).withWebhookOperationConfigs(persistedWebhookConfigs));

final SyncJobFactory factory = new DefaultSyncJobFactory(true, jobCreator, configRepository, mock(OAuthConfigSupplier.class), workspaceHelper);
final long actualJobId = factory.create(connectionId);
Expand All @@ -105,7 +106,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo
verify(jobCreator)
.createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, srcProtocolVersion, dstDockerImage, dstProtocolVersion,
operations, persistedWebhookConfigs,
standardSourceDefinition, standardDestinationDefinition);
standardSourceDefinition, standardDestinationDefinition, workspaceId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public ContainerOrchestratorConfig kubernetesContainerOrchestratorConfig(
environmentVariables.put(PUBLISH_METRICS_ENV_VAR, shouldPublishMetrics);
environmentVariables.put(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, Boolean.toString(featureFlags.useStreamCapableState()));
environmentVariables.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, Boolean.toString(featureFlags.autoDetectSchema()));
environmentVariables.put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, Boolean.toString(featureFlags.applyFieldSelection()));
environmentVariables.put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces());
environmentVariables.put(JAVA_OPTS_ENV_VAR, containerOrchestratorJavaOpts);

if (System.getenv(DD_ENV_ENV_VAR) != null) {
Expand Down
Loading