Skip to content

Commit

Permalink
Implement GoT Dynamic auto-scaling using WorkUnitsSizeSummary-drive…
Browse files Browse the repository at this point in the history
…n linear heuristic
  • Loading branch information
phet committed Dec 15, 2024
1 parent 1274c47 commit 7caf9ee
Show file tree
Hide file tree
Showing 22 changed files with 636 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.activity;

import java.util.List;
import java.util.Properties;

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;

import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.ddm.work.TimeBudget;
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
import org.apache.gobblin.temporal.dynamic.ScalingDirective;



/**
* Activity to suggest the Dynamic Scaling warranted to complete processing of some amount of {@link org.apache.gobblin.source.workunit.WorkUnit}s
* within {@link TimeBudget}, through a combination of Workforce auto-scaling and Worker right-sizing.
*
* As with all {@link ActivityInterface}s, this is stateless, so the {@link ScalingDirective}(s) returned "stand alone", presuming nothing of current
* {@link org.apache.gobblin.temporal.dynamic.WorkforceStaffing}. It thus falls to the caller to coordinate whether to apply the directive(s) as-is,
* or first to adjust in light of scaling levels already in the current {@link org.apache.gobblin.temporal.dynamic.WorkforcePlan}.
*/
@ActivityInterface
public interface RecommendScalingForWorkUnits {

/**
* Recommend the {@link ScalingDirective}s to process the {@link WorkUnit}s of {@link WorkUnitsSizeSummary} within {@link TimeBudget}.
*
* @param remainingWork may characterize a newly-generated batch of `WorkUnit` for which no processing has yet begun - or be the sub-portion
* of an in-progress job that still awaits processing
* @param sourceClass contextualizes the `WorkUnitsSizeSummary` and should name a {@link org.apache.gobblin.source.Source}
* @param timeBudget the remaining target duration for processing the summarized `WorkUnit`s
* @param jobProps all job props, to either guide the recommendation or better contextualize the nature of the `remainingWork`
* @return the {@link ScalingDirective}s to process the summarized {@link WorkUnit}s within {@link TimeBudget}
*/
@ActivityMethod
List<ScalingDirective> recommendScaling(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, Properties jobProps);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.activity.impl;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.temporal.ddm.activity.RecommendScalingForWorkUnits;
import org.apache.gobblin.temporal.ddm.work.TimeBudget;
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
import org.apache.gobblin.temporal.dynamic.ScalingDirective;
import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;


/**
* Skeletal impl handling all foundational concerns, but leaving it to a concrete impl to actually choose the auto-scaling
* {@link ScalingDirective#getSetPoint()} for the exactly one {@link ScalingDirective} being recommended.
*/
@Slf4j
public abstract class AbstractRecommendScalingForWorkUnitsImpl implements RecommendScalingForWorkUnits {

// TODO: decide whether this name ought to be configurable - or instead a predictable name that callers may expect (and possibly adjust)
public static final String DEFAULT_PROFILE_DERIVATION_NAME = "workUnitsProc";

@Override
public List<ScalingDirective> recommendScaling(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, Properties jobProps) {
// NOTE: no attempt to determine the current scaling - per `RecommendScalingForWorkUnits` javadoc, the `ScalingDirective`(s) returned must "stand alone",
// presuming nothing of the current `WorkforcePlan`'s `WorkforceStaffing`
JobState jobState = new JobState(jobProps);
ScalingDirective procWUsWorkerScaling = new ScalingDirective(
calcProfileDerivationName(jobState),
calcDerivationSetPoint(remainingWork, sourceClass, timeBudget, jobState),
System.currentTimeMillis(),
Optional.of(calcProfileDerivation(calcBasisProfileName(jobState), remainingWork, sourceClass, jobState))
);
log.info("Recommended re-scaling to process work units: {}", procWUsWorkerScaling);
return Arrays.asList(procWUsWorkerScaling);
}

protected abstract int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, JobState jobState);

protected ProfileDerivation calcProfileDerivation(String basisProfileName, WorkUnitsSizeSummary remainingWork, String sourceClass, JobState jobState) {
// TODO: implement right-sizing!!! (for now just return unchanged)
return new ProfileDerivation(basisProfileName, ProfileOverlay.unchanged());
}

protected String calcProfileDerivationName(JobState jobState) {
// TODO: if we ever return > 1 directive, append a monotonically increasing number to avoid collisions
return DEFAULT_PROFILE_DERIVATION_NAME;
}

protected String calcBasisProfileName(JobState jobState) {
return WorkforceProfiles.BASELINE_NAME; // always build upon baseline
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.activity.impl;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.ddm.work.TimeBudget;
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker;


/**
* Simple config-driven linear relationship between `remainingWork` and the resulting `setPoint`
*
*
* TODO: describe algo!!!!!
*/
@Slf4j
public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends AbstractRecommendScalingForWorkUnitsImpl {

public static final String AMORTIZED_NUM_BYTES_PER_MINUTE = GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling.reference.numBytesPerMinute";
public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 10 * 1000L * 1000L * 60L; // 10MB/sec

@Override
protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, JobState jobState) {
// for simplicity, for now, consider only top-level work units (aka. `MultiWorkUnit`s - MWUs)
long numMWUs = remainingWork.getTopLevelWorkUnitsCount();
double meanBytesPerMWU = remainingWork.getTopLevelWorkUnitsMeanSize();
int numSimultaneousMWUsPerContainer = calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for top-level (MWUs) - not constituent sub-WUs)
long bytesPerMinuteProcRate = calcAmortizedBytesPerMinute(jobState);
log.info("Calculating auto-scaling (for {} remaining work units within {}) using: bytesPerMinuteProcRate = {}; meanBytesPerMWU = {}",
numMWUs, timeBudget, bytesPerMinuteProcRate, meanBytesPerMWU);

// calc how many container*minutes to process all MWUs, based on mean MWU size
double minutesProcTimeForMeanMWU = meanBytesPerMWU / bytesPerMinuteProcRate;
double meanMWUsThroughputPerContainerMinute = numSimultaneousMWUsPerContainer / minutesProcTimeForMeanMWU;
double estContainerMinutesForAllMWUs = numMWUs / meanMWUsThroughputPerContainerMinute;

long targetNumMinutes = timeBudget.getMaxDurationDesiredMinutes();
// TODO: take into account `timeBudget.getPermittedOverageMinutes()` - e.g. to decide whether to use `Math.ceil` vs. `Math.floor`

// TODO: decide how to account for container startup; working est. for GoT-on-YARN ~ 3 mins (req to alloc ~ 30s; alloc to workers ready ~ 2.5m)
// e.g. can we amortize away / ignore when `targetNumMinutes >> workerRequestToReadyNumMinutes`?
// TODO take into account that MWUs are quantized into discrete chunks; this est. uses avg and presumes to divide partial MWUs amongst workers
// can we we mostly ignore if we keep MWU "chunk size" "small-ish", like maybe even just `duration(max(MWU)) <= targetNumMinutes/2)`?

int recommendedNumContainers = (int) Math.floor(estContainerMinutesForAllMWUs / targetNumMinutes);
log.info("Recommended auto-scaling: {} containers, given: minutesToProc(mean(MWUs)) = {}; throughput = {} (MWUs / container*minute); "
+ "est. container*minutes to complete ALL ({}) MWUs = {}",
recommendedNumContainers, minutesProcTimeForMeanMWU, meanMWUsThroughputPerContainerMinute, numMWUs, estContainerMinutesForAllMWUs);
return recommendedNumContainers;
}

protected int calcPerContainerWUCapacity(JobState jobState) {
int numWorkersPerContainer = jobState.getPropAsInt(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_WORKERS_PER_CONTAINER,
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS);
int numThreadsPerWorker = WorkFulfillmentWorker.MAX_EXECUTION_CONCURRENCY; // TODO: get from config, once that's implemented
return numWorkersPerContainer * numThreadsPerWorker;
}

protected long calcAmortizedBytesPerMinute(JobState jobState) {
return jobState.getPropAsLong(AMORTIZED_NUM_BYTES_PER_MINUTE, DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
public class JobStateUtils {
public static final String INPUT_DIR_NAME = "input"; // following MRJobLauncher.INPUT_DIR_NAME
public static final String OUTPUT_DIR_NAME = "output"; // following MRJobLauncher.OUTPUT_DIR_NAME
public static final String DYNAMIC_SCALING_RELATIVE_DIR_PATH = "dynamic-scaling/directives";
public static final String DYNAMIC_SCALING_ERRORS_RELATIVE_DIR_PATH = "dynamic-scaling/dropped-directives";
public static final boolean DEFAULT_WRITE_PREVIOUS_WORKUNIT_STATES = true;

// reuse same handle among activities executed by the same worker
Expand Down Expand Up @@ -141,6 +143,38 @@ public static Path getWorkUnitsPath(Path workDirRoot) {
return new Path(workDirRoot, INPUT_DIR_NAME);
}

/**
* ATTENTION: derives path according to {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
* {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
* @return {@link Path} where {@link org.apache.gobblin.temporal.dynamic.ScalingDirective}s should reside
*/
public static Path getDynamicScalingPath(JobState jobState) {
return getDynamicScalingPath(getWorkDirRoot(jobState));
}

/**
* @return {@link Path} where {@link org.apache.gobblin.temporal.dynamic.ScalingDirective}s should reside
*/
public static Path getDynamicScalingPath(Path workDirRoot) {
return new Path(workDirRoot, DYNAMIC_SCALING_RELATIVE_DIR_PATH);
}

/**
* ATTENTION: derives path according to {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
* {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
* @return {@link Path} where any {@link org.apache.gobblin.temporal.dynamic.ScalingDirective} errors should be placed
*/
public static Path getDynamicScalingErrorsPath(JobState jobState) {
return getDynamicScalingErrorsPath(getWorkDirRoot(jobState));
}

/**
* @return {@link Path} where any {@link org.apache.gobblin.temporal.dynamic.ScalingDirective} errors should be placed
*/
public static Path getDynamicScalingErrorsPath(Path workDirRoot) {
return new Path(workDirRoot, DYNAMIC_SCALING_ERRORS_RELATIVE_DIR_PATH);
}

/**
* ATTENTION: derives path according to {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
* {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
public class GenerateWorkUnitsResult {
// NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite - "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor"
@NonNull private int generatedWuCount;
// TODO: characterize the WUs more thoroughly, by also including destination info, and with more specifics, like src+dest location, I/O config, throttling...
@NonNull private String sourceClass;
@NonNull private WorkUnitsSizeSummary workUnitsSizeSummary;
// Resources that the Temporal Job Launcher should clean up for Gobblin temporary work directory paths in writers
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.work;

import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Setter;


/**
* Duration for whatever work to complete, with a permitted overage to indicate firm-ness/soft-ness.
* Values are in minutes, befitting the granularity of inevitable companion activities, like:
* - network operations - opening connections, I/O, retries
* - starting/scaling workers
*/
@Data
@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
public class TimeBudget {
// NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite - "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor"
@NonNull private long maxDurationDesiredMinutes;
@NonNull private long permittedOverageMinutes;

/** construct w/ {@link #permittedOverageMinutes} expressed as a percentage of {@link #maxDurationDesiredMinutes} */
public static TimeBudget withOveragePercentage(long maxDurationDesiredMinutes, double permittedOveragePercentage) {
return new TimeBudget(maxDurationDesiredMinutes, (long) (maxDurationDesiredMinutes * permittedOveragePercentage));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
* Conveys a {@link org.apache.gobblin.source.workunit.WorkUnit} by claim-check, where the `workUnitPath` is resolved
* against the {@link org.apache.hadoop.fs.FileSystem} given by `nameNodeUri`. see:
* @see <a href="https://learn.microsoft.com/en-us/azure/architecture/patterns/claim-check">Claim-Check Pattern</a>
*
* TODO: if we're to generalize Work Prediction+Prioritization across multiplexed jobs, each having its own separate time budget, every WU claim-check
* standing on its own would allow an external observer to inspect only the task queue w/o correlation between workflow histories. For that, decide whether
* to add job-identifying metadata here or just tack on time budget (aka. SLA deadline) info. Either could be tunneled within the filename in the manner
* of `JobLauncherUtils.WorkUnitPathCalculator.calcNextPathWithTunneledSizeInfo` - in fact, by convention, the job ID / flow ID already is... we just don't
* recover it herein.
*/
@Data
@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import lombok.RequiredArgsConstructor;
import lombok.Setter;

import com.fasterxml.jackson.annotation.JsonIgnore;

import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;

Expand All @@ -50,4 +52,24 @@ public class WorkUnitsSizeSummary {
@NonNull private double quantilesWidth;
@NonNull private List<Double> topLevelQuantilesMinSizes;
@NonNull private List<Double> constituentQuantilesMinSizes;

@JsonIgnore // (because no-arg method resembles 'java bean property')
public double getTopLevelWorkUnitsMeanSize() {
return this.totalSize / this.topLevelWorkUnitsCount;
}

@JsonIgnore // (because no-arg method resembles 'java bean property')
public double getConstituentWorkUnitsMeanSize() {
return this.totalSize / this.constituentWorkUnitsCount;
}

@JsonIgnore // (because no-arg method resembles 'java bean property')
public double getTopLevelWorkUnitsMedianSize() {
return this.topLevelQuantilesMinSizes.get(this.quantilesCount / 2);
}

@JsonIgnore // (because no-arg method resembles 'java bean property')
public double getConstituentWorkUnitsMedianSize() {
return this.topLevelQuantilesMinSizes.get(this.quantilesCount / 2);
}
}
Loading

0 comments on commit 7caf9ee

Please sign in to comment.