diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 690dc113bf1..578696f6222 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -147,9 +147,6 @@ public abstract class AbstractJobLauncher implements JobLauncher {
   // This contains all job context information
   protected final JobContext jobContext;
 
-  // Helper to prepare WorkUnit with necessary information. This final object can make sure the uniqueness of task IDs
-  protected final WorkUnitPreparator workUnitPreparator;
-
   // This (optional) JobLock is used to prevent the next scheduled run
   // of the job from starting if the current run has not finished yet
   protected Optional<JobLock> jobLockOptional = Optional.absent();
@@ -230,7 +227,6 @@ public AbstractJobLauncher(Properties jobProps, List<? extends Tag<?>> metadataT
 
       this.jobContext = new JobContext(this.jobProps, LOG, instanceBroker, troubleshooter.getIssueRepository());
       this.eventBus.register(this.jobContext);
-      this.workUnitPreparator = new WorkUnitPreparator(this.jobContext.getJobId());
 
       this.cancellationExecutor = Executors.newSingleThreadExecutor(
           ExecutorsUtils.newDaemonThreadFactory(Optional.of(LOG), Optional.of("CancellationExecutor")));
@@ -553,21 +549,13 @@ public void apply(JobListener jobListener, JobContext jobContext)
           TimingEvent workUnitsPreparationTimer =
               this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION);
           workUnitStream = processWorkUnitStream(workUnitStream, jobState);
+
           // If it is a streaming source, workunits cannot be counted
           this.jobContext.getJobState().setProp(NUM_WORKUNITS,
               workUnitStream.isSafeToMaterialize() ? workUnitStream.getMaterializedWorkUnitCollection().size() : 0);
           this.gobblinJobMetricsReporter.reportWorkUnitCountMetrics(this.jobContext.getJobState().getPropAsInt(NUM_WORKUNITS), jobState);
-          // dump the work unit if tracking logs are enabled
-          if (jobState.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_ENABLE_TRACKING_LOGS)) {
-            workUnitStream = workUnitStream.transform(new Function<WorkUnit, WorkUnit>() {
-              @Nullable
-              @Override
-              public WorkUnit apply(@Nullable WorkUnit input) {
-                LOG.info("Work unit tracking log: {}", input);
-                return input;
-              }
-            });
-          }
+          // dump the work unit if tracking logs are enabled (*AFTER* any materialization done for counting)
+          workUnitStream = addWorkUnitTrackingPerConfig(workUnitStream, jobState, LOG);
 
           workUnitsPreparationTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
               EventName.WORK_UNITS_PREPARATION));
@@ -710,13 +698,13 @@ private void executeUnfinishedCommitSequences(String jobName)
     }
   }
 
-  protected WorkUnitStream executeHandlers (WorkUnitStream workUnitStream, DestinationDatasetHandlerService datasetHandlerService){
+  protected WorkUnitStream executeHandlers(WorkUnitStream workUnitStream, DestinationDatasetHandlerService datasetHandlerService) {
     return datasetHandlerService.executeHandlers(workUnitStream);
   }
 
   protected WorkUnitStream processWorkUnitStream(WorkUnitStream workUnitStream, JobState jobState) {
     // Add task ids
-    workUnitStream = prepareWorkUnits(workUnitStream);
+    workUnitStream = prepareWorkUnits(workUnitStream, jobState);
     // Remove skipped workUnits from the list of work units to execute.
     workUnitStream = workUnitStream.filter(new SkippedWorkUnitsFilter(jobState));
     // Add surviving tasks to jobState
@@ -836,7 +824,7 @@ protected void runWorkUnitStream(WorkUnitStream workUnitStream) throws Exception
   /**
    * Materialize a {@link WorkUnitStream} into an in-memory list. Note that infinite work unit streams cannot be materialized.
    */
-  protected List<WorkUnit> materializeWorkUnitList(WorkUnitStream workUnitStream) {
+  public static List<WorkUnit> materializeWorkUnitList(WorkUnitStream workUnitStream) {
     if (!workUnitStream.isFiniteStream()) {
       throw new UnsupportedOperationException("Cannot materialize an infinite work unit stream.");
     }
@@ -904,11 +892,38 @@ public void run() {
     });
   }
 
+  /** @return transformed `workUnits` after assigning task IDs, removing skipped ones, and registering those remaining with `jobState` */
+  public static WorkUnitStream prepareWorkUnits(WorkUnitStream workUnits, JobState jobState) {
+    return assignIdsToWorkUnits(workUnits, jobState) // assign task ids
+        .filter(new SkippedWorkUnitsFilter(jobState)) // remove skipped workUnits
+        .transform(new MultiWorkUnitForEach() { // add remaining to jobState
+          @Override
+          public void forWorkUnit(WorkUnit workUnit) {
+            jobState.incrementTaskCount();
+            jobState.addTaskState(new TaskState(new WorkUnitState(workUnit, jobState)));
+          }
+        });
+  }
+
+  /** @return `workUnitStream` transformed to add "tracking" (sic.: actually *trace* logging), if indicated by `jobState` config */
+  public static WorkUnitStream addWorkUnitTrackingPerConfig(WorkUnitStream workUnitStream, JobState jobState, Logger log) {
+    return !jobState.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_ENABLE_TRACKING_LOGS)
+        ? workUnitStream // no-op, when not enabled
+        : workUnitStream.transform(new Function<WorkUnit, WorkUnit>() {
+          @Nullable
+          @Override
+          public WorkUnit apply(@Nullable WorkUnit input) {
+            log.info("Work unit tracking log: {}", input);
+            return input;
+          }
+        });
+  }
+
   /**
-   * Prepare the flattened {@link WorkUnit}s for execution by populating the job and task IDs.
+   * Prepare the flattened {@link WorkUnit}s for execution by populating the job and unique task IDs.
    */
-  private WorkUnitStream prepareWorkUnits(WorkUnitStream workUnits) {
-    return workUnits.transform(workUnitPreparator);
+  private static WorkUnitStream assignIdsToWorkUnits(WorkUnitStream workUnits, JobState jobState) {
+    return workUnits.transform(new WorkUnitPreparator(jobState.getJobId()));
   }
 
   private static abstract class MultiWorkUnitForEach implements Function<WorkUnit, WorkUnit> {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
index 5f8783fcc60..c58641e8936 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
@@ -163,6 +163,13 @@ public JobState(String jobName, String jobId) {
     this.setId(jobId);
   }
 
+  public JobState(Properties properties) {
+    this(
+        JobState.getStateFromProps(properties),
+        JobState.getJobNameFromProps(properties),
+        JobState.getJobIdFromProps(properties));
+  }
+
   public JobState(State properties, String jobName, String jobId) {
     super(properties);
     this.jobName = jobName;
@@ -191,6 +198,17 @@ public static String getJobIdFromProps(Properties props) {
         : JobLauncherUtils.newJobId(JobState.getJobNameFromProps(props));
   }
 
+  public static State getStateFromProps(Properties props) {
+    return JobState.getStateFromProps(props, JobState.getJobIdFromProps(props));
+  }
+
+  public static State getStateFromProps(Properties props, String jobIdPropValue) {
+    State state = new State();
+    state.addAll(props);
+    state.setProp(ConfigurationKeys.JOB_ID_KEY, jobIdPropValue); // in case not yet directly defined as such
+    return state;
+  }
+
   public static String getJobGroupFromState(State state) {
     return state.getProp(ConfigurationKeys.JOB_GROUP_KEY);
   }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index 1893cebd71a..f1c1fa319aa 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -255,8 +255,7 @@ public MRJobLauncher(Properties jobProps, Configuration conf, SharedResourcesBro
     // adding dependent jars/files to the DistributedCache that also updates the conf)
     this.job = Job.getInstance(this.conf, JOB_NAME_PREFIX + this.jobContext.getJobName());
 
-    this.parallelRunnerThreads = Integer.parseInt(jobProps.getProperty(ParallelRunner.PARALLEL_RUNNER_THREADS_KEY,
-        Integer.toString(ParallelRunner.DEFAULT_PARALLEL_RUNNER_THREADS)));
+    this.parallelRunnerThreads = ParallelRunner.getNumThreadsConfig(jobProps);
 
     // StateStore interface uses the following key (rootDir, storeName, tableName)
     // The state store base is the root directory and the last two elements of the path are used as the storeName and
@@ -682,23 +681,12 @@ private void prepareJobInput(List<WorkUnit> workUnits) throws IOException {
     try {
       ParallelRunner parallelRunner = closer.register(new ParallelRunner(this.parallelRunnerThreads, this.fs));
 
-      int multiTaskIdSequence = 0;
+      JobLauncherUtils.WorkUnitPathCalculator pathCalculator = new JobLauncherUtils.WorkUnitPathCalculator();
       // Serialize each work unit into a file named after the task ID
       for (WorkUnit workUnit : workUnits) {
-
-        String workUnitFileName;
-        if (workUnit.isMultiWorkUnit()) {
-          workUnitFileName = JobLauncherUtils.newMultiTaskId(this.jobContext.getJobId(), multiTaskIdSequence++)
-              + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
-        } else {
-          workUnitFileName = workUnit.getProp(ConfigurationKeys.TASK_ID_KEY) + JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
-        }
-        Path workUnitFile = new Path(this.jobInputPath, workUnitFileName);
-        LOG.debug("Writing work unit file " + workUnitFileName);
-
+        Path workUnitFile = pathCalculator.calcNextPath(workUnit, this.jobContext.getJobId(), this.jobInputPath);
+        LOG.debug("Writing work unit file {}", workUnitFile.getName());
         parallelRunner.serializeToFile(workUnit, workUnitFile);
-
-        // Append the work unit file path to the job input file
       }
     } catch (Throwable t) {
       throw closer.rethrow(t);
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
index 89bf4d4bdce..6439f41b20d 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
@@ -27,6 +27,8 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 
+import javax.annotation.concurrent.NotThreadSafe;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,6 +41,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.io.Closer;
 
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -61,6 +64,25 @@ public class JobLauncherUtils {
   // A cache for proxied FileSystems by owners
   private static Cache<String, FileSystem> fileSystemCacheByOwners = CacheBuilder.newBuilder().build();
 
+  /** Calculate monotonically-increasing paths for multi-WU files */
+  @AllArgsConstructor
+  @NotThreadSafe
+  public static class WorkUnitPathCalculator {
+    private int nextMultiWorkUnitTaskId;
+
+    public WorkUnitPathCalculator() {
+      this(0);
+    }
+
+    // Serialize each work unit into a file named after the task ID
+    public Path calcNextPath(WorkUnit workUnit, String jobId, Path basePath) {
+      String workUnitFileName = workUnit.isMultiWorkUnit()
+          ? JobLauncherUtils.newMultiTaskId(jobId, nextMultiWorkUnitTaskId++) + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION
+          : workUnit.getProp(ConfigurationKeys.TASK_ID_KEY) + JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
+      return new Path(basePath, workUnitFileName);
+    }
+  }
+
   /**
    * Create a new job ID.
    *
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java
index ac319379123..941f6624633 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/ParallelRunner.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.util;
 
+import java.util.Properties;
 import lombok.Data;
 import lombok.Getter;
 import lombok.Setter;
@@ -86,6 +87,10 @@ public class ParallelRunner implements Closeable {
   public static final String PARALLEL_RUNNER_THREADS_KEY = "parallel.runner.threads";
   public static final int DEFAULT_PARALLEL_RUNNER_THREADS = 10;
 
+  public static int getNumThreadsConfig(Properties props) {
+    return Integer.parseInt(props.getProperty(PARALLEL_RUNNER_THREADS_KEY, Integer.toString(DEFAULT_PARALLEL_RUNNER_THREADS)));
+  }
+
   private final ExecutorService executor;
 
   /**