Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add a Flush API to enable finer grained data commit needs for dataflow. #272

Merged
merged 1 commit into from
May 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.google.cloud.bigquery.storage.v1alpha2.Storage.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream;
import com.google.cloud.bigquery.storage.v1alpha2.stub.BigQueryWriteStub;
Expand Down Expand Up @@ -591,6 +593,108 @@ public final BatchCommitWriteStreamsResponse batchCommitWriteStreams(
return stub.batchCommitWriteStreamsCallable();
}

// AUTO-GENERATED DOCUMENTATION AND METHOD
/**
* Flushes rows to a BUFFERED stream. If users are appending rows to BUFFERED stream, flush
* operation is required in order for the rows to become available for reading. A Flush operation
* flushes up to any previously flushed offset in a BUFFERED stream, to the offset specified in
* the request.
*
* <p>Sample code:
*
* <pre><code>
* try (BigQueryWriteClient bigQueryWriteClient = BigQueryWriteClient.create()) {
* WriteStreamName writeStream = WriteStreamName.of("[PROJECT]", "[DATASET]", "[TABLE]", "[STREAM]");
* FlushRowsResponse response = bigQueryWriteClient.flushRows(writeStream);
* }
* </code></pre>
*
* @param writeStream Required. The stream that is the target of the flush operation.
* @throws com.google.api.gax.rpc.ApiException if the remote call fails
*/
public final FlushRowsResponse flushRows(WriteStreamName writeStream) {
FlushRowsRequest request =
FlushRowsRequest.newBuilder()
.setWriteStream(writeStream == null ? null : writeStream.toString())
.build();
return flushRows(request);
}

// AUTO-GENERATED DOCUMENTATION AND METHOD
/**
* Flushes rows to a BUFFERED stream. If users are appending rows to BUFFERED stream, flush
* operation is required in order for the rows to become available for reading. A Flush operation
* flushes up to any previously flushed offset in a BUFFERED stream, to the offset specified in
* the request.
*
* <p>Sample code:
*
* <pre><code>
* try (BigQueryWriteClient bigQueryWriteClient = BigQueryWriteClient.create()) {
* WriteStreamName writeStream = WriteStreamName.of("[PROJECT]", "[DATASET]", "[TABLE]", "[STREAM]");
* FlushRowsResponse response = bigQueryWriteClient.flushRows(writeStream.toString());
* }
* </code></pre>
*
* @param writeStream Required. The stream that is the target of the flush operation.
* @throws com.google.api.gax.rpc.ApiException if the remote call fails
*/
public final FlushRowsResponse flushRows(String writeStream) {
FlushRowsRequest request = FlushRowsRequest.newBuilder().setWriteStream(writeStream).build();
return flushRows(request);
}

// AUTO-GENERATED DOCUMENTATION AND METHOD
/**
* Flushes rows to a BUFFERED stream. If users are appending rows to BUFFERED stream, flush
* operation is required in order for the rows to become available for reading. A Flush operation
* flushes up to any previously flushed offset in a BUFFERED stream, to the offset specified in
* the request.
*
* <p>Sample code:
*
* <pre><code>
* try (BigQueryWriteClient bigQueryWriteClient = BigQueryWriteClient.create()) {
* WriteStreamName writeStream = WriteStreamName.of("[PROJECT]", "[DATASET]", "[TABLE]", "[STREAM]");
* FlushRowsRequest request = FlushRowsRequest.newBuilder()
* .setWriteStream(writeStream.toString())
* .build();
* FlushRowsResponse response = bigQueryWriteClient.flushRows(request);
* }
* </code></pre>
*
* @param request The request object containing all of the parameters for the API call.
* @throws com.google.api.gax.rpc.ApiException if the remote call fails
*/
public final FlushRowsResponse flushRows(FlushRowsRequest request) {
return flushRowsCallable().call(request);
}

// AUTO-GENERATED DOCUMENTATION AND METHOD
/**
* Flushes rows to a BUFFERED stream. If users are appending rows to BUFFERED stream, flush
* operation is required in order for the rows to become available for reading. A Flush operation
* flushes up to any previously flushed offset in a BUFFERED stream, to the offset specified in
* the request.
*
* <p>Sample code:
*
* <pre><code>
* try (BigQueryWriteClient bigQueryWriteClient = BigQueryWriteClient.create()) {
* WriteStreamName writeStream = WriteStreamName.of("[PROJECT]", "[DATASET]", "[TABLE]", "[STREAM]");
* FlushRowsRequest request = FlushRowsRequest.newBuilder()
* .setWriteStream(writeStream.toString())
* .build();
* ApiFuture&lt;FlushRowsResponse&gt; future = bigQueryWriteClient.flushRowsCallable().futureCall(request);
* // Do something
* FlushRowsResponse response = future.get();
* }
* </code></pre>
*/
public final UnaryCallable<FlushRowsRequest, FlushRowsResponse> flushRowsCallable() {
return stub.flushRowsCallable();
}

@Override
public final void close() {
stub.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import com.google.cloud.bigquery.storage.v1alpha2.Storage.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream;
import com.google.cloud.bigquery.storage.v1alpha2.stub.BigQueryWriteStubSettings;
Expand Down Expand Up @@ -102,6 +104,11 @@ public UnaryCallSettings<GetWriteStreamRequest, WriteStream> getWriteStreamSetti
return ((BigQueryWriteStubSettings) getStubSettings()).batchCommitWriteStreamsSettings();
}

/** Returns the object with the settings used for calls to flushRows. */
public UnaryCallSettings<FlushRowsRequest, FlushRowsResponse> flushRowsSettings() {
return ((BigQueryWriteStubSettings) getStubSettings()).flushRowsSettings();
}

public static final BigQueryWriteSettings create(BigQueryWriteStubSettings stub)
throws IOException {
return new BigQueryWriteSettings.Builder(stub.toBuilder()).build();
Expand Down Expand Up @@ -229,6 +236,11 @@ public UnaryCallSettings.Builder<GetWriteStreamRequest, WriteStream> getWriteStr
return getStubSettingsBuilder().batchCommitWriteStreamsSettings();
}

/** Returns the builder for the settings used for calls to flushRows. */
public UnaryCallSettings.Builder<FlushRowsRequest, FlushRowsResponse> flushRowsSettings() {
return getStubSettingsBuilder().flushRowsSettings();
}

@Override
public BigQueryWriteSettings build() throws IOException {
return new BigQueryWriteSettings(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.google.cloud.bigquery.storage.v1alpha2.Storage.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream;
import javax.annotation.Generated;
Expand Down Expand Up @@ -62,6 +64,10 @@ public UnaryCallable<GetWriteStreamRequest, WriteStream> getWriteStreamCallable(
throw new UnsupportedOperationException("Not implemented: batchCommitWriteStreamsCallable()");
}

public UnaryCallable<FlushRowsRequest, FlushRowsResponse> flushRowsCallable() {
throw new UnsupportedOperationException("Not implemented: flushRowsCallable()");
}

@Override
public abstract void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import com.google.cloud.bigquery.storage.v1alpha2.Storage.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -99,6 +101,7 @@ public class BigQueryWriteStubSettings extends StubSettings<BigQueryWriteStubSet
finalizeWriteStreamSettings;
private final UnaryCallSettings<BatchCommitWriteStreamsRequest, BatchCommitWriteStreamsResponse>
batchCommitWriteStreamsSettings;
private final UnaryCallSettings<FlushRowsRequest, FlushRowsResponse> flushRowsSettings;

/** Returns the object with the settings used for calls to createWriteStream. */
public UnaryCallSettings<CreateWriteStreamRequest, WriteStream> createWriteStreamSettings() {
Expand Down Expand Up @@ -127,6 +130,11 @@ public UnaryCallSettings<GetWriteStreamRequest, WriteStream> getWriteStreamSetti
return batchCommitWriteStreamsSettings;
}

/** Returns the object with the settings used for calls to flushRows. */
public UnaryCallSettings<FlushRowsRequest, FlushRowsResponse> flushRowsSettings() {
return flushRowsSettings;
}

@BetaApi("A restructuring of stub classes is planned, so this may break in the future")
public BigQueryWriteStub createStub() throws IOException {
if (getTransportChannelProvider()
Expand Down Expand Up @@ -201,6 +209,7 @@ protected BigQueryWriteStubSettings(Builder settingsBuilder) throws IOException
getWriteStreamSettings = settingsBuilder.getWriteStreamSettings().build();
finalizeWriteStreamSettings = settingsBuilder.finalizeWriteStreamSettings().build();
batchCommitWriteStreamsSettings = settingsBuilder.batchCommitWriteStreamsSettings().build();
flushRowsSettings = settingsBuilder.flushRowsSettings().build();
}

/** Builder for BigQueryWriteStubSettings. */
Expand All @@ -218,6 +227,7 @@ public static class Builder extends StubSettings.Builder<BigQueryWriteStubSettin
private final UnaryCallSettings.Builder<
BatchCommitWriteStreamsRequest, BatchCommitWriteStreamsResponse>
batchCommitWriteStreamsSettings;
private final UnaryCallSettings.Builder<FlushRowsRequest, FlushRowsResponse> flushRowsSettings;

private static final ImmutableMap<String, ImmutableSet<StatusCode.Code>>
RETRYABLE_CODE_DEFINITIONS;
Expand Down Expand Up @@ -270,12 +280,15 @@ protected Builder(ClientContext clientContext) {

batchCommitWriteStreamsSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();

flushRowsSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();

unaryMethodSettingsBuilders =
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(
createWriteStreamSettings,
getWriteStreamSettings,
finalizeWriteStreamSettings,
batchCommitWriteStreamsSettings);
batchCommitWriteStreamsSettings,
flushRowsSettings);

initDefaults(this);
}
Expand Down Expand Up @@ -311,6 +324,11 @@ private static Builder initDefaults(Builder builder) {
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("idempotent"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("default"));

builder
.flushRowsSettings()
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("non_idempotent"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("default"));

return builder;
}

Expand All @@ -322,13 +340,15 @@ protected Builder(BigQueryWriteStubSettings settings) {
getWriteStreamSettings = settings.getWriteStreamSettings.toBuilder();
finalizeWriteStreamSettings = settings.finalizeWriteStreamSettings.toBuilder();
batchCommitWriteStreamsSettings = settings.batchCommitWriteStreamsSettings.toBuilder();
flushRowsSettings = settings.flushRowsSettings.toBuilder();

unaryMethodSettingsBuilders =
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(
createWriteStreamSettings,
getWriteStreamSettings,
finalizeWriteStreamSettings,
batchCommitWriteStreamsSettings);
batchCommitWriteStreamsSettings,
flushRowsSettings);
}

// NEXT_MAJOR_VER: remove 'throws Exception'
Expand Down Expand Up @@ -377,6 +397,11 @@ public UnaryCallSettings.Builder<GetWriteStreamRequest, WriteStream> getWriteStr
return batchCommitWriteStreamsSettings;
}

/** Returns the builder for the settings used for calls to flushRows. */
public UnaryCallSettings.Builder<FlushRowsRequest, FlushRowsResponse> flushRowsSettings() {
return flushRowsSettings;
}

@Override
public BigQueryWriteStubSettings build() throws IOException {
return new BigQueryWriteStubSettings(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import com.google.cloud.bigquery.storage.v1alpha2.Storage.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -103,6 +105,14 @@ public class GrpcBigQueryWriteStub extends BigQueryWriteStub {
.setResponseMarshaller(
ProtoUtils.marshaller(BatchCommitWriteStreamsResponse.getDefaultInstance()))
.build();
private static final MethodDescriptor<FlushRowsRequest, FlushRowsResponse>
flushRowsMethodDescriptor =
MethodDescriptor.<FlushRowsRequest, FlushRowsResponse>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName("google.cloud.bigquery.storage.v1alpha2.BigQueryWrite/FlushRows")
.setRequestMarshaller(ProtoUtils.marshaller(FlushRowsRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(FlushRowsResponse.getDefaultInstance()))
.build();

private final BackgroundResource backgroundResources;

Expand All @@ -113,6 +123,7 @@ public class GrpcBigQueryWriteStub extends BigQueryWriteStub {
finalizeWriteStreamCallable;
private final UnaryCallable<BatchCommitWriteStreamsRequest, BatchCommitWriteStreamsResponse>
batchCommitWriteStreamsCallable;
private final UnaryCallable<FlushRowsRequest, FlushRowsResponse> flushRowsCallable;

private final GrpcStubCallableFactory callableFactory;

Expand Down Expand Up @@ -212,6 +223,19 @@ public Map<String, String> extract(BatchCommitWriteStreamsRequest request) {
}
})
.build();
GrpcCallSettings<FlushRowsRequest, FlushRowsResponse> flushRowsTransportSettings =
GrpcCallSettings.<FlushRowsRequest, FlushRowsResponse>newBuilder()
.setMethodDescriptor(flushRowsMethodDescriptor)
.setParamsExtractor(
new RequestParamsExtractor<FlushRowsRequest>() {
@Override
public Map<String, String> extract(FlushRowsRequest request) {
ImmutableMap.Builder<String, String> params = ImmutableMap.builder();
params.put("write_stream", String.valueOf(request.getWriteStream()));
return params.build();
}
})
.build();

this.createWriteStreamCallable =
callableFactory.createUnaryCallable(
Expand All @@ -234,6 +258,9 @@ public Map<String, String> extract(BatchCommitWriteStreamsRequest request) {
batchCommitWriteStreamsTransportSettings,
settings.batchCommitWriteStreamsSettings(),
clientContext);
this.flushRowsCallable =
callableFactory.createUnaryCallable(
flushRowsTransportSettings, settings.flushRowsSettings(), clientContext);

backgroundResources = new BackgroundResourceAggregation(clientContext.getBackgroundResources());
}
Expand All @@ -260,6 +287,10 @@ public UnaryCallable<GetWriteStreamRequest, WriteStream> getWriteStreamCallable(
return batchCommitWriteStreamsCallable;
}

public UnaryCallable<FlushRowsRequest, FlushRowsResponse> flushRowsCallable() {
return flushRowsCallable;
}

@Override
public final void close() {
shutdown();
Expand Down
Loading