Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added implementation of Google Cloud Spanner receiver - 4th part(actual receiver implementation). #5727

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 141 additions & 10 deletions receiver/googlecloudspannerreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,45 @@ package googlecloudspannerreceiver

import (
"context"
_ "embed"
"fmt"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/datasource"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/metadataparser"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/statsreader"
)

//go:embed "internal/metadataconfig/metadata.yaml"
var metadataYaml []byte
ydrozhdzhal marked this conversation as resolved.
Show resolved Hide resolved

var _ component.MetricsReceiver = (*googleCloudSpannerReceiver)(nil)

type googleCloudSpannerReceiver struct {
logger *zap.Logger
nextConsumer consumer.Metrics
config *Config
cancel context.CancelFunc
logger *zap.Logger
nextConsumer consumer.Metrics
config *Config
cancel context.CancelFunc
projectReaders []statsreader.CompositeReader
onCollectData []func(error)
}

func newGoogleCloudSpannerReceiver(
logger *zap.Logger,
config *Config,
nextConsumer consumer.Metrics) (component.MetricsReceiver, error) {

if nextConsumer == nil {
return nil, componenterror.ErrNilNextConsumer
}

r := &googleCloudSpannerReceiver{
logger: logger,
nextConsumer: nextConsumer,
Expand All @@ -45,17 +63,22 @@ func newGoogleCloudSpannerReceiver(
return r, nil
}

func (gcsReceiver *googleCloudSpannerReceiver) Start(ctx context.Context, host component.Host) error {
ctx, gcsReceiver.cancel = context.WithCancel(ctx)
func (r *googleCloudSpannerReceiver) Start(ctx context.Context, _ component.Host) error {
ctx, r.cancel = context.WithCancel(ctx)
err := r.initializeProjectReaders(ctx)
if err != nil {
return err
}

go func() {
ticker := time.NewTicker(gcsReceiver.config.CollectionInterval)
ticker := time.NewTicker(r.config.CollectionInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
// TODO Collect data here
// Ignoring this error because it has been already logged inside collectData
r.notifyOnCollectData(r.collectData(ctx))
case <-ctx.Done():
return
}
Expand All @@ -65,8 +88,116 @@ func (gcsReceiver *googleCloudSpannerReceiver) Start(ctx context.Context, host c
return nil
}

func (gcsReceiver *googleCloudSpannerReceiver) Shutdown(context.Context) error {
gcsReceiver.cancel()
func (r *googleCloudSpannerReceiver) Shutdown(context.Context) error {
for _, projectReader := range r.projectReaders {
projectReader.Shutdown()
}

r.cancel()

return nil
}

func (r *googleCloudSpannerReceiver) initializeProjectReaders(ctx context.Context) error {
readerConfig := statsreader.ReaderConfig{
BackfillEnabled: r.config.BackfillEnabled,
TopMetricsQueryMaxRows: r.config.TopMetricsQueryMaxRows,
}

parseMetadata, err := metadataparser.ParseMetadataConfig(metadataYaml)
if err != nil {
return fmt.Errorf("error occurred during parsing of metadata: %w", err)
}

for _, project := range r.config.Projects {
projectReader, err := newProjectReader(ctx, r.logger, project, parseMetadata, readerConfig)
if err != nil {
return err
}

r.projectReaders = append(r.projectReaders, projectReader)
}

return nil
}

func newProjectReader(ctx context.Context, logger *zap.Logger, project Project, parsedMetadata []*metadata.MetricsMetadata,
readerConfig statsreader.ReaderConfig) (*statsreader.ProjectReader, error) {
logger.Debug("Constructing project reader for project", zap.String("project id", project.ID))

databaseReadersCount := 0
for _, instance := range project.Instances {
databaseReadersCount += len(instance.Databases)
}

databaseReaders := make([]statsreader.CompositeReader, databaseReadersCount)
databaseReaderIndex := 0
for _, instance := range project.Instances {
for _, database := range instance.Databases {
logger.Debug("Constructing database reader for combination of project, instance, database",
zap.String("project id", project.ID), zap.String("instance id", instance.ID), zap.String("database", database))

databaseID := datasource.NewDatabaseID(project.ID, instance.ID, database)

databaseReader, err := statsreader.NewDatabaseReader(ctx, parsedMetadata, databaseID,
project.ServiceAccountKey, readerConfig, logger)
if err != nil {
return nil, err
}

databaseReaders[databaseReaderIndex] = databaseReader
databaseReaderIndex++
}
}

return statsreader.NewProjectReader(databaseReaders, logger), nil
}

func (r *googleCloudSpannerReceiver) collectData(ctx context.Context) error {
var allMetrics []pdata.Metrics
ydrozhdzhal marked this conversation as resolved.
Show resolved Hide resolved

for _, projectReader := range r.projectReaders {
allMetrics = append(allMetrics, projectReader.Read(ctx)...)
}

for _, metric := range allMetrics {
if err := r.nextConsumer.ConsumeMetrics(ctx, metric); err != nil {
// TODO Use obsreport instead to make the component observable and emit metrics on errors
//r.logger.Error("Failed to consume metric(s) because of an error",
// zap.String("metric name", metricName(metric)), zap.Error(err))

return err
}
}

return nil
}

func (r *googleCloudSpannerReceiver) notifyOnCollectData(err error) {
for _, onCollectData := range r.onCollectData {
onCollectData(err)
}
}

func metricName(metric pdata.Metrics) string {
var mName string
resourceMetrics := metric.ResourceMetrics()

for i := 0; i < resourceMetrics.Len(); i++ {
ilm := resourceMetrics.At(i).InstrumentationLibraryMetrics()

for j := 0; j < ilm.Len(); j++ {
metrics := ilm.At(j).Metrics()

for k := 0; k < metrics.Len(); k++ {
mName += metrics.At(k).Name() + ","
}
}
}

if mName != "" {
mName = mName[:len(mName)-1]
}

return mName
}
Loading