diff --git a/stats/out_graphite.go b/stats/out_graphite.go index ac971bbcea..8eb4878484 100644 --- a/stats/out_graphite.go +++ b/stats/out_graphite.go @@ -3,7 +3,6 @@ package stats import ( "bytes" "net" - "sync" "time" "github.com/raintank/worldping-api/pkg/log" @@ -86,60 +85,47 @@ func (g *Graphite) reporter(interval int) { } // graphiteWriter is the background workers that connects to graphite and submits all pending data to it -// TODO: conn.Write() returns no error for a while when the remote endpoint is down, the reconnect happens with a delay +// TODO: conn.Write() returns no error for a while when the remote endpoint is down, the reconnect happens with a delay. this can also cause lost data for a second or two. func (g *Graphite) writer() { connectTicker := time.Tick(time.Second) - - lock := &sync.Mutex{} var conn net.Conn - var err error - go func() { - for range connectTicker { - lock.Lock() - if conn == nil { - conn, err = net.Dial("tcp", g.addr) - if err == nil { - log.Info("stats now connected to %s", g.addr) - } else { - log.Warn("stats dialing %s failed: %s. will retry", g.addr, err.Error()) - } - } - lock.Unlock() - } - }() - for buf := range g.toGraphite { - queueItems.Value(len(g.toGraphite)) - var ok bool - for !ok { - for { - lock.Lock() - haveConn := (conn != nil) - connected.Set(haveConn) - lock.Unlock() - if haveConn { - break - } - time.Sleep(time.Second) - } - pre := time.Now() - lock.Lock() - _, err = conn.Write(buf) + + assureConn := func() net.Conn { + connected.Set(conn != nil) + for conn == nil { + time.Sleep(time.Second) + conn, err = net.Dial("tcp", g.addr) if err == nil { - ok = true - flushDuration.Value(time.Since(pre)) + log.Info("stats now connected to %s", g.addr) } else { - log.Warn("stats failed to write to graphite: %s (took %s). will retry...", err, time.Now().Sub(pre)) - conn.Close() - conn = nil - connected.SetFalse() + log.Warn("stats dialing %s failed: %s. will retry", g.addr, err.Error()) } - lock.Unlock() + connected.Set(conn != nil) } + return conn } - lock.Lock() - if conn != nil { - conn.Close() + + for { + select { + case <-connectTicker: + conn = assureConn() + case buf := <-g.toGraphite: + queueItems.Value(len(g.toGraphite)) + var ok bool + for !ok { + conn = assureConn() + pre := time.Now() + _, err = conn.Write(buf) + if err == nil { + ok = true + flushDuration.Value(time.Since(pre)) + } else { + log.Warn("stats failed to write to graphite: %s (took %s). will retry...", err, time.Now().Sub(pre)) + conn.Close() + conn = nil + } + } + } } - lock.Unlock() }