Skip to content

Commit

Permalink
[Receive] Fix race condition when adding multiple new tenants at once (
Browse files Browse the repository at this point in the history
…thanos-io#7941)

* [Receive] fix race condition

Signed-off-by: Yi Jin <yi.jin@databricks.com>

* add a change log

Signed-off-by: Yi Jin <yi.jin@databricks.com>

* memoize TSDB local clients without race condition

Signed-off-by: Yi Jin <yi.jin@databricks.com>

* fix data race in testing with some concurrent safe helper functions

Signed-off-by: Yi Jin <yi.jin@databricks.com>

* address comments

Signed-off-by: Yi Jin <yi.jin@databricks.com>

---------

Signed-off-by: Yi Jin <yi.jin@databricks.com>
  • Loading branch information
jnyi committed Dec 13, 2024
1 parent 1c69c7e commit 033bcea
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 109 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7885](https://github.com/thanos-io/thanos/pull/7885) Store: Return chunks to the pool after completing a Series call.
- [#7893](https://github.com/thanos-io/thanos/pull/7893) Sidecar: Fix retrieval of external labels for Prometheus v3.0.0.
- [#7903](https://github.com/thanos-io/thanos/pull/7903) Query: Fix panic on regex store matchers.
- [#7915](https://github.com/thanos-io/thanos/pull/7915) Store: Close block series client at the end to not reuse chunk buffer.
- [#7941](https://github.com/thanos-io/thanos/pull/7941) Receive: Fix race condition when adding multiple new tenants, see [issue-7892](https://github.com/thanos-io/thanos/issues/7892).

### Added
- [#7763](https://github.com/thanos-io/thanos/pull/7763) Ruler: use native histograms for client latency metrics.
Expand Down
82 changes: 52 additions & 30 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type MultiTSDB struct {
hashFunc metadata.HashFunc
hashringConfigs []HashringConfig

tsdbClients []store.Client
exemplarClients map[string]*exemplars.TSDB

metricNameFilterEnabled bool
}

Expand Down Expand Up @@ -101,6 +104,8 @@ func NewMultiTSDB(
mtx: &sync.RWMutex{},
tenants: map[string]*tenant{},
labels: labels,
tsdbClients: make([]store.Client, 0),
exemplarClients: map[string]*exemplars.TSDB{},
tenantLabelName: tenantLabelName,
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
Expand All @@ -114,14 +119,47 @@ func NewMultiTSDB(
return mt
}

func (t *MultiTSDB) GetTenants() []string {
// testGetTenant returns the tenant with the given tenantID for testing purposes.
func (t *MultiTSDB) testGetTenant(tenantID string) *tenant {
t.mtx.RLock()
tenants := make([]string, 0, len(t.tenants))
for tname := range t.tenants {
tenants = append(tenants, tname)
}
defer t.mtx.RUnlock()
return tenants
return t.tenants[tenantID]
}

// updateTSDBClients rebuilds the cached list of per-tenant store clients
// from t.tenants. Callers must hold t.mtx (write lock).
//
// A fresh slice is allocated on every rebuild instead of reusing
// t.tsdbClients[:0]: TSDBLocalClients returns the cached slice header to
// callers, who may keep iterating it after releasing the read lock.
// Appending over the old backing array would race with those readers;
// a new allocation leaves previously returned slices untouched.
func (t *MultiTSDB) updateTSDBClients() {
	clients := make([]store.Client, 0, len(t.tenants))
	for _, tenant := range t.tenants {
		// tenant.client() is nil until startTSDB has finished; skip
		// tenants that are not ready to serve queries yet.
		if client := tenant.client(); client != nil {
			clients = append(clients, client)
		}
	}
	t.tsdbClients = clients
}

// addTenantUnlocked registers newTenant under tenantID and refreshes the
// cached client list. Callers must hold t.mtx (write lock).
// The exemplar client is cached only once the tenant exposes one
// (i.e. after its TSDB has been started).
func (t *MultiTSDB) addTenantUnlocked(tenantID string, newTenant *tenant) {
	t.tenants[tenantID] = newTenant
	t.updateTSDBClients()
	if ex := newTenant.exemplars(); ex != nil {
		t.exemplarClients[tenantID] = ex
	}
}

// addTenantLocked is the locking variant of addTenantUnlocked: it takes the
// write lock itself and then registers newTenant under tenantID.
func (t *MultiTSDB) addTenantLocked(tenantID string, newTenant *tenant) {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	t.addTenantUnlocked(tenantID, newTenant)
}

// removeTenantUnlocked drops tenantID from both the tenant and exemplar maps
// and then rebuilds the cached client list so it no longer references the
// removed tenant. Callers must hold t.mtx (write lock).
func (t *MultiTSDB) removeTenantUnlocked(tenantID string) {
	delete(t.tenants, tenantID)
	delete(t.exemplarClients, tenantID)
	t.updateTSDBClients()
}

// removeTenantLocked is the locking variant of removeTenantUnlocked: it takes
// the write lock itself and then removes tenantID and its cached clients.
func (t *MultiTSDB) removeTenantLocked(tenantID string) {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	t.removeTenantUnlocked(tenantID)
}

type localClient struct {
Expand Down Expand Up @@ -426,7 +464,7 @@ func (t *MultiTSDB) Prune(ctx context.Context) error {
}

level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID)
delete(t.tenants, tenantID)
t.removeTenantUnlocked(tenantID)
}

return merr.Err()
Expand Down Expand Up @@ -586,33 +624,18 @@ func (t *MultiTSDB) RemoveLockFilesIfAny() error {
return merr.Err()
}

// TSDBLocalClients should be used as read-only.
func (t *MultiTSDB) TSDBLocalClients() []store.Client {
t.mtx.RLock()
defer t.mtx.RUnlock()

res := make([]store.Client, 0, len(t.tenants))
for _, tenant := range t.tenants {
client := tenant.client()
if client != nil {
res = append(res, client)
}
}

return res
return t.tsdbClients
}

// TSDBExemplars should be used as read-only.
func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB {
t.mtx.RLock()
defer t.mtx.RUnlock()

res := make(map[string]*exemplars.TSDB, len(t.tenants))
for k, tenant := range t.tenants {
e := tenant.exemplars()
if e != nil {
res[k] = e
}
}
return res
return t.exemplarClients
}

func (t *MultiTSDB) TenantStats(limit int, statsByLabelName string, tenantIDs ...string) []status.TenantStats {
Expand Down Expand Up @@ -687,9 +710,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
nil,
)
if err != nil {
t.mtx.Lock()
delete(t.tenants, tenantID)
t.mtx.Unlock()
t.removeTenantLocked(tenantID)
return err
}
var ship *shipper.Shipper
Expand All @@ -712,6 +733,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
options = append(options, store.WithCuckooMetricNameStoreFilter())
}
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset))
t.addTenantLocked(tenantID, tenant) // need to update the client list once store is ready & client != nil
level.Info(logger).Log("msg", "TSDB is now ready")
return nil
}
Expand Down Expand Up @@ -740,7 +762,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan
}

tenant = newTenant()
t.tenants[tenantID] = tenant
t.addTenantUnlocked(tenantID, tenant)
t.mtx.Unlock()

logger := log.With(t.logger, "tenant", tenantID)
Expand Down
7 changes: 5 additions & 2 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestMultiTSDB(t *testing.T) {
testutil.Ok(t, m.Open())
testutil.Ok(t, appendSample(m, testTenant, time.Now()))

tenant := m.tenants[testTenant]
tenant := m.testGetTenant(testTenant)
db := tenant.readyStorage().Get()

testutil.Equals(t, 0, len(db.Blocks()))
Expand Down Expand Up @@ -843,7 +843,10 @@ func appendSampleWithLabels(m *MultiTSDB, tenant string, lbls labels.Labels, tim

func queryLabelValues(ctx context.Context, m *MultiTSDB) error {
proxy := store.NewProxyStore(nil, nil, func() []store.Client {
clients := m.TSDBLocalClients()
m.mtx.Lock()
defer m.mtx.Unlock()
clients := make([]store.Client, len(m.tsdbClients))
copy(clients, m.tsdbClients)
if len(clients) > 0 {
clients[0] = &slowClient{clients[0]}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/receive/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) {

for _, c := range tc.cfg {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand Down Expand Up @@ -294,7 +294,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) {

for _, c := range initialConfig {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand All @@ -319,7 +319,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) {

for _, c := range changedConfig {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand Down Expand Up @@ -534,7 +534,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) {

for _, c := range initialConfig {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand Down Expand Up @@ -704,7 +704,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) {

for _, c := range initialConfig {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand Down Expand Up @@ -778,7 +778,7 @@ func TestReceiverLabelsNotOverwrittenByExternalLabels(t *testing.T) {

for _, c := range cfg {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand Down
71 changes: 0 additions & 71 deletions test/e2e/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,74 +1215,3 @@ func TestReceiveCpnp(t *testing.T) {
}, v)

}

// TestNewTenant spins up a 3-ingestor hashring behind one routing receiver
// (replication factor 2) and verifies that samples written for a tenant that
// appears only after startup ("tenant-2") become queryable alongside the
// initial tenant ("tenant-1").
func TestNewTenant(t *testing.T) {
e, err := e2e.NewDockerEnvironment("new-tenant")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

// Setup 3 ingestors.
i1 := e2ethanos.NewReceiveBuilder(e, "i1").WithIngestionEnabled().Init()
i2 := e2ethanos.NewReceiveBuilder(e, "i2").WithIngestionEnabled().Init()
i3 := e2ethanos.NewReceiveBuilder(e, "i3").WithIngestionEnabled().Init()

// All three ingestors form a single hashring.
h := receive.HashringConfig{
Endpoints: []receive.Endpoint{
{Address: i1.InternalEndpoint("grpc")},
{Address: i2.InternalEndpoint("grpc")},
{Address: i3.InternalEndpoint("grpc")},
},
}

// Setup 1 distributor with double replication
r1 := e2ethanos.NewReceiveBuilder(e, "r1").WithRouting(2, h).Init()
testutil.Ok(t, e2e.StartAndWaitReady(i1, i2, i3, r1))

// Querier fans out to the ingestors; wait until it sees all 3 store nodes.
q := e2ethanos.NewQuerierBuilder(e, "1", i1.InternalEndpoint("grpc"), i2.InternalEndpoint("grpc"), i3.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(q))
testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics()))

// First tenant writes through a tenant-header-injecting reverse proxy.
rp1 := e2ethanos.NewReverseProxy(e, "1", "tenant-1", "http://"+r1.InternalEndpoint("remote-write"))
prom1 := e2ethanos.NewPrometheus(e, "1", e2ethanos.DefaultPromConfig("prom1", 0, "http://"+rp1.InternalEndpoint("http")+"/api/v1/receive", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage())
testutil.Ok(t, e2e.StartAndWaitReady(rp1, prom1))

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
t.Cleanup(cancel)

// Without deduplication each sample shows up once per replica.
expectedReplicationFactor := 2.0

queryAndAssert(t, ctx, q.Endpoint("http"), func() string { return "count(up) by (prometheus, tenant_id)" }, time.Now, promclient.QueryOptions{
Deduplicate: false,
}, model.Vector{
&model.Sample{
Metric: model.Metric{
"prometheus": "prom1",
"tenant_id":  "tenant-1",
},
Value: model.SampleValue(expectedReplicationFactor),
},
})

// Second tenant is introduced only now, after the ring is already serving.
rp2 := e2ethanos.NewReverseProxy(e, "2", "tenant-2", "http://"+r1.InternalEndpoint("remote-write"))
prom2 := e2ethanos.NewPrometheus(e, "2", e2ethanos.DefaultPromConfig("prom2", 0, "http://"+rp2.InternalEndpoint("http")+"/api/v1/receive", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage())
testutil.Ok(t, e2e.StartAndWaitReady(rp2, prom2))

// Both tenants must now be visible, each replicated twice.
queryAndAssert(t, ctx, q.Endpoint("http"), func() string { return "count(up) by (prometheus, tenant_id)" }, time.Now, promclient.QueryOptions{
Deduplicate: false,
}, model.Vector{
&model.Sample{
Metric: model.Metric{
"prometheus": "prom1",
"tenant_id":  "tenant-1",
},
Value: model.SampleValue(expectedReplicationFactor),
},
&model.Sample{
Metric: model.Metric{
"prometheus": "prom2",
"tenant_id":  "tenant-2",
},
Value: model.SampleValue(expectedReplicationFactor),
},
})
}

0 comments on commit 033bcea

Please sign in to comment.