diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 633fc587ffd4..e1cca3755b55 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -22,10 +22,12 @@ import ( // Config for a querier. type Config struct { + TailMaxDuration time.Duration `yaml:"tail_max_duration"` } // RegisterFlags register flags. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.DurationVar(&cfg.TailMaxDuration, "querier.tail-max-duration", 1*time.Hour, "Limit the duration for which live tailing request would be served") } // Querier handlers queries. @@ -262,7 +264,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, return q.queryDroppedStreams(ctx, req, from, to, labels) }, func(connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) { return q.tailDisconnectedIngesters(ctx, req, connectedIngestersAddr) - }), nil + }, q.cfg.TailMaxDuration), nil } // passed to tailer for querying dropped streams diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go index 766fa984685d..a0845e011eef 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail.go @@ -83,12 +83,13 @@ type Tailer struct { querierTailClients map[string]logproto.Querier_TailClient // addr -> grpc clients for tailing logs from ingesters querierTailClientsMtx sync.Mutex - stopped bool - blocked bool - blockedMtx sync.RWMutex - delayFor time.Duration - responseChan chan *TailResponse - closeErrChan chan error + stopped bool + blocked bool + blockedMtx sync.RWMutex + delayFor time.Duration + responseChan chan *TailResponse + closeErrChan chan error + tailMaxDuration time.Duration // when tail client is slow, drop entry and store its details in droppedEntries to notify client droppedEntries []droppedEntry @@ -103,22 +104,31 @@ func (t *Tailer) readTailClients() { // keeps sending oldest entry to responseChan. If channel is blocked drop the entry // When channel is unblocked, send details of dropped entries with current entry func (t *Tailer) loop() { - ticker := time.NewTicker(checkConnectionsWithIngestersPeriod) - defer ticker.Stop() + checkConnectionTicker := time.NewTicker(checkConnectionsWithIngestersPeriod) + defer checkConnectionTicker.Stop() + + tailMaxDurationTicker := time.NewTicker(t.tailMaxDuration) + defer tailMaxDurationTicker.Stop() tailResponse := new(TailResponse) for { if t.stopped { - break + return } select { - case <-ticker.C: + case <-checkConnectionTicker.C: // Try to reconnect dropped ingesters and connect to new ingesters if err := t.checkIngesterConnections(); err != nil { - level.Error(util.Logger).Log("Error reconnecting to disconnected ingesters", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error reconnecting to disconnected ingesters", "err", err) + } + case <-tailMaxDurationTicker.C: + if err := t.close(); err != nil { + level.Error(util.Logger).Log("msg", "Error closing Tailer", "err", err) } + t.closeErrChan <- errors.New("reached tail max duration limit") + return default: } @@ -134,8 +144,8 @@ func (t *Tailer) loop() { if err := t.close(); err != nil { level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err)) } - t.closeErrChan <- errors.New("All ingesters closed the connection") - break + t.closeErrChan <- errors.New("all ingesters closed the connection") + return } time.Sleep(nextEntryWait) continue @@ -316,7 +326,7 @@ func (t *Tailer) getCloseErrorChan() <-chan error { func newTailer(delayFor time.Duration, querierTailClients map[string]logproto.Querier_TailClient, queryDroppedStreams func(from, to time.Time, labels string) (iter.EntryIterator, error), - tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error)) *Tailer { + tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error), tailMaxDuration time.Duration) *Tailer { t := Tailer{ openStreamIterator: iter.NewHeapIterator([]iter.EntryIterator{}, logproto.FORWARD), //droppedStreamsIterator: &droppedStreamsIterator{}, @@ -326,6 +336,7 @@ func newTailer(delayFor time.Duration, querierTailClients map[string]logproto.Qu responseChan: make(chan *TailResponse, bufferSizeForTailResponse), closeErrChan: make(chan error), tailDisconnectedIngesters: tailDisconnectedIngesters, + tailMaxDuration: tailMaxDuration, } t.readTailClients()