feat(outputs): Implement partial write errors #16146

Merged (3 commits, Dec 4, 2024)
20 changes: 20 additions & 0 deletions internal/errors.go
@@ -37,3 +37,23 @@ func (e *FatalError) Error() string {
func (e *FatalError) Unwrap() error {
return e.Err
}

// PartialWriteError indicates that only a subset of the metrics were written
// successfully (i.e. accepted). The rejected metrics should be removed from
// the buffer without being counted as successfully written. Please note: the
// metrics are specified as indices into the batch to be able to reference
// tracking metrics correctly.
type PartialWriteError struct {
Err error
MetricsAccept []int
MetricsReject []int
Comment on lines +48 to +49 (Contributor):
I think using bit arrays here would lead to memory savings and performance improvements. Switching to them would be a significant change to the PR and I think the code is sound as it is, so not recommending that, but maybe as a future performance enhancement.

I would continue to use separate vars for "accept" and "reject", just switch them to bit arrays. They would be fixed in size at len(metrics)/8+1 bytes long, vs len(accepted)*64, which I think in many cases would be approaching len(metrics)*64. Not sure what the typical use case is here, would take some measurement of real-world scenarios, but I think chances are good it would be a decent savings.

The other advantage is that it can support simplified operations. The metrics to drop from the WAL after a single write can be found with the bit-wise union of the two bit arrays for accept and reject. Then you union that again with the long-running mask for the WAL. If all the bits are 1 the whole WAL is done. If not, to find out what prefix to remove from the WAL, you find the first non-zero bit in the mask.

Here's an example of a bit-array library you could use as a reference. I would not use a library, but instead write a simplified one that supplies just the operations needed here. When bit-array data structures are not made general they can be surprisingly small in code.

https://github.com/yourbasic/bit
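The simplified bit array the reviewer describes could be sketched as follows. This is an illustrative standalone sketch, not code from the PR or from the linked library: `bitSet`, `union`, and `firstZero` are hypothetical names for exactly the operations the comment mentions (set, bit-wise union, and finding the first non-zero/unset bit in the mask).

```go
package main

import (
	"fmt"
	"math/bits"
)

// bitSet is a minimal fixed-size bit array as suggested in the review:
// roughly len(metrics)/8+1 bytes instead of one int per accepted index.
type bitSet []uint64

func newBitSet(n int) bitSet { return make(bitSet, (n+63)/64) }

// set marks index i as done (accepted or rejected).
func (s bitSet) set(i int) { s[i/64] |= 1 << (i % 64) }

// union merges another set in place, e.g. accept | reject after one write.
func (s bitSet) union(o bitSet) {
	for i := range s {
		s[i] |= o[i]
	}
}

// firstZero returns the index of the first unset bit among the first n
// bits, or -1 if all are set (i.e. the whole WAL prefix is done).
func (s bitSet) firstZero(n int) int {
	for i, w := range s {
		if w != ^uint64(0) {
			if idx := i*64 + bits.TrailingZeros64(^w); idx < n {
				return idx
			}
		}
	}
	return -1
}

func main() {
	accept, reject := newBitSet(10), newBitSet(10)
	accept.set(0)
	accept.set(1)
	reject.set(2)
	accept.union(reject) // metrics finished after this write
	// First still-pending metric; everything before it could leave the WAL.
	fmt.Println(accept.firstZero(10))
}
```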

Reply (Member, Author):
While I do agree that using a bitmask is better, it seems a premature optimization at this point. We really need to optimize this further, but IMO this also must include optimizing disk I/O as we are currently scratching the disk badly. In this step, the removal-masking should move into a dedicated WAL implementation and be converted to a bit-mask as you suggest. What do you think?

MetricsRejectErrors []error
DStrand1 marked this conversation as resolved.
}

func (e *PartialWriteError) Error() string {
return e.Err.Error()
}

func (e *PartialWriteError) Unwrap() error {
return e.Err
}
89 changes: 71 additions & 18 deletions models/buffer.go
@@ -10,32 +10,73 @@ import (
)

var (
AgentMetricsWritten = selfstat.Register("agent", "metrics_written", make(map[string]string))
AgentMetricsDropped = selfstat.Register("agent", "metrics_dropped", make(map[string]string))
AgentMetricsWritten = selfstat.Register("agent", "metrics_written", make(map[string]string))
AgentMetricsRejected = selfstat.Register("agent", "metrics_rejected", make(map[string]string))
AgentMetricsDropped = selfstat.Register("agent", "metrics_dropped", make(map[string]string))

registerGob = sync.OnceFunc(func() { metric.Init() })
)

type Transaction struct {
// Batch of metrics to write
Batch []telegraf.Metric

// Accept denotes the indices of metrics that were successfully written
Accept []int
// Reject denotes the indices of metrics that were not written but should
// not be requeued
Reject []int

// Marks this transaction as valid
valid bool

// Internal state that can be used by the buffer implementation
state interface{}
}
Comment on lines +20 to +35 (Contributor):
I think "transaction" is the wrong word to use here. In computing, a transaction is a sequence of steps that is executed atomically. Either it runs fully, or not at all. The structure Transaction here records partially completed writes. It is explicitly providing support for a sequence of actions that is allowed to partially complete. Maybe I would call it PartialBatch, or something else to indicate it need not all succeed.

Reply (Member, Author):
Hmmm IMO it is a transaction on the buffer. The buffer is not modified until EndTransaction is called on the buffer. Let's keep this for now.


func (tx *Transaction) AcceptAll() {
tx.Accept = make([]int, len(tx.Batch))
for i := range tx.Batch {
tx.Accept[i] = i
}
}

func (tx *Transaction) KeepAll() {}

func (tx *Transaction) InferKeep() []int {
used := make([]bool, len(tx.Batch))
for _, idx := range tx.Accept {
used[idx] = true
}
for _, idx := range tx.Reject {
used[idx] = true
}

keep := make([]int, 0, len(tx.Batch))
for i := range tx.Batch {
if !used[i] {
keep = append(keep, i)
}
}
return keep
}

type Buffer interface {
// Len returns the number of metrics currently in the buffer.
Len() int

// Add adds metrics to the buffer and returns number of dropped metrics.
Add(metrics ...telegraf.Metric) int

// Batch returns a slice containing up to batchSize of the oldest metrics not
// yet dropped. Metrics are ordered from oldest to newest in the batch. The
// batch must not be modified by the client.
Batch(batchSize int) []telegraf.Metric

// Accept marks the batch, acquired from Batch(), as successfully written.
Accept(metrics []telegraf.Metric)
// Batch starts a transaction by returning a slice of metrics up to the
// given batch-size starting from the oldest metric in the buffer. Metrics
// are ordered from oldest to newest and must not be modified by the plugin.
BeginTransaction(batchSize int) *Transaction

// Reject returns the batch, acquired from Batch(), to the buffer and marks it
// as unsent.
Reject([]telegraf.Metric)
// EndTransaction ends the transaction and persists the buffer state
EndTransaction(*Transaction)

// Stats returns the buffer statistics such as rejected, dropped and accepred metrics
// Stats returns the buffer statistics such as rejected, dropped and accepted metrics
Stats() BufferStats

// Close finalizes the buffer and closes all open resources
@@ -45,11 +86,12 @@ type Buffer interface {
// BufferStats holds common metrics used for buffer implementations.
// Implementations of Buffer should embed this struct in them.
type BufferStats struct {
MetricsAdded selfstat.Stat
MetricsWritten selfstat.Stat
MetricsDropped selfstat.Stat
BufferSize selfstat.Stat
BufferLimit selfstat.Stat
MetricsAdded selfstat.Stat
MetricsWritten selfstat.Stat
MetricsRejected selfstat.Stat
MetricsDropped selfstat.Stat
BufferSize selfstat.Stat
BufferLimit selfstat.Stat
}

// NewBuffer returns a new empty Buffer with the given capacity.
@@ -84,6 +126,11 @@ func NewBufferStats(name, alias string, capacity int) BufferStats {
"metrics_written",
tags,
),
MetricsRejected: selfstat.Register(
"write",
"metrics_rejected",
tags,
),
MetricsDropped: selfstat.Register(
"write",
"metrics_dropped",
@@ -115,6 +162,12 @@ func (b *BufferStats) metricWritten(m telegraf.Metric) {
m.Accept()
}

func (b *BufferStats) metricRejected(m telegraf.Metric) {
AgentMetricsRejected.Incr(1)
b.MetricsRejected.Incr(1)
m.Reject()
}

func (b *BufferStats) metricDropped(m telegraf.Metric) {
AgentMetricsDropped.Incr(1)
b.MetricsDropped.Incr(1)
129 changes: 88 additions & 41 deletions models/buffer_disk.go
@@ -5,6 +5,8 @@ import (
"fmt"
"log"
"path/filepath"
"slices"
"sort"
"sync"

"github.com/tidwall/wal"
@@ -31,6 +33,11 @@ type DiskBuffer struct {
// we have to do our best and track that the walfile "should" be empty, so that next
// write, we can remove the invalid entry (also skipping this entry if it is being read).
isEmpty bool

// The mask contains offsets of metric already removed during a previous
// transaction. Metrics at those offsets should not be contained in new
// batches.
mask []int
}

func NewDiskBuffer(name, id, path string, stats BufferStats) (*DiskBuffer, error) {
@@ -67,7 +74,11 @@ func (b *DiskBuffer) length() int {
if b.isEmpty {
return 0
}
// Special case for when the read index is zero, it must be empty (otherwise it would be >= 1)

return b.entries() - len(b.mask)
}

func (b *DiskBuffer) entries() int {
if b.readIndex() == 0 {
return 0
}
@@ -121,28 +132,33 @@ func (b *DiskBuffer) addSingleMetric(m telegraf.Metric) bool {
return false
}

func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric {
func (b *DiskBuffer) BeginTransaction(batchSize int) *Transaction {
b.Lock()
defer b.Unlock()

if b.length() == 0 {
// no metrics in the wal file, so return an empty array
return make([]telegraf.Metric, 0)
return &Transaction{}
}
b.batchFirst = b.readIndex()
var metrics []telegraf.Metric

b.batchSize = 0

metrics := make([]telegraf.Metric, 0, batchSize)
offsets := make([]int, 0, batchSize)
readIndex := b.batchFirst
endIndex := b.writeIndex()
offset := 0
for batchSize > 0 && readIndex < endIndex {
data, err := b.file.Read(readIndex)
if err != nil {
panic(err)
}
readIndex++
offset++

m, err := metric.FromBytes(data)
if slices.Contains(b.mask, offset) {
// Metric is masked by a previous write and is scheduled for removal
continue
}

// Validate that a tracking metric is from this instance of telegraf and skip ones from older instances.
// A tracking metric can be skipped here because metric.Accept() is only called once data is successfully
@@ -152,11 +168,12 @@ func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric {
// - ErrSkipTracking: means that the tracking information was unable to be found for a tracking ID.
// - Outside of range: means that the metric was guaranteed to be left over from the previous instance
// as it was here when we opened the wal file in this instance.
if errors.Is(err, metric.ErrSkipTracking) {
// could not look up tracking information for metric, skip
continue
}
m, err := metric.FromBytes(data)
if err != nil {
if errors.Is(err, metric.ErrSkipTracking) {
// could not look up tracking information for metric, skip
continue
}
// non-recoverable error in deserialization, abort
log.Printf("E! raw metric data: %v", data)
panic(err)
@@ -167,33 +184,82 @@ func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric {
}

metrics = append(metrics, m)
offsets = append(offsets, offset)
b.batchSize++
batchSize--
}
return metrics
return &Transaction{Batch: metrics, valid: true, state: offsets}
}

func (b *DiskBuffer) Accept(batch []telegraf.Metric) {
func (b *DiskBuffer) EndTransaction(tx *Transaction) {
if len(tx.Batch) == 0 {
return
}

// Ignore invalid transactions and make sure they can only be finished once
if !tx.valid {
return
}
tx.valid = false

// Get the metric offsets from the transaction
offsets := tx.state.([]int)

b.Lock()
defer b.Unlock()

if b.batchSize == 0 || len(batch) == 0 {
// nothing to accept
// Mark metrics which should be removed in the internal mask
remove := make([]int, 0, len(tx.Accept)+len(tx.Reject))
for _, idx := range tx.Accept {
b.metricWritten(tx.Batch[idx])
remove = append(remove, offsets[idx])
}
for _, idx := range tx.Reject {
b.metricRejected(tx.Batch[idx])
remove = append(remove, offsets[idx])
}
b.mask = append(b.mask, remove...)
sort.Ints(b.mask)
DStrand1 marked this conversation as resolved.

// Remove the metrics that are marked for removal from the front of the
// WAL file. All other metrics must be kept.
if len(b.mask) == 0 || b.mask[0] != 0 {
// Mask is empty or the first index is not the front of the file, so
// exit early as there is nothing to remove
return
}
for _, m := range batch {
b.metricWritten(m)

// Determine up to which index we can remove the entries from the WAL file
var removeIdx int
for i, offset := range b.mask {
if offset != i {
break
}
removeIdx = offset
}
if b.length() == len(batch) {
b.emptyFile()

// Remove the metrics in front from the WAL file
b.isEmpty = b.entries()-removeIdx-1 <= 0
if b.isEmpty {
// WAL files cannot be fully empty but need to contain at least one
// item to not throw an error
if err := b.file.TruncateFront(b.writeIndex()); err != nil {
log.Printf("E! batch length: %d, first: %d, size: %d", len(tx.Batch), b.batchFirst, b.batchSize)
panic(err)
srebhan marked this conversation as resolved.
}
} else {
err := b.file.TruncateFront(b.batchFirst + uint64(len(batch)))
if err != nil {
log.Printf("E! batch length: %d, batchFirst: %d, batchSize: %d", len(batch), b.batchFirst, b.batchSize)
if err := b.file.TruncateFront(b.batchFirst + uint64(removeIdx+1)); err != nil {
log.Printf("E! batch length: %d, first: %d, size: %d", len(tx.Batch), b.batchFirst, b.batchSize)
panic(err)
}
}

// Truncate the mask and update the relative offsets
b.mask = b.mask[:removeIdx]
for i := range b.mask {
b.mask[i] -= removeIdx
}

// check if the original end index is still valid, clear if not
if b.originalEnd < b.readIndex() {
b.originalEnd = 0
@@ -203,14 +269,6 @@
b.BufferSize.Set(int64(b.length()))
}

func (b *DiskBuffer) Reject(_ []telegraf.Metric) {
// very little to do here as the disk buffer retains metrics in
// the wal file until a call to accept
b.Lock()
defer b.Unlock()
b.resetBatch()
}

func (b *DiskBuffer) Stats() BufferStats {
return b.BufferStats
}
@@ -238,14 +296,3 @@ func (b *DiskBuffer) handleEmptyFile() {
}
b.isEmpty = false
}

func (b *DiskBuffer) emptyFile() {
if b.isEmpty || b.length() == 0 {
return
}
if err := b.file.TruncateFront(b.writeIndex() - 1); err != nil {
log.Printf("E! writeIndex: %d, buffer len: %d", b.writeIndex(), b.length())
panic(err)
}
b.isEmpty = true
}
10 changes: 5 additions & 5 deletions models/buffer_disk_test.go
@@ -27,9 +27,9 @@ func TestDiskBufferRetainsTrackingInformation(t *testing.T) {
defer buf.Close()

buf.Add(mm)

batch := buf.Batch(1)
buf.Accept(batch)
tx := buf.BeginTransaction(1)
tx.AcceptAll()
buf.EndTransaction(tx)
require.Equal(t, 1, delivered)
}

@@ -85,11 +85,11 @@ func TestDiskBufferTrackingDroppedFromOldWal(t *testing.T) {
buf.Stats().MetricsDropped.Set(0)
defer buf.Close()

batch := buf.Batch(4)
tx := buf.BeginTransaction(4)

// Check that the tracking metric is skipped
expected := []telegraf.Metric{
metrics[0], metrics[1], metrics[2], metrics[4],
}
testutil.RequireMetricsEqual(t, expected, batch)
testutil.RequireMetricsEqual(t, expected, tx.Batch)
}