From b34be7babbd1187b9ae3d061a2d23b8fb112f70e Mon Sep 17 00:00:00 2001
From: Arjun
Date: Sat, 15 Apr 2023 17:17:25 -0700
Subject: [PATCH] initiliaze yarn clients in yarn app launcher so that a child
class can override the yarn client creation logic
---
.../azkaban/AzkabanGobblinYarnAppLauncher.java | 4 +++-
.../apache/gobblin/yarn/GobblinYarnAppLauncher.java | 11 ++++++++---
.../gobblin/yarn/GobblinYarnAppLauncherTest.java | 2 ++
3 files changed, 13 insertions(+), 4 deletions(-)
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
index 59e7a584af0..8fe09e5a527 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
@@ -80,7 +80,9 @@ public AzkabanGobblinYarnAppLauncher(String jobId, Properties gobblinProps)
protected GobblinYarnAppLauncher getYarnAppLauncher(Config gobblinConfig)
throws IOException {
- return new GobblinYarnAppLauncher(gobblinConfig, this.yarnConfiguration);
+ GobblinYarnAppLauncher gobblinYarnAppLauncher = new GobblinYarnAppLauncher(gobblinConfig, this.yarnConfiguration);
+ gobblinYarnAppLauncher.initializeYarnClients(gobblinConfig);
+ return gobblinYarnAppLauncher;
}
/**
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index e4acb8682eb..99c4094df50 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -157,6 +157,10 @@
* this count exceeds the maximum number allowed, it will initiate a shutdown.
*
*
+ *
+ * Users of {@link GobblinYarnAppLauncher} need to call {@link #initializeYarnClients} which a child class can override.
+ *
+ *
* @author Yinan Li
*/
public class GobblinYarnAppLauncher {
@@ -260,7 +264,6 @@ public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration
YarnHelixUtils.setAdditionalYarnClassPath(config, this.yarnConfiguration);
this.yarnConfiguration.set("fs.automatic.close", "false");
this.originalYarnRMAddress = this.yarnConfiguration.get(GobblinYarnConfigurationKeys.YARN_RESOURCE_MANAGER_ADDRESS);
- createPotentialYarnClients(config, this.potentialYarnClients);
this.fs = GobblinClusterUtils.buildFileSystem(config, this.yarnConfiguration);
this.closer.register(this.fs);
@@ -323,14 +326,14 @@ public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration
}
}
- protected void createPotentialYarnClients(Config config, Map potentialYarnClients) {
+ public void initializeYarnClients(Config config) {
Set potentialRMAddresses = new HashSet<>(ConfigUtils.getStringList(config, GobblinYarnConfigurationKeys.OTHER_YARN_RESOURCE_MANAGER_ADDRESSES));
potentialRMAddresses.add(originalYarnRMAddress);
for (String rmAddress : potentialRMAddresses) {
YarnClient tmpYarnClient = YarnClient.createYarnClient();
this.yarnConfiguration.set(GobblinYarnConfigurationKeys.YARN_RESOURCE_MANAGER_ADDRESS, rmAddress);
tmpYarnClient.init(new YarnConfiguration(this.yarnConfiguration));
- potentialYarnClients.put(rmAddress, tmpYarnClient);
+ this.potentialYarnClients.put(rmAddress, tmpYarnClient);
}
}
@@ -1069,6 +1072,8 @@ static Config addMetricReportingDynamicConfig(Config config, KafkaAvroSchemaRegi
public static void main(String[] args) throws Exception {
final GobblinYarnAppLauncher gobblinYarnAppLauncher =
new GobblinYarnAppLauncher(ConfigFactory.load(), new YarnConfiguration());
+ gobblinYarnAppLauncher.initializeYarnClients(ConfigFactory.load());
+
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index 95c109cfe6b..2eb8c4f32f5 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -199,6 +199,7 @@ public void setUp() throws Exception {
InstanceType.CONTROLLER, zkConnectionString);
this.gobblinYarnAppLauncher = new GobblinYarnAppLauncher(this.config, clusterConf);
+ this.gobblinYarnAppLauncher.initializeYarnClients(this.config);
this.configManagedHelix = ConfigFactory.parseURL(url)
.withValue("gobblin.cluster.zk.connection.string",
@@ -213,6 +214,7 @@ public void setUp() throws Exception {
InstanceType.PARTICIPANT, zkConnectionString);
this.gobblinYarnAppLauncherManagedHelix = new GobblinYarnAppLauncher(this.configManagedHelix, clusterConf);
+ this.gobblinYarnAppLauncherManagedHelix.initializeYarnClients(this.configManagedHelix);
}
@Test