feat: allow updating export options at runtime
TheSpiritXIII committed Jun 18, 2024
1 parent 7fe3d0b commit ffd0c0c
Showing 6 changed files with 205 additions and 90 deletions.
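
In short, this commit makes the exporter's options updatable after construction: the Cloud Monitoring client is no longer built inside New(), ApplyConfig() now takes an optional *ExporterOpts and recreates the client when the options change (or when none exists yet), and Run() uses the context stored on the exporter instead of accepting one. The sketch below shows how a caller might push new options at runtime. It is not part of the commit, and everything in it other than ApplyConfig and ExporterOpts (the reload channel, the loadConfig callback, the import paths) is assumed for illustration.

package example

import (
	"context"

	"github.com/GoogleCloudPlatform/prometheus-engine/pkg/export"
	"github.com/go-kit/log"
	"github.com/prometheus/prometheus/config"
)

// watchReloads re-applies the exporter configuration whenever a reload signal
// arrives. Hypothetical helper; only ApplyConfig's signature comes from the diff.
func watchReloads(ctx context.Context, e *export.Exporter, reloadCh <-chan struct{},
	loadConfig func() (*config.Config, *export.ExporterOpts, error), logger log.Logger) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-reloadCh:
			cfg, opts, err := loadConfig()
			if err != nil {
				_ = logger.Log("msg", "config reload failed", "err", err)
				continue
			}
			// A non-nil opts is defaulted, validated and compared against the current
			// options; if they differ, the Cloud Monitoring client is recreated.
			if err := e.ApplyConfig(cfg, opts); err != nil {
				_ = logger.Log("msg", "applying exporter config failed", "err", err)
			}
		}
	}
}

Because a nil opts falls back to a copy of the exporter's current options, callers that only reload the Prometheus config keep their previous behaviour.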
4 changes: 1 addition & 3 deletions cmd/rule-evaluator/main.go
@@ -307,14 +307,12 @@ func main() {
 	}
 	{
 		// Storage Processing.
-		ctxStorage, cancelStorage := context.WithCancel(ctx)
 		g.Add(func() error {
-			err = destination.Run(ctxStorage)
+			err = destination.Run()
 			_ = level.Info(logger).Log("msg", "Background processing of storage stopped")
 			return err
 		}, func(error) {
 			_ = level.Info(logger).Log("msg", "Stopping background storage processing...")
-			cancelStorage()
 		})
 	}
 	cwd, err := os.Getwd()
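
These two removals line up with the export.go changes below: the exporter now keeps the context it was constructed with (the new ctx field), so Run() no longer takes a context argument and the rule-evaluator does not need a separate storage context and cancel function; shutdown flows through the context originally passed to the exporter.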
123 changes: 90 additions & 33 deletions pkg/export/export.go
@@ -18,14 +18,15 @@ import (
 	"bytes"
 	"context"
 	"errors"
-	"flag"
 	"fmt"
 	"math"
 	"os"
 	"os/exec"
+	"reflect"
 	"runtime/debug"
 	"strings"
 	"sync"
+	"testing"
 	"time"
 
 	monitoring "cloud.google.com/go/monitoring/apiv3/v2"
@@ -41,6 +42,7 @@ import (
 	"github.com/prometheus/prometheus/promql/parser"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/record"
+	"golang.org/x/oauth2/google"
 	"google.golang.org/api/option"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
@@ -107,12 +109,18 @@
 		"See https://www.cloudinfrastructuremap.com/")
 )
 
+type metricServiceClient interface {
+	Close() error
+	CreateTimeSeries(context.Context, *monitoring_pb.CreateTimeSeriesRequest, ...gax.CallOption) error
+}
+
 // Exporter converts Prometheus samples into Cloud Monitoring samples and exports them.
 type Exporter struct {
 	logger log.Logger
+	ctx    context.Context
 	opts   ExporterOpts
 
-	metricClient *monitoring.MetricClient
+	metricClient metricServiceClient
 	seriesCache  *seriesCache
 	shards       []*shard

@@ -122,7 +130,7 @@ type Exporter struct {
 
 	// The external labels may be updated asynchronously by configuration changes
 	// and must be locked with mtx.
-	mtx            sync.Mutex
+	mtx            sync.RWMutex
 	externalLabels labels.Labels
 	// A set of metrics for which we defaulted the metadata to untyped and have
 	// issued a warning about that.
@@ -132,6 +140,8 @@
 	// It is checked for on each batch provided to the Export method.
 	// If unset, data is always sent.
 	lease Lease
+
+	newMetricClient func(ctx context.Context, opts ExporterOpts) (metricServiceClient, error)
 }
 
 const (
@@ -208,6 +218,7 @@ type ExporterOpts struct {
 	Efficiency EfficiencyOpts
 }
 
+// Default defaults any unset values.
 func (opts *ExporterOpts) Default() {
 	if opts.Efficiency.BatchSize == 0 {
 		opts.Efficiency.BatchSize = BatchSizeMax
@@ -260,7 +271,7 @@ type EfficiencyOpts struct {
 	ShardBufferSize uint
 }
 
-// NopExporter returns an inactive exporter.
+// NopExporter returns a permanently inactive exporter.
 func NopExporter() *Exporter {
 	return &Exporter{
 		opts: ExporterOpts{Disable: true},
@@ -299,7 +310,7 @@ func (alwaysLease) OnLeaderChange(func()) {
 	// We never lose the lease as it's always owned.
 }
 
-func newMetricClient(ctx context.Context, opts ExporterOpts) (*monitoring.MetricClient, error) {
+func defaultNewMetricClient(ctx context.Context, opts ExporterOpts) (metricServiceClient, error) {
 	version, err := Version()
 	if err != nil {
 		return nil, fmt.Errorf("unable to fetch user agent version: %w", err)
@@ -316,11 +327,19 @@ func newMetricClient(ctx context.Context, opts ExporterOpts) (*monitoring.Metric
 	if opts.Endpoint != "" {
 		clientOpts = append(clientOpts, option.WithEndpoint(opts.Endpoint))
 	}
-	if opts.DisableAuth {
+	// Disable auth when the exporter is disabled because we don't want a panic when default
+	// credentials are not found.
+	if opts.DisableAuth || opts.Disable {
 		clientOpts = append(clientOpts,
 			option.WithoutAuthentication(),
 			option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
 		)
+	} else if opts.CredentialsFile == "" && len(opts.CredentialsFromJSON) == 0 {
+		// If no credentials are found, gRPC panics so we check manually.
+		_, err := google.FindDefaultCredentials(ctx)
+		if err != nil {
+			return nil, err
+		}
 	}
 	if opts.CredentialsFile != "" {
 		clientOpts = append(clientOpts, option.WithCredentialsFile(opts.CredentialsFile))
@@ -379,14 +398,11 @@ func New(ctx context.Context, logger log.Logger, reg prometheus.Registerer, opts
 		lease = NopLease()
 	}
 
-	metricClient, err := newMetricClient(ctx, opts)
-	if err != nil {
-		return nil, fmt.Errorf("create metric client: %w", err)
-	}
 	e := &Exporter{
 		logger:               logger,
+		ctx:                  ctx,
 		opts:                 opts,
-		metricClient:         metricClient,
+		newMetricClient:      defaultNewMetricClient,
 		nextc:                make(chan struct{}, 1),
 		shards:               make([]*shard, opts.Efficiency.ShardCount),
 		warnedUntypedMetrics: map[string]struct{}{},
@@ -417,20 +433,38 @@ const (
 
 // ApplyConfig updates the exporter state to the given configuration.
 // Must be called at least once before Export() can be used.
-func (e *Exporter) ApplyConfig(cfg *config.Config) (err error) {
+func (e *Exporter) ApplyConfig(cfg *config.Config, opts *ExporterOpts) (err error) {
+	if e.seriesCache == nil {
+		return errors.New("nop exporter")
+	}
+	optsChanged := false
+	e.mtx.RLock()
+	if opts != nil {
+		opts.Default()
+		if err := opts.Validate(); err != nil {
+			e.mtx.RUnlock()
+			return err
+		}
+		optsChanged = !reflect.DeepEqual(e.opts, opts)
+	} else {
+		optsCopy := e.opts
+		opts = &optsCopy
+	}
+	e.mtx.RUnlock()
+
 	// If project_id, location, or cluster were set through the external_labels in the config file,
 	// these values take precedence. If they are unset, the flag value, which defaults to an
 	// environment-specific value on GCE/GKE, is used.
 	builder := labels.NewBuilder(cfg.GlobalConfig.ExternalLabels)
 
 	if !cfg.GlobalConfig.ExternalLabels.Has(KeyProjectID) {
-		builder.Set(KeyProjectID, e.opts.ProjectID)
+		builder.Set(KeyProjectID, opts.ProjectID)
 	}
 	if !cfg.GlobalConfig.ExternalLabels.Has(KeyLocation) {
-		builder.Set(KeyLocation, e.opts.Location)
+		builder.Set(KeyLocation, opts.Location)
 	}
 	if !cfg.GlobalConfig.ExternalLabels.Has(KeyCluster) {
-		builder.Set(KeyCluster, e.opts.Cluster)
+		builder.Set(KeyCluster, opts.Cluster)
 	}
 	lset := builder.Labels()

@@ -458,10 +492,20 @@ func (e *Exporter) ApplyConfig(cfg *config.Config) (err error) {
 		}
 	}
 
+	metricClient := e.metricClient
+	if optsChanged || metricClient == nil {
+		metricClient, err = e.newMetricClient(e.ctx, *opts)
+		if err != nil {
+			return fmt.Errorf("create metric client: %w", err)
+		}
+	}
+
 	// New external labels possibly invalidate the cached series conversions.
 	e.mtx.Lock()
+	e.metricClient = metricClient
 	e.externalLabels = lset
 	e.seriesCache.forceRefresh()
+	e.opts = *opts
 	e.mtx.Unlock()
 
 	return nil
@@ -471,8 +515,7 @@ func (e *Exporter) ApplyConfig(cfg *config.Config) (err error) {
 // based on a series ID we got through exported sample records.
 // Must be called before any call to Export is made.
 func (e *Exporter) SetLabelsByIDFunc(f func(storage.SeriesRef) labels.Labels) {
-	// Prevent panics in case a default disabled exporter was instantiated (see Global()).
-	if e.opts.Disable {
+	if e.seriesCache == nil {
 		return
 	}
 	if e.seriesCache.getLabelsByRef != nil {
@@ -570,18 +613,12 @@ const (
 	mainModuleName = "github.com/GoogleCloudPlatform/prometheus-engine"
 )
 
-// Testing returns true if running within a unit test.
-// TODO(TheSpiritXIII): Replace with https://github.com/golang/go/issues/52600
-func Testing() bool {
-	return flag.Lookup("test.v") != nil
-}
-
 // Version is used in the User Agent. This version is automatically detected if
 // this function is imported as a library. However, the version is statically
 // set if this function is used in a binary in prometheus-engine due to Golang
 // restrictions. While testing, the static version is validated for correctness.
 func Version() (string, error) {
-	if Testing() {
+	if testing.Testing() {
 		// TODO(TheSpiritXIII): After https://github.com/golang/go/issues/50603 just return an empty
 		// string here. For now, use the opportunity to confirm that the static version is correct.
 		// We manually get the closest git tag if the user is running the unit test locally, but
@@ -657,10 +694,10 @@ func Version() (string, error) {
 // The per-shard overhead is minimal and thus a high number can be picked, which allows us
 // to cover a large range of potential throughput and latency combinations without requiring
 // user configuration or, even worse, runtime changes to the shard number.
-func (e *Exporter) Run(ctx context.Context) error {
-	defer e.metricClient.Close()
-	go e.seriesCache.run(ctx)
-	go e.lease.Run(ctx)
+func (e *Exporter) Run() error {
+	defer e.close()
+	go e.seriesCache.run(e.ctx)
+	go e.lease.Run(e.ctx)
 
 	timer := time.NewTimer(batchDelayMax)
 	stopTimer := func() {
@@ -673,26 +710,37 @@ func (e *Exporter) Run(ctx context.Context) error {
 	}
 	defer stopTimer()
 
-	curBatch := newBatch(e.logger, e.opts.Efficiency.ShardCount, e.opts.Efficiency.BatchSize)
+	e.mtx.RLock()
+	opts := e.opts
+	e.mtx.RUnlock()
+
+	curBatch := newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize)
 
 	// Send the currently accumulated batch to GCM asynchronously.
 	send := func() {
+		e.mtx.RLock()
+		opts := e.opts
+		sendFunc := e.metricClient.CreateTimeSeries
+		e.mtx.RUnlock()
+
 		// Send the batch and once it completed, trigger next to process remaining data in the
 		// shards that were part of the batch. This ensures that if we didn't take all samples
 		// from a shard when filling the batch, we'll come back for them and any queue built-up
 		// gets sent eventually.
 		go func(ctx context.Context, b *batch) {
-			b.send(ctx, e.metricClient.CreateTimeSeries)
+			if !opts.Disable {
+				b.send(ctx, sendFunc)
+			}
 			// We could only trigger if we didn't fully empty shards in this batch.
 			// Benchmarking showed no beneficial impact of this optimization.
 			e.triggerNext()
-		}(ctx, curBatch)
+		}(e.ctx, curBatch)
 
 		// Reset state for new batch.
		stopTimer()
 		timer.Reset(batchDelayMax)
 
-		curBatch = newBatch(e.logger, e.opts.Efficiency.ShardCount, e.opts.Efficiency.BatchSize)
+		curBatch = newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize)
 	}
 
 	for {
@@ -701,7 +749,7 @@ func (e *Exporter) Run(ctx context.Context) error {
 		// buffered data. In-flight requests will be aborted as well.
 		// This is fine once we persist data submitted via Export() but for now there may be some
 		// data loss on shutdown.
-		case <-ctx.Done():
+		case <-e.ctx.Done():
 			return nil
 		// This is activated for each new sample that arrives
 		case <-e.nextc:
@@ -733,6 +781,15 @@ func (e *Exporter) Run(ctx context.Context) error {
 	}
 }
 
+func (e *Exporter) close() {
+	e.mtx.Lock()
+	if err := e.metricClient.Close(); err != nil {
+		_ = e.logger.Log("msg", "error closing metric client", "err", err)
+	}
+	e.metricClient = nil
+	e.mtx.Unlock()
+}
+
 // CtxKey is a dedicated type for keys of context-embedded values propagated
 // with the scrape context.
 type ctxKey int
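
One effect of the export.go changes above is testability: the exporter now talks to the small metricServiceClient interface and builds it through the swappable newMetricClient hook, so a test inside the package can inject a fake instead of dialing Cloud Monitoring. A minimal sketch of such a fake is below; it is illustrative only, not part of the commit, and it reuses the monitoring_pb and gax import aliases already present in export.go.

// fakeMetricClient is an illustrative in-memory stand-in that satisfies
// metricServiceClient and records what would have been sent to GCM.
type fakeMetricClient struct {
	mtx      sync.Mutex
	requests []*monitoring_pb.CreateTimeSeriesRequest
}

func (f *fakeMetricClient) CreateTimeSeries(_ context.Context, req *monitoring_pb.CreateTimeSeriesRequest, _ ...gax.CallOption) error {
	f.mtx.Lock()
	defer f.mtx.Unlock()
	f.requests = append(f.requests, req)
	return nil
}

func (f *fakeMetricClient) Close() error { return nil }

A test could then point the hook at the fake before applying config, e.g. e.newMetricClient = func(context.Context, ExporterOpts) (metricServiceClient, error) { return &fakeMetricClient{}, nil }.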