Skip to content

Commit

Permalink
Auto Detect Schema Change environment variable (#19312)
Browse files: browse the repository at this point in the history
* auto detect schema environment variable
  • Loading branch information
alovew authored Nov 28, 2022
1 parent fcf264e commit 14a29a0
Show file tree
Hide file tree
Showing 21 changed files with 54 additions and 7 deletions.
2 changes: 2 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,5 @@ METRIC_CLIENT=
OTEL_COLLECTOR_ENDPOINT="http://host.docker.internal:4317"

USE_STREAM_CAPABLE_STATE=true
AUTO_DETECT_SCHEMA=false

1 change: 1 addition & 0 deletions .env.dev
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ SYNC_JOB_MAX_ATTEMPTS=3
SYNC_JOB_MAX_TIMEOUT_DAYS=3
WORKERS_MICRONAUT_ENVIRONMENTS=control-plane
CRON_MICRONAUT_ENVIRONMENTS=control-plane
AUTO_DETECT_SCHEMA=false

# Sentry
SENTRY_DSN=""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public class OrchestratorConstants {
EnvConfigs.STATE_STORAGE_S3_ACCESS_KEY,
EnvConfigs.STATE_STORAGE_S3_SECRET_ACCESS_KEY,
EnvConfigs.STATE_STORAGE_S3_REGION,
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE))
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE,
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA))
.build();

public static final String INIT_FILE_ENV_MAP = "envMap.json";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ private Map<String, String> getWorkerMetadata() {
WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, imageName,
WorkerEnvConstants.WORKER_JOB_ID, jobId,
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(attempt),
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()));
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()),
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class AirbyteIntegrationLauncherTest {
WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, FAKE_IMAGE,
WorkerEnvConstants.WORKER_JOB_ID, JOB_ID,
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT),
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(new EnvVariableFeatureFlags().useStreamCapableState()));
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(new EnvVariableFeatureFlags().useStreamCapableState()),
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(new EnvVariableFeatureFlags().autoDetectSchema()));

private WorkerConfigs workerConfigs;
@Mock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
public class EnvVariableFeatureFlags implements FeatureFlags {

public static final String USE_STREAM_CAPABLE_STATE = "USE_STREAM_CAPABLE_STATE";
public static final String AUTO_DETECT_SCHEMA = "AUTO_DETECT_SCHEMA";
public static final String LOG_CONNECTOR_MESSAGES = "LOG_CONNECTOR_MESSAGES";
public static final String NEED_STATE_VALIDATION = "NEED_STATE_VALIDATION";

Expand All @@ -31,6 +32,11 @@ public boolean useStreamCapableState() {
return getEnvOrDefault(USE_STREAM_CAPABLE_STATE, false, Boolean::parseBoolean);
}

/**
 * Reports the value of the {@code AUTO_DETECT_SCHEMA} feature flag.
 *
 * <p>The flag is looked up through {@code getEnvOrDefault} under the key
 * {@link #AUTO_DETECT_SCHEMA}, with {@code false} supplied as the default and
 * {@code Boolean::parseBoolean} supplied as the parser.
 *
 * @return the flag value produced by {@code getEnvOrDefault}
 */
@Override
public boolean autoDetectSchema() {
  final boolean autoDetectEnabled = getEnvOrDefault(AUTO_DETECT_SCHEMA, false, Boolean::parseBoolean);
  return autoDetectEnabled;
}

@Override
public boolean logConnectorMessages() {
return getEnvOrDefault(LOG_CONNECTOR_MESSAGES, false, Boolean::parseBoolean);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public interface FeatureFlags {

boolean useStreamCapableState();

boolean autoDetectSchema();

boolean logConnectorMessages();

boolean needStateValidation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,8 @@ public interface Configs {
*/
int getActivityNumberOfAttempt();

boolean getAutoDetectSchema();

enum TrackingStrategy {
SEGMENT,
LOGGING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ public class EnvConfigs implements Configs {
private static final long DEFAULT_MAX_SYNC_WORKERS = 5;
private static final long DEFAULT_MAX_NOTIFY_WORKERS = 5;
private static final String DEFAULT_NETWORK = "host";
private static final String AUTO_DETECT_SCHEMA = "AUTO_DETECT_SCHEMA";

public static final Map<String, Function<EnvConfigs, String>> JOB_SHARED_ENVS = Map.of(
AIRBYTE_VERSION, (instance) -> instance.getAirbyteVersion().serialize(),
Expand Down Expand Up @@ -1050,6 +1051,11 @@ public int getWorkflowFailureRestartDelaySeconds() {
return Integer.parseInt(getEnvOrDefault(WORKFLOW_FAILURE_RESTART_DELAY_SECONDS, String.valueOf(10 * 60)));
}

/**
 * Returns the configured value of the {@code AUTO_DETECT_SCHEMA} setting.
 *
 * <p>Delegates to {@code getEnvOrDefault} with the {@code AUTO_DETECT_SCHEMA} key and
 * {@code false} as the default argument.
 *
 * @return the value produced by {@code getEnvOrDefault} for this key
 */
@Override
public boolean getAutoDetectSchema() {
  final boolean autoDetectSchema = getEnvOrDefault(AUTO_DETECT_SCHEMA, false);
  return autoDetectSchema;
}

@Override
public int getActivityNumberOfAttempt() {
return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "5"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public ContainerOrchestratorConfig kubernetesContainerOrchestratorConfig(
environmentVariables.put(DD_DOGSTATSD_PORT_ENV_VAR, dataDogStatsdPort);
environmentVariables.put(PUBLISH_METRICS_ENV_VAR, shouldPublishMetrics);
environmentVariables.put(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, Boolean.toString(featureFlags.useStreamCapableState()));
environmentVariables.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, Boolean.toString(featureFlags.autoDetectSchema()));
environmentVariables.put(JAVA_OPTS_ENV_VAR, containerOrchestratorJavaOpts);

if (System.getenv(DD_ENV_ENV_VAR) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void testAsyncOrchestratorPodProcess(final String pullPolicy) throws Inte
null,
null,
null,
Map.of(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true"),
Map.of(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true", EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, "false"),
serverPort);

final Map<Integer, Integer> portMap = Map.of(
Expand Down
5 changes: 5 additions & 0 deletions charts/airbyte-server/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ spec:
configMapKeyRef:
name: {{ .Release.Name }}-airbyte-env
key: AIRBYTE_VERSION
- name: AUTO_DETECT_SCHEMA
valueFrom:
configMapKeyRef:
name: {{ .Release.Name }}-airbyte-env
key: AUTO_DETECT_SCHEMA
- name: CONFIG_ROOT
valueFrom:
configMapKeyRef:
Expand Down
5 changes: 5 additions & 0 deletions charts/airbyte-worker/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,11 @@ spec:
configMapKeyRef:
name: {{ .Release.Name }}-airbyte-env
key: WORKFLOW_FAILURE_RESTART_DELAY_SECONDS
- name: AUTO_DETECT_SCHEMA
valueFrom:
configMapKeyRef:
name: {{ .Release.Name }}-airbyte-env
key: AUTO_DETECT_SCHEMA
- name: USE_STREAM_CAPABLE_STATE
valueFrom:
configMapKeyRef:
Expand Down
1 change: 1 addition & 0 deletions charts/airbyte/templates/env-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ data:
ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS: ""
WORKFLOW_FAILURE_RESTART_DELAY_SECONDS: ""
USE_STREAM_CAPABLE_STATE: "true"
AUTO_DETECT_SCHEMA: "false"
CONTAINER_ORCHESTRATOR_ENABLED: {{ .Values.worker.containerOrchestrator.enabled | quote }}
CONTAINER_ORCHESTRATOR_IMAGE: {{ .Values.worker.containerOrchestrator.image | quote }}
WORKERS_MICRONAUT_ENVIRONMENTS: "control-plane"
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ services:
- ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS=${ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS}
- ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS=${ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS}
- WORKFLOW_FAILURE_RESTART_DELAY_SECONDS=${WORKFLOW_FAILURE_RESTART_DELAY_SECONDS}
- AUTO_DETECT_SCHEMA=${AUTO_DETECT_SCHEMA}
- USE_STREAM_CAPABLE_STATE=${USE_STREAM_CAPABLE_STATE}
- MICRONAUT_ENVIRONMENTS=${WORKERS_MICRONAUT_ENVIRONMENTS}
volumes:
Expand Down Expand Up @@ -147,6 +148,7 @@ services:
- WORKER_ENVIRONMENT=${WORKER_ENVIRONMENT}
- WORKSPACE_ROOT=${WORKSPACE_ROOT}
- GITHUB_STORE_BRANCH=${GITHUB_STORE_BRANCH}
- AUTO_DETECT_SCHEMA=${AUTO_DETECT_SCHEMA}
ports:
- 8001
volumes:
Expand Down
1 change: 1 addition & 0 deletions kube/overlays/dev-integration-test/.env
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,4 @@ USE_STREAM_CAPABLE_STATE=true

SHOULD_RUN_NOTIFY_WORKFLOWS=false
MAX_NOTIFY_WORKERS=5
AUTO_DETECT_SCHEMA=false
2 changes: 1 addition & 1 deletion kube/overlays/dev/.env
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,6 @@ ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS=
WORKFLOW_FAILURE_RESTART_DELAY_SECONDS=

USE_STREAM_CAPABLE_STATE=true

SHOULD_RUN_NOTIFY_WORKFLOWS=false
MAX_NOTIFY_WORKERS=5
AUTO_DETECT_SCHEMA=false
2 changes: 1 addition & 1 deletion kube/overlays/stable-with-resource-limits/.env
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,6 @@ ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS=
WORKFLOW_FAILURE_RESTART_DELAY_SECONDS=

USE_STREAM_CAPABLE_STATE=true

SHOULD_RUN_NOTIFY_WORKFLOWS=false
MAX_NOTIFY_WORKERS=5
AUTO_DETECT_SCHEMA=false
2 changes: 1 addition & 1 deletion kube/overlays/stable/.env
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,6 @@ ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS=
WORKFLOW_FAILURE_RESTART_DELAY_SECONDS=

USE_STREAM_CAPABLE_STATE=true

SHOULD_RUN_NOTIFY_WORKFLOWS=false
MAX_NOTIFY_WORKERS=5
AUTO_DETECT_SCHEMA=false
5 changes: 5 additions & 0 deletions kube/resources/server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ spec:
configMapKeyRef:
name: airbyte-env
key: JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION
- name: AUTO_DETECT_SCHEMA
valueFrom:
configMapKeyRef:
name: airbyte-env
key: AUTO_DETECT_SCHEMA
ports:
- containerPort: 8001
volumeMounts:
Expand Down
5 changes: 5 additions & 0 deletions kube/resources/worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ spec:
configMapKeyRef:
name: airbyte-env
key: WORKFLOW_FAILURE_RESTART_DELAY_SECONDS
- name: AUTO_DETECT_SCHEMA
valueFrom:
configMapKeyRef:
name: airbyte-env
key: AUTO_DETECT_SCHEMA
- name: USE_STREAM_CAPABLE_STATE
valueFrom:
configMapKeyRef:
Expand Down

0 comments on commit 14a29a0

Please sign in to comment.