diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 4f688555d36a..065c8db13676 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -28,6 +28,7 @@ kv.allocator.load_based_rebalancingenumeration2whether to rebalance based on the distribution of QPS across stores [off = 0, leases = 1, leases and replicas = 2] kv.allocator.qps_rebalance_thresholdfloat0.25minimum fraction away from the mean a store's QPS (such as queries per second) can be before it is considered overfull or underfull kv.allocator.range_rebalance_thresholdfloat0.05minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull +kv.bulk_io_write.addsstable_max_ratefloat1.7976931348623157E+308maximum number of AddSSTable requests per second for a single store kv.bulk_io_write.concurrent_addsstable_requestsinteger1number of AddSSTable requests a store will handle concurrently before queuing kv.bulk_io_write.concurrent_export_requestsinteger3number of export requests a store will handle concurrently before queuing kv.bulk_io_write.concurrent_import_requestsinteger1number of import requests a store will handle concurrently before queuing diff --git a/pkg/storage/batcheval/eval_context.go b/pkg/storage/batcheval/eval_context.go index 82a3fbf18bf9..ddc451a1c2ca 100644 --- a/pkg/storage/batcheval/eval_context.go +++ b/pkg/storage/batcheval/eval_context.go @@ -37,6 +37,7 @@ type Limiters struct { BulkIOWriteRate *rate.Limiter ConcurrentImportRequests limit.ConcurrentRequestLimiter ConcurrentExportRequests limit.ConcurrentRequestLimiter + AddSSTableRequestRate *rate.Limiter ConcurrentAddSSTableRequests limit.ConcurrentRequestLimiter // concurrentRangefeedIters is a semaphore used to limit the number of // rangefeeds in the "catch-up" state across the store. The "catch-up" state diff --git a/pkg/storage/store.go b/pkg/storage/store.go index fac92d0f3491..d91ebb3ae00a 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -123,6 +123,15 @@ var importRequestsLimit = settings.RegisterPositiveIntSetting( 1, ) +// addSSTableRequestMaxRate is the maximum number of AddSSTable requests per second. +var addSSTableRequestMaxRate = settings.RegisterNonNegativeFloatSetting( + "kv.bulk_io_write.addsstable_max_rate", + "maximum number of AddSSTable requests per second for a single store", + float64(rate.Inf), +) + +const addSSTableRequestBurst = 32 + // addSSTableRequestLimit limits concurrent AddSSTable requests. var addSSTableRequestLimit = settings.RegisterPositiveIntSetting( "kv.bulk_io_write.concurrent_addsstable_requests", @@ -860,6 +869,16 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript } s.limiters.ConcurrentExportRequests.SetLimit(limit) }) + s.limiters.AddSSTableRequestRate = rate.NewLimiter( + rate.Limit(addSSTableRequestMaxRate.Get(&cfg.Settings.SV)), addSSTableRequestBurst) + addSSTableRequestMaxRate.SetOnChange(&cfg.Settings.SV, func() { + rateLimit := addSSTableRequestMaxRate.Get(&cfg.Settings.SV) + if math.IsInf(rateLimit, 0) { + // This value causes the burst limit to be ignored + rateLimit = float64(rate.Inf) + } + s.limiters.AddSSTableRequestRate.SetLimit(rate.Limit(rateLimit)) + }) s.limiters.ConcurrentAddSSTableRequests = limit.MakeConcurrentRequestLimiter( "addSSTableRequestLimiter", int(addSSTableRequestLimit.Get(&cfg.Settings.SV)), ) @@ -2759,6 +2778,10 @@ func (s *Store) Send( return nil, roachpb.NewError(err) } defer s.limiters.ConcurrentAddSSTableRequests.Finish() + + if err := s.limiters.AddSSTableRequestRate.Wait(ctx); err != nil { + return nil, roachpb.NewError(err) + } s.engine.PreIngestDelay(ctx) }