Skip to content

Commit

Permalink
Fix additional nullness errors in BigQueryIO
Browse files Browse the repository at this point in the history
  • Loading branch information
kennknowles committed May 31, 2022
1 parent a31d329 commit 4e8d7fb
Show file tree
Hide file tree
Showing 66 changed files with 624 additions and 459 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,8 @@ object Snippets {
BigQueryIO.write<WeatherData>()
.to(
object : DynamicDestinations<WeatherData, Long>() {
override fun getDestination(elem: ValueInSingleWindow<WeatherData>): Long? {
return elem.value!!.year
override fun getDestination(elem: ValueInSingleWindow<WeatherData>?): Long {
return elem!!.value!!.year
}

override fun getTable(destination: Long?): TableDestination {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.checkerframework.checker.nullness.qual.Nullable;

/** A {@link Coder} for {@link Void}. Uses zero bytes per {@link Void}. */
public class VoidCoder extends AtomicCoder<Void> {
public class VoidCoder extends AtomicCoder<@Nullable Void> {

public static VoidCoder of() {
return INSTANCE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* A {@link State} that can be read via {@link #read()}.
Expand All @@ -42,7 +41,6 @@ public interface ReadableState<T> {
* of the returned object should not modify state without going through the appropriate state
* interface, and modification to the state should not be mirrored in the returned object.
*/
@Nullable
T read();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@
package org.apache.beam.sdk.state;

import org.apache.beam.sdk.annotations.Internal;
import org.checkerframework.checker.nullness.qual.Nullable;

/** <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> */
@Internal
public class ReadableStates {

/** A {@link ReadableState} constructed from a constant value, hence immediately available. */
public static <T> ReadableState<T> immediate(final @Nullable T value) {
public static <T> ReadableState<T> immediate(final T value) {
return new ReadableState<T>() {
@Override
public @Nullable T read() {
public T read() {
return value;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.dataflow.qual.Pure;
import org.joda.time.Duration;
import org.joda.time.Instant;

Expand Down Expand Up @@ -98,6 +99,7 @@ public abstract class StartBundleContext {
* Returns the {@code PipelineOptions} specified with the {@link
* org.apache.beam.sdk.PipelineRunner} invoking this {@code DoFn}.
*/
@Pure
public abstract PipelineOptions getPipelineOptions();
}

Expand All @@ -107,6 +109,7 @@ public abstract class FinishBundleContext {
* Returns the {@code PipelineOptions} specified with the {@link
* org.apache.beam.sdk.PipelineRunner} invoking this {@code DoFn}.
*/
@Pure
public abstract PipelineOptions getPipelineOptions();

/**
Expand Down Expand Up @@ -141,6 +144,7 @@ public abstract class WindowedContext {
* Returns the {@code PipelineOptions} specified with the {@link
* org.apache.beam.sdk.PipelineRunner} invoking this {@code DoFn}.
*/
@Pure
public abstract PipelineOptions getPipelineOptions();

/**
Expand Down Expand Up @@ -240,6 +244,7 @@ public abstract class ProcessContext extends WindowedContext {
* <p>The element will not be changed -- it is safe to cache, etc. without copying.
* Implementation of {@link DoFn.ProcessElement} method should not mutate the element.
*/
@Pure
public abstract InputT element();

/**
Expand All @@ -248,13 +253,15 @@ public abstract class ProcessContext extends WindowedContext {
* @throws IllegalArgumentException if this is not a side input
* @see ParDo.SingleOutput#withSideInputs
*/
@Pure
public abstract <T> T sideInput(PCollectionView<T> view);

/**
* Returns the timestamp of the input element.
*
* <p>See {@link Window} for more information.
*/
@Pure
public abstract Instant timestamp();

/**
Expand All @@ -264,6 +271,7 @@ public abstract class ProcessContext extends WindowedContext {
* <p>Generally all data is in a single, uninteresting pane unless custom triggering and/or late
* data has been explicitly requested. See {@link Window} for more information.
*/
@Pure
public abstract PaneInfo pane();
}

Expand All @@ -287,6 +295,7 @@ public abstract class OnTimerContext extends WindowedContext {
public abstract class OnWindowExpirationContext extends WindowedContext {

/** Returns the window in which the window expiration is firing. */
@Pure
public abstract BoundedWindow window();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,4 +437,39 @@ public class Preconditions {
}
return obj;
}

/**
* Ensures that a piece of state passed as a parameter to the calling method is not null.
*
* @param reference an object reference
* @return the non-null reference that was validated
* @throws IllegalStateException if {@code reference} is null
*/
@CanIgnoreReturnValue
@EnsuresNonNull("#1")
public static <T extends @NonNull Object> T checkStateNotNull(@Nullable T reference) {
if (reference == null) {
throw new IllegalStateException();
}
return reference;
}

/**
* Ensures that a piece of state passed as a parameter to the calling method is not null.
*
* @param reference an object reference
* @param errorMessage the exception message to use if the check fails; will be converted to a
* string using {@link String#valueOf(Object)}
* @return the non-null reference that was validated
* @throws IllegalStateException if {@code reference} is null
*/
@CanIgnoreReturnValue
@EnsuresNonNull("#1")
public static <T extends @NonNull Object> T checkStateNotNull(
@Nullable T reference, @Nullable Object errorMessage) {
if (reference == null) {
throw new IllegalStateException(String.valueOf(errorMessage));
}
return reference;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

/**
Expand All @@ -38,12 +37,9 @@
*/
@AutoValue
@Internal
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public abstract class FailsafeValueInSingleWindow<T, ErrorT> {
/** Returns the value of this {@code FailsafeValueInSingleWindow}. */
public abstract @Nullable T getValue();
public abstract T getValue();

/** Returns the timestamp of this {@code FailsafeValueInSingleWindow}. */
public abstract Instant getTimestamp();
Expand All @@ -55,7 +51,7 @@ public abstract class FailsafeValueInSingleWindow<T, ErrorT> {
public abstract PaneInfo getPane();

/** Returns the failsafe value of this {@code FailsafeValueInSingleWindow}. */
public abstract @Nullable ErrorT getFailsafeValue();
public abstract ErrorT getFailsafeValue();

public static <T, ErrorT> FailsafeValueInSingleWindow<T, ErrorT> of(
T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo, ErrorT failsafeValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.beam.sdk.transforms.SerializableComparator;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.dataflow.qual.Pure;

/**
* An immutable key/value pair.
Expand All @@ -37,19 +38,20 @@
* @param <K> the type of the key
* @param <V> the type of the value
*/
public class KV<K extends @Nullable Object, V extends @Nullable Object> implements Serializable {
public class KV<K, V> implements Serializable {
/** Returns a {@link KV} with the given key and value. */
public static <K extends @Nullable Object, V extends @Nullable Object> KV<K, V> of(
K key, V value) {
public static <K, V> KV<K, V> of(K key, V value) {
return new KV<>(key, value);
}

/** Returns the key of this {@link KV}. */
@Pure
public K getKey() {
return key;
}

/** Returns the value of this {@link KV}. */
@Pure
public V getValue() {
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,18 @@
*/
@AutoValue
@Internal
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public abstract class ValueInSingleWindow<T> {
/** Returns the value of this {@code ValueInSingleWindow}. */
public abstract @Nullable T getValue();
@SuppressWarnings("nullness")
public T getValue() {
return getNullableValue();
};

/**
* Workaround for autovalue code generation, which does not allow type variables to be
* instantiated with nullable actual parameters.
*/
protected abstract @Nullable T getNullableValue();

/** Returns the timestamp of this {@code ValueInSingleWindow}. */
public abstract Instant getTimestamp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.MimeTypes;

@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
class AvroRowWriter<AvroT, T> extends BigQueryRowWriter<T> {
private final DataFileWriter<AvroT> writer;
private final Schema schema;
private final SerializableFunction<AvroWriteRequest<T>, AvroT> toAvroRecord;

@SuppressWarnings({
"nullness" // calling superclass method in constructor flagged as error; TODO: fix
})
AvroRowWriter(
String basename,
Schema schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
Expand All @@ -80,9 +81,6 @@
import org.slf4j.LoggerFactory;

/** PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. */
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
class BatchLoads<DestinationT, ElementT>
extends PTransform<PCollection<KV<DestinationT, ElementT>>, WriteResult> {
private static final Logger LOG = LoggerFactory.getLogger(BatchLoads.class);
Expand Down Expand Up @@ -141,12 +139,12 @@ class BatchLoads<DestinationT, ElementT>
private int maxFilesPerPartition;
private long maxBytesPerPartition;
private int numFileShards;
private Duration triggeringFrequency;
private @Nullable Duration triggeringFrequency;
private ValueProvider<String> customGcsTempLocation;
private ValueProvider<String> loadJobProjectId;
private @Nullable ValueProvider<String> loadJobProjectId;
private final Coder<ElementT> elementCoder;
private final RowWriterFactory<ElementT, DestinationT> rowWriterFactory;
private final String kmsKey;
private final @Nullable String kmsKey;
private final boolean clusteringEnabled;
private final String tempDataset;

Expand Down Expand Up @@ -242,7 +240,8 @@ void setMaxBytesPerPartition(long maxBytesPerPartition) {
}

@Override
public void validate(PipelineOptions options) {
public void validate(@Nullable PipelineOptions maybeOptions) {
PipelineOptions options = Preconditions.checkArgumentNotNull(maybeOptions);
// We will use a BigQuery load job -- validate the temp location.
String tempLocation;
if (customGcsTempLocation == null) {
Expand Down Expand Up @@ -274,6 +273,7 @@ public void validate(PipelineOptions options) {

// Expand the pipeline when the user has requested periodically-triggered file writes.
private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input) {
Duration triggeringFrequency = Preconditions.checkStateNotNull(this.triggeringFrequency);
Pipeline p = input.getPipeline();
final PCollectionView<String> loadJobIdPrefixView = createJobIdPrefixView(p, JobType.LOAD);
final PCollectionView<String> tempLoadJobIdPrefixView =
Expand Down
Loading

0 comments on commit 4e8d7fb

Please sign in to comment.