feat: allow updating export options at runtime
TheSpiritXIII committed May 22, 2024
1 parent 38b1213 commit f8da410
Showing 6 changed files with 117 additions and 64 deletions.
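
At a high level, this commit reworks the exporter's public surface in four related ways: the Lease moves out of ExporterOpts into a dedicated parameter of New; option defaulting moves into ExporterOpts.Default so it can run both at construction and on config reload; ApplyConfig additionally accepts a new set of options and rebuilds the GCM metric client when they differ; and Run drops its context parameter in favor of the one captured by New, with Disable now honored at send time instead of by skipping setup. A minimal sketch of how the reworked API fits together (the import path and the concrete option values are illustrative assumptions, not taken from this commit):

package main

import (
	"context"
	"os"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/config"

	"github.com/GoogleCloudPlatform/prometheus-engine/pkg/export" // assumed import path
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	logger := log.NewJSONLogger(log.NewSyncWriter(os.Stderr))

	// The lease is now its own argument; nil falls back to the built-in
	// always-owned lease.
	exporter, err := export.New(ctx, logger, prometheus.NewRegistry(), nil, export.ExporterOpts{
		DisableAuth: true, // illustrative options only
	})
	if err != nil {
		panic(err)
	}

	// ApplyConfig now takes options too; nil keeps the current ones.
	if err := exporter.ApplyConfig(&config.DefaultConfig, nil); err != nil {
		panic(err)
	}

	// Run no longer takes a context; it stops when the context given to
	// New is canceled.
	go func() { _ = exporter.Run() }()

	// Later (e.g. on a reload signal), options can change at runtime.
	// A differing set causes ApplyConfig to rebuild the metric client.
	newOpts := export.ExporterOpts{DisableAuth: true, Compression: "gzip"}
	if err := exporter.ApplyConfig(&config.DefaultConfig, &newOpts); err != nil {
		panic(err)
	}
}
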
4 changes: 1 addition & 3 deletions cmd/rule-evaluator/main.go
@@ -357,16 +357,14 @@ func main() {
}
{
// Storage Processing.
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
err = destination.Run(ctx)
err = destination.Run()
//nolint:errcheck
level.Info(logger).Log("msg", "Background processing of storage stopped")
return err
}, func(error) {
//nolint:errcheck
level.Info(logger).Log("msg", "Stopping background storage processing...")
cancel()
})
}
cwd, err := os.Getwd()
141 changes: 96 additions & 45 deletions pkg/export/export.go
@@ -23,6 +23,7 @@ import (
"math"
"os"
"os/exec"
"reflect"
"runtime/debug"
"strings"
"sync"
@@ -110,8 +111,14 @@
// Exporter converts Prometheus samples into Cloud Monitoring samples and exports them.
type Exporter struct {
logger log.Logger
ctx context.Context
opts ExporterOpts

// A lease on a time range for which the exporter sends sample data.
// It is checked for on each batch provided to the Export method.
// If unset, data is always sent.
lease Lease

metricClient *monitoring.MetricClient
seriesCache *seriesCache
shards []*shard
@@ -122,7 +129,7 @@ type Exporter struct {

// The external labels may be updated asynchronously by configuration changes
// and must be locked with mtx.
mtx sync.Mutex
mtx sync.RWMutex
externalLabels labels.Labels
// A set of metrics for which we defaulted the metadata to untyped and have
// issued a warning about that.
@@ -189,11 +196,6 @@ type ExporterOpts struct {
// Prefix under which metrics are written to GCM.
MetricTypePrefix string

// A lease on a time range for which the exporter send sample data.
// It is checked for on each batch provided to the Export method.
// If unset, data is always sent.
Lease Lease

// Request URL and body for generating an alternative GCE token source.
// This allows metrics to be exported to an alternative project.
TokenURL string
@@ -208,6 +210,27 @@ type ExporterOpts struct {
Efficiency EfficiencyOpts
}

// Default fills in any unset options with their default values.
func (opts *ExporterOpts) Default() error {
if opts.Efficiency.BatchSize == 0 {
opts.Efficiency.BatchSize = BatchSizeMax
}
if opts.Efficiency.BatchSize > BatchSizeMax {
return fmt.Errorf("maximum supported batch size is %d, got %d", BatchSizeMax, opts.Efficiency.BatchSize)
}
if opts.Efficiency.ShardCount == 0 {
opts.Efficiency.ShardCount = DefaultShardCount
}
if opts.Efficiency.ShardBufferSize == 0 {
opts.Efficiency.ShardBufferSize = DefaultShardBufferSize
}

if opts.MetricTypePrefix == "" {
opts.MetricTypePrefix = MetricTypePrefix
}
return nil
}
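
// Usage sketch (an illustration, not part of this commit): Default mutates
// the receiver in place and only returns an error when BatchSize exceeds
// BatchSizeMax:
//
//	opts := ExporterOpts{}
//	if err := opts.Default(); err != nil {
//		return err
//	}
//	// opts.Efficiency.BatchSize == BatchSizeMax,
//	// opts.Efficiency.ShardCount == DefaultShardCount,
//	// opts.MetricTypePrefix == MetricTypePrefix.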

// EfficiencyOpts represents exporter options that allow fine-tuning of
// internal data structure sizes. Only for advanced users. No compatibility
// guarantee (might change in the future).
@@ -262,7 +285,7 @@ func (alwaysLease) OnLeaderChange(func()) {
// We never lose the lease as it's always owned.
}

func newMetricClient(ctx context.Context, opts ExporterOpts) (*monitoring.MetricClient, error) {
func newMetricClient(ctx context.Context, opts *ExporterOpts) (*monitoring.MetricClient, error) {
version, err := Version()
if err != nil {
return nil, fmt.Errorf("unable to fetch user agent version: %w", err)
@@ -310,7 +333,7 @@ func newMetricClient(ctx context.Context, opts ExporterOpts) (*monitoring.Metric
}

// New returns a new Cloud Monitoring Exporter.
func New(ctx context.Context, logger log.Logger, reg prometheus.Registerer, opts ExporterOpts) (*Exporter, error) {
func New(ctx context.Context, logger log.Logger, reg prometheus.Registerer, lease Lease, opts ExporterOpts) (*Exporter, error) {
grpc_prometheus.EnableClientHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 15, 20, 30, 40, 50, 60}),
)
@@ -334,32 +357,20 @@ func New(ctx context.Context, logger log.Logger, reg prometheus.Registerer, opts
)
}

if opts.Efficiency.BatchSize == 0 {
opts.Efficiency.BatchSize = BatchSizeMax
}
if opts.Efficiency.BatchSize > BatchSizeMax {
return nil, fmt.Errorf("maximum supported batch size is %d, got %d", BatchSizeMax, opts.Efficiency.BatchSize)
}
if opts.Efficiency.ShardCount == 0 {
opts.Efficiency.ShardCount = DefaultShardCount
}
if opts.Efficiency.ShardBufferSize == 0 {
opts.Efficiency.ShardBufferSize = DefaultShardBufferSize
}

if opts.MetricTypePrefix == "" {
opts.MetricTypePrefix = MetricTypePrefix
if err := opts.Default(); err != nil {
return nil, err
}
if opts.Lease == nil {
opts.Lease = alwaysLease{}
if lease == nil {
lease = alwaysLease{}
}

metricClient, err := newMetricClient(ctx, opts)
metricClient, err := newMetricClient(ctx, &opts)
if err != nil {
return nil, fmt.Errorf("create metric client: %w", err)
}
e := &Exporter{
logger: logger,
ctx: ctx,
opts: opts,
metricClient: metricClient,
nextc: make(chan struct{}, 1),
@@ -370,7 +381,7 @@ func New(ctx context.Context, logger log.Logger, reg prometheus.Registerer, opts

// Whenever the lease is lost, clear the series cache so we don't start off of out-of-range
// reset timestamps when we gain the lease again.
opts.Lease.OnLeaderChange(e.seriesCache.clear)
lease.OnLeaderChange(e.seriesCache.clear)

for i := range e.shards {
e.shards[i] = newShard(opts.Efficiency.ShardBufferSize)
@@ -391,20 +402,34 @@ const (

// ApplyConfig updates the exporter state to the given configuration.
// Must be called at least once before Export() can be used.
func (e *Exporter) ApplyConfig(cfg *config.Config) (err error) {
func (e *Exporter) ApplyConfig(cfg *config.Config, opts *ExporterOpts) (err error) {
optsChanged := false
e.mtx.RLock()
if opts != nil {
if err := opts.Default(); err != nil {
e.mtx.RUnlock()
return err
}
optsChanged = !reflect.DeepEqual(e.opts, *opts)
} else {
optsCopy := e.opts
opts = &optsCopy
}
e.mtx.RUnlock()

// If project_id, location, or cluster were set through the external_labels in the config file,
// these values take precedence. If they are unset, the flag value, which defaults to an
// environment-specific value on GCE/GKE, is used.
builder := labels.NewBuilder(cfg.GlobalConfig.ExternalLabels)

if !cfg.GlobalConfig.ExternalLabels.Has(KeyProjectID) {
builder.Set(KeyProjectID, e.opts.ProjectID)
builder.Set(KeyProjectID, opts.ProjectID)
}
if !cfg.GlobalConfig.ExternalLabels.Has(KeyLocation) {
builder.Set(KeyLocation, e.opts.Location)
builder.Set(KeyLocation, opts.Location)
}
if !cfg.GlobalConfig.ExternalLabels.Has(KeyCluster) {
builder.Set(KeyCluster, e.opts.Cluster)
builder.Set(KeyCluster, opts.Cluster)
}
lset := builder.Labels()

@@ -432,10 +457,20 @@ func (e *Exporter) ApplyConfig(cfg *config.Config) (err error) {
}
}

metricClient := e.metricClient
if optsChanged {
metricClient, err = newMetricClient(e.ctx, opts)
if err != nil {
return fmt.Errorf("create metric client: %w", err)
}
}

// New external labels possibly invalidate the cached series conversions.
e.mtx.Lock()
e.metricClient = metricClient
e.externalLabels = lset
e.seriesCache.forceRefresh()
e.opts = *opts
e.mtx.Unlock()

return nil
@@ -445,10 +480,6 @@ func (e *Exporter) ApplyConfig(cfg *config.Config) (err error) {
// based on a series ID we got through exported sample records.
// Must be called before any call to Export is made.
func (e *Exporter) SetLabelsByIDFunc(f func(storage.SeriesRef) labels.Labels) {
// Prevent panics in case a default disabled exporter was instantiated (see Global()).
if e.opts.Disable {
return
}
if e.seriesCache.getLabelsByRef != nil {
panic("SetLabelsByIDFunc must only be called once")
}
@@ -470,7 +501,7 @@ func (e *Exporter) Export(metadata MetadataFunc, batch []record.RefSample, exemp

e.mtx.Lock()
externalLabels := e.externalLabels
start, end, ok := e.opts.Lease.Range()
start, end, ok := e.lease.Range()
e.mtx.Unlock()

if !ok {
@@ -631,10 +662,10 @@ func Version() (string, error) {
// The per-shard overhead is minimal and thus a high number can be picked, which allows us
// to cover a large range of potential throughput and latency combinations without requiring
// user configuration or, even worse, runtime changes to the shard number.
func (e *Exporter) Run(ctx context.Context) error {
defer e.metricClient.Close()
go e.seriesCache.run(ctx)
go e.opts.Lease.Run(ctx)
func (e *Exporter) Run() error {
defer e.Close()
go e.seriesCache.run(e.ctx)
go e.lease.Run(e.ctx)

timer := time.NewTimer(batchDelayMax)
stopTimer := func() {
@@ -647,26 +678,37 @@ func (e *Exporter) Run(ctx context.Context) error {
}
defer stopTimer()

curBatch := newBatch(e.logger, e.opts.Efficiency.ShardCount, e.opts.Efficiency.BatchSize)
e.mtx.RLock()
opts := e.opts
e.mtx.RUnlock()

curBatch := newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize)

// Send the currently accumulated batch to GCM asynchronously.
send := func() {
e.mtx.RLock()
opts := e.opts
sendFunc := e.metricClient.CreateTimeSeries
e.mtx.RUnlock()

// Send the batch and once it completed, trigger next to process remaining data in the
// shards that were part of the batch. This ensures that if we didn't take all samples
// from a shard when filling the batch, we'll come back for them and any queue built-up
// gets sent eventually.
go func(ctx context.Context, b *batch) {
b.send(ctx, e.metricClient.CreateTimeSeries)
if !opts.Disable {
b.send(ctx, sendFunc)
}
// We could only trigger if we didn't fully empty shards in this batch.
// Benchmarking showed no beneficial impact of this optimization.
e.triggerNext()
}(ctx, curBatch)
}(e.ctx, curBatch)

// Reset state for new batch.
stopTimer()
timer.Reset(batchDelayMax)

curBatch = newBatch(e.logger, e.opts.Efficiency.ShardCount, e.opts.Efficiency.BatchSize)
curBatch = newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize)
}

for {
@@ -675,7 +717,7 @@ func (e *Exporter) Run(ctx context.Context) error {
// buffered data. In-flight requests will be aborted as well.
// This is fine once we persist data submitted via Export() but for now there may be some
// data loss on shutdown.
case <-ctx.Done():
case <-e.ctx.Done():
return nil
// This is activated for each new sample that arrives
case <-e.nextc:
Expand Down Expand Up @@ -707,6 +749,15 @@ func (e *Exporter) Run(ctx context.Context) error {
}
}

func (e *Exporter) Close() {
e.mtx.Lock()
if err := e.metricClient.Close(); err != nil {
_ = e.logger.Log("msg", "error closing metric client", "err", err)
}
e.metricClient = nil
e.mtx.Unlock()
}

// ctxKey is a dedicated type for keys of context-embedded values propagated
// with the scrape context.
type ctxKey int
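Two patterns in export.go above are worth spelling out: ApplyConfig compares the incoming options against the current ones under a read lock, rebuilds the expensive metric client outside any lock, and swaps both in under the write lock; Run and its send closure snapshot the options under a read lock per batch, so a toggled Disable takes effect without restarting the exporter. A self-contained sketch of that shape, with generic names rather than the repository's types:

package main

import (
	"fmt"
	"reflect"
	"sync"
)

type options struct {
	endpoint string
	disable  bool
}

type client struct {
	endpoint string
}

type manager struct {
	mtx    sync.RWMutex
	opts   options
	client *client
}

// apply mirrors Exporter.ApplyConfig: compare under the read lock, rebuild
// the (expensive) client outside any lock, swap under the write lock.
func (m *manager) apply(opts *options) {
	m.mtx.RLock()
	changed := !reflect.DeepEqual(m.opts, *opts)
	cl := m.client
	m.mtx.RUnlock()

	if changed {
		cl = &client{endpoint: opts.endpoint} // rebuild without holding the lock
	}

	m.mtx.Lock()
	m.client = cl
	m.opts = *opts
	m.mtx.Unlock()
}

// send mirrors the snapshot taken per batch in Exporter.Run's send closure:
// a fresh read of the options means a toggled disable applies immediately.
func (m *manager) send(batch []string) {
	m.mtx.RLock()
	opts := m.opts
	cl := m.client
	m.mtx.RUnlock()

	if !opts.disable {
		fmt.Printf("sending %d samples to %s\n", len(batch), cl.endpoint)
	}
}

func main() {
	m := &manager{opts: options{endpoint: "old"}, client: &client{endpoint: "old"}}
	m.send([]string{"s1"})
	m.apply(&options{endpoint: "new"})
	m.send([]string{"s2"}) // now goes to the rebuilt client
}
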
6 changes: 3 additions & 3 deletions pkg/export/export_test.go
@@ -309,7 +309,7 @@ func TestExporter_wrapMetadata(t *testing.T) {
}

ctx := context.Background()
e, err := New(ctx, log.NewJSONLogger(log.NewSyncWriter(os.Stderr)), nil, ExporterOpts{DisableAuth: true})
e, err := New(ctx, log.NewJSONLogger(log.NewSyncWriter(os.Stderr)), nil, nil, ExporterOpts{DisableAuth: true})
if err != nil {
t.Fatal(err)
}
@@ -364,7 +364,7 @@ func TestExporter_drainBacklog(t *testing.T) {
t.Fatalf("Creating metric client failed: %s", err)
}

e, err := New(ctx, log.NewJSONLogger(log.NewSyncWriter(os.Stderr)), nil, ExporterOpts{DisableAuth: true})
e, err := New(ctx, log.NewJSONLogger(log.NewSyncWriter(os.Stderr)), nil, nil, ExporterOpts{DisableAuth: true})
if err != nil {
t.Fatalf("Creating Exporter failed: %s", err)
}
@@ -382,7 +382,7 @@ func TestExporter_drainBacklog(t *testing.T) {
}

//nolint:errcheck
go e.Run(ctx)
go e.Run()
// As our samples are all for the same series, each batch can only contain a single sample.
// The exporter waits for the batch delay duration before sending it.
// We sleep for an appropriate multiple of it to allow it to drain the shard.
19 changes: 11 additions & 8 deletions pkg/export/gcm/promtest/local_export.go
@@ -74,7 +74,10 @@ func (l *localExportWithGCM) Ref() string { return "export-pkg-with-gcm" }
func (l *localExportWithGCM) start(t testing.TB, _ e2e.Environment) (v1.API, map[string]string) {
t.Helper()

ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
// TODO(bwplotka): Consider listening for KILL signal too.
t.Cleanup(cancel)

creds, err := google.CredentialsFromJSON(ctx, l.gcmSA, gcm.DefaultAuthScopes()...)
if err != nil {
t.Fatalf("create credentials from JSON: %s", err)
@@ -93,7 +96,7 @@ func (l *localExportWithGCM) start(t testing.TB, _ e2e.Environment) (v1.API, map
t.Fatalf("create Prometheus client: %s", err)
}

l.e, err = export.New(ctx, log.NewJSONLogger(os.Stderr), prometheus.NewRegistry(), export.ExporterOpts{
l.e, err = export.New(ctx, log.NewJSONLogger(os.Stderr), prometheus.NewRegistry(), nil, export.ExporterOpts{
UserAgentEnv: "pe-github-action-test",
Endpoint: "monitoring.googleapis.com:443",
Compression: "none",
@@ -110,18 +113,18 @@ func (l *localExportWithGCM) start(t testing.TB, _ e2e.Environment) (v1.API, map
}

// Apply empty config, so resources labels are attached.
if err := l.e.ApplyConfig(&config.DefaultConfig); err != nil {
if err := l.e.ApplyConfig(&config.DefaultConfig, nil); err != nil {
t.Fatalf("apply config: %v", err)
}
l.e.SetLabelsByIDFunc(func(ref storage.SeriesRef) labels.Labels {
return l.labelsByRef[ref]
})

cancelableCtx, cancel := context.WithCancel(ctx)
//nolint:errcheck
go l.e.Run(cancelableCtx)
// TODO(bwplotka): Consider listening for KILL signal too.
t.Cleanup(cancel)
go func() {
if err := l.e.Run(); err != nil {
t.Logf("running exporter: %s", err)
}
}()

return v1.NewAPI(cl), map[string]string{
"cluster": cluster,
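
One detail worth noting in this file: cancel is now registered with t.Cleanup immediately after the context is created, rather than at the end of setup. Cleanups run even when a later setup step fails the test, and run last-in-first-out, so the exporter's context gets canceled no matter where start() bails out. In miniature (a generic illustration of the standard-library behavior, not this repository's code):

package example

import (
	"context"
	"testing"
)

func TestSomething(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel) // runs at test end, even if setup below calls t.Fatal
	_ = ctx           // ... setup that may t.Fatal, then use ctx ...
}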