diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java new file mode 100644 index 0000000000000..2792631560773 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; + +/** + * A callback for completing a bundle of input. + */ +interface CompletionCallback { + /** + * Handle a successful result. + */ + void handleResult(CommittedBundle inputBundle, InProcessTransformResult result); + + /** + * Handle a result that terminated abnormally due to the provided {@link Throwable}. + */ + void handleThrowable(CommittedBundle inputBundle, Throwable t); +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java new file mode 100644 index 0000000000000..05cf55035856e --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java @@ -0,0 +1,361 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessExecutor; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.util.KeyedWorkItem; +import com.google.cloud.dataflow.sdk.util.KeyedWorkItems; +import com.google.cloud.dataflow.sdk.util.TimeDomain; +import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.base.MoreObjects; +import com.google.common.base.Optional; + +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; + +import javax.annotation.Nullable; + +/** + * An {@link InProcessExecutor} that uses an underlying {@link ExecutorService} and + * {@link InProcessEvaluationContext} to execute a {@link Pipeline}. + */ +final class ExecutorServiceParallelExecutor implements InProcessExecutor { + private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class); + + private final ExecutorService executorService; + private final InProcessEvaluationContext evaluationContext; + private final Collection> incompleteRootNodes; + + private final ConcurrentMap currentEvaluations; + private final ConcurrentMap, Boolean> scheduledExecutors; + + private final Queue allUpdates; + private final BlockingQueue visibleUpdates; + + private final CompletionCallback defaultCompletionCallback; + + public static ExecutorServiceParallelExecutor create( + ExecutorService executorService, InProcessEvaluationContext context) { + return new ExecutorServiceParallelExecutor(executorService, context); + } + + private ExecutorServiceParallelExecutor( + ExecutorService executorService, InProcessEvaluationContext context) { + this.executorService = executorService; + this.evaluationContext = context; + this.incompleteRootNodes = new CopyOnWriteArrayList<>(); + + currentEvaluations = new ConcurrentHashMap<>(); + scheduledExecutors = new ConcurrentHashMap<>(); + + this.allUpdates = new ConcurrentLinkedQueue<>(); + this.visibleUpdates = new ArrayBlockingQueue<>(20); + + defaultCompletionCallback = new TimerlessCompletionCallback(); + } + + @Override + public void start(Collection> roots) { + incompleteRootNodes.addAll(roots); + Runnable monitorRunnable = new MonitorRunnable(); + executorService.submit(monitorRunnable); + } + + @SuppressWarnings("unchecked") + public void scheduleConsumption(AppliedPTransform consumer, CommittedBundle bundle, + CompletionCallback onComplete) { + evaluateBundle(consumer, bundle, onComplete); + } + + private void evaluateBundle(final AppliedPTransform transform, + @Nullable final CommittedBundle bundle, final CompletionCallback onComplete) { + final StepAndKey stepAndKey = StepAndKey.of(transform, bundle == null ? null : bundle.getKey()); + TransformExecutorService state = + getStepAndKeyExecutorService(stepAndKey, bundle == null ? true : !bundle.isKeyed()); + TransformExecutor callable = + TransformExecutor.create(evaluationContext, bundle, transform, onComplete, state); + state.schedule(callable); + } + + private void scheduleConsumers(CommittedBundle bundle) { + for (AppliedPTransform consumer : + evaluationContext.getConsumers(bundle.getPCollection())) { + scheduleConsumption(consumer, bundle, defaultCompletionCallback); + } + } + + private TransformExecutorService getStepAndKeyExecutorService( + StepAndKey stepAndKey, boolean parallelizable) { + if (!currentEvaluations.containsKey(stepAndKey)) { + TransformExecutorService evaluationState = + parallelizable + ? TransformExecutorServices.parallel(executorService, scheduledExecutors) + : TransformExecutorServices.serial(executorService, scheduledExecutors); + currentEvaluations.putIfAbsent(stepAndKey, evaluationState); + } + return currentEvaluations.get(stepAndKey); + } + + @Override + public void awaitCompletion() throws Throwable { + VisibleExecutorUpdate update; + do { + update = visibleUpdates.take(); + if (update.throwable.isPresent()) { + if (update.throwable.get() instanceof Exception) { + throw update.throwable.get(); + } else { + throw update.throwable.get(); + } + } + } while (!update.isDone()); + executorService.shutdown(); + } + + private class TimerlessCompletionCallback implements CompletionCallback { + @Override + public void handleResult(CommittedBundle inputBundle, InProcessTransformResult result) { + Iterable> resultBundles = + evaluationContext.handleResult(inputBundle, Collections.emptyList(), result); + for (CommittedBundle outputBundle : resultBundles) { + allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); + } + } + + @Override + public void handleThrowable(CommittedBundle inputBundle, Throwable t) { + allUpdates.offer(ExecutorUpdate.fromThrowable(t)); + } + } + + private class TimerCompletionCallback implements CompletionCallback { + private final Iterable timers; + + private TimerCompletionCallback(Iterable timers) { + this.timers = timers; + } + + @Override + public void handleResult(CommittedBundle inputBundle, InProcessTransformResult result) { + Iterable> resultBundles = + evaluationContext.handleResult(inputBundle, timers, result); + for (CommittedBundle outputBundle : resultBundles) { + allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); + } + } + + @Override + public void handleThrowable(CommittedBundle inputBundle, Throwable t) { + allUpdates.offer(ExecutorUpdate.fromThrowable(t)); + } + } + + /** + * An internal status update on the state of the executor. + * + * Used to signal when the executor should be shut down (due to an exception). + */ + private static class ExecutorUpdate { + private final Optional> bundle; + private final Optional throwable; + + public static ExecutorUpdate fromBundle(CommittedBundle bundle) { + return new ExecutorUpdate(bundle, null); + } + + public static ExecutorUpdate fromThrowable(Throwable t) { + return new ExecutorUpdate(null, t); + } + + private ExecutorUpdate(CommittedBundle producedBundle, Throwable throwable) { + this.bundle = Optional.fromNullable(producedBundle); + this.throwable = Optional.fromNullable(throwable); + } + + public Optional> getBundle() { + return bundle; + } + + public Optional getException() { + return throwable; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(ExecutorUpdate.class) + .add("bundle", bundle).add("exception", throwable) + .toString(); + } + } + + /** + * An update of interest to the user. Used in {@link #awaitCompletion} to decide weather to + * return normally or throw an exception. + */ + private static class VisibleExecutorUpdate { + private final Optional throwable; + private final boolean done; + + public static VisibleExecutorUpdate fromThrowable(Throwable e) { + return new VisibleExecutorUpdate(false, e); + } + + public static VisibleExecutorUpdate finished() { + return new VisibleExecutorUpdate(true, null); + } + + private VisibleExecutorUpdate(boolean done, @Nullable Throwable exception) { + this.throwable = Optional.fromNullable(exception); + this.done = done; + } + + public boolean isDone() { + return done; + } + } + + private class MonitorRunnable implements Runnable { + @Override + public void run() { + Thread.currentThread() + .setName( + String.format( + "%s$%s-monitor", + evaluationContext.getPipelineOptions().getAppName(), + ExecutorServiceParallelExecutor.class.getSimpleName())); + try { + while (true) { + ExecutorUpdate update = allUpdates.poll(); + if (update != null) { + LOG.debug("Executor Update: {}", update); + if (update.getBundle().isPresent()) { + scheduleConsumers(update.getBundle().get()); + } else if (update.getException().isPresent()) { + visibleUpdates.offer( + VisibleExecutorUpdate.fromThrowable(update.getException().get())); + } + } else { + Thread.sleep(50L); + } + mightNeedMoreWork(); + fireTimers(); + if (finishIfDone()) { + break; + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Monitor died due to being interrupted"); + while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(e))) { + visibleUpdates.poll(); + } + } catch (Throwable t) { + LOG.error("Monitor thread died due to throwable", t); + while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(t))) { + visibleUpdates.poll(); + } + } + } + + private void addMoreWork(Collection> activeRoots) { + for (AppliedPTransform activeRoot : activeRoots) { + scheduleConsumption(activeRoot, null, defaultCompletionCallback); + } + } + + public void fireTimers() throws Exception { + try { + for (Map.Entry, Map> transformTimers : + evaluationContext.extractFiredTimers().entrySet()) { + AppliedPTransform transform = transformTimers.getKey(); + for (Map.Entry keyTimers : transformTimers.getValue().entrySet()) { + for (TimeDomain domain : TimeDomain.values()) { + Collection delivery = keyTimers.getValue().getTimers(domain); + if (delivery.isEmpty()) { + continue; + } + KeyedWorkItem work = + KeyedWorkItems.timersWorkItem(keyTimers.getKey(), delivery); + @SuppressWarnings({"unchecked", "rawtypes"}) + CommittedBundle bundle = + InProcessBundle + .>keyed( + (PCollection) transform.getInput(), keyTimers.getKey()) + .add(WindowedValue.valueInEmptyWindows(work)) + .commit(Instant.now()); + scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery)); + } + } + } + } catch (Exception e) { + LOG.error("Internal Error while delivering timers", e); + throw e; + } + } + + private boolean finishIfDone() { + if (evaluationContext.isDone()) { + LOG.debug("Pipeline is finished. Shutting down. {}"); + while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) { + visibleUpdates.poll(); + } + executorService.shutdown(); + return true; + } + return false; + } + + private void mightNeedMoreWork() { + synchronized (scheduledExecutors) { + for (TransformExecutor executor : scheduledExecutors.keySet()) { + Thread thread = executor.getThread(); + if (thread != null) { + switch (thread.getState()) { + case BLOCKED: + case WAITING: + case TERMINATED: + case TIMED_WAITING: + break; + default: + return; + } + } + } + } + addMoreWork(incompleteRootNodes); + } + } +} + diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java index 60c8543a2f207..f06c9030d1786 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java @@ -15,13 +15,14 @@ */ package com.google.cloud.dataflow.sdk.runners.inprocess; +import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions; import com.google.cloud.dataflow.sdk.options.Default; import com.google.cloud.dataflow.sdk.options.PipelineOptions; /** * Options that can be used to configure the {@link InProcessPipelineRunner}. */ -public interface InProcessPipelineOptions extends PipelineOptions { +public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNameOptions { @Default.InstanceFactory(NanosOffsetClock.Factory.class) Clock getClock(); diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java index 7a268ee5fa62a..9d06d67a831c8 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java @@ -33,6 +33,7 @@ import org.joda.time.Instant; +import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -174,21 +175,21 @@ public InProcessPipelineOptions getPipelineOptions() { */ public static interface InProcessExecutor { /** - * @param root the root {@link AppliedPTransform} to schedule - */ - void scheduleRoot(AppliedPTransform root); - - /** - * @param consumer the {@link AppliedPTransform} to schedule - * @param bundle the input bundle to the consumer + * Starts this executor. The provided collection is the collection of root transforms to + * initially schedule. + * + * @param rootTransforms */ - void scheduleConsumption(AppliedPTransform consumer, CommittedBundle bundle); + void start(Collection> rootTransforms); /** * Blocks until the job being executed enters a terminal state. A job is completed after all * root {@link AppliedPTransform AppliedPTransforms} have completed, and all * {@link CommittedBundle Bundles} have been consumed. Jobs may also terminate abnormally. + * + * @throws Throwable whenever an executor thread throws anything, transfers the throwable to the + * waiting thread and rethrows it */ - void awaitCompletion(); + void awaitCompletion() throws Throwable; } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java new file mode 100644 index 0000000000000..f9d7965eaa440 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java @@ -0,0 +1,96 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.common.base.Throwables; + +import java.util.concurrent.Callable; + +import javax.annotation.Nullable; + +class TransformExecutor implements Callable { + public static TransformExecutor create( + InProcessEvaluationContext evaluationContext, + CommittedBundle inputBundle, + AppliedPTransform transform, + CompletionCallback completionCallback, + TransformExecutorService transformEvaluationState) { + return new TransformExecutor<>( + evaluationContext, inputBundle, transform, completionCallback, transformEvaluationState); + } + + private final InProcessEvaluationContext evaluationContext; + + private final CommittedBundle inputBundle; + private final AppliedPTransform transform; + + private final CompletionCallback onComplete; + + private final TransformExecutorService transformEvaluationState; + + private Thread thread; + + private TransformExecutor( + InProcessEvaluationContext evaluationContext, + CommittedBundle inputBundle, + AppliedPTransform transform, + CompletionCallback completionCallback, + TransformExecutorService transformEvaluationState) { + this.evaluationContext = evaluationContext; + + this.inputBundle = inputBundle; + this.transform = transform; + + this.onComplete = completionCallback; + + this.transformEvaluationState = transformEvaluationState; + } + + @Override + public InProcessTransformResult call() { + this.thread = Thread.currentThread(); + try { + TransformEvaluator evaluator = + evaluationContext.getTransformEvaluator(transform, inputBundle); + if (inputBundle != null) { + for (WindowedValue value : inputBundle.getElements()) { + evaluator.processElement(value); + } + } + InProcessTransformResult result = evaluator.finishBundle(); + onComplete.handleResult(inputBundle, result); + return result; + } catch (Throwable t) { + onComplete.handleThrowable(inputBundle, t); + throw Throwables.propagate(t); + } finally { + this.thread = null; + transformEvaluationState.complete(this); + } + } + + /** + * If this {@link TransformExecutor} is currently executing, return the thread it is executing in. + * Otherwise, return null. + */ + @Nullable + public Thread getThread() { + return this.thread; + } +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorService.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorService.java new file mode 100644 index 0000000000000..3f00da6ebe517 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorService.java @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +/** + * Schedules and completes {@link TransformExecutor TransformExecutors}, controlling concurrency as + * appropriate for the {@link StepAndKey} the executor exists for. + */ +interface TransformExecutorService { + /** + * Schedule the provided work to be eventually executed. + */ + void schedule(TransformExecutor work); + + /** + * Finish executing the provided work. This may cause additional + * {@link TransformExecutor TransformExecutors} to be evaluated. + */ + void complete(TransformExecutor completed); +} + diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServices.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServices.java new file mode 100644 index 0000000000000..f2d55ff9f50e6 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServices.java @@ -0,0 +1,147 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.common.base.MoreObjects; + +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Static factory methods for constructing instances of {@link TransformExecutorService}. + */ +final class TransformExecutorServices { + private TransformExecutorServices() { + // Do not instantiate + } + + /** + * Returns an EvaluationState that evaluates {@link TransformExecutor TransformExecutors} in + * parallel. + */ + public static TransformExecutorService parallel( + ExecutorService executor, Map, Boolean> scheduled) { + return new ParallelEvaluationState(executor, scheduled); + } + + /** + * Returns an EvaluationState that evaluates {@link TransformExecutor TransformExecutors} in + * serial. + */ + public static TransformExecutorService serial( + ExecutorService executor, Map, Boolean> scheduled) { + return new SerialEvaluationState(executor, scheduled); + } + + /** + * The evaluation of a step where the input is unkeyed. Unkeyed inputs can be evaluated in + * parallel with an arbitrary amount of parallelism. + */ + private static class ParallelEvaluationState implements TransformExecutorService { + private final ExecutorService executor; + private final Map, Boolean> scheduled; + + private ParallelEvaluationState( + ExecutorService executor, Map, Boolean> scheduled) { + this.executor = executor; + this.scheduled = scheduled; + } + + @Override + public void schedule(TransformExecutor work) { + executor.submit(work); + scheduled.put(work, true); + } + + @Override + public void complete(TransformExecutor completed) { + scheduled.remove(completed); + } + } + + /** + * The evaluation of a (Step, Key) pair. (Step, Key) computations are processed serially; + * scheduling a computation will add it to the Work Queue if a computation is in progress, and + * completing a computation will schedule the next item in the work queue, if it exists. + */ + private static class SerialEvaluationState implements TransformExecutorService { + private final ExecutorService executor; + private final Map, Boolean> scheduled; + + private AtomicReference> currentlyEvaluating; + private final Queue> workQueue; + + private SerialEvaluationState( + ExecutorService executor, Map, Boolean> scheduled) { + this.scheduled = scheduled; + this.executor = executor; + this.currentlyEvaluating = new AtomicReference<>(); + this.workQueue = new ConcurrentLinkedQueue<>(); + } + + /** + * Schedules the work, adding it to the work queue if there is a bundle currently being + * evaluated and scheduling it immediately otherwise. + */ + @Override + public void schedule(TransformExecutor work) { + workQueue.offer(work); + updateCurrentlyEvaluating(); + } + + @Override + public void complete(TransformExecutor completed) { + if (!currentlyEvaluating.compareAndSet(completed, null)) { + throw new IllegalStateException( + "Finished work " + + completed + + " but could not complete due to unexpected currently executing " + + currentlyEvaluating.get()); + } + scheduled.remove(completed); + updateCurrentlyEvaluating(); + } + + private void updateCurrentlyEvaluating() { + if (currentlyEvaluating.get() == null) { + // Only synchronize if we need to update what's currently evaluating + synchronized (this) { + TransformExecutor newWork = workQueue.poll(); + if (newWork != null) { + if (currentlyEvaluating.compareAndSet(null, newWork)) { + scheduled.put(newWork, true); + executor.submit(newWork); + } else { + workQueue.offer(newWork); + } + } + } + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(SerialEvaluationState.class) + .add("currentlyEvaluating", currentlyEvaluating) + .add("workQueue", workQueue) + .toString(); + } + } +} + diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServicesTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServicesTest.java new file mode 100644 index 0000000000000..2c66dc2c325f8 --- /dev/null +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServicesTest.java @@ -0,0 +1,134 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import static org.hamcrest.Matchers.any; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import com.google.common.util.concurrent.MoreExecutors; + +import org.hamcrest.Matchers; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; + +/** + * Tests for {@link TransformExecutorServices}. + */ +@RunWith(JUnit4.class) +public class TransformExecutorServicesTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + private ExecutorService executorService; + private Map, Boolean> scheduled; + + @Before + public void setup() { + executorService = MoreExecutors.newDirectExecutorService(); + scheduled = new ConcurrentHashMap<>(); + } + + @Test + public void parallelScheduleMultipleSchedulesBothImmediately() { + @SuppressWarnings("unchecked") + TransformExecutor first = mock(TransformExecutor.class); + @SuppressWarnings("unchecked") + TransformExecutor second = mock(TransformExecutor.class); + + TransformExecutorService parallel = + TransformExecutorServices.parallel(executorService, scheduled); + parallel.schedule(first); + parallel.schedule(second); + + verify(first).call(); + verify(second).call(); + assertThat( + scheduled, + Matchers.allOf( + Matchers., Boolean>hasEntry(first, true), + Matchers., Boolean>hasEntry(second, true))); + + parallel.complete(first); + assertThat(scheduled, Matchers., Boolean>hasEntry(second, true)); + assertThat( + scheduled, + not( + Matchers., Boolean>hasEntry( + Matchers.>equalTo(first), any(Boolean.class)))); + parallel.complete(second); + assertThat(scheduled.isEmpty(), is(true)); + } + + @Test + public void serialScheduleTwoWaitsForFirstToComplete() { + @SuppressWarnings("unchecked") + TransformExecutor first = mock(TransformExecutor.class); + @SuppressWarnings("unchecked") + TransformExecutor second = mock(TransformExecutor.class); + + TransformExecutorService serial = TransformExecutorServices.serial(executorService, scheduled); + serial.schedule(first); + verify(first).call(); + + serial.schedule(second); + verify(second, never()).call(); + + assertThat(scheduled, Matchers., Boolean>hasEntry(first, true)); + assertThat( + scheduled, + not( + Matchers., Boolean>hasEntry( + Matchers.>equalTo(second), any(Boolean.class)))); + + serial.complete(first); + verify(second).call(); + assertThat(scheduled, Matchers., Boolean>hasEntry(second, true)); + assertThat( + scheduled, + not( + Matchers., Boolean>hasEntry( + Matchers.>equalTo(first), any(Boolean.class)))); + + serial.complete(second); + } + + @Test + public void serialCompleteNotExecutingTaskThrows() { + @SuppressWarnings("unchecked") + TransformExecutor first = mock(TransformExecutor.class); + @SuppressWarnings("unchecked") + TransformExecutor second = mock(TransformExecutor.class); + + TransformExecutorService serial = TransformExecutorServices.serial(executorService, scheduled); + serial.schedule(first); + thrown.expect(IllegalStateException.class); + thrown.expectMessage("unexpected currently executing"); + + serial.complete(second); + } +} diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java new file mode 100644 index 0000000000000..6358b275c1f7a --- /dev/null +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java @@ -0,0 +1,315 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.when; + +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.WithKeys; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.util.concurrent.MoreExecutors; + +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Tests for {@link TransformExecutor}. + */ +@RunWith(JUnit4.class) +public class TransformExecutorTest { + private PCollection created; + private PCollection> downstream; + + private CountDownLatch evaluatorCompleted; + + private RegisteringCompletionCallback completionCallback; + private TransformExecutorService transformEvaluationState; + @Mock private InProcessEvaluationContext evaluationContext; + private Map, Boolean> scheduled; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + + scheduled = new HashMap<>(); + transformEvaluationState = + TransformExecutorServices.parallel(MoreExecutors.newDirectExecutorService(), scheduled); + + evaluatorCompleted = new CountDownLatch(1); + completionCallback = new RegisteringCompletionCallback(evaluatorCompleted); + + TestPipeline p = TestPipeline.create(); + created = p.apply(Create.of("foo", "spam", "third")); + downstream = created.apply(WithKeys.of(3)); + } + + @Test + public void callWithNullInputBundleFinishesBundleAndCompletes() throws Exception { + final InProcessTransformResult result = + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + final AtomicBoolean finishCalled = new AtomicBoolean(false); + TransformEvaluator evaluator = + new TransformEvaluator() { + @Override + public void processElement(WindowedValue element) throws Exception { + throw new IllegalArgumentException("Shouldn't be called"); + } + + @Override + public InProcessTransformResult finishBundle() throws Exception { + finishCalled.set(true); + return result; + } + }; + + when(evaluationContext.getTransformEvaluator(created.getProducingTransformInternal(), null)) + .thenReturn(evaluator); + + TransformExecutor executor = + TransformExecutor.create( + evaluationContext, + null, + created.getProducingTransformInternal(), + completionCallback, + transformEvaluationState); + executor.call(); + + assertThat(finishCalled.get(), is(true)); + assertThat(completionCallback.handledResult, equalTo(result) + ); + assertThat(completionCallback.handledThrowable, is(nullValue())); + assertThat(scheduled, not(Matchers.>hasKey(executor))); + } + + @Test + public void inputBundleProcessesEachElementFinishesAndCompletes() throws Exception { + final InProcessTransformResult result = + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build(); + final Collection> elementsProcessed = new ArrayList<>(); + TransformEvaluator evaluator = + new TransformEvaluator() { + @Override + public void processElement(WindowedValue element) throws Exception { + elementsProcessed.add(element); + return; + } + + @Override + public InProcessTransformResult finishBundle() throws Exception { + return result; + } + }; + + WindowedValue foo = WindowedValue.valueInGlobalWindow("foo"); + WindowedValue spam = WindowedValue.valueInGlobalWindow("spam"); + WindowedValue third = WindowedValue.valueInGlobalWindow("third"); + CommittedBundle inputBundle = + InProcessBundle.unkeyed(created) + .add(foo) + .add(spam) + .add(third) + .commit(Instant.now()); + when( + evaluationContext.getTransformEvaluator( + downstream.getProducingTransformInternal(), inputBundle)) + .thenReturn(evaluator); + + TransformExecutor executor = + TransformExecutor.create( + evaluationContext, + inputBundle, + downstream.getProducingTransformInternal(), + completionCallback, + transformEvaluationState); + + Executors.newSingleThreadExecutor().submit(executor); + + evaluatorCompleted.await(); + + assertThat(elementsProcessed, containsInAnyOrder(spam, third, foo)); + assertThat(completionCallback.handledResult, equalTo(result)); + assertThat(completionCallback.handledThrowable, is(nullValue())); + assertThat(scheduled, not(Matchers.>hasKey(executor))); + } + + @Test + public void processElementThrowsExceptionCallsback() throws Exception { + final InProcessTransformResult result = + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build(); + final Exception exception = new Exception(); + TransformEvaluator evaluator = + new TransformEvaluator() { + @Override + public void processElement(WindowedValue element) throws Exception { + throw exception; + } + + @Override + public InProcessTransformResult finishBundle() throws Exception { + return result; + } + }; + + WindowedValue foo = WindowedValue.valueInGlobalWindow("foo"); + CommittedBundle inputBundle = + InProcessBundle.unkeyed(created) + .add(foo) + .commit(Instant.now()); + when( + evaluationContext.getTransformEvaluator( + downstream.getProducingTransformInternal(), inputBundle)) + .thenReturn(evaluator); + + TransformExecutor executor = + TransformExecutor.create( + evaluationContext, + inputBundle, + downstream.getProducingTransformInternal(), + completionCallback, + transformEvaluationState); + Executors.newSingleThreadExecutor().submit(executor); + + evaluatorCompleted.await(); + + assertThat(completionCallback.handledResult, is(nullValue())); + assertThat(completionCallback.handledThrowable, Matchers.equalTo(exception)); + assertThat(scheduled, not(Matchers.>hasKey(executor))); + } + + @Test + public void finishBundleThrowsExceptionCallsback() throws Exception { + final Exception exception = new Exception(); + TransformEvaluator evaluator = + new TransformEvaluator() { + @Override + public void processElement(WindowedValue element) throws Exception {} + + @Override + public InProcessTransformResult finishBundle() throws Exception { + throw exception; + } + }; + + CommittedBundle inputBundle = InProcessBundle.unkeyed(created).commit(Instant.now()); + when( + evaluationContext.getTransformEvaluator( + downstream.getProducingTransformInternal(), inputBundle)) + .thenReturn(evaluator); + + TransformExecutor executor = + TransformExecutor.create( + evaluationContext, + inputBundle, + downstream.getProducingTransformInternal(), + completionCallback, + transformEvaluationState); + Executors.newSingleThreadExecutor().submit(executor); + + evaluatorCompleted.await(); + + assertThat(completionCallback.handledResult, is(nullValue())); + assertThat(completionCallback.handledThrowable, Matchers.equalTo(exception)); + assertThat(scheduled, not(Matchers.>hasKey(executor))); + } + + @Test + public void duringCallGetThreadIsNonNull() throws Exception { + final InProcessTransformResult result = + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build(); + final CountDownLatch testLatch = new CountDownLatch(1); + final CountDownLatch evaluatorLatch = new CountDownLatch(1); + TransformEvaluator evaluator = + new TransformEvaluator() { + @Override + public void processElement(WindowedValue element) throws Exception { + throw new IllegalArgumentException("Shouldn't be called"); + } + + @Override + public InProcessTransformResult finishBundle() throws Exception { + testLatch.countDown(); + evaluatorLatch.await(); + return result; + } + }; + + when( + evaluationContext.getTransformEvaluator( + created.getProducingTransformInternal(), null)) + .thenReturn(evaluator); + + TransformExecutor executor = + TransformExecutor.create( + evaluationContext, + null, + created.getProducingTransformInternal(), + completionCallback, + transformEvaluationState); + + Executors.newSingleThreadExecutor().submit(executor); + testLatch.await(); + assertThat(executor.getThread(), not(nullValue())); + + // Finish the execution so everything can get closed down cleanly. + evaluatorLatch.countDown(); + } + + private static class RegisteringCompletionCallback implements CompletionCallback { + private InProcessTransformResult handledResult = null; + private Throwable handledThrowable = null; + private final CountDownLatch onMethod; + + private RegisteringCompletionCallback(CountDownLatch onMethod) { + this.onMethod = onMethod; + } + + @Override + public void handleResult(CommittedBundle inputBundle, InProcessTransformResult result) { + handledResult = result; + onMethod.countDown(); + } + + @Override + public void handleThrowable(CommittedBundle inputBundle, Throwable t) { + handledThrowable = t; + onMethod.countDown(); + } + } +}