From c8afdcf5bc36eae18025e27b95f716bb5cf5bc82 Mon Sep 17 00:00:00 2001 From: woodsaj Date: Tue, 8 Nov 2016 23:32:15 +0800 Subject: [PATCH] support clean shutdowns of carbon listener. Add unit tests --- input/carbon/carbon.go | 74 ++++++++++++++++++++---- input/carbon/carbon_test.go | 112 ++++++++++++++++++++++++++++++++++++ 2 files changed, 176 insertions(+), 10 deletions(-) create mode 100644 input/carbon/carbon_test.go diff --git a/input/carbon/carbon.go b/input/carbon/carbon.go index c7974d7f35..b453c24e1c 100644 --- a/input/carbon/carbon.go +++ b/input/carbon/carbon.go @@ -29,6 +29,39 @@ type Carbon struct { stats met.Backend listener *net.TCPListener handlerWaitGroup sync.WaitGroup + quit chan struct{} + connTrack *ConnTrack +} + +type ConnTrack struct { + sync.Mutex + conns map[string]net.Conn +} + +func NewConnTrack() *ConnTrack { + return &ConnTrack{ + conns: make(map[string]net.Conn), + } +} + +func (c *ConnTrack) Add(conn net.Conn) { + c.Lock() + c.conns[conn.RemoteAddr().String()] = conn + c.Unlock() +} + +func (c *ConnTrack) Remove(conn net.Conn) { + c.Lock() + delete(c.conns, conn.RemoteAddr().String()) + c.Unlock() +} + +func (c *ConnTrack) CloseAll() { + c.Lock() + for _, conn := range c.conns { + conn.Close() + } + c.Unlock() } func (c *Carbon) Name() string { @@ -55,7 +88,7 @@ func ConfigProcess() { var err error schemas, err = persister.ReadWhisperSchemas(schemasFile) if err != nil { - log.Fatal(4, "can't read schemas file %q: %s", schemasFile, err.Error()) + log.Fatal(4, "carbon-in: can't read schemas file %q: %s", schemasFile, err.Error()) } var defaultFound bool for _, schema := range schemas { @@ -69,7 +102,7 @@ func ConfigProcess() { if !defaultFound { // good graphite health (not sure what graphite does if there's no .*) // but we definitely need to always be able to determine which interval to use - log.Fatal(4, "storage-conf does not have a default '.*' pattern") + log.Fatal(4, "carbon-in: storage-conf does not have a default '.*' 
pattern") } } @@ -80,10 +113,12 @@ func New(stats met.Backend) *Carbon { log.Fatal(4, err.Error()) } return &Carbon{ - addrStr: addr, - addr: addrT, - schemas: schemas, - stats: stats, + addrStr: addr, + addr: addrT, + schemas: schemas, + stats: stats, + quit: make(chan struct{}), + connTrack: NewConnTrack(), } } @@ -102,21 +137,34 @@ func (c *Carbon) accept() { for { conn, err := c.listener.AcceptTCP() if nil != err { - log.Error(4, err.Error()) - break + select { + case <-c.quit: + // we are shutting down. + return + default: + } + log.Error(4, "carbon-in: Accept Error: %s", err.Error()) + return } c.handlerWaitGroup.Add(1) + c.connTrack.Add(conn) go c.handle(conn) } } func (c *Carbon) Stop() { + log.Info("carbon-in: shutting down.") + close(c.quit) c.listener.Close() + c.connTrack.CloseAll() c.handlerWaitGroup.Wait() } func (c *Carbon) handle(conn net.Conn) { - defer conn.Close() + defer func() { + conn.Close() + c.connTrack.Remove(conn) + }() // TODO c.SetTimeout(60e9) r := bufio.NewReaderSize(conn, 4096) for { @@ -124,8 +172,14 @@ func (c *Carbon) handle(conn net.Conn) { buf, _, err := r.ReadLine() if nil != err { + select { + case <-c.quit: + // we are shutting down. 
+ break + default: + } if io.EOF != err { - log.Error(4, err.Error()) + log.Error(4, "carbon-in: Recv error: %s", err.Error()) } break } diff --git a/input/carbon/carbon_test.go b/input/carbon/carbon_test.go new file mode 100644 index 0000000000..2bfd29034d --- /dev/null +++ b/input/carbon/carbon_test.go @@ -0,0 +1,112 @@ +package carbon + +import ( + "fmt" + "net" + "regexp" + "sync" + "testing" + "time" + + "github.com/benbjohnson/clock" + "github.com/lomik/go-carbon/persister" + "github.com/raintank/met/helper" + "github.com/raintank/metrictank/idx/memory" + "github.com/raintank/metrictank/mdata" + "github.com/raintank/metrictank/usage" + "gopkg.in/raintank/schema.v1" +) + +func Test_HandleMessage(t *testing.T) { + stats, _ := helper.New(false, "", "standard", "metrictank", "") + mdata.CluStatus = mdata.NewClusterStatus("test", false) + mdata.InitMetrics(stats) + store := mdata.NewDevnullStore() + aggmetrics := mdata.NewAggMetrics(store, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0)) + metricIndex := memory.New() + metricIndex.Init(stats) + usage := usage.New(300, aggmetrics, metricIndex, clock.New()) + Enabled = true + addr = "localhost:2003" + var err error + s := persister.Schema{ + Name: "default", + RetentionStr: "1s:1d", + Pattern: regexp.MustCompile(".*"), + } + if err != nil { + panic(err) + } + s.Retentions, err = persister.ParseRetentionDefs(s.RetentionStr) + if err != nil { + panic(err) + } + + schemas = persister.WhisperSchemas{s} + c := New(stats) + c.Start(aggmetrics, metricIndex, usage) + + allMetrics := make(map[string]int) + var mu sync.Mutex + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go func(i int, t *testing.T) { + metrics := test_handleMessage(i, t) + mu.Lock() + for mId, id := range metrics { + allMetrics[mId] = id + } + mu.Unlock() + wg.Done() + }(i, t) + } + wg.Wait() + c.Stop() + defs := metricIndex.List(-1) + if len(defs) != len(allMetrics) { + t.Fatalf("query for org -1 should result in %d distinct 
metrics. not %d", len(allMetrics), len(defs)) + } + + for _, d := range defs { + id := allMetrics[d.Id] + if d.Name != fmt.Sprintf("some.id.%d", id) { + t.Fatalf("incorrect name for %s : %s, expected %s", d.Id, d.Name, fmt.Sprintf("some.id.%d", id)) + } + if d.OrgId != 1 { + t.Fatalf("incorrect OrgId for %s : %d", d.Id, d.OrgId) + } + } +} + +func test_handleMessage(worker int, t *testing.T) map[string]int { + conn, _ := net.Dial("tcp", "127.0.0.1:2003") + defer conn.Close() + metrics := make(map[string]int) + for m := 0; m < 4; m++ { + id := (worker + 1) * (m + 1) + t.Logf("worker %d metric %d -> adding metric with id %d and orgid %d", worker, m, id, 1) + md := &schema.MetricData{ + Name: fmt.Sprintf("some.id.%d", id), + Metric: fmt.Sprintf("some.id.%d", id), + Interval: 1, + Value: 1234.567, + Unit: "unknown", + Time: int64(id), + Mtype: "gauge", + Tags: []string{}, + OrgId: 1, // admin org + } + md.SetId() + metrics[md.Id] = id + t.Logf("%s = %s", md.Id, md.Name) + _, err := fmt.Fprintf(conn, fmt.Sprintf("%s %f %d\n", md.Name, md.Value, md.Time)) + if err != nil { + t.Fatal(err) + } + } + // as soon as this function ends, the server will close the socket. We need to sleep here + // to ensure that the packets have time to be processed by the kernel and passed to the server. + time.Sleep(time.Millisecond * 100) + return metrics +}