diff --git a/cmd/rule-evaluator/main.go b/cmd/rule-evaluator/main.go index 1b66265659..08af825d0f 100644 --- a/cmd/rule-evaluator/main.go +++ b/cmd/rule-evaluator/main.go @@ -307,14 +307,12 @@ func main() { } { // Storage Processing. - ctxStorage, cancelStorage := context.WithCancel(ctx) g.Add(func() error { - err = destination.Run(ctxStorage) + err = destination.Run() _ = level.Info(logger).Log("msg", "Background processing of storage stopped") return err }, func(error) { _ = level.Info(logger).Log("msg", "Stopping background storage processing...") - cancelStorage() }) } cwd, err := os.Getwd() diff --git a/pkg/export/export.go b/pkg/export/export.go index 50a309d5a4..b1f7b120d1 100644 --- a/pkg/export/export.go +++ b/pkg/export/export.go @@ -18,14 +18,15 @@ import ( "bytes" "context" "errors" - "flag" "fmt" "math" "os" "os/exec" + "reflect" "runtime/debug" "strings" "sync" + "testing" "time" monitoring "cloud.google.com/go/monitoring/apiv3/v2" @@ -41,6 +42,7 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/record" + "golang.org/x/oauth2/google" "google.golang.org/api/option" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -107,12 +109,18 @@ var ( "See https://www.cloudinfrastructuremap.com/") ) +type metricServiceClient interface { + Close() error + CreateTimeSeries(context.Context, *monitoring_pb.CreateTimeSeriesRequest, ...gax.CallOption) error +} + // Exporter converts Prometheus samples into Cloud Monitoring samples and exports them. type Exporter struct { logger log.Logger + ctx context.Context opts ExporterOpts - metricClient *monitoring.MetricClient + metricClient metricServiceClient seriesCache *seriesCache shards []*shard @@ -122,7 +130,7 @@ type Exporter struct { // The external labels may be updated asynchronously by configuration changes // and must be locked with mtx. 
- mtx sync.Mutex + mtx sync.RWMutex externalLabels labels.Labels // A set of metrics for which we defaulted the metadata to untyped and have // issued a warning about that. @@ -132,6 +140,8 @@ type Exporter struct { // It is checked for on each batch provided to the Export method. // If unset, data is always sent. lease Lease + + newMetricClient func(ctx context.Context, opts ExporterOpts) (metricServiceClient, error) } const ( @@ -208,6 +218,7 @@ type ExporterOpts struct { Efficiency EfficiencyOpts } +// Default defaults any unset values. func (opts *ExporterOpts) Default() { if opts.Efficiency.BatchSize == 0 { opts.Efficiency.BatchSize = BatchSizeMax @@ -260,7 +271,7 @@ type EfficiencyOpts struct { ShardBufferSize uint } -// NopExporter returns an inactive exporter. +// NopExporter returns a permanently inactive exporter. func NopExporter() *Exporter { return &Exporter{ opts: ExporterOpts{Disable: true}, @@ -299,7 +310,7 @@ func (alwaysLease) OnLeaderChange(func()) { // We never lose the lease as it's always owned. } -func newMetricClient(ctx context.Context, opts ExporterOpts) (*monitoring.MetricClient, error) { +func defaultNewMetricClient(ctx context.Context, opts ExporterOpts) (metricServiceClient, error) { version, err := Version() if err != nil { return nil, fmt.Errorf("unable to fetch user agent version: %w", err) @@ -316,11 +327,19 @@ func newMetricClient(ctx context.Context, opts ExporterOpts) (*monitoring.Metric if opts.Endpoint != "" { clientOpts = append(clientOpts, option.WithEndpoint(opts.Endpoint)) } - if opts.DisableAuth { + // Disable auth when the exporter is disabled because we don't want a panic when default + // credentials are not found. 
+ if opts.DisableAuth || opts.Disable { clientOpts = append(clientOpts, option.WithoutAuthentication(), option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), ) + } else if opts.CredentialsFile == "" && len(opts.CredentialsFromJSON) == 0 { + // If no credentials are found, gRPC panics so we check manually. + _, err := google.FindDefaultCredentials(ctx) + if err != nil { + return nil, err + } } if opts.CredentialsFile != "" { clientOpts = append(clientOpts, option.WithCredentialsFile(opts.CredentialsFile)) @@ -379,14 +398,11 @@ func New(ctx context.Context, logger log.Logger, reg prometheus.Registerer, opts lease = NopLease() } - metricClient, err := newMetricClient(ctx, opts) - if err != nil { - return nil, fmt.Errorf("create metric client: %w", err) - } e := &Exporter{ logger: logger, + ctx: ctx, opts: opts, - metricClient: metricClient, + newMetricClient: defaultNewMetricClient, nextc: make(chan struct{}, 1), shards: make([]*shard, opts.Efficiency.ShardCount), warnedUntypedMetrics: map[string]struct{}{}, @@ -417,20 +433,38 @@ const ( // ApplyConfig updates the exporter state to the given configuration. // Must be called at least once before Export() can be used. -func (e *Exporter) ApplyConfig(cfg *config.Config) (err error) { +func (e *Exporter) ApplyConfig(cfg *config.Config, opts *ExporterOpts) (err error) { + if e.seriesCache == nil { + return errors.New("nop exporter") + } + optsChanged := false + e.mtx.RLock() + if opts != nil { + opts.Default() + if err := opts.Validate(); err != nil { + e.mtx.RUnlock() + return err + } + optsChanged = !reflect.DeepEqual(e.opts, *opts) + } else { + optsCopy := e.opts + opts = &optsCopy + } + e.mtx.RUnlock() + // If project_id, location, or cluster were set through the external_labels in the config file, // these values take precedence. If they are unset, the flag value, which defaults to an // environment-specific value on GCE/GKE, is used.
builder := labels.NewBuilder(cfg.GlobalConfig.ExternalLabels) if !cfg.GlobalConfig.ExternalLabels.Has(KeyProjectID) { - builder.Set(KeyProjectID, e.opts.ProjectID) + builder.Set(KeyProjectID, opts.ProjectID) } if !cfg.GlobalConfig.ExternalLabels.Has(KeyLocation) { - builder.Set(KeyLocation, e.opts.Location) + builder.Set(KeyLocation, opts.Location) } if !cfg.GlobalConfig.ExternalLabels.Has(KeyCluster) { - builder.Set(KeyCluster, e.opts.Cluster) + builder.Set(KeyCluster, opts.Cluster) } lset := builder.Labels() @@ -458,10 +492,20 @@ func (e *Exporter) ApplyConfig(cfg *config.Config) (err error) { } } + metricClient := e.metricClient + if optsChanged || metricClient == nil { + metricClient, err = e.newMetricClient(e.ctx, *opts) + if err != nil { + return fmt.Errorf("create metric client: %w", err) + } + } + // New external labels possibly invalidate the cached series conversions. e.mtx.Lock() + e.metricClient = metricClient e.externalLabels = lset e.seriesCache.forceRefresh() + e.opts = *opts e.mtx.Unlock() return nil @@ -471,8 +515,7 @@ func (e *Exporter) ApplyConfig(cfg *config.Config) (err error) { // based on a series ID we got through exported sample records. // Must be called before any call to Export is made. func (e *Exporter) SetLabelsByIDFunc(f func(storage.SeriesRef) labels.Labels) { - // Prevent panics in case a default disabled exporter was instantiated (see Global()). - if e.opts.Disable { + if e.seriesCache == nil { return } if e.seriesCache.getLabelsByRef != nil { @@ -570,18 +613,12 @@ const ( mainModuleName = "github.com/GoogleCloudPlatform/prometheus-engine" ) -// Testing returns true if running within a unit test. -// TODO(TheSpiritXIII): Replace with https://github.com/golang/go/issues/52600 -func Testing() bool { - return flag.Lookup("test.v") != nil -} - // Version is used in the User Agent. This version is automatically detected if // this function is imported as a library. 
However, the version is statically // set if this function is used in a binary in prometheus-engine due to Golang // restrictions. While testing, the static version is validated for correctness. func Version() (string, error) { - if Testing() { + if testing.Testing() { // TODO(TheSpiritXIII): After https://github.com/golang/go/issues/50603 just return an empty // string here. For now, use the opportunity to confirm that the static version is correct. // We manually get the closest git tag if the user is running the unit test locally, but @@ -657,10 +694,10 @@ func Version() (string, error) { // The per-shard overhead is minimal and thus a high number can be picked, which allows us // to cover a large range of potential throughput and latency combinations without requiring // user configuration or, even worse, runtime changes to the shard number. -func (e *Exporter) Run(ctx context.Context) error { - defer e.metricClient.Close() - go e.seriesCache.run(ctx) - go e.lease.Run(ctx) +func (e *Exporter) Run() error { + defer e.close() + go e.seriesCache.run(e.ctx) + go e.lease.Run(e.ctx) timer := time.NewTimer(batchDelayMax) stopTimer := func() { @@ -673,26 +710,37 @@ func (e *Exporter) Run(ctx context.Context) error { } defer stopTimer() - curBatch := newBatch(e.logger, e.opts.Efficiency.ShardCount, e.opts.Efficiency.BatchSize) + e.mtx.RLock() + opts := e.opts + e.mtx.RUnlock() + + curBatch := newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize) // Send the currently accumulated batch to GCM asynchronously. send := func() { + e.mtx.RLock() + opts := e.opts + sendFunc := e.metricClient.CreateTimeSeries + e.mtx.RUnlock() + // Send the batch and once it completed, trigger next to process remaining data in the // shards that were part of the batch. This ensures that if we didn't take all samples // from a shard when filling the batch, we'll come back for them and any queue built-up // gets sent eventually. 
go func(ctx context.Context, b *batch) { - b.send(ctx, e.metricClient.CreateTimeSeries) + if !opts.Disable { + b.send(ctx, sendFunc) + } // We could only trigger if we didn't fully empty shards in this batch. // Benchmarking showed no beneficial impact of this optimization. e.triggerNext() - }(ctx, curBatch) + }(e.ctx, curBatch) // Reset state for new batch. stopTimer() timer.Reset(batchDelayMax) - curBatch = newBatch(e.logger, e.opts.Efficiency.ShardCount, e.opts.Efficiency.BatchSize) + curBatch = newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize) } for { @@ -701,7 +749,7 @@ func (e *Exporter) Run(ctx context.Context) error { // buffered data. In-flight requests will be aborted as well. // This is fine once we persist data submitted via Export() but for now there may be some // data loss on shutdown. - case <-ctx.Done(): + case <-e.ctx.Done(): return nil // This is activated for each new sample that arrives case <-e.nextc: @@ -733,6 +781,20 @@ func (e *Exporter) Run(ctx context.Context) error { } } +// close releases the metric client, if one was ever created, and clears the reference. +func (e *Exporter) close() { + e.mtx.Lock() + defer e.mtx.Unlock() + // Run can exit before any ApplyConfig call succeeded, leaving no client to close. + if e.metricClient == nil { + return + } + if err := e.metricClient.Close(); err != nil { + _ = e.logger.Log("msg", "error closing metric client", "err", err) + } + e.metricClient = nil +} + // CtxKey is a dedicated type for keys of context-embedded values propagated // with the scrape context.
type ctxKey int diff --git a/pkg/export/export_test.go b/pkg/export/export_test.go index 51612740ef..84f794b44b 100644 --- a/pkg/export/export_test.go +++ b/pkg/export/export_test.go @@ -16,29 +16,25 @@ package export import ( "context" + "errors" "fmt" - "net" "os" "sync" "testing" "time" - monitoring "cloud.google.com/go/monitoring/apiv3/v2" monitoring_pb "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" "github.com/go-kit/log" "github.com/google/go-cmp/cmp" gax "github.com/googleapis/gax-go/v2" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/record" - "google.golang.org/api/option" monitoredres_pb "google.golang.org/genproto/googleapis/api/monitoredres" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/test/bufconn" - empty_pb "google.golang.org/protobuf/types/known/emptypb" timestamp_pb "google.golang.org/protobuf/types/known/timestamppb" + "k8s.io/apimachinery/pkg/util/wait" ) func TestBatchAdd(t *testing.T) { @@ -332,43 +328,24 @@ type testMetricService struct { samples []*monitoring_pb.TimeSeries } -func (srv *testMetricService) CreateTimeSeries(_ context.Context, req *monitoring_pb.CreateTimeSeriesRequest) (*empty_pb.Empty, error) { +func (srv *testMetricService) CreateTimeSeries(_ context.Context, req *monitoring_pb.CreateTimeSeriesRequest, _ ...gax.CallOption) error { srv.samples = append(srv.samples, req.TimeSeries...) 
- return &empty_pb.Empty{}, nil + return nil } -func TestExporter_drainBacklog(t *testing.T) { - var ( - srv = grpc.NewServer() - listener = bufconn.Listen(1e6) - metricServer = &testMetricService{} - ) - monitoring_pb.RegisterMetricServiceServer(srv, metricServer) - - //nolint:errcheck - go srv.Serve(listener) - defer srv.Stop() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +func (srv *testMetricService) Close() error { + return nil +} - bufDialer := func(context.Context, string) (net.Conn, error) { - return listener.Dial() - } - metricClient, err := monitoring.NewMetricClient(ctx, - option.WithoutAuthentication(), - option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), - option.WithGRPCDialOption(grpc.WithContextDialer(bufDialer)), - ) - if err != nil { - t.Fatalf("Creating metric client failed: %s", err) - } +func TestExporter_drainBacklog(t *testing.T) { + ctx := context.Background() e, err := New(ctx, log.NewJSONLogger(log.NewSyncWriter(os.Stderr)), nil, ExporterOpts{DisableAuth: true}, NopLease()) if err != nil { t.Fatalf("Creating Exporter failed: %s", err) } - e.metricClient = metricClient + metricServer := testMetricService{} + e.metricClient = &metricServer e.SetLabelsByIDFunc(func(storage.SeriesRef) labels.Labels { return labels.FromStrings("project_id", "test", "location", "test") @@ -382,14 +359,89 @@ func TestExporter_drainBacklog(t *testing.T) { } //nolint:errcheck - go e.Run(ctx) + go e.Run() // As our samples are all for the same series, each batch can only contain a single sample. // The exporter waits for the batch delay duration before sending it. // We sleep for an appropriate multiple of it to allow it to drain the shard. 
- time.Sleep(55 * batchDelayMax) + ctxTimeout, cancel := context.WithTimeout(ctx, 60*batchDelayMax) + defer cancel() + + pollErr := wait.PollUntilContextCancel(ctxTimeout, batchDelayMax, false, func(_ context.Context) (bool, error) { + // Check that we received all samples that went in. + if got, want := len(metricServer.samples), 50; got != want { + err = fmt.Errorf("got %d, want %d", got, want) + return false, nil + } + return true, nil + }) + if pollErr != nil { + if wait.Interrupted(pollErr) && err != nil { + pollErr = err + } + t.Fatalf("did not get samples: %s", pollErr) + } +} + +func TestApplyConfig(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + e, err := New(ctx, log.NewJSONLogger(log.NewSyncWriter(os.Stderr)), nil, ExporterOpts{DisableAuth: true}, NopLease()) + if err != nil { + t.Fatalf("Create exporter: %s", err) + } + e.SetLabelsByIDFunc(func(storage.SeriesRef) labels.Labels { + return labels.FromStrings("location", "us-central1-c") + }) + + metricServer := testMetricService{} + e.newMetricClient = func(_ context.Context, _ ExporterOpts) (metricServiceClient, error) { + return &metricServer, nil + } + sendAndTestSample := func(expectedProjectID string) { + e.Export(nil, []record.RefSample{{Ref: 1, T: int64(0), V: float64(0)}}, nil) + + var err error + pollErr := wait.PollUntilContextCancel(ctx, batchDelayMax, false, func(_ context.Context) (bool, error) { + if len(metricServer.samples) == 0 { + err = errors.New("no samples sent") + return false, nil + } + + sample := metricServer.samples[len(metricServer.samples)-1] + projectID := sample.Resource.Labels[KeyProjectID] + if projectID != expectedProjectID { + err = fmt.Errorf("expected project ID %q but got %q", expectedProjectID, projectID) + return false, nil + } + + return true, nil + }) + if pollErr != nil { + if wait.Interrupted(pollErr) && err != nil { + pollErr = err + } + t.Fatalf("did not get samples: %s", pollErr) + } + } + + if 
err := e.ApplyConfig(&config.Config{}, &ExporterOpts{ProjectID: "test"}); err != nil { + t.Fatalf("Initial apply: %s", err) + } + go func() { + if err := e.Run(); err != nil { + t.Errorf("Run exporter: %s", err) + } + }() + sendAndTestSample("test") + + if err := e.ApplyConfig(&config.Config{}, &ExporterOpts{ProjectID: "abc"}); err != nil { + t.Fatalf("Initial apply: %s", err) + } + sendAndTestSample("abc") - // Check that we received all samples that went in. - if got, want := len(metricServer.samples), 50; got != want { - t.Fatalf("got %d, want %d", got, want) + if err := e.ApplyConfig(&config.Config{}, &ExporterOpts{ProjectID: "xyz"}); err != nil { + t.Fatalf("Initial apply: %s", err) } + sendAndTestSample("xyz") } diff --git a/pkg/export/gcm/promtest/local_export.go b/pkg/export/gcm/promtest/local_export.go index da9a997b29..322cad4024 100644 --- a/pkg/export/gcm/promtest/local_export.go +++ b/pkg/export/gcm/promtest/local_export.go @@ -38,10 +38,10 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/record" "golang.org/x/oauth2" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" "github.com/GoogleCloudPlatform/prometheus-engine/pkg/export" "github.com/go-kit/log" - "github.com/prometheus/client_golang/prometheus" "golang.org/x/oauth2/google" ) @@ -74,7 +74,9 @@ func (l *localExportWithGCM) Ref() string { return "export-pkg-with-gcm" } func (l *localExportWithGCM) start(t testing.TB, _ e2e.Environment) (v1.API, map[string]string) { t.Helper() - ctx := context.Background() + ctx, cancel := context.WithCancel(signals.SetupSignalHandler()) + t.Cleanup(cancel) + creds, err := google.CredentialsFromJSON(ctx, l.gcmSA, gcm.DefaultAuthScopes()...) 
if err != nil { t.Fatalf("create credentials from JSON: %s", err) @@ -100,24 +102,24 @@ func (l *localExportWithGCM) start(t testing.TB, _ e2e.Environment) (v1.API, map ProjectID: creds.ProjectID, CredentialsFromJSON: l.gcmSA, } - l.e, err = export.New(ctx, log.NewJSONLogger(os.Stderr), prometheus.NewRegistry(), exporterOpts, export.NopLease()) + l.e, err = export.New(ctx, log.NewJSONLogger(os.Stderr), nil, exporterOpts, export.NopLease()) if err != nil { t.Fatalf("create exporter: %v", err) } // Apply empty config, so resources labels are attached. - if err := l.e.ApplyConfig(&config.DefaultConfig); err != nil { + if err := l.e.ApplyConfig(&config.DefaultConfig, nil); err != nil { t.Fatalf("apply config: %v", err) } l.e.SetLabelsByIDFunc(func(ref storage.SeriesRef) labels.Labels { return l.labelsByRef[ref] }) - cancelableCtx, cancel := context.WithCancel(ctx) - //nolint:errcheck - go l.e.Run(cancelableCtx) - // TODO(bwplotka): Consider listening for KILL signal too. - t.Cleanup(cancel) + go func() { + if err := l.e.Run(); err != nil { + t.Logf("running exporter: %s", err) + } + }() return v1.NewAPI(cl), map[string]string{ "cluster": cluster, diff --git a/pkg/export/setup/setup.go b/pkg/export/setup/setup.go index 6ba4679482..069aa517e2 100644 --- a/pkg/export/setup/setup.go +++ b/pkg/export/setup/setup.go @@ -22,6 +22,7 @@ import ( "os" "strconv" "strings" + "testing" "time" "cloud.google.com/go/compute/metadata" @@ -82,9 +83,14 @@ func SetGlobal(exporter *export.Exporter) (err error) { // Global returns the global instance of the GCM exporter. func Global() *export.Exporter { if globalExporter == nil { - // This should usually be a panic but we set an inactive default exporter in this case - // to not break existing tests in Prometheus. 
+ if !testing.Testing() { + panic("must set a global exporter") + } + fmt.Fprintln(os.Stderr, "No global GCM exporter was set, setting default inactive exporter.") + + // We don't want to change all upstream Prometheus unit tests, so let's just create a + // disabled exporter. These are created on-demand to prevent race conditions between tests. return export.NopExporter() } return globalExporter diff --git a/pkg/export/storage.go b/pkg/export/storage.go index 3f6634e6fd..50f8c96aa3 100644 --- a/pkg/export/storage.go +++ b/pkg/export/storage.go @@ -55,12 +55,12 @@ func NewStorage(exporter *Exporter) *Storage { // ApplyConfig applies the new configuration to the storage. func (s *Storage) ApplyConfig(cfg *config.Config) error { - return s.exporter.ApplyConfig(cfg) + return s.exporter.ApplyConfig(cfg, nil) } // Run background processing of the storage. -func (s *Storage) Run(ctx context.Context) error { - return s.exporter.Run(ctx) +func (s *Storage) Run() error { + return s.exporter.Run() } func (s *Storage) labelsByID(id storage.SeriesRef) labels.Labels {