feat(bigquery/storage/managedwriter): improve error communication
This PR augments the package docs, as well as adding a utility
function for extracting an instance of the service's specific error
proto from an error (by way of the grpc status error details).

Fixes: googleapis#6321
shollyman committed Jul 19, 2022
1 parent 0c82089 commit d1aceab
Showing 3 changed files with 202 additions and 39 deletions.
101 changes: 62 additions & 39 deletions bigquery/storage/managedwriter/doc.go
@@ -24,8 +24,7 @@
feature-rich successor to the classic BigQuery streaming interface, which is present
in cloud.google.com/go/bigquery, and the tabledata.insertAll method if you're more familiar with the BigQuery v2 REST
methods.
# Creating a Client
To start working with this package, create a client:
@@ -35,8 +34,7 @@
// TODO: Handle error.
}
# Defining the Protocol Buffer Schema
The write functionality of BigQuery Storage requires data to be sent using encoded
protocol buffer messages using proto2 wire format. As the protocol buffer is not
@@ -70,7 +68,7 @@ contains functionality to normalize the descriptor into a self-contained definition:
The adapt subpackage also contains functionality for generating a DescriptorProto using
a BigQuery table's schema directly.
# Constructing a ManagedStream
The ManagedStream handles management of the underlying write connection to the BigQuery
Storage service. You can either create a write session explicitly and pass it in, or
@@ -102,7 +100,7 @@
In addition, NewManagedStream can create new streams implicitly:
// TODO: Handle error.
}
# Writing Data
Use the AppendRows function to write one or more serialized proto messages to a stream. You
can choose to specify an offset in the stream to handle de-duplication for user-created streams,
@@ -111,42 +109,40 @@
but a "default" stream neither accepts nor reports offsets.
AppendRows returns a future-like object that blocks until the write is successful or yields
an error.
	// Define a couple of messages.
	mesgs := []*myprotopackage.MyCompiledMessage{
		{
			UserName:        proto.String("johndoe"),
			EmailAddress:    proto.String("jd@mycompany.mydomain"),
			FavoriteNumbers: []int64{1, 42, 12345},
		},
		{
			UserName:        proto.String("janesmith"),
			EmailAddress:    proto.String("smith@othercompany.otherdomain"),
			FavoriteNumbers: []int64{1, 3, 5, 7, 9},
		},
	}

	// Encode the messages into binary format.
	encoded := make([][]byte, len(mesgs))
	for k, v := range mesgs {
		b, err := proto.Marshal(v)
		if err != nil {
			// TODO: Handle error.
		}
		encoded[k] = b
	}

	// Send the rows to the service, and specify an offset for managing deduplication.
	result, err := managedStream.AppendRows(ctx, encoded, WithOffset(0))

	// Block until the write is complete and return the result.
	returnedOffset, err := result.GetResult(ctx)
	if err != nil {
		// TODO: Handle error.
	}
# Buffered Stream Management
For Buffered streams, users control when data is made visible in the destination table/stream
independently of when it is written. Use FlushRows on the ManagedStream to advance the flush
@@ -156,12 +152,11 @@
point ahead in the stream.
// ahead to make the first 1000 rows available.
flushOffset, err := managedStream.FlushRows(ctx, 1000)
# Pending Stream Management
Pending streams allow users to commit data from multiple streams together once the streams
have been finalized, meaning they'll no longer allow further data writes.
// First, finalize the stream we're writing into.
totalRows, err := managedStream.Finalize(ctx)
if err != nil {
@@ -176,5 +171,33 @@
// table atomically.
resp, err := client.BatchCommitWriteStreams(ctx, req)
# Error Handling
Like other Google Cloud services, this API relies on common components that can provide an
enhanced set of errors when communicating about the results of API interactions.
Specifically, the apierror package (https://pkg.go.dev/github.com/googleapis/gax-go/v2/apierror)
provides convenience methods for extracting structured information about errors.
// By way of example, let's assume the response from an append call returns an error.
_, err := result.GetResult(ctx)
if err != nil {
if apiErr, ok := apierror.FromError(err); ok {
// We now have an instance of APIError, which directly exposes more specific
// details about multiple failure conditions.
log.Printf("result had status %v", apiErr.GRPCStatus())
}
}
Additionally, the service defines a specific StorageError type that has service-specific details
about errors. Please note that StorageError does not implement Go's error interface. The
StorageErrorFromError() function can be used to extract a StorageError from an error returned
by the service.
// Once again, let's assume the response from an append call returns an error.
_, err := result.GetResult(ctx)
	if se := StorageErrorFromError(err); se != nil {
		log.Printf("storage error code was %s, message was %s", se.GetCode().String(), se.GetErrorMessage())
}
*/
package managedwriter
106 changes: 106 additions & 0 deletions bigquery/storage/managedwriter/integration_test.go
@@ -140,6 +140,10 @@ func TestIntegration_ManagedWriter(t *testing.T) {
t.Parallel()
testCommittedStream(ctx, t, mwClient, bqClient, dataset)
})
t.Run("ErrorBehaviors", func(t *testing.T) {
t.Parallel()
testErrorBehaviors(ctx, t, mwClient, bqClient, dataset)
})
t.Run("BufferedStream", func(t *testing.T) {
t.Parallel()
testBufferedStream(ctx, t, mwClient, bqClient, dataset)
@@ -405,6 +409,108 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq
withExactRowCount(int64(len(testSimpleData))))
}

// testErrorBehaviors intentionally issues problematic requests to verify error behaviors.
func testErrorBehaviors(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
}

m := &testdata.SimpleMessageProto2{}
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

// Set up a new stream.
ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(CommittedStream),
WithSchemaDescriptor(descriptorProto),
)
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
withExactRowCount(0))

data := make([][]byte, len(testSimpleData))
for k, mesg := range testSimpleData {
b, err := proto.Marshal(mesg)
if err != nil {
t.Errorf("failed to marshal message %d: %v", k, err)
}
data[k] = b
}

// Send an append at an invalid offset.
result, err := ms.AppendRows(ctx, data, WithOffset(99))
if err != nil {
t.Errorf("failed to send append: %v", err)
}
// The append itself is accepted; the error surfaces when retrieving the result.
off, err := result.GetResult(ctx)
if err == nil {
t.Errorf("expected error, got offset %d", off)
}
se := StorageErrorFromError(err)
if se == nil {
t.Errorf("expected StorageError from response, got nil")
}
wantCode := storagepb.StorageError_OFFSET_OUT_OF_RANGE
if se.GetCode() != wantCode {
t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
}
// Send "real" append to advance the offset.
result, err = ms.AppendRows(ctx, data, WithOffset(0))
if err != nil {
t.Errorf("failed to send append: %v", err)
}
off, err = result.GetResult(ctx)
if err != nil {
t.Errorf("expected offset, got error %v", err)
}
wantOffset := int64(0)
if off != wantOffset {
t.Errorf("offset mismatch, got %d want %d", off, wantOffset)
}
// Now, send at the start offset again.
result, err = ms.AppendRows(ctx, data, WithOffset(0))
if err != nil {
t.Errorf("failed to send append: %v", err)
}
off, err = result.GetResult(ctx)
if err == nil {
t.Errorf("expected error, got offset %d", off)
}
se = StorageErrorFromError(err)
if se == nil {
t.Errorf("expected StorageError from response, got nil")
}
wantCode = storagepb.StorageError_OFFSET_ALREADY_EXISTS
if se.GetCode() != wantCode {
t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
}
// Finalize the stream.
if _, err := ms.Finalize(ctx); err != nil {
t.Errorf("Finalize had error: %v", err)
}
// Send another append, which is disallowed for finalized streams.
result, err = ms.AppendRows(ctx, data)
if err != nil {
t.Errorf("failed to send append: %v", err)
}
off, err = result.GetResult(ctx)
if err == nil {
t.Errorf("expected error, got offset %d", off)
}
se = StorageErrorFromError(err)
if se == nil {
t.Errorf("expected StorageError from response, got nil")
}
wantCode = storagepb.StorageError_STREAM_FINALIZED
if se.GetCode() != wantCode {
t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
}
}

func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
34 changes: 34 additions & 0 deletions bigquery/storage/managedwriter/storageerror.go
@@ -0,0 +1,34 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import (
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
"google.golang.org/grpc/status"
)

// StorageErrorFromError attempts to extract a storage error if it is present.
func StorageErrorFromError(err error) *storagepb.StorageError {
status, ok := status.FromError(err)
if !ok {
return nil
}
for _, detail := range status.Details() {
if se, ok := detail.(*storagepb.StorageError); ok {
return se
}
}
return nil
}
