Skip to content

Commit

Permalink
Revert "Update BQIO to a single scheduled executor service reduce threads (#23234)" (#23793)
Browse files Browse the repository at this point in the history

This reverts commit 8e2431c.
  • Loading branch information
reuvenlax authored Oct 22, 2022
1 parent 77e96da commit 01da3fc
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 98 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ExecutorOptions;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.UnboundedScheduledExecutorService;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Options used to configure Google Cloud Storage. */
Expand All @@ -48,22 +48,20 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline

/**
* The ExecutorService instance to use to create threads, can be overridden to specify an
* ExecutorService that is compatible with the user's environment. If unset, the default is to use
* {@link ExecutorOptions#getScheduledExecutorService()}.
*
* @deprecated use {@link ExecutorOptions#getScheduledExecutorService()} instead
* ExecutorService that is compatible with the user's environment. If unset, the default is to
* create an ExecutorService with an unbounded number of threads; this is compatible with Google
* AppEngine.
*/
@JsonIgnore
@Description(
"The ExecutorService instance to use to create multiple threads. Can be overridden "
+ "to specify an ExecutorService that is compatible with the user's environment. If unset, "
+ "the default is to create an ExecutorService with an unbounded number of threads; this "
+ "is compatible with Google AppEngine.")
@Default.InstanceFactory(ExecutorServiceFactory.class)
@Hidden
@Deprecated
ExecutorService getExecutorService();

/**
* @deprecated use {@link ExecutorOptions#setScheduledExecutorService} instead. If set, it may
* result in multiple ExecutorServices, and therefore thread pools, in the runtime.
*/
@Deprecated
void setExecutorService(ExecutorService value);

/** GCS endpoint to use. If unspecified, uses the default endpoint. */
Expand Down Expand Up @@ -134,7 +132,14 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline
/**
* {@link DefaultValueFactory} that produces the default {@link ExecutorService}. It is the factory
* named by the {@code @Default.InstanceFactory} annotation on {@code getExecutorService()}.
*
* <p>NOTE(review): this listing looks like a rendered diff whose +/- markers were stripped, which
* is why {@code create} shows two return statements. Presumably the {@code ExecutorOptions}
* return is the removed line and the {@code UnboundedScheduledExecutorService} return is the
* added one -- confirm against the commit before relying on either.
*/
class ExecutorServiceFactory implements DefaultValueFactory<ExecutorService> {
/** Builds the default {@link ExecutorService} for the supplied pipeline options. */
@Override
public ExecutorService create(PipelineOptions options) {
// Variant 1: hand back the scheduled executor exposed through ExecutorOptions.
return options.as(ExecutorOptions.class).getScheduledExecutorService();
/* The SDK requires an unbounded thread pool because a step may create X writers
* each requiring their own thread to perform the writes otherwise a writer may
* block causing deadlock for the step because the writers buffer is full.
* Also, the MapTaskExecutor launches the steps in reverse order and completes
* them in forward order thus requiring enough threads so that each step's writers
* can be active.
*/
// Variant 2: create a fresh unbounded scheduled executor on every call.
return new UnboundedScheduledExecutorService();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.core.ApiFuture;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.FixedHeaderProvider;
Expand Down Expand Up @@ -106,7 +105,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -122,7 +120,6 @@
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.ExecutorOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.FluentBackoff;
Expand Down Expand Up @@ -1488,36 +1485,12 @@ private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions option
return BigQueryWriteClient.create(
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(() -> options.as(GcpOptions.class).getGcpCredential())
.setBackgroundExecutorProvider(new OptionsExecutionProvider(options))
.build());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
 * Adapter that exposes the {@link ScheduledExecutorService} obtained from {@link ExecutorOptions}
 * as an {@link ExecutorProvider}, so it can be handed to the {@link BigQueryWriteClient} settings
 * as the background executor provider.
 */
private static class OptionsExecutionProvider implements ExecutorProvider {

  /** Options that are consulted on every {@link #getExecutor()} call. */
  private final BigQueryOptions pipelineOptions;

  /**
   * Creates a provider backed by the given options.
   *
   * @param options options later viewed as {@link ExecutorOptions} to look up the executor
   */
  public OptionsExecutionProvider(BigQueryOptions options) {
    pipelineOptions = options;
  }

  /**
   * Looks up the executor through the {@link ExecutorOptions} view of the stored options.
   *
   * @return whatever {@code getScheduledExecutorService()} returns for those options
   */
  @Override
  public ScheduledExecutorService getExecutor() {
    ExecutorOptions executorOptions = pipelineOptions.as(ExecutorOptions.class);
    return executorOptions.getScheduledExecutorService();
  }

  /**
   * Always reports {@code false}.
   *
   * <p>NOTE(review): presumably this keeps the client from shutting down an executor it does not
   * own -- confirm against the {@link ExecutorProvider} contract.
   */
  @Override
  public boolean shouldAutoClose() {
    return false;
  }
}

public static CustomHttpErrors createBigQueryClientCustomErrors() {
CustomHttpErrors.Builder builder = new CustomHttpErrors.Builder();
// 403 errors, to list tables, matching this URL:
Expand Down

0 comments on commit 01da3fc

Please sign in to comment.