diff --git a/pkg/ruler/registry.go b/pkg/ruler/registry.go index 868b7f29a6f9..29297fcab5a7 100644 --- a/pkg/ruler/registry.go +++ b/pkg/ruler/registry.go @@ -128,8 +128,12 @@ func (r *walRegistry) get(tenant string) storage.Storage { } func (r *walRegistry) Appender(ctx context.Context) storage.Appender { + // concurrency-safe retrieval of remote-write config for this tenant, using the global remote-write for defaults + r.overridesMu.Lock() tenant, _ := user.ExtractOrgID(ctx) rwCfg, err := r.getTenantRemoteWriteConfig(tenant, r.config.RemoteWrite) + r.overridesMu.Unlock() + if err != nil { level.Error(r.logger).Log("msg", "error retrieving remote-write config; discarding samples", "user", tenant, "err", err) return discardingAppender{} diff --git a/pkg/ruler/registry_test.go b/pkg/ruler/registry_test.go index 261b6d383676..7ab12d8962ae 100644 --- a/pkg/ruler/registry_test.go +++ b/pkg/ruler/registry_test.go @@ -405,6 +405,30 @@ func TestTenantRemoteWriteConfigWithOverrideConcurrentAccess(t *testing.T) { }) } +func TestAppenderConcurrentAccess(t *testing.T) { + require.NotPanics(t, func() { + reg := setupRegistry(t, cfg, newFakeLimits()) + var wg sync.WaitGroup + for i := 0; i < 1000; i++ { + wg.Add(1) + go func(reg *walRegistry) { + defer wg.Done() + + _ = reg.Appender(user.InjectOrgID(context.Background(), enabledRWTenant)) + }(reg) + + wg.Add(1) + go func(reg *walRegistry) { + defer wg.Done() + + _ = reg.Appender(user.InjectOrgID(context.Background(), additionalHeadersRWTenant)) + }(reg) + } + + wg.Wait() + }) +} + func TestTenantRemoteWriteConfigWithoutOverride(t *testing.T) { reg := setupRegistry(t, backCompatCfg, newFakeLimitsBackwardCompat())