Skip to content

Commit

Permalink
initiliaze yarn clients in yarn app launcher so that a child class ca…
Browse files Browse the repository at this point in the history
…n override the yarn client creation logic
  • Loading branch information
arjun4084346 committed Apr 17, 2023
1 parent 27dea4a commit b34be7b
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ public AzkabanGobblinYarnAppLauncher(String jobId, Properties gobblinProps)

protected GobblinYarnAppLauncher getYarnAppLauncher(Config gobblinConfig)
throws IOException {
return new GobblinYarnAppLauncher(gobblinConfig, this.yarnConfiguration);
GobblinYarnAppLauncher gobblinYarnAppLauncher = new GobblinYarnAppLauncher(gobblinConfig, this.yarnConfiguration);
gobblinYarnAppLauncher.initializeYarnClients(gobblinConfig);
return gobblinYarnAppLauncher;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@
* this count exceeds the maximum number allowed, it will initiate a shutdown.
* </p>
*
* <p>
* Users of {@link GobblinYarnAppLauncher} need to call {@link #initializeYarnClients} which a child class can override.
* </p>
*
* @author Yinan Li
*/
public class GobblinYarnAppLauncher {
Expand Down Expand Up @@ -260,7 +264,6 @@ public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration
YarnHelixUtils.setAdditionalYarnClassPath(config, this.yarnConfiguration);
this.yarnConfiguration.set("fs.automatic.close", "false");
this.originalYarnRMAddress = this.yarnConfiguration.get(GobblinYarnConfigurationKeys.YARN_RESOURCE_MANAGER_ADDRESS);
createPotentialYarnClients(config, this.potentialYarnClients);

this.fs = GobblinClusterUtils.buildFileSystem(config, this.yarnConfiguration);
this.closer.register(this.fs);
Expand Down Expand Up @@ -323,14 +326,14 @@ public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration
}
}

protected void createPotentialYarnClients(Config config, Map<String, YarnClient> potentialYarnClients) {
public void initializeYarnClients(Config config) {
Set<String> potentialRMAddresses = new HashSet<>(ConfigUtils.getStringList(config, GobblinYarnConfigurationKeys.OTHER_YARN_RESOURCE_MANAGER_ADDRESSES));
potentialRMAddresses.add(originalYarnRMAddress);
for (String rmAddress : potentialRMAddresses) {
YarnClient tmpYarnClient = YarnClient.createYarnClient();
this.yarnConfiguration.set(GobblinYarnConfigurationKeys.YARN_RESOURCE_MANAGER_ADDRESS, rmAddress);
tmpYarnClient.init(new YarnConfiguration(this.yarnConfiguration));
potentialYarnClients.put(rmAddress, tmpYarnClient);
this.potentialYarnClients.put(rmAddress, tmpYarnClient);
}
}

Expand Down Expand Up @@ -1069,6 +1072,8 @@ static Config addMetricReportingDynamicConfig(Config config, KafkaAvroSchemaRegi
public static void main(String[] args) throws Exception {
final GobblinYarnAppLauncher gobblinYarnAppLauncher =
new GobblinYarnAppLauncher(ConfigFactory.load(), new YarnConfiguration());
gobblinYarnAppLauncher.initializeYarnClients(ConfigFactory.load());

Runtime.getRuntime().addShutdownHook(new Thread() {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ public void setUp() throws Exception {
InstanceType.CONTROLLER, zkConnectionString);

this.gobblinYarnAppLauncher = new GobblinYarnAppLauncher(this.config, clusterConf);
this.gobblinYarnAppLauncher.initializeYarnClients(this.config);

this.configManagedHelix = ConfigFactory.parseURL(url)
.withValue("gobblin.cluster.zk.connection.string",
Expand All @@ -213,6 +214,7 @@ public void setUp() throws Exception {
InstanceType.PARTICIPANT, zkConnectionString);

this.gobblinYarnAppLauncherManagedHelix = new GobblinYarnAppLauncher(this.configManagedHelix, clusterConf);
this.gobblinYarnAppLauncherManagedHelix.initializeYarnClients(this.configManagedHelix);
}

@Test
Expand Down

0 comments on commit b34be7b

Please sign in to comment.