diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index c1b273ca9bec2..5a0930cd35f5b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.examples;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
@@ -29,17 +32,12 @@
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.List;
-import java.util.regex.Pattern;
-
/**
- * An example that verifies word counts in Shakespeare and includes Dataflow best practices.
+ * An example that verifies word counts in Shakespeare and includes Beam best practices.
*
*
This class, {@link DebuggingWordCount}, is the third in a series of four successively more
* detailed 'word count' examples. You may first want to take a look at {@link MinimalWordCount}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index 842cb54b40fd2..5f60524209c19 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -17,6 +17,13 @@
*/
package org.apache.beam.examples;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
import org.apache.beam.examples.common.ExampleOptions;
import org.apache.beam.examples.common.ExampleUtils;
@@ -33,19 +40,9 @@
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-
import org.joda.time.Duration;
import org.joda.time.Instant;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* An example that counts words in text, and can run over either unbounded or bounded input
@@ -186,8 +183,7 @@ public static interface Options extends WordCount.WordCountOptions,
public static void main(String[] args) throws IOException {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setBigQuerySchema(getSchema());
- // DataflowExampleUtils creates the necessary input sources to simplify execution of this
- // Pipeline.
+ // ExampleUtils creates the necessary input sources to simplify execution of this Pipeline.
ExampleUtils exampleUtils = new ExampleUtils(options);
exampleUtils.setup();
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index 42d30bb5fa94a..d42d6214973d3 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.examples;
+import com.google.common.base.Strings;
+import com.google.common.io.Resources;
+import java.io.IOException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
@@ -37,11 +40,6 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-import com.google.common.base.Strings;
-import com.google.common.io.Resources;
-
-import java.io.IOException;
-
/**
* An example that counts words in Shakespeare and includes Beam best practices.
*
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
index 54cc99ea32fc6..2eef525faed0b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java
@@ -17,14 +17,13 @@
*/
package org.apache.beam.examples.common;
+import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
-import com.google.api.services.bigquery.model.TableSchema;
-
/**
* Options that can be used to configure BigQuery tables in Beam examples.
* The project defaults to the project being used to run the example.
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
index 43afeb4c61128..8b7ed073f3593 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java
@@ -17,26 +17,23 @@
*/
package org.apache.beam.examples.common;
+import com.google.common.base.MoreObjects;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
-
-import com.google.common.base.MoreObjects;
-
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
-import java.util.concurrent.ThreadLocalRandom;
-
/**
* Options that can be used to configure the Beam examples.
*/
public interface ExampleOptions extends PipelineOptions {
- @Description("Whether to keep jobs running on the Dataflow service after local process exit")
+ @Description("Whether to keep jobs running after local process exit")
@Default.Boolean(false)
boolean getKeepJobsRunning();
void setKeepJobsRunning(boolean keepJobsRunning);
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index 7f03fc0d1c6f6..eadb580a257c4 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -17,13 +17,6 @@
*/
package org.apache.beam.examples.common;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PubsubOptions;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
-import org.apache.beam.sdk.util.Transport;
-
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
import com.google.api.client.util.BackOff;
@@ -43,12 +36,17 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
-
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.Transport;
/**
* The utility class that sets up and tears down external resources,
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java b/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java
index 0a93521821d84..e6a1495e545d5 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java
@@ -17,6 +17,12 @@
*/
package org.apache.beam.examples.common;
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.model.PublishRequest;
+import com.google.api.services.pubsub.model.PubsubMessage;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
@@ -28,14 +34,6 @@
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.Transport;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.PublishRequest;
-import com.google.api.services.pubsub.model.PubsubMessage;
-import com.google.common.collect.ImmutableMap;
-
-import java.io.IOException;
-import java.util.Arrays;
-
/**
* A batch Dataflow pipeline for injecting a set of GCS files into
* a PubSub topic line by line. Empty lines are skipped.
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index 120c64fae54ac..56c7855e45e09 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -21,6 +21,21 @@
import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.base.MoreObjects;
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.Value;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
import org.apache.beam.examples.common.ExampleOptions;
import org.apache.beam.examples.common.ExampleUtils;
@@ -53,26 +68,8 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.common.base.MoreObjects;
-import com.google.datastore.v1.Entity;
-import com.google.datastore.v1.Key;
-import com.google.datastore.v1.Value;
-
import org.joda.time.Duration;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
/**
* An example that computes the most popular hash tags
* for every prefix, which can be used for auto-completion.
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/README.md b/examples/java/src/main/java/org/apache/beam/examples/complete/README.md
index 99c93ef4b82a5..b98be7a723960 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/README.md
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/README.md
@@ -43,14 +43,14 @@ This directory contains end-to-end example pipelines that perform complex data p
Windowing to perform time-based aggregations of data.
TrafficMaxLaneFlow
- — A streaming Cloud Dataflow example using BigQuery output in the
+ — A streaming Beam Example using BigQuery output in the
traffic sensor domain. Demonstrates the Cloud Dataflow streaming
runner, sliding windows, Cloud Pub/Sub topic ingestion, the use of the
AvroCoder to encode a custom class, and custom
Combine transforms.
TrafficRoutes
- — A streaming Cloud Dataflow example using BigQuery output in the
+ — A streaming Beam Example using BigQuery output in the
traffic sensor domain. Demonstrates the Cloud Dataflow streaming
runner, GroupByKey, keyed state, sliding windows, and Cloud
Pub/Sub topic ingestion.
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
index 3f30f2150504a..348bab84b996d 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
@@ -17,6 +17,11 @@
*/
package org.apache.beam.examples.complete;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.ArrayList;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
import org.apache.beam.examples.common.ExampleOptions;
import org.apache.beam.examples.common.ExampleUtils;
@@ -31,15 +36,8 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
/**
- * A streaming Dataflow Example using BigQuery output.
+ * A streaming Beam Example using BigQuery output.
*
*
This pipeline example reads lines of the input text file, splits each line
* into individual words, capitalizes those words, and writes the output to
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 76b6b6a060085..a5a939263ee4e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -17,6 +17,12 @@
*/
package org.apache.beam.examples.complete;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
@@ -51,17 +57,9 @@
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.TupleTag;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashSet;
-import java.util.Set;
-
/**
* An example that computes a basic TF-IDF search table for a directory or GCS prefix.
*
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index aff41ccb5a4ec..1b2064ad068ee 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.examples.complete;
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.TableRowJsonCoder;
import org.apache.beam.sdk.io.TextIO;
@@ -38,14 +40,9 @@
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-
-import com.google.api.services.bigquery.model.TableRow;
-
import org.joda.time.Duration;
import org.joda.time.Instant;
-import java.util.List;
-
/**
* An example that reads Wikipedia edit data from Cloud Storage and computes the user with
* the longest string of edits separated by no more than an hour within each month.
@@ -184,7 +181,7 @@ public void processElement(ProcessContext c) {
/**
* Options supported by this class.
*
- *
Inherits standard Dataflow configuration options.
+ *
Inherits standard Beam configuration options.
*/
private static interface Options extends PipelineOptions {
@Description(
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
index 394b4327025c0..1b27e650f03e9 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
@@ -17,6 +17,14 @@
*/
package org.apache.beam.examples.complete;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.avro.reflect.Nullable;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
import org.apache.beam.examples.common.ExampleOptions;
import org.apache.beam.examples.common.ExampleUtils;
@@ -39,24 +47,13 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-
-import org.apache.avro.reflect.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
/**
- * A Dataflow Example that runs in both batch and streaming modes with traffic sensor data.
+ * A Beam Example that runs in both batch and streaming modes with traffic sensor data.
* You can configure the running mode by setting {@literal --streaming} to true or false.
*
*
Concepts: The batch and streaming runners, sliding windows,
@@ -332,7 +329,7 @@ public static void main(String[] args) throws IOException {
.withValidation()
.as(TrafficMaxLaneFlowOptions.class);
options.setBigQuerySchema(FormatMaxesFn.getSchema());
- // Using DataflowExampleUtils to set up required resources.
+ // Using ExampleUtils to set up required resources.
ExampleUtils exampleUtils = new ExampleUtils(options);
exampleUtils.setup();
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
index ef716e9dd832d..f3c2d3936ee74 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
@@ -17,6 +17,19 @@
*/
package org.apache.beam.examples.complete;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.reflect.Nullable;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
import org.apache.beam.examples.common.ExampleOptions;
import org.apache.beam.examples.common.ExampleUtils;
@@ -38,29 +51,13 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.common.collect.Lists;
-
-import org.apache.avro.reflect.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-
/**
- * A Dataflow Example that runs in both batch and streaming modes with traffic sensor data.
+ * A Beam Example that runs in both batch and streaming modes with traffic sensor data.
* You can configure the running mode by setting {@literal --streaming} to true or false.
*
*
Concepts: The batch and streaming runners, GroupByKey, sliding windows.
@@ -343,7 +340,7 @@ public static void main(String[] args) throws IOException {
.as(TrafficRoutesOptions.class);
options.setBigQuerySchema(FormatStatsFn.getSchema());
- // Using DataflowExampleUtils to set up required resources.
+ // Using ExampleUtils to set up required resources.
ExampleUtils exampleUtils = new ExampleUtils(options);
exampleUtils.setup();
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
index 09d9c29734e90..439cf020aaa9f 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -17,6 +17,11 @@
*/
package org.apache.beam.examples.cookbook;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
@@ -31,13 +36,6 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-
-import java.util.ArrayList;
-import java.util.List;
-
/**
* An example that reads the public samples of weather data from BigQuery, counts the number of
* tornadoes that occur in each month, and writes the results to BigQuery.
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 67918a3f74284..1d280a6e154b5 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -17,6 +17,11 @@
*/
package org.apache.beam.examples.cookbook;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
@@ -34,13 +39,6 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-
-import java.util.ArrayList;
-import java.util.List;
-
/**
* An example that reads the public 'Shakespeare' data, and for each word in
* the dataset that is over a given length, generates a string containing the
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
index 215e2ffc78981..9a9e79968670e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
@@ -22,6 +22,14 @@
import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.PropertyFilter;
+import com.google.datastore.v1.Query;
+import com.google.datastore.v1.Value;
+import java.util.Map;
+import java.util.UUID;
+import javax.annotation.Nullable;
import org.apache.beam.examples.WordCount;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
@@ -36,16 +44,6 @@
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
-import com.google.datastore.v1.Entity;
-import com.google.datastore.v1.Key;
-import com.google.datastore.v1.PropertyFilter;
-import com.google.datastore.v1.Query;
-import com.google.datastore.v1.Value;
-
-import java.util.Map;
-import java.util.UUID;
-import javax.annotation.Nullable;
-
/**
* A WordCount example using DatastoreIO.
*
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
index 9a0f7a2a54938..6c42520ff7c6e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
@@ -17,6 +17,12 @@
*/
package org.apache.beam.examples.cookbook;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
@@ -32,14 +38,6 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Logger;
-
/**
* This is an example that demonstrates several approaches to filtering, and use of the Mean
* transform. It shows how to dynamically set parameters by defining and using new pipeline options,
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
index 5ff2ce22a0d9e..1b91bf1e0615b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.examples.cookbook;
+import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
@@ -33,8 +34,6 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
-import com.google.api.services.bigquery.model.TableRow;
-
/**
* This example shows how to do a join on two collections.
* It uses a sample of the GDELT 'world event' data (http://goo.gl/OB6oin), joining the event
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
index 4f266d30ac968..3772a7bc5b86a 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
@@ -17,6 +17,11 @@
*/
package org.apache.beam.examples.cookbook;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
@@ -31,13 +36,6 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-
-import java.util.ArrayList;
-import java.util.List;
-
/**
* An example that reads the public samples of weather data from BigQuery, and finds
* the maximum temperature ('mean_temp') for each month.
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index 04ac2c363ed64..db59435555458 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -17,6 +17,13 @@
*/
package org.apache.beam.examples.cookbook;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
import org.apache.beam.examples.common.ExampleOptions;
import org.apache.beam.examples.common.ExampleUtils;
@@ -42,19 +49,9 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-
import org.joda.time.Duration;
import org.joda.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
/**
* This example illustrates the basic concepts behind triggering. It shows how to use different
* trigger definitions to produce partial (speculative) results before all the data is processed and
diff --git a/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java b/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java
index f463b1e651dbd..c1bd5d45e38bf 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java
@@ -18,16 +18,14 @@
package org.apache.beam.examples;
import com.google.common.io.Files;
-
+import java.io.File;
+import java.nio.charset.StandardCharsets;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import java.io.File;
-import java.nio.charset.StandardCharsets;
-
/**
* Tests for {@link DebuggingWordCount}.
*/
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
index f93dc2b2f7792..ca0c9d6bda09d 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
@@ -18,6 +18,7 @@
package org.apache.beam.examples;
+import java.util.Date;
import org.apache.beam.examples.WordCount.WordCountOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -25,13 +26,10 @@
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.util.IOChannelUtils;
-
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import java.util.Date;
-
/**
* End-to-end tests of WordCount.
*/
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java
index 9d36a3e3e9703..98c5b17621be1 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.examples;
+import java.util.Arrays;
+import java.util.List;
import org.apache.beam.examples.WordCount.CountWords;
import org.apache.beam.examples.WordCount.ExtractWordsFn;
import org.apache.beam.examples.WordCount.FormatAsTextFn;
@@ -30,7 +32,6 @@
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
-
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
@@ -38,9 +39,6 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import java.util.Arrays;
-import java.util.List;
-
/**
* Tests of WordCount.
*/
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
index 6f28dec0ecdb2..b6751c528bc87 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
@@ -17,6 +17,11 @@
*/
package org.apache.beam.examples.complete;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
import org.apache.beam.examples.complete.AutoComplete.CompletionCandidate;
import org.apache.beam.examples.complete.AutoComplete.ComputeTopCompletions;
import org.apache.beam.sdk.Pipeline;
@@ -33,19 +38,12 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
-
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
/**
* Tests of AutoComplete.
*/
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
index c7ce67e0c6958..c2d654ec18d99 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.examples.complete;
+import java.net.URI;
+import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringDelegateCoder;
import org.apache.beam.sdk.testing.PAssert;
@@ -27,15 +29,11 @@
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import java.net.URI;
-import java.util.Arrays;
-
/**
* Tests of {@link TfIdf}.
*/
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/TopWikipediaSessionsTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/TopWikipediaSessionsTest.java
index d19998ee3896d..42fb06a031b34 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/TopWikipediaSessionsTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/TopWikipediaSessionsTest.java
@@ -17,22 +17,19 @@
*/
package org.apache.beam.examples.complete;
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
-
-import com.google.api.services.bigquery.model.TableRow;
-
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import java.util.Arrays;
-
/** Unit tests for {@link TopWikipediaSessions}. */
@RunWith(JUnit4.class)
public class TopWikipediaSessionsTest {
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
index fbd775cf50c8f..8bcab4a705221 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
@@ -21,7 +21,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
-
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesTest.java
index b986c0bdfa3b8..87e1614ee0ca2 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesTest.java
@@ -17,21 +17,18 @@
*/
package org.apache.beam.examples.cookbook;
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.List;
import org.apache.beam.examples.cookbook.BigQueryTornadoes.ExtractTornadoesFn;
import org.apache.beam.examples.cookbook.BigQueryTornadoes.FormatCountsFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.values.KV;
-
-import com.google.api.services.bigquery.model.TableRow;
-
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import java.util.List;
-
/**
* Test case for {@link BigQueryTornadoes}.
*/
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/CombinePerKeyExamplesTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/CombinePerKeyExamplesTest.java
index 6d0b16793865f..34e06799e6408 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/CombinePerKeyExamplesTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/CombinePerKeyExamplesTest.java
@@ -17,21 +17,18 @@
*/
package org.apache.beam.examples.cookbook;
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.List;
import org.apache.beam.examples.cookbook.CombinePerKeyExamples.ExtractLargeWordsFn;
import org.apache.beam.examples.cookbook.CombinePerKeyExamples.FormatShakespeareOutputFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.values.KV;
-
-import com.google.api.services.bigquery.model.TableRow;
-
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import java.util.List;
-
/** Unit tests for {@link CombinePerKeyExamples}. */
@RunWith(JUnit4.class)
public class CombinePerKeyExamplesTest {
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/DeDupExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/DeDupExampleTest.java
index 20e247062c499..c725e4f6bf02e 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/DeDupExampleTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/DeDupExampleTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.examples.cookbook;
+import java.util.Arrays;
+import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
@@ -25,15 +27,11 @@
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.values.PCollection;
-
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import java.util.Arrays;
-import java.util.List;
-
/** Unit tests for {@link DeDupExample}. */
@RunWith(JUnit4.class)
public class DeDupExampleTest {
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/FilterExamplesTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/FilterExamplesTest.java
index 2598a971dd2ff..279478c50a243 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/FilterExamplesTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/FilterExamplesTest.java
@@ -17,21 +17,18 @@
*/
package org.apache.beam.examples.cookbook;
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Arrays;
+import java.util.List;
import org.apache.beam.examples.cookbook.FilterExamples.FilterSingleMonthDataFn;
import org.apache.beam.examples.cookbook.FilterExamples.ProjectionFn;
import org.apache.beam.sdk.transforms.DoFnTester;
-
-import com.google.api.services.bigquery.model.TableRow;
-
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import java.util.Arrays;
-import java.util.List;
-
/** Unit tests for {@link FilterExamples}. */
@RunWith(JUnit4.class)
public class FilterExamplesTest {
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/JoinExamplesTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/JoinExamplesTest.java
index 9b04667407552..60f71a2120f53 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/JoinExamplesTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/JoinExamplesTest.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.examples.cookbook;
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Arrays;
+import java.util.List;
import org.apache.beam.examples.cookbook.JoinExamples.ExtractCountryInfoFn;
import org.apache.beam.examples.cookbook.JoinExamples.ExtractEventDataFn;
import org.apache.beam.sdk.Pipeline;
@@ -27,9 +30,6 @@
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-
-import com.google.api.services.bigquery.model.TableRow;
-
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
@@ -37,9 +37,6 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import java.util.Arrays;
-import java.util.List;
-
/** Unit tests for {@link JoinExamples}. */
@RunWith(JUnit4.class)
public class JoinExamplesTest {
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/MaxPerKeyExamplesTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/MaxPerKeyExamplesTest.java
index 1d5bcf473c1cb..b5ea0fc4bf0e2 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/MaxPerKeyExamplesTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/MaxPerKeyExamplesTest.java
@@ -17,22 +17,19 @@
*/
package org.apache.beam.examples.cookbook;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
import org.apache.beam.examples.cookbook.MaxPerKeyExamples.ExtractTempFn;
import org.apache.beam.examples.cookbook.MaxPerKeyExamples.FormatMaxesFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.values.KV;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.common.collect.ImmutableList;
-
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import java.util.List;
-
/** Unit tests for {@link MaxPerKeyExamples}. */
@RunWith(JUnit4.class)
public class MaxPerKeyExamplesTest {
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
index fee3c141358d6..3848ca1135e82 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
@@ -17,6 +17,13 @@
*/
package org.apache.beam.examples.cookbook;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import org.apache.beam.examples.cookbook.TriggerExample.ExtractFlowInfo;
import org.apache.beam.examples.cookbook.TriggerExample.TotalFlow;
import org.apache.beam.sdk.Pipeline;
@@ -32,11 +39,6 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
@@ -45,11 +47,6 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
/**
* Unit Tests for {@link TriggerExample}.
* The results generated by triggers are by definition non-deterministic and hence hard to test.
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
index ff8ca552b57c7..a49da7bdfbb65 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.examples;
+import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -28,8 +29,6 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
-import java.util.Arrays;
-
/**
* An example that counts words in Shakespeare, using Java 8 language features.
*
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index 01ffb1de6883e..f9957ebc0e3b9 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.examples.complete.game;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
import org.apache.beam.sdk.Pipeline;
@@ -45,7 +48,6 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;
-
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -54,10 +56,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TimeZone;
-
/**
* This class is the fourth in a series of four pipelines that tell a story in a 'gaming'
* domain, following {@link UserScore}, {@link HourlyTeamScore}, and {@link LeaderBoard}.
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
index e489607dee4d6..d408e2132dabf 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.examples.complete.game;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
@@ -30,17 +33,12 @@
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
-
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TimeZone;
-
/**
* This class is the second in a series of four pipelines that tell a story in a 'gaming'
* domain, following {@link UserScore}. In addition to the concepts introduced in {@link UserScore},
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index bd223059e3051..8dd4e39bee01f 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.examples.complete.game;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
import org.apache.beam.examples.common.ExampleOptions;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
@@ -39,17 +42,12 @@
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TimeZone;
-
/**
* This class is the third in a series of four pipelines that tell a story in a 'gaming' domain,
* following {@link UserScore} and {@link HourlyTeamScore}. Concepts include: processing unbounded
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index c97eb4152991e..65036cee6b922 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.examples.complete.game;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.avro.reflect.Nullable;
import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
@@ -36,14 +39,9 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
-
-import org.apache.avro.reflect.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* This class is the first in a series of four pipelines that tell a story in a 'gaming' domain.
* Concepts: batch processing; reading input from Google Cloud Storage and writing output to
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/Injector.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/Injector.java
index 034a186462927..8f8bd9febc31c 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/Injector.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/Injector.java
@@ -21,11 +21,6 @@
import com.google.api.services.pubsub.model.PublishRequest;
import com.google.api.services.pubsub.model.PubsubMessage;
import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -36,6 +31,9 @@
import java.util.List;
import java.util.Random;
import java.util.TimeZone;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
/**
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/InjectorUtils.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/InjectorUtils.java
index 53e644d67fe75..8cba6c2d05f0f 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/InjectorUtils.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/InjectorUtils.java
@@ -29,7 +29,6 @@
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.PubsubScopes;
import com.google.api.services.pubsub.model.Topic;
-
import java.io.IOException;
class InjectorUtils {
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/RetryHttpInitializerWrapper.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/RetryHttpInitializerWrapper.java
index 45be28791fc76..059999cae7d23 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/RetryHttpInitializerWrapper.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/injector/RetryHttpInitializerWrapper.java
@@ -28,7 +28,6 @@
import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
-
import java.io.IOException;
import java.util.logging.Logger;
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
index 6af6e15a53217..40c4286f3afeb 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
@@ -17,6 +17,14 @@
*/
package org.apache.beam.examples.complete.game.utils;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import org.apache.beam.examples.complete.game.UserScore;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
@@ -31,16 +39,6 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
/**
* Generate, format, and write BigQuery table row information. Use provided information about
* the field names and types, as well as lambda functions that describe how to generate their
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
index c59fd61f68a92..09f3b6cf90d97 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.examples.complete.game.utils;
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Map;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
@@ -27,10 +29,6 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
-import com.google.api.services.bigquery.model.TableRow;
-
-import java.util.Map;
-
/**
* Generate, format, and write BigQuery table row information. Subclasses {@link WriteToBigQuery}
* to require windowing; so this subclass may be used for writes that require access to the
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
index 4dfa474c7cf51..85841a7801474 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
@@ -17,6 +17,15 @@
*/
package org.apache.beam.examples;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.GcsOptions;
@@ -29,9 +38,6 @@
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
-
-import com.google.common.collect.ImmutableList;
-
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -39,15 +45,6 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;
-import java.util.Arrays;
-import java.util.List;
-
/**
* To keep {@link MinimalWordCountJava8} simple, it is not factored or testable. This test
* file should be maintained with a copy of its code for a basic smoke test.
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/GameStatsTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/GameStatsTest.java
index df8800ddb5f6a..7cd03f365b345 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/GameStatsTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/GameStatsTest.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.examples.complete.game;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
import org.apache.beam.examples.complete.game.GameStats.CalculateSpammyUsers;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.testing.PAssert;
@@ -25,16 +28,11 @@
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
/**
* Tests of GameStats.
* Because the pipeline was designed for easy readability and explanations, it lacks good
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
index b917b4cf535f9..f9fefb61f35c6 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.examples.complete.game;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
import org.apache.beam.examples.complete.game.UserScore.GameActionInfo;
import org.apache.beam.examples.complete.game.UserScore.ParseEventFn;
import org.apache.beam.sdk.Pipeline;
@@ -31,17 +34,12 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
-
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
/**
* Tests of HourlyTeamScore.
* Because the pipeline was designed for easy readability and explanations, it lacks good
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
index 75d371a8caa49..7c86adf3754fa 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.examples.complete.game;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
import org.apache.beam.examples.complete.game.UserScore.ExtractAndSumScore;
import org.apache.beam.examples.complete.game.UserScore.GameActionInfo;
import org.apache.beam.examples.complete.game.UserScore.ParseEventFn;
@@ -32,17 +35,12 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
-
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
/**
* Tests of UserScore.
*/
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
new file mode 100644
index 0000000000000..a97d3f306d0ee
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SetCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Generic side input handler that uses {@link StateInternals} to store all data. Both the actual
+ * side-input data and data about the windows for which we have side inputs available are stored
+ * using {@code StateInternals}.
+ *
+ *
The given {@code StateInternals} must not be scoped to an element key. The state
+ * must instead be scoped to one key group for which the side input is being managed.
+ *
+ *
This is useful for runners that transmit the side-input elements in band, as opposed
+ * to how Dataflow has an external service for managing side inputs.
+ *
+ *
Note: storing the available windows in an extra state is redundant for now but in the
+ * future we might want to know which windows we have available so that we can garbage collect
+ * side input data. For now, this will never clean up side-input data because we have no way
+ * of knowing when we reach the GC horizon.
+ */
+public class SideInputHandler implements ReadyCheckingSideInputReader {
+
+ /** The list of side inputs that we're handling. */
+ protected final Collection> sideInputs;
+
+ /** State internals that are scoped not to the key of a value but instead to one key group. */
+ private final StateInternals stateInternals;
+
+ /**
+ * A state tag for each side input that we handle. The state is used to track
+ * for which windows we have input available.
+ */
+ private final Map<
+ PCollectionView>,
+ StateTag<
+ Object,
+ AccumulatorCombiningState<
+ BoundedWindow,
+ Set,
+ Set>>> availableWindowsTags;
+
+ /**
+ * State tag for the actual contents of each side input per window.
+ */
+ private final Map<
+ PCollectionView>,
+ StateTag