Scraping service: use buffered channel for requesting refreshes (grafana#886)

* refresh queue, remove mut

* fix logs

* more logs!

* how many logs can i add?

* how many logs can i change?

* more logging

* changelog

(cherry picked from commit 83f0f30)
rfratto committed Sep 8, 2021
1 parent 549761c commit 18f4461
Showing 4 changed files with 39 additions and 35 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -15,6 +15,9 @@
 - [BUGFIX] In scraping service mode, scraping service can deadlock when
   reloading during join. (@mattdurham)
 
+- [BUGFIX] Scraping service: prevent more than one refresh from being queued at
+  a time. (@rfratto)
+
 # v0.18.2 (2021-08-12)
 
 - [BUGFIX] Honor the prefix and remove prefix from consul list results (@mattdurham)
29 changes: 7 additions & 22 deletions pkg/prom/cluster/cluster.go
@@ -107,14 +107,11 @@ func (c *Cluster) storeValidate(cfg *instance.Config) error {
 // Reshard implements agentproto.ScrapingServiceServer, and syncs the state of
 // configs with the configstore.
 func (c *Cluster) Reshard(ctx context.Context, _ *agentproto.ReshardRequest) (*empty.Empty, error) {
-    go func() {
-        c.mut.RLock()
-        defer c.mut.RUnlock()
-        err := c.watcher.Refresh(context.Background())
-        if err != nil {
-            level.Error(c.log).Log("msg", "failed to perform local reshard", "err", err)
-        }
-    }()
+    c.mut.RLock()
+    defer c.mut.RUnlock()
+
+    level.Info(c.log).Log("msg", "received reshard notification, requesting refresh")
+    c.watcher.RequestRefresh()
     return &empty.Empty{}, nil
 }
 
@@ -144,20 +141,8 @@ func (c *Cluster) ApplyConfig(
     c.cfg = cfg
 
     // Force a refresh so all the configs get updated with new defaults.
-    level.Info(c.log).Log("msg", "cluster config changed, refreshing from configstore in background")
-    go func() {
-        ctx := context.Background()
-        if c.cfg.ReshardTimeout > 0 {
-            var cancel context.CancelFunc
-            ctx, cancel = context.WithTimeout(ctx, c.cfg.ReshardTimeout)
-            defer cancel()
-        }
-        err := c.watcher.Refresh(ctx)
-        if err != nil {
-            level.Error(c.log).Log("msg", "failed to perform local reshard", "err", err)
-        }
-    }()
-
+    level.Info(c.log).Log("msg", "cluster config changed, queueing refresh")
+    c.watcher.RequestRefresh()
     return nil
 }
 
38 changes: 27 additions & 11 deletions pkg/prom/cluster/config_watcher.go
@@ -37,7 +37,7 @@ type configWatcher struct {
     owns OwnershipFunc
     validate ValidationFunc
 
-    refreshMut sync.Mutex
+    refreshCh chan struct{}
     instanceMut sync.Mutex
     instances map[string]struct{}
 }
@@ -63,6 +63,7 @@ func newConfigWatcher(log log.Logger, cfg Config, store configstore.Store, im in
         owns: owns,
         validate: validate,
 
+        refreshCh: make(chan struct{}, 1),
         instances: make(map[string]struct{}),
     }
     if err := w.ApplyConfig(cfg); err != nil {
@@ -97,22 +98,37 @@ func (w *configWatcher) run(ctx context.Context) {
         select {
         case <-ctx.Done():
             return
-        case <-time.After(nextPoll):
-            err := w.Refresh(ctx)
+        case <-w.refreshCh:
+            err := w.refresh(ctx)
             if err != nil {
-                level.Error(w.log).Log("msg", "failed polling refresh", "err", err)
+                level.Error(w.log).Log("msg", "refresh failed", "err", err)
             }
+        case <-time.After(nextPoll):
+            level.Info(w.log).Log("msg", "reshard timer ticked, scheduling refresh")
+            w.RequestRefresh()
         case ev := <-w.store.Watch():
+            level.Debug(w.log).Log("msg", "handling event from config store")
             if err := w.handleEvent(ev); err != nil {
                 level.Error(w.log).Log("msg", "failed to handle changed or deleted config", "key", ev.Key, "err", err)
             }
         }
     }
 }
 
-// Refresh reloads all configs from the configstore. Deleted configs will be
-// removed.
-func (w *configWatcher) Refresh(ctx context.Context) (err error) {
+// RequestRefresh will queue a refresh. No more than one refresh can be queued at a time.
+func (w *configWatcher) RequestRefresh() {
+    select {
+    case w.refreshCh <- struct{}{}:
+        level.Debug(w.log).Log("msg", "successfully scheduled a refresh")
+    default:
+        level.Debug(w.log).Log("msg", "ignoring request refresh: refresh already scheduled")
+    }
+}
+
+// refresh reloads all configs from the configstore. Deleted configs will be
+// removed. refresh may not be called concurrently and must only be invoked from run.
+// Call RequestRefresh to queue a call to refresh.
+func (w *configWatcher) refresh(ctx context.Context) (err error) {
     w.mut.Lock()
     enabled := w.cfg.Enabled
     refreshTimeout := w.cfg.ReshardTimeout
@@ -122,9 +138,7 @@ func (w *configWatcher) Refresh(ctx context.Context) (err error) {
         level.Debug(w.log).Log("msg", "refresh skipped because clustering is disabled")
         return nil
     }
-
-    w.refreshMut.Lock()
-    defer w.refreshMut.Unlock()
+    level.Info(w.log).Log("msg", "starting refresh")
 
     if refreshTimeout > 0 {
         var cancel context.CancelFunc
@@ -138,7 +152,9 @@
         if err != nil {
             success = "0"
         }
-        reshardDuration.WithLabelValues(success).Observe(time.Since(start).Seconds())
+        duration := time.Since(start)
+        level.Info(w.log).Log("msg", "refresh finished", "duration", duration, "success", success, "err", err)
+        reshardDuration.WithLabelValues(success).Observe(duration.Seconds())
     }()
 
     // This is used to determine if the context was already exceeded before calling the kv provider
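The change above boils down to a buffered channel of capacity one plus a non-blocking send: callers ask for a refresh, and at most one request can sit in the queue while the run goroutine works. The following is a minimal, self-contained sketch of that pattern, not the agent's actual code: it assumes a simplified watcher with no config store, metrics, or go-kit logging, and names such as watcher, pollInterval, and refreshTimeout are illustrative.

// refreshqueue_sketch.go: illustrative only; simplified stand-in for configWatcher.
package main

import (
    "context"
    "fmt"
    "time"
)

type watcher struct {
    refreshCh      chan struct{}
    pollInterval   time.Duration
    refreshTimeout time.Duration
}

func newWatcher(poll, timeout time.Duration) *watcher {
    return &watcher{
        // Capacity 1: at most one refresh can be pending at any time.
        refreshCh:      make(chan struct{}, 1),
        pollInterval:   poll,
        refreshTimeout: timeout,
    }
}

// RequestRefresh queues a refresh without blocking. If one is already queued,
// the new request is dropped and coalesced into the pending one.
func (w *watcher) RequestRefresh() {
    select {
    case w.refreshCh <- struct{}{}:
    default:
    }
}

// run is the only goroutine that performs refreshes, so refresh never runs
// concurrently with itself and needs no mutex.
func (w *watcher) run(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case <-w.refreshCh:
            w.refresh(ctx)
        case <-time.After(w.pollInterval):
            // The periodic poll is expressed as just another refresh request.
            w.RequestRefresh()
        }
    }
}

func (w *watcher) refresh(ctx context.Context) {
    // The per-refresh timeout is applied here, in the worker, instead of at
    // every call site.
    if w.refreshTimeout > 0 {
        var cancel context.CancelFunc
        ctx, cancel = context.WithTimeout(ctx, w.refreshTimeout)
        defer cancel()
    }
    // A real implementation would pass ctx to the config store here.
    fmt.Println("refreshing configs")
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
    defer cancel()

    w := newWatcher(time.Hour, 5*time.Second)
    go w.run(ctx)

    // A burst of reshard notifications leaves at most one refresh pending.
    for i := 0; i < 10; i++ {
        w.RequestRefresh()
    }
    <-ctx.Done()
}

Because run is the only goroutine that ever calls refresh, the refreshMut the old exported Refresh needed is gone; the channel itself is the remaining synchronization, and the ReshardTimeout is applied once inside refresh rather than at every call site.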
4 changes: 2 additions & 2 deletions pkg/prom/cluster/config_watcher_test.go
@@ -48,7 +48,7 @@ func Test_configWatcher_Refresh(t *testing.T) {
         return ch, nil
     }
 
-    err = w.Refresh(context.Background())
+    err = w.refresh(context.Background())
     require.NoError(t, err)
 
     // Then: return a "new" config.
@@ -61,7 +61,7 @@
         return ch, nil
     }
 
-    err = w.Refresh(context.Background())
+    err = w.refresh(context.Background())
     require.NoError(t, err)
 
     // "hello" and "new" should've been applied, and "hello" should've been deleted
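If the coalescing guarantee ever needed its own regression test, a sketch along these lines would cover it. This is not a test from this commit; notifier is a hypothetical stand-in that models only the buffered channel, and the testify assertions mirror the ones already used in config_watcher_test.go.

// coalesce_sketch_test.go: illustrative test of the non-blocking queue pattern.
package main

import (
    "sync"
    "testing"

    "github.com/stretchr/testify/require"
)

// notifier models only the buffered refresh channel, not the config store or
// the run loop.
type notifier struct {
    refreshCh chan struct{}
}

func newNotifier() *notifier {
    return &notifier{refreshCh: make(chan struct{}, 1)}
}

// RequestRefresh queues a refresh unless one is already pending.
func (n *notifier) RequestRefresh() {
    select {
    case n.refreshCh <- struct{}{}:
    default:
    }
}

func TestRequestRefresh_Coalesces(t *testing.T) {
    n := newNotifier()

    // Many concurrent requests...
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            n.RequestRefresh()
        }()
    }
    wg.Wait()

    // ...leave exactly one refresh queued.
    require.Len(t, n.refreshCh, 1)

    // Draining the channel lets the next request queue again.
    <-n.refreshCh
    n.RequestRefresh()
    require.Len(t, n.refreshCh, 1)
}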
