diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java index db279355fa5..be3ad5c8f43 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java @@ -32,6 +32,7 @@ import azkaban.jobExecutor.AbstractJob; import lombok.Getter; +import org.apache.gobblin.util.AzkabanLauncherUtils; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.yarn.GobblinYarnAppLauncher; import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; @@ -63,7 +64,7 @@ public class AzkabanGobblinYarnAppLauncher extends AbstractJob { public AzkabanGobblinYarnAppLauncher(String jobId, Properties gobblinProps) throws IOException { super(jobId, LOGGER); - + gobblinProps = AzkabanLauncherUtils.undoPlaceholderConversion(gobblinProps); addRuntimeProperties(gobblinProps); Config gobblinConfig = ConfigUtils.propertiesToConfig(gobblinProps); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index 47d76b963e0..c8fbd047c5d 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -157,6 +158,7 @@ class YarnService extends AbstractIdleService { private final Optional containerJvmArgs; private final String containerTimezone; + private final String proxyJvmArgs; @Getter(AccessLevel.PROTECTED) private volatile Optional maxResourceCapacity = Optional.absent(); @@ -240,6 +242,9 @@ public YarnService(Config config, String applicationName, String applicationId, Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY)) : Optional.absent(); + this.proxyJvmArgs = config.hasPath(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) ? + config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) : StringUtils.EMPTY; + int numContainerLaunchThreads = ConfigUtils.getInt(config, GobblinYarnConfigurationKeys.MAX_CONTAINER_LAUNCH_THREADS_KEY, GobblinYarnConfigurationKeys.DEFAULT_MAX_CONTAINER_LAUNCH_THREADS); @@ -594,6 +599,7 @@ protected String buildContainerCommand(Container container, String helixParticip .append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR) .append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME).append("=").append(containerProcessName).append(".").append(ApplicationConstants.STDOUT) .append(" ").append(JvmUtils.formatJvmArguments(this.containerJvmArgs)) + .append(" ").append(this.proxyJvmArgs) .append(" ").append(GobblinTemporalYarnTaskRunner.class.getName()) .append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME) .append(" ").append(this.applicationName) diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanLauncherUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanLauncherUtils.java new file mode 100644 index 00000000000..ca6d98a5b73 --- /dev/null +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanLauncherUtils.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.util; + +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.lang.StringUtils; + +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableBiMap; + +/** + * Utility class for Azkaban App Launcher. + */ +public class AzkabanLauncherUtils { + public static final String PLACEHOLDER_MAP_KEY = "placeholderMap"; + + /** + * Reverts properties that were converted to placeholders back to their original values. + * It checks if the properties contain placeholderMap key and, if so, uses it as an inverse map + * to replace placeholder values with their original values. + * + * @param appProperties the properties object containing the application properties alongwith the inverse map + * @return a new Properties object with placeholders reverted to their original values, or the original properties if no placeholderMap + */ + public static Properties undoPlaceholderConversion(Properties appProperties) { + if (StringUtils.EMPTY.equals(appProperties.getProperty(PLACEHOLDER_MAP_KEY, StringUtils.EMPTY))) { + return appProperties; + } + + Properties convertedProperties = new Properties(); + convertedProperties.putAll(appProperties); + + // Undo properties converted to placeholders + Map inversePlaceholderMap = ImmutableBiMap.copyOf(Splitter.on(",").withKeyValueSeparator(":") + .split(convertedProperties.get(PLACEHOLDER_MAP_KEY).toString())).inverse(); + for (Map.Entry entry : convertedProperties.entrySet()) { + if (inversePlaceholderMap.containsKey(entry.getValue().toString())) { + convertedProperties.put(entry.getKey(), inversePlaceholderMap.get(entry.getValue().toString())); + } + } + return convertedProperties; + } +} diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/AzkabanLauncherUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/AzkabanLauncherUtilsTest.java new file mode 100644 index 00000000000..8aed8a11cdb --- /dev/null +++ b/gobblin-utility/src/test/java/org/apache/gobblin/AzkabanLauncherUtilsTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin; + +import java.util.Properties; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import org.apache.gobblin.util.AzkabanLauncherUtils; + + +@Test +public class AzkabanLauncherUtilsTest { + + @Test + public void testPropertyPlaceholderReplacement() { + Properties props = new Properties(); + + props.setProperty("placeholderMap", ":emptyStringPlaceholder, :spacePlaceholder,\\t:tabPlaceholder"); + props.setProperty("key1", "emptyStringPlaceholder"); + props.setProperty("key2", "spacePlaceholder"); + props.setProperty("key3", "tabPlaceholder"); + props.setProperty("key4", "someOtherValue"); + props.setProperty("key5", "123emptyStringPlaceholder"); + + props = AzkabanLauncherUtils.undoPlaceholderConversion(props); + Assert.assertEquals(props.get("key1").toString(), ""); + Assert.assertEquals(props.get("key2").toString(), " "); + Assert.assertEquals(props.get("key3").toString(), "\\t"); + Assert.assertEquals(props.get("key4").toString(), "someOtherValue"); + + // should replace exact matches only + Assert.assertEquals(props.get("key5").toString(), "123emptyStringPlaceholder"); + } + + @Test + public void testPlaceholderMapMissing() { + Properties props = new Properties(); + props.setProperty("key1", "emptyStringPlaceholder"); + + props = AzkabanLauncherUtils.undoPlaceholderConversion(props); + Assert.assertEquals(props.get("key1").toString(), "emptyStringPlaceholder"); + } + + @Test + public void testEmptyPlaceholderMap() { + Properties props = new Properties(); + props.setProperty("placeholderMap", ""); + props.setProperty("key1", "emptyStringPlaceholder"); + + props = AzkabanLauncherUtils.undoPlaceholderConversion(props); + Assert.assertEquals(props.get("key1").toString(), "emptyStringPlaceholder"); + } +} diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java index 01655e9faba..031e295aa04 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java @@ -37,6 +37,7 @@ import org.apache.avro.Schema; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; import org.apache.commons.mail.EmailException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -209,6 +210,7 @@ public class GobblinYarnAppLauncher { private final long appReportIntervalMinutes; private final Optional appMasterJvmArgs; + private final String proxyJvmArgs; private final String sinkLogRootDir; @@ -280,6 +282,9 @@ public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration Optional.of(config.getString(GobblinYarnConfigurationKeys.APP_MASTER_JVM_ARGS_KEY)) : Optional.absent(); + this.proxyJvmArgs = config.hasPath(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) ? + config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) : StringUtils.EMPTY; + this.sinkLogRootDir = ConfigUtils.getString(config, GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY, null); this.maxGetApplicationReportFailures = config.getInt(GobblinYarnConfigurationKeys.MAX_GET_APP_REPORT_FAILURES_KEY); @@ -829,6 +834,7 @@ protected String buildApplicationMasterCommand(String applicationId, int memoryM .append(" -D").append(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY).append("=").append(config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY)) .append(" -D").append(GobblinYarnConfigurationKeys.YARN_APPLICATION_LIB_JAR_LIST).append("=").append(String.join(",", this.libJarNames)) .append(" ").append(JvmUtils.formatJvmArguments(this.appMasterJvmArgs)) + .append(" ").append(this.proxyJvmArgs) .append(" ").append(appMasterClass.getName()) .append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME) .append(" ").append(this.applicationName) diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java index b8fc95b29d4..10bc9f97093 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java @@ -57,6 +57,12 @@ public class GobblinYarnConfigurationKeys { public static final String YARN_APPLICATION_LIB_JAR_LIST = GOBBLIN_YARN_PREFIX + "lib.jar.list"; + /** + * Config for setting http/https proxy host and port. + * The value for this property is expected in format -DproxySet=true -Dhttps.proxyHost=[hostname] -Dhttps.proxyPort=[port] + */ + public static final String YARN_APPLICATION_PROXY_JVM_ARGS = GOBBLIN_YARN_PREFIX + "proxy.jvm.args"; + // Used to store the start time of the app launcher to propagate to workers and appmaster public static final String YARN_APPLICATION_LAUNCHER_START_TIME_KEY = GOBBLIN_YARN_PREFIX + "application.start.time"; diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java index e1f24434489..622c6d56479 100644 --- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java @@ -215,7 +215,7 @@ public void setUp() throws Exception { .resolve(); } - @Test + @Test(dependsOnMethods = "testCreateHelixCluster") public void testBuildApplicationMasterCommand() { String command = this.gobblinYarnAppLauncher.buildApplicationMasterCommand("application_1234_3456", 64); @@ -355,6 +355,16 @@ public void testSendShutdownRequest() throws Exception { assertWithBackoff.assertEquals(getInstanceMessageNum, 0, "all controller messages processed"); } + @Test(dependsOnMethods = "testCreateHelixCluster") + public void testProxyJvmArgs() throws IOException { + this.config = this.config.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS, + ConfigValueFactory.fromAnyRef("-Dhttp.proxyHost=foo-bar.baz.org -Dhttp.proxyPort=1234")); + this.gobblinYarnAppLauncher = new GobblinYarnAppLauncher(this.config, clusterConf); + + String command = this.gobblinYarnAppLauncher.buildApplicationMasterCommand("application_1234_3456", 64); + Assert.assertTrue(command.contains("-Dhttp.proxyHost=foo-bar.baz.org -Dhttp.proxyPort=1234")); + } + @AfterClass public void tearDown() throws IOException, TimeoutException { try {