feat: Ingester RF-1 #13365

Merged (9 commits) on Jul 3, 2024
51 changes: 31 additions & 20 deletions pkg/ingester-rf1/ingester.go
@@ -571,32 +571,43 @@ func (i *Ingester) loop() {
 	for {
 		select {
 		case <-flushTicker.C:
-			//i.logger.Log("msg", "starting periodic flush")
-			i.flushCtx.lock.Lock() // Stop new chunks being written while we swap destinations - we'll never unlock as this flushctx can no longer be used.
-			currentFlushCtx := i.flushCtx
-			// APIs become unblocked after resetting flushCtx
-			segmentWriter, err := wal.NewWalSegmentWriter()
-			if err != nil {
-				// TODO: handle this properly
-				panic(err)
-			}
-			i.flushCtx = &flushCtx{
-				lock:            &sync.RWMutex{},
-				flushDone:       make(chan struct{}),
-				newCtxAvailable: make(chan struct{}),
-				segmentWriter:   segmentWriter,
-			}
-			close(currentFlushCtx.newCtxAvailable) // Broadcast to all waiters that they can now fetch a new flushCtx. Small chance of a race but if they re-fetch the old one, they'll just check again immediately.
-			// Flush the finished context in the background & then notify watching API requests
-			// TODO: use multiple flush queues if required
-			i.flushQueues[0].Enqueue(currentFlushCtx)
+			i.doFlushTick()
 		case <-i.loopQuit:
 			return
 		}
 	}
 }

+func (i *Ingester) doFlushTick() {
+	i.flushCtx.lock.Lock()
+	defer i.flushCtx.lock.Unlock()
Contributor:
This probably doesn't hurt, but we actually don't want to unlock this: the context is only locked once, when it's no longer accepting writes.
I wonder if it's possible that this goroutine gets preempted and another tick fires before we swap in a new context? In that case we'd block the timer thread completely, waiting on the lock... Best to leave the unlock here for now, in that case!
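The reviewer's point is the classic reason to pair `Lock()` with a deferred `Unlock()` in a handler that runs once per tick: if the handler ever returns while still holding the lock, the next tick's `Lock()` blocks forever. A minimal standalone sketch (names are illustrative, not from the PR):

```go
package main

import (
	"fmt"
	"sync"
)

// runTicks models a per-tick handler guarded by a mutex, like doFlushTick.
// The deferred Unlock is what lets iteration i+1 acquire the lock that
// iteration i took; without it, the second Lock() would block forever.
func runTicks(n int) int {
	mu := &sync.RWMutex{}
	done := 0
	for i := 0; i < n; i++ {
		func() {
			mu.Lock()
			defer mu.Unlock() // remove this and the second iteration deadlocks
			done++
		}()
	}
	return done
}

func main() {
	fmt.Println(runTicks(3)) // prints 3
}
```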

Contributor Author:
I think you are right!


+	//i.logger.Log("msg", "starting periodic flush")
+	// Stop new chunks being written while we swap destinations - we'll never unlock as this flushctx can no longer be used.
+	currentFlushCtx := i.flushCtx
+	// Don't write empty files if there is nothing to write.
+	if currentFlushCtx.segmentWriter.InputSize() == 0 {
+		return
+	}
+
+	// APIs become unblocked after resetting flushCtx
+	segmentWriter, err := wal.NewWalSegmentWriter()
+	if err != nil {
+		// TODO: handle this properly
+		panic(err)
+	}
+	i.flushCtx = &flushCtx{
+		lock:            &sync.RWMutex{},
+		flushDone:       make(chan struct{}),
+		newCtxAvailable: make(chan struct{}),
+		segmentWriter:   segmentWriter,
+	}
+	close(currentFlushCtx.newCtxAvailable) // Broadcast to all waiters that they can now fetch a new flushCtx. Small chance of a race but if they re-fetch the old one, they'll just check again immediately.
+	// Flush the finished context in the background & then notify watching API requests
+	// TODO: use multiple flush queues if required
+	i.flushQueues[0].Enqueue(currentFlushCtx)
+}
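The `close(currentFlushCtx.newCtxAvailable)` call relies on Go's broadcast-by-closing-a-channel idiom: every goroutine blocked receiving on the channel is released by a single `close`, with no per-waiter signalling. A minimal self-contained sketch of just that idiom (names are illustrative, not taken from the PR):

```go
package main

import (
	"fmt"
	"sync"
)

// broadcastRelease parks n goroutines on one channel, then releases them
// all with a single close, mirroring how close(newCtxAvailable) wakes every
// API request waiting for a fresh flush context.
func broadcastRelease(n int) int {
	newCtxAvailable := make(chan struct{})
	released := make(chan struct{}, n)
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-newCtxAvailable // blocks until the channel is closed
			released <- struct{}{}
		}()
	}

	close(newCtxAvailable) // one close wakes every waiter at once
	wg.Wait()
	return len(released)
}

func main() {
	fmt.Println(broadcastRelease(3)) // prints 3
}
```

A receive on a closed channel returns immediately with the zero value, which is why all waiters unblock; any waiter that raced and grabbed the old context simply re-checks, as the comment in the diff notes.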

// PrepareShutdown will handle the /ingester/prepare_shutdown endpoint.
//
// Internally, when triggered, this handler will configure the ingester service to release its resources whenever a SIGTERM is received.