
feat(bigquery/storage/managedwriter): naming and doc improvements (#4508)

Minor changes:

WithTracePrefix -> WithTraceID for the option and its accompanying downstream usage.
Exported TableParentFromStreamName to aid users of BatchCommit.

The rest of the PR consists of docstring improvements.

Towards #4366
shollyman authored Jul 28, 2021
1 parent 41246e9 commit 663c899
Showing 9 changed files with 69 additions and 37 deletions.
5 changes: 3 additions & 2 deletions bigquery/storage/managedwriter/appendresult.go
@@ -47,11 +47,12 @@ func newAppendResult(data []byte) *AppendResult {
}
}

// Ready blocks until the append request is completed.
// Ready blocks until the append request has reached a completed state,
// which may be a successful append or an error.
func (ar *AppendResult) Ready() <-chan struct{} { return ar.ready }

// GetResult returns the optional offset of this row, or the associated
// error.
// error. It blocks until the result is ready.
func (ar *AppendResult) GetResult(ctx context.Context) (int64, error) {
select {
case <-ctx.Done():
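
To accompany the revised AppendResult docstrings above, here is a hedged usage sketch showing both ways of waiting on an append: selecting on Ready, or calling the blocking GetResult. The helper name and variables are illustrative only; it assumes an already-constructed ManagedStream and pre-serialized rows.

package example

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery/storage/managedwriter"
)

// waitForAppends is a hypothetical helper: it assumes an already-constructed
// ManagedStream and rows that are already serialized protocol buffer messages.
func waitForAppends(ctx context.Context, ms *managedwriter.ManagedStream, rows [][]byte) error {
	// Offset 0 assumes this is the first append on an offset-tracking (e.g. pending) stream.
	results, err := ms.AppendRows(ctx, rows, 0)
	if err != nil {
		return fmt.Errorf("AppendRows: %w", err)
	}
	for _, r := range results {
		// Ready lets callers select against context cancellation...
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-r.Ready():
		}
		// ...and GetResult blocks until the result is ready, returning the
		// row offset (when available) or the append error.
		if _, err := r.GetResult(ctx); err != nil {
			return fmt.Errorf("append failed: %w", err)
		}
	}
	return nil
}
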
19 changes: 13 additions & 6 deletions bigquery/storage/managedwriter/client.go
@@ -143,7 +143,7 @@ func (c *Client) validateOptions(ctx context.Context, ms *ManagedStream) error {
}
// update type and destination based on stream metadata
ms.streamSettings.streamType = StreamType(info.Type.String())
ms.destinationTable = tableParentFromStreamName(ms.streamSettings.streamID)
ms.destinationTable = TableParentFromStreamName(ms.streamSettings.streamID)
}
if ms.destinationTable == "" {
return fmt.Errorf("no destination table specified")
@@ -158,7 +158,13 @@ func (c *Client) validateOptions(ctx context.Context, ms *ManagedStream) error {
// BatchCommit is used to commit one or more PendingStream streams belonging to the same table
// as a single transaction. Streams must be finalized before committing.
//
// TODO: this currently exposes the raw proto response, but a future CL will wrap this with a nicer type.
// The format of the parentTable is: projects/{project}/datasets/{dataset}/tables/{table}. The utility
// function TableParentFromStreamName can be used to derive this from a stream's name.
//
// If the returned response contains stream errors, this indicates that the batch commit failed and no data was
// committed.
//
// TODO: currently returns the raw response. Determine how we want to surface StreamErrors.
func (c *Client) BatchCommit(ctx context.Context, parentTable string, streamNames []string) (*storagepb.BatchCommitWriteStreamsResponse, error) {

// determine table from first streamName, as all must share the same table.
@@ -167,7 +173,7 @@ func (c *Client) BatchCommit(ctx context.Context, parentTable string, streamName
}

req := &storagepb.BatchCommitWriteStreamsRequest{
Parent: tableParentFromStreamName(streamNames[0]),
Parent: TableParentFromStreamName(streamNames[0]),
WriteStreams: streamNames,
}
return c.rawClient.BatchCommitWriteStreams(ctx, req)
@@ -183,9 +189,10 @@ func (c *Client) getWriteStream(ctx context.Context, streamName string) (*storag
return c.rawClient.GetWriteStream(ctx, req)
}

// tableParentFromStreamName return the corresponding parent table
// identifier given a fully qualified streamname.
func tableParentFromStreamName(streamName string) string {
// TableParentFromStreamName is a utility function for extracting the parent table
// prefix from a stream name. When an invalid stream ID is passed, this simply returns
// the original stream name.
func TableParentFromStreamName(streamName string) string {
// Stream IDs have the following prefix:
// projects/{project}/datasets/{dataset}/tables/{table}/blah
parts := strings.SplitN(streamName, "/", 7)
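
To make the newly exported TableParentFromStreamName and the BatchCommit contract concrete, a hedged sketch follows. It assumes the pending streams have already been finalized and share the same destination table; the helper name is illustrative only.

package example

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery/storage/managedwriter"
)

// commitPendingStreams sketches the batch-commit flow described above. It
// assumes all streams in streamNames are finalized and target the same table.
func commitPendingStreams(ctx context.Context, client *managedwriter.Client, streamNames []string) error {
	// Derive the parent table (projects/{project}/datasets/{dataset}/tables/{table})
	// from the first stream's fully qualified name.
	parent := managedwriter.TableParentFromStreamName(streamNames[0])

	resp, err := client.BatchCommit(ctx, parent, streamNames)
	if err != nil {
		return fmt.Errorf("BatchCommit: %w", err)
	}
	// Per the docstring, any stream errors in the response mean the commit
	// failed and no data was committed.
	if len(resp.GetStreamErrors()) > 0 {
		return fmt.Errorf("BatchCommit returned %d stream errors", len(resp.GetStreamErrors()))
	}
	return nil
}
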
2 changes: 1 addition & 1 deletion bigquery/storage/managedwriter/client_test.go
@@ -40,7 +40,7 @@ func TestTableParentFromStreamName(t *testing.T) {
}

for _, tc := range testCases {
got := tableParentFromStreamName(tc.in)
got := TableParentFromStreamName(tc.in)
if got != tc.want {
t.Errorf("mismatch on %s: got %s want %s", tc.in, got, tc.want)
}
28 changes: 20 additions & 8 deletions bigquery/storage/managedwriter/doc.go
@@ -12,12 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package managedwriter will be a thick client around the storage API's BigQueryWriteClient.
//
// It is EXPERIMENTAL and subject to change or removal without notice. This library is in a pre-alpha
// state, and breaking changes are frequent.
//
// Currently, the BigQueryWriteClient this library targets is exposed in the storage v1beta2 endpoint, and is
// a successor to the streaming interface. API method tabledata.insertAll is the primary backend method, and
// the Inserter abstraction is the equivalent to this in the cloud.google.com/go/bigquery package.
/*
Package managedwriter provides an EXPERIMENTAL thick client around the BigQuery storage API's BigQueryWriteClient.
More information about this new write client may also be found in the public documentation: https://cloud.google.com/bigquery/docs/write-api

It is EXPERIMENTAL and subject to change or removal without notice. This library is in a pre-alpha
state, and breaking changes are frequent.

Currently, this client targets the BigQueryWriteClient present in the v1beta2 endpoint, and is intended as a more
feature-rich successor to the classic BigQuery streaming interface, which is presented as the Inserter abstraction
in cloud.google.com/go/bigquery, and the tabledata.insertAll method if you're more familiar with the BigQuery v2 REST
methods.

Appending data is accomplished through the use of streams. There are four stream types, each targeting slightly different
use cases and needs. See the StreamType documentation for more details about each stream type.

This API uses serialized protocol buffer messages for appending data to streams. For users who don't have predefined protocol
buffer messages for sending data, the cloud.google.com/go/bigquery/storage/managedwriter/adapt subpackage includes functionality
for defining protocol buffer messages dynamically using table schema information, which enables users to do things like using
protojson to convert JSON text into protocol buffer messages.
*/
package managedwriter
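
As a companion to the revised package documentation, here is a minimal end-to-end sketch of the pending-stream flow under stated assumptions: the constructor names NewClient and NewManagedStream (and client.Close) are assumed rather than shown in this diff, while the option and method names used below do appear in it.

package example

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery/storage/managedwriter"
	"google.golang.org/protobuf/types/descriptorpb"
)

// writePendingRows sketches the flow described in the package doc for a
// pending stream. destTable is the fully qualified table name
// (projects/{project}/datasets/{dataset}/tables/{table}), and rows are
// serialized protocol buffer messages matching desc.
func writePendingRows(ctx context.Context, projectID, destTable string, desc *descriptorpb.DescriptorProto, rows [][]byte) (string, error) {
	client, err := managedwriter.NewClient(ctx, projectID) // assumed constructor
	if err != nil {
		return "", fmt.Errorf("NewClient: %w", err)
	}
	defer client.Close() // assumed to exist on Client

	ms, err := client.NewManagedStream(ctx, // assumed constructor
		managedwriter.WithType(managedwriter.PendingStream),
		managedwriter.WithDestinationTable(destTable),
		managedwriter.WithSchemaDescriptor(desc),
	)
	if err != nil {
		return "", fmt.Errorf("NewManagedStream: %w", err)
	}
	defer ms.Close()

	results, err := ms.AppendRows(ctx, rows, 0) // first append on a pending stream starts at offset 0
	if err != nil {
		return "", fmt.Errorf("AppendRows: %w", err)
	}
	for _, r := range results {
		if _, err := r.GetResult(ctx); err != nil {
			return "", fmt.Errorf("append failed: %w", err)
		}
	}
	// The returned stream name can later be finalized and batch committed
	// (see the BatchCommit sketch earlier).
	return ms.StreamName(), nil
}
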
2 changes: 1 addition & 1 deletion bigquery/storage/managedwriter/integration_test.go
@@ -448,7 +448,7 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
}

// Commit stream and validate.
resp, err := mwClient.BatchCommit(ctx, tableParentFromStreamName(ms.StreamName()), []string{ms.StreamName()})
resp, err := mwClient.BatchCommit(ctx, TableParentFromStreamName(ms.StreamName()), []string{ms.StreamName()})
if err != nil {
t.Errorf("client.BatchCommit: %v", err)
}
21 changes: 16 additions & 5 deletions bigquery/storage/managedwriter/managed_stream.go
@@ -45,6 +45,9 @@ var (

// BufferedStream is a form of checkpointed stream, that allows
// you to advance the offset of visible rows via Flush operations.
//
// NOTE: Buffered Streams are currently in limited preview, and as such
// methods like FlushRows() may yield errors for non-enrolled projects.
BufferedStream StreamType = "BUFFERED"

// PendingStream is a stream in which no data is made visible to
@@ -106,17 +109,17 @@ type streamSettings struct {
// request bytes can be outstanding in the system.
MaxInflightBytes int

// TracePrefix sets a suitable prefix for the trace ID set on
// append requests. Useful for diagnostic purposes.
TracePrefix string
// TraceID can be set when appending data on a stream. Its
// purpose is to aid in debugging and diagnostic scenarios.
TraceID string
}

func defaultStreamSettings() *streamSettings {
return &streamSettings{
streamType: DefaultStream,
MaxInflightRequests: 1000,
MaxInflightBytes: 0,
TracePrefix: "defaultManagedWriter",
TraceID: "",
}
}

@@ -241,7 +244,9 @@ func (ms *ManagedStream) append(pw *pendingWrite, opts ...gax.CallOption) error
reqCopy.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{
ProtoDescriptor: ms.schemaDescriptor,
}
reqCopy.TraceId = ms.streamSettings.TracePrefix
if ms.streamSettings.TraceID != "" {
reqCopy.TraceId = ms.streamSettings.TraceID
}
req = &reqCopy
})

@@ -290,10 +295,16 @@ func (ms *ManagedStream) Close() error {
ms.mu.Lock()
ms.err = io.EOF
ms.mu.Unlock()
// Propagate cancellation.
if ms.cancel != nil {
ms.cancel()
}
return err
}

// AppendRows sends the append requests to the service, and returns one AppendResult per row.
// The format of the row data is binary serialized protocol buffer bytes, and the message
// must adhere to the format of the schema Descriptor passed in when creating the managed stream.
func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset int64) ([]*AppendResult, error) {
pw := newPendingWrite(data, offset)
// check flow control
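
Per the new AppendRows docstring, each row must be the binary serialization of a protocol buffer message matching the stream's schema descriptor. A minimal sketch of that preparation step, using only proto.Marshal (the helper name is illustrative):

package example

import (
	"fmt"

	"google.golang.org/protobuf/proto"
)

// encodeRows prepares AppendRows input: each element is the binary
// serialization of a protocol buffer message that matches the schema
// descriptor supplied when the managed stream was created. The messages can be
// any generated or dynamic proto.Message values.
func encodeRows(msgs []proto.Message) ([][]byte, error) {
	data := make([][]byte, 0, len(msgs))
	for _, m := range msgs {
		b, err := proto.Marshal(m)
		if err != nil {
			return nil, fmt.Errorf("proto.Marshal: %w", err)
		}
		data = append(data, b)
	}
	return data, nil
}
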
2 changes: 1 addition & 1 deletion bigquery/storage/managedwriter/managed_stream_test.go
@@ -115,7 +115,7 @@ func TestManagedStream_FirstAppendBehavior(t *testing.T) {
fc: newFlowController(0, 0),
}
ms.streamSettings.streamID = "FOO"
ms.streamSettings.TracePrefix = "TRACE"
ms.streamSettings.TraceID = "TRACE"
ms.schemaDescriptor = schema

fakeData := [][]byte{
19 changes: 10 additions & 9 deletions bigquery/storage/managedwriter/writer_option.go
@@ -16,10 +16,10 @@ package managedwriter

import "google.golang.org/protobuf/types/descriptorpb"

// WriterOption is used to configure a ManagedWriteClient.
// WriterOption is an option used to configure a ManagedStream instance.
type WriterOption func(*ManagedStream)

// WithType sets the write type of the new writer.
// WithType sets the stream type for the managed stream.
func WithType(st StreamType) WriterOption {
return func(ms *ManagedStream) {
ms.streamSettings.streamType = st
@@ -28,10 +28,10 @@ func WithType(st StreamType) WriterOption {

// WithStreamName allows users to set the stream name this writer will
// append to explicitly. By default, the managed client will create the
// stream when instantiated.
// stream when instantiated if necessary.
//
// Note: Supplying this option causes other options such as WithStreamType
// and WithDestinationTable to be ignored.
// Note: Supplying this option causes other options which affect stream construction
// such as WithType and WithDestinationTable to be ignored.
func WithStreamName(name string) WriterOption {
return func(ms *ManagedStream) {
ms.streamSettings.streamID = name
@@ -62,15 +62,16 @@ func WithMaxInflightBytes(n int) WriterOption {
}
}

// WithTracePrefix allows instruments requests to the service with a custom trace prefix.
// WithTraceID instruments requests to the service with a custom trace ID.
// This is generally for diagnostic purposes only.
func WithTracePrefix(prefix string) WriterOption {
func WithTraceID(traceID string) WriterOption {
return func(ms *ManagedStream) {
ms.streamSettings.TracePrefix = prefix
ms.streamSettings.TraceID = traceID
}
}

// WithSchemaDescriptor describes the format of messages you'll be sending to the service.
// WithSchemaDescriptor describes the format of the serialized data being sent by
// AppendRows calls on the stream.
func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption {
return func(ms *ManagedStream) {
ms.schemaDescriptor = dp
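
A short sketch of how these options compose when attaching to an existing stream, per the WithStreamName note above: because WithStreamName is supplied, construction-related options such as WithType and WithDestinationTable would be ignored. NewManagedStream is an assumed constructor name not shown in this diff, and the trace ID and byte limit are arbitrary example values.

package example

import (
	"context"

	"cloud.google.com/go/bigquery/storage/managedwriter"
	"google.golang.org/protobuf/types/descriptorpb"
)

// attachToExistingStream configures a ManagedStream against an already-created
// write stream, supplying a schema descriptor, a diagnostic trace ID, and a
// flow-control limit on outstanding request bytes.
func attachToExistingStream(ctx context.Context, client *managedwriter.Client, streamName string, desc *descriptorpb.DescriptorProto) (*managedwriter.ManagedStream, error) {
	return client.NewManagedStream(ctx, // assumed constructor
		managedwriter.WithStreamName(streamName),
		managedwriter.WithSchemaDescriptor(desc),
		managedwriter.WithTraceID("my-ingest-job"), // diagnostic trace ID attached to append requests
		managedwriter.WithMaxInflightBytes(10<<20), // cap outstanding append bytes at ~10 MiB
	)
}
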
8 changes: 4 additions & 4 deletions bigquery/storage/managedwriter/writer_option_test.go
@@ -63,12 +63,12 @@ func TestWriterOptions(t *testing.T) {
},
{
desc: "WithTracePrefix",
options: []WriterOption{WithTracePrefix("foo")},
options: []WriterOption{WithTraceID("foo")},
want: func() *ManagedStream {
ms := &ManagedStream{
streamSettings: defaultStreamSettings(),
}
ms.streamSettings.TracePrefix = "foo"
ms.streamSettings.TraceID = "foo"
return ms
}(),
},
@@ -88,15 +88,15 @@
options: []WriterOption{
WithType(PendingStream),
WithMaxInflightBytes(5),
WithTracePrefix("pre"),
WithTraceID("id"),
},
want: func() *ManagedStream {
ms := &ManagedStream{
streamSettings: defaultStreamSettings(),
}
ms.streamSettings.MaxInflightBytes = 5
ms.streamSettings.streamType = PendingStream
ms.streamSettings.TracePrefix = "pre"
ms.streamSettings.TraceID = "id"
return ms
}(),
},
