Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
simplify outbound to graphite conn routine
Browse files Browse the repository at this point in the history
  • Loading branch information
Dieterbe committed Dec 30, 2016
1 parent 5eac470 commit 650d027
Showing 1 changed file with 33 additions and 47 deletions.
80 changes: 33 additions & 47 deletions stats/out_graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package stats
import (
"bytes"
"net"
"sync"
"time"

"github.com/raintank/worldping-api/pkg/log"
Expand Down Expand Up @@ -86,60 +85,47 @@ func (g *Graphite) reporter(interval int) {
}

// writer is the background worker that connects to graphite and submits all pending data to it
// TODO: conn.Write() returns no error for a while when the remote endpoint is down, the reconnect happens with a delay
// TODO: conn.Write() returns no error for a while when the remote endpoint is down, the reconnect happens with a delay. this can also cause lost data for a second or two.
//
// NOTE(review): this listing is a rendered diff whose +/- markers were stripped, so lines from
// the version before and the version after the commit are interleaved (e.g. the two TODO lines
// above are the old and new wording of the same comment). It is not compilable as shown.
// Lines that use `lock` appear to belong to the removed version, since the commit also drops
// the "sync" import — confirm against the repository before relying on this.
func (g *Graphite) writer() {
// channel that delivers a tick every second
connectTicker := time.Tick(time.Second)

lock := &sync.Mutex{}
var conn net.Conn

var err error
// (apparently the removed version) a goroutine that, on every tick and while holding lock,
// dials g.addr if conn is nil and logs the outcome.
go func() {
for range connectTicker {
lock.Lock()
if conn == nil {
conn, err = net.Dial("tcp", g.addr)
if err == nil {
log.Info("stats now connected to %s", g.addr)
} else {
log.Warn("stats dialing %s failed: %s. will retry", g.addr, err.Error())
}
}
lock.Unlock()
}
}()
// (apparently the removed version) consume buffers from g.toGraphite; before each write attempt,
// poll once per second under lock until conn is non-nil, reporting the state via connected.Set.
for buf := range g.toGraphite {
queueItems.Value(len(g.toGraphite))
var ok bool
for !ok {
for {
lock.Lock()
haveConn := (conn != nil)
connected.Set(haveConn)
lock.Unlock()
if haveConn {
break
}
time.Sleep(time.Second)
}
pre := time.Now()
lock.Lock()
_, err = conn.Write(buf)

// (apparently the added version) assureConn returns a non-nil conn: while conn is nil it sleeps
// one second, dials g.addr, logs the result and repeats. connected.Set is called with
// conn != nil on entry and after every dial attempt. Old and new diff lines are mixed inside
// this span, so read the individual statements with care.
assureConn := func() net.Conn {
connected.Set(conn != nil)
for conn == nil {
time.Sleep(time.Second)
conn, err = net.Dial("tcp", g.addr)
if err == nil {
ok = true
flushDuration.Value(time.Since(pre))
log.Info("stats now connected to %s", g.addr)
} else {
log.Warn("stats failed to write to graphite: %s (took %s). will retry...", err, time.Now().Sub(pre))
conn.Close()
conn = nil
connected.SetFalse()
log.Warn("stats dialing %s failed: %s. will retry", g.addr, err.Error())
}
lock.Unlock()
connected.Set(conn != nil)
}
return conn
}
lock.Lock()
if conn != nil {
conn.Close()

// (apparently the added version) single loop with no mutex: conn is only touched from this
// goroutine. A tick triggers assureConn; a buffer from g.toGraphite is written, retrying the
// same buffer until Write returns no error.
for {
select {
case <-connectTicker:
conn = assureConn()
case buf := <-g.toGraphite:
// records the number of buffers still waiting in the channel
queueItems.Value(len(g.toGraphite))
var ok bool
for !ok {
conn = assureConn()
pre := time.Now()
_, err = conn.Write(buf)
if err == nil {
ok = true
flushDuration.Value(time.Since(pre))
} else {
log.Warn("stats failed to write to graphite: %s (took %s). will retry...", err, time.Now().Sub(pre))
// drop the failed connection so the next pass through assureConn dials again
conn.Close()
conn = nil
}
}
}
}
lock.Unlock()
}

0 comments on commit 650d027

Please sign in to comment.