Skip to content

Commit

Permalink
[libbeat] Enable WriteAheadLimit in the disk queue (elastic#21391) (e…
Browse files Browse the repository at this point in the history
…lastic#21401)

(cherry picked from commit ef6274d)
  • Loading branch information
faec committed Sep 30, 2020
1 parent 136b1d8 commit 7e2c03d
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 12 deletions.
12 changes: 10 additions & 2 deletions libbeat/publisher/queue/diskqueue/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ func DefaultSettings() Settings {
MaxSegmentSize: 100 * (1 << 20), // 100MiB
MaxBufferSize: (1 << 30), // 1GiB

ReadAheadLimit: 256,
WriteAheadLimit: 1024,
ReadAheadLimit: 512,
WriteAheadLimit: 2048,
}
}

Expand All @@ -129,6 +129,14 @@ func SettingsForUserConfig(config *common.Config) (Settings, error) {
// divided by 10.
settings.MaxSegmentSize = uint64(userConfig.MaxSize) / 10
}

if userConfig.ReadAheadLimit != nil {
settings.ReadAheadLimit = *userConfig.ReadAheadLimit
}
if userConfig.WriteAheadLimit != nil {
settings.WriteAheadLimit = *userConfig.WriteAheadLimit
}

return settings, nil
}

Expand Down
28 changes: 18 additions & 10 deletions libbeat/publisher/queue/diskqueue/core_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,15 @@ func (dq *diskQueue) run() {
// The writer loop completed a request, so check if there is more
// data to be sent.
dq.maybeWritePending()
// We also check whether the reader loop is waiting for the data
// that was just written.

// The data that was just written is now available for reading, so check
// if we should start a new read request.
dq.maybeReadPending()

// pendingFrames should now be empty. If any producers were blocked
// because pendingFrames hit settings.WriteAheadLimit, wake them up.
dq.maybeUnblockProducers()

// Reader loop handling
case readerLoopResponse := <-dq.readerLoop.responseChan:
dq.handleReaderLoopResponse(readerLoopResponse)
Expand Down Expand Up @@ -417,22 +422,25 @@ func (dq *diskQueue) enqueueWriteFrame(frame *writeFrame) {
})
}

// canAcceptFrameOfSize checks whether there is enough free space in the
// queue (subject to settings.MaxBufferSize) to accept a new frame with
// the given size. Size includes both the serialized data and the frame
// header / footer; the easy way to do this for a writeFrame is to pass
// canAcceptFrameOfSize checks whether there is enough free space in the queue
// (subject to settings.{MaxBufferSize, WriteAheadLimit}) to accept a new
// frame with the given size. Size includes both the serialized data and the
// frame header / footer; the easy way to do this for a writeFrame is to pass
// in frame.sizeOnDisk().
// Capacity calculations do not include requests in the blockedProducers
// list (that data is owned by its callers and we can't touch it until
// we are ready to respond). That allows this helper to be used both while
// handling producer requests and while deciding whether to unblock
// producers after free capacity increases.
// If we decide to add limits on how many events / bytes can be stored
// in pendingFrames (to avoid unbounded memory use if the input is faster
// than the disk), this is the function to modify.
func (dq *diskQueue) canAcceptFrameOfSize(frameSize uint64) bool {
// If pendingFrames is already at the WriteAheadLimit, we can't accept
// any new frames right now.
if len(dq.pendingFrames) >= dq.settings.WriteAheadLimit {
return false
}

// If the queue size is unbounded (max == 0), we accept.
if dq.settings.MaxBufferSize == 0 {
// Currently we impose no limitations if the queue size is unbounded.
return true
}

Expand Down

0 comments on commit 7e2c03d

Please sign in to comment.