Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass allowedHosts to container runners #21676

Merged
merged 9 commits into from
Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public boolean transform(final String jobId,
files,
"/bin/bash",
resourceRequirements,
null,
Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, CUSTOM_STEP),
Collections.emptyMap(),
Collections.emptyMap(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public static String getEntrypointEnvVariable(final ProcessFactory processFactor
Collections.emptyMap(),
"printenv",
null,
null,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ private boolean runProcess(final String jobId,
false, files,
null,
resourceRequirements,
null,
Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, NORMALIZE_STEP),
Collections.emptyMap(),
Collections.emptyMap(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import datadog.trace.api.Trace;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.config.AllowedHosts;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.WorkerEnvConstants;
import io.airbyte.metrics.lib.ApmTraceUtils;
Expand Down Expand Up @@ -50,19 +51,22 @@ public class AirbyteIntegrationLauncher implements IntegrationLauncher {
* At this moment, we put custom connector jobs into an isolated pool.
*/
private final boolean useIsolatedPool;
private final AllowedHosts allowedHosts;

public AirbyteIntegrationLauncher(final String jobId,
final int attempt,
final String imageName,
final ProcessFactory processFactory,
final ResourceRequirements resourceRequirement,
final AllowedHosts allowedHosts,
final boolean useIsolatedPool,
final FeatureFlags featureFlags) {
this.jobId = jobId;
this.attempt = attempt;
this.imageName = imageName;
this.processFactory = processFactory;
this.resourceRequirement = resourceRequirement;
this.allowedHosts = allowedHosts;
this.featureFlags = featureFlags;
this.useIsolatedPool = useIsolatedPool;
}
Expand All @@ -82,6 +86,7 @@ public Process spec(final Path jobRoot) throws WorkerException {
Collections.emptyMap(),
null,
resourceRequirement,
allowedHosts,
Map.of(JOB_TYPE_KEY, SPEC_JOB),
getWorkerMetadata(),
Collections.emptyMap(),
Expand All @@ -103,6 +108,7 @@ public Process check(final Path jobRoot, final String configFilename, final Stri
ImmutableMap.of(configFilename, configContents),
null,
resourceRequirement,
allowedHosts,
Map.of(JOB_TYPE_KEY, CHECK_JOB),
getWorkerMetadata(),
Collections.emptyMap(),
Expand All @@ -125,6 +131,7 @@ public Process discover(final Path jobRoot, final String configFilename, final S
ImmutableMap.of(configFilename, configContents),
null,
resourceRequirement,
allowedHosts,
Map.of(JOB_TYPE_KEY, DISCOVER_JOB),
getWorkerMetadata(),
Collections.emptyMap(),
Expand Down Expand Up @@ -171,6 +178,7 @@ public Process read(final Path jobRoot,
files,
null,
resourceRequirement,
allowedHosts,
Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, READ_STEP),
getWorkerMetadata(),
Collections.emptyMap(),
Expand Down Expand Up @@ -201,6 +209,7 @@ public Process write(final Path jobRoot,
files,
null,
resourceRequirement,
allowedHosts,
Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, WRITE_STEP),
getWorkerMetadata(),
Collections.emptyMap(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.AllowedHosts;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerUtils;
Expand Down Expand Up @@ -91,6 +92,7 @@ public Process create(final String jobType,
final Map<String, String> files,
final String entrypoint,
final ResourceRequirements resourceRequirements,
final AllowedHosts allowedHosts,
final Map<String, String> labels,
final Map<String, String> jobMetadata,
final Map<Integer, Integer> internalToExternalPorts,
Expand Down Expand Up @@ -120,7 +122,7 @@ public Process create(final String jobType,
"--log-driver",
"none");
final String containerName = ProcessFactory.createProcessName(imageName, jobType, jobId, attempt, DOCKER_NAME_LEN_LIMIT);
LOGGER.info("Creating docker container = {} with resources {}", containerName, resourceRequirements);
LOGGER.info("Creating docker container = {} with resources {} and allowed hosts {}", containerName, resourceRequirements, allowedHosts);
cmd.add("--name");
cmd.add(containerName);
cmd.addAll(localDebuggingOptions(containerName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.config.AllowedHosts;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.exception.WorkerException;
Expand Down Expand Up @@ -85,6 +86,7 @@ public Process create(
final Map<String, String> files,
final String entrypoint,
final ResourceRequirements resourceRequirements,
final AllowedHosts allowedHosts,
final Map<String, String> customLabels,
final Map<String, String> jobMetadata,
final Map<Integer, Integer> internalToExternalPorts,
Expand All @@ -93,7 +95,8 @@ public Process create(
try {
// used to differentiate source and destination processes with the same id and attempt
final String podName = ProcessFactory.createProcessName(imageName, jobType, jobId, attempt, KUBE_NAME_LEN_LIMIT);
LOGGER.info("Attempting to start pod = {} for {} with resources {}", podName, imageName, resourceRequirements);
LOGGER.info("Attempting to start pod = {} for {} with resources {} and allowed hosts {}", podName, imageName, resourceRequirements,
allowedHosts);

final int stdoutLocalPort = KubePortManagerSingleton.getInstance().take();
LOGGER.info("{} stdoutLocalPort = {}", podName, stdoutLocalPort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers.process;

import io.airbyte.config.AllowedHosts;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.workers.exception.WorkerException;
import java.nio.file.Path;
Expand Down Expand Up @@ -49,6 +50,7 @@ Process create(String jobType,
final Map<String, String> files,
final String entrypoint,
final ResourceRequirements resourceRequirements,
final AllowedHosts allowedHosts,
final Map<String, String> labels,
final Map<String, String> jobMetadata,
final Map<Integer, Integer> portMapping,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ void setup() throws IOException, WorkerException {
when(processFactory.create(NORMALIZE_STEP, JOB_ID, JOB_ATTEMPT, jobRoot,
DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), false, false, files, null,
workerConfigs.getResourceRequirements(),
null,
Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, NORMALIZE_STEP),
Map.of(),
Map.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class AirbyteIntegrationLauncherTest {
@BeforeEach
void setUp() {
workerConfigs = new WorkerConfigs(new EnvConfigs());
launcher = new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, FAKE_IMAGE, processFactory, workerConfigs.getResourceRequirements(), false,
launcher = new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, FAKE_IMAGE, processFactory, workerConfigs.getResourceRequirements(), null, false,
featureFlags);
}

Expand All @@ -78,7 +78,7 @@ void spec() throws WorkerException {
launcher.spec(JOB_ROOT);

Mockito.verify(processFactory).create(SPEC_JOB, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, false, Collections.emptyMap(), null,
workerConfigs.getResourceRequirements(), Map.of(JOB_TYPE_KEY, SPEC_JOB), JOB_METADATA,
workerConfigs.getResourceRequirements(), null, Map.of(JOB_TYPE_KEY, SPEC_JOB), JOB_METADATA,
Map.of(),
"spec");
}
Expand All @@ -89,6 +89,7 @@ void check() throws WorkerException {

Mockito.verify(processFactory).create(CHECK_JOB, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, false, CONFIG_FILES, null,
workerConfigs.getResourceRequirements(),
null,
Map.of(JOB_TYPE_KEY, CHECK_JOB),
JOB_METADATA,
Map.of(),
Expand All @@ -102,6 +103,7 @@ void discover() throws WorkerException {

Mockito.verify(processFactory).create(DISCOVER_JOB, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, false, CONFIG_FILES, null,
workerConfigs.getResourceRequirements(),
null,
Map.of(JOB_TYPE_KEY, DISCOVER_JOB),
JOB_METADATA,
Map.of(),
Expand All @@ -115,6 +117,7 @@ void read() throws WorkerException {

Mockito.verify(processFactory).create(READ_STEP, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, false, CONFIG_CATALOG_STATE_FILES, null,
workerConfigs.getResourceRequirements(),
null,
Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, READ_STEP),
JOB_METADATA,
Map.of(),
Expand All @@ -131,6 +134,7 @@ void write() throws WorkerException {

Mockito.verify(processFactory).create(WRITE_STEP, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, true, CONFIG_CATALOG_FILES, null,
workerConfigs.getResourceRequirements(),
null,
Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, WRITE_STEP),
JOB_METADATA,
Map.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void testFileWriting() throws IOException, WorkerException {
final DockerProcessFactory processFactory =
new DockerProcessFactory(new WorkerConfigs(new EnvConfigs()), workspaceRoot, null, null, null);
processFactory.create("tester", "job_id", 0, jobRoot, BUSYBOX, false, false, ImmutableMap.of("config.json", "{\"data\": 2}"), "echo hi",
new WorkerConfigs(new EnvConfigs()).getResourceRequirements(), Map.of(), Map.of(), Map.of());
new WorkerConfigs(new EnvConfigs()).getResourceRequirements(), null, Map.of(), Map.of(), Map.of());

assertEquals(
Jsons.jsonNode(ImmutableMap.of("data", 2)),
Expand Down Expand Up @@ -125,6 +125,7 @@ void testEnvMapSet() throws IOException, WorkerException, InterruptedException {
Map.of(),
"/bin/sh",
workerConfigs.getResourceRequirements(),
null,
Map.of(),
Map.of(),
Map.of(),
Expand Down Expand Up @@ -158,6 +159,7 @@ private void waitForDockerToInitialize(final ProcessFactory processFactory, fina
Map.of(),
"/bin/sh",
workerConfigs.getResourceRequirements(),
null,
Map.of(),
Map.of(),
Map.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.ActorCatalogWithUpdatedAt;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.AllowedHosts;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.DestinationOAuthParameter;
import io.airbyte.config.FieldSelectionData;
Expand Down Expand Up @@ -160,7 +161,10 @@ public static StandardSourceDefinition buildStandardSourceDefinition(final Recor
: record.get(ACTOR_DEFINITION.RELEASE_DATE).toString())
.withResourceRequirements(record.get(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS) == null
? null
: Jsons.deserialize(record.get(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS).data(), ActorDefinitionResourceRequirements.class));
: Jsons.deserialize(record.get(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS).data(), ActorDefinitionResourceRequirements.class))
.withAllowedHosts(record.get(ACTOR_DEFINITION.ALLOWED_HOSTS) == null
? null
: Jsons.deserialize(record.get(ACTOR_DEFINITION.ALLOWED_HOSTS).data(), AllowedHosts.class));
}

public static StandardDestinationDefinition buildStandardDestinationDefinition(final Record record) {
Expand Down Expand Up @@ -193,7 +197,10 @@ public static StandardDestinationDefinition buildStandardDestinationDefinition(f
: null)
.withResourceRequirements(record.get(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS) == null
? null
: Jsons.deserialize(record.get(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS).data(), ActorDefinitionResourceRequirements.class));
: Jsons.deserialize(record.get(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS).data(), ActorDefinitionResourceRequirements.class))
.withAllowedHosts(record.get(ACTOR_DEFINITION.ALLOWED_HOSTS) == null
? null
: Jsons.deserialize(record.get(ACTOR_DEFINITION.ALLOWED_HOSTS).data(), AllowedHosts.class));
}

public static DestinationOAuthParameter buildDestinationOAuthParameter(final Record record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,9 @@
icon: github.svg
sourceType: api
releaseStage: generally_available
allowedHosts:
hosts:
- api.github.com
- name: Gitlab
sourceDefinitionId: 5e6175e5-68e1-4c17-bff9-56103bbb0d80
dockerRepository: airbyte/source-gitlab
Expand Down Expand Up @@ -1343,6 +1346,9 @@
icon: postgresql.svg
sourceType: database
releaseStage: generally_available
allowedHosts:
hosts:
- "${host}"
Comment on lines +1349 to +1351
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@git-phu I'd love your 👍 on how I ended up enabling interpolation. By the time we get to the KubeProcessFactory (or DockerProcessFactory), these strings are all resolved, but another pair of 👀 can't hurt.

- name: Postmark App
sourceDefinitionId: cde75ca1-1e28-4a0f-85bb-90c546de9f1f
dockerRepository: airbyte/source-postmarkapp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public Optional<String> runJob() throws Exception {
sourceLauncherConfig.getDockerImage(),
processFactory,
syncInput.getSourceResourceRequirements(),
sourceLauncherConfig.getAllowedHosts(),
useIsolatedPool,
featureFlags);

Expand All @@ -133,6 +134,7 @@ public Optional<String> runJob() throws Exception {
destinationLauncherConfig.getDockerImage(),
processFactory,
syncInput.getDestinationResourceRequirements(),
destinationLauncherConfig.getAllowedHosts(),
useIsolatedPool,
featureFlags);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1250,14 +1250,14 @@ protected void assertNamespaceNormalization(final String testCaseId,
private ConnectorSpecification runSpec() throws WorkerException {
return convertProtocolObject(
new DefaultGetSpecWorker(
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false, new EnvVariableFeatureFlags()))
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, null, false, new EnvVariableFeatureFlags()))
.run(new JobGetSpecConfig().withDockerImage(getImageName()), jobRoot).getSpec(),
ConnectorSpecification.class);
}

protected StandardCheckConnectionOutput runCheck(final JsonNode config) throws WorkerException {
return new DefaultCheckConnectionWorker(
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false, new EnvVariableFeatureFlags()),
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, null, false, new EnvVariableFeatureFlags()),
mConnectorConfigUpdater)
.run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot)
.getCheckConnection();
Expand All @@ -1267,7 +1267,7 @@ protected StandardCheckConnectionOutput.Status runCheckWithCatchedException(
final JsonNode config) {
try {
final StandardCheckConnectionOutput standardCheckConnectionOutput = new DefaultCheckConnectionWorker(
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false, new EnvVariableFeatureFlags()),
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, null, false, new EnvVariableFeatureFlags()),
mConnectorConfigUpdater)
.run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot)
.getCheckConnection();
Expand All @@ -1280,7 +1280,7 @@ protected StandardCheckConnectionOutput.Status runCheckWithCatchedException(

protected AirbyteDestination getDestination() {
return new DefaultAirbyteDestination(
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false, new EnvVariableFeatureFlags()));
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, null, false, new EnvVariableFeatureFlags()));
}

protected void runSyncAndVerifyStateOutput(final JsonNode config,
Expand Down
Loading