Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a separate node pool to run custom connector jobs #19770

Merged
merged 23 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,8 @@ public TemporalResponse<ConnectorJobOutput> submitGetSpec(final UUID jobId, fina
final IntegrationLauncherConfig launcherConfig = new IntegrationLauncherConfig()
.withJobId(jobId.toString())
.withAttemptId((long) attempt)
.withDockerImage(config.getDockerImage());
.withDockerImage(config.getDockerImage())
.withIsCustomConnector(config.getIsCustomConnector());
return execute(jobRunConfig,
() -> getWorkflowStub(SpecWorkflow.class, TemporalJobType.GET_SPEC).run(jobRunConfig, launcherConfig));

Expand All @@ -346,7 +347,8 @@ public TemporalResponse<ConnectorJobOutput> submitCheckConnection(final UUID job
.withJobId(jobId.toString())
.withAttemptId((long) attempt)
.withDockerImage(config.getDockerImage())
.withProtocolVersion(config.getProtocolVersion());
.withProtocolVersion(config.getProtocolVersion())
.withIsCustomConnector(config.getIsCustomConnector());
final StandardCheckConnectionInput input = new StandardCheckConnectionInput().withConnectionConfiguration(config.getConnectionConfiguration());

return execute(jobRunConfig,
Expand All @@ -361,7 +363,8 @@ public TemporalResponse<ConnectorJobOutput> submitDiscoverSchema(final UUID jobI
.withJobId(jobId.toString())
.withAttemptId((long) attempt)
.withDockerImage(config.getDockerImage())
.withProtocolVersion(config.getProtocolVersion());
.withProtocolVersion(config.getProtocolVersion())
.withIsCustomConnector(config.getIsCustomConnector());
final StandardDiscoverCatalogInput input = new StandardDiscoverCatalogInput().withConnectionConfiguration(config.getConnectionConfiguration())
.withSourceId(config.getSourceId()).withConnectorVersion(config.getConnectorVersion()).withConfigHash(config.getConfigHash());

Expand All @@ -376,13 +379,15 @@ public TemporalResponse<StandardSyncOutput> submitSync(final long jobId, final i
.withJobId(String.valueOf(jobId))
.withAttemptId((long) attempt)
.withDockerImage(config.getSourceDockerImage())
.withProtocolVersion(config.getSourceProtocolVersion());
.withProtocolVersion(config.getSourceProtocolVersion())
.withIsCustomConnector(config.getIsSourceCustomConnector());

final IntegrationLauncherConfig destinationLauncherConfig = new IntegrationLauncherConfig()
.withJobId(String.valueOf(jobId))
.withAttemptId((long) attempt)
.withDockerImage(config.getDestinationDockerImage())
.withProtocolVersion(config.getDestinationProtocolVersion());
.withProtocolVersion(config.getDestinationProtocolVersion())
.withIsCustomConnector(config.getIsDestinationCustomConnector());

final StandardSyncInput input = new StandardSyncInput()
.withNamespaceDefinition(config.getNamespaceDefinition())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class OrchestratorConstants {
EnvConfigs.JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET,
EnvConfigs.JOB_KUBE_SIDECAR_CONTAINER_IMAGE_PULL_POLICY,
EnvConfigs.JOB_KUBE_NODE_SELECTORS,
EnvConfigs.JOB_ISOLATED_KUBE_NODE_SELECTORS,
EnvConfigs.USE_CUSTOM_NODE_SELECTOR,
EnvConfigs.DOCKER_NETWORK,
EnvConfigs.LOCAL_DOCKER_MOUNT,
EnvConfigs.WORKSPACE_DOCKER_MOUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.airbyte.config.TolerationPOJO;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.AllArgsConstructor;

@AllArgsConstructor
Expand All @@ -18,6 +19,7 @@ public class WorkerConfigs {
private final ResourceRequirements resourceRequirements;
private final List<TolerationPOJO> workerKubeTolerations;
private final Map<String, String> workerKubeNodeSelectors;
private final Optional<Map<String, String>> workerIsolatedKubeNodeSelectors;
private final Map<String, String> workerKubeAnnotations;
private final String jobImagePullSecret;
private final String jobImagePullPolicy;
Expand All @@ -41,6 +43,7 @@ public WorkerConfigs(final Configs configs) {
.withMemoryLimit(configs.getJobMainContainerMemoryLimit()),
configs.getJobKubeTolerations(),
configs.getJobKubeNodeSelectors(),
configs.getUseCustomKubeNodeSelector() ? Optional.of(configs.getIsolatedJobKubeNodeSelectors()) : Optional.empty(),
configs.getJobKubeAnnotations(),
configs.getJobKubeMainContainerImagePullSecret(),
configs.getJobKubeMainContainerImagePullPolicy(),
Expand Down Expand Up @@ -72,6 +75,7 @@ public static WorkerConfigs buildSpecWorkerConfigs(final Configs configs) {
.withMemoryLimit(configs.getJobMainContainerMemoryLimit()),
configs.getJobKubeTolerations(),
nodeSelectors,
configs.getUseCustomKubeNodeSelector() ? Optional.of(configs.getIsolatedJobKubeNodeSelectors()) : Optional.empty(),
annotations,
configs.getJobKubeMainContainerImagePullSecret(),
configs.getJobKubeMainContainerImagePullPolicy(),
Expand Down Expand Up @@ -103,6 +107,7 @@ public static WorkerConfigs buildCheckWorkerConfigs(final Configs configs) {
.withMemoryLimit(configs.getCheckJobMainContainerMemoryLimit()),
configs.getJobKubeTolerations(),
nodeSelectors,
configs.getUseCustomKubeNodeSelector() ? Optional.of(configs.getIsolatedJobKubeNodeSelectors()) : Optional.empty(),
annotations,
configs.getJobKubeMainContainerImagePullSecret(),
configs.getJobKubeMainContainerImagePullPolicy(),
Expand Down Expand Up @@ -134,6 +139,7 @@ public static WorkerConfigs buildDiscoverWorkerConfigs(final Configs configs) {
.withMemoryLimit(configs.getJobMainContainerMemoryLimit()),
configs.getJobKubeTolerations(),
nodeSelectors,
configs.getUseCustomKubeNodeSelector() ? Optional.of(configs.getIsolatedJobKubeNodeSelectors()) : Optional.empty(),
annotations,
configs.getJobKubeMainContainerImagePullSecret(),
configs.getJobKubeMainContainerImagePullPolicy(),
Expand All @@ -154,6 +160,7 @@ public static WorkerConfigs buildReplicationWorkerConfigs(final Configs configs)
.withMemoryLimit(configs.getReplicationOrchestratorMemoryLimit()),
configs.getJobKubeTolerations(),
configs.getJobKubeNodeSelectors(),
configs.getUseCustomKubeNodeSelector() ? Optional.of(configs.getIsolatedJobKubeNodeSelectors()) : Optional.empty(),
configs.getJobKubeAnnotations(),
configs.getJobKubeMainContainerImagePullSecret(),
configs.getJobKubeMainContainerImagePullPolicy(),
Expand All @@ -180,6 +187,10 @@ public Map<String, String> getworkerKubeNodeSelectors() {
return workerKubeNodeSelectors;
}

public Optional<Map<String, String>> getWorkerIsolatedKubeNodeSelectors() {
return workerIsolatedKubeNodeSelectors;
}

public Map<String, String> getWorkerKubeAnnotations() {
return workerKubeAnnotations;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public boolean transform(final String jobId,
jobRoot,
dbtConfig.getDockerImage(),
false,
xiaohansong marked this conversation as resolved.
Show resolved Hide resolved
false,
files,
"/bin/bash",
resourceRequirements,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public static String getEntrypointEnvVariable(final ProcessFactory processFactor
jobRoot,
imageName,
false,
false,
Collections.emptyMap(),
"printenv",
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ private boolean runProcess(final String jobId,
attempt,
jobRoot,
normalizationImageName,
// custom connector does not use normalization
xiaohansong marked this conversation as resolved.
Show resolved Hide resolved
false,
false, files,
null,
resourceRequirements,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,26 @@ public class AirbyteIntegrationLauncher implements IntegrationLauncher {
private final ResourceRequirements resourceRequirement;
private final FeatureFlags featureFlags;

/**
* If true, launcher will use a separated isolated pool to run the job.
*
* At this moment, we put custom connector jobs into an isolated pool.
*/
private final boolean useIsolatedPool;

public AirbyteIntegrationLauncher(final String jobId,
xiaohansong marked this conversation as resolved.
Show resolved Hide resolved
final int attempt,
final String imageName,
final ProcessFactory processFactory,
final ResourceRequirements resourceRequirement) {
final ResourceRequirements resourceRequirement,
final boolean useIsolatedPool) {
this.jobId = jobId;
this.attempt = attempt;
this.imageName = imageName;
this.processFactory = processFactory;
this.resourceRequirement = resourceRequirement;
this.featureFlags = new EnvVariableFeatureFlags();
this.useIsolatedPool = useIsolatedPool;
}

@Trace(operationName = WORKER_OPERATION_NAME)
Expand All @@ -80,6 +89,7 @@ public Process spec(final Path jobRoot) throws WorkerException {
attempt,
jobRoot,
imageName,
useIsolatedPool,
false,
Collections.emptyMap(),
null,
Expand All @@ -100,6 +110,7 @@ public Process check(final Path jobRoot, final String configFilename, final Stri
attempt,
jobRoot,
imageName,
useIsolatedPool,
false,
ImmutableMap.of(configFilename, configContents),
null,
Expand All @@ -121,6 +132,7 @@ public Process discover(final Path jobRoot, final String configFilename, final S
attempt,
jobRoot,
imageName,
useIsolatedPool,
false,
ImmutableMap.of(configFilename, configContents),
null,
Expand Down Expand Up @@ -166,6 +178,7 @@ public Process read(final Path jobRoot,
attempt,
jobRoot,
imageName,
useIsolatedPool,
false,
files,
null,
Expand Down Expand Up @@ -195,6 +208,7 @@ public Process write(final Path jobRoot,
attempt,
jobRoot,
imageName,
useIsolatedPool,
true,
files,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public Process create(final String jobType,
final int attempt,
final Path jobRoot,
final String imageName,
final boolean usesIsolatedPool,
xiaohansong marked this conversation as resolved.
Show resolved Hide resolved
final boolean usesStdin,
final Map<String, String> files,
final String entrypoint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public Process create(
final int attempt,
final Path jobRoot,
final String imageName,
final boolean usesIsolatedPool,
final boolean usesStdin,
final Map<String, String> files,
final String entrypoint,
Expand All @@ -107,6 +108,12 @@ public Process create(

final var allLabels = getLabels(jobId, attempt, customLabels);

// If using isolated pool, check workerConfigs has isolated pool set. If not set, fall back to use
// regular node pool.
final var nodeSelectors =
usesIsolatedPool ? workerConfigs.getWorkerIsolatedKubeNodeSelectors().orElse(workerConfigs.getworkerKubeNodeSelectors())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I wonder if we should just thrown an exception if usesIsolatedPool is true but the getWorkerIsolatedKubeNodeSelectors Optional is empty. I think it would be safer for us to fail loudly if we didn't configure the isolated pool properly, rather than accidentally letting custom connectors run in our main pool. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call! fixed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Realized this is not true - usesIsolatedPool here is actually passed by callers (isCustomConnector flag). Thus for OSS users who do not run them in a separate pool, usesIsolatedPool might still be true because they are custom connectors. I should rename this to isCustomConnectors instead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh good call, OSS users should be able to use custom connectors without an isolated pool indeed

: workerConfigs.getworkerKubeNodeSelectors();

return new KubePodProcess(
isOrchestrator,
processRunnerHost,
Expand All @@ -125,7 +132,7 @@ public Process create(
resourceRequirements,
workerConfigs.getJobImagePullSecret(),
workerConfigs.getWorkerKubeTolerations(),
workerConfigs.getworkerKubeNodeSelectors(),
nodeSelectors,
allLabels,
workerConfigs.getWorkerKubeAnnotations(),
workerConfigs.getJobSocatImage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public interface ProcessFactory {
* @param attempt attempt Id
* @param jobPath Workspace directory to run the process from.
* @param imageName Docker image name to start the process from.
* @param usesIsolatedPool whether to use isolated pool to run the jobs.
* @param files File name to contents map that will be written into the working dir of the process
* prior to execution.
* @param entrypoint If not null, the default entrypoint program of the docker image can be changed
Expand All @@ -43,6 +44,7 @@ Process create(String jobType,
int attempt,
final Path jobPath,
final String imageName,
final boolean usesIsolatedPool,
final boolean usesStdin,
final Map<String, String> files,
final String entrypoint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ public DbtLauncherWorker(final UUID connectionId,
activityContext,
serverPort,
temporalUtils,
workerConfigs);
workerConfigs,
// Custom connector does not use Dbt at this moment, thus this flag for runnning job under
// isolated pool can be set to false.
false);
xiaohansong marked this conversation as resolved.
Show resolved Hide resolved
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class LauncherWorker<INPUT, OUTPUT> implements Worker<INPUT, OUTPUT> {
private final TemporalUtils temporalUtils;
private final WorkerConfigs workerConfigs;

private final boolean isCustomConnector;
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private AsyncOrchestratorPodProcess process;

Expand All @@ -87,7 +88,8 @@ public LauncherWorker(final UUID connectionId,
final Supplier<ActivityExecutionContext> activityContext,
final Integer serverPort,
final TemporalUtils temporalUtils,
final WorkerConfigs workerConfigs) {
final WorkerConfigs workerConfigs,
final boolean isCustomConnector) {

this.connectionId = connectionId;
this.application = application;
Expand All @@ -101,6 +103,7 @@ public LauncherWorker(final UUID connectionId,
this.serverPort = serverPort;
this.temporalUtils = temporalUtils;
this.workerConfigs = workerConfigs;
this.isCustomConnector = isCustomConnector;
}

@Trace(operationName = WORKER_OPERATION_NAME)
Expand Down Expand Up @@ -173,13 +176,19 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException
log.info("Creating " + podName + " for attempt number: " + jobRunConfig.getAttemptId());
killRunningPodsForConnection();

// custom connectors run in an isolated node pool from airbyte-supported connectors
// to reduce the blast radius of any problems with custom connector code.
final var nodeSelectors =
isCustomConnector ? workerConfigs.getWorkerIsolatedKubeNodeSelectors().orElse(workerConfigs.getworkerKubeNodeSelectors())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment here, I think it's more dangerous to fall back on our primary node selectors, and perhaps we should throw an exception instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this check in WorkerConfigurationBeanFactory.java instead - because 1) workerConfigs does not carry useIsolatedPool flag, 2) should throw this error as early as possible

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

: workerConfigs.getworkerKubeNodeSelectors();

try {
process.create(
allLabels,
resourceRequirements,
fileMap,
portMap,
workerConfigs.getworkerKubeNodeSelectors());
nodeSelectors);
} catch (final KubernetesClientException e) {
ApmTraceUtils.addExceptionToTrace(e);
throw new WorkerException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ public NormalizationLauncherWorker(final UUID connectionId,
activityContext,
serverPort,
temporalUtils,
workerConfigs);
workerConfigs,
// Normalization process will happen only on a fixed set of connectors,
// thus they are not going to be run under custom connectors. Setting this to false.
false);
xiaohansong marked this conversation as resolved.
Show resolved Hide resolved

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public ReplicationLauncherWorker(final UUID connectionId,
activityContext,
serverPort,
temporalUtils,
workerConfigs);
workerConfigs,
sourceLauncherConfig.getIsCustomConnector() || destinationLauncherConfig.getIsCustomConnector());
xiaohansong marked this conversation as resolved.
Show resolved Hide resolved
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ void setup() throws IOException, WorkerException {
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(catalog));

when(processFactory.create(AirbyteIntegrationLauncher.NORMALIZE_STEP, JOB_ID, JOB_ATTEMPT, jobRoot,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME, false, files, null,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME, false, false, files, null,
workerConfigs.getResourceRequirements(),
Map.of(AirbyteIntegrationLauncher.JOB_TYPE, AirbyteIntegrationLauncher.SYNC_JOB, AirbyteIntegrationLauncher.SYNC_STEP,
AirbyteIntegrationLauncher.NORMALIZE_STEP),
Expand Down
Loading