From 193e12420b06a72f4db611764f1bb6d5a15bfa56 Mon Sep 17 00:00:00 2001 From: ydrozhdzhal Date: Mon, 18 Oct 2021 16:57:42 +0300 Subject: [PATCH] Added implementation of Google Cloud Spanner receiver - 4th part(actual receiver implementation). This part of implementation is the final one and contains actual receiver implementation. It includes reading and parsing of metadata configuration, initialization of stats readers(project, database) and collection of metrics. --- .../googlecloudspannerreceiver/receiver.go | 151 +++++++- .../receiver_test.go | 331 ++++++++++++++++++ 2 files changed, 472 insertions(+), 10 deletions(-) create mode 100644 receiver/googlecloudspannerreceiver/receiver_test.go diff --git a/receiver/googlecloudspannerreceiver/receiver.go b/receiver/googlecloudspannerreceiver/receiver.go index 42162f92456f..80f29b2a1737 100644 --- a/receiver/googlecloudspannerreceiver/receiver.go +++ b/receiver/googlecloudspannerreceiver/receiver.go @@ -16,20 +16,34 @@ package googlecloudspannerreceiver import ( "context" + _ "embed" + "fmt" "time" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/model/pdata" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/datasource" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/metadataparser" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/statsreader" ) +//go:embed "internal/metadataconfig/metadata.yaml" +var metadataYaml []byte + var _ component.MetricsReceiver = (*googleCloudSpannerReceiver)(nil) type googleCloudSpannerReceiver struct { - logger *zap.Logger - nextConsumer consumer.Metrics - config *Config - cancel context.CancelFunc + logger *zap.Logger + nextConsumer consumer.Metrics + config *Config + cancel context.CancelFunc + projectReaders []statsreader.CompositeReader + onCollectData []func(error) } func newGoogleCloudSpannerReceiver( @@ -37,6 +51,10 @@ func newGoogleCloudSpannerReceiver( config *Config, nextConsumer consumer.Metrics) (component.MetricsReceiver, error) { + if nextConsumer == nil { + return nil, componenterror.ErrNilNextConsumer + } + r := &googleCloudSpannerReceiver{ logger: logger, nextConsumer: nextConsumer, @@ -45,17 +63,22 @@ func newGoogleCloudSpannerReceiver( return r, nil } -func (gcsReceiver *googleCloudSpannerReceiver) Start(ctx context.Context, host component.Host) error { - ctx, gcsReceiver.cancel = context.WithCancel(ctx) +func (r *googleCloudSpannerReceiver) Start(ctx context.Context, _ component.Host) error { + ctx, r.cancel = context.WithCancel(ctx) + err := r.initializeProjectReaders(ctx) + if err != nil { + return err + } go func() { - ticker := time.NewTicker(gcsReceiver.config.CollectionInterval) + ticker := time.NewTicker(r.config.CollectionInterval) defer ticker.Stop() for { select { case <-ticker.C: - // TODO Collect data here + // Ignoring this error because it has been already logged inside collectData + r.notifyOnCollectData(r.collectData(ctx)) case <-ctx.Done(): return } @@ -65,8 +88,116 @@ func (gcsReceiver *googleCloudSpannerReceiver) Start(ctx context.Context, host c return nil } -func (gcsReceiver *googleCloudSpannerReceiver) Shutdown(context.Context) error { - gcsReceiver.cancel() +func (r *googleCloudSpannerReceiver) Shutdown(context.Context) error { + for _, projectReader := range r.projectReaders { + projectReader.Shutdown() + } + + r.cancel() + + return nil +} + +func (r *googleCloudSpannerReceiver) initializeProjectReaders(ctx context.Context) error { + readerConfig := statsreader.ReaderConfig{ + BackfillEnabled: r.config.BackfillEnabled, + TopMetricsQueryMaxRows: r.config.TopMetricsQueryMaxRows, + } + + parseMetadata, err := metadataparser.ParseMetadataConfig(metadataYaml) + if err != nil { + return fmt.Errorf("error occurred during parsing of metadata: %w", err) + } + + for _, project := range r.config.Projects { + projectReader, err := newProjectReader(ctx, r.logger, project, parseMetadata, readerConfig) + if err != nil { + return err + } + + r.projectReaders = append(r.projectReaders, projectReader) + } + + return nil +} + +func newProjectReader(ctx context.Context, logger *zap.Logger, project Project, parsedMetadata []*metadata.MetricsMetadata, + readerConfig statsreader.ReaderConfig) (*statsreader.ProjectReader, error) { + logger.Debug("Constructing project reader for project", zap.String("project id", project.ID)) + + databaseReadersCount := 0 + for _, instance := range project.Instances { + databaseReadersCount += len(instance.Databases) + } + + databaseReaders := make([]statsreader.CompositeReader, databaseReadersCount) + databaseReaderIndex := 0 + for _, instance := range project.Instances { + for _, database := range instance.Databases { + logger.Debug("Constructing database reader for combination of project, instance, database", + zap.String("project id", project.ID), zap.String("instance id", instance.ID), zap.String("database", database)) + + databaseID := datasource.NewDatabaseID(project.ID, instance.ID, database) + + databaseReader, err := statsreader.NewDatabaseReader(ctx, parsedMetadata, databaseID, + project.ServiceAccountKey, readerConfig, logger) + if err != nil { + return nil, err + } + + databaseReaders[databaseReaderIndex] = databaseReader + databaseReaderIndex++ + } + } + + return statsreader.NewProjectReader(databaseReaders, logger), nil +} + +func (r *googleCloudSpannerReceiver) collectData(ctx context.Context) error { + var allMetrics []pdata.Metrics + + for _, projectReader := range r.projectReaders { + allMetrics = append(allMetrics, projectReader.Read(ctx)...) + } + + for _, metric := range allMetrics { + if err := r.nextConsumer.ConsumeMetrics(ctx, metric); err != nil { + // TODO Use obsreport instead to make the component observable and emit metrics on errors + //r.logger.Error("Failed to consume metric(s) because of an error", + // zap.String("metric name", metricName(metric)), zap.Error(err)) + + return err + } + } return nil } + +func (r *googleCloudSpannerReceiver) notifyOnCollectData(err error) { + for _, onCollectData := range r.onCollectData { + onCollectData(err) + } +} + +func metricName(metric pdata.Metrics) string { + var mName string + resourceMetrics := metric.ResourceMetrics() + + for i := 0; i < resourceMetrics.Len(); i++ { + ilm := resourceMetrics.At(i).InstrumentationLibraryMetrics() + + for j := 0; j < ilm.Len(); j++ { + metrics := ilm.At(j).Metrics() + + for k := 0; k < metrics.Len(); k++ { + mName += metrics.At(k).Name() + "," + } + } + } + + if mName != "" { + mName = mName[:len(mName)-1] + } + + return mName +} diff --git a/receiver/googlecloudspannerreceiver/receiver_test.go b/receiver/googlecloudspannerreceiver/receiver_test.go new file mode 100644 index 000000000000..ae23d9326c5c --- /dev/null +++ b/receiver/googlecloudspannerreceiver/receiver_test.go @@ -0,0 +1,331 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package googlecloudspannerreceiver + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/statsreader" +) + +const ( + serviceAccountValidPath = "testdata/serviceAccount.json" + serviceAccountInvalidPath = "does not exist" +) + +type mockCompositeReader struct { +} + +func (r mockCompositeReader) Name() string { + return "mockCompositeReader" +} + +func (r mockCompositeReader) Read(_ context.Context) []pdata.Metrics { + md := pdata.NewMetrics() + rms := md.ResourceMetrics() + rm := rms.AppendEmpty() + + ilms := rm.InstrumentationLibraryMetrics() + ilm := ilms.AppendEmpty() + metric := ilm.Metrics().AppendEmpty() + metric.SetName("testMetric") + + return []pdata.Metrics{md} +} + +func (r mockCompositeReader) Shutdown() { + // Do nothing +} + +func TestNewGoogleCloudSpannerReceiver(t *testing.T) { + logger := zap.NewNop() + nextConsumer := consumertest.NewNop() + cfg := createDefaultConfig().(*Config) + + receiver, err := newGoogleCloudSpannerReceiver(logger, cfg, nextConsumer) + receiverCasted := receiver.(*googleCloudSpannerReceiver) + + require.NoError(t, err) + require.NotNil(t, receiver) + + assert.Equal(t, logger, receiverCasted.logger) + assert.Equal(t, nextConsumer, receiverCasted.nextConsumer) + assert.Equal(t, cfg, receiverCasted.config) +} + +func TestNewGoogleCloudSpannerReceiver_NilConsumer(t *testing.T) { + cfg := createDefaultConfig().(*Config) + + metricsReceiver, err := newGoogleCloudSpannerReceiver(zap.NewNop(), cfg, nil) + + require.NotNil(t, err) + require.Nil(t, metricsReceiver) +} + +func createConfig(serviceAccountPath string) *Config { + cfg := createDefaultConfig().(*Config) + + instance := Instance{ + ID: "instanceID", + Databases: []string{"databaseName"}, + } + + project := Project{ + ID: "projectID", + Instances: []Instance{instance}, + ServiceAccountKey: serviceAccountPath, + } + + cfg.Projects = []Project{project} + + return cfg +} + +func TestStart(t *testing.T) { + testCases := map[string]struct { + serviceAccountPath string + expectError bool + }{ + "Happy path": {serviceAccountValidPath, false}, + "With project readers initialization error": {serviceAccountInvalidPath, true}, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + logger := zap.NewNop() + nextConsumer := consumertest.NewNop() + cfg := createConfig(testCase.serviceAccountPath) + + host := componenttest.NewNopHost() + + receiver, err := newGoogleCloudSpannerReceiver(logger, cfg, nextConsumer) + receiverCasted := receiver.(*googleCloudSpannerReceiver) + + require.NoError(t, err) + require.NotNil(t, receiver) + + err = receiverCasted.Start(context.Background(), host) + + if testCase.expectError { + require.Error(t, err) + assert.Equal(t, 0, len(receiverCasted.projectReaders)) + } else { + require.NoError(t, err) + assert.Equal(t, 1, len(receiverCasted.projectReaders)) + } + }) + } +} + +func TestStartInitializesDataCollectionWithCollectDataError(t *testing.T) { + logger := zap.NewNop() + nextConsumer := consumertest.NewErr(errors.New("error")) + cfg := createConfig(serviceAccountValidPath) + cfg.CollectionInterval = 10 * time.Millisecond + host := componenttest.NewNopHost() + receiver, err := newGoogleCloudSpannerReceiver(logger, cfg, nextConsumer) + receiverCasted := receiver.(*googleCloudSpannerReceiver) + + require.NoError(t, err) + require.NotNil(t, receiver) + + errs := make(chan error) + + receiverCasted.onCollectData = append(receiverCasted.onCollectData, func(err error) { + errs <- err + }) + + err = receiverCasted.Start(context.Background(), host) + + assert.NoError(t, <-errs) + + require.NoError(t, err) +} + +func TestStartWithContextDone(t *testing.T) { + logger := zap.NewNop() + nextConsumer := consumertest.NewNop() + cfg := createConfig(serviceAccountValidPath) + ctx := context.Background() + host := componenttest.NewNopHost() + + receiver, err := newGoogleCloudSpannerReceiver(logger, cfg, nextConsumer) + receiverCasted := receiver.(*googleCloudSpannerReceiver) + + require.NoError(t, err) + require.NotNil(t, receiver) + + err = receiverCasted.Start(ctx, host) + + require.NoError(t, err) + receiverCasted.cancel() +} + +func TestInitializeProjectReaders(t *testing.T) { + testCases := map[string]struct { + serviceAccountPath string + expectError bool + replaceMetadataConfig bool + }{ + "Happy path": {serviceAccountValidPath, false, false}, + "With error": {serviceAccountInvalidPath, true, false}, + "With metadata config error": {serviceAccountInvalidPath, true, true}, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + logger := zap.NewNop() + nextConsumer := consumertest.NewNop() + cfg := createConfig(testCase.serviceAccountPath) + + receiver, err := newGoogleCloudSpannerReceiver(logger, cfg, nextConsumer) + receiverCasted := receiver.(*googleCloudSpannerReceiver) + + require.NoError(t, err) + require.NotNil(t, receiver) + + yaml := metadataYaml + + if testCase.replaceMetadataConfig { + metadataYaml = []byte{1} + } + + err = receiverCasted.initializeProjectReaders(context.Background()) + + if testCase.replaceMetadataConfig { + metadataYaml = yaml + } + + if testCase.expectError { + require.Error(t, err) + assert.Equal(t, 0, len(receiverCasted.projectReaders)) + } else { + require.NoError(t, err) + assert.Equal(t, 1, len(receiverCasted.projectReaders)) + } + }) + } +} + +func TestNewProjectReader(t *testing.T) { + testCases := map[string]struct { + serviceAccountPath string + expectError bool + }{ + "Happy path": {serviceAccountValidPath, false}, + "With error": {serviceAccountInvalidPath, true}, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + logger := zap.NewNop() + cfg := createConfig(testCase.serviceAccountPath) + var parsedMetadata []*metadata.MetricsMetadata + + reader, err := newProjectReader(context.Background(), logger, cfg.Projects[0], parsedMetadata, + statsreader.ReaderConfig{}) + + if testCase.expectError { + require.Error(t, err) + assert.Nil(t, reader) + } else { + require.NoError(t, err) + assert.NotNil(t, reader) + } + }) + } +} + +func TestCollectData(t *testing.T) { + logger := zap.NewNop() + + testCases := map[string]struct { + nextConsumer consumer.Metrics + projectReader statsreader.CompositeReader + expectError bool + }{ + "Happy path": {consumertest.NewNop(), statsreader.NewProjectReader([]statsreader.CompositeReader{}, logger), false}, + "With error": {consumertest.NewErr(errors.New("an error")), mockCompositeReader{}, true}, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + cfg := createDefaultConfig().(*Config) + + receiver, err := newGoogleCloudSpannerReceiver(logger, cfg, testCase.nextConsumer) + + require.NoError(t, err) + require.NotNil(t, receiver) + + r := receiver.(*googleCloudSpannerReceiver) + r.projectReaders = []statsreader.CompositeReader{testCase.projectReader} + ctx := context.Background() + + err = r.collectData(ctx) + + if testCase.expectError { + require.NotNil(t, err) + } else { + require.Nil(t, err) + } + }) + } + +} + +func TestGoogleCloudSpannerReceiver_Shutdown(t *testing.T) { + logger := zap.NewNop() + projectReader := statsreader.NewProjectReader([]statsreader.CompositeReader{}, logger) + + receiver := &googleCloudSpannerReceiver{ + projectReaders: []statsreader.CompositeReader{projectReader}, + } + + ctx := context.Background() + ctx, receiver.cancel = context.WithCancel(ctx) + + err := receiver.Shutdown(ctx) + + require.NoError(t, err) +} + +func TestMetricName(t *testing.T) { + md := pdata.NewMetrics() + rms := md.ResourceMetrics() + rm := rms.AppendEmpty() + + ilms := rm.InstrumentationLibraryMetrics() + ilm := ilms.AppendEmpty() + metric := ilm.Metrics().AppendEmpty() + metric.SetName("testMetric1") + metric = ilm.Metrics().AppendEmpty() + metric.SetName("testMetric2") + + metricName := metricName(md) + + assert.Equal(t, "testMetric1,testMetric2", metricName) +}