Skip to content

Commit

Permalink
[GOBBLIN-2174] Define DynamicScalingYarnService for the GoT workfor…
Browse files Browse the repository at this point in the history
…ce via `ScalingDirective`s (#4077)
  • Loading branch information
Blazer-007 authored Dec 9, 2024
1 parent 2142d22 commit 7309060
Show file tree
Hide file tree
Showing 13 changed files with 816 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,9 @@ public interface GobblinTemporalConfigurationKeys {
String TEMPORAL_NUM_WORKERS_PER_CONTAINER = PREFIX + "num.workers.per.container";
int DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS = 1;
String TEMPORAL_CONNECTION_STRING = PREFIX + "connection.string";

/**
* Prefix for Gobblin-on-Temporal Dynamic Scaling
*/
String DYNAMIC_SCALING_PREFIX = PREFIX + "dynamic.scaling.";
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,23 @@
package org.apache.gobblin.temporal.dynamic;

import com.typesafe.config.Config;
import lombok.AllArgsConstructor;
import lombok.Data;


/** A named worker {@link Config} */
@Data
@AllArgsConstructor
public class WorkerProfile {
private final String name;
private final Config config;

/**
* Constructs a `WorkerProfile` with the baseline name and the specified configuration.
*
* @param config the configuration for the worker profile
*/
public WorkerProfile(Config config) {
this(WorkforceProfiles.BASELINE_NAME, config);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.yarn;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.collections.CollectionUtils;

import com.typesafe.config.Config;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.AbstractIdleService;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.temporal.dynamic.ScalingDirective;
import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;

/**
* This class manages the dynamic scaling of the {@link YarnService} by periodically polling for scaling directives and passing
* the latest scaling directives to the {@link DynamicScalingYarnService} for processing.
*
* This is an abstract class that provides the basic functionality for managing dynamic scaling. Subclasses should implement
* {@link #createScalingDirectiveSource()} to provide a {@link ScalingDirectiveSource} that will be used to get scaling directives.
*
* The actual implemented class needs to be passed as value of config {@link org.apache.gobblin.yarn.GobblinYarnConfigurationKeys#APP_MASTER_SERVICE_CLASSES}
*/
@Slf4j
public abstract class AbstractDynamicScalingYarnServiceManager extends AbstractIdleService {

protected final static String DYNAMIC_SCALING_POLLING_INTERVAL = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval";
private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
protected final Config config;
private final DynamicScalingYarnService dynamicScalingYarnService;
private final ScheduledExecutorService dynamicScalingExecutor;

public AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) {
this.config = appMaster.getConfig();
if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) {
this.dynamicScalingYarnService = (DynamicScalingYarnService) appMaster.get_yarnService();
} else {
String errorMsg = "Failure while getting YarnService Instance from GobblinTemporalApplicationMaster::get_yarnService()"
+ " YarnService {" + appMaster.get_yarnService().getClass().getName() + "} is not an instance of DynamicScalingYarnService";
log.error(errorMsg);
throw new RuntimeException(errorMsg);
}
this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor(
ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("DynamicScalingExecutor")));
}

@Override
protected void startUp() {
int scheduleInterval = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_POLLING_INTERVAL,
DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
log.info("Starting the {} with re-scaling interval of {} seconds", this.getClass().getSimpleName(), scheduleInterval);

this.dynamicScalingExecutor.scheduleAtFixedRate(
new GetScalingDirectivesRunnable(this.dynamicScalingYarnService, createScalingDirectiveSource()),
scheduleInterval, scheduleInterval, TimeUnit.SECONDS
);
}

@Override
protected void shutDown() {
log.info("Stopping the " + this.getClass().getSimpleName());
ExecutorsUtils.shutdownExecutorService(this.dynamicScalingExecutor, Optional.of(log));
}

/**
* Create a {@link ScalingDirectiveSource} to use for getting scaling directives.
*/
protected abstract ScalingDirectiveSource createScalingDirectiveSource();

/**
* A {@link Runnable} that gets the scaling directives from the {@link ScalingDirectiveSource} and passes them to the
* {@link DynamicScalingYarnService} for processing.
*/
@AllArgsConstructor
static class GetScalingDirectivesRunnable implements Runnable {
private final DynamicScalingYarnService dynamicScalingYarnService;
private final ScalingDirectiveSource scalingDirectiveSource;

@Override
public void run() {
try {
List<ScalingDirective> scalingDirectives = scalingDirectiveSource.getScalingDirectives();
if (CollectionUtils.isNotEmpty(scalingDirectives)) {
dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives);
}
} catch (IOException e) {
log.error("Failed to get scaling directives", e);
} catch (Throwable t) {
log.error("Unexpected error with dynamic scaling via directives", t);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.yarn;

import java.util.List;

import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import com.google.common.eventbus.EventBus;
import com.typesafe.config.Config;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.temporal.dynamic.ScalingDirective;
import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
import org.apache.gobblin.temporal.dynamic.WorkerProfile;
import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;

/**
* Service for dynamically scaling Gobblin containers running on YARN.
* This service manages workforce staffing and plans, and requests new containers as needed.
*/
@Slf4j
public class DynamicScalingYarnService extends YarnService {

/** this holds the current count of containers already requested for each worker profile */
private final WorkforceStaffing actualWorkforceStaffing;
/** this holds the current total workforce plan as per latest received scaling directives */
private final WorkforcePlan workforcePlan;

public DynamicScalingYarnService(Config config, String applicationName, String applicationId,
YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception {
super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus);

this.actualWorkforceStaffing = WorkforceStaffing.initialize(0);
this.workforcePlan = new WorkforcePlan(this.config, this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY));
}

@Override
protected synchronized void requestInitialContainers() {
StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
requestNewContainersForStaffingDeltas(deltas);
}

/**
* Revises the workforce plan and requests new containers based on the given scaling directives.
*
* @param scalingDirectives the list of scaling directives
*/
public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<ScalingDirective> scalingDirectives) {
if (CollectionUtils.isEmpty(scalingDirectives)) {
return;
}
this.workforcePlan.reviseWhenNewer(scalingDirectives);
StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
requestNewContainersForStaffingDeltas(deltas);
}

private synchronized void requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
deltas.getPerProfileDeltas().forEach(profileDelta -> {
if (profileDelta.getDelta() > 0) { // scale up!
WorkerProfile workerProfile = profileDelta.getProfile();
String profileName = workerProfile.getName();
int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
int delta = profileDelta.getDelta();
log.info("Requesting {} new containers for profile {} having currently {} containers", delta,
WorkforceProfiles.renderName(profileName), currNumContainers);
requestContainersForWorkerProfile(workerProfile, delta);
// update our staffing after requesting new containers
this.actualWorkforceStaffing.reviseStaffing(profileName, currNumContainers + delta, System.currentTimeMillis());
} else if (profileDelta.getDelta() < 0) { // scale down!
// TODO: Decide how to handle negative deltas
log.warn("Handling of Negative delta is not supported yet : Profile {} delta {} ",
profileDelta.getProfile().getName(), profileDelta.getDelta());
} // else, already at staffing plan (or at least have requested, so in-progress)
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.yarn;

import java.util.Optional;

import org.apache.hadoop.fs.FileSystem;

import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;

/**
* {@link FsScalingDirectiveSource} based implementation of {@link AbstractDynamicScalingYarnServiceManager}.
*/
public class FsSourceDynamicScalingYarnServiceManager extends AbstractDynamicScalingYarnServiceManager {
// TODO: replace fetching of these configs using a new method similar to JobStateUtils::getWorkDirRoot
public final static String DYNAMIC_SCALING_DIRECTIVES_DIR = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "directives.dir";
public final static String DYNAMIC_SCALING_ERRORS_DIR = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "errors.dir";
private final FileSystem fs;

public FsSourceDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) {
super(appMaster);
this.fs = appMaster.getFs();
}

@Override
protected ScalingDirectiveSource createScalingDirectiveSource() {
return new FsScalingDirectiveSource(
this.fs,
this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR),
Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public GobblinTemporalApplicationMaster(String applicationName, String applicati
protected YarnService buildTemporalYarnService(Config config, String applicationName, String applicationId,
YarnConfiguration yarnConfiguration, FileSystem fs)
throws Exception {
return new YarnService(config, applicationName, applicationId, yarnConfiguration, fs, this.eventBus);
return new DynamicScalingYarnService(config, applicationName, applicationId, yarnConfiguration, fs, this.eventBus);
}

/**
Expand Down
Loading

0 comments on commit 7309060

Please sign in to comment.