feat: allow updating export options at runtime
TheSpiritXIII committed Jun 7, 2024
1 parent 034c8c7 commit 17fc0fd
Showing 6 changed files with 102 additions and 42 deletions.
4 changes: 1 addition & 3 deletions cmd/rule-evaluator/main.go
@@ -299,14 +299,12 @@ func main() {
}
{
// Storage Processing.
ctxStorage, cancelStorage := context.WithCancel(ctx)
g.Add(func() error {
err = destination.Run(ctxStorage)
err = destination.Run()
_ = level.Info(logger).Log("msg", "Background processing of storage stopped")
return err
}, func(error) {
_ = level.Info(logger).Log("msg", "Stopping background storage processing...")
cancelStorage()
})
}
cwd, err := os.Getwd()
105 changes: 79 additions & 26 deletions pkg/export/export.go
@@ -18,14 +18,15 @@ import (
"bytes"
"context"
"errors"
"flag"
"fmt"
"math"
"os"
"os/exec"
"reflect"
"runtime/debug"
"strings"
"sync"
"testing"
"time"

monitoring "cloud.google.com/go/monitoring/apiv3/v2"
@@ -41,6 +42,7 @@ import (
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/record"
"golang.org/x/oauth2/google"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@@ -110,6 +112,7 @@ var (
// Exporter converts Prometheus samples into Cloud Monitoring samples and exports them.
type Exporter struct {
logger log.Logger
ctx context.Context
opts ExporterOpts

metricClient *monitoring.MetricClient
@@ -122,7 +125,7 @@ type Exporter struct {

// The external labels may be updated asynchronously by configuration changes
// and must be locked with mtx.
mtx sync.Mutex
mtx sync.RWMutex
externalLabels labels.Labels
// A set of metrics for which we defaulted the metadata to untyped and have
// issued a warning about that.
@@ -208,6 +211,7 @@ type ExporterOpts struct {
Efficiency EfficiencyOpts
}

// Default populates any unset options with their default values.
func (opts *ExporterOpts) Default() {
if opts.Efficiency.BatchSize == 0 {
opts.Efficiency.BatchSize = BatchSizeMax
@@ -260,7 +264,7 @@ type EfficiencyOpts struct {
ShardBufferSize uint
}

// NopExporter returns an inactive exporter.
// NopExporter returns a permanently inactive exporter.
func NopExporter() *Exporter {
return &Exporter{
opts: ExporterOpts{Disable: true},
@@ -316,11 +320,18 @@ func newMetricClient(ctx context.Context, opts ExporterOpts) (*monitoring.Metric
if opts.Endpoint != "" {
clientOpts = append(clientOpts, option.WithEndpoint(opts.Endpoint))
}
if opts.DisableAuth {
// Also disable auth when the exporter itself is disabled, since no credentials are needed in that case.
if opts.DisableAuth || opts.Disable {
clientOpts = append(clientOpts,
option.WithoutAuthentication(),
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
)
} else {
// If no credentials are found, gRPC panics, so check for them earlier and return a proper error.
_, err := google.FindDefaultCredentials(ctx)
if err != nil {
return nil, err
}
}
if opts.CredentialsFile != "" {
clientOpts = append(clientOpts, option.WithCredentialsFile(opts.CredentialsFile))
@@ -385,6 +396,7 @@ func New(ctx context.Context, logger log.Logger, reg prometheus.Registerer, opts
}
e := &Exporter{
logger: logger,
ctx: ctx,
opts: opts,
metricClient: metricClient,
nextc: make(chan struct{}, 1),
@@ -417,20 +429,38 @@ const (

// ApplyConfig updates the exporter state to the given configuration.
// Must be called at least once before Export() can be used.
func (e *Exporter) ApplyConfig(cfg *config.Config) (err error) {
func (e *Exporter) ApplyConfig(cfg *config.Config, opts *ExporterOpts) (err error) {
if e.seriesCache == nil {
return errors.New("nop exporter")
}
optsChanged := false
e.mtx.RLock()
if opts != nil {
opts.Default()
if err := opts.Validate(); err != nil {
e.mtx.RUnlock()
return err
}
optsChanged = !reflect.DeepEqual(e.opts, *opts)
} else {
optsCopy := e.opts
opts = &optsCopy
}
e.mtx.RUnlock()

// If project_id, location, or cluster were set through the external_labels in the config file,
// these values take precedence. If they are unset, the flag value, which defaults to an
// environment-specific value on GCE/GKE, is used.
builder := labels.NewBuilder(cfg.GlobalConfig.ExternalLabels)

if !cfg.GlobalConfig.ExternalLabels.Has(KeyProjectID) {
builder.Set(KeyProjectID, e.opts.ProjectID)
builder.Set(KeyProjectID, opts.ProjectID)
}
if !cfg.GlobalConfig.ExternalLabels.Has(KeyLocation) {
builder.Set(KeyLocation, e.opts.Location)
builder.Set(KeyLocation, opts.Location)
}
if !cfg.GlobalConfig.ExternalLabels.Has(KeyCluster) {
builder.Set(KeyCluster, e.opts.Cluster)
builder.Set(KeyCluster, opts.Cluster)
}
lset := builder.Labels()

@@ -458,10 +488,20 @@ func (e *Exporter) ApplyConfig(cfg *config.Config) (err error) {
}
}

metricClient := e.metricClient
if optsChanged {
metricClient, err = newMetricClient(e.ctx, *opts)
if err != nil {
return fmt.Errorf("create metric client: %w", err)
}
}

// New external labels possibly invalidate the cached series conversions.
e.mtx.Lock()
e.metricClient = metricClient
e.externalLabels = lset
e.seriesCache.forceRefresh()
e.opts = *opts
e.mtx.Unlock()

return nil
@@ -471,8 +511,7 @@ func (e *Exporter) ApplyConfig(cfg *config.Config) (err error) {
// based on a series ID we got through exported sample records.
// Must be called before any call to Export is made.
func (e *Exporter) SetLabelsByIDFunc(f func(storage.SeriesRef) labels.Labels) {
// Prevent panics in case a default disabled exporter was instantiated (see Global()).
if e.opts.Disable {
if e.seriesCache == nil {
return
}
if e.seriesCache.getLabelsByRef != nil {
@@ -570,18 +609,12 @@ const (
mainModuleName = "github.com/GoogleCloudPlatform/prometheus-engine"
)

// Testing returns true if running within a unit test.
// TODO(TheSpiritXIII): Replace with https://github.com/golang/go/issues/52600
func Testing() bool {
return flag.Lookup("test.v") != nil
}

// Version is used in the User Agent. This version is automatically detected if
// this function is imported as a library. However, the version is statically
// set if this function is used in a binary in prometheus-engine due to Golang
// restrictions. While testing, the static version is validated for correctness.
func Version() (string, error) {
if Testing() {
if testing.Testing() {
// TODO(TheSpiritXIII): After https://github.com/golang/go/issues/50603 just return an empty
// string here. For now, use the opportunity to confirm that the static version is correct.
// We manually get the closest git tag if the user is running the unit test locally, but
@@ -657,10 +690,10 @@ func Version() (string, error) {
// The per-shard overhead is minimal and thus a high number can be picked, which allows us
// to cover a large range of potential throughput and latency combinations without requiring
// user configuration or, even worse, runtime changes to the shard number.
func (e *Exporter) Run(ctx context.Context) error {
defer e.metricClient.Close()
go e.seriesCache.run(ctx)
go e.lease.Run(ctx)
func (e *Exporter) Run() error {
defer e.Close()
go e.seriesCache.run(e.ctx)
go e.lease.Run(e.ctx)

timer := time.NewTimer(batchDelayMax)
stopTimer := func() {
@@ -673,26 +706,37 @@ func (e *Exporter) Run(ctx context.Context) error {
}
defer stopTimer()

curBatch := newBatch(e.logger, e.opts.Efficiency.ShardCount, e.opts.Efficiency.BatchSize)
e.mtx.RLock()
opts := e.opts
e.mtx.RUnlock()

curBatch := newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize)

// Send the currently accumulated batch to GCM asynchronously.
send := func() {
e.mtx.RLock()
opts := e.opts
sendFunc := e.metricClient.CreateTimeSeries
e.mtx.RUnlock()

// Send the batch and, once it has completed, trigger next to process remaining data in the
// shards that were part of the batch. This ensures that if we didn't take all samples
// from a shard when filling the batch, we'll come back for them and any built-up queue
// gets sent eventually.
go func(ctx context.Context, b *batch) {
b.send(ctx, e.metricClient.CreateTimeSeries)
if !opts.Disable {
b.send(ctx, sendFunc)
}
// We could only trigger if we didn't fully empty shards in this batch.
// Benchmarking showed no beneficial impact of this optimization.
e.triggerNext()
}(ctx, curBatch)
}(e.ctx, curBatch)

// Reset state for new batch.
stopTimer()
timer.Reset(batchDelayMax)

curBatch = newBatch(e.logger, e.opts.Efficiency.ShardCount, e.opts.Efficiency.BatchSize)
curBatch = newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize)
}

for {
@@ -701,7 +745,7 @@ func (e *Exporter) Run(ctx context.Context) error {
// buffered data. In-flight requests will be aborted as well.
// This is fine once we persist data submitted via Export() but for now there may be some
// data loss on shutdown.
case <-ctx.Done():
case <-e.ctx.Done():
return nil
// This is activated for each new sample that arrives
case <-e.nextc:
@@ -733,6 +777,15 @@ func (e *Exporter) Run(ctx context.Context) error {
}
}

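// Close shuts down the underlying GCM metric client. Run calls it via defer when it returns.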
func (e *Exporter) Close() {
e.mtx.Lock()
if err := e.metricClient.Close(); err != nil {
_ = e.logger.Log("msg", "error closing metric client", "err", err)
}
e.metricClient = nil
e.mtx.Unlock()
}

// CtxKey is a dedicated type for keys of context-embedded values propagated
// with the scrape context.
type ctxKey int
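A minimal usage sketch of the new two-argument ApplyConfig introduced above: passing nil keeps the options the exporter was constructed with, while passing a non-nil *ExporterOpts defaults, validates and swaps them in at runtime, recreating the GCM metric client when they changed. Only the Exporter methods shown in this diff are taken from the source; the helper name and project ID below are assumptions.

package main

import (
	"fmt"

	"github.com/GoogleCloudPlatform/prometheus-engine/pkg/export"
	"github.com/prometheus/prometheus/config"
)

// reloadExporter is a hypothetical helper illustrating the new signature.
// How the *export.Exporter was constructed (export.New) and where the
// Prometheus config comes from are outside the scope of this diff.
func reloadExporter(e *export.Exporter, cfg *config.Config, projectID string) error {
	// nil opts: re-apply only the Prometheus config (external labels etc.),
	// keeping the current export options -- the pre-existing behaviour.
	if err := e.ApplyConfig(cfg, nil); err != nil {
		return fmt.Errorf("apply config: %w", err)
	}

	// Non-nil opts: defaulted, validated and swapped in at runtime. If they
	// differ from the current options, the GCM metric client is recreated.
	opts := export.ExporterOpts{
		ProjectID: projectID, // hypothetical new target project
	}
	if err := e.ApplyConfig(cfg, &opts); err != nil {
		return fmt.Errorf("apply new export options: %w", err)
	}
	return nil
}

func main() {
	// Construction of the exporter (export.New) is omitted here because its
	// full signature is not shown in this diff.
	_ = reloadExporter
}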
2 changes: 1 addition & 1 deletion pkg/export/export_test.go
@@ -382,7 +382,7 @@ func TestExporter_drainBacklog(t *testing.T) {
}

//nolint:errcheck
go e.Run(ctx)
go e.Run()
// As our samples are all for the same series, each batch can only contain a single sample.
// The exporter waits for the batch delay duration before sending it.
// We sleep for an appropriate multiple of it to allow it to drain the shard.
17 changes: 10 additions & 7 deletions pkg/export/gcm/promtest/local_export.go
@@ -38,6 +38,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
"golang.org/x/oauth2"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"

"github.com/GoogleCloudPlatform/prometheus-engine/pkg/export"
"github.com/go-kit/log"
@@ -74,7 +75,9 @@ func (l *localExportWithGCM) Ref() string { return "export-pkg-with-gcm" }
func (l *localExportWithGCM) start(t testing.TB, _ e2e.Environment) (v1.API, map[string]string) {
t.Helper()

ctx := context.Background()
ctx, cancel := context.WithCancel(signals.SetupSignalHandler())
t.Cleanup(cancel)

creds, err := google.CredentialsFromJSON(ctx, l.gcmSA, gcm.DefaultAuthScopes()...)
if err != nil {
t.Fatalf("create credentials from JSON: %s", err)
@@ -106,18 +109,18 @@ func (l *localExportWithGCM) start(t testing.TB, _ e2e.Environment) (v1.API, map
}

// Apply empty config, so resources labels are attached.
if err := l.e.ApplyConfig(&config.DefaultConfig); err != nil {
if err := l.e.ApplyConfig(&config.DefaultConfig, nil); err != nil {
t.Fatalf("apply config: %v", err)
}
l.e.SetLabelsByIDFunc(func(ref storage.SeriesRef) labels.Labels {
return l.labelsByRef[ref]
})

cancelableCtx, cancel := context.WithCancel(ctx)
//nolint:errcheck
go l.e.Run(cancelableCtx)
// TODO(bwplotka): Consider listening for KILL signal too.
t.Cleanup(cancel)
go func() {
if err := l.e.Run(); err != nil {
t.Logf("running exporter: %s", err)
}
}()

return v1.NewAPI(cl), map[string]string{
"cluster": cluster,
10 changes: 8 additions & 2 deletions pkg/export/setup/setup.go
@@ -20,6 +20,7 @@ import (
"fmt"
"os"
"strconv"
"testing"

"cloud.google.com/go/compute/metadata"
"github.com/GoogleCloudPlatform/prometheus-engine/pkg/export"
@@ -78,9 +79,14 @@ func SetGlobal(exporter *export.Exporter) (err error) {
// Global returns the global instance of the GCM exporter.
func Global() *export.Exporter {
if globalExporter == nil {
// This should usually be a panic but we set an inactive default exporter in this case
// to not break existing tests in Prometheus.
if !testing.Testing() {
panic("must set a global exporter")
}

fmt.Fprintln(os.Stderr, "No global GCM exporter was set, setting default inactive exporter.")

// We don't want to change all upstream Prometheus unit tests, so let's just create a
// disabled exporter. These are created on-demand to prevent race conditions between tests.
return export.NopExporter()
}
return globalExporter
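A small sketch of the new Global() contract from the setup.go change above: binaries are expected to register an exporter via SetGlobal, forgetting to do so now panics outside of `go test`, and test runs fall back to a disabled NopExporter instead. Only SetGlobal, Global and NopExporter come from this change; the registration helper and main wiring below are assumptions.

package main

import (
	"log"

	"github.com/GoogleCloudPlatform/prometheus-engine/pkg/export"
	"github.com/GoogleCloudPlatform/prometheus-engine/pkg/export/setup"
)

// registerExporter is a hypothetical startup helper: a binary registers its
// exporter exactly once. If it never does, setup.Global() now panics outside
// of `go test`; inside tests it returns export.NopExporter() instead, so
// upstream Prometheus unit tests keep working unchanged.
func registerExporter(e *export.Exporter) error {
	if err := setup.SetGlobal(e); err != nil {
		return err
	}
	// Any later caller retrieves the same instance.
	_ = setup.Global()
	return nil
}

func main() {
	// How the exporter itself is built (export.New and its flags) is omitted.
	log.Println("sketch only; see registerExporter")
	_ = registerExporter
}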
6 changes: 3 additions & 3 deletions pkg/export/storage.go
@@ -55,12 +55,12 @@ func NewStorage(exporter *Exporter) *Storage {

// ApplyConfig applies the new configuration to the storage.
func (s *Storage) ApplyConfig(cfg *config.Config) error {
return s.exporter.ApplyConfig(cfg)
return s.exporter.ApplyConfig(cfg, nil)
}

// Run background processing of the storage.
func (s *Storage) Run(ctx context.Context) error {
return s.exporter.Run(ctx)
func (s *Storage) Run() error {
return s.exporter.Run()
}

func (s *Storage) labelsByID(id storage.SeriesRef) labels.Labels {
