Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added tail length limit to limit duration of live tailing of logs to 1 hour #756

Merged
merged 4 commits into from
Jul 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (

// Config for a querier.
type Config struct {
TailMaxDuration time.Duration `yaml:"tail_max_duration"`
}

// RegisterFlags register flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.TailMaxDuration, "querier.tail-max-duration", 1*time.Hour, "Limit the duration for which live tailing request would be served")
}

// Querier handlers queries.
Expand Down Expand Up @@ -262,7 +264,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer,
return q.queryDroppedStreams(ctx, req, from, to, labels)
}, func(connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) {
return q.tailDisconnectedIngesters(ctx, req, connectedIngestersAddr)
}), nil
}, q.cfg.TailMaxDuration), nil
}

// passed to tailer for querying dropped streams
Expand Down
39 changes: 25 additions & 14 deletions pkg/querier/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,13 @@ type Tailer struct {
querierTailClients map[string]logproto.Querier_TailClient // addr -> grpc clients for tailing logs from ingesters
querierTailClientsMtx sync.Mutex

stopped bool
blocked bool
blockedMtx sync.RWMutex
delayFor time.Duration
responseChan chan *TailResponse
closeErrChan chan error
stopped bool
blocked bool
blockedMtx sync.RWMutex
delayFor time.Duration
responseChan chan *TailResponse
closeErrChan chan error
tailMaxDuration time.Duration

// when tail client is slow, drop entry and store its details in droppedEntries to notify client
droppedEntries []droppedEntry
Expand All @@ -103,22 +104,31 @@ func (t *Tailer) readTailClients() {
// keeps sending oldest entry to responseChan. If channel is blocked drop the entry
// When channel is unblocked, send details of dropped entries with current entry
func (t *Tailer) loop() {
ticker := time.NewTicker(checkConnectionsWithIngestersPeriod)
defer ticker.Stop()
checkConnectionTicker := time.NewTicker(checkConnectionsWithIngestersPeriod)
defer checkConnectionTicker.Stop()

tailMaxDurationTicker := time.NewTicker(t.tailMaxDuration)
defer tailMaxDurationTicker.Stop()

tailResponse := new(TailResponse)

for {
if t.stopped {
break
return
}

select {
case <-ticker.C:
case <-checkConnectionTicker.C:
// Try to reconnect dropped ingesters and connect to new ingesters
if err := t.checkIngesterConnections(); err != nil {
level.Error(util.Logger).Log("Error reconnecting to disconnected ingesters", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error reconnecting to disconnected ingesters", "err", err)
}
case <-tailMaxDurationTicker.C:
if err := t.close(); err != nil {
level.Error(util.Logger).Log("msg", "Error closing Tailer", "err", err)
}
t.closeErrChan <- errors.New("reached tail max duration limit")
return
default:
}

Expand All @@ -134,8 +144,8 @@ func (t *Tailer) loop() {
if err := t.close(); err != nil {
level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err))
}
t.closeErrChan <- errors.New("All ingesters closed the connection")
break
t.closeErrChan <- errors.New("all ingesters closed the connection")
return
}
time.Sleep(nextEntryWait)
continue
Expand Down Expand Up @@ -316,7 +326,7 @@ func (t *Tailer) getCloseErrorChan() <-chan error {

func newTailer(delayFor time.Duration, querierTailClients map[string]logproto.Querier_TailClient,
queryDroppedStreams func(from, to time.Time, labels string) (iter.EntryIterator, error),
tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error)) *Tailer {
tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error), tailMaxDuration time.Duration) *Tailer {
t := Tailer{
openStreamIterator: iter.NewHeapIterator([]iter.EntryIterator{}, logproto.FORWARD),
//droppedStreamsIterator: &droppedStreamsIterator{},
Expand All @@ -326,6 +336,7 @@ func newTailer(delayFor time.Duration, querierTailClients map[string]logproto.Qu
responseChan: make(chan *TailResponse, bufferSizeForTailResponse),
closeErrChan: make(chan error),
tailDisconnectedIngesters: tailDisconnectedIngesters,
tailMaxDuration: tailMaxDuration,
}

t.readTailClients()
Expand Down