Skip to content

Commit

Permalink
[GOBBLIN-2172] Enable launching GoT YARN AM and workers with proxy HT…
Browse files Browse the repository at this point in the history
…TPS host+port (#4075)
  • Loading branch information
abhishekmjain authored Nov 14, 2024
1 parent 13a6926 commit 4bb0c40
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import azkaban.jobExecutor.AbstractJob;
import lombok.Getter;

import org.apache.gobblin.util.AzkabanLauncherUtils;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.yarn.GobblinYarnAppLauncher;
import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
Expand Down Expand Up @@ -63,7 +64,7 @@ public class AzkabanGobblinYarnAppLauncher extends AbstractJob {
public AzkabanGobblinYarnAppLauncher(String jobId, Properties gobblinProps)
throws IOException {
super(jobId, LOGGER);

gobblinProps = AzkabanLauncherUtils.undoPlaceholderConversion(gobblinProps);
addRuntimeProperties(gobblinProps);

Config gobblinConfig = ConfigUtils.propertiesToConfig(gobblinProps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -157,6 +158,7 @@ class YarnService extends AbstractIdleService {

private final Optional<String> containerJvmArgs;
private final String containerTimezone;
private final String proxyJvmArgs;

@Getter(AccessLevel.PROTECTED)
private volatile Optional<Resource> maxResourceCapacity = Optional.absent();
Expand Down Expand Up @@ -240,6 +242,9 @@ public YarnService(Config config, String applicationName, String applicationId,
Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY)) :
Optional.<String>absent();

this.proxyJvmArgs = config.hasPath(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) ?
config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) : StringUtils.EMPTY;

int numContainerLaunchThreads =
ConfigUtils.getInt(config, GobblinYarnConfigurationKeys.MAX_CONTAINER_LAUNCH_THREADS_KEY,
GobblinYarnConfigurationKeys.DEFAULT_MAX_CONTAINER_LAUNCH_THREADS);
Expand Down Expand Up @@ -594,6 +599,7 @@ protected String buildContainerCommand(Container container, String helixParticip
.append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR)
.append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME).append("=").append(containerProcessName).append(".").append(ApplicationConstants.STDOUT)
.append(" ").append(JvmUtils.formatJvmArguments(this.containerJvmArgs))
.append(" ").append(this.proxyJvmArgs)
.append(" ").append(GobblinTemporalYarnTaskRunner.class.getName())
.append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
.append(" ").append(this.applicationName)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.util;

import java.util.Map;
import java.util.Properties;

import org.apache.commons.lang.StringUtils;

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableBiMap;

/**
 * Utility class for Azkaban App Launcher.
 */
public class AzkabanLauncherUtils {
  /** Name of the property that carries the {@code original:placeholder} pairs, comma separated. */
  public static final String PLACEHOLDER_MAP_KEY = "placeholderMap";

  /**
   * Reverts properties that were converted to placeholders back to their original values.
   * It checks if the properties contain the {@link #PLACEHOLDER_MAP_KEY} key and, if so, uses its value
   * (a comma separated list of {@code original:placeholder} pairs) as an inverse map to replace
   * placeholder values with their original values. Only values that match a placeholder exactly are replaced.
   *
   * <p>The placeholder map is looked up with {@link Properties#getProperty(String, String)}, so it is also found
   * when it is only present in the defaults of {@code appProperties}. Properties that are only reachable through
   * the defaults are flattened into the returned object so that they are not lost.
   *
   * @param appProperties the properties object containing the application properties along with the inverse map
   * @return a new Properties object with placeholders reverted to their original values, or the original
   *         properties object itself if the placeholder map is missing or empty
   * @throws IllegalArgumentException presumably raised by the Guava splitter / bi-map when the placeholder map is
   *         malformed or contains duplicate originals or placeholders - confirm against the Guava version in use
   */
  public static Properties undoPlaceholderConversion(Properties appProperties) {
    // Read the mapping exactly once, through getProperty: unlike get(), it consults the defaults chain,
    // so the guard below and the parsing further down can never disagree about whether the key exists.
    String placeholderMapping = appProperties.getProperty(PLACEHOLDER_MAP_KEY, StringUtils.EMPTY);
    if (placeholderMapping.isEmpty()) {
      return appProperties;
    }

    // The property stores original -> placeholder; invert it to look up placeholder -> original.
    // No trimming on purpose: originals may legitimately be empty or consist of whitespace only.
    Map<String, String> inversePlaceholderMap = ImmutableBiMap.copyOf(
        Splitter.on(",").withKeyValueSeparator(":").split(placeholderMapping)).inverse();

    Properties convertedProperties = new Properties();
    // putAll only sees direct entries, so copy default-backed string properties explicitly first ...
    for (String name : appProperties.stringPropertyNames()) {
      convertedProperties.setProperty(name, appProperties.getProperty(name));
    }
    // ... then the direct entries, which also carries over any non-String keys or values unchanged.
    convertedProperties.putAll(appProperties);

    // Undo properties converted to placeholders
    for (Map.Entry<Object, Object> entry : convertedProperties.entrySet()) {
      String originalValue = inversePlaceholderMap.get(entry.getValue().toString());
      if (originalValue != null) {
        // Write through the entry rather than calling put() on the map that is being iterated.
        entry.setValue(originalValue);
      }
    }
    return convertedProperties;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin;

import java.util.Properties;

import org.testng.Assert;
import org.testng.annotations.Test;

import org.apache.gobblin.util.AzkabanLauncherUtils;


/**
 * Tests for {@link AzkabanLauncherUtils#undoPlaceholderConversion(Properties)}.
 */
@Test
public class AzkabanLauncherUtilsTest {

  /**
   * Values that exactly equal a placeholder are swapped back to their originals;
   * every other value is left as it was.
   */
  @Test
  public void testPropertyPlaceholderReplacement() {
    Properties withPlaceholders = new Properties();
    withPlaceholders.setProperty("placeholderMap", ":emptyStringPlaceholder, :spacePlaceholder,\\t:tabPlaceholder");
    withPlaceholders.setProperty("key1", "emptyStringPlaceholder");
    withPlaceholders.setProperty("key2", "spacePlaceholder");
    withPlaceholders.setProperty("key3", "tabPlaceholder");
    withPlaceholders.setProperty("key4", "someOtherValue");
    withPlaceholders.setProperty("key5", "123emptyStringPlaceholder");

    Properties reverted = AzkabanLauncherUtils.undoPlaceholderConversion(withPlaceholders);

    Assert.assertEquals(valueOf(reverted, "key1"), "");
    Assert.assertEquals(valueOf(reverted, "key2"), " ");
    Assert.assertEquals(valueOf(reverted, "key3"), "\\t");
    Assert.assertEquals(valueOf(reverted, "key4"), "someOtherValue");

    // should replace exact matches only
    Assert.assertEquals(valueOf(reverted, "key5"), "123emptyStringPlaceholder");
  }

  /** Without a placeholder map nothing is rewritten. */
  @Test
  public void testPlaceholderMapMissing() {
    Properties withoutMap = new Properties();
    withoutMap.setProperty("key1", "emptyStringPlaceholder");

    Properties reverted = AzkabanLauncherUtils.undoPlaceholderConversion(withoutMap);

    Assert.assertEquals(valueOf(reverted, "key1"), "emptyStringPlaceholder");
  }

  /** An empty placeholder map behaves like a missing one. */
  @Test
  public void testEmptyPlaceholderMap() {
    Properties withEmptyMap = new Properties();
    withEmptyMap.setProperty("placeholderMap", "");
    withEmptyMap.setProperty("key1", "emptyStringPlaceholder");

    Properties reverted = AzkabanLauncherUtils.undoPlaceholderConversion(withEmptyMap);

    Assert.assertEquals(valueOf(reverted, "key1"), "emptyStringPlaceholder");
  }

  /** Looks up {@code key} directly in {@code properties} and renders the stored value as a string. */
  private static String valueOf(Properties properties, String key) {
    return properties.get(key).toString();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import org.apache.avro.Schema;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.mail.EmailException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand Down Expand Up @@ -209,6 +210,7 @@ public class GobblinYarnAppLauncher {
private final long appReportIntervalMinutes;

private final Optional<String> appMasterJvmArgs;
private final String proxyJvmArgs;

private final String sinkLogRootDir;

Expand Down Expand Up @@ -280,6 +282,9 @@ public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration
Optional.of(config.getString(GobblinYarnConfigurationKeys.APP_MASTER_JVM_ARGS_KEY)) :
Optional.<String>absent();

this.proxyJvmArgs = config.hasPath(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) ?
config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) : StringUtils.EMPTY;

this.sinkLogRootDir = ConfigUtils.getString(config, GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY, null);

this.maxGetApplicationReportFailures = config.getInt(GobblinYarnConfigurationKeys.MAX_GET_APP_REPORT_FAILURES_KEY);
Expand Down Expand Up @@ -829,6 +834,7 @@ protected String buildApplicationMasterCommand(String applicationId, int memoryM
.append(" -D").append(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY).append("=").append(config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY))
.append(" -D").append(GobblinYarnConfigurationKeys.YARN_APPLICATION_LIB_JAR_LIST).append("=").append(String.join(",", this.libJarNames))
.append(" ").append(JvmUtils.formatJvmArguments(this.appMasterJvmArgs))
.append(" ").append(this.proxyJvmArgs)
.append(" ").append(appMasterClass.getName())
.append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
.append(" ").append(this.applicationName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ public class GobblinYarnConfigurationKeys {

public static final String YARN_APPLICATION_LIB_JAR_LIST = GOBBLIN_YARN_PREFIX + "lib.jar.list";

/**
 * Config key for the JVM arguments that set the http/https proxy host and port.
 * The value for this property is expected in the format
 * {@code -DproxySet=true -Dhttps.proxyHost=[hostname] -Dhttps.proxyPort=[port]}.
 */
public static final String YARN_APPLICATION_PROXY_JVM_ARGS = GOBBLIN_YARN_PREFIX + "proxy.jvm.args";

// Used to store the start time of the app launcher to propagate to workers and appmaster
public static final String YARN_APPLICATION_LAUNCHER_START_TIME_KEY = GOBBLIN_YARN_PREFIX + "application.start.time";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void setUp() throws Exception {
.resolve();
}

@Test
@Test(dependsOnMethods = "testCreateHelixCluster")
public void testBuildApplicationMasterCommand() {
String command = this.gobblinYarnAppLauncher.buildApplicationMasterCommand("application_1234_3456", 64);

Expand Down Expand Up @@ -355,6 +355,16 @@ public void testSendShutdownRequest() throws Exception {
assertWithBackoff.assertEquals(getInstanceMessageNum, 0, "all controller messages processed");
}

/**
 * Checks that JVM arguments configured under
 * {@link GobblinYarnConfigurationKeys#YARN_APPLICATION_PROXY_JVM_ARGS} show up in the application master command.
 */
@Test(dependsOnMethods = "testCreateHelixCluster")
public void testProxyJvmArgs() throws IOException {
  String proxyArgs = "-Dhttp.proxyHost=foo-bar.baz.org -Dhttp.proxyPort=1234";

  // Rebuild the launcher from a config that carries the proxy JVM arguments.
  this.config = this.config.withValue(
      GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS, ConfigValueFactory.fromAnyRef(proxyArgs));
  this.gobblinYarnAppLauncher = new GobblinYarnAppLauncher(this.config, clusterConf);

  String amCommand = this.gobblinYarnAppLauncher.buildApplicationMasterCommand("application_1234_3456", 64);
  Assert.assertTrue(amCommand.contains(proxyArgs));
}

@AfterClass
public void tearDown() throws IOException, TimeoutException {
try {
Expand Down

0 comments on commit 4bb0c40

Please sign in to comment.