diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index 9789cb854343..3c8118b0b34e 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "net/url" + "os" "strconv" "strings" "time" @@ -29,6 +30,7 @@ import ( btopt "cloud.google.com/go/bigtable/internal/option" "cloud.google.com/go/internal/trace" gax "github.com/googleapis/gax-go/v2" + "go.opentelemetry.io/otel/metric" "google.golang.org/api/option" "google.golang.org/api/option/internaloption" gtransport "google.golang.org/api/transport/grpc" @@ -52,10 +54,11 @@ const mtlsProdAddr = "bigtable.mtls.googleapis.com:443" // // A Client is safe to use concurrently, except for its Close method. type Client struct { - connPool gtransport.ConnPool - client btpb.BigtableClient - project, instance string - appProfile string + connPool gtransport.ConnPool + client btpb.BigtableClient + project, instance string + appProfile string + metricsTracerFactory *builtinMetricsTracerFactory } // ClientConfig has configurations for the client. @@ -63,8 +66,25 @@ type ClientConfig struct { // The id of the app profile to associate with all data operations sent from this client. // If unspecified, the default app profile for the instance will be used. AppProfile string + + // If not set or set to nil, client-side metrics will be collected and exported. + // + // To disable client-side metrics, set 'MetricsProvider' to 'NoopMetricsProvider'. + // + // TODO: support user-provided meter provider + MetricsProvider MetricsProvider +} + +// MetricsProvider is a wrapper for the built-in metrics meter provider. +type MetricsProvider interface { + isMetricsProvider() } +// NoopMetricsProvider can be used to disable built-in metrics. +type NoopMetricsProvider struct{} + +func (NoopMetricsProvider) isMetricsProvider() {} + // NewClient creates a new Client for a given project and instance. // The default ClientConfig will be used. func NewClient(ctx context.Context, project, instance string, opts ...option.ClientOption) (*Client, error) { @@ -95,17 +115,33 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C return nil, fmt.Errorf("dialing: %w", err) } + metricsProvider := config.MetricsProvider + if emulatorAddr := os.Getenv("BIGTABLE_EMULATOR_HOST"); emulatorAddr != "" { + // Do not emit metrics when the emulator is being used + metricsProvider = NoopMetricsProvider{} + } + + // Create an OpenTelemetry metrics configuration + metricsTracerFactory, err := newBuiltinMetricsTracerFactory(ctx, project, instance, config.AppProfile, metricsProvider) + if err != nil { + return nil, err + } + return &Client{ - connPool: connPool, - client: btpb.NewBigtableClient(connPool), - project: project, - instance: instance, - appProfile: config.AppProfile, + connPool: connPool, + client: btpb.NewBigtableClient(connPool), + project: project, + instance: instance, + appProfile: config.AppProfile, + metricsTracerFactory: metricsTracerFactory, }, nil } // Close closes the Client.
func (c *Client) Close() error { + if c.metricsTracerFactory != nil { + c.metricsTracerFactory.shutdown() + } return c.connPool.Close() } @@ -166,19 +202,21 @@ func init() { } // Convert error to grpc status error -func convertToGrpcStatusErr(err error) error { - if err != nil { - if errStatus, ok := status.FromError(err); ok { - return status.Error(errStatus.Code(), errStatus.Message()) - } +func convertToGrpcStatusErr(err error) (codes.Code, error) { + if err == nil { + return codes.OK, nil + } - ctxStatus := status.FromContextError(err) - if ctxStatus.Code() != codes.Unknown { - return status.Error(ctxStatus.Code(), ctxStatus.Message()) - } + if errStatus, ok := status.FromError(err); ok { + return errStatus.Code(), status.Error(errStatus.Code(), errStatus.Message()) } - return err + ctxStatus := status.FromContextError(err) + if ctxStatus.Code() != codes.Unknown { + return ctxStatus.Code(), status.Error(ctxStatus.Code(), ctxStatus.Message()) + } + + return codes.Unknown, err } func (c *Client) fullTableName(table string) string { @@ -285,6 +323,10 @@ func (ti *tableImpl) ApplyReadModifyWrite(ctx context.Context, row string, m *Re return ti.Table.ApplyReadModifyWrite(ctx, row, m) } +func (ti *tableImpl) newBuiltinMetricsTracer(ctx context.Context, isStreaming bool) *builtinMetricsTracer { + return ti.Table.newBuiltinMetricsTracer(ctx, isStreaming) +} + // TODO(dsymonds): Read method that returns a sequence of ReadItems. // ReadRows reads rows from a table. f is called for each row. @@ -299,9 +341,19 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable.ReadRows") defer func() { trace.EndSpan(ctx, err) }() + mt := t.newBuiltinMetricsTracer(ctx, true) + defer recordOperationCompletion(mt) + + err = t.readRows(ctx, arg, f, mt, opts...) + statusCode, statusErr := convertToGrpcStatusErr(err) + mt.currOp.setStatus(statusCode.String()) + return statusErr +} + +func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *builtinMetricsTracer, opts ...ReadOption) (err error) { var prevRowKey string attrMap := make(map[string]interface{}) - err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { + err = gaxInvokeWithRecorder(ctx, mt, "ReadRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { req := &btpb.ReadRowsRequest{ AppProfileId: t.c.appProfile, } @@ -340,12 +392,17 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts cr = newChunkReader() } + // Ignore error since header is only being used to record builtin metrics + // Failure to record metrics should not fail the operation + *headerMD, _ = stream.Header() for { res, err := stream.Recv() if err == io.EOF { + *trailerMD = stream.Trailer() break } if err != nil { + *trailerMD = stream.Trailer() // Reset arg for next Invoke call. if arg == nil { // Should be lowest possible key value, an empty byte array @@ -381,6 +438,7 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts cancel() for { if _, err := stream.Recv(); err != nil { + *trailerMD = stream.Trailer() // The stream has ended. We don't return an error // because the caller has intentionally interrupted the scan. return nil @@ -407,7 +465,7 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts return err }, retryOptions...) 
- return convertToGrpcStatusErr(err) + return err } // ReadRow is a convenience implementation of a single-row reader. @@ -922,7 +980,16 @@ func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...Appl ctx = mergeOutgoingMetadata(ctx, t.md) ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/Apply") defer func() { trace.EndSpan(ctx, err) }() + mt := t.newBuiltinMetricsTracer(ctx, false) + defer recordOperationCompletion(mt) + + err = t.apply(ctx, mt, row, m, opts...) + statusCode, statusErr := convertToGrpcStatusErr(err) + mt.currOp.setStatus(statusCode.String()) + return statusErr +} +func (t *Table) apply(ctx context.Context, mt *builtinMetricsTracer, row string, m *Mutation, opts ...ApplyOption) (err error) { after := func(res proto.Message) { for _, o := range opts { o.after(res) @@ -945,15 +1012,15 @@ func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...Appl callOptions = retryOptions } var res *btpb.MutateRowResponse - err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { + err := gaxInvokeWithRecorder(ctx, mt, "MutateRow", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { var err error - res, err = t.c.client.MutateRow(ctx, req) + res, err = t.c.client.MutateRow(ctx, req, grpc.Header(headerMD), grpc.Trailer(trailerMD)) return err }, callOptions...) if err == nil { after(res) } - return convertToGrpcStatusErr(err) + return err } req := &btpb.CheckAndMutateRowRequest{ @@ -982,15 +1049,15 @@ func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...Appl callOptions = retryOptions } var cmRes *btpb.CheckAndMutateRowResponse - err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { + err = gaxInvokeWithRecorder(ctx, mt, "CheckAndMutateRow", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { var err error - cmRes, err = t.c.client.CheckAndMutateRow(ctx, req) + cmRes, err = t.c.client.CheckAndMutateRow(ctx, req, grpc.Header(headerMD), grpc.Trailer(trailerMD)) return err }, callOptions...) if err == nil { after(cmRes) } - return convertToGrpcStatusErr(err) + return err } // An ApplyOption is an optional argument to Apply. @@ -1136,23 +1203,7 @@ func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutatio } for _, group := range groupEntries(origEntries, maxMutations) { - attrMap := make(map[string]interface{}) - err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { - attrMap["rowCount"] = len(group) - trace.TracePrintf(ctx, attrMap, "Row count in ApplyBulk") - err := t.doApplyBulk(ctx, group, opts...) - if err != nil { - // We want to retry the entire request with the current group - return err - } - group = t.getApplyBulkRetries(group) - if len(group) > 0 && len(idempotentRetryCodes) > 0 { - // We have at least one mutation that needs to be retried. - // Return an arbitrary error that is retryable according to callOptions. - return status.Errorf(idempotentRetryCodes[0], "Synthetic error: partial failure of ApplyBulk") - } - return nil - }, retryOptions...) + err := t.applyGroup(ctx, group, opts...) 
if err != nil { return nil, err } @@ -1173,6 +1224,33 @@ func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutatio return nil, nil } +func (t *Table) applyGroup(ctx context.Context, group []*entryErr, opts ...ApplyOption) (err error) { + attrMap := make(map[string]interface{}) + mt := t.newBuiltinMetricsTracer(ctx, true) + defer recordOperationCompletion(mt) + + err = gaxInvokeWithRecorder(ctx, mt, "MutateRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { + attrMap["rowCount"] = len(group) + trace.TracePrintf(ctx, attrMap, "Row count in ApplyBulk") + err := t.doApplyBulk(ctx, group, headerMD, trailerMD, opts...) + if err != nil { + // We want to retry the entire request with the current group + return err + } + group = t.getApplyBulkRetries(group) + if len(group) > 0 && len(idempotentRetryCodes) > 0 { + // We have at least one mutation that needs to be retried. + // Return an arbitrary error that is retryable according to callOptions. + return status.Errorf(idempotentRetryCodes[0], "Synthetic error: partial failure of ApplyBulk") + } + return nil + }, retryOptions...) + + statusCode, statusErr := convertToGrpcStatusErr(err) + mt.currOp.setStatus(statusCode.String()) + return statusErr +} + // getApplyBulkRetries returns the entries that need to be retried func (t *Table) getApplyBulkRetries(entries []*entryErr) []*entryErr { var retryEntries []*entryErr @@ -1187,7 +1265,7 @@ func (t *Table) getApplyBulkRetries(entries []*entryErr) []*entryErr { } // doApplyBulk does the work of a single ApplyBulk invocation -func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, opts ...ApplyOption) error { +func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD, trailerMD *metadata.MD, opts ...ApplyOption) error { after := func(res proto.Message) { for _, o := range opts { o.after(res) @@ -1207,16 +1285,23 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, opts ... } else { req.AuthorizedViewName = t.c.fullAuthorizedViewName(t.table, t.authorizedView) } + stream, err := t.c.client.MutateRows(ctx, req) if err != nil { return err } + + // Ignore error since header is only being used to record builtin metrics + // Failure to record metrics should not fail the operation + *headerMD, _ = stream.Header() for { res, err := stream.Recv() if err == io.EOF { + *trailerMD = stream.Trailer() break } if err != nil { + *trailerMD = stream.Trailer() return err } @@ -1288,6 +1373,17 @@ func (ts Timestamp) TruncateToMilliseconds() Timestamp { // It returns the newly written cells. 
func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error) { ctx = mergeOutgoingMetadata(ctx, t.md) + + mt := t.newBuiltinMetricsTracer(ctx, false) + defer recordOperationCompletion(mt) + + updatedRow, err := t.applyReadModifyWrite(ctx, mt, row, m) + statusCode, statusErr := convertToGrpcStatusErr(err) + mt.currOp.setStatus(statusCode.String()) + return updatedRow, statusErr +} + +func (t *Table) applyReadModifyWrite(ctx context.Context, mt *builtinMetricsTracer, row string, m *ReadModifyWrite) (Row, error) { req := &btpb.ReadModifyWriteRowRequest{ AppProfileId: t.c.appProfile, RowKey: []byte(row), @@ -1298,18 +1394,23 @@ func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadMod } else { req.AuthorizedViewName = t.c.fullAuthorizedViewName(t.table, t.authorizedView) } - res, err := t.c.client.ReadModifyWriteRow(ctx, req) - if err != nil { - return nil, err - } - if res.Row == nil { - return nil, errors.New("unable to apply ReadModifyWrite: res.Row=nil") - } - r := make(Row) - for _, fam := range res.Row.Families { // res is *btpb.Row, fam is *btpb.Family - decodeFamilyProto(r, row, fam) - } - return r, nil + + var r Row + err := gaxInvokeWithRecorder(ctx, mt, "ReadModifyWriteRow", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { + res, err := t.c.client.ReadModifyWriteRow(ctx, req, grpc.Header(headerMD), grpc.Trailer(trailerMD)) + if err != nil { + return err + } + if res.Row == nil { + return errors.New("unable to apply ReadModifyWrite: res.Row=nil") + } + r = make(Row) + for _, fam := range res.Row.Families { // res is *btpb.Row, fam is *btpb.Family + decodeFamilyProto(r, row, fam) + } + return nil + }) + return r, err } // ReadModifyWrite represents a set of operations on a single row of a table. @@ -1353,8 +1454,19 @@ func (m *ReadModifyWrite) Increment(family, column string, delta int64) { // the table of approximately equal size, which can be used to break up the data for distributed tasks like mapreduces. func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) { ctx = mergeOutgoingMetadata(ctx, t.md) + + mt := t.newBuiltinMetricsTracer(ctx, true) + defer recordOperationCompletion(mt) + + rowKeys, err := t.sampleRowKeys(ctx, mt) + statusCode, statusErr := convertToGrpcStatusErr(err) + mt.currOp.setStatus(statusCode.String()) + return rowKeys, statusErr +} + +func (t *Table) sampleRowKeys(ctx context.Context, mt *builtinMetricsTracer) ([]string, error) { var sampledRowKeys []string - err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { + err := gaxInvokeWithRecorder(ctx, mt, "SampleRowKeys", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { sampledRowKeys = nil req := &btpb.SampleRowKeysRequest{ AppProfileId: t.c.appProfile, @@ -1371,12 +1483,18 @@ func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) { if err != nil { return err } + + // Ignore error since header is only being used to record builtin metrics + // Failure to record metrics should not fail the operation + *headerMD, _ = stream.Header() for { res, err := stream.Recv() if err == io.EOF { + *trailerMD = stream.Trailer() break } if err != nil { + *trailerMD = stream.Trailer() return err } @@ -1389,5 +1507,108 @@ func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) { } return nil }, retryOptions...) 
- return sampledRowKeys, convertToGrpcStatusErr(err) + + return sampledRowKeys, err +} + +func (t *Table) newBuiltinMetricsTracer(ctx context.Context, isStreaming bool) *builtinMetricsTracer { + mt := t.c.metricsTracerFactory.createBuiltinMetricsTracer(ctx, t.table, isStreaming) + return &mt +} + +// recordOperationCompletion records as many operation specific metrics as it can +func recordOperationCompletion(mt *builtinMetricsTracer) { + if !mt.builtInEnabled { + return + } + + // Calculate elapsed time + elapsedTimeMs := float64(time.Since(mt.currOp.startTime).Nanoseconds()) / 1000000 + + // Attributes for operation_latencies + // Ignore error seen while creating metric attributes since metric can still + // be recorded with rest of the attributes + opLatAttrs, _ := mt.toOtelMetricAttrs(metricNameOperationLatencies) + mt.instrumentOperationLatencies.Record(mt.ctx, elapsedTimeMs, metric.WithAttributes(opLatAttrs...)) + + // Attributes for retry_count + // Ignore error seen while creating metric attributes since metric can still + // be recorded with rest of the attributes + retryCntAttrs, _ := mt.toOtelMetricAttrs(metricNameRetryCount) + + // Only record when retry count is greater than 0 so the retry + // graph will be less confusing + if mt.currOp.attemptCount > 1 { + mt.instrumentRetryCount.Add(mt.ctx, mt.currOp.attemptCount-1, metric.WithAttributes(retryCntAttrs...)) + } +} + +// gaxInvokeWithRecorder: +// - wraps 'f' in a new function 'callWrapper' that: +// - updates tracer state and records built in attempt specific metrics +// - does not return errors seen while recording the metrics +// +// - then, calls gax.Invoke with 'callWrapper' as an argument +func gaxInvokeWithRecorder(ctx context.Context, mt *builtinMetricsTracer, method string, + f func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error, opts ...gax.CallOption) error { + + mt.method = method + callWrapper := func(ctx context.Context, callSettings gax.CallSettings) error { + // Increment number of attempts + mt.currOp.incrementAttemptCount() + + attemptHeaderMD := metadata.New(nil) + attempTrailerMD := metadata.New(nil) + mt.currOp.currAttempt = attemptTracer{} + + // record start time + mt.currOp.currAttempt.setStartTime(time.Now()) + + // f makes calls to CBT service + err := f(ctx, &attemptHeaderMD, &attempTrailerMD, callSettings) + + // Set attempt status + statusCode, _ := convertToGrpcStatusErr(err) + mt.currOp.currAttempt.setStatus(statusCode.String()) + + // Get location attributes from metadata and set it in tracer + // Ignore get location error since the metric can still be recorded with rest of the attributes + clusterID, zoneID, _ := extractLocation(attemptHeaderMD, attempTrailerMD) + mt.currOp.currAttempt.setClusterID(clusterID) + mt.currOp.currAttempt.setZoneID(zoneID) + + // Set server latency in tracer + serverLatency, serverLatencyErr := extractServerLatency(attemptHeaderMD, attempTrailerMD) + mt.currOp.currAttempt.setServerLatencyErr(serverLatencyErr) + mt.currOp.currAttempt.setServerLatency(serverLatency) + + // Record attempt specific metrics + recordAttemptCompletion(mt) + return err + } + return gax.Invoke(ctx, callWrapper, opts...) 
+} + +// recordAttemptCompletion records as many attempt specific metrics as it can +func recordAttemptCompletion(mt *builtinMetricsTracer) { + if !mt.builtInEnabled { + return + } + + // Calculate elapsed time + elapsedTime := float64(time.Since(mt.currOp.currAttempt.startTime).Nanoseconds()) / 1000000 + + // Attributes for attempt_latencies + // Ignore error seen while creating metric attributes since metric can still + // be recorded with rest of the attributes + attemptLatAttrs, _ := mt.toOtelMetricAttrs(metricNameAttemptLatencies) + mt.instrumentAttemptLatencies.Record(mt.ctx, elapsedTime, metric.WithAttributes(attemptLatAttrs...)) + + // Attributes for server_latencies + // Ignore error seen while creating metric attributes since metric can still + // be recorded with rest of the attributes + serverLatAttrs, _ := mt.toOtelMetricAttrs(metricNameServerLatencies) + if mt.currOp.currAttempt.serverLatencyErr == nil { + mt.instrumentServerLatencies.Record(mt.ctx, mt.currOp.currAttempt.serverLatency, metric.WithAttributes(serverLatAttrs...)) + } } diff --git a/bigtable/bigtable_test.go b/bigtable/bigtable_test.go index 7c204f369bef..f082a8b2a91a 100644 --- a/bigtable/bigtable_test.go +++ b/bigtable/bigtable_test.go @@ -30,6 +30,8 @@ import ( "google.golang.org/grpc" ) +var disableMetricsConfig = ClientConfig{MetricsProvider: NoopMetricsProvider{}} + func TestPrefix(t *testing.T) { for _, test := range []struct { prefix, succ string @@ -253,8 +255,9 @@ func TestApplyErrors(t *testing.T) { ctx := context.Background() table := &Table{ c: &Client{ - project: "P", - instance: "I", + project: "P", + instance: "I", + metricsTracerFactory: &builtinMetricsTracerFactory{}, }, table: "t", } @@ -581,9 +584,9 @@ func TestReadRowsInvalidRowSet(t *testing.T) { if err := adminClient.CreateTable(ctx, testEnv.config.Table); err != nil { t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err) } - client, err := NewClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn)) + client, err := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn)) if err != nil { - t.Fatalf("NewClient failed: %v", err) + t.Fatalf("NewClientWithConfig failed: %v", err) } defer client.Close() table := client.Open(testEnv.config.Table) @@ -657,9 +660,9 @@ func TestReadRowsRequestStats(t *testing.T) { t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err) } - client, err := NewClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn)) + client, err := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn)) if err != nil { - t.Fatalf("NewClient failed: %v", err) + t.Fatalf("NewClientWithConfig failed: %v", err) } defer client.Close() table := client.Open(testEnv.config.Table) @@ -785,9 +788,9 @@ func TestMutateRowsWithAggregates(t *testing.T) { t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err) } - client, err := NewClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn)) + client, err := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn)) if err != nil { - t.Fatalf("NewClient failed: %v", err) + t.Fatalf("NewClientWithConfig failed: %v", err) } defer client.Close() table := client.Open(testEnv.config.Table) diff --git a/bigtable/bttest/example_test.go b/bigtable/bttest/example_test.go index 
5854a221ecac..9cc6ab1a6c16 100644 --- a/bigtable/bttest/example_test.go +++ b/bigtable/bttest/example_test.go @@ -56,7 +56,7 @@ func ExampleNewServer() { log.Fatalln(err) } - client, err := bigtable.NewClient(ctx, proj, instance, option.WithGRPCConn(conn)) + client, err := bigtable.NewClientWithConfig(ctx, proj, instance, bigtable.ClientConfig{MetricsProvider: bigtable.NoopMetricsProvider{}}, option.WithGRPCConn(conn)) if err != nil { log.Fatalln(err) } diff --git a/bigtable/conformance_test.go b/bigtable/conformance_test.go index f23f2b0db1d9..8077a9ef20de 100644 --- a/bigtable/conformance_test.go +++ b/bigtable/conformance_test.go @@ -53,7 +53,7 @@ func TestConformance(t *testing.T) { t.Fatal(err) } defer conn.Close() - c, err := NewClient(ctx, "some-project", "some-instance", option.WithGRPCConn(conn)) + c, err := NewClientWithConfig(ctx, "some-project", "some-instance", disableMetricsConfig, option.WithGRPCConn(conn)) if err != nil { t.Fatal(err) } diff --git a/bigtable/export_test.go b/bigtable/export_test.go index 64511e7f955a..1b741944bb83 100644 --- a/bigtable/export_test.go +++ b/bigtable/export_test.go @@ -103,6 +103,7 @@ type IntegrationEnv interface { // NewInstanceAdminClient will return nil if instance administration is unsupported in this environment NewInstanceAdminClient() (*InstanceAdminClient, error) NewClient() (*Client, error) + NewClientWithConfig(ClientConfig) (*Client, error) Close() Peer() *peer.Peer } @@ -240,6 +241,15 @@ func (e *EmulatedEnv) NewInstanceAdminClient() (*InstanceAdminClient, error) { // NewClient builds a new connected data client for this environment func (e *EmulatedEnv) NewClient() (*Client, error) { + return e.newEmulatedClient(ClientConfig{}) +} + +// NewClientWithConfig builds a new connected data client with provided config for this environment +func (e *EmulatedEnv) NewClientWithConfig(config ClientConfig) (*Client, error) { + return e.newEmulatedClient(config) +} + +func (e *EmulatedEnv) newEmulatedClient(config ClientConfig) (*Client, error) { o, err := btopt.DefaultClientOptions(e.server.Addr, e.server.Addr, Scope, clientUserAgent) if err != nil { return nil, err @@ -263,7 +273,7 @@ func (e *EmulatedEnv) NewClient() (*Client, error) { if err != nil { return nil, err } - return NewClient(ctx, e.config.Project, e.config.Instance, option.WithGRPCConn(conn)) + return NewClientWithConfig(ctx, e.config.Project, e.config.Instance, config, option.WithGRPCConn(conn)) } // ProdEnv encapsulates the state necessary to connect to the external Bigtable service @@ -334,6 +344,15 @@ func (e *ProdEnv) NewInstanceAdminClient() (*InstanceAdminClient, error) { // NewClient builds a connected data client for this environment func (e *ProdEnv) NewClient() (*Client, error) { + return e.newProdClient(ClientConfig{}) +} + +// NewClientWithConfig builds a connected data client with provided config for this environment +func (e *ProdEnv) NewClientWithConfig(config ClientConfig) (*Client, error) { + return e.newProdClient(config) +} + +func (e *ProdEnv) newProdClient(config ClientConfig) (*Client, error) { clientOpts := headersInterceptor.CallOptions() if endpoint := e.config.DataEndpoint; endpoint != "" { clientOpts = append(clientOpts, option.WithEndpoint(endpoint)) @@ -343,6 +362,5 @@ func (e *ProdEnv) NewClient() (*Client, error) { // For DirectPath tests, we need to add an interceptor to check the peer IP.
clientOpts = append(clientOpts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(e.peerInfo)))) } - - return NewClient(context.Background(), e.config.Project, e.config.Instance, clientOpts...) + return NewClientWithConfig(context.Background(), e.config.Project, e.config.Instance, config, clientOpts...) } diff --git a/bigtable/go.mod b/bigtable/go.mod index 16b0b1d3f248..9855c94ca6bc 100644 --- a/bigtable/go.mod +++ b/bigtable/go.mod @@ -10,6 +10,10 @@ require ( github.com/google/go-cmp v0.6.0 github.com/googleapis/cloud-bigtable-clients-test v0.0.2 github.com/googleapis/gax-go/v2 v2.13.0 + go.opentelemetry.io/otel v1.24.0 // Use older version compatible with Go 1.20 + go.opentelemetry.io/otel/metric v1.24.0 // Use older version compatible with Go 1.20 + go.opentelemetry.io/otel/sdk v1.24.0 // Use older version compatible with Go 1.20 + go.opentelemetry.io/otel/sdk/metric v1.24.0 // Use older version compatible with Go 1.20 google.golang.org/api v0.189.0 google.golang.org/genproto v0.0.0-20240722135656-d784300faade google.golang.org/genproto/googleapis/rpc v0.0.0-20240722135656-d784300faade @@ -18,6 +22,12 @@ require ( rsc.io/binaryregexp v0.2.0 ) +require ( + cloud.google.com/go/monitoring v1.20.1 + github.com/google/uuid v1.6.0 + google.golang.org/genproto/googleapis/api v0.0.0-20240722135656-d784300faade +) + require ( cel.dev/expr v0.15.0 // indirect cloud.google.com/go/auth v0.7.2 // indirect @@ -34,14 +44,10 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/s2a-go v0.1.7 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect - go.opentelemetry.io/otel v1.24.0 // indirect - go.opentelemetry.io/otel/metric v1.24.0 // indirect - go.opentelemetry.io/otel/sdk v1.24.0 // indirect go.opentelemetry.io/otel/trace v1.24.0 // indirect golang.org/x/crypto v0.25.0 // indirect golang.org/x/net v0.27.0 // indirect @@ -50,5 +56,4 @@ require ( golang.org/x/sys v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect golang.org/x/time v0.5.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240722135656-d784300faade // indirect ) diff --git a/bigtable/go.sum b/bigtable/go.sum index 3191c519fc55..e896aaee936b 100644 --- a/bigtable/go.sum +++ b/bigtable/go.sum @@ -13,6 +13,8 @@ cloud.google.com/go/iam v1.1.10 h1:ZSAr64oEhQSClwBL670MsJAW5/RLiC6kfw3Bqmd5ZDI= cloud.google.com/go/iam v1.1.10/go.mod h1:iEgMq62sg8zx446GCaijmA2Miwg5o3UbO+nI47WHJps= cloud.google.com/go/longrunning v0.5.9 h1:haH9pAuXdPAMqHvzX0zlWQigXT7B0+CL4/2nXXdBo5k= cloud.google.com/go/longrunning v0.5.9/go.mod h1:HD+0l9/OOW0za6UWdKJtXoFAX/BGg/3Wj8p10NeWF7c= +cloud.google.com/go/monitoring v1.20.1 h1:XmM6uk4+mI2ZhWdI2n/2GNhJdpeQN+1VdG2UWEDhX48= +cloud.google.com/go/monitoring v1.20.1/go.mod h1:FYSe/brgfuaXiEzOQFhTjsEsJv+WePyK71X7Y8qo6uQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g= @@ -100,6 +102,8 @@ go.opentelemetry.io/otel/metric v1.24.0 
h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGX go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= +go.opentelemetry.io/otel/sdk/metric v1.24.0 h1:yyMQrPzF+k88/DbH7o4FMAs80puqd+9osbiBrJrz/w8= +go.opentelemetry.io/otel/sdk/metric v1.24.0/go.mod h1:I6Y5FjH6rvEnTTAYQz3Mmv2kl6Ek5IIrmwTLqMrrOE0= go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index b6fe2bbc3866..a6bdfaff8b29 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -38,6 +38,8 @@ import ( "cloud.google.com/go/internal/optional" "cloud.google.com/go/internal/testutil" "cloud.google.com/go/internal/uid" + monitoring "cloud.google.com/go/monitoring/apiv3/v2" + "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" "github.com/google/go-cmp/cmp" gax "github.com/googleapis/gax-go/v2" "google.golang.org/api/iterator" @@ -46,6 +48,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" ) const ( @@ -276,6 +279,7 @@ func TestIntegration_ReadRowList(t *testing.T) { t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want) } } + func TestIntegration_ReadRowListReverse(t *testing.T) { ctx := context.Background() _, _, _, table, _, cleanup, err := setupIntegration(ctx, t) @@ -749,6 +753,101 @@ func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) { wg.Wait() } +func TestIntegration_ExportBuiltInMetrics(t *testing.T) { + ctx := context.Background() + + // Reduce sampling period for faster test runs + origSamplePeriod := defaultSamplePeriod + defaultSamplePeriod = time.Minute + defer func() { + defaultSamplePeriod = origSamplePeriod + }() + + // record start time + testStartTime := time.Now() + tsListStart := &timestamppb.Timestamp{ + Seconds: testStartTime.Unix(), + Nanos: int32(testStartTime.Nanosecond()), + } + + testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) + if err != nil { + t.Fatal(err) + } + defer cleanup() + + if testing.Short() || !testEnv.Config().UseProd { + t.Skip("Skip long running tests in short mode or non-prod environments") + } + + columnFamilyName := "export" + if err := adminClient.CreateColumnFamily(ctx, tableName, columnFamilyName); err != nil { + t.Fatalf("Creating column family: %v", err) + } + + for i := 0; i < 10; i++ { + mut := NewMutation() + mut.Set(columnFamilyName, "col", 1000, []byte("test")) + if err := table.Apply(ctx, fmt.Sprintf("row-%v", i), mut); err != nil { + t.Fatalf("Apply: %v", err) + } + } + err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { + return true + }, RowFilter(ColumnFilter("col"))) + if err != nil { + t.Fatalf("ReadRows: %v", err) + } + + // Validate that metrics are exported + elapsedTime := time.Since(testStartTime) + if elapsedTime < 2*defaultSamplePeriod { + // Ensure at least 2 datapoints are recorded + time.Sleep(2*defaultSamplePeriod - elapsedTime) + } + + // Sleep some more + time.Sleep(30 * time.Second) + + monitoringClient, err := monitoring.NewMetricClient(ctx)
+ if err != nil { + t.Errorf("Failed to create metric client: %v", err) + } + metricNamesValidate := []string{ + metricNameOperationLatencies, + metricNameAttemptLatencies, + metricNameServerLatencies, + } + + // Try for 5m with 30s sleep between retries + testutil.Retry(t, 10, 30*time.Second, func(r *testutil.R) { + for _, metricName := range metricNamesValidate { + timeListEnd := time.Now() + tsListEnd := &timestamppb.Timestamp{ + Seconds: timeListEnd.Unix(), + Nanos: int32(timeListEnd.Nanosecond()), + } + + // ListTimeSeries can list only one metric type at a time. + // So, call ListTimeSeries with different metric names + iter := monitoringClient.ListTimeSeries(ctx, &monitoringpb.ListTimeSeriesRequest{ + Name: fmt.Sprintf("projects/%s", testEnv.Config().Project), + Interval: &monitoringpb.TimeInterval{ + StartTime: tsListStart, + EndTime: tsListEnd, + }, + Filter: fmt.Sprintf("metric.type = starts_with(\"bigtable.googleapis.com/client/%v\")", metricName), + }) + + // Assert at least 1 datapoint was exported + _, err := iter.Next() + if err != nil { + r.Errorf("%v not exported\n", metricName) + } + } + }) +} + func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { ctx := context.Background() testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) @@ -757,7 +856,7 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { } defer cleanup() - if !testEnv.Config().UseProd { + if testing.Short() { t.Skip("Skip long running tests in short mode") } diff --git a/bigtable/internal/testproxy/proxy.go b/bigtable/internal/testproxy/proxy.go index d5bf11be4753..49a79de8b698 100644 --- a/bigtable/internal/testproxy/proxy.go +++ b/bigtable/internal/testproxy/proxy.go @@ -441,7 +441,8 @@ func (s *goTestProxyServer) CreateClient(ctx context.Context, req *pb.CreateClie } config := bigtable.ClientConfig{ - AppProfile: req.AppProfileId, + AppProfile: req.AppProfileId, + MetricsProvider: bigtable.NoopMetricsProvider{}, } c, err := bigtable.NewClientWithConfig(ctx, req.ProjectId, req.InstanceId, config, option.WithGRPCConn(conn)) if err != nil { diff --git a/bigtable/internal/testproxy/proxy_test.go b/bigtable/internal/testproxy/proxy_test.go index 8f9fc747f56a..5682a7a7d54a 100644 --- a/bigtable/internal/testproxy/proxy_test.go +++ b/bigtable/internal/testproxy/proxy_test.go @@ -84,7 +84,7 @@ func populateTable(bts *bttest.Server) error { } } - dataClient, err := bigtable.NewClient(ctx, "client", "instance", + dataClient, err := bigtable.NewClientWithConfig(ctx, "client", "instance", bigtable.ClientConfig{MetricsProvider: bigtable.NoopMetricsProvider{}}, option.WithGRPCConn(conn), option.WithGRPCDialOption(grpc.WithBlock())) if err != nil { return fmt.Errorf("testproxy setup: can't create Bigtable client: %v", err) } diff --git a/bigtable/metric_util.go b/bigtable/metric_util.go new file mode 100644 index 000000000000..070b28fc0391 --- /dev/null +++ b/bigtable/metric_util.go @@ -0,0 +1,94 @@ +/* +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +package bigtable + +import ( + "fmt" + "strconv" + "strings" + + btpb "google.golang.org/genproto/googleapis/bigtable/v2" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/proto" +) + +const ( + defaultCluster = "unspecified" + defaultZone = "global" +) + +// get GFE latency in ms from response metadata +func extractServerLatency(headerMD metadata.MD, trailerMD metadata.MD) (float64, error) { + serverTimingStr := "" + + // Check whether server latency available in response header metadata + if headerMD != nil { + headerMDValues := headerMD.Get(serverTimingMDKey) + if len(headerMDValues) != 0 { + serverTimingStr = headerMDValues[0] + } + } + + if len(serverTimingStr) == 0 { + // Check whether server latency available in response trailer metadata + if trailerMD != nil { + trailerMDValues := trailerMD.Get(serverTimingMDKey) + if len(trailerMDValues) != 0 { + serverTimingStr = trailerMDValues[0] + } + } + } + + serverLatencyMillisStr := strings.TrimPrefix(serverTimingStr, serverTimingValPrefix) + serverLatencyMillis, err := strconv.ParseFloat(strings.TrimSpace(serverLatencyMillisStr), 64) + if !strings.HasPrefix(serverTimingStr, serverTimingValPrefix) || err != nil { + return serverLatencyMillis, err + } + + return serverLatencyMillis, nil +} + +// Obtain cluster and zone from response metadata +func extractLocation(headerMD metadata.MD, trailerMD metadata.MD) (string, string, error) { + var locationMetadata []string + + // Check whether location metadata available in response header metadata + if headerMD != nil { + locationMetadata = headerMD.Get(locationMDKey) + } + + if locationMetadata == nil { + // Check whether location metadata available in response trailer metadata + // if none found in response header metadata + if trailerMD != nil { + locationMetadata = trailerMD.Get(locationMDKey) + } + } + + if len(locationMetadata) < 1 { + return defaultCluster, defaultZone, fmt.Errorf("failed to get location metadata") + } + + // Unmarshal binary location metadata + responseParams := &btpb.ResponseParams{} + err := proto.Unmarshal([]byte(locationMetadata[0]), responseParams) + if err != nil { + return defaultCluster, defaultZone, err + } + + return responseParams.GetClusterId(), responseParams.GetZoneId(), nil +} diff --git a/bigtable/metrics.go b/bigtable/metrics.go new file mode 100644 index 000000000000..4a0413095f9e --- /dev/null +++ b/bigtable/metrics.go @@ -0,0 +1,403 @@ +/* +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package bigtable + +import ( + "context" + "errors" + "fmt" + "log" + "os" + "time" + + "cloud.google.com/go/bigtable/internal" + "github.com/google/uuid" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "google.golang.org/api/option" +) + +const ( + builtInMetricsMeterName = "bigtable.googleapis.com/internal/client/" + + metricsPrefix = "bigtable/" + locationMDKey = "x-goog-ext-425905942-bin" + serverTimingMDKey = "server-timing" + serverTimingValPrefix = "gfet4t7; dur=" + + // Monitored resource labels + monitoredResLabelKeyProject = "project_id" + monitoredResLabelKeyInstance = "instance" + monitoredResLabelKeyTable = "table" + monitoredResLabelKeyCluster = "cluster" + monitoredResLabelKeyZone = "zone" + + // Metric labels + metricLabelKeyAppProfile = "app_profile" + metricLabelKeyMethod = "method" + metricLabelKeyStatus = "status" + metricLabelKeyStreamingOperation = "streaming" + metricLabelKeyClientName = "client_name" + metricLabelKeyClientUID = "client_uid" + + // Metric names + metricNameOperationLatencies = "operation_latencies" + metricNameAttemptLatencies = "attempt_latencies" + metricNameServerLatencies = "server_latencies" + metricNameRetryCount = "retry_count" +) + +// These are effectively const, but for testing purposes they are mutable +var ( + // duration between two metric exports + defaultSamplePeriod = 5 * time.Minute + + clientName = fmt.Sprintf("go-bigtable/%v", internal.Version) + + bucketBounds = []float64{0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0, 10.0, 13.0, 16.0, 20.0, 25.0, 30.0, 40.0, + 50.0, 65.0, 80.0, 100.0, 130.0, 160.0, 200.0, 250.0, 300.0, 400.0, 500.0, 650.0, + 800.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0, 100000.0, 200000.0, + 400000.0, 800000.0, 1600000.0, 3200000.0} + + // All the built-in metrics have the same attributes except 'status' and 'streaming' + // These attributes need to be added to only a few of the metrics + metricsDetails = map[string]metricInfo{ + metricNameOperationLatencies: { + additionalAttrs: []string{ + metricLabelKeyStatus, + metricLabelKeyStreamingOperation, + }, + recordedPerAttempt: false, + }, + metricNameAttemptLatencies: { + additionalAttrs: []string{ + metricLabelKeyStatus, + metricLabelKeyStreamingOperation, + }, + recordedPerAttempt: true, + }, + metricNameServerLatencies: { + additionalAttrs: []string{ + metricLabelKeyStatus, + metricLabelKeyStreamingOperation, + }, + recordedPerAttempt: true, + }, + metricNameRetryCount: { + additionalAttrs: []string{ + metricLabelKeyStatus, + }, + recordedPerAttempt: true, + }, + } + + // Generates a unique client ID in the format go-<uuid>@<hostname> + generateClientUID = func() (string, error) { + hostname := "localhost" + hostname, err := os.Hostname() + if err != nil { + return "", err + } + return "go-" + uuid.NewString() + "@" + hostname, nil + } + + exporterOpts = []option.ClientOption{} +) + +type metricInfo struct { + additionalAttrs []string + recordedPerAttempt bool +} + +type builtinMetricsTracerFactory struct { + enabled bool + + // To be called on client close + shutdown func() + + // attributes that are specific to a client instance and + // do not change across different function calls on client + clientAttributes []attribute.KeyValue + + operationLatencies metric.Float64Histogram + serverLatencies metric.Float64Histogram + attemptLatencies metric.Float64Histogram + retryCount metric.Int64Counter +} + +func newBuiltinMetricsTracerFactory(ctx context.Context, project, instance, appProfile string,
metricsProvider MetricsProvider) (*builtinMetricsTracerFactory, error) { + clientUID, err := generateClientUID() + if err != nil { + log.Printf("built-in metrics: generateClientUID failed: %v. Using empty string in the %v metric attribute", err, metricLabelKeyClientUID) + } + + tracerFactory := &builtinMetricsTracerFactory{ + enabled: false, + clientAttributes: []attribute.KeyValue{ + attribute.String(monitoredResLabelKeyProject, project), + attribute.String(monitoredResLabelKeyInstance, instance), + attribute.String(metricLabelKeyAppProfile, appProfile), + attribute.String(metricLabelKeyClientUID, clientUID), + attribute.String(metricLabelKeyClientName, clientName), + }, + shutdown: func() {}, + } + + var meterProvider *sdkmetric.MeterProvider + if metricsProvider == nil { + // Create default meter provider + mpOptions, err := builtInMeterProviderOptions(project) + if err != nil { + return tracerFactory, err + } + meterProvider = sdkmetric.NewMeterProvider(mpOptions...) + + tracerFactory.enabled = true + tracerFactory.shutdown = func() { meterProvider.Shutdown(ctx) } + } else { + switch metricsProvider.(type) { + case NoopMetricsProvider: + tracerFactory.enabled = false + return tracerFactory, nil + default: + tracerFactory.enabled = false + return tracerFactory, errors.New("unknown MetricsProvider type") + } + } + + // Create meter and instruments + meter := meterProvider.Meter(builtInMetricsMeterName, metric.WithInstrumentationVersion(internal.Version)) + err = tracerFactory.createInstruments(meter) + return tracerFactory, err +} + +func builtInMeterProviderOptions(project string) ([]sdkmetric.Option, error) { + defaultExporter, err := newMonitoringExporter(context.Background(), project, exporterOpts...) + if err != nil { + return nil, err + } + + return []sdkmetric.Option{sdkmetric.WithReader( + sdkmetric.NewPeriodicReader( + defaultExporter, + sdkmetric.WithInterval(defaultSamplePeriod), + ), + )}, nil +} + +func (tf *builtinMetricsTracerFactory) createInstruments(meter metric.Meter) error { + var err error + + // Create operation_latencies + tf.operationLatencies, err = meter.Float64Histogram( + metricNameOperationLatencies, + metric.WithDescription("Total time until final operation success or failure, including retries and backoff."), + metric.WithUnit("ms"), + metric.WithExplicitBucketBoundaries(bucketBounds...), + ) + if err != nil { + return err + } + + // Create attempt_latencies + tf.attemptLatencies, err = meter.Float64Histogram( + metricNameAttemptLatencies, + metric.WithDescription("Client observed latency per RPC attempt."), + metric.WithUnit("ms"), + metric.WithExplicitBucketBoundaries(bucketBounds...), + ) + if err != nil { + return err + } + + // Create server_latencies + tf.serverLatencies, err = meter.Float64Histogram( + metricNameServerLatencies, + metric.WithDescription("The latency measured from the moment that the RPC entered the Google data center until the RPC was completed."), + metric.WithUnit("ms"), + metric.WithExplicitBucketBoundaries(bucketBounds...), + ) + if err != nil { + return err + } + + // Create retry_count + tf.retryCount, err = meter.Int64Counter( + metricNameRetryCount, + metric.WithDescription("The number of additional RPCs sent after the initial attempt."), + ) + return err +} + +// builtinMetricsTracer is created once per operation +// It is used to store metric instruments, attribute values +// and other data required to obtain and record them +type builtinMetricsTracer struct { + ctx context.Context + builtInEnabled bool + + // attributes
that are specific to a client instance and + // do not change across different operations on client + clientAttributes []attribute.KeyValue + + instrumentOperationLatencies metric.Float64Histogram + instrumentServerLatencies metric.Float64Histogram + instrumentAttemptLatencies metric.Float64Histogram + instrumentRetryCount metric.Int64Counter + + tableName string + method string + isStreaming bool + + currOp opTracer +} + +// opTracer is used to record metrics for the entire operation, including retries. +type opTracer struct { + attemptCount int64 + + startTime time.Time + + // gRPC status code of last completed attempt + status string + + currAttempt attemptTracer +} + +func (o *opTracer) setStartTime(t time.Time) { + o.startTime = t + +} + +func (o *opTracer) setStatus(status string) { + o.status = status +} + +func (o *opTracer) incrementAttemptCount() { + o.attemptCount++ +} + +// attemptTracer is used to record metrics for each individual attempt of the operation. +type attemptTracer struct { + startTime time.Time + clusterID string + zoneID string + + // gRPC status code + status string + + // Server latency in ms + serverLatency float64 + + // Error seen while getting server latency from headers + serverLatencyErr error +} + +func (a *attemptTracer) setStartTime(t time.Time) { + a.startTime = t +} + +func (a *attemptTracer) setClusterID(clusterID string) { + a.clusterID = clusterID +} + +func (a *attemptTracer) setZoneID(zoneID string) { + a.zoneID = zoneID +} + +func (a *attemptTracer) setStatus(status string) { + a.status = status +} + +func (a *attemptTracer) setServerLatency(latency float64) { + a.serverLatency = latency +} + +func (a *attemptTracer) setServerLatencyErr(err error) { + a.serverLatencyErr = err +} + +func (tf *builtinMetricsTracerFactory) createBuiltinMetricsTracer(ctx context.Context, tableName string, isStreaming bool) builtinMetricsTracer { + // Operation has started but not the attempt. + // So, create only operation tracer and not attempt tracer + currOpTracer := opTracer{} + currOpTracer.setStartTime(time.Now()) + + return builtinMetricsTracer{ + ctx: ctx, + builtInEnabled: tf.enabled, + + currOp: currOpTracer, + clientAttributes: tf.clientAttributes, + + instrumentOperationLatencies: tf.operationLatencies, + instrumentServerLatencies: tf.serverLatencies, + instrumentAttemptLatencies: tf.attemptLatencies, + instrumentRetryCount: tf.retryCount, + + tableName: tableName, + isStreaming: isStreaming, + } +} + +// toOtelMetricAttrs: +// - converts metric attributes values captured throughout the operation / attempt +// to OpenTelemetry attributes format, +// - combines these with common client attributes and returns +func (mt *builtinMetricsTracer) toOtelMetricAttrs(metricName string) ([]attribute.KeyValue, error) { + // Create attribute key value pairs for attributes common to all metricss + attrKeyValues := []attribute.KeyValue{ + attribute.String(metricLabelKeyMethod, mt.method), + + // Add resource labels to otel metric labels. 
+ // These will be used for creating the monitored resource but exporter + // will not add them to Google Cloud Monitoring metric labels + attribute.String(monitoredResLabelKeyTable, mt.tableName), + + // Irrespective of whether metric is attempt specific or operation specific, + // use last attempt's cluster and zone + attribute.String(monitoredResLabelKeyCluster, mt.currOp.currAttempt.clusterID), + attribute.String(monitoredResLabelKeyZone, mt.currOp.currAttempt.zoneID), + } + attrKeyValues = append(attrKeyValues, mt.clientAttributes...) + + // Get metric details + mDetails, found := metricsDetails[metricName] + if !found { + return attrKeyValues, fmt.Errorf("unable to create attributes list for unknown metric: %v", metricName) + } + + status := mt.currOp.status + if mDetails.recordedPerAttempt { + status = mt.currOp.currAttempt.status + } + + // Add additional attributes to metrics + for _, attrKey := range mDetails.additionalAttrs { + switch attrKey { + case metricLabelKeyStatus: + attrKeyValues = append(attrKeyValues, attribute.String(metricLabelKeyStatus, status)) + case metricLabelKeyStreamingOperation: + attrKeyValues = append(attrKeyValues, attribute.Bool(metricLabelKeyStreamingOperation, mt.isStreaming)) + default: + return attrKeyValues, fmt.Errorf("unknown additional attribute: %v", attrKey) + } + } + + return attrKeyValues, nil +} diff --git a/bigtable/metrics_monitoring_exporter.go b/bigtable/metrics_monitoring_exporter.go new file mode 100644 index 000000000000..29bc957b3b49 --- /dev/null +++ b/bigtable/metrics_monitoring_exporter.go @@ -0,0 +1,345 @@ +/* +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This is a modified version of https://github.com/GoogleCloudPlatform/opentelemetry-operations-go/blob/exporter/metric/v0.46.0/exporter/metric/metric.go + +package bigtable + +import ( + "context" + "errors" + "fmt" + "math" + "reflect" + "sync" + "time" + + monitoring "cloud.google.com/go/monitoring/apiv3/v2" + "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" + "go.opentelemetry.io/otel/attribute" + otelmetric "go.opentelemetry.io/otel/sdk/metric" + otelmetricdata "go.opentelemetry.io/otel/sdk/metric/metricdata" + "google.golang.org/api/option" + "google.golang.org/genproto/googleapis/api/distribution" + googlemetricpb "google.golang.org/genproto/googleapis/api/metric" + monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" + "google.golang.org/protobuf/types/known/timestamppb" +) + +const ( + bigtableResourceType = "bigtable_client_raw" + + // The number of timeserieses to send to GCM in a single request. This + // is a hard limit in the GCM API, so we never want to exceed 200. 
+ sendBatchSize = 200 +) + +var ( + monitoredResLabelsSet = map[string]bool{ + monitoredResLabelKeyProject: true, + monitoredResLabelKeyInstance: true, + monitoredResLabelKeyCluster: true, + monitoredResLabelKeyTable: true, + monitoredResLabelKeyZone: true, + } + + errShutdown = fmt.Errorf("exporter is shutdown") +) + +type errUnexpectedAggregationKind struct { + kind string +} + +func (e errUnexpectedAggregationKind) Error() string { + return fmt.Sprintf("the metric kind is unexpected: %v", e.kind) +} + +// monitoringExporter is the implementation of an OpenTelemetry metric exporter for +// Google Cloud Monitoring. +// It is the default exporter for built-in metrics +type monitoringExporter struct { + shutdown chan struct{} + client *monitoring.MetricClient + shutdownOnce sync.Once + projectID string +} + +func newMonitoringExporter(ctx context.Context, project string, opts ...option.ClientOption) (*monitoringExporter, error) { + client, err := monitoring.NewMetricClient(ctx, opts...) + if err != nil { + return nil, err + } + return &monitoringExporter{ + client: client, + shutdown: make(chan struct{}), + projectID: project, + }, nil +} + +// ForceFlush does nothing; the exporter holds no state. +func (e *monitoringExporter) ForceFlush(ctx context.Context) error { return ctx.Err() } + +// Shutdown shuts down the client connections. +func (e *monitoringExporter) Shutdown(ctx context.Context) error { + err := errShutdown + e.shutdownOnce.Do(func() { + close(e.shutdown) + err = errors.Join(ctx.Err(), e.client.Close()) + }) + return err +} + +// Export exports OpenTelemetry Metrics to Google Cloud Monitoring. +func (me *monitoringExporter) Export(ctx context.Context, rm *otelmetricdata.ResourceMetrics) error { + select { + case <-me.shutdown: + return errShutdown + default: + } + + return me.exportTimeSeries(ctx, rm) +} + +// Temporality returns the Temporality to use for an instrument kind. +func (me *monitoringExporter) Temporality(ik otelmetric.InstrumentKind) otelmetricdata.Temporality { + return otelmetricdata.CumulativeTemporality +} + +// Aggregation returns the Aggregation to use for an instrument kind. +func (me *monitoringExporter) Aggregation(ik otelmetric.InstrumentKind) otelmetric.Aggregation { + return otelmetric.DefaultAggregationSelector(ik) +} + +// exportTimeSeries converts the records in rm to TimeSeries and sends them to Cloud Monitoring in batches. +func (me *monitoringExporter) exportTimeSeries(ctx context.Context, rm *otelmetricdata.ResourceMetrics) error { + tss, err := me.recordsToTimeSeriesPbs(rm) + if len(tss) == 0 { + return err + } + + name := fmt.Sprintf("projects/%s", me.projectID) + + errs := []error{err} + for i := 0; i < len(tss); i += sendBatchSize { + j := i + sendBatchSize + if j >= len(tss) { + j = len(tss) + } + + req := &monitoringpb.CreateTimeSeriesRequest{ + Name: name, + TimeSeries: tss[i:j], + } + errs = append(errs, me.client.CreateServiceTimeSeries(ctx, req)) + } + + return errors.Join(errs...) +} + +// recordToMetricAndMonitoredResourcePbs converts data from records to Metric and Monitored resource proto type for Cloud Monitoring.
+func (me *monitoringExporter) recordToMetricAndMonitoredResourcePbs(metrics otelmetricdata.Metrics, attributes attribute.Set) (*googlemetricpb.Metric, *monitoredrespb.MonitoredResource) { + mr := &monitoredrespb.MonitoredResource{ + Type: bigtableResourceType, + Labels: map[string]string{}, + } + labels := make(map[string]string) + addAttributes := func(attr *attribute.Set) { + iter := attr.Iter() + for iter.Next() { + kv := iter.Attribute() + labelKey := string(kv.Key) + + if _, isResLabel := monitoredResLabelsSet[labelKey]; isResLabel { + // Add labels to monitored resource + mr.Labels[labelKey] = kv.Value.Emit() + } else { + // Add labels to metric + labels[labelKey] = kv.Value.Emit() + + } + } + } + addAttributes(&attributes) + return &googlemetricpb.Metric{ + Type: fmt.Sprintf("%v%s", builtInMetricsMeterName, metrics.Name), + Labels: labels, + }, mr +} + +func (me *monitoringExporter) recordsToTimeSeriesPbs(rm *otelmetricdata.ResourceMetrics) ([]*monitoringpb.TimeSeries, error) { + var ( + tss []*monitoringpb.TimeSeries + errs []error + ) + for _, scope := range rm.ScopeMetrics { + if scope.Scope.Name != builtInMetricsMeterName { + // Filter out metric data for instruments that are not part of the bigtable builtin metrics + continue + } + for _, metrics := range scope.Metrics { + ts, err := me.recordToTimeSeriesPb(metrics) + errs = append(errs, err) + tss = append(tss, ts...) + } + } + + return tss, errors.Join(errs...) +} + +// recordToTimeSeriesPb converts record to TimeSeries proto type with common resource. +// ref. https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries +func (me *monitoringExporter) recordToTimeSeriesPb(m otelmetricdata.Metrics) ([]*monitoringpb.TimeSeries, error) { + var tss []*monitoringpb.TimeSeries + var errs []error + if m.Data == nil { + return nil, nil + } + switch a := m.Data.(type) { + case otelmetricdata.Histogram[float64]: + for _, point := range a.DataPoints { + metric, mr := me.recordToMetricAndMonitoredResourcePbs(m, point.Attributes) + ts, err := histogramToTimeSeries(point, m, mr) + if err != nil { + errs = append(errs, err) + continue + } + ts.Metric = metric + tss = append(tss, ts) + } + case otelmetricdata.Sum[int64]: + for _, point := range a.DataPoints { + metric, mr := me.recordToMetricAndMonitoredResourcePbs(m, point.Attributes) + var ts *monitoringpb.TimeSeries + var err error + ts, err = sumToTimeSeries[int64](point, m, mr) + if err != nil { + errs = append(errs, err) + continue + } + ts.Metric = metric + tss = append(tss, ts) + } + default: + errs = append(errs, errUnexpectedAggregationKind{kind: reflect.TypeOf(m.Data).String()}) + } + return tss, errors.Join(errs...) 
+} + +func sumToTimeSeries[N int64 | float64](point otelmetricdata.DataPoint[N], metrics otelmetricdata.Metrics, mr *monitoredrespb.MonitoredResource) (*monitoringpb.TimeSeries, error) { + interval, err := toNonemptyTimeIntervalpb(point.StartTime, point.Time) + if err != nil { + return nil, err + } + value, valueType := numberDataPointToValue[N](point) + return &monitoringpb.TimeSeries{ + Resource: mr, + Unit: string(metrics.Unit), + MetricKind: googlemetricpb.MetricDescriptor_CUMULATIVE, + ValueType: valueType, + Points: []*monitoringpb.Point{{ + Interval: interval, + Value: value, + }}, + }, nil +} + +func histogramToTimeSeries[N int64 | float64](point otelmetricdata.HistogramDataPoint[N], metrics otelmetricdata.Metrics, mr *monitoredrespb.MonitoredResource) (*monitoringpb.TimeSeries, error) { + interval, err := toNonemptyTimeIntervalpb(point.StartTime, point.Time) + if err != nil { + return nil, err + } + distributionValue := histToDistribution(point) + return &monitoringpb.TimeSeries{ + Resource: mr, + Unit: string(metrics.Unit), + MetricKind: googlemetricpb.MetricDescriptor_CUMULATIVE, + ValueType: googlemetricpb.MetricDescriptor_DISTRIBUTION, + Points: []*monitoringpb.Point{{ + Interval: interval, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DistributionValue{ + DistributionValue: distributionValue, + }, + }, + }}, + }, nil +} + +func toNonemptyTimeIntervalpb(start, end time.Time) (*monitoringpb.TimeInterval, error) { + // The end time of a new interval must be at least a millisecond after the end time of the + // previous interval, for all non-gauge types. + // https://cloud.google.com/monitoring/api/ref_v3/rpc/google.monitoring.v3#timeinterval + if end.Sub(start).Milliseconds() <= 1 { + end = start.Add(time.Millisecond) + } + startpb := timestamppb.New(start) + endpb := timestamppb.New(end) + err := errors.Join( + startpb.CheckValid(), + endpb.CheckValid(), + ) + if err != nil { + return nil, err + } + + return &monitoringpb.TimeInterval{ + StartTime: startpb, + EndTime: endpb, + }, nil +} + +func histToDistribution[N int64 | float64](hist otelmetricdata.HistogramDataPoint[N]) *distribution.Distribution { + counts := make([]int64, len(hist.BucketCounts)) + for i, v := range hist.BucketCounts { + counts[i] = int64(v) + } + var mean float64 + if !math.IsNaN(float64(hist.Sum)) && hist.Count > 0 { // Avoid divide-by-zero + mean = float64(hist.Sum) / float64(hist.Count) + } + return &distribution.Distribution{ + Count: int64(hist.Count), + Mean: mean, + BucketCounts: counts, + BucketOptions: &distribution.Distribution_BucketOptions{ + Options: &distribution.Distribution_BucketOptions_ExplicitBuckets{ + ExplicitBuckets: &distribution.Distribution_BucketOptions_Explicit{ + Bounds: hist.Bounds, + }, + }, + }, + } +} + +func numberDataPointToValue[N int64 | float64]( + point otelmetricdata.DataPoint[N], +) (*monitoringpb.TypedValue, googlemetricpb.MetricDescriptor_ValueType) { + switch v := any(point.Value).(type) { + case int64: + return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{ + Int64Value: v, + }}, + googlemetricpb.MetricDescriptor_INT64 + case float64: + return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{ + DoubleValue: v, + }}, + googlemetricpb.MetricDescriptor_DOUBLE + } + // It is impossible to reach this statement + return nil, googlemetricpb.MetricDescriptor_INT64 +} diff --git a/bigtable/metrics_monitoring_exporter_test.go b/bigtable/metrics_monitoring_exporter_test.go new file mode 100644 index 
000000000000..d150dc24492e --- /dev/null +++ b/bigtable/metrics_monitoring_exporter_test.go @@ -0,0 +1,611 @@ +/* +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package bigtable + +import ( + "context" + "errors" + "fmt" + "net" + "reflect" + "strings" + "sync" + "testing" + "time" + + "go.opentelemetry.io/otel/attribute" + otelmetric "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" + + "cloud.google.com/go/internal/testutil" + "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" + "google.golang.org/api/option" + googlemetricpb "google.golang.org/genproto/googleapis/api/metric" + metricpb "google.golang.org/genproto/googleapis/api/metric" + monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/types/known/emptypb" +) + +type MetricsTestServer struct { + lis net.Listener + srv *grpc.Server + Endpoint string + userAgent string + createMetricDescriptorReqs []*monitoringpb.CreateMetricDescriptorRequest + createServiceTimeSeriesReqs []*monitoringpb.CreateTimeSeriesRequest + RetryCount int + mu sync.Mutex +} + +func (m *MetricsTestServer) Shutdown() { + // this will close mts.lis + m.srv.GracefulStop() +} + +// Pops out the UserAgent from the most recent CreateTimeSeriesRequests or CreateServiceTimeSeriesRequests. +func (m *MetricsTestServer) UserAgent() string { + m.mu.Lock() + defer m.mu.Unlock() + ua := m.userAgent + m.userAgent = "" + return ua +} + +// Pops out the CreateServiceTimeSeriesRequests which the test server has received so far. 
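+// The stored requests are cleared, so each call returns only the requests received since the previous call.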
+func (m *MetricsTestServer) CreateServiceTimeSeriesRequests() []*monitoringpb.CreateTimeSeriesRequest { + m.mu.Lock() + defer m.mu.Unlock() + reqs := m.createServiceTimeSeriesReqs + m.createServiceTimeSeriesReqs = nil + return reqs +} + +func (m *MetricsTestServer) appendCreateMetricDescriptorReq(ctx context.Context, req *monitoringpb.CreateMetricDescriptorRequest) { + m.mu.Lock() + defer m.mu.Unlock() + m.createMetricDescriptorReqs = append(m.createMetricDescriptorReqs, req) +} + +func (m *MetricsTestServer) appendCreateServiceTimeSeriesReq(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest) { + m.mu.Lock() + defer m.mu.Unlock() + m.createServiceTimeSeriesReqs = append(m.createServiceTimeSeriesReqs, req) + if md, ok := metadata.FromIncomingContext(ctx); ok { + m.userAgent = strings.Join(md.Get("User-Agent"), ";") + } +} + +func (m *MetricsTestServer) Serve() error { + return m.srv.Serve(m.lis) +} + +type fakeMetricServiceServer struct { + monitoringpb.UnimplementedMetricServiceServer + metricsTestServer *MetricsTestServer +} + +func (f *fakeMetricServiceServer) CreateServiceTimeSeries( + ctx context.Context, + req *monitoringpb.CreateTimeSeriesRequest, +) (*emptypb.Empty, error) { + f.metricsTestServer.appendCreateServiceTimeSeriesReq(ctx, req) + return &emptypb.Empty{}, nil +} + +func (f *fakeMetricServiceServer) CreateMetricDescriptor( + ctx context.Context, + req *monitoringpb.CreateMetricDescriptorRequest, +) (*metricpb.MetricDescriptor, error) { + f.metricsTestServer.appendCreateMetricDescriptorReq(ctx, req) + return &metricpb.MetricDescriptor{}, nil +} + +func NewMetricTestServer() (*MetricsTestServer, error) { + srv := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{Time: 5 * time.Minute})) + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + return nil, err + } + testServer := &MetricsTestServer{ + Endpoint: lis.Addr().String(), + lis: lis, + srv: srv, + } + + monitoringpb.RegisterMetricServiceServer( + srv, + &fakeMetricServiceServer{metricsTestServer: testServer}, + ) + + return testServer, nil +} + +func requireNoError(t *testing.T, err error) { + if err != nil { + t.Fatalf("Received unexpected error: \n%v", err) + } +} + +func assertNoError(t *testing.T, err error) { + if err != nil { + t.Errorf("Received unexpected error: \n%v", err) + } +} + +func assertErrorIs(t *testing.T, gotErr error, wantErr error) { + if !errors.Is(gotErr, wantErr) { + t.Errorf("error got: %v, want: %v", gotErr, wantErr) + } +} + +func assertEqual(t *testing.T, got, want interface{}) { + if !testutil.Equal(got, want) { + t.Errorf("got: %+v, want: %+v", got, want) + } + +} + +func TestExportMetrics(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute) + defer cancel() + + testServer, err := NewMetricTestServer() + //nolint:errcheck + go testServer.Serve() + defer testServer.Shutdown() + assertNoError(t, err) + + res := &resource.Resource{} + + clientOpts := []option.ClientOption{ + option.WithEndpoint(testServer.Endpoint), + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + } + exporter, err := newMonitoringExporter(ctx, "PROJECT_ID_NOT_REAL", clientOpts...) 
+ if err != nil { + t.Errorf("Error occurred when creating exporter: %v", err) + } + + // Reduce sampling period to reduce test run time + origSamplePeriod := defaultSamplePeriod + defaultSamplePeriod = 5 * time.Second + defer func() { + defaultSamplePeriod = origSamplePeriod + }() + provider := metric.NewMeterProvider( + metric.WithReader(metric.NewPeriodicReader(exporter, metric.WithInterval(defaultSamplePeriod))), + metric.WithResource(res), + ) + + //nolint:errcheck + defer func() { + err = provider.Shutdown(ctx) + assertNoError(t, err) + }() + + meterBuiltIn := provider.Meter(builtInMetricsMeterName) + counterBuiltIn, err := meterBuiltIn.Int64Counter("name.lastvalue") + requireNoError(t, err) + + meterNameNotBuiltIn := "testing" + meterNotbuiltIn := provider.Meter(meterNameNotBuiltIn) + counterNotBuiltIn, err := meterNotbuiltIn.Int64Counter("name.lastvalue") + requireNoError(t, err) + + // record start time + testStartTime := time.Now() + + // record data points + counterBuiltIn.Add(ctx, 1) + counterNotBuiltIn.Add(ctx, 1) + + // Calculate elapsed time + elapsedTime := time.Since(testStartTime) + if elapsedTime < 3*defaultSamplePeriod { + // Ensure at least 2 datapoints are recorded + time.Sleep(3*defaultSamplePeriod - elapsedTime) + } + + gotCalls := testServer.CreateServiceTimeSeriesRequests() + for _, gotCall := range gotCalls { + for _, ts := range gotCall.TimeSeries { + if strings.Contains(ts.Metric.Type, meterNameNotBuiltIn) { + t.Errorf("Exporter should only export builtin metrics") + } + } + } +} + +func TestExportCounter(t *testing.T) { + ctx := context.Background() + testServer, err := NewMetricTestServer() + //nolint:errcheck + go testServer.Serve() + defer testServer.Shutdown() + assertNoError(t, err) + + clientOpts := []option.ClientOption{ + option.WithEndpoint(testServer.Endpoint), + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + } + exporter, err := newMonitoringExporter(ctx, "PROJECT_ID_NOT_REAL", clientOpts...) + assertNoError(t, err) + provider := metric.NewMeterProvider( + metric.WithReader(metric.NewPeriodicReader(exporter)), + metric.WithResource( + resource.NewWithAttributes( + semconv.SchemaURL, + attribute.String("test_id", "abc123"), + )), + ) + + //nolint:errcheck + defer func() { + err = provider.Shutdown(ctx) + assertNoError(t, err) + }() + + // Start meter + meter := provider.Meter(builtInMetricsMeterName) + + // Register counter value + counter, err := meter.Int64Counter("counter-a") + assertNoError(t, err) + clabels := []attribute.KeyValue{attribute.Key("key").String("value")} + counter.Add(ctx, 100, otelmetric.WithAttributes(clabels...)) +} + +func TestExportHistogram(t *testing.T) { + ctx := context.Background() + testServer, err := NewMetricTestServer() + //nolint:errcheck + go testServer.Serve() + defer testServer.Shutdown() + assertNoError(t, err) + + clientOpts := []option.ClientOption{ + option.WithEndpoint(testServer.Endpoint), + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + } + exporter, err := newMonitoringExporter(ctx, "PROJECT_ID_NOT_REAL", clientOpts...) 
+ assertNoError(t, err) + provider := metric.NewMeterProvider( + metric.WithReader(metric.NewPeriodicReader(exporter)), + metric.WithResource( + resource.NewWithAttributes( + semconv.SchemaURL, + attribute.String("test_id", "abc123"), + ), + ), + ) + assertNoError(t, err) + + //nolint:errcheck + defer func() { + err = provider.Shutdown(ctx) + assertNoError(t, err) + }() + + // Start meter + meter := provider.Meter(builtInMetricsMeterName) + + // Register counter value + counter, err := meter.Float64Histogram("counter-a") + assertNoError(t, err) + clabels := []attribute.KeyValue{attribute.Key("key").String("value")} + counter.Record(ctx, 100, otelmetric.WithAttributes(clabels...)) + counter.Record(ctx, 50, otelmetric.WithAttributes(clabels...)) + counter.Record(ctx, 200, otelmetric.WithAttributes(clabels...)) +} + +func TestRecordToMpb(t *testing.T) { + metricName := "testing" + + me := &monitoringExporter{} + + monitoredResLabelValueProject := "project01" + monitoredResLabelValueInstance := "instance01" + monitoredResLabelValueZone := "zone01" + monitoredResLabelValueTable := "table01" + monitoredResLabelValueCluster := "cluster01" + + inputAttributes := attribute.NewSet( + attribute.Key("a").String("A"), + attribute.Key("b").Int64(100), + attribute.Key(monitoredResLabelKeyProject).String(monitoredResLabelValueProject), + attribute.Key(monitoredResLabelKeyInstance).String(monitoredResLabelValueInstance), + attribute.Key(monitoredResLabelKeyZone).String(monitoredResLabelValueZone), + attribute.Key(monitoredResLabelKeyTable).String(monitoredResLabelValueTable), + attribute.Key(monitoredResLabelKeyCluster).String(monitoredResLabelValueCluster), + ) + inputMetrics := metricdata.Metrics{ + Name: metricName, + } + + wantMetric := &googlemetricpb.Metric{ + Type: fmt.Sprintf("%v%s", builtInMetricsMeterName, metricName), + Labels: map[string]string{ + "a": "A", + "b": "100", + }, + } + + wantMonitoredResource := &monitoredrespb.MonitoredResource{ + Type: "bigtable_client_raw", + Labels: map[string]string{ + monitoredResLabelKeyProject: monitoredResLabelValueProject, + monitoredResLabelKeyInstance: monitoredResLabelValueInstance, + monitoredResLabelKeyZone: monitoredResLabelValueZone, + monitoredResLabelKeyTable: monitoredResLabelValueTable, + monitoredResLabelKeyCluster: monitoredResLabelValueCluster, + }, + } + + gotMetric, gotMonitoredResource := me.recordToMetricAndMonitoredResourcePbs(inputMetrics, inputAttributes) + if !reflect.DeepEqual(wantMetric, gotMetric) { + t.Errorf("Metric: expected: %v, actual: %v", wantMetric, gotMetric) + } + if !reflect.DeepEqual(wantMonitoredResource, gotMonitoredResource) { + t.Errorf("Monitored resource: expected: %v, actual: %v", wantMonitoredResource, gotMonitoredResource) + } +} + +func TestTimeIntervalStaggering(t *testing.T) { + var tm time.Time + + interval, err := toNonemptyTimeIntervalpb(tm, tm) + if err != nil { + t.Fatalf("conversion to PB failed: %v", err) + } + + if err := interval.StartTime.CheckValid(); err != nil { + t.Fatalf("unable to convert start time from PB: %v", err) + } + start := interval.StartTime.AsTime() + + if err := interval.EndTime.CheckValid(); err != nil { + t.Fatalf("unable to convert end time to PB: %v", err) + } + end := interval.EndTime.AsTime() + + if end.Before(start.Add(time.Millisecond)) { + t.Fatalf("expected end=%v to be at least %v after start=%v, but it wasn't", end, time.Millisecond, start) + } +} + +func TestTimeIntervalPassthru(t *testing.T) { + var tm time.Time + + interval, err := toNonemptyTimeIntervalpb(tm, 
tm.Add(time.Second)) + if err != nil { + t.Fatalf("conversion to PB failed: %v", err) + } + + if err := interval.StartTime.CheckValid(); err != nil { + t.Fatalf("unable to convert start time from PB: %v", err) + } + start := interval.StartTime.AsTime() + + if err := interval.EndTime.CheckValid(); err != nil { + t.Fatalf("unable to convert end time to PB: %v", err) + } + end := interval.EndTime.AsTime() + + assertEqual(t, start, tm) + assertEqual(t, end, tm.Add(time.Second)) +} + +func TestConcurrentCallsAfterShutdown(t *testing.T) { + testServer, err := NewMetricTestServer() + //nolint:errcheck + go testServer.Serve() + defer testServer.Shutdown() + assertNoError(t, err) + + ctx := context.Background() + clientOpts := []option.ClientOption{ + option.WithEndpoint(testServer.Endpoint), + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + } + exporter, err := newMonitoringExporter(ctx, "PROJECT_ID_NOT_REAL", clientOpts...) + assertNoError(t, err) + + err = exporter.Shutdown(ctx) + assertNoError(t, err) + + var wg sync.WaitGroup + wg.Add(3) + + go func() { + err := exporter.Shutdown(ctx) + assertErrorIs(t, err, errShutdown) + wg.Done() + }() + go func() { + err := exporter.ForceFlush(ctx) + assertNoError(t, err) + wg.Done() + }() + go func() { + err := exporter.Export(ctx, &metricdata.ResourceMetrics{}) + assertErrorIs(t, err, errShutdown) + wg.Done() + }() + + wg.Wait() +} + +func TestConcurrentExport(t *testing.T) { + testServer, err := NewMetricTestServer() + //nolint:errcheck + go testServer.Serve() + defer testServer.Shutdown() + assertNoError(t, err) + + ctx := context.Background() + clientOpts := []option.ClientOption{ + option.WithEndpoint(testServer.Endpoint), + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + } + exporter, err := newMonitoringExporter(ctx, "PROJECT_ID_NOT_REAL", clientOpts...) + assertNoError(t, err) + + defer func() { + err := exporter.Shutdown(ctx) + assertNoError(t, err) + }() + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + err := exporter.Export(ctx, &metricdata.ResourceMetrics{ + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Metrics: []metricdata.Metrics{ + {Name: "testing", Data: metricdata.Histogram[float64]{}}, + {Name: "test/of/path", Data: metricdata.Histogram[float64]{}}, + }, + }, + }, + }) + assertNoError(t, err) + wg.Done() + }() + go func() { + err := exporter.Export(ctx, &metricdata.ResourceMetrics{ + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Metrics: []metricdata.Metrics{ + {Name: "testing", Data: metricdata.Histogram[float64]{}}, + {Name: "test/of/path", Data: metricdata.Histogram[float64]{}}, + }, + }, + }, + }) + assertNoError(t, err) + wg.Done() + }() + + wg.Wait() +} + +func TestBatchingExport(t *testing.T) { + ctx := context.Background() + setup := func(t *testing.T) (metric.Exporter, *MetricsTestServer) { + testServer, err := NewMetricTestServer() + //nolint:errcheck + go testServer.Serve() + t.Cleanup(testServer.Shutdown) + + assertNoError(t, err) + + clientOpts := []option.ClientOption{ + option.WithEndpoint(testServer.Endpoint), + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + } + exporter, err := newMonitoringExporter(ctx, "PROJECT_ID_NOT_REAL", clientOpts...) 
+ assertNoError(t, err) + + t.Cleanup(func() { + ctx := context.Background() + err := exporter.Shutdown(ctx) + assertNoError(t, err) + }) + + return exporter, testServer + } + + createMetrics := func(n int) []metricdata.Metrics { + inputMetrics := make([]metricdata.Metrics, n) + for i := 0; i < n; i++ { + inputMetrics[i] = metricdata.Metrics{Name: "testing", Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + {}, + }, + }} + } + + return inputMetrics + } + + for _, tc := range []struct { + desc string + numMetrics int + expectedCreateTSCalls int + }{ + {desc: "0 metrics"}, + { + desc: "150 metrics", + numMetrics: 150, + expectedCreateTSCalls: 1, + }, + { + desc: "200 metrics", + numMetrics: 200, + expectedCreateTSCalls: 1, + }, + { + desc: "201 metrics", + numMetrics: 201, + expectedCreateTSCalls: 2, + }, + { + desc: "500 metrics", + numMetrics: 500, + expectedCreateTSCalls: 3, + }, + { + desc: "1199 metrics", + numMetrics: 1199, + expectedCreateTSCalls: 6, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + exporter, testServer := setup(t) + input := createMetrics(tc.numMetrics) + + err := exporter.Export(ctx, &metricdata.ResourceMetrics{ + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Scope: instrumentation.Scope{ + Name: builtInMetricsMeterName, + }, + Metrics: input, + }, + }, + }) + assertNoError(t, err) + + gotCalls := testServer.CreateServiceTimeSeriesRequests() + assertEqual(t, len(gotCalls), tc.expectedCreateTSCalls) + }) + } +} diff --git a/bigtable/metrics_test.go b/bigtable/metrics_test.go new file mode 100644 index 000000000000..43b36dfc8fe7 --- /dev/null +++ b/bigtable/metrics_test.go @@ -0,0 +1,470 @@ +/* +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package bigtable + +import ( + "context" + "fmt" + "sort" + "strings" + "testing" + "time" + + "cloud.google.com/go/internal/testutil" + "github.com/google/go-cmp/cmp/cmpopts" + "go.opentelemetry.io/otel/attribute" + "google.golang.org/api/option" + btpb "google.golang.org/genproto/googleapis/bigtable/v2" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + + "google.golang.org/grpc/metadata" +) + +var ( + clusterID1 = "cluster-id-1" + clusterID2 = "cluster-id-2" + zoneID1 = "zone-id-1" + + testHeaders, _ = proto.Marshal(&btpb.ResponseParams{ + ClusterId: &clusterID1, + ZoneId: &zoneID1, + }) + testTrailers, _ = proto.Marshal(&btpb.ResponseParams{ + ClusterId: &clusterID2, + ZoneId: &zoneID1, + }) + + testHeaderMD = &metadata.MD{ + locationMDKey: []string{string(testHeaders)}, + serverTimingMDKey: []string{"gfet4t7; dur=1234"}, + } + testTrailerMD = &metadata.MD{ + locationMDKey: []string{string(testTrailers)}, + serverTimingMDKey: []string{"gfet4t7; dur=5678"}, + } +) + +func equalErrs(gotErr error, wantErr error) bool { + if gotErr == nil && wantErr == nil { + return true + } + if gotErr == nil || wantErr == nil { + return false + } + return strings.Contains(gotErr.Error(), wantErr.Error()) +} + +func TestNewBuiltinMetricsTracerFactory(t *testing.T) { + ctx := context.Background() + project := "test-project" + instance := "test-instance" + appProfile := "test-app-profile" + clientUID := "test-uid" + + wantClientAttributes := []attribute.KeyValue{ + attribute.String(monitoredResLabelKeyProject, project), + attribute.String(monitoredResLabelKeyInstance, instance), + attribute.String(metricLabelKeyAppProfile, appProfile), + attribute.String(metricLabelKeyClientUID, clientUID), + attribute.String(metricLabelKeyClientName, clientName), + } + wantMetricNamesStdout := []string{metricNameAttemptLatencies, metricNameAttemptLatencies, metricNameOperationLatencies, metricNameRetryCount, metricNameServerLatencies} + wantMetricTypesGCM := []string{} + for _, wantMetricName := range wantMetricNamesStdout { + wantMetricTypesGCM = append(wantMetricTypesGCM, builtInMetricsMeterName+wantMetricName) + } + + // Reduce sampling period to reduce test run time + origSamplePeriod := defaultSamplePeriod + defaultSamplePeriod = 5 * time.Second + defer func() { + defaultSamplePeriod = origSamplePeriod + }() + + // return constant client UID instead of random, so that attributes can be compared + origGenerateClientUID := generateClientUID + generateClientUID = func() (string, error) { + return clientUID, nil + } + defer func() { + generateClientUID = origGenerateClientUID + }() + + // Setup mock monitoring server + monitoringServer, err := NewMetricTestServer() + if err != nil { + t.Fatalf("Error setting up metrics test server") + } + go monitoringServer.Serve() + defer monitoringServer.Shutdown() + origExporterOpts := exporterOpts + exporterOpts = []option.ClientOption{ + option.WithEndpoint(monitoringServer.Endpoint), + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + } + defer func() { + exporterOpts = origExporterOpts + }() + + // Setup fake Bigtable server + isFirstAttempt := true + headerAndErrorInjector := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if strings.HasSuffix(info.FullMethod, "ReadRows") { + if isFirstAttempt { + // Fail first 
attempt + isFirstAttempt = false + return status.Error(codes.Unavailable, "Mock Unavailable error") + } + header := metadata.New(map[string]string{ + serverTimingMDKey: "gfet4t7; dur=123", + locationMDKey: string(testHeaders), + }) + ss.SendHeader(header) + } + return handler(srv, ss) + } + + tests := []struct { + desc string + config ClientConfig + wantBuiltinEnabled bool + setEmulator bool + wantCreateTSCallsCount int // No. of CreateTimeSeries calls + }{ + { + desc: "should create a new tracer factory with default meter provider", + config: ClientConfig{}, + wantBuiltinEnabled: true, + wantCreateTSCallsCount: 2, + }, + { + desc: "should create a new tracer factory with noop meter provider", + config: ClientConfig{MetricsProvider: NoopMetricsProvider{}}, + }, + { + desc: "should not create instruments when BIGTABLE_EMULATOR_HOST is set", + config: ClientConfig{}, + setEmulator: true, + }, + } + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + if test.setEmulator { + // Set environment variable + t.Setenv("BIGTABLE_EMULATOR_HOST", "localhost:8086") + } + + // open table and compare errors + tbl, cleanup, gotErr := setupFakeServer(project, instance, test.config, grpc.StreamInterceptor(headerAndErrorInjector)) + defer cleanup() + if gotErr != nil { + t.Fatalf("err: got: %v, want: %v", gotErr, nil) + return + } + + gotClient := tbl.c + + if gotClient.metricsTracerFactory.enabled != test.wantBuiltinEnabled { + t.Errorf("builtinEnabled: got: %v, want: %v", gotClient.metricsTracerFactory.enabled, test.wantBuiltinEnabled) + } + + if diff := testutil.Diff(gotClient.metricsTracerFactory.clientAttributes, wantClientAttributes, + cmpopts.IgnoreUnexported(attribute.KeyValue{}, attribute.Value{})); diff != "" { + t.Errorf("clientAttributes: got=-, want=+ \n%v", diff) + } + + // Check instruments + gotNonNilInstruments := gotClient.metricsTracerFactory.operationLatencies != nil && + gotClient.metricsTracerFactory.serverLatencies != nil && + gotClient.metricsTracerFactory.attemptLatencies != nil && + gotClient.metricsTracerFactory.retryCount != nil + if test.wantBuiltinEnabled != gotNonNilInstruments { + t.Errorf("NonNilInstruments: got: %v, want: %v", gotNonNilInstruments, test.wantBuiltinEnabled) + } + + // record start time + testStartTime := time.Now() + + // pop out all old requests + monitoringServer.CreateServiceTimeSeriesRequests() + + // Perform read rows operation + isFirstAttempt = true + err := tbl.ReadRows(ctx, NewRange("a", "z"), func(r Row) bool { + return true + }) + if err != nil { + t.Fatalf("ReadRows failed: %v", err) + } + + // Calculate elapsed time + elapsedTime := time.Since(testStartTime) + if elapsedTime < 3*defaultSamplePeriod { + // Ensure at least 2 datapoints are recorded + time.Sleep(3*defaultSamplePeriod - elapsedTime) + } + + // Get new CreateServiceTimeSeriesRequests + gotCreateTSCalls := monitoringServer.CreateServiceTimeSeriesRequests() + for _, gotCreateTSCall := range gotCreateTSCalls { + gotMetricTypes := []string{} + for _, ts := range gotCreateTSCall.TimeSeries { + gotMetricTypes = append(gotMetricTypes, ts.Metric.Type) + } + sort.Strings(gotMetricTypes) + if !testutil.Equal(gotMetricTypes, wantMetricTypesGCM) { + t.Errorf("Metric types missing in req. got: %v, want: %v", gotMetricTypes, wantMetricTypesGCM) + } + } + + gotCreateTSCallsCount := len(gotCreateTSCalls) + if gotCreateTSCallsCount < test.wantCreateTSCallsCount { + t.Errorf("No. 
of CreateServiceTimeSeriesRequests: got: %v, want: %v", gotCreateTSCalls, test.wantCreateTSCallsCount) + } + }) + } +} + +func TestToOtelMetricAttrs(t *testing.T) { + mt := builtinMetricsTracer{ + tableName: "my-table", + method: "ReadRows", + isStreaming: true, + currOp: opTracer{ + status: codes.OK.String(), + currAttempt: attemptTracer{ + startTime: time.Now(), + clusterID: "my-cluster", + zoneID: "my-zone", + }, + attemptCount: 1, + }, + } + tests := []struct { + desc string + mt builtinMetricsTracer + metricName string + wantAttrs []attribute.KeyValue + wantError error + }{ + { + desc: "Known metric", + mt: mt, + metricName: metricNameOperationLatencies, + wantAttrs: []attribute.KeyValue{ + attribute.String(monitoredResLabelKeyTable, "my-table"), + attribute.String(metricLabelKeyMethod, "ReadRows"), + attribute.Bool(metricLabelKeyStreamingOperation, true), + attribute.String(metricLabelKeyStatus, codes.OK.String()), + attribute.String(monitoredResLabelKeyCluster, clusterID1), + attribute.String(monitoredResLabelKeyZone, zoneID1), + }, + wantError: nil, + }, + { + desc: "Unknown metric", + mt: mt, + metricName: "unknown_metric", + wantAttrs: []attribute.KeyValue{ + attribute.String(monitoredResLabelKeyTable, "my-table"), + attribute.String(metricLabelKeyMethod, "ReadRows"), + attribute.String(monitoredResLabelKeyCluster, clusterID1), + attribute.String(monitoredResLabelKeyZone, zoneID1), + }, + wantError: fmt.Errorf("unable to create attributes list for unknown metric: unknown_metric"), + }, + } + + lessKeyValue := func(a, b attribute.KeyValue) bool { return a.Key < b.Key } + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + gotAttrs, gotErr := test.mt.toOtelMetricAttrs(test.metricName) + if !equalErrs(gotErr, test.wantError) { + t.Errorf("error got: %v, want: %v", gotErr, test.wantError) + } + if diff := testutil.Diff(gotAttrs, test.wantAttrs, + cmpopts.IgnoreUnexported(attribute.KeyValue{}, attribute.Value{}), + cmpopts.SortSlices(lessKeyValue)); diff != "" { + t.Errorf("got=-, want=+ \n%v", diff) + } + }) + } +} + +func TestGetServerLatency(t *testing.T) { + invalidFormat := "invalid format" + invalidFormatMD := metadata.MD{ + serverTimingMDKey: []string{invalidFormat}, + } + invalidFormatErr := fmt.Errorf("strconv.ParseFloat: parsing %q: invalid syntax", invalidFormat) + + tests := []struct { + desc string + headerMD metadata.MD + trailerMD metadata.MD + wantLatency float64 + wantError error + }{ + { + desc: "No server latency in header or trailer", + headerMD: metadata.MD{}, + trailerMD: metadata.MD{}, + wantLatency: 0, + wantError: fmt.Errorf("strconv.ParseFloat: parsing \"\": invalid syntax"), + }, + { + desc: "Server latency in header", + headerMD: metadata.MD{ + serverTimingMDKey: []string{"gfet4t7; dur=1234"}, + }, + trailerMD: metadata.MD{}, + wantLatency: 1234, + wantError: nil, + }, + { + desc: "Server latency in trailer", + headerMD: metadata.MD{}, + trailerMD: metadata.MD{ + serverTimingMDKey: []string{"gfet4t7; dur=5678"}, + }, + wantLatency: 5678, + wantError: nil, + }, + { + desc: "Server latency in both header and trailer", + headerMD: metadata.MD{ + serverTimingMDKey: []string{"gfet4t7; dur=1234"}, + }, + trailerMD: metadata.MD{ + serverTimingMDKey: []string{"gfet4t7; dur=5678"}, + }, + wantLatency: 1234, + wantError: nil, + }, + { + desc: "Invalid server latency format in header", + headerMD: invalidFormatMD, + trailerMD: metadata.MD{}, + wantLatency: 0, + wantError: invalidFormatErr, + }, + { + desc: "Invalid server latency format in trailer", 
+ headerMD: metadata.MD{}, + trailerMD: invalidFormatMD, + wantLatency: 0, + wantError: invalidFormatErr, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + gotLatency, gotErr := extractServerLatency(test.headerMD, test.trailerMD) + if !equalErrs(gotErr, test.wantError) { + t.Errorf("error got: %v, want: %v", gotErr, test.wantError) + } + if gotLatency != test.wantLatency { + t.Errorf("latency got: %v, want: %v", gotLatency, test.wantLatency) + } + }) + } +} + +func TestGetLocation(t *testing.T) { + invalidFormatErr := "cannot parse invalid wire-format data" + tests := []struct { + desc string + headerMD metadata.MD + trailerMD metadata.MD + wantCluster string + wantZone string + wantError error + }{ + { + desc: "No location metadata in header or trailer", + headerMD: metadata.MD{}, + trailerMD: metadata.MD{}, + wantCluster: defaultCluster, + wantZone: defaultZone, + wantError: fmt.Errorf("failed to get location metadata"), + }, + { + desc: "Location metadata in header", + headerMD: *testHeaderMD, + trailerMD: metadata.MD{}, + wantCluster: clusterID1, + wantZone: zoneID1, + wantError: nil, + }, + { + desc: "Location metadata in trailer", + headerMD: metadata.MD{}, + trailerMD: *testTrailerMD, + wantCluster: clusterID2, + wantZone: zoneID1, + wantError: nil, + }, + { + desc: "Location metadata in both header and trailer", + headerMD: *testHeaderMD, + trailerMD: *testTrailerMD, + wantCluster: clusterID1, + wantZone: zoneID1, + wantError: nil, + }, + { + desc: "Invalid location metadata format in header", + headerMD: metadata.MD{ + locationMDKey: []string{"invalid format"}, + }, + trailerMD: metadata.MD{}, + wantCluster: defaultCluster, + wantZone: defaultZone, + wantError: fmt.Errorf(invalidFormatErr), + }, + { + desc: "Invalid location metadata format in trailer", + headerMD: metadata.MD{}, + trailerMD: metadata.MD{ + locationMDKey: []string{"invalid format"}, + }, + wantCluster: defaultCluster, + wantZone: defaultZone, + wantError: fmt.Errorf(invalidFormatErr), + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + gotCluster, gotZone, gotErr := extractLocation(test.headerMD, test.trailerMD) + if gotCluster != test.wantCluster { + t.Errorf("cluster got: %v, want: %v", gotCluster, test.wantCluster) + } + if gotZone != test.wantZone { + t.Errorf("zone got: %v, want: %v", gotZone, test.wantZone) + } + if !equalErrs(gotErr, test.wantError) { + t.Errorf("error got: %v, want: %v", gotErr, test.wantError) + } + }) + } +} diff --git a/bigtable/retry_test.go b/bigtable/retry_test.go index 9d59175cc28e..e92c79652d66 100644 --- a/bigtable/retry_test.go +++ b/bigtable/retry_test.go @@ -33,7 +33,7 @@ import ( "google.golang.org/protobuf/types/known/wrapperspb" ) -func setupFakeServer(opt ...grpc.ServerOption) (tbl *Table, cleanup func(), err error) { +func setupFakeServer(project, instance string, config ClientConfig, opt ...grpc.ServerOption) (tbl *Table, cleanup func(), err error) { srv, err := bttest.NewServer("localhost:0", opt...) 
if err != nil { return nil, nil, err @@ -43,12 +43,12 @@ func setupFakeServer(opt ...grpc.ServerOption) (tbl *Table, cleanup func(), err return nil, nil, err } - client, err := NewClient(context.Background(), "client", "instance", option.WithGRPCConn(conn), option.WithGRPCDialOption(grpc.WithBlock())) + client, err := NewClientWithConfig(context.Background(), project, instance, config, option.WithGRPCConn(conn), option.WithGRPCDialOption(grpc.WithBlock())) if err != nil { return nil, nil, err } - adminClient, err := NewAdminClient(context.Background(), "client", "instance", option.WithGRPCConn(conn), option.WithGRPCDialOption(grpc.WithBlock())) + adminClient, err := NewAdminClient(context.Background(), project, instance, option.WithGRPCConn(conn), option.WithGRPCDialOption(grpc.WithBlock())) if err != nil { return nil, nil, err } @@ -68,6 +68,10 @@ func setupFakeServer(opt ...grpc.ServerOption) (tbl *Table, cleanup func(), err return t, cleanupFunc, nil } +func setupDefaultFakeServer(opt ...grpc.ServerOption) (tbl *Table, cleanup func(), err error) { + return setupFakeServer("client", "instance", ClientConfig{}, opt...) +} + func TestRetryApply(t *testing.T) { ctx := context.Background() @@ -82,7 +86,7 @@ func TestRetryApply(t *testing.T) { } return handler(ctx, req) } - tbl, cleanup, err := setupFakeServer(grpc.UnaryInterceptor(errInjector)) + tbl, cleanup, err := setupDefaultFakeServer(grpc.UnaryInterceptor(errInjector)) if err != nil { t.Fatalf("fake server setup: %v", err) } @@ -176,7 +180,7 @@ func TestRetryApplyBulk_OverallRequestFailure(t *testing.T) { return handler(ctx, ss) } - tbl, cleanup, err := setupFakeServer(grpc.StreamInterceptor(errInjector)) + tbl, cleanup, err := setupDefaultFakeServer(grpc.StreamInterceptor(errInjector)) defer cleanup() if err != nil { t.Fatalf("fake server setup: %v", err) @@ -233,7 +237,7 @@ func TestRetryApplyBulk_FailuresAndRetriesInOneRequest(t *testing.T) { return handler(ctx, ss) } - tbl, cleanup, err := setupFakeServer(grpc.StreamInterceptor(errInjector)) + tbl, cleanup, err := setupDefaultFakeServer(grpc.StreamInterceptor(errInjector)) defer cleanup() if err != nil { t.Fatalf("fake server setup: %v", err) @@ -281,7 +285,7 @@ func TestRetryApplyBulk_UnretryableErrors(t *testing.T) { return handler(ctx, ss) } - tbl, cleanup, err := setupFakeServer(grpc.StreamInterceptor(errInjector)) + tbl, cleanup, err := setupDefaultFakeServer(grpc.StreamInterceptor(errInjector)) defer cleanup() if err != nil { t.Fatalf("fake server setup: %v", err) @@ -329,7 +333,7 @@ func TestRetryApplyBulk_IndividualErrorsAndDeadlineExceeded(t *testing.T) { return handler(ctx, ss) } - tbl, cleanup, err := setupFakeServer(grpc.StreamInterceptor(errInjector)) + tbl, cleanup, err := setupDefaultFakeServer(grpc.StreamInterceptor(errInjector)) defer cleanup() if err != nil { t.Fatalf("fake server setup: %v", err) @@ -346,8 +350,8 @@ func TestRetryApplyBulk_IndividualErrorsAndDeadlineExceeded(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, -10*time.Millisecond) defer cancel() errors, err := tbl.ApplyBulk(ctx, []string{"row1", "row2", "row3"}, []*Mutation{m1, m2, m3}) - wantErr := context.DeadlineExceeded - if wantErr != err { + wantErr := status.Error(codes.DeadlineExceeded, context.DeadlineExceeded.Error()) + if !equalErrs(wantErr, err) { t.Fatalf("deadline exceeded error: got: %v, want: %v", err, wantErr) } if errors != nil { @@ -405,7 +409,7 @@ func TestRetryReadRows(t *testing.T) { return handler(ctx, ss) } - tbl, cleanup, err := 
setupFakeServer(grpc.StreamInterceptor(errInjector)) + tbl, cleanup, err := setupDefaultFakeServer(grpc.StreamInterceptor(errInjector)) defer cleanup() if err != nil { t.Fatalf("fake server setup: %v", err) @@ -476,7 +480,7 @@ func TestRetryReverseReadRows(t *testing.T) { return handler(ctx, ss) } - tbl, cleanup, err := setupFakeServer(grpc.StreamInterceptor(errInjector)) + tbl, cleanup, err := setupDefaultFakeServer(grpc.StreamInterceptor(errInjector)) defer cleanup() if err != nil { t.Fatalf("fake server setup: %v", err)