Skip to content

Commit

Permalink
Deliver empty metric tracking group immediately (influxdata#5176)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and otherpirate committed Mar 15, 2019
1 parent 8491a22 commit 59a6422
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 11 deletions.
16 changes: 15 additions & 1 deletion agent/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/influxdata/telegraf"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -127,6 +126,21 @@ func TestSetPrecision(t *testing.T) {
}
}

func TestAddTrackingMetricGroupEmpty(t *testing.T) {
ch := make(chan telegraf.Metric, 10)
metrics := []telegraf.Metric{}
acc := NewAccumulator(&TestMetricMaker{}, ch).WithTracking(1)

id := acc.AddTrackingMetricGroup(metrics)

select {
case tracking := <-acc.Delivered():
require.Equal(t, tracking.ID(), id)
default:
t.Fatal("empty group should be delivered immediately")
}
}

type TestMetricMaker struct {
}

Expand Down
28 changes: 18 additions & 10 deletions metric/tracking.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type trackingData struct {
rc int32
acceptCount int32
rejectCount int32
notify NotifyFunc
notifyFunc NotifyFunc
}

func (d *trackingData) incr() {
Expand All @@ -69,6 +69,16 @@ func (d *trackingData) reject() {
atomic.AddInt32(&d.rejectCount, 1)
}

func (d *trackingData) notify() {
d.notifyFunc(
&deliveryInfo{
id: d.id,
accepted: int(d.acceptCount),
rejected: int(d.rejectCount),
},
)
}

type trackingMetric struct {
telegraf.Metric
d *trackingData
Expand All @@ -82,7 +92,7 @@ func newTrackingMetric(metric telegraf.Metric, fn NotifyFunc) (telegraf.Metric,
rc: 1,
acceptCount: 0,
rejectCount: 0,
notify: fn,
notifyFunc: fn,
},
}

Expand All @@ -98,7 +108,7 @@ func newTrackingMetricGroup(group []telegraf.Metric, fn NotifyFunc) ([]telegraf.
rc: 0,
acceptCount: 0,
rejectCount: 0,
notify: fn,
notifyFunc: fn,
}

for i, m := range group {
Expand All @@ -114,6 +124,10 @@ func newTrackingMetricGroup(group []telegraf.Metric, fn NotifyFunc) ([]telegraf.
runtime.SetFinalizer(d, finalizer)
}

if len(group) == 0 {
d.notify()
}

return group, d.id
}

Expand Down Expand Up @@ -146,13 +160,7 @@ func (m *trackingMetric) decr() {
}

if v == 0 {
m.d.notify(
&deliveryInfo{
id: m.d.id,
accepted: int(m.d.acceptCount),
rejected: int(m.d.rejectCount),
},
)
m.d.notify()
}
}

Expand Down

0 comments on commit 59a6422

Please sign in to comment.