diff --git a/.travis.yml b/.travis.yml index e1be0bf..23e425c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,5 @@ os: - linux - - osx sudo: false @@ -21,3 +20,5 @@ cache: notifications: email: false + +env: GOTFLAGS="-race" diff --git a/bw_stats.go b/bw_stats.go index 9b5162b..3936312 100644 --- a/bw_stats.go +++ b/bw_stats.go @@ -1,11 +1,9 @@ package metrics import ( - "sync" - + flow "github.com/libp2p/go-flow-metrics" peer "github.com/libp2p/go-libp2p-peer" protocol "github.com/libp2p/go-libp2p-protocol" - gm "github.com/whyrusleeping/go-metrics" ) type Stats struct { @@ -16,74 +14,70 @@ type Stats struct { } type BandwidthCounter struct { - lock sync.Mutex - totalIn gm.Meter - totalOut gm.Meter - reg gm.Registry + totalIn flow.Meter + totalOut flow.Meter + + protocolIn flow.MeterRegistry + protocolOut flow.MeterRegistry + + peerIn flow.MeterRegistry + peerOut flow.MeterRegistry } func NewBandwidthCounter() *BandwidthCounter { - reg := gm.NewRegistry() - return &BandwidthCounter{ - totalIn: gm.GetOrRegisterMeter("totalIn", reg), - totalOut: gm.GetOrRegisterMeter("totalOut", reg), - reg: reg, - } + return new(BandwidthCounter) } func (bwc *BandwidthCounter) LogSentMessage(size int64) { - bwc.totalOut.Mark(size) + bwc.totalOut.Mark(uint64(size)) } func (bwc *BandwidthCounter) LogRecvMessage(size int64) { - bwc.totalIn.Mark(size) + bwc.totalIn.Mark(uint64(size)) } func (bwc *BandwidthCounter) LogSentMessageStream(size int64, proto protocol.ID, p peer.ID) { - meter := gm.GetOrRegisterMeter("/peer/out/"+string(p), bwc.reg) - meter.Mark(size) - - pmeter := gm.GetOrRegisterMeter("/proto/out/"+string(proto), bwc.reg) - pmeter.Mark(size) + bwc.protocolOut.Get(string(proto)).Mark(uint64(size)) + bwc.peerOut.Get(string(p)).Mark(uint64(size)) } func (bwc *BandwidthCounter) LogRecvMessageStream(size int64, proto protocol.ID, p peer.ID) { - meter := gm.GetOrRegisterMeter("/peer/in/"+string(p), bwc.reg) - meter.Mark(size) - - pmeter := gm.GetOrRegisterMeter("/proto/in/"+string(proto), bwc.reg) - pmeter.Mark(size) + bwc.protocolIn.Get(string(proto)).Mark(uint64(size)) + bwc.peerIn.Get(string(p)).Mark(uint64(size)) } func (bwc *BandwidthCounter) GetBandwidthForPeer(p peer.ID) (out Stats) { - inMeter := gm.GetOrRegisterMeter("/peer/in/"+string(p), bwc.reg).Snapshot() - outMeter := gm.GetOrRegisterMeter("/peer/out/"+string(p), bwc.reg).Snapshot() + inSnap := bwc.peerIn.Get(string(p)).Snapshot() + outSnap := bwc.peerOut.Get(string(p)).Snapshot() return Stats{ - TotalIn: inMeter.Count(), - TotalOut: outMeter.Count(), - RateIn: inMeter.RateFine(), - RateOut: outMeter.RateFine(), + TotalIn: int64(inSnap.Total), + TotalOut: int64(outSnap.Total), + RateIn: inSnap.Rate, + RateOut: outSnap.Rate, } } func (bwc *BandwidthCounter) GetBandwidthForProtocol(proto protocol.ID) (out Stats) { - inMeter := gm.GetOrRegisterMeter(string("/proto/in/"+proto), bwc.reg).Snapshot() - outMeter := gm.GetOrRegisterMeter(string("/proto/out/"+proto), bwc.reg).Snapshot() + inSnap := bwc.protocolIn.Get(string(proto)).Snapshot() + outSnap := bwc.protocolOut.Get(string(proto)).Snapshot() return Stats{ - TotalIn: inMeter.Count(), - TotalOut: outMeter.Count(), - RateIn: inMeter.RateFine(), - RateOut: outMeter.RateFine(), + TotalIn: int64(inSnap.Total), + TotalOut: int64(outSnap.Total), + RateIn: inSnap.Rate, + RateOut: outSnap.Rate, } } func (bwc *BandwidthCounter) GetBandwidthTotals() (out Stats) { + inSnap := bwc.totalIn.Snapshot() + outSnap := bwc.totalOut.Snapshot() + return Stats{ - TotalIn: bwc.totalIn.Count(), - TotalOut: bwc.totalOut.Count(), - RateIn: bwc.totalIn.RateFine(), - RateOut: bwc.totalOut.RateFine(), + TotalIn: int64(inSnap.Total), + TotalOut: int64(outSnap.Total), + RateIn: inSnap.Rate, + RateOut: outSnap.Rate, } } diff --git a/metrics_test.go b/metrics_test.go new file mode 100644 index 0000000..ec69944 --- /dev/null +++ b/metrics_test.go @@ -0,0 +1,134 @@ +package metrics + +import ( + "fmt" + "math" + "sync" + "testing" + "time" + + peer "github.com/libp2p/go-libp2p-peer" + protocol "github.com/libp2p/go-libp2p-protocol" +) + +func BenchmarkBandwidthCounter(b *testing.B) { + b.StopTimer() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + bwc := NewBandwidthCounter() + round(bwc, b) + } +} + +func round(bwc *BandwidthCounter, b *testing.B) { + start := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(10000) + for i := 0; i < 1000; i++ { + p := peer.ID(fmt.Sprintf("peer-%d", i)) + for j := 0; j < 10; j++ { + proto := protocol.ID(fmt.Sprintf("bitswap-%d", j)) + go func() { + defer wg.Done() + <-start + + for i := 0; i < 1000; i++ { + bwc.LogSentMessage(100) + bwc.LogSentMessageStream(100, proto, p) + time.Sleep(1 * time.Millisecond) + } + }() + } + } + + b.StartTimer() + close(start) + wg.Wait() + b.StopTimer() +} + +// Allow 5% errors for bw calculations. +const acceptableError = 0.05 + +func TestBandwidthCounter(t *testing.T) { + bwc := NewBandwidthCounter() + start := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(200) + for i := 0; i < 100; i++ { + p := peer.ID(fmt.Sprintf("peer-%d", i)) + for j := 0; j < 2; j++ { + proto := protocol.ID(fmt.Sprintf("proto-%d", j)) + go func() { + defer wg.Done() + <-start + + t := time.NewTicker(100 * time.Millisecond) + defer t.Stop() + + for i := 0; i < 40; i++ { + bwc.LogSentMessage(100) + bwc.LogRecvMessage(50) + bwc.LogSentMessageStream(100, proto, p) + bwc.LogRecvMessageStream(50, proto, p) + <-t.C + } + }() + } + } + + close(start) + time.Sleep(2*time.Second + 100*time.Millisecond) + + for i := 0; i < 100; i++ { + stats := bwc.GetBandwidthForPeer(peer.ID(fmt.Sprintf("peer-%d", i))) + assertApproxEq(t, 2000, stats.RateOut) + assertApproxEq(t, 1000, stats.RateIn) + } + + for i := 0; i < 2; i++ { + stats := bwc.GetBandwidthForProtocol(protocol.ID(fmt.Sprintf("proto-%d", i))) + assertApproxEq(t, 100000, stats.RateOut) + assertApproxEq(t, 50000, stats.RateIn) + } + + { + stats := bwc.GetBandwidthTotals() + assertApproxEq(t, 200000, stats.RateOut) + assertApproxEq(t, 100000, stats.RateIn) + } + + wg.Wait() + time.Sleep(1 * time.Second) + for i := 0; i < 100; i++ { + stats := bwc.GetBandwidthForPeer(peer.ID(fmt.Sprintf("peer-%d", i))) + assertEq(t, 8000, stats.TotalOut) + assertEq(t, 4000, stats.TotalIn) + } + + for i := 0; i < 2; i++ { + stats := bwc.GetBandwidthForProtocol(protocol.ID(fmt.Sprintf("proto-%d", i))) + assertEq(t, 400000, stats.TotalOut) + assertEq(t, 200000, stats.TotalIn) + } + + { + stats := bwc.GetBandwidthTotals() + assertEq(t, 800000, stats.TotalOut) + assertEq(t, 400000, stats.TotalIn) + } +} + +func assertEq(t *testing.T, expected, actual int64) { + if expected != actual { + t.Errorf("expected %d, got %d", expected, actual) + } +} + +func assertApproxEq(t *testing.T, expected, actual float64) { + margin := expected * acceptableError + if !(math.Abs(expected-actual) <= margin) { + t.Errorf("expected %f (±%f), got %f", expected, margin, actual) + } +} diff --git a/package.json b/package.json index 8e3a25b..e7675ed 100644 --- a/package.json +++ b/package.json @@ -13,12 +13,6 @@ "name": "go-libp2p-peer", "version": "2.2.1" }, - { - "author": "whyrusleeping", - "hash": "QmeYJHEk8UjVVZ4XCRTZe6dFQrb8pGWD81LYCgeLp8CvMB", - "name": "go-metrics", - "version": "0.0.0" - }, { "author": "whyrusleeping", "hash": "QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN", @@ -36,6 +30,12 @@ "hash": "QmU4vCDZTPLDqSDKguWbHCiUe46mZUtmM2g2suBZ9NE8ko", "name": "go-libp2p-net", "version": "2.0.3" + }, + { + "author": "Stebalien", + "hash": "QmQFXpvKpF34dK9HcE7k8Ksk8V4BwWYZtdEcjzu5aUgRVr", + "name": "go-flow-metrics", + "version": "0.2.0" } ], "gxVersion": "0.9.1",