diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java index 59e7a584af0..8fe09e5a527 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java @@ -80,7 +80,9 @@ public AzkabanGobblinYarnAppLauncher(String jobId, Properties gobblinProps) protected GobblinYarnAppLauncher getYarnAppLauncher(Config gobblinConfig) throws IOException { - return new GobblinYarnAppLauncher(gobblinConfig, this.yarnConfiguration); + GobblinYarnAppLauncher gobblinYarnAppLauncher = new GobblinYarnAppLauncher(gobblinConfig, this.yarnConfiguration); + gobblinYarnAppLauncher.initializeYarnClients(gobblinConfig); + return gobblinYarnAppLauncher; } /** diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java index e4acb8682eb..99c4094df50 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java @@ -157,6 +157,10 @@ * this count exceeds the maximum number allowed, it will initiate a shutdown. *

* + *

+ * Users of {@link GobblinYarnAppLauncher} need to call {@link #initializeYarnClients} which a child class can override. + *

+ * * @author Yinan Li */ public class GobblinYarnAppLauncher { @@ -260,7 +264,6 @@ public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration YarnHelixUtils.setAdditionalYarnClassPath(config, this.yarnConfiguration); this.yarnConfiguration.set("fs.automatic.close", "false"); this.originalYarnRMAddress = this.yarnConfiguration.get(GobblinYarnConfigurationKeys.YARN_RESOURCE_MANAGER_ADDRESS); - createPotentialYarnClients(config, this.potentialYarnClients); this.fs = GobblinClusterUtils.buildFileSystem(config, this.yarnConfiguration); this.closer.register(this.fs); @@ -323,14 +326,14 @@ public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration } } - protected void createPotentialYarnClients(Config config, Map potentialYarnClients) { + public void initializeYarnClients(Config config) { Set potentialRMAddresses = new HashSet<>(ConfigUtils.getStringList(config, GobblinYarnConfigurationKeys.OTHER_YARN_RESOURCE_MANAGER_ADDRESSES)); potentialRMAddresses.add(originalYarnRMAddress); for (String rmAddress : potentialRMAddresses) { YarnClient tmpYarnClient = YarnClient.createYarnClient(); this.yarnConfiguration.set(GobblinYarnConfigurationKeys.YARN_RESOURCE_MANAGER_ADDRESS, rmAddress); tmpYarnClient.init(new YarnConfiguration(this.yarnConfiguration)); - potentialYarnClients.put(rmAddress, tmpYarnClient); + this.potentialYarnClients.put(rmAddress, tmpYarnClient); } } @@ -1069,6 +1072,8 @@ static Config addMetricReportingDynamicConfig(Config config, KafkaAvroSchemaRegi public static void main(String[] args) throws Exception { final GobblinYarnAppLauncher gobblinYarnAppLauncher = new GobblinYarnAppLauncher(ConfigFactory.load(), new YarnConfiguration()); + gobblinYarnAppLauncher.initializeYarnClients(ConfigFactory.load()); + Runtime.getRuntime().addShutdownHook(new Thread() { @Override diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java index 95c109cfe6b..2eb8c4f32f5 100644 --- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java @@ -199,6 +199,7 @@ public void setUp() throws Exception { InstanceType.CONTROLLER, zkConnectionString); this.gobblinYarnAppLauncher = new GobblinYarnAppLauncher(this.config, clusterConf); + this.gobblinYarnAppLauncher.initializeYarnClients(this.config); this.configManagedHelix = ConfigFactory.parseURL(url) .withValue("gobblin.cluster.zk.connection.string", @@ -213,6 +214,7 @@ public void setUp() throws Exception { InstanceType.PARTICIPANT, zkConnectionString); this.gobblinYarnAppLauncherManagedHelix = new GobblinYarnAppLauncher(this.configManagedHelix, clusterConf); + this.gobblinYarnAppLauncherManagedHelix.initializeYarnClients(this.configManagedHelix); } @Test