Skip to content

Commit

Permalink
Implement GoT Dynamic Auto-Scaling using heuristics based on `WorkUni…
Browse files Browse the repository at this point in the history
…tsSizeSummary`
  • Loading branch information
phet committed Dec 13, 2024
1 parent 1274c47 commit 2413e44
Show file tree
Hide file tree
Showing 19 changed files with 598 additions and 49 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.activity;

import java.util.List;
import java.util.Properties;

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;

import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.ddm.work.TimeBudget;
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
import org.apache.gobblin.temporal.dynamic.ScalingDirective;



/**
 * Activity to suggest the Dynamic Scaling warranted to complete processing of some amount of
 * {@link org.apache.gobblin.source.workunit.WorkUnit}s within a {@link TimeBudget}, through a combination of Workforce auto-scaling
 * and Worker right-sizing.
 *
 * <p>As with all {@link ActivityInterface}s, this is stateless, so the {@link ScalingDirective}(s) returned "stand alone", presuming
 * nothing of the current {@link org.apache.gobblin.temporal.dynamic.WorkforceStaffing}. It thus falls to the caller to coordinate
 * whether to apply the directive(s) as-is, or first to adjust them in light of scaling levels already in the current
 * {@link org.apache.gobblin.temporal.dynamic.WorkforcePlan}.
 */
@ActivityInterface
public interface RecommendScalingForWorkUnits {

/**
 * Recommend the {@link ScalingDirective}s to process the {@link WorkUnit}s of {@link WorkUnitsSizeSummary} within {@link TimeBudget}.
 *
 * @param remainingWork may characterize a newly-generated batch of {@code WorkUnit}s for which no processing has yet begun - or else
 *                      the sub-portion of an in-progress job that still awaits processing
 * @param sourceClass contextualizes the {@code WorkUnitsSizeSummary} and should name a {@link org.apache.gobblin.source.Source}
 * @param timeBudget the remaining target duration for processing the summarized {@code WorkUnit}s
 * @param jobProps all job props, to either guide the recommendation or better contextualize the nature of the {@code remainingWork}
 * @return the {@link ScalingDirective}s to process the summarized {@link WorkUnit}s within {@link TimeBudget}
 */
@ActivityMethod
List<ScalingDirective> recommendScaling(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, Properties jobProps);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.activity.impl;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.temporal.ddm.activity.RecommendScalingForWorkUnits;
import org.apache.gobblin.temporal.ddm.work.TimeBudget;
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
import org.apache.gobblin.temporal.dynamic.ScalingDirective;
import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;


/**
 * Template-method base class: assembles the single {@link ScalingDirective} that gets recommended, while delegating the choice of
 * its auto-scaling {@link ScalingDirective#getSetPoint()} to the concrete subclass (see {@link #calcDerivationSetPoint}).
 */
@Slf4j
public abstract class AbstractRecommendScalingForWorkUnitsImpl implements RecommendScalingForWorkUnits {

  // TODO: decide whether this name ought to be configurable - or instead a predictable name that callers may expect (and possibly adjust)
  public static final String DEFAULT_PROFILE_DERIVATION_NAME = "workUnitsProc";

  /**
   * Build exactly one {@link ScalingDirective}, stamped with the current wall-clock time, and return it as a one-element list.
   *
   * NOTE: makes no attempt to determine the current scaling - per `RecommendScalingForWorkUnits` javadoc, the `ScalingDirective`(s)
   * returned must "stand alone", presuming nothing of the current `WorkforcePlan`'s `WorkforceStaffing`
   */
  @Override
  public List<ScalingDirective> recommendScaling(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, Properties jobProps) {
    final JobState jobState = new JobState(jobProps);

    // each piece is computed in the order the directive's constructor receives it, since the hooks are overridable
    final String derivationName = calcProfileDerivationName(jobState);
    final int setPoint = calcDerivationSetPoint(remainingWork, sourceClass, timeBudget, jobState);
    final long timestampEpochMillis = System.currentTimeMillis();
    final String basisProfileName = calcBasisProfileName(jobState);
    final ProfileDerivation derivation = calcProfileDerivation(basisProfileName, remainingWork, sourceClass, jobState);

    final ScalingDirective directive = new ScalingDirective(derivationName, setPoint, timestampEpochMillis, Optional.of(derivation));
    log.info("Recommended re-scaling to process work units: {}", directive);
    return Arrays.asList(directive);
  }

  /** @return the set point for the directive being recommended; the sole decision left to the concrete impl */
  protected abstract int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, JobState jobState);

  /** @return the {@link ProfileDerivation} upon `basisProfileName`; presently always an unchanged overlay */
  protected ProfileDerivation calcProfileDerivation(String basisProfileName, WorkUnitsSizeSummary remainingWork, String sourceClass, JobState jobState) {
    // TODO: implement right-sizing!!! (for now just return unchanged)
    final ProfileOverlay noChanges = ProfileOverlay.unchanged();
    return new ProfileDerivation(basisProfileName, noChanges);
  }

  /** @return the name for the derived profile; presently always {@link #DEFAULT_PROFILE_DERIVATION_NAME} */
  protected String calcProfileDerivationName(JobState jobState) {
    // TODO: if we ever return > 1 directive, append a monotonically increasing number to avoid collisions
    return DEFAULT_PROFILE_DERIVATION_NAME;
  }

  /** @return the name of the profile to derive from; always the baseline */
  protected String calcBasisProfileName(JobState jobState) {
    // always build upon baseline
    return WorkforceProfiles.BASELINE_NAME;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.activity.impl;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.ddm.work.TimeBudget;
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker;


/**
 * Simple config-driven linear relationship between `remainingWork` and the resulting `setPoint`
 *
 * Algorithm (considering only top-level work units):
 *   1. per-container capacity := (workers per container) * (threads per worker), i.e. how many WUs one container processes at once
 *   2. median WU size := the top-level quantile min size at index `quantilesCount / 2`
 *   3. minutes to process the median WU := (median WU size) / (bytes per minute, from {@link #AMORTIZED_NUM_BYTES_PER_MINUTE})
 *   4. throughput, in WUs per container*minute := (per-container capacity) / (minutes to process the median WU)
 *   5. container*minutes for all WUs := (num WUs) / throughput
 *   6. set point := floor((container*minutes for all WUs) / (target minutes of the `TimeBudget`)), but never less than 1
 *
 * The three divisors (per-container capacity, bytes per minute, and target minutes) are sanitized to be positive before use: a zero
 * in any of them would otherwise make the estimate infinite, which the narrowing cast to `int` turns into `Integer.MAX_VALUE`.
 */
@Slf4j
public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends AbstractRecommendScalingForWorkUnitsImpl {

  /** job prop giving the processing rate, in bytes per minute, at which a single work unit is presumed to be processed */
  public static final String AMORTIZED_NUM_BYTES_PER_MINUTE = GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling.reference.numBytesPerMinute";
  public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 10 * 1000L * 1000L * 60L; // 10MB/sec

  /**
   * @return the recommended number of containers (always >= 1) to process `remainingWork` within `timeBudget`, per the class-level algorithm
   */
  @Override
  protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, JobState jobState) {
    int numSimultaneousWUsPerContainer = calcPerContainerWUCapacity(jobState);
    if (numSimultaneousWUsPerContainer < 1) {
      // zero capacity => zero throughput => infinite container*minutes
      log.warn("Non-positive per-container work unit capacity ({}); presuming 1 instead", numSimultaneousWUsPerContainer);
      numSimultaneousWUsPerContainer = 1;
    }
    long bytesPerMinute = calcAmortizedBytesPerMinute(jobState);
    if (bytesPerMinute < 1) {
      // zero rate => infinite minutes per WU => zero throughput => infinite container*minutes
      log.warn("Non-positive '{}' ({}); falling back to the default of {}", AMORTIZED_NUM_BYTES_PER_MINUTE, bytesPerMinute,
          DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE);
      bytesPerMinute = DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE;
    }
    // for simplicity, for now, consider only top-level work units
    long numWUs = remainingWork.getTopLevelWorkUnitsCount();
    int medianQuantileIndex = remainingWork.getQuantilesCount() / 2;
    double medianBytesPerWU = remainingWork.getTopLevelQuantilesMinSizes().get(medianQuantileIndex);
    log.info("Calculating auto-scaling (for {} remaining work units within {}) using - bytesPerMinute = {}; medianBytesPerWU = {}",
        numWUs, timeBudget, bytesPerMinute, medianBytesPerWU);

    // calc how many container*minutes to process all WUs, based on the median WU size
    double minutesProcTimeForMedianWU = medianBytesPerWU / bytesPerMinute;
    double throughputWUsProcPerContainerMinute = numSimultaneousWUsPerContainer / minutesProcTimeForMedianWU;
    double estContainerMinutesForAllWUs = numWUs / throughputWUsProcPerContainerMinute;

    long targetNumMinutes = timeBudget.getMaxDurationDesiredMinutes();
    if (targetNumMinutes < 1) {
      // the budget is expressed in whole minutes, so treat anything shorter as the one-minute minimum, rather than dividing by zero
      log.warn("Non-positive time budget ({} minutes); presuming 1 minute instead", targetNumMinutes);
      targetNumMinutes = 1;
    }
    // TODO: take into account `timeBudget.getPermittedOverageMinutes()` to decide whether to use `Math.ceil` instead
    int recommendedNumContainers = (int) Math.floor(estContainerMinutesForAllWUs / targetNumMinutes);
    log.info("Recommended auto-scaling: {} containers, given: minutesToProc(median(WUs)) = {}; throughput = {} (WUs / container*minute); est. container*minutes for all ({}) WUs = {}",
        recommendedNumContainers, minutesProcTimeForMedianWU, throughputWUsProcPerContainerMinute, numWUs, estContainerMinutesForAllWUs);
    return Math.max(recommendedNumContainers, 1);
  }

  /** @return how many work units a single container processes simultaneously: (workers per container) * (threads per worker) */
  protected int calcPerContainerWUCapacity(JobState jobState) {
    int numWorkersPerContainer = jobState.getPropAsInt(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_WORKERS_PER_CONTAINER,
        GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS);
    int numThreadsPerWorker = WorkFulfillmentWorker.MAX_EXECUTION_CONCURRENCY; // TODO: get from config, once that's implemented
    return numWorkersPerContainer * numThreadsPerWorker;
  }

  /** @return the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, else {@link #DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE} */
  protected long calcAmortizedBytesPerMinute(JobState jobState) {
    return jobState.getPropAsLong(AMORTIZED_NUM_BYTES_PER_MINUTE, DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
public class JobStateUtils {
public static final String INPUT_DIR_NAME = "input"; // following MRJobLauncher.INPUT_DIR_NAME
public static final String OUTPUT_DIR_NAME = "output"; // following MRJobLauncher.OUTPUT_DIR_NAME
// dir, relative to the work dir root, that `getDynamicScalingPath` resolves
public static final String DYNAMIC_SCALING_RELATIVE_DIR_PATH = "dynamic-scaling/directives";
// dir, relative to the work dir root, that `getDynamicScalingErrorsPath` resolves
public static final String DYNAMIC_SCALING_ERRORS_RELATIVE_DIR_PATH = "dynamic-scaling/dropped-directives";
public static final boolean DEFAULT_WRITE_PREVIOUS_WORKUNIT_STATES = true;

// reuse same handle among activities executed by the same worker
Expand Down Expand Up @@ -141,6 +143,38 @@ public static Path getWorkUnitsPath(Path workDirRoot) {
return new Path(workDirRoot, INPUT_DIR_NAME);
}

/**
 * Resolve the {@link org.apache.gobblin.temporal.dynamic.ScalingDirective}s dir, starting from the job's work dir root.
 *
 * ATTENTION: derives path according to {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
 * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
 * @return {@link Path} where {@link org.apache.gobblin.temporal.dynamic.ScalingDirective}s should reside
 */
public static Path getDynamicScalingPath(JobState jobState) {
  Path workDirRoot = getWorkDirRoot(jobState);
  return getDynamicScalingPath(workDirRoot);
}

/**
 * Resolve {@link #DYNAMIC_SCALING_RELATIVE_DIR_PATH} beneath `workDirRoot`.
 *
 * @return {@link Path} where {@link org.apache.gobblin.temporal.dynamic.ScalingDirective}s should reside
 */
public static Path getDynamicScalingPath(Path workDirRoot) {
  final String directivesRelativeDir = DYNAMIC_SCALING_RELATIVE_DIR_PATH;
  return new Path(workDirRoot, directivesRelativeDir);
}

/**
 * Resolve the dir for {@link org.apache.gobblin.temporal.dynamic.ScalingDirective} errors, starting from the job's work dir root.
 *
 * ATTENTION: derives path according to {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
 * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
 * @return {@link Path} where any {@link org.apache.gobblin.temporal.dynamic.ScalingDirective} errors should be placed
 */
public static Path getDynamicScalingErrorsPath(JobState jobState) {
  Path workDirRoot = getWorkDirRoot(jobState);
  return getDynamicScalingErrorsPath(workDirRoot);
}

/**
 * Resolve {@link #DYNAMIC_SCALING_ERRORS_RELATIVE_DIR_PATH} beneath `workDirRoot`.
 *
 * @return {@link Path} where any {@link org.apache.gobblin.temporal.dynamic.ScalingDirective} errors should be placed
 */
public static Path getDynamicScalingErrorsPath(Path workDirRoot) {
  final String errorsRelativeDir = DYNAMIC_SCALING_ERRORS_RELATIVE_DIR_PATH;
  return new Path(workDirRoot, errorsRelativeDir);
}

/**
* ATTENTION: derives path according to {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
* {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
public class GenerateWorkUnitsResult {
// NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite - "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor"
@NonNull private int generatedWuCount;
// TODO: characterize the WUs more thoroughly, by also including destination info, and with more specifics, like src+dest location, I/O config, throttling...
@NonNull private String sourceClass;
@NonNull private WorkUnitsSizeSummary workUnitsSizeSummary;
// Resources that the Temporal Job Launcher should clean up for Gobblin temporary work directory paths in writers
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.work;

import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Setter;


/**
 * How long some work is desired to take, plus a permitted overage that conveys how firm (or soft) that target is.
 * Both values are in minutes, befitting the granularity of inevitable companion activities, like:
 *   - network operations - opening connections, I/O, retries
 *   - starting/scaling workers
 */
@Data
@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
public class TimeBudget {
  // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite - "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor"
  @NonNull private long maxDurationDesiredMinutes;
  @NonNull private long permittedOverageMinutes;

  /**
   * Construct w/ {@link #permittedOverageMinutes} expressed as a percentage of {@link #maxDurationDesiredMinutes}.
   *
   * @param permittedOveragePercentage multiplied directly against `maxDurationDesiredMinutes` (it is NOT divided by 100), with the
   *                                   product then truncated to whole minutes by the cast to `long`
   */
  public static TimeBudget withOveragePercentage(long maxDurationDesiredMinutes, double permittedOveragePercentage) {
    long overageMinutes = (long) (maxDurationDesiredMinutes * permittedOveragePercentage);
    return new TimeBudget(maxDurationDesiredMinutes, overageMinutes);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.gobblin.temporal.ddm.activity.impl.DeleteWorkDirsActivityImpl;
import org.apache.gobblin.temporal.ddm.activity.impl.GenerateWorkUnitsImpl;
import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
import org.apache.gobblin.temporal.ddm.activity.impl.RecommendScalingForWorkUnitsLinearHeuristicImpl;
import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl;
import org.apache.gobblin.temporal.ddm.workflow.impl.ExecuteGobblinWorkflowImpl;
import org.apache.gobblin.temporal.ddm.workflow.impl.GenerateWorkUnitsWorkflowImpl;
Expand All @@ -48,14 +49,14 @@ public WorkFulfillmentWorker(Config config, WorkflowClient workflowClient) {

@Override
protected Class<?>[] getWorkflowImplClasses() {
return new Class[] { CommitStepWorkflowImpl.class, ExecuteGobblinWorkflowImpl.class, GenerateWorkUnitsWorkflowImpl.class,
NestingExecOfProcessWorkUnitWorkflowImpl.class, ProcessWorkUnitsWorkflowImpl.class };
return new Class[] { ExecuteGobblinWorkflowImpl.class, ProcessWorkUnitsWorkflowImpl.class, NestingExecOfProcessWorkUnitWorkflowImpl.class,
CommitStepWorkflowImpl.class, GenerateWorkUnitsWorkflowImpl.class };
}

@Override
protected Object[] getActivityImplInstances() {
return new Object[] { new CommitActivityImpl(), new DeleteWorkDirsActivityImpl(),new GenerateWorkUnitsImpl(),
new ProcessWorkUnitImpl(), new SubmitGTEActivityImpl()};
return new Object[] { new SubmitGTEActivityImpl(), new GenerateWorkUnitsImpl(), new RecommendScalingForWorkUnitsLinearHeuristicImpl(), new ProcessWorkUnitImpl(),
new CommitActivityImpl(), new DeleteWorkDirsActivityImpl() };
}

@Override
Expand Down
Loading

0 comments on commit 2413e44

Please sign in to comment.