Use a separate node pool to run custom connector jobs #19770

Merged
23 commits merged on Dec 7, 2022
Changes from 3 commits
@@ -9,6 +9,7 @@
import io.airbyte.config.TolerationPOJO;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.AllArgsConstructor;

@AllArgsConstructor
@@ -18,7 +19,7 @@ public class WorkerConfigs {
private final ResourceRequirements resourceRequirements;
private final List<TolerationPOJO> workerKubeTolerations;
private final Map<String, String> workerKubeNodeSelectors;
private final Map<String, String> workerIsolatedKubeNodeSelectors;
private final Optional<Map<String, String>> workerIsolatedKubeNodeSelectors;
private final Map<String, String> workerKubeAnnotations;
private final String jobImagePullSecret;
private final String jobImagePullPolicy;
@@ -42,7 +43,7 @@ public WorkerConfigs(final Configs configs) {
.withMemoryLimit(configs.getJobMainContainerMemoryLimit()),
configs.getJobKubeTolerations(),
configs.getJobKubeNodeSelectors(),
configs.getUseCustomKubeNodeSelector() ? configs.getIsolatedJobKubeNodeSelectors() : configs.getJobKubeNodeSelectors(),
configs.getUseCustomKubeNodeSelector() ? Optional.of(configs.getIsolatedJobKubeNodeSelectors()) : Optional.empty(),
configs.getJobKubeAnnotations(),
configs.getJobKubeMainContainerImagePullSecret(),
configs.getJobKubeMainContainerImagePullPolicy(),
@@ -74,7 +75,7 @@ public static WorkerConfigs buildSpecWorkerConfigs(final Configs configs) {
.withMemoryLimit(configs.getJobMainContainerMemoryLimit()),
configs.getJobKubeTolerations(),
nodeSelectors,
configs.getUseCustomKubeNodeSelector() ? configs.getIsolatedJobKubeNodeSelectors() : nodeSelectors,
configs.getUseCustomKubeNodeSelector() ? Optional.of(configs.getIsolatedJobKubeNodeSelectors()) : Optional.empty(),
annotations,
configs.getJobKubeMainContainerImagePullSecret(),
configs.getJobKubeMainContainerImagePullPolicy(),
@@ -106,7 +107,7 @@ public static WorkerConfigs buildCheckWorkerConfigs(final Configs configs) {
.withMemoryLimit(configs.getCheckJobMainContainerMemoryLimit()),
configs.getJobKubeTolerations(),
nodeSelectors,
configs.getUseCustomKubeNodeSelector() ? configs.getIsolatedJobKubeNodeSelectors() : nodeSelectors,
configs.getUseCustomKubeNodeSelector() ? Optional.of(configs.getIsolatedJobKubeNodeSelectors()) : Optional.empty(),
annotations,
configs.getJobKubeMainContainerImagePullSecret(),
configs.getJobKubeMainContainerImagePullPolicy(),
@@ -138,7 +139,7 @@ public static WorkerConfigs buildDiscoverWorkerConfigs(final Configs configs) {
.withMemoryLimit(configs.getJobMainContainerMemoryLimit()),
configs.getJobKubeTolerations(),
nodeSelectors,
configs.getUseCustomKubeNodeSelector() ? configs.getIsolatedJobKubeNodeSelectors() : nodeSelectors,
configs.getUseCustomKubeNodeSelector() ? Optional.of(configs.getIsolatedJobKubeNodeSelectors()) : Optional.empty(),
annotations,
configs.getJobKubeMainContainerImagePullSecret(),
configs.getJobKubeMainContainerImagePullPolicy(),
@@ -159,7 +160,7 @@ public static WorkerConfigs buildReplicationWorkerConfigs(final Configs configs)
.withMemoryLimit(configs.getReplicationOrchestratorMemoryLimit()),
configs.getJobKubeTolerations(),
configs.getJobKubeNodeSelectors(),
configs.getUseCustomKubeNodeSelector() ? configs.getIsolatedJobKubeNodeSelectors() : configs.getJobKubeNodeSelectors(),
configs.getUseCustomKubeNodeSelector() ? Optional.of(configs.getIsolatedJobKubeNodeSelectors()) : Optional.empty(),
configs.getJobKubeAnnotations(),
configs.getJobKubeMainContainerImagePullSecret(),
configs.getJobKubeMainContainerImagePullPolicy(),
@@ -186,7 +187,7 @@ public Map<String, String> getworkerKubeNodeSelectors() {
return workerKubeNodeSelectors;
}

public Map<String, String> getWorkerIsolatedKubeNodeSelectors() {
public Optional<Map<String, String>> getWorkerIsolatedKubeNodeSelectors() {
return workerIsolatedKubeNodeSelectors;
}

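For context on the Optional change above, here is a minimal, self-contained sketch of the fallback semantics it enables. The class and helper names are invented for illustration, and the node-selector labels are placeholders; only the Optional.orElse pattern and the idea of "isolated selectors present only when the custom-node-selector flag is on" come from this PR.

import java.util.Map;
import java.util.Optional;

// Illustration only: shows how an Optional-valued isolated selector map resolves in the
// three relevant cases (isolated pool configured, requested but unset, not requested).
final class NodeSelectorResolutionSketch {

  static Map<String, String> resolve(final boolean usesIsolatedPool,
                                     final Optional<Map<String, String>> isolatedSelectors,
                                     final Map<String, String> regularSelectors) {
    if (!usesIsolatedPool) {
      return regularSelectors;
    }
    // Mirrors workerConfigs.getWorkerIsolatedKubeNodeSelectors().orElse(...) used later in this PR.
    return isolatedSelectors.orElse(regularSelectors);
  }

  public static void main(final String[] args) {
    final Map<String, String> regular = Map.of("pool", "main");                 // placeholder label
    final Optional<Map<String, String>> isolated = Optional.of(Map.of("pool", "isolated"));

    System.out.println(resolve(true, isolated, regular));          // {pool=isolated}
    System.out.println(resolve(true, Optional.empty(), regular));  // {pool=main}
    System.out.println(resolve(false, isolated, regular));         // {pool=main}
  }
}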
@@ -57,21 +57,26 @@ public class AirbyteIntegrationLauncher implements IntegrationLauncher {
private final ResourceRequirements resourceRequirement;
private final FeatureFlags featureFlags;

private final boolean useIsolatedNodePool;
/**
* If true, the launcher will use a separate, isolated pool to run the job.
*
* At this moment, we put custom connector jobs into an isolated pool.
*/
private final boolean useIsolatedPool;

public AirbyteIntegrationLauncher(final String jobId,
final int attempt,
final String imageName,
final ProcessFactory processFactory,
final ResourceRequirements resourceRequirement,
final boolean useIsolatedNodePool) {
final boolean useIsolatedPool) {
this.jobId = jobId;
this.attempt = attempt;
this.imageName = imageName;
this.processFactory = processFactory;
this.resourceRequirement = resourceRequirement;
this.featureFlags = new EnvVariableFeatureFlags();
this.useIsolatedNodePool = useIsolatedNodePool;
this.useIsolatedPool = useIsolatedPool;
}

@Trace(operationName = WORKER_OPERATION_NAME)
@@ -84,7 +89,7 @@ public Process spec(final Path jobRoot) throws WorkerException {
attempt,
jobRoot,
imageName,
useIsolatedNodePool,
useIsolatedPool,
false,
Collections.emptyMap(),
null,
@@ -105,7 +110,7 @@ public Process check(final Path jobRoot, final String configFilename, final Stri
attempt,
jobRoot,
imageName,
useIsolatedNodePool,
useIsolatedPool,
false,
ImmutableMap.of(configFilename, configContents),
null,
@@ -127,7 +132,7 @@ public Process discover(final Path jobRoot, final String configFilename, final S
attempt,
jobRoot,
imageName,
useIsolatedNodePool,
useIsolatedPool,
false,
ImmutableMap.of(configFilename, configContents),
null,
@@ -173,7 +178,7 @@ public Process read(final Path jobRoot,
attempt,
jobRoot,
imageName,
useIsolatedNodePool,
useIsolatedPool,
false,
files,
null,
@@ -203,7 +208,7 @@ public Process write(final Path jobRoot,
attempt,
jobRoot,
imageName,
useIsolatedNodePool,
useIsolatedPool,
true,
files,
null,
@@ -108,7 +108,11 @@ public Process create(

final var allLabels = getLabels(jobId, attempt, customLabels);

final var nodeSelectors = usesIsolatedPool ? workerConfigs.getWorkerIsolatedKubeNodeSelectors() : workerConfigs.getworkerKubeNodeSelectors();
// If using the isolated pool, check whether workerConfigs has the isolated pool configured. If not
// set, fall back to the regular node pool.
final var nodeSelectors =
usesIsolatedPool ? workerConfigs.getWorkerIsolatedKubeNodeSelectors().orElse(workerConfigs.getworkerKubeNodeSelectors())
: workerConfigs.getworkerKubeNodeSelectors();

Contributor: Hm, I wonder if we should just throw an exception if usesIsolatedPool is true but the getWorkerIsolatedKubeNodeSelectors Optional is empty. I think it would be safer for us to fail loudly if we didn't configure the isolated pool properly, rather than accidentally letting custom connectors run in our main pool. What do you think?

Contributor Author: Good call! Fixed.

Contributor Author: Realized this is not true - usesIsolatedPool here is actually passed by callers (the isCustomConnector flag). Thus, for OSS users who do not run custom connectors in a separate pool, usesIsolatedPool might still be true simply because the connectors are custom. I should rename this to isCustomConnectors instead.

Contributor: Oh good call, OSS users should indeed be able to use custom connectors without an isolated pool.

return new KubePodProcess(
isOrchestrator,
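The thread above weighs silently falling back to the main pool against failing loudly. Below is a hedged sketch of the fail-loudly variant the reviewer proposed, written as a drop-in replacement for the nodeSelectors assignment above; the exception type and message are assumptions, and this is not necessarily what was merged.

// Reviewer's proposed alternative (sketch, not the merged code): surface a configuration error
// instead of quietly scheduling a custom connector onto the main pool.
final var nodeSelectors = usesIsolatedPool
    ? workerConfigs.getWorkerIsolatedKubeNodeSelectors()
        .orElseThrow(() -> new IllegalStateException(
            "Isolated pool requested but no isolated node selectors are configured"))
    : workerConfigs.getworkerKubeNodeSelectors();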
@@ -44,6 +44,8 @@ public DbtLauncherWorker(final UUID connectionId,
serverPort,
temporalUtils,
workerConfigs,
// Custom connectors do not use dbt at this moment, so this flag for running the job in the
// isolated pool can be set to false.
false);
}

@@ -176,13 +176,19 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException
log.info("Creating " + podName + " for attempt number: " + jobRunConfig.getAttemptId());
killRunningPodsForConnection();

// Custom connectors run in a node pool isolated from airbyte-supported connectors
// to reduce the blast radius of any problems with custom connector code.
final var nodeSelectors =
isCustomConnector ? workerConfigs.getWorkerIsolatedKubeNodeSelectors().orElse(workerConfigs.getworkerKubeNodeSelectors())
: workerConfigs.getworkerKubeNodeSelectors();

Contributor: Similar comment here, I think it's more dangerous to fall back on our primary node selectors, and perhaps we should throw an exception instead.

Contributor Author: I added this check in WorkerConfigurationBeanFactory.java instead, because 1) workerConfigs does not carry the useIsolatedPool flag, and 2) we should throw this error as early as possible.

Contributor: Nice!

try {
process.create(
allLabels,
resourceRequirements,
fileMap,
portMap,
isCustomConnector ? workerConfigs.getWorkerIsolatedKubeNodeSelectors() : workerConfigs.getworkerKubeNodeSelectors());
nodeSelectors);
} catch (final KubernetesClientException e) {
ApmTraceUtils.addExceptionToTrace(e);
throw new WorkerException(
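Per the thread above, the author moved the validation into WorkerConfigurationBeanFactory so that a misconfigured isolated pool fails at startup rather than at job launch. That file is not part of the hunks shown here, so the following is only a rough, self-contained sketch of such an early check; the class name, method name, and exception message are assumptions.

import java.util.Map;
import java.util.Optional;

// Sketch of an early, startup-time validation: if the custom-node-selector flag is on, the
// isolated node selectors must be present; otherwise expose an empty Optional to consumers.
final class IsolatedPoolValidationSketch {

  static Optional<Map<String, String>> isolatedSelectorsOrFail(final boolean useCustomNodeSelector,
                                                               final Map<String, String> isolatedNodeSelectors) {
    if (!useCustomNodeSelector) {
      return Optional.empty();
    }
    if (isolatedNodeSelectors == null || isolatedNodeSelectors.isEmpty()) {
      // Fail as early as possible so the misconfiguration is caught before any custom connector job runs.
      throw new IllegalStateException("Custom node selector is enabled but isolated node selectors are not configured");
    }
    return Optional.of(isolatedNodeSelectors);
  }
}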
@@ -45,6 +45,8 @@ public NormalizationLauncherWorker(final UUID connectionId,
serverPort,
temporalUtils,
workerConfigs,
// The normalization process runs only for a fixed set of connectors,
// so it will never run as a custom connector. Setting this to false.
false);

}
@@ -103,14 +103,17 @@ public Optional<String> runJob() throws Exception {
DESTINATION_DOCKER_IMAGE_KEY, destinationLauncherConfig.getDockerImage(),
SOURCE_DOCKER_IMAGE_KEY, sourceLauncherConfig.getDockerImage()));

// At this moment, if either the source or the destination is from a custom connector image, we will put all
// jobs into the isolated pool to run.
boolean useIsolatedPool = sourceLauncherConfig.getIsCustomConnector() || destinationLauncherConfig.getIsCustomConnector();
log.info("Setting up source launcher...");
final var sourceLauncher = new AirbyteIntegrationLauncher(
sourceLauncherConfig.getJobId(),
Math.toIntExact(sourceLauncherConfig.getAttemptId()),
sourceLauncherConfig.getDockerImage(),
processFactory,
syncInput.getSourceResourceRequirements(),
sourceLauncherConfig.getIsCustomConnector());
useIsolatedPool);

log.info("Setting up destination launcher...");
final var destinationLauncher = new AirbyteIntegrationLauncher(
Expand All @@ -119,7 +122,7 @@ public Optional<String> runJob() throws Exception {
destinationLauncherConfig.getDockerImage(),
processFactory,
syncInput.getDestinationResourceRequirements(),
destinationLauncherConfig.getIsCustomConnector());
useIsolatedPool);

log.info("Setting up source...");
// reset jobs use an empty source to induce resetting all data in destination.
@@ -93,7 +93,8 @@ public Long create(final UUID connectionId) {
new Version(destinationDefinition.getProtocolVersion()),
standardSyncOperations,
workspace.getWebhookOperationConfigs(),
sourceDefinition, destinationDefinition)
sourceDefinition,
destinationDefinition)
.orElseThrow(() -> new IllegalStateException("We shouldn't be trying to create a new sync job if there is one running already."));

} catch (final IOException | JsonValidationException | ConfigNotFoundException e) {
@@ -201,7 +201,9 @@ public DestinationDefinitionRead createCustomDestinationDefinition(final CustomD
private StandardDestinationDefinition destinationDefinitionFromCreate(final DestinationDefinitionCreate destinationDefCreate) throws IOException {
final ConnectorSpecification spec = getSpecForImage(
destinationDefCreate.getDockerRepository(),
destinationDefCreate.getDockerImageTag(), true);
destinationDefCreate.getDockerImageTag(),
// Only custom connectors can be created via handlers.
true);

final Version airbyteProtocolVersion = AirbyteProtocolVersion.getWithDefault(spec.getProtocolVersion());

@@ -284,16 +286,6 @@ public void deleteDestinationDefinition(final DestinationDefinitionIdRequestBody
configRepository.writeStandardDestinationDefinition(persistedDestinationDefinition);
}

public void deleteCustomDestinationDefinition(final DestinationDefinitionIdWithWorkspaceId destinationDefinitionIdWithWorkspaceId)
throws IOException, JsonValidationException, ConfigNotFoundException {
final UUID definitionId = destinationDefinitionIdWithWorkspaceId.getDestinationDefinitionId();
final UUID workspaceId = destinationDefinitionIdWithWorkspaceId.getWorkspaceId();
if (!configRepository.workspaceCanUseCustomDefinition(definitionId, workspaceId)) {
throw new IdNotFoundKnownException("Cannot find the requested definition with given id for this workspace", definitionId.toString());
}
deleteDestinationDefinition(new DestinationDefinitionIdRequestBody().destinationDefinitionId(definitionId));
}

private ConnectorSpecification getSpecForImage(final String dockerRepository, final String imageTag, boolean isCustomConnector) throws IOException {
final String imageName = DockerUtils.getTaggedImageName(dockerRepository, imageTag);
final SynchronousResponse<ConnectorSpecification> getSpecResponse = schedulerSynchronousClient.createGetSpecJob(imageName, isCustomConnector);
@@ -255,7 +255,11 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source
final boolean bustActorCatalogCache = discoverSchemaRequestBody.getDisableCache() != null && discoverSchemaRequestBody.getDisableCache();
if (currentCatalog.isEmpty() || bustActorCatalogCache) {
final SynchronousResponse<UUID> persistedCatalogId =
synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName, connectorVersion, new Version(sourceDef.getProtocolVersion()),
synchronousSchedulerClient.createDiscoverSchemaJob(
source,
imageName,
connectorVersion,
new Version(sourceDef.getProtocolVersion()),
isCustomConnector);
final SourceDiscoverSchemaRead discoveredSchema = retrieveDiscoveredSchema(persistedCatalogId);

@@ -295,8 +299,13 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceCreate(final So
.withSourceDefinitionId(sourceCreate.getSourceDefinitionId())
.withConfiguration(partialConfig)
.withWorkspaceId(sourceCreate.getWorkspaceId());
final SynchronousResponse<UUID> response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName, sourceDef.getDockerImageTag(),
new Version(sourceDef.getProtocolVersion()), isCustomConnector);
final SynchronousResponse<UUID> response = synchronousSchedulerClient.createDiscoverSchemaJob(
source,
imageName,
sourceDef.getDockerImageTag(),
new Version(
sourceDef.getProtocolVersion()),
isCustomConnector);
return retrieveDiscoveredSchema(response);
}

@@ -205,7 +205,11 @@ public SourceDefinitionRead createCustomSourceDefinition(final CustomSourceDefin
private StandardSourceDefinition sourceDefinitionFromCreate(final SourceDefinitionCreate sourceDefinitionCreate)
throws IOException {
final ConnectorSpecification spec =
getSpecForImage(sourceDefinitionCreate.getDockerRepository(), sourceDefinitionCreate.getDockerImageTag(), true);
getSpecForImage(
sourceDefinitionCreate.getDockerRepository(),
sourceDefinitionCreate.getDockerImageTag(),
// Only custom connectors can be created via handlers.
true);

final Version airbyteProtocolVersion = AirbyteProtocolVersion.getWithDefault(spec.getProtocolVersion());

@@ -285,16 +289,6 @@ public void deleteSourceDefinition(final SourceDefinitionIdRequestBody sourceDef
configRepository.writeStandardSourceDefinition(persistedSourceDefinition);
}

public void deleteCustomSourceDefinition(final SourceDefinitionIdWithWorkspaceId sourceDefinitionIdWithWorkspaceId)
throws IOException, JsonValidationException, ConfigNotFoundException {
final UUID definitionId = sourceDefinitionIdWithWorkspaceId.getSourceDefinitionId();
final UUID workspaceId = sourceDefinitionIdWithWorkspaceId.getWorkspaceId();
if (!configRepository.workspaceCanUseCustomDefinition(definitionId, workspaceId)) {
throw new IdNotFoundKnownException("Cannot find the requested definition with given id for this workspace", definitionId.toString());
}
deleteSourceDefinition(new SourceDefinitionIdRequestBody().sourceDefinitionId(definitionId));
}

private ConnectorSpecification getSpecForImage(final String dockerRepository, final String imageTag, final boolean isCustomConnector)
throws IOException {
final String imageName = DockerUtils.getTaggedImageName(dockerRepository, imageTag);