Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
support clean shutdowns of carbon listener. Add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
woodsaj committed Nov 9, 2016
1 parent d42b979 commit c8afdcf
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 10 deletions.
74 changes: 64 additions & 10 deletions input/carbon/carbon.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,39 @@ type Carbon struct {
stats met.Backend
listener *net.TCPListener
handlerWaitGroup sync.WaitGroup
quit chan struct{}
connTrack *ConnTrack
}

type ConnTrack struct {
sync.Mutex
conns map[string]net.Conn
}

func NewConnTrack() *ConnTrack {
return &ConnTrack{
conns: make(map[string]net.Conn),
}
}

func (c *ConnTrack) Add(conn net.Conn) {
c.Lock()
c.conns[conn.RemoteAddr().String()] = conn
c.Unlock()
}

func (c *ConnTrack) Remove(conn net.Conn) {
c.Lock()
delete(c.conns, conn.RemoteAddr().String())
c.Unlock()
}

func (c *ConnTrack) CloseAll() {
c.Lock()
for _, conn := range c.conns {
conn.Close()
}
c.Unlock()
}

func (c *Carbon) Name() string {
Expand All @@ -55,7 +88,7 @@ func ConfigProcess() {
var err error
schemas, err = persister.ReadWhisperSchemas(schemasFile)
if err != nil {
log.Fatal(4, "can't read schemas file %q: %s", schemasFile, err.Error())
log.Fatal(4, "carbon-in: can't read schemas file %q: %s", schemasFile, err.Error())
}
var defaultFound bool
for _, schema := range schemas {
Expand All @@ -69,7 +102,7 @@ func ConfigProcess() {
if !defaultFound {
// good graphite health (not sure what graphite does if there's no .*)
// but we definitely need to always be able to determine which interval to use
log.Fatal(4, "storage-conf does not have a default '.*' pattern")
log.Fatal(4, "carbon-in: storage-conf does not have a default '.*' pattern")
}

}
Expand All @@ -80,10 +113,12 @@ func New(stats met.Backend) *Carbon {
log.Fatal(4, err.Error())
}
return &Carbon{
addrStr: addr,
addr: addrT,
schemas: schemas,
stats: stats,
addrStr: addr,
addr: addrT,
schemas: schemas,
stats: stats,
quit: make(chan struct{}),
connTrack: NewConnTrack(),
}
}

Expand All @@ -102,30 +137,49 @@ func (c *Carbon) accept() {
for {
conn, err := c.listener.AcceptTCP()
if nil != err {
log.Error(4, err.Error())
break
select {
case <-c.quit:
// we are shutting down.
return
default:
}
log.Error(4, "carbon-in: Accept Error: %s", err.Error())
return
}
c.handlerWaitGroup.Add(1)
c.connTrack.Add(conn)
go c.handle(conn)
}
}

func (c *Carbon) Stop() {
log.Info("carbon-in: shutting down.")
close(c.quit)
c.listener.Close()
c.connTrack.CloseAll()
c.handlerWaitGroup.Wait()
}

func (c *Carbon) handle(conn net.Conn) {
defer conn.Close()
defer func() {
conn.Close()
c.connTrack.Remove(conn)
}()
// TODO c.SetTimeout(60e9)
r := bufio.NewReaderSize(conn, 4096)
for {
// note that we don't support lines longer than 4096B. that seems very reasonable..
buf, _, err := r.ReadLine()

if nil != err {
select {
case <-c.quit:
// we are shutting down.
break
default:
}
if io.EOF != err {
log.Error(4, err.Error())
log.Error(4, "carbon-in: Recv error: %s", err.Error())
}
break
}
Expand Down
112 changes: 112 additions & 0 deletions input/carbon/carbon_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package carbon

import (
"fmt"
"net"
"regexp"
"sync"
"testing"
"time"

"github.com/benbjohnson/clock"
"github.com/lomik/go-carbon/persister"
"github.com/raintank/met/helper"
"github.com/raintank/metrictank/idx/memory"
"github.com/raintank/metrictank/mdata"
"github.com/raintank/metrictank/usage"
"gopkg.in/raintank/schema.v1"
)

func Test_HandleMessage(t *testing.T) {
stats, _ := helper.New(false, "", "standard", "metrictank", "")
mdata.CluStatus = mdata.NewClusterStatus("test", false)
mdata.InitMetrics(stats)
store := mdata.NewDevnullStore()
aggmetrics := mdata.NewAggMetrics(store, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0))
metricIndex := memory.New()
metricIndex.Init(stats)
usage := usage.New(300, aggmetrics, metricIndex, clock.New())
Enabled = true
addr = "localhost:2003"
var err error
s := persister.Schema{
Name: "default",
RetentionStr: "1s:1d",
Pattern: regexp.MustCompile(".*"),
}
if err != nil {
panic(err)
}
s.Retentions, err = persister.ParseRetentionDefs(s.RetentionStr)
if err != nil {
panic(err)
}

schemas = persister.WhisperSchemas{s}
c := New(stats)
c.Start(aggmetrics, metricIndex, usage)

allMetrics := make(map[string]int)
var mu sync.Mutex
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int, t *testing.T) {
metrics := test_handleMessage(i, t)
mu.Lock()
for mId, id := range metrics {
allMetrics[mId] = id
}
mu.Unlock()
wg.Done()
}(i, t)
}
wg.Wait()
c.Stop()
defs := metricIndex.List(-1)
if len(defs) != len(allMetrics) {
t.Fatalf("query for org -1 should result in %d distinct metrics. not %d", len(allMetrics), len(defs))
}

for _, d := range defs {
id := allMetrics[d.Id]
if d.Name != fmt.Sprintf("some.id.%d", id) {
t.Fatalf("incorrect name for %s : %s, expected %s", d.Id, d.Name, fmt.Sprintf("some.id.%d", id))
}
if d.OrgId != 1 {
t.Fatalf("incorrect OrgId for %s : %d", d.Id, d.OrgId)
}
}
}

func test_handleMessage(worker int, t *testing.T) map[string]int {
conn, _ := net.Dial("tcp", "127.0.0.1:2003")
defer conn.Close()
metrics := make(map[string]int)
for m := 0; m < 4; m++ {
id := (worker + 1) * (m + 1)
t.Logf("worker %d metric %d -> adding metric with id %d and orgid %d", worker, m, id, 1)
md := &schema.MetricData{
Name: fmt.Sprintf("some.id.%d", id),
Metric: fmt.Sprintf("some.id.%d", id),
Interval: 1,
Value: 1234.567,
Unit: "unknown",
Time: int64(id),
Mtype: "gauge",
Tags: []string{},
OrgId: 1, // admin org
}
md.SetId()
metrics[md.Id] = id
t.Logf("%s = %s", md.Id, md.Name)
_, err := fmt.Fprintf(conn, fmt.Sprintf("%s %f %d\n", md.Name, md.Value, md.Time))
if err != nil {
t.Fatal(err)
}
}
// as soon as this function ends, the server will close the socket. We need to sleep here
// to ensure that the packets have time to be procesed by the kernel and passed to the server.
time.Sleep(time.Millisecond * 100)
return metrics
}

0 comments on commit c8afdcf

Please sign in to comment.