From 251d359fb3a73d89b45ab3c4029d7f98994151b4 Mon Sep 17 00:00:00 2001 From: Kevin Burke Date: Mon, 9 Sep 2024 16:19:21 -0700 Subject: [PATCH] all: add Unix datagram support I tried updating https://github.com/segmentio/stats/pull/123 with ~50 commits but the diff got too messy. Easier to just add the changes on a new commit. --- .github/workflows/golangci-lint.yml | 4 +- buffer.go | 5 +- datadog/client.go | 77 ++++++++++++++----- datadog/client_test.go | 75 +++++++++++++++++- datadog/serializer.go | 3 +- datadog/server_test.go | 70 ++++++++++++++++- datadog/udp.go | 41 ++++++++++ datadog/uds.go | 113 ++++++++++++++++++++++++++++ datadog/uds_test.go | 49 ++++++++++++ engine_test.go | 4 +- go.mod | 8 +- go.sum | 9 ++- grafana/query_test.go | 4 +- httpstats/handler_test.go | 6 +- httpstats/metrics_test.go | 10 +-- httpstats/transport_test.go | 5 +- influxdb/client_test.go | 4 +- otlp/client.go | 5 +- otlp/go.mod | 17 ++++- otlp/go.sum | 32 +++++++- otlp/handler.go | 8 +- procstats/delaystats_darwin.go | 2 +- procstats/delaystats_test.go | 26 ++++--- procstats/linux/io_test.go | 16 ++-- procstats/linux/memory_darwin.go | 2 +- procstats/proc_darwin.go | 4 +- prometheus/handler_test.go | 4 +- prometheus/metric.go | 12 +-- 28 files changed, 519 insertions(+), 96 deletions(-) create mode 100644 datadog/udp.go create mode 100644 datadog/uds.go create mode 100644 datadog/uds_test.go diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index 14d8ffe..5ceba97 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -27,6 +27,6 @@ jobs: - uses: actions/checkout@v4 - name: golangci-lint - uses: golangci/golangci-lint-action@v6.0.1 + uses: golangci/golangci-lint-action@v6.1.0 with: - version: v1.59.1 + version: v1.61.0 diff --git a/buffer.go b/buffer.go index 97c9e67..db448eb 100644 --- a/buffer.go +++ b/buffer.go @@ -155,8 +155,9 @@ func (b *buffer) len() int { return len(b.data) } -func (b *buffer) flush(w io.Writer, n int) { - _, _ = w.Write(b.data[:n]) +func (b *buffer) flush(w io.Writer, n int) error { + _, err := w.Write(b.data[:n]) n = copy(b.data, b.data[n:]) b.data = b.data[:n] + return err } diff --git a/datadog/client.go b/datadog/client.go index efa5096..2f9d3aa 100644 --- a/datadog/client.go +++ b/datadog/client.go @@ -1,9 +1,11 @@ package datadog import ( + "io" "log" - "net" + "net/url" "os" + "strings" "time" "github.com/segmentio/stats/v5" @@ -40,6 +42,8 @@ var ( // The ClientConfig type is used to configure datadog clients. type ClientConfig struct { // Address of the datadog database to send metrics to. + // UDP: host:port (default) + // UDS: unixgram://dir/file.ext Address string // Maximum size of batch of events sent to datadog. @@ -106,15 +110,23 @@ func NewClientWith(config ClientConfig) *Client { }, } - conn, bufferSize, err := dial(config.Address, config.BufferSize) + w, err := newWriter(config.Address) if err != nil { log.Printf("stats/datadog: %s", err) + c.err = err + w = &noopWriter{} } - c.conn, c.err, c.bufferSize = conn, err, bufferSize - c.buffer.BufferSize = bufferSize + newBufSize, err := w.CalcBufferSize(config.BufferSize) + if err != nil { + log.Printf("stats/datadog: unable to calc writer's buffer size. Defaulting to a buffer of size %d - %v\n", DefaultBufferSize, err) + newBufSize = DefaultBufferSize + } + + c.bufferSize = newBufSize c.buffer.Serializer = &c.serializer - log.Printf("stats/datadog: sending metrics with a buffer of size %d B", bufferSize) + c.serializer.conn = w + log.Printf("stats/datadog: sending metrics with a buffer of size %d B", newBufSize) return c } @@ -140,18 +152,7 @@ func (c *Client) Close() error { return c.err } -func dial(address string, sizehint int) (conn net.Conn, bufsize int, err error) { - var f *os.File - - if conn, err = net.Dial("udp", address); err != nil { - return - } - - if f, err = conn.(*net.UDPConn).File(); err != nil { - conn.Close() - return - } - defer f.Close() +func bufSizeFromFD(f *os.File, sizehint int) (bufsize int, err error) { fd := int(f.Fd()) // The kernel refuses to send UDP datagrams that are larger than the size of @@ -160,7 +161,6 @@ func dial(address string, sizehint int) (conn net.Conn, bufsize int, err error) // to accept larger datagrams, or fallback to the default socket buffer size // if it failed. if bufsize, err = unix.GetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_SNDBUF); err != nil { - conn.Close() return } @@ -198,3 +198,44 @@ func dial(address string, sizehint int) (conn net.Conn, bufsize int, err error) _ = unix.SetNonblock(fd, true) return } + +type ddWriter interface { + io.WriteCloser + CalcBufferSize(desiredBufSize int) (int, error) +} + +func newWriter(addr string) (ddWriter, error) { + if strings.HasPrefix(addr, "unixgram://") || + strings.HasPrefix(addr, "udp://") { + u, err := url.Parse(addr) + if err != nil { + return nil, err + } + switch u.Scheme { + case "unixgram": + return newUDSWriter(u.Path) + case "udp": + return newUDPWriter(u.Path) + } + } + // default assume addr host:port to use UDP + return newUDPWriter(addr) +} + +// noopWriter is a writer that does nothing. +type noopWriter struct{} + +// Write writes nothing. +func (w *noopWriter) Write(_ []byte) (int, error) { + return 0, nil +} + +// Close is a noop close. +func (w *noopWriter) Close() error { + return nil +} + +// CalcBufferSize returns the sizehint. +func (w *noopWriter) CalcBufferSize(sizehint int) (int, error) { + return sizehint, nil +} diff --git a/datadog/client_test.go b/datadog/client_test.go index bacfac8..7cc39c3 100644 --- a/datadog/client_test.go +++ b/datadog/client_test.go @@ -61,6 +61,28 @@ func TestClientWithDistributionPrefixes(t *testing.T) { } } +func TestClient_UDS(t *testing.T) { + client := NewClient("unixgram://do-not-exist") + + for i := 0; i != 1000; i++ { + client.HandleMeasures(time.Time{}, stats.Measure{ + Name: "request", + Fields: []stats.Field{ + {Name: "count", Value: stats.ValueOf(5)}, + {Name: "rtt", Value: stats.ValueOf(100 * time.Millisecond)}, + }, + Tags: []stats.Tag{ + stats.T("answer", "42"), + stats.T("hello", "world"), + }, + }) + } + + if err := client.Close(); err != nil { + t.Error(err) + } +} + func TestClientWithUseDistributions(t *testing.T) { // Start a goroutine listening for packets and giving them back on packets chan packets := make(chan []byte) @@ -87,14 +109,24 @@ func TestClientWithUseDistributions(t *testing.T) { client.Flush() expectedPacket1 := "request.count:5|c|#answer:42,hello:world\nrequest.dist_rtt:0.1|d|#answer:42,hello:world\n" - assert.EqualValues(t, expectedPacket1, string(<-packets)) + select { + case packet := <-packets: + assert.EqualValues(t, expectedPacket1, string(packet)) + case <-time.After(2 * time.Second): + t.Fatal("no response after 2 seconds") + } client.useDistributions = false client.HandleMeasures(time.Time{}, testMeasure) client.Flush() expectedPacket2 := "request.count:5|c|#answer:42,hello:world\nrequest.dist_rtt:0.1|h|#answer:42,hello:world\n" - assert.EqualValues(t, expectedPacket2, string(<-packets)) + select { + case packet := <-packets: + assert.EqualValues(t, expectedPacket2, string(packet)) + case <-time.After(2 * time.Second): + t.Fatal("no response after 2 seconds") + } if err := client.Close(); err != nil { t.Error(err) @@ -117,7 +149,7 @@ main.http.rtt.seconds:0.001215296|h|#http_req_content_charset:,http_req_content_ count := int32(0) expect := int32(strings.Count(data, "\n")) - addr, closer := startTestServer(t, HandlerFunc(func(_ Metric, _ net.Addr) { + addr, closer := startUDPTestServer(t, HandlerFunc(func(_ Metric, _ net.Addr) { atomic.AddInt32(&count, 1) })) defer closer.Close() @@ -135,6 +167,40 @@ main.http.rtt.seconds:0.001215296|h|#http_req_content_charset:,http_req_content_ } } +func TestClientWriteLargeMetrics_UDS(t *testing.T) { + const data = `main.http.error.count:0|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity +main.http.message.count:1|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request +main.http.message.header.size:2|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request +main.http.message.header.bytes:240|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request +main.http.message.body.bytes:0|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request +main.http.message.count:1|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response +main.http.message.header.size:1|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response +main.http.message.header.bytes:70|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response +main.http.message.body.bytes:839|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response +main.http.rtt.seconds:0.001215296|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response +` + + count := int32(0) + expect := int32(strings.Count(data, "\n")) + + addr, closer := startUDSTestServer(t, HandlerFunc(func(_ Metric, _ net.Addr) { + atomic.AddInt32(&count, 1) + })) + defer closer.Close() + + client := NewClient("unixgram://" + addr) + + if _, err := client.Write([]byte(data)); err != nil { + t.Error(err) + } + + time.Sleep(100 * time.Millisecond) + + if n := atomic.LoadInt32(&count); n != expect { + t.Error("bad metric count:", n) + } +} + func BenchmarkClient(b *testing.B) { log.SetOutput(io.Discard) @@ -180,7 +246,8 @@ func isClosedNetworkConnectionErr(err error) bool { // startUDPListener starts a goroutine listening for UDP packets on 127.0.0.1 and an available port. // The address listened to is returned as `addr`. The payloads of packets received are copied to `packets`. func startUDPListener(t *testing.T, packets chan []byte) (addr string, closer io.Closer) { - conn, err := net.ListenPacket("udp", "127.0.0.1:0") // :0 chooses an available port + t.Helper() + conn, err := net.ListenUDP("udp", &net.UDPAddr{Port: 0, IP: net.ParseIP("127.0.0.1")}) // :0 chooses an available port if err != nil { t.Fatal(err) } diff --git a/datadog/serializer.go b/datadog/serializer.go index 54054bd..b6a3bb1 100644 --- a/datadog/serializer.go +++ b/datadog/serializer.go @@ -5,7 +5,6 @@ import ( "io" "log" "math" - "net" "strconv" "strings" "time" @@ -16,7 +15,7 @@ import ( // Datagram format: https://docs.datadoghq.com/developers/dogstatsd/datagram_shell type serializer struct { - conn net.Conn + conn io.WriteCloser bufferSize int filters map[string]struct{} distPrefixes []string diff --git a/datadog/server_test.go b/datadog/server_test.go index 1e3ee6c..60e574e 100644 --- a/datadog/server_test.go +++ b/datadog/server_test.go @@ -3,6 +3,8 @@ package datadog import ( "io" "net" + "os" + "path/filepath" "sort" "sync" "sync/atomic" @@ -21,7 +23,7 @@ func TestServer(t *testing.T) { seenGauges := make([]Metric, 0) var mu sync.Mutex - addr, closer := startTestServer(t, HandlerFunc(func(m Metric, _ net.Addr) { + addr, closer := startUDPTestServer(t, HandlerFunc(func(m Metric, _ net.Addr) { switch m.Name { case "datadog.test.A": atomic.AddUint32(&a, uint32(m.Value)) @@ -94,7 +96,7 @@ func TestServer(t *testing.T) { } } -func startTestServer(t *testing.T, handler Handler) (addr string, closer io.Closer) { +func startUDPTestServer(t *testing.T, handler Handler) (addr string, closer io.Closer) { conn, err := net.ListenPacket("udp", "127.0.0.1:0") if err != nil { t.Error(err) @@ -105,3 +107,67 @@ func startTestServer(t *testing.T, handler Handler) (addr string, closer io.Clos return conn.LocalAddr().String(), conn } + +// startUDSTestServerWithSocketFile starts a UDS server with a given socket file. +func startUDSTestServerWithSocketFile(t *testing.T, socketPath string, handler Handler) (closer io.Closer) { + udsAddr, err := net.ResolveUnixAddr("unixgram", socketPath) + if err != nil { + t.Error(err) + t.FailNow() + } + + conn, err := net.ListenUnixgram("unixgram", udsAddr) + if err != nil { + t.Error(err) + t.FailNow() + } + + go Serve(conn, handler) + + return &testUnixgramServer{ + UnixConn: conn, + pathToDelete: socketPath, + } +} + +// startUDSTestServer starts a UDS server with a random socket file internally generated. +func startUDSTestServer(t *testing.T, handler Handler) (socketPath string, closer io.Closer) { + // generate a random dir + dir, err := os.MkdirTemp("", "socket") + if err != nil { + t.Error(err) + t.FailNow() + } + + socketPath = filepath.Join(dir, "dsd.socket") + + udsAddr, err := net.ResolveUnixAddr("unixgram", socketPath) + if err != nil { + t.Error(err) + t.FailNow() + } + + conn, err := net.ListenUnixgram("unixgram", udsAddr) + if err != nil { + t.Error(err) + t.FailNow() + } + + ts := testUnixgramServer{ + UnixConn: conn, + pathToDelete: dir, // so we delete any tmp dir we created + } + + go Serve(conn, handler) + return socketPath, &ts +} + +type testUnixgramServer struct { + *net.UnixConn + pathToDelete string +} + +func (ts *testUnixgramServer) Close() error { + os.RemoveAll(ts.pathToDelete) // clean up + return ts.UnixConn.Close() +} diff --git a/datadog/udp.go b/datadog/udp.go new file mode 100644 index 0000000..52dea3b --- /dev/null +++ b/datadog/udp.go @@ -0,0 +1,41 @@ +package datadog + +import ( + "net" +) + +type udpWriter struct { + conn net.Conn +} + +// newUDPWriter returns a pointer to a new newUDPWriter given a socket file path as addr. +func newUDPWriter(addr string) (*udpWriter, error) { + udpAddr, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + return nil, err + } + conn, err := net.DialUDP("udp", nil, udpAddr) + if err != nil { + return nil, err + } + return &udpWriter{conn: conn}, nil +} + +// Write data to the UDP connection. +func (w *udpWriter) Write(data []byte) (int, error) { + return w.conn.Write(data) +} + +func (w *udpWriter) Close() error { + return w.conn.Close() +} + +func (w *udpWriter) CalcBufferSize(sizehint int) (int, error) { + f, err := w.conn.(*net.UDPConn).File() + if err != nil { + return 0, err + } + defer f.Close() + + return bufSizeFromFD(f, sizehint) +} diff --git a/datadog/uds.go b/datadog/uds.go new file mode 100644 index 0000000..c2bdd7a --- /dev/null +++ b/datadog/uds.go @@ -0,0 +1,113 @@ +package datadog + +import ( + "net" + "sync" + "time" +) + +// UDSTimeout holds the default timeout for UDS socket writes, as they can get +// blocking when the receiving buffer is full. +// same value as in official datadog client: https://github.com/DataDog/datadog-go/blob/master/statsd/uds.go#L13 +const defaultUDSTimeout = 1 * time.Millisecond + +// udsWriter is an internal class wrapping around management of UDS connection +// credits to Datadog team: https://github.com/DataDog/datadog-go/blob/master/statsd/uds.go +type udsWriter struct { + // Address to send metrics to, needed to allow reconnection on error + addr net.Addr + + // Established connection object, or nil if not connected yet + conn net.Conn + connMu sync.RWMutex // so that we can replace the failing conn on error + + // write timeout + writeTimeout time.Duration +} + +// newUDSWriter returns a pointer to a new udsWriter given a socket file path as addr. +func newUDSWriter(addr string) (*udsWriter, error) { + udsAddr, err := net.ResolveUnixAddr("unixgram", addr) + if err != nil { + return nil, err + } + // Defer connection to first read/write + writer := &udsWriter{addr: udsAddr, conn: nil, writeTimeout: defaultUDSTimeout} + return writer, nil +} + +// Write data to the UDS connection with write timeout and minimal error handling: +// create the connection if nil, and destroy it if the statsd server has disconnected. +func (w *udsWriter) Write(data []byte) (int, error) { + conn, err := w.ensureConnection() + if err != nil { + return 0, err + } + + if err = conn.SetWriteDeadline(time.Now().Add(w.writeTimeout)); err != nil { + return 0, err + } + + n, err := conn.Write(data) + // var netErr net.Error + // if err != nil && (!errors.As(err, &netErr) || !err.()) { + if err, isNetworkErr := err.(net.Error); err != nil && (!isNetworkErr || !err.Timeout()) { + // Statsd server disconnected, retry connecting at next packet + w.unsetConnection() + return 0, err + } + return n, err +} + +func (w *udsWriter) Close() error { + if w.conn != nil { + return w.conn.Close() + } + return nil +} + +func (w *udsWriter) CalcBufferSize(sizehint int) (int, error) { + conn, err := w.ensureConnection() + if err != nil { + return 0, err + } + f, err := conn.(*net.UnixConn).File() + if err != nil { + w.unsetConnection() + return 0, err + } + defer f.Close() + + return bufSizeFromFD(f, sizehint) +} + +func (w *udsWriter) ensureConnection() (net.Conn, error) { + // Check if we've already got a socket we can use + w.connMu.RLock() + currentConn := w.conn + w.connMu.RUnlock() + + if currentConn != nil { + return currentConn, nil + } + + // Looks like we might need to connect - try again with write locking. + w.connMu.Lock() + defer w.connMu.Unlock() + if w.conn != nil { + return w.conn, nil + } + + newConn, err := net.Dial(w.addr.Network(), w.addr.String()) + if err != nil { + return nil, err + } + w.conn = newConn + return newConn, nil +} + +func (w *udsWriter) unsetConnection() { + w.connMu.Lock() + defer w.connMu.Unlock() + w.conn = nil +} diff --git a/datadog/uds_test.go b/datadog/uds_test.go new file mode 100644 index 0000000..36c90ba --- /dev/null +++ b/datadog/uds_test.go @@ -0,0 +1,49 @@ +package datadog + +import ( + "net" + "os" + "path/filepath" + "testing" +) + +func TestUDSReconnectsWhenConnRefused(t *testing.T) { + dir, err := os.MkdirTemp("", "socket") + if err != nil { + t.Error(err) + t.FailNow() + } + defer os.RemoveAll(dir) + + socketPath := filepath.Join(dir, "dsd.socket") + closerServer1 := startUDSTestServerWithSocketFile(t, socketPath, HandlerFunc(func(_ Metric, _ net.Addr) {})) + defer closerServer1.Close() + + client := NewClientWith(ClientConfig{ + Address: "unixgram://" + socketPath, + BufferSize: 1, // small buffer to force write to unix socket for each measure written + }) + + measure := `main.http.error.count:0|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity +` + + _, err = client.Write([]byte(measure)) + if err != nil { + t.Errorf("unable to write data %v", err) + } + + closerServer1.Close() + + _, err = client.Write([]byte(measure)) + if err == nil { + t.Errorf("got no error but expected one as the connection should be refused as we closed the server") + } + // restart UDS server with same socket file + closerServer2 := startUDSTestServerWithSocketFile(t, socketPath, HandlerFunc(func(_ Metric, _ net.Addr) {})) + defer closerServer2.Close() + + _, err = client.Write([]byte(measure)) + if err != nil { + t.Errorf("unable to write data but should be able to as the client should reconnect %v", err) + } +} diff --git a/engine_test.go b/engine_test.go index 5314fec..a06a98f 100644 --- a/engine_test.go +++ b/engine_test.go @@ -1,7 +1,7 @@ package stats_test import ( - "io/ioutil" + "io" "net/http" "reflect" "strings" @@ -549,7 +549,7 @@ type discardTransport struct{} func (t *discardTransport) RoundTrip(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusOK, - Body: ioutil.NopCloser(strings.NewReader("")), + Body: io.NopCloser(strings.NewReader("")), Request: req, }, nil } diff --git a/go.mod b/go.mod index 4ca8eef..94e2ed3 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/segmentio/stats/v5 -go 1.18 +go 1.22.0 + +toolchain go1.23.1 require ( github.com/mdlayher/taskstats v0.0.0-20190313225729-7cbba52ee072 @@ -8,8 +10,8 @@ require ( github.com/segmentio/objconv v1.0.1 github.com/segmentio/vpcinfo v0.1.10 github.com/stretchr/testify v1.8.4 - golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 - golang.org/x/sync v0.7.0 + golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 + golang.org/x/sync v0.8.0 golang.org/x/sys v0.21.0 ) diff --git a/go.sum b/go.sum index 6c6e36f..57b5310 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -27,14 +28,14 @@ github.com/segmentio/vpcinfo v0.1.10/go.mod h1:KEIWiWRE/KLh90mOzOY0QkFWT7ObUYLp9 github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 h1:yixxcjnhBmY0nkL253HFVIm0JsFHwrHdT3Yh6szTnfY= -golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190313220215-9f648a60d977/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= diff --git a/grafana/query_test.go b/grafana/query_test.go index c06fc21..60986a3 100644 --- a/grafana/query_test.go +++ b/grafana/query_test.go @@ -3,7 +3,7 @@ package grafana import ( "bytes" "context" - "io/ioutil" + "io" "net/http" "net/http/httptest" "reflect" @@ -83,7 +83,7 @@ func TestQueryHandler(t *testing.T) { } defer r.Body.Close() - found, _ := ioutil.ReadAll(r.Body) + found, _ := io.ReadAll(r.Body) expect := queryResult if s := string(found); s != expect { diff --git a/httpstats/handler_test.go b/httpstats/handler_test.go index 162eb7e..dd61433 100644 --- a/httpstats/handler_test.go +++ b/httpstats/handler_test.go @@ -1,7 +1,7 @@ package httpstats import ( - "io/ioutil" + "io" "net/http" "net/http/httptest" "strings" @@ -16,7 +16,7 @@ func TestHandler(t *testing.T) { e := stats.NewEngine("", h) server := httptest.NewServer(NewHandlerWith(e, http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { - ioutil.ReadAll(req.Body) + io.ReadAll(req.Body) _ = RequestWithTags(req, stats.T("foo", "bar")) res.WriteHeader(http.StatusOK) res.Write([]byte("Hello World")) @@ -28,7 +28,7 @@ func TestHandler(t *testing.T) { t.Error(err) return } - ioutil.ReadAll(res.Body) + io.ReadAll(res.Body) res.Body.Close() measures := h.Measures() diff --git a/httpstats/metrics_test.go b/httpstats/metrics_test.go index 3faab72..0dbd1d3 100644 --- a/httpstats/metrics_test.go +++ b/httpstats/metrics_test.go @@ -1,7 +1,7 @@ package httpstats import ( - "io/ioutil" + "io" "net/http" "net/url" "strconv" @@ -81,7 +81,7 @@ func TestUrlLength(t *testing.T) { func TestHeaderLength(t *testing.T) { hdrlen := func(h1 http.Header) int { - c := &iostats.CountWriter{W: ioutil.Discard} + c := &iostats.CountWriter{W: io.Discard} h2 := copyHeader(h1) h2.Write(c) return c.N + len("\r\n") @@ -108,7 +108,7 @@ func TestHeaderLength(t *testing.T) { func TestRequestLength(t *testing.T) { reqlen := func(req *http.Request) int { - c := &iostats.CountWriter{W: ioutil.Discard} + c := &iostats.CountWriter{W: io.Discard} r := &http.Request{ Proto: req.Proto, Method: req.Method, @@ -153,7 +153,7 @@ func TestRequestLength(t *testing.T) { func TestResponseLength(t *testing.T) { reslen := func(res *http.Response) int { - c := &iostats.CountWriter{W: ioutil.Discard} + c := &iostats.CountWriter{W: io.Discard} r := &http.Response{ StatusCode: res.StatusCode, ProtoMajor: res.ProtoMajor, @@ -181,7 +181,7 @@ func TestResponseLength(t *testing.T) { Request: &http.Request{Method: "GET"}, ContentLength: 11, Header: http.Header{"Content-Type": {"text/plain"}}, - Body: ioutil.NopCloser(strings.NewReader("Hello World!")), + Body: io.NopCloser(strings.NewReader("Hello World!")), }, } diff --git a/httpstats/transport_test.go b/httpstats/transport_test.go index 79a0dcc..577004c 100644 --- a/httpstats/transport_test.go +++ b/httpstats/transport_test.go @@ -2,7 +2,6 @@ package httpstats import ( "io" - "io/ioutil" "net/http" "net/http/httptest" "strings" @@ -34,7 +33,7 @@ func TestTransport(t *testing.T) { e := stats.NewEngine("", h) server := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { - ioutil.ReadAll(req.Body) + io.ReadAll(req.Body) res.Write([]byte("Hello World!")) })) defer server.Close() @@ -51,7 +50,7 @@ func TestTransport(t *testing.T) { t.Error(err) return } - ioutil.ReadAll(res.Body) + io.ReadAll(res.Body) res.Body.Close() if len(h.Measures()) == 0 { diff --git a/influxdb/client_test.go b/influxdb/client_test.go index f7cc7ce..2ba27f6 100644 --- a/influxdb/client_test.go +++ b/influxdb/client_test.go @@ -2,7 +2,7 @@ package influxdb import ( "fmt" - "io/ioutil" + "io" "net/http" "strings" "testing" @@ -104,7 +104,7 @@ type discardTransport struct{} func (t *discardTransport) RoundTrip(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusOK, - Body: ioutil.NopCloser(strings.NewReader("")), + Body: io.NopCloser(strings.NewReader("")), Request: req, }, nil } diff --git a/otlp/client.go b/otlp/client.go index f816e45..4f71f3d 100644 --- a/otlp/client.go +++ b/otlp/client.go @@ -48,8 +48,9 @@ func (c *HTTPClient) Handle(ctx context.Context, request *colmetricpb.ExportMetr return c.do(httpReq) } -//TODO: deal with requests failures and retries. We potentially want to implement -// some kind of retry mechanism with expotential backoff + short time window. +// TODO: deal with requests failures and retries. We potentially want to implement +// +// some kind of retry mechanism with expotential backoff + short time window. func (c *HTTPClient) do(req *http.Request) error { resp, err := c.client.Do(req) if err != nil { diff --git a/otlp/go.mod b/otlp/go.mod index 01c63bf..e919fcc 100644 --- a/otlp/go.mod +++ b/otlp/go.mod @@ -2,4 +2,19 @@ module github.com/segmentio/stats/v5/otlp go 1.19 -require github.com/segmentio/stats v3.0.0+incompatible // indirect +require ( + github.com/segmentio/stats/v4 v4.7.4 + go.opentelemetry.io/proto/otlp v1.3.1 + google.golang.org/protobuf v1.34.2 +) + +require ( + github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect + golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240513163218-0867130af1f8 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240513163218-0867130af1f8 // indirect + google.golang.org/grpc v1.64.0 // indirect +) diff --git a/otlp/go.sum b/otlp/go.sum index c8864e3..aecff2e 100644 --- a/otlp/go.sum +++ b/otlp/go.sum @@ -1,2 +1,30 @@ -github.com/segmentio/stats v3.0.0+incompatible h1:YGWv6X5GH3Eb+ML1QasqzYESSZsiNQBp8Yx15M4bXz4= -github.com/segmentio/stats v3.0.0+incompatible/go.mod h1:ZkGKMkt6GVRIsV5Biy4HotVqonMWEsr+uMtOD2NBDeU= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/segmentio/fasthash v1.0.3 h1:EI9+KE1EwvMLBWwjpRDc+fEM+prwxDYbslddQGtrmhM= +github.com/segmentio/objconv v1.0.1 h1:QjfLzwriJj40JibCV3MGSEiAoXixbp4ybhwfTB8RXOM= +github.com/segmentio/stats/v4 v4.7.4 h1:Bjih37HCfeWBeWi/ePmjxPcOqwAstGOA5nah9Nq7H28= +github.com/segmentio/stats/v4 v4.7.4/go.mod h1:9cpRXsN9YtKnqz19q36HQl1o0VnEYsfMprN5tWU0yls= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= +go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 h1:yixxcjnhBmY0nkL253HFVIm0JsFHwrHdT3Yh6szTnfY= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +google.golang.org/genproto/googleapis/api v0.0.0-20240513163218-0867130af1f8 h1:W5Xj/70xIA4x60O/IFyXivR5MGqblAb8R3w26pnD6No= +google.golang.org/genproto/googleapis/api v0.0.0-20240513163218-0867130af1f8/go.mod h1:vPrPUTsDCYxXWjP7clS81mZ6/803D8K4iM9Ma27VKas= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240513163218-0867130af1f8 h1:mxSlqyb8ZAHsYDCfiXN1EDdNTdvjUJSLY+OnAUtYNYA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240513163218-0867130af1f8/go.mod h1:I7Y+G38R2bu5j1aLzfFmQfTcU/WnFuqDwLZAbvKTKpM= +google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= +google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/otlp/handler.go b/otlp/handler.go index 9861864..4894b55 100644 --- a/otlp/handler.go +++ b/otlp/handler.go @@ -27,11 +27,11 @@ const ( DefaultFlushInterval = 10 * time.Second ) -// Status: Alpha. This Handler is still in heavy development phase. -// Do not use in production. +// Status: Alpha. This Handler is still in heavy development phase. Do not use +// in production. // -// Handler implements stats.Handler to be used to forward metrics to an -// OpenTelemetry destination. Usually an OpenTelemetry Collector. +// Handler implements stats.Handler to forward metrics to an OpenTelemetry +// destination. Usually an OpenTelemetry Collector. // // With the current implementation this Handler is targeting a Prometheus // based backend or any backend expecting cumulative values. diff --git a/procstats/delaystats_darwin.go b/procstats/delaystats_darwin.go index c2a01a0..070329a 100644 --- a/procstats/delaystats_darwin.go +++ b/procstats/delaystats_darwin.go @@ -1,6 +1,6 @@ package procstats -func collectDelayInfo(pid int) (info DelayInfo) { +func collectDelayInfo(_ int) (info DelayInfo) { // TODO return } diff --git a/procstats/delaystats_test.go b/procstats/delaystats_test.go index a389e11..73ebd3d 100644 --- a/procstats/delaystats_test.go +++ b/procstats/delaystats_test.go @@ -1,7 +1,6 @@ package procstats_test import ( - "io/ioutil" "math/rand" "os" "os/user" @@ -21,33 +20,36 @@ func TestProcMetrics(t *testing.T) { t.Skip() } - rand.Seed(time.Now().UnixNano()) + // Create a new random generator + rng := rand.New(rand.NewSource(time.Now().UnixNano())) h := &statstest.Handler{} e := stats.NewEngine("", h) - proc := procstats.NewDelayMetricsWith(e, os.Getpid()) for i := 0; i != 10; i++ { - tmpfile, err := ioutil.TempFile("", "delaystats_test") + tmpFile, err := os.CreateTemp("", "delaystats_test") if err != nil { t.Fatal(err) } - defer tmpfile.Close() - defer os.Remove(tmpfile.Name()) - - b := make([]byte, rand.Int31n(1000000)) + defer func(name string) { + err := os.Remove(name) + if err != nil { + t.Fatal(err) + } + }(tmpFile.Name()) - if _, err := rand.Read(b); err != nil { + b := make([]byte, rng.Int31n(1000000)) + if _, err := rng.Read(b); err != nil { t.Fatal(err) } - if _, err := tmpfile.Write(b); err != nil { + if _, err := tmpFile.Write(b); err != nil { t.Fatal(err) } - if err := tmpfile.Sync(); err != nil { + if err := tmpFile.Sync(); err != nil { t.Fatal(err) } - if err := tmpfile.Close(); err != nil { + if err := tmpFile.Close(); err != nil { t.Fatal(err) } diff --git a/procstats/linux/io_test.go b/procstats/linux/io_test.go index 3f298f2..3c48086 100644 --- a/procstats/linux/io_test.go +++ b/procstats/linux/io_test.go @@ -1,16 +1,22 @@ package linux import ( - "io/ioutil" + "os" "testing" ) func TestReadFile(t *testing.T) { - f, _ := ioutil.TempFile("/tmp", "io_test_") + f, err := os.CreateTemp("/tmp", "io_test_") + if err != nil { + t.Error(err) + } path := f.Name() - f.Write([]byte("Hello World!\n")) - f.Close() - + if _, err := f.Write([]byte("Hello World!\n")); err != nil { + t.Error(err) + } + if err := f.Close(); err != nil { + t.Error(err) + } if s := readFile(path); s != "Hello World!\n" { t.Error("invalid file content:", s) } diff --git a/procstats/linux/memory_darwin.go b/procstats/linux/memory_darwin.go index 5cb4ade..f3017a7 100644 --- a/procstats/linux/memory_darwin.go +++ b/procstats/linux/memory_darwin.go @@ -1,6 +1,6 @@ package linux -func readMemoryLimit(pid int) (limit uint64, err error) { +func readMemoryLimit(_ int) (limit uint64, err error) { limit = unlimitedMemoryLimit return } diff --git a/procstats/proc_darwin.go b/procstats/proc_darwin.go index 9d00393..a50cc15 100644 --- a/procstats/proc_darwin.go +++ b/procstats/proc_darwin.go @@ -69,9 +69,9 @@ func collectProcInfo(pid int) (info ProcInfo, err error) { func memoryAvailable() uint64 { mib := [2]C.int{C.CTL_HW, C.HW_MEMSIZE} mem := C.int64_t(0) - len := C.size_t(8) // sizeof(int64_t) + length := C.size_t(8) // sizeof(int64_t) - _, err := C.sysctl(&mib[0], 2, unsafe.Pointer(&mem), &len, nil, 0) + _, err := C.sysctl(&mib[0], 2, unsafe.Pointer(&mem), &length, nil, 0) check(err) return uint64(mem) diff --git a/prometheus/handler_test.go b/prometheus/handler_test.go index 51558e2..49f49a7 100644 --- a/prometheus/handler_test.go +++ b/prometheus/handler_test.go @@ -2,7 +2,7 @@ package prometheus import ( "fmt" - "io/ioutil" + "io" "net/http" "net/http/httptest" "testing" @@ -92,7 +92,7 @@ func TestServeHTTP(t *testing.T) { } defer res.Body.Close() - b, _ := ioutil.ReadAll(res.Body) + b, _ := io.ReadAll(res.Body) const expects = `# TYPE A counter A 3 1496614320000 diff --git a/prometheus/metric.go b/prometheus/metric.go index 7e6c1f0..a694b41 100644 --- a/prometheus/metric.go +++ b/prometheus/metric.go @@ -1,7 +1,6 @@ package prometheus import ( - "reflect" "strconv" "strings" "sync" @@ -404,16 +403,9 @@ func le(buckets []stats.Value) string { return unsafeByteSliceToString(b) } -// This function converts the byte array to a string without additional -// memory allocation. -// Source: https://stackoverflow.com/a/66865482 (license: CC BY-SA 4.0). +// unsafeByteSliceToString converts a byte slice to a string without copying the underlying data. func unsafeByteSliceToString(b []byte) string { - sliceHeader := (*reflect.SliceHeader)(unsafe.Pointer(&b)) - var s string - sh := (*reflect.StringHeader)(unsafe.Pointer(&s)) - sh.Data = sliceHeader.Data - sh.Len = sliceHeader.Len - return s + return unsafe.String(unsafe.SliceData(b), len(b)) } func nextLe(s string) (head, tail string) {