diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java deleted file mode 100644 index 2037d21742265..0000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.options; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import java.util.concurrent.ScheduledExecutorService; -import org.apache.beam.sdk.util.UnboundedScheduledExecutorService; - -/** - * Options for configuring the {@link ScheduledExecutorService} used throughout the Java runtime. - */ -public interface ExecutorOptions extends PipelineOptions { - - /** - * The {@link ScheduledExecutorService} instance to use to create threads, can be overridden to - * specify a {@link ScheduledExecutorService} that is compatible with the user's environment. If - * unset, the default is to create an {@link UnboundedScheduledExecutorService}. - */ - @JsonIgnore - @Description( - "The ScheduledExecutorService instance to use to create threads, can be overridden to specify " - + "a ScheduledExecutorService that is compatible with the user's environment. If unset, " - + "the default is to create an UnboundedScheduledExecutorService.") - @Default.InstanceFactory(ScheduledExecutorServiceFactory.class) - @Hidden - ScheduledExecutorService getScheduledExecutorService(); - - void setScheduledExecutorService(ScheduledExecutorService value); - - /** Returns the default {@link ScheduledExecutorService} to use within the Apache Beam SDK. */ - class ScheduledExecutorServiceFactory implements DefaultValueFactory { - @Override - public ScheduledExecutorService create(PipelineOptions options) { - /* The SDK requires an unbounded thread pool because a step may create X writers - * each requiring their own thread to perform the writes otherwise a writer may - * block causing deadlock for the step because the writers buffer is full. - * Also, the MapTaskExecutor launches the steps in reverse order and completes - * them in forward order thus requiring enough threads so that each step's writers - * can be active. - */ - return new UnboundedScheduledExecutorService(); - } - } -} diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index fea7be7f5c723..0b14b244da5e2 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -29,10 +29,10 @@ import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.ExecutorOptions; import org.apache.beam.sdk.options.Hidden; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.InstanceBuilder; +import org.apache.beam.sdk.util.UnboundedScheduledExecutorService; import org.checkerframework.checker.nullness.qual.Nullable; /** Options used to configure Google Cloud Storage. */ @@ -48,22 +48,20 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline /** * The ExecutorService instance to use to create threads, can be overridden to specify an - * ExecutorService that is compatible with the user's environment. If unset, the default is to use - * {@link ExecutorOptions#getScheduledExecutorService()}. - * - * @deprecated use {@link ExecutorOptions#getScheduledExecutorService()} instead + * ExecutorService that is compatible with the user's environment. If unset, the default is to + * create an ExecutorService with an unbounded number of threads; this is compatible with Google + * AppEngine. */ @JsonIgnore + @Description( + "The ExecutorService instance to use to create multiple threads. Can be overridden " + + "to specify an ExecutorService that is compatible with the user's environment. If unset, " + + "the default is to create an ExecutorService with an unbounded number of threads; this " + + "is compatible with Google AppEngine.") @Default.InstanceFactory(ExecutorServiceFactory.class) @Hidden - @Deprecated ExecutorService getExecutorService(); - /** - * @deprecated use {@link ExecutorOptions#setScheduledExecutorService} instead. If set, it may - * result in multiple ExecutorServices, and therefore thread pools, in the runtime. - */ - @Deprecated void setExecutorService(ExecutorService value); /** GCS endpoint to use. If unspecified, uses the default endpoint. */ @@ -134,7 +132,14 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline class ExecutorServiceFactory implements DefaultValueFactory { @Override public ExecutorService create(PipelineOptions options) { - return options.as(ExecutorOptions.class).getScheduledExecutorService(); + /* The SDK requires an unbounded thread pool because a step may create X writers + * each requiring their own thread to perform the writes otherwise a writer may + * block causing deadlock for the step because the writers buffer is full. + * Also, the MapTaskExecutor launches the steps in reverse order and completes + * them in forward order thus requiring enough threads so that each step's writers + * can be active. + */ + return new UnboundedScheduledExecutorService(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 9df75a5be943a..7702538de1e31 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -29,7 +29,6 @@ import com.google.api.client.util.ExponentialBackOff; import com.google.api.client.util.Sleeper; import com.google.api.core.ApiFuture; -import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.FixedHeaderProvider; @@ -106,7 +105,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -122,7 +120,6 @@ import org.apache.beam.sdk.extensions.gcp.util.Transport; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.options.ExecutorOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.FluentBackoff; @@ -1488,36 +1485,12 @@ private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions option return BigQueryWriteClient.create( BigQueryWriteSettings.newBuilder() .setCredentialsProvider(() -> options.as(GcpOptions.class).getGcpCredential()) - .setBackgroundExecutorProvider(new OptionsExecutionProvider(options)) .build()); } catch (Exception e) { throw new RuntimeException(e); } } - /** - * OptionsExecutionProvider is a utility class used to wrap the Pipeline-wide {@link - * ScheduledExecutorService} into a supplier for the {@link BigQueryWriteClient}. - */ - private static class OptionsExecutionProvider implements ExecutorProvider { - - private final BigQueryOptions options; - - public OptionsExecutionProvider(BigQueryOptions options) { - this.options = options; - } - - @Override - public boolean shouldAutoClose() { - return false; - } - - @Override - public ScheduledExecutorService getExecutor() { - return options.as(ExecutorOptions.class).getScheduledExecutorService(); - } - } - public static CustomHttpErrors createBigQueryClientCustomErrors() { CustomHttpErrors.Builder builder = new CustomHttpErrors.Builder(); // 403 errors, to list tables, matching this URL: