Skip to content

Commit

Permalink
use httpAddr as instance in metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzhichang committed May 21, 2021
1 parent 93d15ad commit 167b0b4
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 101 deletions.
92 changes: 47 additions & 45 deletions cmd/clickhouse_sinker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ var (

cmdOps CmdOptions
selfIP string
httpAddr string
httpMetrics = promhttp.Handler()
runner *Sinker
)
Expand Down Expand Up @@ -146,6 +147,51 @@ func GenTask(cfg *config.Config) (taskImpl *task.Service) {

func main() {
util.Run("clickhouse_sinker", func() error {
// Initialize http server for metrics and debug
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`
<html><head><title>ClickHouse Sinker</title></head>
<body>
<h1>ClickHouse Sinker</h1>
<p><a href="/metrics">Metrics</a></p>
<p><a href="/ready">Ready</a></p>
<p><a href="/ready?full=1">Ready Full</a></p>
<p><a href="/live">Live</a></p>
<p><a href="/live?full=1">Live Full</a></p>
<p><a href="/debug/pprof/">pprof</a></p>
</body></html>`))
})

mux.Handle("/metrics", httpMetrics)
mux.HandleFunc("/ready", health.Health.ReadyEndpoint) // GET /ready?full=1
mux.HandleFunc("/live", health.Health.LiveEndpoint) // GET /live?full=1
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)

// cmdOps.HTTPPort=0: let OS choose the listen port, and record the exact metrics URL to log.
httpPort := cmdOps.HTTPPort
if httpPort != 0 {
httpPort = util.GetSpareTCPPort(httpPort)
}
httpAddr = fmt.Sprintf(":%d", httpPort)
listener, err := net.Listen("tcp", httpAddr)
if err != nil {
util.Logger.Fatal("net.Listen failed", zap.String("httpAddr", httpAddr), zap.Error(err))
}
httpPort = util.GetNetAddrPort(listener.Addr())
httpAddr = fmt.Sprintf("%s:%d", selfIP, httpPort)
util.Logger.Info(fmt.Sprintf("Run http server at http://%s/", httpAddr))

go func() {
if err := http.Serve(listener, mux); err != nil {
util.Logger.Error("http.ListenAndServe failed", zap.Error(err))
}
}()

var rcm config.RemoteConfManager
var properties map[string]interface{}
if cmdOps.NacosDataID != "" {
Expand All @@ -170,50 +216,6 @@ func main() {
runner = NewSinker(rcm)
return runner.Init()
}, func() error {
go func() {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`
<html><head><title>ClickHouse Sinker</title></head>
<body>
<h1>ClickHouse Sinker</h1>
<p><a href="/metrics">Metrics</a></p>
<p><a href="/ready">Ready</a></p>
<p><a href="/ready?full=1">Ready Full</a></p>
<p><a href="/live">Live</a></p>
<p><a href="/live?full=1">Live Full</a></p>
<p><a href="/debug/pprof/">pprof</a></p>
</body></html>`))
})

mux.Handle("/metrics", httpMetrics)
mux.HandleFunc("/ready", health.Health.ReadyEndpoint) // GET /ready?full=1
mux.HandleFunc("/live", health.Health.LiveEndpoint) // GET /live?full=1

mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)

// cmdOps.HTTPPort=0: let OS choose the listen port, and record the exact metrics URL to log.
httpPort := cmdOps.HTTPPort
if httpPort != 0 {
httpPort = util.GetSpareTCPPort(httpPort)
}
httpAddr := fmt.Sprintf(":%d", httpPort)
listener, err := net.Listen("tcp", httpAddr)
if err != nil {
util.Logger.Fatal("net.Listen failed", zap.String("httpAddr", httpAddr), zap.Error(err))
}
httpPort = util.GetNetAddrPort(listener.Addr())
util.Logger.Info(fmt.Sprintf("Run http server at http://%s:%d/", selfIP, httpPort))

if err := http.Serve(listener, mux); err != nil {
util.Logger.Error("http.ListenAndServe failed", zap.Error(err))
}
}()

runner.Run()
return nil
}, func() error {
Expand Down Expand Up @@ -249,7 +251,7 @@ func (s *Sinker) Run() {
var newCfg *config.Config
if cmdOps.PushGatewayAddrs != "" {
addrs := strings.Split(cmdOps.PushGatewayAddrs, ",")
s.pusher = statistics.NewPusher(addrs, cmdOps.PushInterval)
s.pusher = statistics.NewPusher(addrs, cmdOps.PushInterval, httpAddr)
if err = s.pusher.Init(); err != nil {
return
}
Expand Down
41 changes: 2 additions & 39 deletions statistics/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package statistics
import (
"context"
"math/rand"
"net"
"time"

"github.com/housepower/clickhouse_sinker/util"
Expand Down Expand Up @@ -185,11 +184,12 @@ type Pusher struct {
cancel context.CancelFunc
}

func NewPusher(addrs []string, interval int) *Pusher {
func NewPusher(addrs []string, interval int, selfAddr string) *Pusher {
return &Pusher{
pgwAddrs: addrs,
pushInterval: interval,
inUseAddr: -1,
instance: selfAddr,
}
}

Expand All @@ -201,7 +201,6 @@ func (p *Pusher) Init() error {
if len(p.pgwAddrs) == 0 || p.pushInterval <= 0 {
return errPgwEmpty
}
p.instance = p.getInstance()
p.reconnect()
return nil
}
Expand Down Expand Up @@ -258,39 +257,3 @@ func (p *Pusher) reconnect() {
Grouping("instance", p.instance).Format(expfmt.FmtText)
p.inUseAddr = nextAddr
}

func (p *Pusher) getInstance() string {
addrs, err := net.InterfaceAddrs()
if err != nil {
return "unknown"
}

for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok {
if p.IsExternalIP(ipnet.IP) {
return ipnet.IP.String()
}
}
}
return "unknown"
}

func (p *Pusher) IsExternalIP(ip net.IP) bool {
if ip.IsLoopback() || ip.IsLinkLocalMulticast() || ip.IsLinkLocalUnicast() {
return false
}

if ip4 := ip.To4(); ip4 != nil {
switch {
case ip4[0] == 10:
return false
case ip4[0] == 172 && ip4[1] >= 16 && ip4[1] <= 31:
return false
case ip4[0] == 192 && ip4[1] == 168:
return false
default:
return true
}
}
return false
}
23 changes: 6 additions & 17 deletions statistics/statistics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,21 @@ package statistics

import (
"context"
"net"
"fmt"
"testing"
"time"

"github.com/housepower/clickhouse_sinker/util"
"github.com/stretchr/testify/require"
)

func TestPusher_IsExternalIP(t *testing.T) {
addrs := []string{"172.24.25.1:9091"}
interval := 30
pusher := NewPusher(addrs, interval)

external := pusher.IsExternalIP(net.ParseIP("192.168.154.134"))
require.Equal(t, false, external, "192.168.154.134 should not be external ip")

external = pusher.IsExternalIP(net.ParseIP("127.0.0.1"))
require.Equal(t, false, external, "127.0.0.1 should not be external ip")

external = pusher.IsExternalIP(net.ParseIP("43.230.88.7"))
require.Equal(t, true, external, "43.230.88.7 should be external ip")
}

func TestPusher(t *testing.T) {
addrs := []string{"172.24.25.1:9091", "172.24.25.2:9091"}
interval := 1
pusher := NewPusher(addrs, interval)
selfIP, _ := util.GetOutboundIP()
selfPort := util.GetSpareTCPPort(1024)
selfAddr := fmt.Sprintf("%s:%d", selfIP, selfPort)
pusher := NewPusher(addrs, interval, selfAddr)

err := pusher.Init()
require.Nilf(t, err, "pusher init failed")
Expand Down

0 comments on commit 167b0b4

Please sign in to comment.