diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 7b33f1d3031..13a145a3e92 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -208,6 +208,8 @@ public class ConfigurationKeys { public static final String DEFAULT_FORK_OPERATOR_CLASS = "org.apache.gobblin.fork.IdentityForkOperator"; public static final String JOB_COMMIT_POLICY_KEY = "job.commit.policy"; public static final String DEFAULT_JOB_COMMIT_POLICY = "full"; + public static final String JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY = "job.duration.target.completion.in.minutes"; + public static final long DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES = 360; public static final String PARTIAL_FAIL_TASK_FAILS_JOB_COMMIT = "job.commit.partial.fail.task.fails.job.commit"; // If true, commit of different datasets will be performed in parallel diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java index fd5c9bab666..ca6b80bd2ff 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java @@ -1060,7 +1060,8 @@ private void cleanLeftoverStagingData(WorkUnitStream workUnits, JobState jobStat try { if (!canCleanStagingData(jobState)) { - LOG.error("Job " + jobState.getJobName() + " has unfinished commit sequences. Will not clean up staging data."); + // TODO: decide whether should be `.warn`, stay as `.info`, or change back to `.error` + LOG.info("Job " + jobState.getJobName() + " has unfinished commit sequences. Will not clean up staging data."); return; } } catch (IOException e) { diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java index 76c5f56224a..6c513d4b9a5 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java @@ -17,10 +17,12 @@ package org.apache.gobblin.runtime; +import lombok.AccessLevel; import lombok.Data; import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.RequiredArgsConstructor; +import lombok.Setter; import org.apache.gobblin.metrics.DatasetMetric; @@ -30,6 +32,7 @@ * that can be reported as a single event in the commit phase. */ @Data +@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization @RequiredArgsConstructor @NoArgsConstructor // IMPORTANT: for jackson (de)serialization public class DatasetTaskSummary { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java index 3d51f15c19d..e90e901a56f 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java @@ -69,4 +69,7 @@ public interface GobblinTemporalConfigurationKeys { * Prefix for Gobblin-on-Temporal Dynamic Scaling */ String DYNAMIC_SCALING_PREFIX = PREFIX + "dynamic.scaling."; + + String DYNAMIC_SCALING_POLLING_INTERVAL_SECS = DYNAMIC_SCALING_PREFIX + "polling.interval.seconds"; + int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60; } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java index 19a65078909..82ee515ca23 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java @@ -92,6 +92,7 @@ public class GobblinTemporalClusterManager implements ApplicationLauncher, Stand @Getter protected final FileSystem fs; + @Getter protected final String applicationId; @Getter @@ -285,8 +286,11 @@ public Collection getStandardMetricsCollection() { * comment lifted from {@link org.apache.gobblin.cluster.GobblinClusterManager} * TODO for now the cluster id is hardcoded to 1 both here and in the {@link GobblinTaskRunner}. In the future, the * cluster id should be created by the {@link GobblinTemporalClusterManager} and passed to each {@link GobblinTaskRunner} + * + * NOTE: renamed from `getApplicationId` to avoid shadowing the `@Getter`-generated instance method of that name + * TODO: unravel what to make of the comment above. as it is, `GobblinTemporalApplicationMaster#main` is what runs, NOT `GobblinTemporalClusterManager#main` */ - private static String getApplicationId() { + private static String getApplicationIdStatic() { return "1"; } @@ -332,7 +336,7 @@ public static void main(String[] args) throws Exception { } try (GobblinTemporalClusterManager GobblinTemporalClusterManager = new GobblinTemporalClusterManager( - cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME), getApplicationId(), + cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME), getApplicationIdStatic(), config, Optional.absent())) { GobblinTemporalClusterManager.start(); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/RecommendScalingForWorkUnits.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/RecommendScalingForWorkUnits.java new file mode 100644 index 00000000000..f3715a0d98e --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/RecommendScalingForWorkUnits.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.activity; + +import java.util.List; +import java.util.Properties; + +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; + +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.temporal.ddm.work.TimeBudget; +import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; + + + +/** + * Activity to suggest the Dynamic Scaling warranted to complete processing of some amount of {@link org.apache.gobblin.source.workunit.WorkUnit}s + * within {@link TimeBudget}, through a combination of Workforce auto-scaling and Worker right-sizing. + * + * As with all {@link ActivityInterface}s, this is stateless, so the {@link ScalingDirective}(s) returned "stand alone", presuming nothing of current + * {@link org.apache.gobblin.temporal.dynamic.WorkforceStaffing}. It thus falls to the caller to coordinate whether to apply the directive(s) as-is, + * or first to adjust in light of scaling levels already in the current {@link org.apache.gobblin.temporal.dynamic.WorkforcePlan}. + */ +@ActivityInterface +public interface RecommendScalingForWorkUnits { + + /** + * Recommend the {@link ScalingDirective}s to process the {@link WorkUnit}s of {@link WorkUnitsSizeSummary} within {@link TimeBudget}. + * + * @param remainingWork may characterize a newly-generated batch of `WorkUnit`s for which no processing has yet begun - or be the sub-portion + * of an in-progress job that still awaits processing + * @param sourceClass contextualizes the `WorkUnitsSizeSummary` and should name a {@link org.apache.gobblin.source.Source} + * @param timeBudget the remaining target duration for processing the summarized `WorkUnit`s + * @param jobProps all job props, to either guide the recommendation or better contextualize the nature of the `remainingWork` + * @return the {@link ScalingDirective}s to process the summarized {@link WorkUnit}s within {@link TimeBudget} + */ + @ActivityMethod + List recommendScaling(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, Properties jobProps); +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java new file mode 100644 index 00000000000..a0d3fd11e55 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.activity.impl; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.temporal.ddm.activity.RecommendScalingForWorkUnits; +import org.apache.gobblin.temporal.ddm.work.TimeBudget; +import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary; +import org.apache.gobblin.temporal.dynamic.ProfileDerivation; +import org.apache.gobblin.temporal.dynamic.ProfileOverlay; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; + + +/** + * Skeletal impl handling all foundational concerns, but leaving it to a concrete impl to actually choose the auto-scaling + * {@link ScalingDirective#getSetPoint()} for the exactly one {@link ScalingDirective} being recommended. + */ +@Slf4j +public abstract class AbstractRecommendScalingForWorkUnitsImpl implements RecommendScalingForWorkUnits { + + // TODO: decide whether this name ought to be configurable - or instead a predictable name that callers may expect (and possibly adjust) + public static final String DEFAULT_PROFILE_DERIVATION_NAME = "workUnitsProc"; + + @Override + public List recommendScaling(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, Properties jobProps) { + // NOTE: no attempt to determine the current scaling - per `RecommendScalingForWorkUnits` javadoc, the `ScalingDirective`(s) returned must "stand alone", + // presuming nothing of the current `WorkforcePlan`'s `WorkforceStaffing` + JobState jobState = new JobState(jobProps); + ScalingDirective procWUsWorkerScaling = new ScalingDirective( + calcProfileDerivationName(jobState), + calcDerivationSetPoint(remainingWork, sourceClass, timeBudget, jobState), + System.currentTimeMillis(), + Optional.of(calcProfileDerivation(calcBasisProfileName(jobState), remainingWork, sourceClass, jobState)) + ); + log.info("Recommended re-scaling to process work units: {}", procWUsWorkerScaling); + return Arrays.asList(procWUsWorkerScaling); + } + + protected abstract int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, JobState jobState); + + protected ProfileDerivation calcProfileDerivation(String basisProfileName, WorkUnitsSizeSummary remainingWork, String sourceClass, JobState jobState) { + // TODO: implement right-sizing!!! (for now just return unchanged) + return new ProfileDerivation(basisProfileName, ProfileOverlay.unchanged()); + } + + protected String calcProfileDerivationName(JobState jobState) { + // TODO: if we ever return > 1 directive, append a monotonically increasing number to avoid collisions + return DEFAULT_PROFILE_DERIVATION_NAME; + } + + protected String calcBasisProfileName(JobState jobState) { + return WorkforceProfiles.BASELINE_NAME; // always build upon baseline + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java index 97699f2ce23..c077a62822e 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java @@ -28,10 +28,13 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import lombok.extern.slf4j.Slf4j; + import com.google.api.client.util.Lists; import com.google.common.base.Function; import com.google.common.base.Strings; @@ -39,8 +42,6 @@ import com.google.common.collect.Iterables; import io.temporal.failure.ApplicationFailure; -import javax.annotation.Nullable; -import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; import org.apache.gobblin.broker.iface.SharedResourcesBroker; @@ -81,8 +82,7 @@ public CommitStats commit(WUProcessingSpec workSpec) { int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS; Optional optJobName = Optional.empty(); AutomaticTroubleshooter troubleshooter = null; - try { - FileSystem fs = Help.loadFileSystem(workSpec); + try (FileSystem fs = Help.loadFileSystem(workSpec)) { JobState jobState = Help.loadJobState(workSpec, fs); optJobName = Optional.ofNullable(jobState.getJobName()); SharedResourcesBroker instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java index 4996c16c1cd..0a192a81bde 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import io.temporal.failure.ApplicationFailure; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -37,6 +36,7 @@ import com.google.common.base.Preconditions; import com.google.common.io.Closer; import com.tdunning.math.stats.TDigest; +import io.temporal.failure.ApplicationFailure; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; @@ -118,7 +118,7 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi troubleshooter.start(); try (Closer closer = Closer.create()) { // before embarking on (potentially expensive) WU creation, first pre-check that the FS is available - FileSystem fs = JobStateUtils.openFileSystem(jobState); + FileSystem fs = closer.register(JobStateUtils.openFileSystem(jobState)); fs.mkdirs(workDirRoot); Set pathsToCleanUp = new HashSet<>(); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java new file mode 100644 index 00000000000..906ebe24266 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.activity.impl; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.ddm.work.TimeBudget; +import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary; +import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker; + + +/** + * Simple config-driven linear recommendation for how many containers to use to complete the "remaining work" within a given {@link TimeBudget}, per: + * + * a. from {@link WorkUnitsSizeSummary}, find how many (remaining) "top-level" {@link org.apache.gobblin.source.workunit.MultiWorkUnit}s of some mean size + * b. from the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, find the expected "processing rate" in bytes / minute + * 1. estimate the time required for processing a mean-sized `MultiWorkUnit` (MWU) + * c. from {@link JobState}, find per-container `MultiWorkUnit` parallelism capacity (aka. "worker-slots") to base the recommendation upon + * 2. calculate the per-container throughput of MWUs per minute + * 3. estimate the total per-container-minutes required to process all MWUs + * d. from the {@link TimeBudget}, find the target number of minutes in which to complete processing of all MWUs + * 4. recommend the number of containers so all MWU processing should finish within the target number of minutes + */ +@Slf4j +public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends AbstractRecommendScalingForWorkUnitsImpl { + + public static final String AMORTIZED_NUM_BYTES_PER_MINUTE = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "heuristic.params.numBytesPerMinute"; + public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 80 * 1000L * 1000L * 60L; // 80MB/sec + + @Override + protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget jobTimeBudget, JobState jobState) { + // for simplicity, for now, consider only top-level work units (aka. `MultiWorkUnit`s - MWUs) + long numMWUs = remainingWork.getTopLevelWorkUnitsCount(); + double meanBytesPerMWU = remainingWork.getTopLevelWorkUnitsMeanSize(); + int numSimultaneousMWUsPerContainer = calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for top-level (MWUs) - not constituent sub-WUs) + long bytesPerMinuteProcRate = calcAmortizedBytesPerMinute(jobState); + log.info("Calculating auto-scaling (for {} remaining work units within {}) using: bytesPerMinuteProcRate = {}; meanBytesPerMWU = {}", + numMWUs, jobTimeBudget, bytesPerMinuteProcRate, meanBytesPerMWU); + + // calc how many container*minutes to process all MWUs, based on mean MWU size + double minutesProcTimeForMeanMWU = meanBytesPerMWU / bytesPerMinuteProcRate; + double meanMWUsThroughputPerContainerMinute = numSimultaneousMWUsPerContainer / minutesProcTimeForMeanMWU; + double estContainerMinutesForAllMWUs = numMWUs / meanMWUsThroughputPerContainerMinute; + + long targetNumMinutesForAllMWUs = jobTimeBudget.getMaxTargetDurationMinutes(); + // TODO: take into account `jobTimeBudget.getPermittedOverageMinutes()` - e.g. to decide whether to use `Math.ceil` vs. `Math.floor` + + // TODO: decide how to account for container startup; working est. for GoT-on-YARN ~ 3 mins (req to alloc ~ 30s; alloc to workers ready ~ 2.5m) + // e.g. can we amortize away / ignore when `targetNumMinutesForAllMWUs >> workerRequestToReadyNumMinutes`? + // TODO take into account that MWUs are quantized into discrete chunks; this est. uses avg and presumes to divide partial MWUs amongst workers + // can we we mostly ignore if we keep MWU "chunk size" "small-ish", like maybe even just `duration(max(MWU)) <= targetNumMinutesForAllMWUs/2)`? + + int recommendedNumContainers = (int) Math.floor(estContainerMinutesForAllMWUs / targetNumMinutesForAllMWUs); + log.info("Recommended auto-scaling: {} containers, given: minutesToProc(mean(MWUs)) = {}; throughput = {} (MWUs / container*minute); " + + "est. container*minutes to complete ALL ({}) MWUs = {}", + recommendedNumContainers, minutesProcTimeForMeanMWU, meanMWUsThroughputPerContainerMinute, numMWUs, estContainerMinutesForAllMWUs); + return recommendedNumContainers; + } + + protected int calcPerContainerWUCapacity(JobState jobState) { + int numWorkersPerContainer = jobState.getPropAsInt(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_WORKERS_PER_CONTAINER, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS); + int numThreadsPerWorker = WorkFulfillmentWorker.MAX_EXECUTION_CONCURRENCY; // TODO: get from config, once that's implemented + return numWorkersPerContainer * numThreadsPerWorker; + } + + protected long calcAmortizedBytesPerMinute(JobState jobState) { + return jobState.getPropAsLong(AMORTIZED_NUM_BYTES_PER_MINUTE, DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java index 52da7b5299b..e6066019562 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java @@ -65,6 +65,8 @@ public class JobStateUtils { public static final String INPUT_DIR_NAME = "input"; // following MRJobLauncher.INPUT_DIR_NAME public static final String OUTPUT_DIR_NAME = "output"; // following MRJobLauncher.OUTPUT_DIR_NAME + public static final String DYNAMIC_SCALING_RELATIVE_DIR_PATH = "dynamic-scaling/directives"; + public static final String DYNAMIC_SCALING_ERRORS_RELATIVE_DIR_PATH = "dynamic-scaling/dropped-directives"; public static final boolean DEFAULT_WRITE_PREVIOUS_WORKUNIT_STATES = true; // reuse same handle among activities executed by the same worker @@ -141,6 +143,38 @@ public static Path getWorkUnitsPath(Path workDirRoot) { return new Path(workDirRoot, INPUT_DIR_NAME); } + /** + * ATTENTION: derives path according to {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same + * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY} + * @return {@link Path} where {@link org.apache.gobblin.temporal.dynamic.ScalingDirective}s should reside + */ + public static Path getDynamicScalingPath(JobState jobState) { + return getDynamicScalingPath(getWorkDirRoot(jobState)); + } + + /** + * @return {@link Path} where {@link org.apache.gobblin.temporal.dynamic.ScalingDirective}s should reside + */ + public static Path getDynamicScalingPath(Path workDirRoot) { + return new Path(workDirRoot, DYNAMIC_SCALING_RELATIVE_DIR_PATH); + } + + /** + * ATTENTION: derives path according to {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same + * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY} + * @return {@link Path} where any {@link org.apache.gobblin.temporal.dynamic.ScalingDirective} errors should be placed + */ + public static Path getDynamicScalingErrorsPath(JobState jobState) { + return getDynamicScalingErrorsPath(getWorkDirRoot(jobState)); + } + + /** + * @return {@link Path} where any {@link org.apache.gobblin.temporal.dynamic.ScalingDirective} errors should be placed + */ + public static Path getDynamicScalingErrorsPath(Path workDirRoot) { + return new Path(workDirRoot, DYNAMIC_SCALING_ERRORS_RELATIVE_DIR_PATH); + } + /** * ATTENTION: derives path according to {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java index f9298311332..c65dc3c9487 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java @@ -20,10 +20,12 @@ import java.util.HashMap; import java.util.Map; +import lombok.AccessLevel; import lombok.Data; import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.RequiredArgsConstructor; +import lombok.Setter; /** @@ -32,6 +34,7 @@ * and {@link org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow#commit(WUProcessingSpec)}. */ @Data +@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization @NoArgsConstructor // IMPORTANT: for jackson (de)serialization @RequiredArgsConstructor public class CommitStats { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java index b795566bb1b..e6a0d9e4175 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java @@ -17,17 +17,19 @@ package org.apache.gobblin.temporal.ddm.work; +import lombok.AccessLevel; import lombok.Data; import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.RequiredArgsConstructor; +import lombok.Setter; /** * Stats for a dataset that was committed. */ @Data -@NonNull +@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization @RequiredArgsConstructor @NoArgsConstructor // IMPORTANT: for jackson (de)serialization public class DatasetStats { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DirDeletionResult.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DirDeletionResult.java index 33883432c84..90a7fe5db2e 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DirDeletionResult.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DirDeletionResult.java @@ -20,16 +20,19 @@ import java.util.HashMap; import java.util.Map; +import lombok.AccessLevel; import lombok.Data; import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.RequiredArgsConstructor; +import lombok.Setter; /** * Data structure representing the stats for a cleaned up work directory, where it returns a map of directories the result of their cleanup */ @Data +@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization @NoArgsConstructor // IMPORTANT: for jackson (de)serialization @RequiredArgsConstructor public class DirDeletionResult { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java index abaae2ada9e..89beebf1532 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java @@ -18,14 +18,17 @@ package org.apache.gobblin.temporal.ddm.work; import java.util.Map; +import lombok.AccessLevel; import lombok.Data; import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.RequiredArgsConstructor; +import lombok.Setter; /** Capture details (esp. for the temporal UI) of a {@link org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow} execution */ @Data +@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization @RequiredArgsConstructor @NoArgsConstructor // IMPORTANT: for jackson (de)serialization public class ExecGobblinStats { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java index f30998ae857..6d20d9fc152 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java @@ -19,10 +19,12 @@ import java.util.Set; +import lombok.AccessLevel; import lombok.Data; import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.RequiredArgsConstructor; +import lombok.Setter; /** @@ -30,11 +32,13 @@ * the folders should be cleaned up after the full job execution is completed */ @Data +@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization @NoArgsConstructor // IMPORTANT: for jackson (de)serialization @RequiredArgsConstructor public class GenerateWorkUnitsResult { // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite - "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor" @NonNull private int generatedWuCount; + // TODO: characterize the WUs more thoroughly, by also including destination info, and with more specifics, like src+dest location, I/O config, throttling... @NonNull private String sourceClass; @NonNull private WorkUnitsSizeSummary workUnitsSizeSummary; // Resources that the Temporal Job Launcher should clean up for Gobblin temporary work directory paths in writers diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/TimeBudget.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/TimeBudget.java new file mode 100644 index 00000000000..ae794cd2262 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/TimeBudget.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.work; + +import lombok.AccessLevel; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.Setter; + + +/** + * Duration for whatever work to complete, with a permitted overage to indicate firm-ness/soft-ness. + * Values are in minutes, befitting the granularity of inevitable companion activities, like: + * - network operations - opening connections, I/O, retries + * - starting/scaling workers + */ +@Data +@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization +@NoArgsConstructor // IMPORTANT: for jackson (de)serialization +@RequiredArgsConstructor +public class TimeBudget { + // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite - "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor" + @NonNull private long maxTargetDurationMinutes; + @NonNull private long permittedOverageMinutes; + + /** construct w/ {@link #permittedOverageMinutes} expressed as a percentage of {@link #maxTargetDurationMinutes} */ + public static TimeBudget withOveragePercentage(long maxDurationDesiredMinutes, double permittedOveragePercentage) { + return new TimeBudget(maxDurationDesiredMinutes, (long) (maxDurationDesiredMinutes * permittedOveragePercentage)); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java index d218c37b243..dfa207f66c6 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java @@ -20,15 +20,17 @@ import java.net.URI; import java.util.Optional; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.hadoop.fs.Path; +import lombok.AccessLevel; import lombok.Data; import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.RequiredArgsConstructor; +import lombok.Setter; -import org.apache.hadoop.fs.Path; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.apache.gobblin.configuration.State; import org.apache.gobblin.runtime.AbstractJobLauncher; @@ -44,6 +46,7 @@ * is resolved against the {@link org.apache.hadoop.fs.FileSystem} given by `nameNodeUri` */ @Data +@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization @NoArgsConstructor // IMPORTANT: for jackson (de)serialization @RequiredArgsConstructor @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class") // to handle extensions @@ -51,7 +54,7 @@ public class WUProcessingSpec implements FileSystemApt, FileSystemJobStateful { @NonNull private URI fileSystemUri; @NonNull private String workUnitsDir; @NonNull private EventSubmitterContext eventSubmitterContext; - @NonNull private Tuning tuning = Tuning.DEFAULT; + @NonNull @Setter(AccessLevel.PUBLIC) private Tuning tuning = Tuning.DEFAULT; /** whether to conduct job-level timing (and send results via GTE) */ @JsonIgnore // (because no-arg method resembles 'java bean property') @@ -74,6 +77,7 @@ public Path getJobStatePath() { /** Configuration for {@link org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow#performWorkload(WorkflowAddr, Workload, int, int, int, Optional)}*/ @Data + @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization @NoArgsConstructor // IMPORTANT: for jackson (de)serialization @RequiredArgsConstructor public static class Tuning { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java index e454c9ceef5..0c068cb3ed7 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java @@ -21,10 +21,12 @@ import org.apache.hadoop.fs.Path; +import lombok.AccessLevel; import lombok.Data; import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.RequiredArgsConstructor; +import lombok.Setter; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -40,12 +42,19 @@ * Conveys a {@link org.apache.gobblin.source.workunit.WorkUnit} by claim-check, where the `workUnitPath` is resolved * against the {@link org.apache.hadoop.fs.FileSystem} given by `nameNodeUri`. see: * @see Claim-Check Pattern + * + * TODO: if we're to generalize Work Prediction+Prioritization across multiplexed jobs, each having its own separate time budget, every WU claim-check + * standing on its own would allow an external observer to inspect only the task queue w/o correlation between workflow histories. For that, decide whether + * to add job-identifying metadata here or just tack on time budget (aka. SLA deadline) info. Either could be tunneled within the filename in the manner + * of `JobLauncherUtils.WorkUnitPathCalculator.calcNextPathWithTunneledSizeInfo` - in fact, by convention, the job ID / flow ID already is... we just don't + * recover it herein. */ @Data +@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization @NoArgsConstructor // IMPORTANT: for jackson (de)serialization @RequiredArgsConstructor public class WorkUnitClaimCheck implements FileSystemApt, FileSystemJobStateful { - @NonNull private String correlator; + @NonNull @Setter(AccessLevel.PACKAGE) private String correlator; @NonNull private URI fileSystemUri; @NonNull private String workUnitPath; @NonNull private WorkUnitSizeInfo workUnitSizeInfo; diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java index 3ea426c284b..971ed5a04bd 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java @@ -19,10 +19,14 @@ import java.util.List; +import lombok.AccessLevel; import lombok.Data; import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.RequiredArgsConstructor; +import lombok.Setter; + +import com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.gobblin.source.workunit.MultiWorkUnit; import org.apache.gobblin.source.workunit.WorkUnit; @@ -36,6 +40,7 @@ * @see org.apache.gobblin.util.WorkUnitSizeInfo */ @Data +@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization @NoArgsConstructor // IMPORTANT: for jackson (de)serialization @RequiredArgsConstructor public class WorkUnitsSizeSummary { @@ -47,4 +52,24 @@ public class WorkUnitsSizeSummary { @NonNull private double quantilesWidth; @NonNull private List topLevelQuantilesMinSizes; @NonNull private List constituentQuantilesMinSizes; + + @JsonIgnore // (because no-arg method resembles 'java bean property') + public double getTopLevelWorkUnitsMeanSize() { + return this.totalSize * 1.0 / this.topLevelWorkUnitsCount; + } + + @JsonIgnore // (because no-arg method resembles 'java bean property') + public double getConstituentWorkUnitsMeanSize() { + return this.totalSize * 1.0 / this.constituentWorkUnitsCount; + } + + @JsonIgnore // (because no-arg method resembles 'java bean property') + public double getTopLevelWorkUnitsMedianSize() { + return this.topLevelQuantilesMinSizes.get(this.quantilesCount / 2); + } + + @JsonIgnore // (because no-arg method resembles 'java bean property') + public double getConstituentWorkUnitsMedianSize() { + return this.topLevelQuantilesMinSizes.get(this.quantilesCount / 2); + } } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java index 74737af5988..27348858e7c 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java @@ -29,6 +29,7 @@ import org.apache.gobblin.temporal.ddm.activity.impl.DeleteWorkDirsActivityImpl; import org.apache.gobblin.temporal.ddm.activity.impl.GenerateWorkUnitsImpl; import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl; +import org.apache.gobblin.temporal.ddm.activity.impl.RecommendScalingForWorkUnitsLinearHeuristicImpl; import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl; import org.apache.gobblin.temporal.ddm.workflow.impl.ExecuteGobblinWorkflowImpl; import org.apache.gobblin.temporal.ddm.workflow.impl.GenerateWorkUnitsWorkflowImpl; @@ -48,14 +49,14 @@ public WorkFulfillmentWorker(Config config, WorkflowClient workflowClient) { @Override protected Class[] getWorkflowImplClasses() { - return new Class[] { CommitStepWorkflowImpl.class, ExecuteGobblinWorkflowImpl.class, GenerateWorkUnitsWorkflowImpl.class, - NestingExecOfProcessWorkUnitWorkflowImpl.class, ProcessWorkUnitsWorkflowImpl.class }; + return new Class[] { ExecuteGobblinWorkflowImpl.class, ProcessWorkUnitsWorkflowImpl.class, NestingExecOfProcessWorkUnitWorkflowImpl.class, + CommitStepWorkflowImpl.class, GenerateWorkUnitsWorkflowImpl.class }; } @Override protected Object[] getActivityImplInstances() { - return new Object[] { new CommitActivityImpl(), new DeleteWorkDirsActivityImpl(),new GenerateWorkUnitsImpl(), - new ProcessWorkUnitImpl(), new SubmitGTEActivityImpl()}; + return new Object[] { new SubmitGTEActivityImpl(), new GenerateWorkUnitsImpl(), new RecommendScalingForWorkUnitsLinearHeuristicImpl(), new ProcessWorkUnitImpl(), + new CommitActivityImpl(), new DeleteWorkDirsActivityImpl() }; } @Override diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java index 1d5a63b7366..6de7c51e36b 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java @@ -20,14 +20,23 @@ import java.io.IOException; import java.net.URI; import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import lombok.extern.slf4j.Slf4j; + +import com.google.common.io.Closer; +import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import io.temporal.activity.ActivityOptions; @@ -36,29 +45,36 @@ import io.temporal.failure.ApplicationFailure; import io.temporal.workflow.ChildWorkflowOptions; import io.temporal.workflow.Workflow; -import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.cluster.GobblinClusterUtils; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.temporal.ddm.activity.DeleteWorkDirsActivity; import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits; +import org.apache.gobblin.temporal.ddm.activity.RecommendScalingForWorkUnits; import org.apache.gobblin.temporal.ddm.launcher.ProcessWorkUnitsJobLauncher; import org.apache.gobblin.temporal.ddm.util.JobStateUtils; +import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils; import org.apache.gobblin.temporal.ddm.work.CommitStats; import org.apache.gobblin.temporal.ddm.work.DirDeletionResult; import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats; import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult; +import org.apache.gobblin.temporal.ddm.work.TimeBudget; import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary; import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; import org.apache.gobblin.temporal.ddm.work.assistance.Help; import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow; import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow; +import org.apache.gobblin.temporal.dynamic.FsScalingDirectivesRecipient; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectivesRecipient; import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext; import org.apache.gobblin.temporal.workflows.metrics.EventTimer; import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer; -import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils; +import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PropertiesUtils; +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; @Slf4j @@ -79,8 +95,21 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow { .setRetryOptions(GEN_WUS_ACTIVITY_RETRY_OPTS) .build(); - private final GenerateWorkUnits genWUsActivityStub = Workflow.newActivityStub(GenerateWorkUnits.class, - GEN_WUS_ACTIVITY_OPTS); + private final GenerateWorkUnits genWUsActivityStub = Workflow.newActivityStub(GenerateWorkUnits.class, GEN_WUS_ACTIVITY_OPTS); + + private static final RetryOptions RECOMMEND_SCALING_RETRY_OPTS = RetryOptions.newBuilder() + .setInitialInterval(Duration.ofSeconds(3)) + .setMaximumInterval(Duration.ofSeconds(100)) + .setBackoffCoefficient(2) + .setMaximumAttempts(4) + .build(); + + private static final ActivityOptions RECOMMEND_SCALING_ACTIVITY_OPTS = ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofMinutes(5)) + .setRetryOptions(RECOMMEND_SCALING_RETRY_OPTS) + .build(); + private final RecommendScalingForWorkUnits recommendScalingStub = Workflow.newActivityStub(RecommendScalingForWorkUnits.class, + RECOMMEND_SCALING_ACTIVITY_OPTS); private static final RetryOptions DELETE_WORK_DIRS_RETRY_OPTS = RetryOptions.newBuilder() .setInitialInterval(Duration.ofSeconds(3)) @@ -100,16 +129,32 @@ public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext event TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext); timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); // update GaaS: `TimingEvent.JOB_START_TIME` EventTimer jobSuccessTimer = timerFactory.createJobTimer(); - Optional generateWorkUnitResultsOpt = Optional.empty(); + Optional optGenerateWorkUnitResult = Optional.empty(); WUProcessingSpec wuSpec = createProcessingSpec(jobProps, eventSubmitterContext); boolean isSuccessful = false; - try { - generateWorkUnitResultsOpt = Optional.of(genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext)); - WorkUnitsSizeSummary wuSizeSummary = generateWorkUnitResultsOpt.get().getWorkUnitsSizeSummary(); + try (Closer closer = Closer.create()) { + GenerateWorkUnitsResult generateWorkUnitResult = genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext); + optGenerateWorkUnitResult = Optional.of(generateWorkUnitResult); + WorkUnitsSizeSummary wuSizeSummary = generateWorkUnitResult.getWorkUnitsSizeSummary(); int numWUsGenerated = safelyCastNumConstituentWorkUnitsOrThrow(wuSizeSummary); int numWUsCommitted = 0; CommitStats commitStats = CommitStats.createEmpty(); if (numWUsGenerated > 0) { + TimeBudget timeBudget = calcWUProcTimeBudget(jobSuccessTimer.getStartTime(), wuSizeSummary, jobProps); + List scalingDirectives = + recommendScalingStub.recommendScaling(wuSizeSummary, generateWorkUnitResult.getSourceClass(), timeBudget, jobProps); + log.info("Recommended scaling to process WUs within {}: {}", timeBudget, scalingDirectives); + try { + ScalingDirectivesRecipient recipient = createScalingDirectivesRecipient(jobProps, closer); + List adjustedScalingDirectives = adjustRecommendedScaling(scalingDirectives); + log.info("Submitting (adjusted) scaling directives: {}", adjustedScalingDirectives); + recipient.receive(adjustedScalingDirectives); + // TODO: when eliminating the "GenWUs Worker", pause/block until scaling is complete + } catch (IOException e) { + // TODO: decide whether this should be a hard failure; for now, "gracefully degrade" by continuing processing + log.error("Failed to send re-scaling directive", e); + } + ProcessWorkUnitsWorkflow processWUsWorkflow = createProcessWorkUnitsWorkflow(jobProps); commitStats = processWUsWorkflow.process(wuSpec); numWUsCommitted = commitStats.getNumCommittedWorkUnits(); @@ -128,8 +173,8 @@ public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext event // TODO: Cleanup WorkUnit/Taskstate Directory for jobs cancelled mid flight try { log.info("Cleaning up work dirs for job {}", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)); - if (generateWorkUnitResultsOpt.isPresent()) { - cleanupWorkDirs(wuSpec, eventSubmitterContext, generateWorkUnitResultsOpt.get().getWorkDirPathsToDelete()); + if (optGenerateWorkUnitResult.isPresent()) { + cleanupWorkDirs(wuSpec, eventSubmitterContext, optGenerateWorkUnitResult.get().getWorkDirPathsToDelete()); } else { log.warn("Skipping cleanup of work dirs for job due to no output from GenerateWorkUnits"); } @@ -154,6 +199,34 @@ protected ProcessWorkUnitsWorkflow createProcessWorkUnitsWorkflow(Properties job return Workflow.newChildWorkflowStub(ProcessWorkUnitsWorkflow.class, childOpts); } + protected TimeBudget calcWUProcTimeBudget(Instant jobStartTime, WorkUnitsSizeSummary wuSizeSummary, Properties jobProps) { + // TODO: make fully configurable! for now, cap Work Discovery at 45 mins and set aside 10 mins for the `CommitStepWorkflow` + long maxGenWUsMins = 45; + long commitStepMins = 10; + long totalTargetTimeMins = TimeUnit.MINUTES.toMinutes(PropertiesUtils.getPropAsLong(jobProps, + ConfigurationKeys.JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY, + ConfigurationKeys.DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES)); + double permittedOveragePercentage = .2; + Duration genWUsDuration = Duration.between(jobStartTime, TemporalEventTimer.getCurrentTime()); + long remainingMins = totalTargetTimeMins - Math.min(genWUsDuration.toMinutes(), maxGenWUsMins) - commitStepMins; + return TimeBudget.withOveragePercentage(remainingMins, permittedOveragePercentage); + } + + protected List adjustRecommendedScaling(List recommendedScalingDirectives) { + // TODO: make any adjustments - e.g. decide whether to shutdown the (often oversize) `GenerateWorkUnits` worker or alternatively to deduct one to count it + if (recommendedScalingDirectives.size() == 0) { + return recommendedScalingDirectives; + } + // TODO: be more robust and code more defensively, rather than presuming the impl of `RecommendScalingForWorkUnitsLinearHeuristicImpl` + ArrayList adjustedScaling = new ArrayList<>(recommendedScalingDirectives); + ScalingDirective firstDirective = adjustedScaling.get(0); + // deduct one for (already existing) `GenerateWorkUnits` worker (we presume its "baseline" `WorkerProfile` similar enough to substitute for this new one) + adjustedScaling.set(0, firstDirective.updateSetPoint(firstDirective.getSetPoint() - 1)); + // CAUTION: filter out set point zero, which (depending upon `.getProfileName()`) *could* down-scale away our only current worker + // TODO: consider whether to allow either a) "pre-defining" a profile w/ set point zero, available for later use OR b) down-scaling to zero to pause worker + return adjustedScaling.stream().filter(sd -> sd.getSetPoint() > 0).collect(Collectors.toList()); + } + protected static WUProcessingSpec createProcessingSpec(Properties jobProps, EventSubmitterContext eventSubmitterContext) { JobState jobState = new JobState(jobProps); URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState); @@ -169,6 +242,22 @@ protected static WUProcessingSpec createProcessingSpec(Properties jobProps, Even return wuSpec; } + protected ScalingDirectivesRecipient createScalingDirectivesRecipient(Properties jobProps, Closer closer) throws IOException { + JobState jobState = new JobState(jobProps); + FileSystem fs = closer.register(JobStateUtils.openFileSystem(jobState)); + Config jobConfig = ConfigUtils.propertiesToConfig(jobProps); + String appName = jobConfig.getString(GobblinYarnConfigurationKeys.APPLICATION_NAME_KEY); + // *hopefully* `GobblinClusterConfigurationKeys.CLUSTER_EXACT_WORK_DIR` is among `job.Config`! if so, everything Just Works, but if not... + // there's not presently an easy way to obtain the yarn app ID (like `application_1734430124616_67239`), so we'd need to plumb one through, + // almost certainly based on `org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner.taskRunnerId` + String applicationId = "__WARNING__NOT_A_REAL_APPLICATION_ID__"; + Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(jobConfig, fs, appName, applicationId); + log.info("Using GobblinCluster work dir: {}", appWorkDir); + + Path directivesDirPath = JobStateUtils.getDynamicScalingPath(appWorkDir); + return new FsScalingDirectivesRecipient(fs, directivesDirPath); + } + private void cleanupWorkDirs(WUProcessingSpec workSpec, EventSubmitterContext eventSubmitterContext, Set directoriesToClean) throws IOException { // TODO: Add configuration to support cleaning up historical work dirs from same job name diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java index 6402e473bf7..8b8236ffadd 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java @@ -19,12 +19,14 @@ import java.util.Map; import java.util.Optional; +import lombok.extern.slf4j.Slf4j; + +import com.google.common.io.Closer; import com.typesafe.config.ConfigFactory; import io.temporal.api.enums.v1.ParentClosePolicy; import io.temporal.workflow.ChildWorkflowOptions; import io.temporal.workflow.Workflow; -import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.temporal.cluster.WorkerConfig; import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils; @@ -59,12 +61,11 @@ public CommitStats process(WUProcessingSpec workSpec) { } private CommitStats performWork(WUProcessingSpec workSpec) { - Workload workload = createWorkload(workSpec); Map searchAttributes; JobState jobState; - try { - jobState = Help.loadJobState(workSpec, Help.loadFileSystem(workSpec)); + try (Closer closer = Closer.create()) { + jobState = Help.loadJobState(workSpec, closer.register(Help.loadFileSystem(workSpec))); } catch (Exception e) { log.error("Error loading jobState", e); throw new RuntimeException("Error loading jobState", e); @@ -117,6 +118,7 @@ protected NestingExecWorkflow createProcessingWorkflow(FileS protected CommitStepWorkflow createCommitStepWorkflow(Map searchAttributes) { ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder() + // TODO: verify to instead use: Policy.PARENT_CLOSE_POLICY_TERMINATE) .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON) .setSearchAttributes(searchAttributes) .setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE, diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java index 6725c58b6e3..a7d38256b2b 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java @@ -40,6 +40,8 @@ * {@link ScalingDirectiveParser#OVERLAY_DEFINITION_PLACEHOLDER} syntax and write their {@link ProfileDerivation} overlay as the file's data/content. * Within-length scaling directives are no-data, zero-length files. When backed by HDFS, reading such zero-length scaling directive filenames is a * NameNode-only operation, with their metadata-only nature conserving NN object count/quota. + * + * @see FsScalingDirectivesRecipient */ @Slf4j public class FsScalingDirectiveSource implements ScalingDirectiveSource { @@ -49,10 +51,15 @@ public class FsScalingDirectiveSource implements ScalingDirectiveSource { private final ScalingDirectiveParser parser = new ScalingDirectiveParser(); /** Read from `directivesDirPath` of `fileSystem`, and optionally move invalid/rejected directives to `optErrorsDirPath` */ - public FsScalingDirectiveSource(FileSystem fileSystem, String directivesDirPath, Optional optErrorsDirPath) { + public FsScalingDirectiveSource(FileSystem fileSystem, Path directivesDirPath, Optional optErrorsDirPath) { this.fileSystem = fileSystem; - this.dirPath = new Path(directivesDirPath); - this.optErrorsPath = optErrorsDirPath.map(Path::new); + this.dirPath = directivesDirPath; + this.optErrorsPath = optErrorsDirPath; + } + + /** Read from `directivesDirPath` of `fileSystem`, and optionally move invalid/rejected directives to `optErrorsDirPath` */ + public FsScalingDirectiveSource(FileSystem fileSystem, String directivesDirPath, Optional optErrorsDirPath) { + this(fileSystem, new Path(directivesDirPath), optErrorsDirPath.map(Path::new)); } /** diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipient.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipient.java new file mode 100644 index 00000000000..dcb0276b569 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipient.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynamic; + +import java.io.IOException; +import java.util.List; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + + +/** + * A {@link ScalingDirectivesRecipient} that writes {@link ScalingDirective}s to a {@link FileSystem} directory, where each directive is the name + * of a single file inside the directory. + * + * TODO: per {@link FsScalingDirectiveSource} - directives too long for one filename path component MUST (but currently do NOT!) use the + * {@link ScalingDirectiveParser#OVERLAY_DEFINITION_PLACEHOLDER} syntax and write their {@link ProfileDerivation} overlay as the file's data/content. + * + * Within-length scaling directives are no-data, zero-length files. When backed by HDFS, writing such zero-length scaling directive filenames is a + * NameNode-only operation, with their metadata-only nature conserving NN object count/quota. + * + * @see FsScalingDirectiveSource + */ +@Slf4j +public class FsScalingDirectivesRecipient implements ScalingDirectivesRecipient { + public static final int MAX_STRINGIFIED_DIRECTIVE_LENGTH = 255; + private final FileSystem fileSystem; + private final Path dirPath; + + /** Write to `directivesDirPath` of `fileSystem` */ + public FsScalingDirectivesRecipient(FileSystem fileSystem, Path directivesDirPath) throws IOException { + this.fileSystem = fileSystem; + this.dirPath = directivesDirPath; + this.fileSystem.mkdirs(this.dirPath); + } + + /** Write to `directivesDirPath` of `fileSystem` */ + public FsScalingDirectivesRecipient(FileSystem fileSystem, String directivesDirPath) throws IOException { + this(fileSystem, new Path(directivesDirPath)); + } + + @Override + public void receive(List directives) throws IOException { + for (ScalingDirective directive : directives) { + String directiveAsString = ScalingDirectiveParser.asString(directive); + // handle directivePaths in excess of length limit + if (directiveAsString.length() <= MAX_STRINGIFIED_DIRECTIVE_LENGTH) { + Path directivePath = new Path(dirPath, directiveAsString); + log.info("Adding ScalingDirective: {} at '{}' - {}", directiveAsString, directivePath, directive); + fileSystem.create(directivePath, false).close(); + } else { + ScalingDirectiveParser.StringWithPlaceholderPlusOverlay placeholderForm = ScalingDirectiveParser.asStringWithPlaceholderPlusOverlay(directive); + Path directivePath = new Path(dirPath, placeholderForm.getDirectiveStringWithPlaceholder()); + log.info("Adding ScalingDirective with overlay: {} at '{}' - {}", directiveAsString, directivePath, directive); + try (FSDataOutputStream out = fileSystem.create(directivePath, false)) { + out.writeUTF(placeholderForm.getOverlayDefinitionString()); + } + } + } + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java index 1001150df3e..e17bc29e21f 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java @@ -20,16 +20,24 @@ import java.util.Optional; import java.util.function.Function; -import com.typesafe.config.Config; +import lombok.AccessLevel; import lombok.Data; import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import com.typesafe.config.Config; /** * Defines a new {@link WorkerProfile} by evolving from another profile, the basis. Such evolution creates a new immutable profile through * {@link ProfileOverlay}, which either adds or removes properties from the basis profile's definition. That basis profile must already exist. */ @Data +@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization +@NoArgsConstructor // IMPORTANT: for jackson (de)serialization +@RequiredArgsConstructor public class ProfileDerivation { /** Flags when the basis profile was NOT found */ @@ -41,8 +49,8 @@ public UnknownBasisException(String basisName) { } } - private final String basisProfileName; - private final ProfileOverlay overlay; + @NonNull private String basisProfileName; + @NonNull private ProfileOverlay overlay; /** @return a new profile definition through evolution from the basis profile, which is to be obtained via `basisResolver` */ public Config formulateConfig(Function> basisResolver) throws UnknownBasisException { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java index 64b5d8ec30b..2e2ffc76046 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java @@ -25,13 +25,20 @@ import java.util.Set; import java.util.stream.Collectors; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigValueFactory; +import lombok.AccessLevel; import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; import lombok.RequiredArgsConstructor; +import lombok.Setter; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigValueFactory; /** Alt. forms of profile overlay to evolve one profile {@link Config} into another. Two overlays may be combined hierarchically into a new overlay. */ +@JsonTypeInfo(use = JsonTypeInfo.Id.MINIMAL_CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class") // to handle impls (`MINIMAL..`, as all defs below) public interface ProfileOverlay { /** @return a new, evolved {@link Config}, by application of this overlay */ @@ -40,21 +47,36 @@ public interface ProfileOverlay { /** @return a new overlay, by combining this overlay *over* another */ ProfileOverlay over(ProfileOverlay other); + /** @return a new overlay that would change nothing when used in a {@link ProfileDerivation} (beyond introducing a distinct name) */ + static ProfileOverlay unchanged() { + return new Adding(); + } + /** A key-value pair/duple */ @Data + @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization + @NoArgsConstructor // IMPORTANT: for jackson (de)serialization + @RequiredArgsConstructor class KVPair { - private final String key; - private final String value; + @NonNull private String key; + @NonNull private String value; } /** An overlay to evolve any profile by adding key-value pairs */ @Data - @RequiredArgsConstructor // explicit, due to second, variadic ctor + @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization + @RequiredArgsConstructor // explicit, due to other ctors class Adding implements ProfileOverlay { - private final List additionPairs; + @NonNull private List additionPairs; + + // IMPORTANT: for jackson (de)serialization + public Adding() { + this(new ArrayList<>()); + } + /** variadic, for convenience */ public Adding(KVPair... kvPairs) { this(Arrays.asList(kvPairs)); } @@ -90,10 +112,13 @@ public ProfileOverlay over(ProfileOverlay other) { /** An overlay to evolve any profile by removing named keys */ @Data + @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization + @NoArgsConstructor // IMPORTANT: for jackson (de)serialization @RequiredArgsConstructor // explicit, due to second, variadic ctor class Removing implements ProfileOverlay { - private final List removalKeys; + @NonNull private List removalKeys; + /** variadic, for convenience */ public Removing(String... keys) { this(Arrays.asList(keys)); } @@ -128,9 +153,11 @@ public ProfileOverlay over(ProfileOverlay other) { /** An overlay to evolve any profile by adding key-value pairs while also removing named keys */ @Data + @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization + @NoArgsConstructor // IMPORTANT: for jackson (de)serialization class Combo implements ProfileOverlay { - private final Adding adding; - private final Removing removing; + @NonNull private Adding adding; + @NonNull private Removing removing; /** restricted-access ctor: instead use {@link Combo#normalize(Adding, Removing)} */ private Combo(Adding adding, Removing removing) { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java index 8af9e95249a..5cfda7e205b 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java @@ -18,9 +18,17 @@ package org.apache.gobblin.temporal.dynamic; import java.util.Optional; + +import lombok.AccessLevel; import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; import lombok.RequiredArgsConstructor; +import lombok.Setter; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; /** * Core abstraction to model scaling adjustment: a directive originates at a specific moment in time to provide a set point for a given worker profile. @@ -28,12 +36,33 @@ * define that new profile through a {@link ProfileDerivation} referencing a known profile. Once defined, a worker profile MUST NOT be redefined. */ @Data +@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization +@NoArgsConstructor // IMPORTANT: for jackson (de)serialization @RequiredArgsConstructor +/* + * NOTE: due to type erasure, neither alternative approach works when returning a collection of `ScalingDirective`s (only when a direct `ScalingDirective`) + * see: https://github.com/FasterXML/jackson-databind/issues/336 + * instead, `@JsonProperty("this")` clarifies the class name in serialized form + * @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "$this") + * @JsonTypeInfo(include=JsonTypeInfo.As.WRAPPER_OBJECT, use=JsonTypeInfo.Id.NAME) + */ +@JsonPropertyOrder({ "this", "profileName", "setPoint", "optDerivedFrom", "timestampEpochMillis" }) // improve readability (e.g. in the temporal UI) +@JsonIgnoreProperties(ignoreUnknown = true) /* necessary since no `setThis` setter (to act as inverse of `supplyJsonClassSimpleName`), so to avoid: + * com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field \"this\" (class ...dynamic.ScalingDirective), not marked as ignorable + */ public class ScalingDirective { - private final String profileName; - private final int setPoint; - private final long timestampEpochMillis; - private final Optional optDerivedFrom; + @NonNull private String profileName; + // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite - "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor" + @NonNull private int setPoint; + @NonNull private long timestampEpochMillis; + @NonNull private Optional optDerivedFrom; + + /** purely for observability: announce class to clarify serialized form */ + @JsonProperty("this") + public String supplyJsonClassSimpleName() { + return this.getClass().getSimpleName(); + } + /** Create a set-point-only directive (for a known profile, with no {@link ProfileDerivation}) */ public ScalingDirective(String profileName, int setPoint, long timestampEpochMillis) { @@ -44,7 +73,12 @@ public ScalingDirective(String profileName, int setPoint, long timestampEpochMil this(profileName, setPoint, timestampEpochMillis, Optional.of(new ProfileDerivation(basisProfileName, overlay))); } - /** @return the canonical display name (of {@link #getProfileName()}) for tracing/debugging */ + /** @return a new `ScalingDirective`, otherwise unchanged, but with {@link ScalingDirective#setPoint} replaced by `newSetPoint` */ + public ScalingDirective updateSetPoint(int newSetPoint) { + return new ScalingDirective(this.profileName, newSetPoint, this.timestampEpochMillis, this.optDerivedFrom); + } + + /** @return the canonical *display* name (for {@link #getProfileName()}) for tracing/debugging */ public String renderName() { return WorkforceProfiles.renderName(this.profileName); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java index fa00c5630a0..b9656f8f131 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java @@ -27,6 +27,7 @@ import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; +import lombok.Data; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -174,6 +175,25 @@ protected static String definePlaceholder(String directiveWithPlaceholder, Strin } } + + /** + * A two-part stringified form of a `ScalingDirective`, comprised of: + * - the "core" directive, but using {@link #OVERLAY_DEFINITION_PLACEHOLDER} in place of any overlay + * - the separately stringified {@link ProfileOverlay} - empty string, when no overlay + * + * This facilitates writing the directive as a (size-constrained) file name, with the overlay definition written as the file's contents. + * + * NOTE: Every `ProfileOverlay` will be invariably rendered separately; the length of a singular `String` representation has no bearing. + * + * @see #asStringWithPlaceholderPlusOverlay(ScalingDirective) + */ + @Data + public static class StringWithPlaceholderPlusOverlay { + private final String directiveStringWithPlaceholder; + private final String overlayDefinitionString; + } + + // TODO: syntax to remove an attr while ALSO "adding" (so not simply setting to the empty string) - [proposal: alt. form for KV_PAIR ::= ( KEY '|=|' )] // syntax (described in class-level javadoc): @@ -263,26 +283,49 @@ public static String asString(ScalingDirective directive) { directive.getOptDerivedFrom().ifPresent(derivedFrom -> { sb.append(',').append(derivedFrom.getBasisProfileName()); sb.append(derivedFrom.getOverlay() instanceof ProfileOverlay.Adding ? "+(" : "-("); - ProfileOverlay overlay = derivedFrom.getOverlay(); - if (overlay instanceof ProfileOverlay.Adding) { - ProfileOverlay.Adding adding = (ProfileOverlay.Adding) overlay; - for (ProfileOverlay.KVPair kv : adding.getAdditionPairs()) { - sb.append(kv.getKey()).append('=').append(urlEncode(kv.getValue())).append(", "); - } - if (adding.getAdditionPairs().size() > 0) { - sb.setLength(sb.length() - 2); // remove trailing ", " - } - } else { - ProfileOverlay.Removing removing = (ProfileOverlay.Removing) overlay; - for (String key : removing.getRemovalKeys()) { - sb.append(key).append(", "); - } - if (removing.getRemovalKeys().size() > 0) { - sb.setLength(sb.length() - 2); // remove trailing ", " - } - } + sb.append(stringifyProfileOverlay(derivedFrom.getOverlay())); + sb.append(')'); + }); + return sb.toString(); + } + + /** @return the `scalingDirective` invariably stringified as two parts, a {@link StringWithPlaceholderPlusOverlay} - regardless of stringified length */ + public static StringWithPlaceholderPlusOverlay asStringWithPlaceholderPlusOverlay(ScalingDirective directive) { + StringBuilder sb = new StringBuilder(); + sb.append(directive.getTimestampEpochMillis()).append('.').append(directive.getProfileName()).append('=').append(directive.getSetPoint()); + Optional optProfileOverlayStr = directive.getOptDerivedFrom().map(derivedFrom -> + stringifyProfileOverlay(derivedFrom.getOverlay()) + ); + directive.getOptDerivedFrom().ifPresent(derivedFrom -> { + sb.append(',').append(derivedFrom.getBasisProfileName()); + sb.append(derivedFrom.getOverlay() instanceof ProfileOverlay.Adding ? "+(" : "-("); + sb.append(OVERLAY_DEFINITION_PLACEHOLDER); sb.append(')'); }); + return new StringWithPlaceholderPlusOverlay(sb.toString(), optProfileOverlayStr.orElse("")); + } + + private static String stringifyProfileOverlay(ProfileOverlay overlay) { + StringBuilder sb = new StringBuilder(); + if (overlay instanceof ProfileOverlay.Adding) { + ProfileOverlay.Adding adding = (ProfileOverlay.Adding) overlay; + for (ProfileOverlay.KVPair kv : adding.getAdditionPairs()) { + sb.append(kv.getKey()).append('=').append(urlEncode(kv.getValue())).append(", "); + } + if (adding.getAdditionPairs().size() > 0) { + sb.setLength(sb.length() - 2); // remove trailing ", " + } + } else if (overlay instanceof ProfileOverlay.Removing) { + ProfileOverlay.Removing removing = (ProfileOverlay.Removing) overlay; + for (String key : removing.getRemovalKeys()) { + sb.append(key).append(", "); + } + if (removing.getRemovalKeys().size() > 0) { + sb.setLength(sb.length() - 2); // remove trailing ", " + } + } else { // `ProfileOverlay.Combo` is NOT supported! + throw new IllegalArgumentException("unsupported derived class of type '" + overlay.getClass().getName() + "'"); + } return sb.toString(); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectivesRecipient.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectivesRecipient.java new file mode 100644 index 00000000000..0a361eed5ea --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectivesRecipient.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynamic; + +import java.io.IOException; +import java.util.List; + + +/** An opaque sink for {@link org.apache.gobblin.temporal.dynamic.ScalingDirective}s - typically either to process or proxy them */ +public interface ScalingDirectivesRecipient { + /** @param directives the {@link ScalingDirective}s to receive */ + void receive(List directives) throws IOException; +} + + diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java index 2a070e2ed9b..336d357f237 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java @@ -141,6 +141,8 @@ public synchronized void reviseWhenNewer(List directives) { */ public synchronized void reviseWhenNewer(List directives, Consumer illegalRevisionHandler) { directives.stream().sequential() + // filter, to avoid `OutOfOrderDirective` exceptions that would clutter the log - especially since `reviseWhenNewer` suggests graceful handling + .filter(directive -> this.lastRevisionEpochMillis < directive.getTimestampEpochMillis()) .forEach(directive -> { try { revise(directive); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WorkflowAddr.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WorkflowAddr.java index 0329d90d9fb..f2d5a5f6824 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WorkflowAddr.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WorkflowAddr.java @@ -17,16 +17,18 @@ package org.apache.gobblin.temporal.util.nesting.work; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.google.common.base.Joiner; -import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.List; + import lombok.Getter; import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.RequiredArgsConstructor; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; + /** Hierarchical address for nesting workflows (0-based). */ @NoArgsConstructor // IMPORTANT: for jackson (de)serialization diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java index 003a05907e4..ac0f24bf816 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java @@ -17,6 +17,7 @@ package org.apache.gobblin.temporal.workflows.metrics; import java.io.Closeable; +import java.time.Instant; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.event.EventSubmitter; @@ -55,4 +56,6 @@ public interface EventTimer extends Closeable { default void close() { stop(); } + + Instant getStartTime(); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java index e1ac601986b..93beaadd033 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java @@ -20,9 +20,11 @@ import java.time.Duration; import java.time.Instant; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + import io.temporal.activity.ActivityOptions; import io.temporal.workflow.Workflow; -import lombok.RequiredArgsConstructor; import org.apache.gobblin.metrics.event.EventSubmitter; import org.apache.gobblin.metrics.event.GobblinEventBuilder; @@ -42,7 +44,7 @@ public class TemporalEventTimer implements EventTimer { private final SubmitGTEActivity trackingEventActivity; private final GobblinEventBuilder eventBuilder; private final EventSubmitterContext eventSubmitterContext; - private final Instant startTime; + @Getter private final Instant startTime; // Alias to stop() public void submit() { @@ -69,7 +71,11 @@ private void stop(Instant endTime) { trackingEventActivity.submitGTE(this.eventBuilder, eventSubmitterContext); } - private static Instant getCurrentTime() { + /** + * {@link Workflow}-safe (i.e. deterministic) way for equivalent of {@link System#currentTimeMillis()} + * WARNING: DO NOT use from an {@link io.temporal.activity.Activity} + */ + public static Instant getCurrentTime() { return Instant.ofEpochMilli(Workflow.currentTimeMillis()); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java index 3022d5c9ecc..ca6aa720641 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java @@ -17,6 +17,7 @@ package org.apache.gobblin.temporal.yarn; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.List; import java.util.concurrent.TimeUnit; @@ -50,14 +51,14 @@ @Slf4j public abstract class AbstractDynamicScalingYarnServiceManager extends AbstractIdleService { - protected final static String DYNAMIC_SCALING_POLLING_INTERVAL = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval"; - private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60; protected final Config config; + protected final String applicationId; private final DynamicScalingYarnService dynamicScalingYarnService; private final ScheduledExecutorService dynamicScalingExecutor; public AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) { this.config = appMaster.getConfig(); + this.applicationId = appMaster.getApplicationId(); if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) { this.dynamicScalingYarnService = (DynamicScalingYarnService) appMaster.get_yarnService(); } else { @@ -72,9 +73,9 @@ public AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster } @Override - protected void startUp() { - int scheduleInterval = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_POLLING_INTERVAL, - DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS); + protected void startUp() throws IOException { + int scheduleInterval = ConfigUtils.getInt(this.config, GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_POLLING_INTERVAL_SECS, + GobblinTemporalConfigurationKeys.DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS); log.info("Starting the {} with re-scaling interval of {} seconds", this.getClass().getSimpleName(), scheduleInterval); this.dynamicScalingExecutor.scheduleAtFixedRate( @@ -84,7 +85,7 @@ protected void startUp() { } @Override - protected void shutDown() { + protected void shutDown() throws IOException { log.info("Stopping the " + this.getClass().getSimpleName()); ExecutorsUtils.shutdownExecutorService(this.dynamicScalingExecutor, Optional.of(log)); } @@ -92,7 +93,7 @@ protected void shutDown() { /** * Create a {@link ScalingDirectiveSource} to use for getting scaling directives. */ - protected abstract ScalingDirectiveSource createScalingDirectiveSource(); + protected abstract ScalingDirectiveSource createScalingDirectiveSource() throws IOException; /** * A {@link Runnable} that gets the scaling directives from the {@link ScalingDirectiveSource} and passes them to the @@ -110,6 +111,8 @@ public void run() { if (CollectionUtils.isNotEmpty(scalingDirectives)) { dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives); } + } catch (FileNotFoundException fnfe) { + log.warn("Failed to get scaling directives - " + fnfe.getMessage()); // important message, but no need for a stack trace } catch (IOException e) { log.error("Failed to get scaling directives", e); } catch (Throwable t) { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java index f6b65bbd2db..6f95f8cd854 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java @@ -17,34 +17,58 @@ package org.apache.gobblin.temporal.yarn; +import java.io.IOException; import java.util.Optional; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; -import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.cluster.GobblinClusterUtils; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.temporal.ddm.util.JobStateUtils; import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource; import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; + /** * {@link FsScalingDirectiveSource} based implementation of {@link AbstractDynamicScalingYarnServiceManager}. */ +@Slf4j public class FsSourceDynamicScalingYarnServiceManager extends AbstractDynamicScalingYarnServiceManager { - // TODO: replace fetching of these configs using a new method similar to JobStateUtils::getWorkDirRoot - public final static String DYNAMIC_SCALING_DIRECTIVES_DIR = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "directives.dir"; - public final static String DYNAMIC_SCALING_ERRORS_DIR = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "errors.dir"; - private final FileSystem fs; + + private FileSystem fs; public FsSourceDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) { super(appMaster); - this.fs = appMaster.getFs(); } @Override - protected ScalingDirectiveSource createScalingDirectiveSource() { + protected void startUp() throws IOException { + JobState jobState = new JobState(ConfigUtils.configToProperties(this.config)); + // since `super.startUp()` will invoke `createScalingDirectiveSource()`, which needs `this.fs`, create it beforehand + this.fs = JobStateUtils.openFileSystem(jobState); + super.startUp(); + } + + @Override + protected void shutDown() throws IOException { + super.shutDown(); + this.fs.close(); + } + + @Override + protected ScalingDirectiveSource createScalingDirectiveSource() throws IOException { + String appName = config.getString(GobblinYarnConfigurationKeys.APPLICATION_NAME_KEY); + Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, fs, appName, this.applicationId); + log.info("Using GobblinCluster work dir: {}", appWorkDir); return new FsScalingDirectiveSource( - this.fs, - this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR), - Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR)) + fs, + JobStateUtils.getDynamicScalingPath(appWorkDir), + Optional.of(JobStateUtils.getDynamicScalingErrorsPath(appWorkDir)) ); } } diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImplTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImplTest.java new file mode 100644 index 00000000000..f7470f1928b --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImplTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.activity.impl; + +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.ddm.work.TimeBudget; +import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary; + + +/** Test for {@link RecommendScalingForWorkUnitsLinearHeuristicImpl} */ +public class RecommendScalingForWorkUnitsLinearHeuristicImplTest { + + private RecommendScalingForWorkUnitsLinearHeuristicImpl scalingHeuristic; + @Mock private JobState jobState; + @Mock private WorkUnitsSizeSummary workUnitsSizeSummary; + @Mock private TimeBudget timeBudget; + + @BeforeMethod + public void setUp() { + scalingHeuristic = new RecommendScalingForWorkUnitsLinearHeuristicImpl(); + MockitoAnnotations.openMocks(this); + } + + @Test + public void testCalcDerivationSetPoint() { + Mockito.when(jobState.getPropAsInt(Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_WORKERS_PER_CONTAINER), Mockito.anyInt())) + .thenReturn(4); // 4 workers per container + Mockito.when(jobState.getPropAsLong(Mockito.eq(RecommendScalingForWorkUnitsLinearHeuristicImpl.AMORTIZED_NUM_BYTES_PER_MINUTE), Mockito.anyLong())) + .thenReturn(100L * 1000 * 1000); // 100MB/minute + long targetTimeBudgetMinutes = 75L; + Mockito.when(timeBudget.getMaxTargetDurationMinutes()).thenReturn(targetTimeBudgetMinutes); + + long totalNumMWUs = 3000L; + Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(totalNumMWUs); + Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsMeanSize()).thenReturn(500e6); // 500MB + // parallelization capacity = 20 container-slots (= 4 * 5) + // per-container-slot rate = 5 container-slot-mins/mean(MWU) (= 500 MB/mean(MWU) / 100MB/min) + long numMWUsPerMinutePerContainer = 4; // (amortized) per-container rate = 4 MWU/container-minute (= 20 / 5) + long totalNumContainerMinutesAllMWUs = totalNumMWUs / numMWUsPerMinutePerContainer; // 750 container-minutes (= 3000 MWU / 4 MWU/container-min) + long expectedSetPoint = totalNumContainerMinutesAllMWUs / targetTimeBudgetMinutes; // 10 containers (= 750 / 75) + + int resultA = scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, "sourceClass", timeBudget, jobState); + Assert.assertEquals(resultA, expectedSetPoint); + + // verify: 3x MWUs ==> 3x the recommended set point + Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(totalNumMWUs * 3); + int tripledResult = scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, "sourceClass", timeBudget, jobState); + Assert.assertEquals(tripledResult, resultA * 3); + + // reduce the target duration by a third, and verify: 3/2 the recommended set point + Mockito.when(timeBudget.getMaxTargetDurationMinutes()).thenReturn(2 * (targetTimeBudgetMinutes / 3)); + int reducedTimeBudgetResult = scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, "sourceClass", timeBudget, jobState); + Assert.assertEquals(reducedTimeBudgetResult, (long) Math.round(expectedSetPoint * 3 * (3.0 / 2.0))); + } +} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipientTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipientTest.java new file mode 100644 index 00000000000..8f84d6f0c04 --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipientTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynamic; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +/** Test {@link FsScalingDirectivesRecipient} */ +public class FsScalingDirectivesRecipientTest { + + @Mock + private FileSystem fileSystem; + private FsScalingDirectivesRecipient recipient; + private ScalingDirectiveParser sdParser = new ScalingDirectiveParser(); + private static final String DIRECTIVES_DIR = "/test/dynamic/directives"; + + @BeforeMethod + public void setUp() throws IOException { + MockitoAnnotations.openMocks(this); + recipient = new FsScalingDirectivesRecipient(fileSystem, DIRECTIVES_DIR); + } + + @Test + public void testReceiveWithShortDirectives() throws IOException { + List directivesStrs = Arrays.asList( + "1700010000.=4", + "1700020000.new_profile=7", + "1700030000.new_profile=7,+(a.b.c=7, x.y=five)" + ); + + FSDataOutputStream mockOutputStream = Mockito.mock(FSDataOutputStream.class); + Mockito.when(fileSystem.create(Mockito.any(Path.class), Mockito.eq(false))).thenReturn(mockOutputStream); + + recipient.receive(directivesStrs.stream().map(str -> { + try { + return sdParser.parse(str); + } catch (ScalingDirectiveParser.InvalidSyntaxException e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList())); + + Mockito.verify(fileSystem).mkdirs(Mockito.eq(new Path(DIRECTIVES_DIR))); + ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(Path.class); + Mockito.verify(fileSystem, Mockito.times(directivesStrs.size())).create(pathCaptor.capture(), Mockito.eq(false)); + List capturedPaths = pathCaptor.getAllValues(); + + Assert.assertEquals(capturedPaths.get(0), new Path(DIRECTIVES_DIR, directivesStrs.get(0))); + Assert.assertEquals(capturedPaths.get(1), new Path(DIRECTIVES_DIR, directivesStrs.get(1))); + Assert.assertEquals(capturedPaths.get(2), new Path(DIRECTIVES_DIR, directivesStrs.get(2))); + Mockito.verifyNoMoreInteractions(fileSystem); + } + + @Test + public void testReceiveWithLongDirective() throws IOException { + String profileName = "derivedProfile"; + int setPoint = 42; + long timestamp = 1234567890; + String basisProfileName = "origProfile"; + // NOTE: 42 chars to render the above + 1 for the closing `)`... 212 remaining + + String alphabet = IntStream.rangeClosed('a', 'z').collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append).toString(); + List removeKeys = Arrays.asList( + alphabet, // len = 26 + alphabet.toUpperCase(), + alphabet.substring(1), // len = 25 + alphabet.substring(1).toUpperCase(), + alphabet.substring(2), // len = 24 + alphabet.substring(2).toUpperCase(), + alphabet.substring(3), // len = 23 + alphabet.substring(3).toUpperCase(), + alphabet.substring(4) // len = 22 + ); + ScalingDirective nearlyALongDirective = new ScalingDirective(profileName, setPoint, timestamp, basisProfileName, + new ProfileOverlay.Removing(removeKeys.subList(0, removeKeys.size() - 2))); + ScalingDirective aLongDirective = new ScalingDirective(profileName, setPoint, timestamp, basisProfileName, + new ProfileOverlay.Removing(removeKeys)); + + String nearlyALongDirectiveForm = ScalingDirectiveParser.asString(nearlyALongDirective); + ScalingDirectiveParser.StringWithPlaceholderPlusOverlay aLongDirectiveForm = ScalingDirectiveParser.asStringWithPlaceholderPlusOverlay(aLongDirective); + Assert.assertTrue(aLongDirectiveForm.getOverlayDefinitionString().length() > 0); + + FSDataOutputStream mockOutputStream = Mockito.mock(FSDataOutputStream.class); + Mockito.when(fileSystem.create(Mockito.any(Path.class), Mockito.eq(false))).thenReturn(mockOutputStream); + + recipient.receive(Arrays.asList(nearlyALongDirective, aLongDirective)); + + Mockito.verify(fileSystem).mkdirs(Mockito.eq(new Path(DIRECTIVES_DIR))); + ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(Path.class); + Mockito.verify(fileSystem, Mockito.times(2)).create(pathCaptor.capture(), Mockito.eq(false)); + List capturedPaths = pathCaptor.getAllValues(); + Assert.assertEquals(capturedPaths.get(0), new Path(DIRECTIVES_DIR, nearlyALongDirectiveForm)); + Assert.assertEquals(capturedPaths.get(1), new Path(DIRECTIVES_DIR, aLongDirectiveForm.getDirectiveStringWithPlaceholder())); + Mockito.verifyNoMoreInteractions(fileSystem); + + Mockito.verify(mockOutputStream).writeUTF(Mockito.eq(aLongDirectiveForm.getOverlayDefinitionString())); + Mockito.verify(mockOutputStream, Mockito.times(2)).close(); + Mockito.verifyNoMoreInteractions(mockOutputStream); + } +} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java index e953298c66f..4adc604b3dc 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java @@ -54,7 +54,7 @@ public void testFormulateConfigWithSuccessfulBasisResolution() throws ProfileDer public void testFormulateConfigUnknownBasis() { String basisProfileName = "foo"; try { - ProfileDerivation derivation = new ProfileDerivation(basisProfileName, null); + ProfileDerivation derivation = new ProfileDerivation(basisProfileName, ProfileOverlay.unchanged()); derivation.formulateConfig(ignore -> Optional.empty()); Assert.fail("Expected instead: UnknownBasisException"); } catch (ProfileDerivation.UnknownBasisException ube) { @@ -65,14 +65,14 @@ public void testFormulateConfigUnknownBasis() { @Test public void testRenderNameNonBaseline() { String name = "testProfile"; - ProfileDerivation profileDerivation = new ProfileDerivation(name, null); + ProfileDerivation profileDerivation = new ProfileDerivation(name, ProfileOverlay.unchanged()); String renderedName = profileDerivation.renderName(); Assert.assertEquals(renderedName, name); } @Test public void testRenderNameBaseline() { - ProfileDerivation profileDerivation = new ProfileDerivation(WorkforceProfiles.BASELINE_NAME, null); + ProfileDerivation profileDerivation = new ProfileDerivation(WorkforceProfiles.BASELINE_NAME, ProfileOverlay.unchanged()); String renderedName = profileDerivation.renderName(); Assert.assertEquals(renderedName, WorkforceProfiles.BASELINE_NAME_RENDERING); } diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParserTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParserTest.java index 890b3a01308..558d3043688 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParserTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParserTest.java @@ -245,6 +245,34 @@ public void parseDirectivesWithPlaceholderThrowsOverlayPlaceholderNeedsDefinitio Assert.assertEquals(ScalingDirectiveParser.asString(parser.parse(directive)), directive); } + @DataProvider(name = "stringifiedForAsStringWithPlaceholderPlusOverlay") + public Object[][] directivesForAsStringWithPlaceholderPlusOverlay() { + return new Object[][]{ + { "1728435970.my_profile=24", "1728435970.my_profile=24", "" }, + { "1728439210.new_profile=16,bar+(a.b.c=7, l.m=sixteen)", "1728439210.new_profile=16,bar+(...)", "a.b.c=7, l.m=sixteen" }, + { "1728436436.other_profile=9,my_profile-(x, y.z)", "1728436436.other_profile=9,my_profile-(...)", "x, y.z" } + }; + } + + @Test( + expectedExceptions = {}, + dataProvider = "stringifiedForAsStringWithPlaceholderPlusOverlay" + ) + public void testAsStringWithPlaceholderPlusSeparateOverlay(String directive, String expectedString, String expectedOverlay) + throws ScalingDirectiveParser.InvalidSyntaxException { + ScalingDirective sd = parser.parse(directive); + ScalingDirectiveParser.StringWithPlaceholderPlusOverlay result = ScalingDirectiveParser.asStringWithPlaceholderPlusOverlay(sd); + Assert.assertEquals(result.getDirectiveStringWithPlaceholder(), expectedString); + Assert.assertEquals(result.getOverlayDefinitionString(), expectedOverlay); + + // verify round-trip back to the original: + try { + parser.parse(result.getDirectiveStringWithPlaceholder()); + } catch (ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition needsDefinition) { + Assert.assertEquals(ScalingDirectiveParser.asString(needsDefinition.retryParsingWithDefinition(expectedOverlay)), directive); + } + } + @DataProvider(name = "overlayPlaceholderDirectivesWithCompletionDefAndEquivalent") public String[][] overlayPlaceholderDirectivesWithCompletionDefAndEquivalent() { return new String[][]{ diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforcePlanTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforcePlanTest.java index fc99bd9f94f..8e7bf875d1d 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforcePlanTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforcePlanTest.java @@ -17,6 +17,7 @@ package org.apache.gobblin.temporal.dynamic; +import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; @@ -67,27 +68,27 @@ public void reviseWithValidDerivation() throws WorkforcePlan.IllegalRevisionExce } @Test - public void reviseWhenNewerRejectsOutOfOrderDirectivesAndContinues() { + public void reviseWhenNewerSilentlySkipsOutOfOrderDirectivesAndContinues() { AtomicInteger numErrors = new AtomicInteger(0); Assert.assertEquals(plan.getLastRevisionEpochMillis(), WorkforceStaffing.INITIALIZATION_PROVENANCE_EPOCH_MILLIS); Assert.assertEquals(plan.getNumProfiles(), 1); plan.reviseWhenNewer(Lists.newArrayList( new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 2, 100L), new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 3, 500L), - // (1) error: `OutdatedDirective` + // NOT an error (e.g. `OutdatedDirective`), since this is skipped due to the out-of-date timestamp new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 4, 250L), - // (2) error: `OutdatedDirective` + // NOT an error (e.g. `OutdatedDirective`), since this is skipped due to the out-of-date timestamp createNewProfileDirective("new_profile", 5, 450L, WorkforceProfiles.BASELINE_NAME), // NOTE: the second attempt at derivation is NOT judged a duplicate, as the outdated timestamp of first attempt (above) meant it was ignored! createNewProfileDirective("new_profile", 6, 600L, WorkforceProfiles.BASELINE_NAME), new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 7, 800L), - // (3) error: `OutdatedDirective` + // NOT an error (e.g. `OutdatedDirective`), since this is skipped due to the out-of-date timestamp new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 8, 750L) ), failure -> numErrors.incrementAndGet()); Assert.assertEquals(plan.getLastRevisionEpochMillis(), 800L); Assert.assertEquals(plan.getNumProfiles(), 2); - Assert.assertEquals(numErrors.get(), 3); + Assert.assertEquals(numErrors.get(), 0); Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME), Optional.of(7), WorkforceProfiles.BASELINE_NAME_RENDERING); Assert.assertEquals(plan.peepStaffing("new_profile"), Optional.of(6), "new_profile"); } @@ -95,7 +96,7 @@ public void reviseWhenNewerRejectsOutOfOrderDirectivesAndContinues() { @Test public void reviseWhenNewerRejectsErrorsAndContinues() { AtomicInteger numErrors = new AtomicInteger(0); - plan.reviseWhenNewer(Lists.newArrayList( + List scalingDirectives = Lists.newArrayList( new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 1, 100L), // (1) error: `UnrecognizedProfile` new ScalingDirective("UNKNOWN_PROFILE", 2, 250L), @@ -106,17 +107,24 @@ public void reviseWhenNewerRejectsErrorsAndContinues() { // (3) error: `UnknownBasis` createNewProfileDirective("other_profile", 6, 550L, "NEVER_DEFINED"), new ScalingDirective("new_profile", 7, 400L), - // (4) error: `OutdatedDirective` + // NOT an error (e.g. `OutdatedDirective`), since this is skipped due to the out-of-date timestamp new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 8, 350L), - createNewProfileDirective("another", 9, 500L, "new_profile") - ), failure -> numErrors.incrementAndGet()); + createNewProfileDirective("another", 9, 600L, "new_profile") + ); + plan.reviseWhenNewer(scalingDirectives, failure -> numErrors.incrementAndGet()); - Assert.assertEquals(plan.getLastRevisionEpochMillis(), 500L); + Assert.assertEquals(plan.getLastRevisionEpochMillis(), 600L); Assert.assertEquals(plan.getNumProfiles(), 3); - Assert.assertEquals(numErrors.get(), 4); + Assert.assertEquals(numErrors.get(), 3); Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME), Optional.of(5), WorkforceProfiles.BASELINE_NAME_RENDERING); Assert.assertEquals(plan.peepStaffing("new_profile"), Optional.of(7), "new_profile"); Assert.assertEquals(plan.peepStaffing("another"), Optional.of(9), "another"); + + // verify idempotence - same directives a second time have no effect and cause no new errors (except those raised previously that had later timestamp) + plan.reviseWhenNewer(scalingDirectives, failure -> numErrors.incrementAndGet()); + Assert.assertEquals(plan.getLastRevisionEpochMillis(), 600L); + Assert.assertEquals(plan.getNumProfiles(), 3); + Assert.assertEquals(numErrors.get(), 3); } @Test diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java index dd4243d3fa5..c43a27fa768 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java @@ -32,12 +32,11 @@ import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; import org.apache.gobblin.temporal.dynamic.ScalingDirective; import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; import org.apache.gobblin.temporal.dynamic.DummyScalingDirectiveSource; -import static org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager.DYNAMIC_SCALING_POLLING_INTERVAL; - /** Tests for {@link AbstractDynamicScalingYarnServiceManager}*/ public class DynamicScalingYarnServiceManagerTest { @@ -51,7 +50,7 @@ public void setup() { // Using 1 second as polling interval so that the test runs faster and // GetScalingDirectivesRunnable.run() will be called equal to amount of sleep introduced between startUp // and shutDown in seconds - Config config = ConfigFactory.empty().withValue(DYNAMIC_SCALING_POLLING_INTERVAL, ConfigValueFactory.fromAnyRef(1)); + Config config = ConfigFactory.empty().withValue(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_POLLING_INTERVAL_SECS, ConfigValueFactory.fromAnyRef(1)); Mockito.when(mockGobblinTemporalApplicationMaster.getConfig()).thenReturn(config); Mockito.when(mockGobblinTemporalApplicationMaster.get_yarnService()).thenReturn(mockDynamicScalingYarnService); } @@ -69,7 +68,7 @@ public void testWhenScalingDirectivesIsNulOrEmpty() throws IOException, Interrup /** Note : this test uses {@link DummyScalingDirectiveSource}*/ @Test - public void testWithDummyScalingDirectiveSource() throws InterruptedException { + public void testWithDummyScalingDirectiveSource() throws IOException, InterruptedException { // DummyScalingDirectiveSource returns 2 scaling directives in first 3 invocations and after that it returns empty list // so the total number of invocations after three invocations should always be 3 TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager( diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java index 107590407a5..556b9277c1e 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java @@ -24,10 +24,12 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; +import lombok.AccessLevel; import lombok.Data; import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.RequiredArgsConstructor; +import lombok.Setter; import com.fasterxml.jackson.annotation.JsonIgnore; import com.tdunning.math.stats.TDigest; @@ -49,6 +51,7 @@ * amount of data, known up front. In such cases, the {@link #numConstituents} (aka. parallelism potential) may be most informative. */ @Data +@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization @NoArgsConstructor // IMPORTANT: for jackson (de)serialization @RequiredArgsConstructor public class WorkUnitSizeInfo {