Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add dynamic buffer size for log publication #1630

Merged
merged 5 commits into from
Mar 1, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
stash changes for rebase
  • Loading branch information
zackattack01 committed Feb 29, 2024
commit 9d7dc2d5d233ac8dd78a0bd90b5700293e64bdbb
5 changes: 1 addition & 4 deletions pkg/osquery/extension.go
Original file line number Diff line number Diff line change
@@ -704,7 +704,6 @@ func (e *Extension) writeBufferedLogsForType(typ logger.LogType) error {
// Collect up logs to be sent
var logs []string
var logIDs [][]byte
batchTotalBytes := 0
err = e.knapsack.BboltDB().View(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(bucketName))

@@ -730,7 +729,6 @@ func (e *Extension) writeBufferedLogsForType(typ logger.LogType) error {
)
} else if e.logPublicationState.exceedsCurrentBatchThreshold(totalBytes + len(v)) {
// Buffer is filled. Break the loop and come back later.
batchTotalBytes = totalBytes
break
} else {
logs = append(logs, string(v))
@@ -761,7 +759,6 @@ func (e *Extension) writeBufferedLogsForType(typ logger.LogType) error {
return nil
}

e.logPublicationState.addPendingBatch(batchTotalBytes)
err = e.writeLogsWithReenroll(context.Background(), typ, logs, true)
if err != nil {
return fmt.Errorf("writing logs: %w", err)
@@ -789,7 +786,7 @@ func (e *Extension) writeLogsWithReenroll(ctx context.Context, typ logger.LogTyp
// publication was successful - update logPublicationState and move on
if !invalid && err == nil {
// todo inform lps of success and batch size
e.logPublicationState.noteBatchComplete(false)
e.logPublicationState.recordBatchSuccess(logs)
return nil
}

59 changes: 40 additions & 19 deletions pkg/osquery/log_publication_state.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
package osquery

import (
"container/list"
)

const (
// minBytesPerBatch sets the minimum batch size to 0.5 MiB (524288 bytes), used as
// the lower bound for correction
minBytesPerBatch = 524288
// batchIncrementAmount (0.5 MiB, 524288 bytes) is the incremental increase amount for
// the target batch size when previous runs have been successful
batchIncrementAmount = 524288

// batchHistoryLen is the number of logPublicationBatches to retain in our publishedBatches linked list
batchHistoryLen = 15
)

// logPublicationState holds stateful logic to influence the log batch publication size
@@ -13,10 +23,9 @@ type (
logPublicationState struct {
// maxBytesPerBatch is passed in from the extension opts and respected
// as a fixed upper limit for batch size, regardless of publication success/failure rates
maxBytesPerBatch int
currentMaxBytesPerBatch int
currentPendingBatch *logPublicationBatch
publishedBatches []*logPublicationBatch
maxBytesPerBatch int
currentMaxBytesPerBatch int
publishedBatches *list.List
}

logPublicationBatch struct {
@@ -29,31 +38,33 @@ func NewLogPublicationState(maxBytesPerBatch int) *logPublicationState {
return &logPublicationState{
maxBytesPerBatch: maxBytesPerBatch,
currentMaxBytesPerBatch: maxBytesPerBatch,
publishedBatches: make([]*logPublicationBatch, 0),
publishedBatches: list.New(),
}
}

// exceedsCurrentBatchThreshold reports whether amountBytes is strictly greater
// than the current per-batch byte limit held in lps.currentMaxBytesPerBatch.
func (lps *logPublicationState) exceedsCurrentBatchThreshold(amountBytes int) bool {
	currentLimit := lps.currentMaxBytesPerBatch
	return currentLimit < amountBytes
}

func (lps *logPublicationState) addPendingBatch(amountBytes int) {
lps.currentPendingBatch = &logPublicationBatch{
batchSizeBytes: amountBytes,
}
// recordBatchSuccess records logs as a batch that did not time out, then calls
// increaseBatchThreshold to adjust the current batch threshold.
func (lps *logPublicationState) recordBatchSuccess(logs []string) {
	const timedOut = false

	lps.recordBatch(logs, timedOut)
	lps.increaseBatchThreshold()
}

// recordBatchTimedOut records logs as a batch that timed out, then calls
// reduceBatchThreshold to adjust the current batch threshold.
func (lps *logPublicationState) recordBatchTimedOut(logs []string) {
	const timedOut = true

	lps.recordBatch(logs, timedOut)
	lps.reduceBatchThreshold()
}

func (lps *logPublicationState) noteBatchComplete(timedOut bool) {
lps.currentPendingBatch.timedOut = timedOut
lps.publishedBatches = append(lps.publishedBatches, lps.currentPendingBatch)
if len(lps.publishedBatches) > 10 {
lps.publishedBatches = lps.publishedBatches[1:]
func (lps *logPublicationState) recordBatch(logs []string, timedOut bool) {
newLogBatch := &logPublicationBatch{
batchSizeBytes: logBatchSize(logs),
timedOut: timedOut,
}

if timedOut {
lps.reduceBatchThreshold()
} else {
lps.increaseBatchThreshold()
lps.publishedBatches.PushBack(newLogBatch)
if lps.publishedBatches.Len() > batchHistoryLen {
lps.publishedBatches.Remove(lps.publishedBatches.Front())
}
}

@@ -74,3 +85,13 @@ func (lps *logPublicationState) increaseBatchThreshold() {
newTargetThreshold := lps.currentMaxBytesPerBatch * 2
lps.currentMaxBytesPerBatch = minInt(newTargetThreshold, lps.maxBytesPerBatch)
}

// logBatchSize returns the combined length, in bytes, of every entry in logs.
// A nil or empty slice yields 0.
func logBatchSize(logs []string) int {
	var sizeBytes int
	for i := range logs {
		sizeBytes += len(logs[i])
	}
	return sizeBytes
}