From 842015c616987e37c5303a0a527634134ab9095c Mon Sep 17 00:00:00 2001 From: Victor Vrantchan Date: Tue, 8 Jan 2019 10:52:19 -0500 Subject: [PATCH 1/3] implement library for tracking net.Listener connections --- ntrack/listener.go | 91 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 ntrack/listener.go diff --git a/ntrack/listener.go b/ntrack/listener.go new file mode 100644 index 0000000..b17ac4c --- /dev/null +++ b/ntrack/listener.go @@ -0,0 +1,91 @@ +package ntrack + +import ( + "context" + "fmt" + "net" + "sync/atomic" + + "github.com/pkg/errors" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" +) + +type trackingListener struct { + net.Listener + stats *Stats +} + +func NewInstrumentedListener(lis net.Listener) (net.Listener, *Stats) { + listenerStats := &Stats{} + listenerStats.init() + + return &trackingListener{ + Listener: lis, + stats: listenerStats, + }, listenerStats +} + +func (tl *trackingListener) Accept() (net.Conn, error) { + conn, err := tl.Listener.Accept() + stats.RecordWithTags(context.TODO(), []tag.Mutator{tag.Upsert(tl.stats.TagSuccess, fmt.Sprintf("%v", err == nil))}, tl.stats.ListenerAccepted.M(1)) + if err != nil { + return nil, errors.Wrap(err, "accept from base listener") + } + + atomic.AddInt64(&tl.stats.openConnections, 1) + open := atomic.LoadInt64(&tl.stats.openConnections) + stats.Record(context.TODO(), tl.stats.OpenConnections.M(open)) + return &serverConn{Conn: conn, stats: tl.stats}, nil +} + +type serverConn struct { + net.Conn + stats *Stats +} + +func (sc *serverConn) Close() error { + err := sc.Conn.Close() + atomic.AddInt64(&sc.stats.openConnections, -1) + open := atomic.LoadInt64(&sc.stats.openConnections) + stats.Record(context.TODO(), + sc.stats.OpenConnections.M(open), + sc.stats.LifetimeClosedConnections.M(1), + ) + return errors.Wrap(err, "close server conn") +} + +type Stats struct { + ListenerAccepted *stats.Int64Measure + LifetimeClosedConnections *stats.Int64Measure + OpenConnections *stats.Int64Measure + openConnections int64 + + TagSuccess tag.Key + + views []*view.View +} + +func (s *Stats) init() { + s.ListenerAccepted = stats.Int64("ntrack/listener/accepts", "The number of Accept calls on the net.Listener", stats.UnitDimensionless) + s.LifetimeClosedConnections = stats.Int64("ntrack/listener/closed", "The number of Close calls on the net.Listener", stats.UnitDimensionless) + s.OpenConnections = stats.Int64("ntrack/listener/open", "The number of Open connections from the net.Listener", stats.UnitDimensionless) + + s.TagSuccess, _ = tag.NewKey("success") + + tags := []tag.Key{s.TagSuccess} + s.views = append(s.views, viewFromStat(s.ListenerAccepted, tags, view.Count())) + s.views = append(s.views, viewFromStat(s.OpenConnections, nil, view.LastValue())) + s.views = append(s.views, viewFromStat(s.LifetimeClosedConnections, nil, view.Count())) +} + +func viewFromStat(ss *stats.Int64Measure, tags []tag.Key, agg *view.Aggregation) *view.View { + return &view.View{ + Name: ss.Name(), + Measure: ss, + Description: ss.Description(), + TagKeys: tags, + Aggregation: agg, + } +} From e6d29450e57c2ca85bc7cfc14b8c3221f1c9f462 Mon Sep 17 00:00:00 2001 From: Victor Vrantchan Date: Wed, 9 Jan 2019 13:40:38 -0500 Subject: [PATCH 2/3] some tests --- ntrack/listener_test.go | 96 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 ntrack/listener_test.go diff --git a/ntrack/listener_test.go b/ntrack/listener_test.go new file mode 100644 index 0000000..3b0c6f9 --- /dev/null +++ b/ntrack/listener_test.go @@ -0,0 +1,96 @@ +package ntrack + +import ( + "fmt" + "net" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opencensus.io/stats/view" +) + +func TestListener(t *testing.T) { + var tests = []struct { + viewName string + disableKeepalive bool + expectedValue int64 + }{ + { + viewName: "ntrack/listener/accepts", + disableKeepalive: true, + expectedValue: 5, + }, + { + viewName: "ntrack/listener/accepts", + disableKeepalive: false, + expectedValue: 1, + }, + { + viewName: "ntrack/listener/closed", + disableKeepalive: true, + expectedValue: 5, + }, + { + viewName: "ntrack/listener/open", + disableKeepalive: true, + expectedValue: 0, + }, + { + viewName: "ntrack/listener/open", + disableKeepalive: false, + expectedValue: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.viewName, func(t *testing.T) { + lis, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + ilis, stats := NewInstrumentedListener(lis) + view.Register(stats.views...) + + testClientConnections(t, ilis, tt.disableKeepalive) + + rows, err := view.RetrieveData(tt.viewName) + require.NoError(t, err) + + switch data := rows[0].Data.(type) { + case *view.CountData: + assert.Equal(t, tt.expectedValue, data.Value) + case *view.LastValueData: + assert.Equal(t, float64(tt.expectedValue), data.Value) + } + view.Unregister(stats.views...) + }) + } +} + +func testClientConnections(t *testing.T, lis net.Listener, disableKeepalive bool) { + t.Helper() + + srv := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }), + } + + go func() { + if err := srv.Serve(lis); err != nil { + t.Fatal(err) + } + }() + + tr := &http.Transport{DisableKeepAlives: disableKeepalive} + client := &http.Client{Transport: tr} + + requestCount := 5 + for i := 0; i < requestCount; i++ { + resp, err := client.Get(fmt.Sprintf("http://%s", lis.Addr())) + require.NoError(t, err) + resp.Body.Close() + } + +} From 9eeffc748ac51c9af2e7d31b6607434ccbfa88ee Mon Sep 17 00:00:00 2001 From: Victor Vrantchan Date: Wed, 9 Jan 2019 14:36:48 -0500 Subject: [PATCH 3/3] nits --- ntrack/listener.go | 17 +++++++++-------- ntrack/listener_test.go | 21 +++++++++++++++++++-- 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/ntrack/listener.go b/ntrack/listener.go index b17ac4c..5bcfa82 100644 --- a/ntrack/listener.go +++ b/ntrack/listener.go @@ -34,8 +34,7 @@ func (tl *trackingListener) Accept() (net.Conn, error) { return nil, errors.Wrap(err, "accept from base listener") } - atomic.AddInt64(&tl.stats.openConnections, 1) - open := atomic.LoadInt64(&tl.stats.openConnections) + open := atomic.AddInt64(&tl.stats.openConnections, 1) stats.Record(context.TODO(), tl.stats.OpenConnections.M(open)) return &serverConn{Conn: conn, stats: tl.stats}, nil } @@ -47,8 +46,7 @@ type serverConn struct { func (sc *serverConn) Close() error { err := sc.Conn.Close() - atomic.AddInt64(&sc.stats.openConnections, -1) - open := atomic.LoadInt64(&sc.stats.openConnections) + open := atomic.AddInt64(&sc.stats.openConnections, -1) stats.Record(context.TODO(), sc.stats.OpenConnections.M(open), sc.stats.LifetimeClosedConnections.M(1), @@ -64,7 +62,9 @@ type Stats struct { TagSuccess tag.Key - views []*view.View + ListenerAcceptedView *view.View + LifetimeClosedConnectionsView *view.View + OpenConnectionsView *view.View } func (s *Stats) init() { @@ -75,9 +75,10 @@ func (s *Stats) init() { s.TagSuccess, _ = tag.NewKey("success") tags := []tag.Key{s.TagSuccess} - s.views = append(s.views, viewFromStat(s.ListenerAccepted, tags, view.Count())) - s.views = append(s.views, viewFromStat(s.OpenConnections, nil, view.LastValue())) - s.views = append(s.views, viewFromStat(s.LifetimeClosedConnections, nil, view.Count())) + + s.ListenerAcceptedView = viewFromStat(s.ListenerAccepted, tags, view.Count()) + s.OpenConnectionsView = viewFromStat(s.OpenConnections, nil, view.LastValue()) + s.LifetimeClosedConnectionsView = viewFromStat(s.LifetimeClosedConnections, nil, view.Count()) } func viewFromStat(ss *stats.Int64Measure, tags []tag.Key, agg *view.Aggregation) *view.View { diff --git a/ntrack/listener_test.go b/ntrack/listener_test.go index 3b0c6f9..eee1f46 100644 --- a/ntrack/listener_test.go +++ b/ntrack/listener_test.go @@ -50,7 +50,7 @@ func TestListener(t *testing.T) { require.NoError(t, err) ilis, stats := NewInstrumentedListener(lis) - view.Register(stats.views...) + registerViewByName(t, tt.viewName, stats, false) testClientConnections(t, ilis, tt.disableKeepalive) @@ -63,11 +63,28 @@ func TestListener(t *testing.T) { case *view.LastValueData: assert.Equal(t, float64(tt.expectedValue), data.Value) } - view.Unregister(stats.views...) + registerViewByName(t, tt.viewName, stats, true) }) } } +func registerViewByName(t *testing.T, name string, stats *Stats, unregister bool) { + var v *view.View + switch name { + case "ntrack/listener/accepts": + v = stats.ListenerAcceptedView + case "ntrack/listener/open": + v = stats.OpenConnectionsView + case "ntrack/listener/closed": + v = stats.LifetimeClosedConnectionsView + } + if unregister { + view.Unregister(v) + } else { + view.Register(v) + } +} + func testClientConnections(t *testing.T, lis net.Listener, disableKeepalive bool) { t.Helper()