From a67d53ddf13b7d382d4c7856cafb068919021912 Mon Sep 17 00:00:00 2001 From: shollyman Date: Fri, 23 Jun 2023 11:36:50 -0700 Subject: [PATCH] fix(bigquery/storage/managedwriter): correct reconnection logic (#8164) Signalling for an AppendRows stream when schema changes is predicated on the backend's status for the connection. For a simplex (non-multiplexed) connection, the expectation is the client closes and reconnects to signal there's a change in the schema. For a connection in multiplex mode, no reconnection is necessary and the backend will look at the schema for changes. In managedwriter, we allow a user to specify multiplex at the outset, but for connections that haven't actually sent writes for more than a single stream ID the backend doesn't recognize the multiplex status. This PR expands the interface for send optimizer to signal whether the optimizer has sent writes for multiple connections, and uses it when making the determination about schema-based reconnects. It also augments the schema evolution test to validate using multiple combinations of writer and client options. --- bigquery/storage/managedwriter/connection.go | 6 +- .../storage/managedwriter/integration_test.go | 68 +++++++++++++++---- .../storage/managedwriter/send_optimizer.go | 22 ++++++ 3 files changed, 83 insertions(+), 13 deletions(-) diff --git a/bigquery/storage/managedwriter/connection.go b/bigquery/storage/managedwriter/connection.go index 4eec3a4f683b..3ea97a5825df 100644 --- a/bigquery/storage/managedwriter/connection.go +++ b/bigquery/storage/managedwriter/connection.go @@ -368,8 +368,12 @@ func (co *connection) lockingAppend(pw *pendingWrite) error { forceReconnect := false if pw.writer != nil && pw.descVersion != nil && pw.descVersion.isNewer(pw.writer.curDescVersion) { pw.writer.curDescVersion = pw.descVersion - if !canMultiplex(pw.writeStreamID) { + if co.optimizer == nil { forceReconnect = true + } else { + if !co.optimizer.isMultiplexing() { + forceReconnect = true + } } } diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index c1cb1163a17b..7833f300ff7e 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -245,10 +245,6 @@ func TestIntegration_ManagedWriter(t *testing.T) { t.Parallel() testPendingStream(ctx, t, mwClient, bqClient, dataset) }) - t.Run("SchemaEvolution", func(t *testing.T) { - t.Parallel() - testSchemaEvolution(ctx, t, mwClient, bqClient, dataset) - }) t.Run("SimpleCDC", func(t *testing.T) { t.Parallel() testSimpleCDC(ctx, t, mwClient, bqClient, dataset) @@ -267,6 +263,56 @@ func TestIntegration_ManagedWriter(t *testing.T) { }) } +func TestIntegration_SchemaEvolution(t *testing.T) { + + testcases := []struct { + desc string + clientOpts []option.ClientOption + writerOpts []WriterOption + }{ + { + desc: "Simplex_Committed", + writerOpts: []WriterOption{ + WithType(CommittedStream), + }, + }, + { + desc: "Simplex_Default", + writerOpts: []WriterOption{ + WithType(DefaultStream), + }, + }, + { + desc: "Multiplex_Default", + clientOpts: []option.ClientOption{ + WithMultiplexing(), + WithMultiplexPoolLimit(2), + }, + writerOpts: []WriterOption{ + WithType(DefaultStream), + }, + }, + } + + for _, tc := range testcases { + mwClient, bqClient := getTestClients(context.Background(), t, tc.clientOpts...) + defer mwClient.Close() + defer bqClient.Close() + + dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "asia-east1") + if err != nil { + t.Fatalf("failed to init test dataset: %v", err) + } + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + t.Run(tc.desc, func(t *testing.T) { + testSchemaEvolution(ctx, t, mwClient, bqClient, dataset, tc.writerOpts...) + }) + } +} + func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { testTable := dataset.Table(tableIDs.New()) if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { @@ -1094,7 +1140,7 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq } } -func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { +func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, opts ...WriterOption) { testTable := dataset.Table(tableIDs.New()) if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) @@ -1104,11 +1150,9 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) // setup a new stream. - ms, err := mwClient.NewManagedStream(ctx, - WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)), - WithSchemaDescriptor(descriptorProto), - WithType(CommittedStream), - ) + opts = append(opts, WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID))) + opts = append(opts, WithSchemaDescriptor(descriptorProto)) + ms, err := mwClient.NewManagedStream(ctx, opts...) if err != nil { t.Fatalf("NewManagedStream: %v", err) } @@ -1154,7 +1198,7 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq // this subjects us to a possible race, as the backend that services GetWriteStream isn't necessarily the // one in charge of the stream, and thus may report ready early. for { - resp, err := ms.AppendRows(ctx, [][]byte{latestRow}, WithOffset(curOffset)) + resp, err := ms.AppendRows(ctx, [][]byte{latestRow}) if err != nil { t.Errorf("got error on dupe append: %v", err) break @@ -1181,7 +1225,7 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq t.Errorf("failed to marshal evolved message: %v", err) } // Send an append with an evolved schema - res, err := ms.AppendRows(ctx, [][]byte{b}, WithOffset(curOffset), UpdateSchemaDescriptor(descriptorProto)) + res, err := ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto)) if err != nil { t.Errorf("failed evolved append: %v", err) } diff --git a/bigquery/storage/managedwriter/send_optimizer.go b/bigquery/storage/managedwriter/send_optimizer.go index 7385f21de201..008fae50ee94 100644 --- a/bigquery/storage/managedwriter/send_optimizer.go +++ b/bigquery/storage/managedwriter/send_optimizer.go @@ -35,6 +35,9 @@ type sendOptimizer interface { // optimizeSend handles possible manipulation of a request, and triggers the send. optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error + + // isMultiplexing tracks if we've actually sent writes to more than a single stream on this connection. + isMultiplexing() bool } // verboseOptimizer is a primarily a testing optimizer that always sends the full request. @@ -50,6 +53,11 @@ func (vo *verboseOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsC return arc.Send(pw.constructFullRequest(true)) } +func (vo *verboseOptimizer) isMultiplexing() bool { + // we declare this no to ensure we always reconnect on schema changes. + return false +} + // simplexOptimizer is used for connections bearing AppendRowsRequest for only a single stream. // // The optimizations here are straightforward: @@ -80,6 +88,11 @@ func (so *simplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsC return err } +func (so *simplexOptimizer) isMultiplexing() bool { + // A simplex optimizer is not designed for multiplexing. + return false +} + // multiplexOptimizer is used for connections where requests for multiple default streams are sent on a common // connection. Only default streams can currently be multiplexed. // @@ -93,10 +106,12 @@ func (so *simplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsC type multiplexOptimizer struct { prevStream string prevDescriptorVersion *descriptorVersion + multiplexStreams bool } func (mo *multiplexOptimizer) signalReset() { mo.prevStream = "" + mo.multiplexStreams = false mo.prevDescriptorVersion = nil } @@ -139,11 +154,18 @@ func (mo *multiplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRow mo.prevStream = pw.writeStreamID mo.prevDescriptorVersion = pw.descVersion } + // Also, note that we've sent traffic for multiple streams, which means the backend recognizes this + // is a multiplex stream as well. + mo.multiplexStreams = true } } return err } +func (mo *multiplexOptimizer) isMultiplexing() bool { + return mo.multiplexStreams +} + // getDescriptorFromAppend is a utility method for extracting the deeply nested schema // descriptor from a request. It returns a nil if the descriptor is not set. func getDescriptorFromAppend(req *storagepb.AppendRowsRequest) *descriptorpb.DescriptorProto {