Skip to content

Commit

Permalink
Implement partial successful write handling
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan committed Aug 13, 2024
1 parent ce98024 commit 32ba732
Show file tree
Hide file tree
Showing 9 changed files with 450 additions and 77 deletions.
9 changes: 4 additions & 5 deletions internal/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ func (e *FatalError) Unwrap() error {

// WriteError reports the outcome of a partially successful write of a batch
// of metrics. MetricsSuccess and MetricsFatal hold indices into the batch that
// was handed to the write call; RunningOutput treats every batch index listed
// in neither slice as a metric to keep for a later retry.
type WriteError struct {
	// Err is the error describing the failed write.
	// NOTE(review): presumably exposed through Unwrap so that errors.Is can
	// match sentinels such as ErrSizeLimitReached; Unwrap is not visible here, confirm.
	Err error
	// MetricsErrors holds errors for individual metrics.
	// NOTE(review): how entries map to batch indices is not visible here, confirm.
	MetricsErrors []error
	// MetricsSuccess lists the batch indices of metrics that were written
	// successfully and can be accepted by the buffer.
	MetricsSuccess []int
	// MetricsFatal lists the batch indices of metrics that failed and are
	// dropped by RunningOutput instead of being retried.
	MetricsFatal []int
}

func (e *WriteError) Error() string {
Expand Down
72 changes: 65 additions & 7 deletions models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (r *RunningOutput) Write() error {

atomic.StoreInt64(&r.newMetricsCount, 0)

// Only process the metrics in the buffer now. Metrics added while we are
// Only process the metrics in the buffer now. Metrics added while we are
// writing will be sent on the next call.
nBuffer := r.buffer.Len()
nBatches := nBuffer/r.MetricBatchSize + 1
Expand All @@ -294,9 +294,38 @@ func (r *RunningOutput) Write() error {
break
}

err := r.writeMetrics(batch)
if err != nil {
r.buffer.Reject(batch)
if err := r.writeMetrics(batch); err != nil {
var writeErr *internal.WriteError
if errors.As(err, &writeErr) {
// Translate the indices of the write error back to metrics
accept := make([]telegraf.Metric, 0, len(writeErr.MetricsSuccess))
dropped := make([]telegraf.Metric, 0, len(writeErr.MetricsFatal))
keep := make([]telegraf.Metric, 0, len(batch)-len(writeErr.MetricsSuccess)-len(writeErr.MetricsFatal))
used := make([]bool, len(batch))
for _, idx := range writeErr.MetricsSuccess {
accept = append(accept, batch[idx])
used[idx] = true
}
for _, idx := range writeErr.MetricsFatal {
dropped = append(dropped, batch[idx])
used[idx] = true
}
for i, m := range batch {
if !used[i] {
keep = append(keep, m)
}
}

// Notify the buffer on what to do
r.buffer.Accept(accept)
r.buffer.Accept(dropped) // TODO: There should be a way to mark those as lost in the stats
r.buffer.Reject(keep)
if !errors.Is(err, internal.ErrSizeLimitReached) {
continue
}
} else {
r.buffer.Reject(batch)
}
return err
}
r.buffer.Accept(batch)
Expand All @@ -322,9 +351,38 @@ func (r *RunningOutput) WriteBatch() error {
return nil
}

err := r.writeMetrics(batch)
if err != nil {
r.buffer.Reject(batch)
if err := r.writeMetrics(batch); err != nil {
var writeErr *internal.WriteError
if errors.As(err, &writeErr) {
// Translate the indices of the write error back to metrics
accept := make([]telegraf.Metric, 0, len(writeErr.MetricsSuccess))
dropped := make([]telegraf.Metric, 0, len(writeErr.MetricsFatal))
keep := make([]telegraf.Metric, 0, len(batch)-len(writeErr.MetricsSuccess)-len(writeErr.MetricsFatal))
used := make([]bool, len(batch))
for _, idx := range writeErr.MetricsSuccess {
accept = append(accept, batch[idx])
used[idx] = true
}
for _, idx := range writeErr.MetricsFatal {
dropped = append(dropped, batch[idx])
used[idx] = true
}
for i, m := range batch {
if !used[i] {
keep = append(keep, m)
}
}

// Notify the buffer on what to do
r.buffer.Accept(accept)
r.buffer.Accept(dropped) // TODO: There should be a way to mark those as lost in the stats
r.buffer.Reject(keep)
if !errors.Is(err, internal.ErrSizeLimitReached) {
return nil
}
} else {
r.buffer.Reject(batch)
}
return err
}
r.buffer.Accept(batch)
Expand Down
Loading

0 comments on commit 32ba732

Please sign in to comment.