diff --git a/bigquery/go.mod b/bigquery/go.mod index c6a8436dc38a..0030535ae9c4 100644 --- a/bigquery/go.mod +++ b/bigquery/go.mod @@ -9,14 +9,14 @@ require ( cloud.google.com/go/storage v1.23.0 github.com/golang/protobuf v1.5.2 github.com/google/go-cmp v0.5.8 - github.com/googleapis/gax-go/v2 v2.4.0 + github.com/googleapis/gax-go/v2 v2.5.1 go.opencensus.io v0.23.0 golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f google.golang.org/api v0.90.0 google.golang.org/genproto v0.0.0-20220802133213-ce4fa296bf78 google.golang.org/grpc v1.48.0 - google.golang.org/protobuf v1.28.0 + google.golang.org/protobuf v1.28.1 ) require ( diff --git a/bigquery/go.sum b/bigquery/go.sum index d4856744403c..e6b2b863e7da 100644 --- a/bigquery/go.sum +++ b/bigquery/go.sum @@ -181,8 +181,9 @@ github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pf github.com/googleapis/gax-go/v2 v2.1.1/go.mod h1:hddJymUZASv3XPyGkUpKj8pPO47Rmb0eJc8R6ouapiM= github.com/googleapis/gax-go/v2 v2.2.0/go.mod h1:as02EH8zWkzwUoLbBaFeQ+arQaj/OthfcblKl4IGNaM= github.com/googleapis/gax-go/v2 v2.3.0/go.mod h1:b8LNqSzNabLiUpXKkY7HAR5jr6bIT99EXz9pXxye9YM= -github.com/googleapis/gax-go/v2 v2.4.0 h1:dS9eYAjhrE2RjmzYw2XAPvcXfmcQLtFEQWn0CR82awk= github.com/googleapis/gax-go/v2 v2.4.0/go.mod h1:XOTVJ59hdnfJLIP/dh8n5CGryZR2LxK9wbMD5+iXC6c= +github.com/googleapis/gax-go/v2 v2.5.1 h1:kBRZU0PSuI7PspsSb/ChWoVResUcwNVIdpB049pKTiw= +github.com/googleapis/gax-go/v2 v2.5.1/go.mod h1:h6B0KMMFNtI2ddbGJn3T3ZbwkeT6yqEF02fYlzkUCyo= github.com/googleapis/go-type-adapters v1.0.0 h1:9XdMn+d/G57qq1s8dNc5IesGCXHf6V2HZ2JwRxfA2tA= github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= @@ -604,6 +605,7 @@ google.golang.org/genproto v0.0.0-20220608133413-ed9918b62aac/go.mod h1:KEWEmljW google.golang.org/genproto v0.0.0-20220616135557-88e70c0c3a90/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA= google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA= google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA= +google.golang.org/genproto v0.0.0-20220722212130-b98a9ff5e252/go.mod h1:GkXuJDJ6aQ7lnJcRF+SJVgFdQhypqgl3LB1C9vabdRE= google.golang.org/genproto v0.0.0-20220802133213-ce4fa296bf78 h1:QntLWYqZeuBtJkth3m/6DLznnI0AHJr+AgJXvVh/izw= google.golang.org/genproto v0.0.0-20220802133213-ce4fa296bf78/go.mod h1:iHe1svFLAZg9VWz891+QbRMwUv9O/1Ww+/mngYeThbc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -653,8 +655,9 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= diff --git a/bigquery/storage/managedwriter/doc.go b/bigquery/storage/managedwriter/doc.go index 045cf92c2f81..a81ff5df079a 100644 --- a/bigquery/storage/managedwriter/doc.go +++ b/bigquery/storage/managedwriter/doc.go @@ -170,5 +170,34 @@ have been finalized, meaning they'll no longer allow further data writes. // Using the client, we can commit data from multple streams to the same // table atomically. resp, err := client.BatchCommitWriteStreams(ctx, req) + +# Error Handling + +Like other Google Cloud services, this API relies on common components that can provide an +enhanced set of errors when communicating about the results of API interactions. + +Specifically, the apierror package (https://pkg.go.dev/github.com/googleapis/gax-go/v2/apierror) +provides convenience methods for extracting structured information about errors. + +The BigQuery Storage API service augments applicable errors with service-specific details in +the form of a StorageError message. The StorageError message is accessed via the ExtractProtoMessage +method in the apierror package. Note that the StorageError messsage does not implement Go's error +interface. + +An example of accessing the structured error details: + + // By way of example, let's assume the response from an append call returns an error. + _, err := result.GetResult(ctx) + if err != nil { + if apiErr, ok := apierror.FromError(err); ok { + // We now have an instance of APIError, which directly exposes more specific + // details about multiple failure conditions include transport-level errors. + storageErr := &storagepb.StorageError{} + if e := apiErr.Details().ExtractProtoMessage(storageErr); e != nil { + // storageErr now contains service-specific information about the error. + log.Printf("Received service-specific error code %s", storageErr.GetCode().String()) + } + } + } */ package managedwriter diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index e252c5bdb8fb..7c6edb5101e7 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -139,6 +139,10 @@ func TestIntegration_ManagedWriter(t *testing.T) { t.Parallel() testCommittedStream(ctx, t, mwClient, bqClient, dataset) }) + t.Run("ErrorBehaviors", func(t *testing.T) { + t.Parallel() + testErrorBehaviors(ctx, t, mwClient, bqClient, dataset) + }) t.Run("BufferedStream", func(t *testing.T) { t.Parallel() testBufferedStream(ctx, t, mwClient, bqClient, dataset) @@ -404,6 +408,124 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq withExactRowCount(int64(len(testSimpleData)))) } +// testErrorBehaviors intentionally issues problematic requests to verify error behaviors. +func testErrorBehaviors(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { + testTable := dataset.Table(tableIDs.New()) + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { + t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) + } + + m := &testdata.SimpleMessageProto2{} + descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) + + // setup a new stream. + ms, err := mwClient.NewManagedStream(ctx, + WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)), + WithType(CommittedStream), + WithSchemaDescriptor(descriptorProto), + ) + if err != nil { + t.Fatalf("NewManagedStream: %v", err) + } + validateTableConstraints(ctx, t, bqClient, testTable, "before send", + withExactRowCount(0)) + + data := make([][]byte, len(testSimpleData)) + for k, mesg := range testSimpleData { + b, err := proto.Marshal(mesg) + if err != nil { + t.Errorf("failed to marshal message %d: %v", k, err) + } + data[k] = b + } + + // Send an append at an invalid offset. + result, err := ms.AppendRows(ctx, data, WithOffset(99)) + if err != nil { + t.Errorf("failed to send append: %v", err) + } + // + off, err := result.GetResult(ctx) + if err == nil { + t.Errorf("expected error, got offset %d", off) + } + + apiErr, ok := apierror.FromError(err) + if !ok { + t.Errorf("expected apierror, got %T: %v", err, err) + } + se := &storagepb.StorageError{} + e := apiErr.Details().ExtractProtoMessage(se) + if e != nil { + t.Errorf("expected storage error, but extraction failed: %v", e) + } + wantCode := storagepb.StorageError_OFFSET_OUT_OF_RANGE + if se.GetCode() != wantCode { + t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String()) + } + // Send "real" append to advance the offset. + result, err = ms.AppendRows(ctx, data, WithOffset(0)) + if err != nil { + t.Errorf("failed to send append: %v", err) + } + off, err = result.GetResult(ctx) + if err != nil { + t.Errorf("expected offset, got error %v", err) + } + wantOffset := int64(0) + if off != wantOffset { + t.Errorf("offset mismatch, got %d want %d", off, wantOffset) + } + // Now, send at the start offset again. + result, err = ms.AppendRows(ctx, data, WithOffset(0)) + if err != nil { + t.Errorf("failed to send append: %v", err) + } + off, err = result.GetResult(ctx) + if err == nil { + t.Errorf("expected error, got offset %d", off) + } + apiErr, ok = apierror.FromError(err) + if !ok { + t.Errorf("expected apierror, got %T: %v", err, err) + } + se = &storagepb.StorageError{} + e = apiErr.Details().ExtractProtoMessage(se) + if e != nil { + t.Errorf("expected storage error, but extraction failed: %v", e) + } + wantCode = storagepb.StorageError_OFFSET_ALREADY_EXISTS + if se.GetCode() != wantCode { + t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String()) + } + // Finalize the stream. + if _, err := ms.Finalize(ctx); err != nil { + t.Errorf("Finalize had error: %v", err) + } + // Send another append, which is disallowed for finalized streams. + result, err = ms.AppendRows(ctx, data) + if err != nil { + t.Errorf("failed to send append: %v", err) + } + off, err = result.GetResult(ctx) + if err == nil { + t.Errorf("expected error, got offset %d", off) + } + apiErr, ok = apierror.FromError(err) + if !ok { + t.Errorf("expected apierror, got %T: %v", err, err) + } + se = &storagepb.StorageError{} + e = apiErr.Details().ExtractProtoMessage(se) + if e != nil { + t.Errorf("expected storage error, but extraction failed: %v", e) + } + wantCode = storagepb.StorageError_STREAM_FINALIZED + if se.GetCode() != wantCode { + t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String()) + } +} + func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { testTable := dataset.Table(tableIDs.New()) if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {