Skip to content

Commit

Permalink
chore: rework pattern ingester queue metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney committed Aug 12, 2024
1 parent b28572a commit c65ba1f
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 11 deletions.
8 changes: 4 additions & 4 deletions pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Config struct {
MaxEvictionRatio float64 `yaml:"max_eviction_ratio,omitempty" doc:"description=The maximum eviction ratio of patterns per stream. Once that ratio is reached, the stream will throttled pattern detection."`
MetricAggregation aggregation.Config `yaml:"metric_aggregation,omitempty" doc:"description=Configures the metric aggregation and storage behavior of the pattern ingester."`
TeeParallelism int `yaml:"tee_parallelism,omitempty" doc:"description=The number of parallel goroutines to use for forwarding requests to the pattern ingester."`
TeeBufferSize int `yaml:"tee_buffer_size,omitempty" doc:"Maxiumum number of pending teed request to pattern ingesters. If the buffer is full the request is dropped."`
TeeQueueSize int `yaml:"tee_queue_size,omitempty" doc:"Maxiumum number of pending teed request to pattern ingesters. If the queue is full the request is dropped."`

// For testing.
factory ring_client.PoolFactory `yaml:"-"`
Expand Down Expand Up @@ -90,10 +90,10 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
"The number of parallel goroutines to use for forwarding requests to the pattern ingester.",
)
fs.IntVar(
&cfg.TeeBufferSize,
"pattern-ingester.tee-buffer-size",
&cfg.TeeQueueSize,
"pattern-ingester.tee-queue-size",
100,
"Maxiumum number of pending teed request to pattern ingesters. If the buffer is full the request is dropped.",
"Maxiumum number of pending teed request to pattern ingesters. If the queue is full the request is dropped.",
)
}

Expand Down
23 changes: 16 additions & 7 deletions pkg/pattern/tee.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Tee struct {
ingesterMetricAppends *prometheus.CounterVec

teedRequests *prometheus.CounterVec
teeQueueSize *prometheus.GaugeVec

requestCh chan request

Expand Down Expand Up @@ -54,14 +55,18 @@ func NewTee(
ingesterMetricAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "pattern_ingester_metric_appends_total",
Help: "The total number of metric only batch appends sent to pattern ingesters. These requests will not be processed for patterns.",
}, []string{"ingester", "status"}),
}, []string{"status"}),
teedRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "pattern_ingester_teed_requests_total",
Help: "The total number of batch appends sent to fallback pattern ingesters, for not owned streams.",
}, []string{"tenant", "status"}),
teeQueueSize: promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{
Name: "pattern_ingester_tee_queue_size",
Help: "Current number of requests in the pattern ingester tee queue.",
}, []string{"tenant", "status"}),
cfg: cfg,
ringClient: ringClient,
requestCh: make(chan request, cfg.TeeBufferSize),
requestCh: make(chan request, cfg.TeeQueueSize),
quit: make(chan struct{}),
}

Expand All @@ -78,6 +83,7 @@ func (t *Tee) run() {
case <-t.quit:
return
case req := <-t.requestCh:
t.teeQueueSize.WithLabelValues(req.tenant).Dec()
ctx, cancel := context.WithTimeout(
user.InjectOrgID(context.Background(), req.tenant),
t.cfg.ClientConfig.RemoteTimeout,
Expand All @@ -94,6 +100,7 @@ func (t *Tee) run() {
func (t *Tee) sendStream(ctx context.Context, stream distributor.KeyedStream) error {
err := t.sendOwnedStream(ctx, stream)
if err == nil {
t.ingesterMetricAppends.WithLabelValues("success").Inc()
// Success, return early
return nil
}
Expand All @@ -104,7 +111,7 @@ func (t *Tee) sendStream(ctx context.Context, stream distributor.KeyedStream) er
// try to forward request to any pattern ingester so we at least capture the metrics.
replicationSet, err := t.ringClient.Ring().GetReplicationSetForOperation(ring.WriteNoExtend)
if replicationSet.Instances == nil {
t.ingesterMetricAppends.WithLabelValues("none", "fail").Inc()
t.ingesterMetricAppends.WithLabelValues("fail").Inc()
return errors.New("no instances found for fallback")
}

Expand All @@ -120,15 +127,16 @@ func (t *Tee) sendStream(ctx context.Context, stream distributor.KeyedStream) er

_, err = client.(logproto.PatternClient).Push(ctx, req)
if err != nil {
t.ingesterMetricAppends.WithLabelValues(addr, "fail").Inc()
continue
}
t.ingesterMetricAppends.WithLabelValues(addr, "success").Inc()

t.ingesterMetricAppends.WithLabelValues("success").Inc()
// bail after any success to prevent sending more than one
return nil
}
}

t.ingesterMetricAppends.WithLabelValues("fail").Inc()
return err
}

Expand All @@ -139,6 +147,7 @@ func (t *Tee) sendOwnedStream(ctx context.Context, stream distributor.KeyedStrea
return err
}
if replicationSet.Instances == nil {
t.ingesterAppends.WithLabelValues("none", "fail").Inc()
return errors.New("no instances found")
}
addr := replicationSet.Instances[0].Addr
Expand All @@ -159,7 +168,6 @@ func (t *Tee) sendOwnedStream(ctx context.Context, stream distributor.KeyedStrea
}
// Success here means the stream will be processed for both metrics and patterns
t.ingesterAppends.WithLabelValues(addr, "success").Inc()
t.ingesterMetricAppends.WithLabelValues(addr, "success").Inc()
return nil
}

Expand All @@ -177,6 +185,7 @@ func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream) {
select {
case t.requestCh <- req:
t.teedRequests.WithLabelValues(tenant, "queued").Inc()
t.teeQueueSize.WithLabelValues(req.tenant).Inc()
return
default:
t.teedRequests.WithLabelValues(tenant, "dropped").Inc()
Expand All @@ -189,5 +198,5 @@ func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream) {
// Stop will cancel any ongoing requests and stop the goroutine listening for requests
func (t *Tee) Stop() {
close(t.quit)
t.requestCh = make(chan request, t.cfg.TeeBufferSize)
t.requestCh = make(chan request, t.cfg.TeeQueueSize)
}

0 comments on commit c65ba1f

Please sign in to comment.