From fd8cbbf6625e5257bb54bff60be404c23317ad14 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Wed, 17 Jul 2024 20:44:41 +0200 Subject: [PATCH] feat(inputs.procstat): Add ability to collect per-process socket statistics (#15423) --- go.mod | 2 +- go.sum | 6 +- plugins/inputs/procstat/README.md | 70 +++++- plugins/inputs/procstat/os_linux.go | 286 ++++++++++++++++++++++- plugins/inputs/procstat/os_others.go | 80 ++++++- plugins/inputs/procstat/os_windows.go | 76 +++++- plugins/inputs/procstat/process.go | 137 ++++++++++- plugins/inputs/procstat/procstat.go | 58 ++++- plugins/inputs/procstat/procstat_test.go | 4 +- plugins/inputs/procstat/sample.conf | 35 ++- 10 files changed, 714 insertions(+), 40 deletions(-) diff --git a/go.mod b/go.mod index 136fbcdaf3f79..99d55a86ffbd1 100644 --- a/go.mod +++ b/go.mod @@ -468,7 +468,7 @@ require ( github.com/twmb/murmur3 v1.1.7 // indirect github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect github.com/uber/jaeger-lib v2.4.1+incompatible // indirect - github.com/vishvananda/netlink v1.2.1-beta.2 // indirect + github.com/vishvananda/netlink v1.2.1-beta.2.0.20240524165444-4d4ba1473f21 github.com/vishvananda/netns v0.0.4 github.com/xanzy/ssh-agent v0.3.3 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect diff --git a/go.sum b/go.sum index 33f9361eeaee4..01bedb68be310 100644 --- a/go.sum +++ b/go.sum @@ -2380,8 +2380,8 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC github.com/vapourismo/knx-go v0.0.0-20240217175130-922a0d50c241 h1:3r4OPQ/jPYQA0C7i149kevHLGSG4JZtrQv2986fXSCo= github.com/vapourismo/knx-go v0.0.0-20240217175130-922a0d50c241/go.mod h1:aGkV5xHz9sBkAckp2hez7khfehKp4YvyBwAmVdVEulg= github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE= -github.com/vishvananda/netlink v1.2.1-beta.2 h1:Llsql0lnQEbHj0I1OuKyp8otXp0r3q0mPkuhwHfStVs= -github.com/vishvananda/netlink v1.2.1-beta.2/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhgX83tXhKS2B/PRMpOho= +github.com/vishvananda/netlink v1.2.1-beta.2.0.20240524165444-4d4ba1473f21 h1:tcHUxOT8j/R+0S+A1j8D2InqguXFNxAiij+8QFOlX7Y= +github.com/vishvananda/netlink v1.2.1-beta.2.0.20240524165444-4d4ba1473f21/go.mod h1:whJevzBpTrid75eZy99s3DqCmy05NfibNaF2Ol5Ox5A= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= @@ -2816,7 +2816,6 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200728102440-3e129f6d46b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -2894,6 +2893,7 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/plugins/inputs/procstat/README.md b/plugins/inputs/procstat/README.md index f234c5836aed6..87e3dde8d6d5c 100644 --- a/plugins/inputs/procstat/README.md +++ b/plugins/inputs/procstat/README.md @@ -64,17 +64,40 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## result in a large number of series, especially with short-lived processes, ## creating high cardinality at the output. ## Available options are: - ## cmdline -- full commandline - ## pid -- ID of the process - ## ppid -- ID of the process' parent - ## status -- state of the process - ## user -- username owning the process + ## cmdline -- full commandline + ## pid -- ID of the process + ## ppid -- ID of the process' parent + ## status -- state of the process + ## user -- username owning the process + ## socket only options: + ## protocol -- protocol type of the process socket + ## state -- state of the process socket + ## src -- source address of the process socket (non-unix sockets) + ## src_port -- source port of the process socket (non-unix sockets) + ## dest -- destination address of the process socket (non-unix sockets) + ## dest_port -- destination port of the process socket (non-unix sockets) + ## name -- name of the process socket (unix sockets only) # tag_with = [] ## Properties to collect - ## Available options are "cpu", "limits", "memory", "mmap" + ## Available options are + ## cpu -- CPU usage statistics + ## limits -- set resource limits + ## memory -- memory usage statistics + ## mmap -- mapped memory usage statistics (caution: can cause high load) + ## sockets -- socket statistics for protocols in 'socket_protocols' # properties = ["cpu", "limits", "memory", "mmap"] + ## Protocol filter for the sockets property + ## Available options are + ## all -- all of the protocols below + ## tcp4 -- TCP socket statistics for IPv4 + ## tcp6 -- TCP socket statistics for IPv6 + ## udp4 -- UDP socket statistics for IPv4 + ## udp6 -- UDP socket statistics for IPv6 + ## unix -- Unix socket statistics + # socket_protocols = ["all"] + ## Method to use when finding process IDs. Can be one of 'pgrep', or ## 'native'. The pgrep finder calls the pgrep executable in the PATH while ## the native finder performs the search directly in a manor dependent on the @@ -141,8 +164,8 @@ Below are an example set of tags and fields: - procstat - tags: - - pid (when `pid_tag` is true) - - cmdline (when 'cmdline_tag' is true) + - pid (if requested) + - cmdline (if requested) - process_name - pidfile (when defined) - exe (when defined) @@ -231,6 +254,36 @@ Below are an example set of tags and fields: - pid_count (int) - running (int) - result_code (int, success = 0, lookup_error = 1) +- procstat_socket (if configured, Linux only) + - tags: + - pid (if requested) + - protocol (if requested) + - cmdline (if requested) + - process_name + - pidfile (when defined) + - exe (when defined) + - pattern (when defined) + - user (when selected) + - systemd_unit (when defined) + - cgroup (when defined) + - cgroup_full (when cgroup or systemd_unit is used with glob) + - supervisor_unit (when defined) + - win_service (when defined) + - fields: + - protocol + - state + - pid + - src + - src_port (tcp and udp sockets only) + - dest (tcp and udp sockets only) + - dest_port (tcp and udp sockets only) + - bytes_received (tcp sockets only) + - bytes_sent (tcp sockets only) + - lost (tcp sockets only) + - retransmits (tcp sockets only) + - rx_queue + - tx_queue + - inode (unix sockets only) *NOTE: Resource limit > 2147483647 will be reported as 2147483647.* @@ -239,4 +292,5 @@ Below are an example set of tags and fields: ```text procstat_lookup,host=prash-laptop,pattern=influxd,pid_finder=pgrep,result=success pid_count=1i,running=1i,result_code=0i 1582089700000000000 procstat,host=prash-laptop,pattern=influxd,process_name=influxd,user=root involuntary_context_switches=151496i,child_minor_faults=1061i,child_major_faults=8i,cpu_time_user=2564.81,pid=32025i,major_faults=8609i,created_at=1580107536000000000i,voluntary_context_switches=1058996i,cpu_time_system=616.98,memory_swap=0i,memory_locked=0i,memory_usage=1.7797634601593018,num_threads=18i,cpu_time_iowait=0,memory_rss=148643840i,memory_vms=1435688960i,memory_data=0i,memory_stack=0i,minor_faults=1856550i 1582089700000000000 +procstat_socket,host=prash-laptop,process_name=browser,protocol=tcp4 bytes_received=826987i,bytes_sent=32869i,dest="192.168.0.2",dest_port=443i,lost=0i,pid=32025i,retransmits=0i,rx_queue=0i,src="192.168.0.1",src_port=52106i,state="established",tx_queue=0i 1582089700000000000 ``` diff --git a/plugins/inputs/procstat/os_linux.go b/plugins/inputs/procstat/os_linux.go index 2b25e7be7fe11..22699fab123f1 100644 --- a/plugins/inputs/procstat/os_linux.go +++ b/plugins/inputs/procstat/os_linux.go @@ -7,11 +7,16 @@ import ( "errors" "fmt" "os" - - "github.com/prometheus/procfs" + "strconv" + "strings" + "syscall" "github.com/coreos/go-systemd/v22/dbus" + "github.com/prometheus/procfs" + "github.com/shirou/gopsutil/v3/net" "github.com/shirou/gopsutil/v3/process" + "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" ) func processName(p *process.Process) (string, error) { @@ -103,3 +108,280 @@ func collectTotalReadWrite(proc Process) (r, w uint64, err error) { return stat.RChar, stat.WChar, nil } + +/* Socket statistics functions */ +func socketStateName(s uint8) string { + switch s { + case unix.BPF_TCP_ESTABLISHED: + return "established" + case unix.BPF_TCP_SYN_SENT: + return "syn-sent" + case unix.BPF_TCP_SYN_RECV: + return "syn-recv" + case unix.BPF_TCP_FIN_WAIT1: + return "fin-wait1" + case unix.BPF_TCP_FIN_WAIT2: + return "fin-wait2" + case unix.BPF_TCP_TIME_WAIT: + return "time-wait" + case unix.BPF_TCP_CLOSE: + return "closed" + case unix.BPF_TCP_CLOSE_WAIT: + return "close-wait" + case unix.BPF_TCP_LAST_ACK: + return "last-ack" + case unix.BPF_TCP_LISTEN: + return "listen" + case unix.BPF_TCP_CLOSING: + return "closing" + case unix.BPF_TCP_NEW_SYN_RECV: + return "sync-recv" + } + + return "unknown" +} + +func socketTypeName(t uint8) string { + switch t { + case syscall.SOCK_STREAM: + return "stream" + case syscall.SOCK_DGRAM: + return "dgram" + case syscall.SOCK_RAW: + return "raw" + case syscall.SOCK_RDM: + return "rdm" + case syscall.SOCK_SEQPACKET: + return "seqpacket" + case syscall.SOCK_DCCP: + return "dccp" + case syscall.SOCK_PACKET: + return "packet" + } + + return "unknown" +} + +func mapFdToInode(pid int32, fd uint32) (uint32, error) { + root := os.Getenv("HOST_PROC") + if root == "" { + root = "/proc" + } + + fn := fmt.Sprintf("%s/%d/fd/%d", root, pid, fd) + link, err := os.Readlink(fn) + if err != nil { + return 0, fmt.Errorf("reading link failed: %w", err) + } + target := strings.TrimPrefix(link, "socket:[") + target = strings.TrimSuffix(target, "]") + inode, err := strconv.ParseUint(target, 10, 32) + if err != nil { + return 0, fmt.Errorf("parsing link %q: %w", link, err) + } + + return uint32(inode), nil +} + +func statsTCP(conns []net.ConnectionStat, family uint8) ([]map[string]interface{}, error) { + if len(conns) == 0 { + return nil, nil + } + + // For TCP we need the inode for each connection to relate the connection + // statistics to the actual process socket. Therefore, map the + // file-descriptors to inodes using the /proc//fd entries. + inodes := make(map[uint32]net.ConnectionStat, len(conns)) + for _, c := range conns { + inode, err := mapFdToInode(c.Pid, c.Fd) + if err != nil { + return nil, fmt.Errorf("mapping fd %d of pid %d failed: %w", c.Fd, c.Pid, err) + } + inodes[inode] = c + } + + // Get the TCP socket statistics from the netlink socket. + responses, err := netlink.SocketDiagTCPInfo(family) + if err != nil { + return nil, fmt.Errorf("connecting to diag socket failed: %w", err) + } + + // Filter the responses via the inodes belonging to the process + fieldslist := make([]map[string]interface{}, 0) + for _, r := range responses { + c, found := inodes[r.InetDiagMsg.INode] + if !found { + // The inode does not belong to the process. + continue + } + + var proto string + switch r.InetDiagMsg.Family { + case syscall.AF_INET: + proto = "tcp4" + case syscall.AF_INET6: + proto = "tcp6" + default: + continue + } + + fields := map[string]interface{}{ + "protocol": proto, + "state": socketStateName(r.InetDiagMsg.State), + "pid": c.Pid, + "src": r.InetDiagMsg.ID.Source.String(), + "src_port": r.InetDiagMsg.ID.SourcePort, + "dest": r.InetDiagMsg.ID.Destination.String(), + "dest_port": r.InetDiagMsg.ID.DestinationPort, + "bytes_received": r.TCPInfo.Bytes_received, + "bytes_sent": r.TCPInfo.Bytes_sent, + "lost": r.TCPInfo.Lost, + "retransmits": r.TCPInfo.Retransmits, + "rx_queue": r.InetDiagMsg.RQueue, + "tx_queue": r.InetDiagMsg.WQueue, + } + fieldslist = append(fieldslist, fields) + } + + return fieldslist, nil +} + +func statsUDP(conns []net.ConnectionStat, family uint8) ([]map[string]interface{}, error) { + if len(conns) == 0 { + return nil, nil + } + + // For UDP we need the inode for each connection to relate the connection + // statistics to the actual process socket. Therefore, map the + // file-descriptors to inodes using the /proc//fd entries. + inodes := make(map[uint32]net.ConnectionStat, len(conns)) + for _, c := range conns { + inode, err := mapFdToInode(c.Pid, c.Fd) + if err != nil { + return nil, fmt.Errorf("mapping fd %d of pid %d failed: %w", c.Fd, c.Pid, err) + } + inodes[inode] = c + } + + // Get the UDP socket statistics from the netlink socket. + responses, err := netlink.SocketDiagUDPInfo(family) + if err != nil { + return nil, fmt.Errorf("connecting to diag socket failed: %w", err) + } + + // Filter the responses via the inodes belonging to the process + fieldslist := make([]map[string]interface{}, 0) + for _, r := range responses { + c, found := inodes[r.InetDiagMsg.INode] + if !found { + // The inode does not belong to the process. + continue + } + + var proto string + switch r.InetDiagMsg.Family { + case syscall.AF_INET: + proto = "udp4" + case syscall.AF_INET6: + proto = "udp6" + default: + continue + } + + fields := map[string]interface{}{ + "protocol": proto, + "state": socketStateName(r.InetDiagMsg.State), + "pid": c.Pid, + "src": r.InetDiagMsg.ID.Source.String(), + "src_port": r.InetDiagMsg.ID.SourcePort, + "dest": r.InetDiagMsg.ID.Destination.String(), + "dest_port": r.InetDiagMsg.ID.DestinationPort, + "rx_queue": r.InetDiagMsg.RQueue, + "tx_queue": r.InetDiagMsg.WQueue, + } + fieldslist = append(fieldslist, fields) + } + + return fieldslist, nil +} + +func statsUnix(conns []net.ConnectionStat) ([]map[string]interface{}, error) { + if len(conns) == 0 { + return nil, nil + } + + // We need to read the inode for each connection to relate the connection + // statistics to the actual process socket. Therefore, map the + // file-descriptors to inodes using the /proc//fd entries. + inodes := make(map[uint32]net.ConnectionStat, len(conns)) + for _, c := range conns { + inode, err := mapFdToInode(c.Pid, c.Fd) + if err != nil { + return nil, fmt.Errorf("mapping fd %d of pid %d failed: %w", c.Fd, c.Pid, err) + } + inodes[inode] = c + } + + // Get the UDP socket statistics from the netlink socket. + responses, err := netlink.UnixSocketDiagInfo() + if err != nil { + return nil, fmt.Errorf("connecting to diag socket failed: %w", err) + } + + // Filter the responses via the inodes belonging to the process + fieldslist := make([]map[string]interface{}, 0) + for _, r := range responses { + // Check if the inode belongs to the process and skip otherwise + c, found := inodes[r.DiagMsg.INode] + if !found { + continue + } + + name := c.Laddr.IP + if name == "" { + name = fmt.Sprintf("inode-%d", r.DiagMsg.INode) + } + + fields := map[string]interface{}{ + "protocol": "unix", + "type": "stream", + "state": socketStateName(r.DiagMsg.State), + "pid": c.Pid, + "name": name, + "rx_queue": r.Queue.RQueue, + "tx_queue": r.Queue.WQueue, + "inode": r.DiagMsg.INode, + } + if r.Peer != nil { + fields["peer"] = *r.Peer + } + fieldslist = append(fieldslist, fields) + } + + // Diagnosis only works for stream sockets, so add all non-stream sockets + // of the process without further data + for inode, c := range inodes { + if c.Type == syscall.SOCK_STREAM { + continue + } + + name := c.Laddr.IP + if name == "" { + name = fmt.Sprintf("inode-%d", inode) + } + + fields := map[string]interface{}{ + "protocol": "unix", + "type": socketTypeName(uint8(c.Type)), + "state": "close", + "pid": c.Pid, + "name": name, + "rx_queue": uint32(0), + "tx_queue": uint32(0), + "inode": inode, + } + fieldslist = append(fieldslist, fields) + } + + return fieldslist, nil +} diff --git a/plugins/inputs/procstat/os_others.go b/plugins/inputs/procstat/os_others.go index 105226940e0f5..f14671c1ffebb 100644 --- a/plugins/inputs/procstat/os_others.go +++ b/plugins/inputs/procstat/os_others.go @@ -4,7 +4,9 @@ package procstat import ( "errors" + "syscall" + "github.com/shirou/gopsutil/v3/net" "github.com/shirou/gopsutil/v3/process" ) @@ -12,20 +14,90 @@ func processName(p *process.Process) (string, error) { return p.Exe() } -func queryPidWithWinServiceName(_ string) (uint32, error) { +func queryPidWithWinServiceName(string) (uint32, error) { return 0, errors.New("os not supporting win_service option") } func collectMemmap(Process, string, map[string]any) {} -func findBySystemdUnits(_ []string) ([]processGroup, error) { +func findBySystemdUnits([]string) ([]processGroup, error) { return nil, nil } -func findByWindowsServices(_ []string) ([]processGroup, error) { +func findByWindowsServices([]string) ([]processGroup, error) { return nil, nil } -func collectTotalReadWrite(_ Process) (r, w uint64, err error) { +func collectTotalReadWrite(Process) (r, w uint64, err error) { return 0, 0, errors.ErrUnsupported } + +func statsTCP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, error) { + if len(conns) == 0 { + return nil, nil + } + + // Filter the responses via the inodes belonging to the process + fieldslist := make([]map[string]interface{}, 0, len(conns)) + for _, c := range conns { + var proto string + switch c.Family { + case syscall.AF_INET: + proto = "tcp4" + case syscall.AF_INET6: + proto = "tcp6" + default: + continue + } + + fields := map[string]interface{}{ + "protocol": proto, + "state": c.Status, + "pid": c.Pid, + "src": c.Laddr.IP, + "src_port": c.Laddr.Port, + "dest": c.Raddr.IP, + "dest_port": c.Raddr.Port, + } + fieldslist = append(fieldslist, fields) + } + + return fieldslist, nil +} + +func statsUDP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, error) { + if len(conns) == 0 { + return nil, nil + } + + // Filter the responses via the inodes belonging to the process + fieldslist := make([]map[string]interface{}, 0, len(conns)) + for _, c := range conns { + var proto string + switch c.Family { + case syscall.AF_INET: + proto = "udp4" + case syscall.AF_INET6: + proto = "udp6" + default: + continue + } + + fields := map[string]interface{}{ + "protocol": proto, + "state": c.Status, + "pid": c.Pid, + "src": c.Laddr.IP, + "src_port": c.Laddr.Port, + "dest": c.Raddr.IP, + "dest_port": c.Raddr.Port, + } + fieldslist = append(fieldslist, fields) + } + + return fieldslist, nil +} + +func statsUnix([]net.ConnectionStat) ([]map[string]interface{}, error) { + return nil, errors.ErrUnsupported +} diff --git a/plugins/inputs/procstat/os_windows.go b/plugins/inputs/procstat/os_windows.go index e93bcf58b8f0b..d53c3da3b7276 100644 --- a/plugins/inputs/procstat/os_windows.go +++ b/plugins/inputs/procstat/os_windows.go @@ -5,8 +5,10 @@ package procstat import ( "errors" "fmt" + "syscall" "unsafe" + "github.com/shirou/gopsutil/v3/net" "github.com/shirou/gopsutil/v3/process" "golang.org/x/sys/windows" "golang.org/x/sys/windows/svc/mgr" @@ -57,7 +59,7 @@ func queryPidWithWinServiceName(winServiceName string) (uint32, error) { func collectMemmap(Process, string, map[string]any) {} -func findBySystemdUnits(_ []string) ([]processGroup, error) { +func findBySystemdUnits([]string) ([]processGroup, error) { return nil, nil } @@ -83,6 +85,76 @@ func findByWindowsServices(services []string) ([]processGroup, error) { return groups, nil } -func collectTotalReadWrite(_ Process) (r, w uint64, err error) { +func collectTotalReadWrite(Process) (r, w uint64, err error) { return 0, 0, errors.ErrUnsupported } + +func statsTCP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, error) { + if len(conns) == 0 { + return nil, nil + } + + // Filter the responses via the inodes belonging to the process + fieldslist := make([]map[string]interface{}, 0, len(conns)) + for _, c := range conns { + var proto string + switch c.Family { + case syscall.AF_INET: + proto = "tcp4" + case syscall.AF_INET6: + proto = "tcp6" + default: + continue + } + + fields := map[string]interface{}{ + "protocol": proto, + "state": c.Status, + "pid": c.Pid, + "src": c.Laddr.IP, + "src_port": c.Laddr.Port, + "dest": c.Raddr.IP, + "dest_port": c.Raddr.Port, + } + fieldslist = append(fieldslist, fields) + } + + return fieldslist, nil +} + +func statsUDP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, error) { + if len(conns) == 0 { + return nil, nil + } + + // Filter the responses via the inodes belonging to the process + fieldslist := make([]map[string]interface{}, 0, len(conns)) + for _, c := range conns { + var proto string + switch c.Family { + case syscall.AF_INET: + proto = "udp4" + case syscall.AF_INET6: + proto = "udp6" + default: + continue + } + + fields := map[string]interface{}{ + "protocol": proto, + "state": c.Status, + "pid": c.Pid, + "src": c.Laddr.IP, + "src_port": c.Laddr.Port, + "dest": c.Raddr.IP, + "dest_port": c.Raddr.Port, + } + fieldslist = append(fieldslist, fields) + } + + return fieldslist, nil +} + +func statsUnix([]net.ConnectionStat) ([]map[string]interface{}, error) { + return nil, nil +} diff --git a/plugins/inputs/procstat/process.go b/plugins/inputs/procstat/process.go index bb92bc6750a40..9cb100b75fa5a 100644 --- a/plugins/inputs/procstat/process.go +++ b/plugins/inputs/procstat/process.go @@ -2,10 +2,13 @@ package procstat import ( "errors" + "fmt" "runtime" "strconv" + "syscall" "time" + gopsnet "github.com/shirou/gopsutil/v3/net" "github.com/shirou/gopsutil/v3/process" "github.com/influxdata/telegraf" @@ -17,7 +20,7 @@ type Process interface { Name() (string, error) SetTag(string, string) MemoryMaps(bool) (*[]process.MemoryMapsStat, error) - Metric(string, *collectionConfig) telegraf.Metric + Metrics(string, *collectionConfig, time.Time) ([]telegraf.Metric, error) } type PIDFinder interface { @@ -66,7 +69,7 @@ func (p *Proc) percent(_ time.Duration) (float64, error) { } // Add metrics a single Process -func (p *Proc) Metric(prefix string, cfg *collectionConfig) telegraf.Metric { +func (p *Proc) Metrics(prefix string, cfg *collectionConfig, t time.Time) ([]telegraf.Metric, error) { if prefix != "" { prefix += "_" } @@ -245,5 +248,133 @@ func (p *Proc) Metric(prefix string, cfg *collectionConfig) telegraf.Metric { } } - return metric.New("procstat", p.tags, fields, time.Time{}) + metrics := []telegraf.Metric{metric.New("procstat", p.tags, fields, t)} + + // Collect the socket statistics if requested + if cfg.features["sockets"] { + for _, protocol := range cfg.socketProtos { + // Get the requested connections for the PID + var fieldlist []map[string]interface{} + switch protocol { + case "all": + conns, err := gopsnet.ConnectionsPid(protocol, p.Pid) + if err != nil { + return metrics, fmt.Errorf("cannot get connections for %q of PID %d", protocol, p.Pid) + } + var connsTCPv4, connsTCPv6, connsUDPv4, connsUDPv6, connsUnix []gopsnet.ConnectionStat + for _, c := range conns { + switch { + case c.Family == syscall.AF_INET && c.Type == syscall.SOCK_STREAM: + connsTCPv4 = append(connsTCPv4, c) + case c.Family == syscall.AF_INET6 && c.Type == syscall.SOCK_STREAM: + connsTCPv6 = append(connsTCPv6, c) + case c.Family == syscall.AF_INET && c.Type == syscall.SOCK_DGRAM: + connsUDPv4 = append(connsUDPv4, c) + case c.Family == syscall.AF_INET6 && c.Type == syscall.SOCK_DGRAM: + connsUDPv6 = append(connsUDPv6, c) + case c.Family == syscall.AF_UNIX: + connsUnix = append(connsUnix, c) + } + } + fl, err := statsTCP(connsTCPv4, syscall.AF_INET) + if err != nil { + return metrics, fmt.Errorf("cannot get statistics for \"tcp4\" of PID %d", p.Pid) + } + fieldlist = append(fieldlist, fl...) + + fl, err = statsTCP(connsTCPv6, syscall.AF_INET6) + if err != nil { + return metrics, fmt.Errorf("cannot get statistics for \"tcp6\" of PID %d", p.Pid) + } + fieldlist = append(fieldlist, fl...) + + fl, err = statsUDP(connsUDPv4, syscall.AF_INET) + if err != nil { + return metrics, fmt.Errorf("cannot get statistics for \"udp4\" of PID %d", p.Pid) + } + fieldlist = append(fieldlist, fl...) + + fl, err = statsUDP(connsUDPv6, syscall.AF_INET6) + if err != nil { + return metrics, fmt.Errorf("cannot get statistics for \"udp6\" of PID %d", p.Pid) + } + fieldlist = append(fieldlist, fl...) + + fl, err = statsUnix(connsUnix) + if err != nil { + return metrics, fmt.Errorf("cannot get statistics for \"unix\" of PID %d", p.Pid) + } + fieldlist = append(fieldlist, fl...) + case "tcp4", "tcp6": + family := uint8(syscall.AF_INET) + if protocol == "tcp6" { + family = syscall.AF_INET6 + } + conns, err := gopsnet.ConnectionsPid(protocol, p.Pid) + if err != nil { + return metrics, fmt.Errorf("cannot get connections for %q of PID %d", protocol, p.Pid) + } + if fieldlist, err = statsTCP(conns, family); err != nil { + return metrics, fmt.Errorf("cannot get statistics for %q of PID %d", protocol, p.Pid) + } + case "udp4", "udp6": + family := uint8(syscall.AF_INET) + if protocol == "udp6" { + family = syscall.AF_INET6 + } + conns, err := gopsnet.ConnectionsPid(protocol, p.Pid) + if err != nil { + return metrics, fmt.Errorf("cannot get connections for %q of PID %d", protocol, p.Pid) + } + if fieldlist, err = statsUDP(conns, family); err != nil { + return metrics, fmt.Errorf("cannot get statistics for %q of PID %d", protocol, p.Pid) + } + case "unix": + conns, err := gopsnet.ConnectionsPid(protocol, p.Pid) + if err != nil { + return metrics, fmt.Errorf("cannot get connections for %q of PID %d", protocol, p.Pid) + } + if fieldlist, err = statsUnix(conns); err != nil { + return metrics, fmt.Errorf("cannot get statistics for %q of PID %d", protocol, p.Pid) + } + } + + for _, fields := range fieldlist { + if cfg.tagging["protocol"] { + p.tags["protocol"] = fields["protocol"].(string) + delete(fields, "protocol") + } + if cfg.tagging["state"] { + p.tags["state"] = fields["state"].(string) + delete(fields, "state") + } + if cfg.tagging["src"] && fields["src"] != nil { + p.tags["src"] = fields["src"].(string) + delete(fields, "src") + } + if cfg.tagging["src_port"] && fields["src_port"] != nil { + port := uint64(fields["src_port"].(uint16)) + p.tags["src_port"] = strconv.FormatUint(port, 10) + delete(fields, "src_port") + } + if cfg.tagging["dest"] && fields["dest"] != nil { + p.tags["dest"] = fields["dest"].(string) + delete(fields, "dest") + } + if cfg.tagging["dest_port"] && fields["dest_port"] != nil { + port := uint64(fields["dest_port"].(uint16)) + p.tags["dest_port"] = strconv.FormatUint(port, 10) + delete(fields, "dest_port") + } + if cfg.tagging["name"] && fields["name"] != nil { + p.tags["name"] = fields["name"].(string) + delete(fields, "name") + } + + metrics = append(metrics, metric.New("procstat_socket", p.tags, fields, t)) + } + } + } + + return metrics, nil } diff --git a/plugins/inputs/procstat/procstat.go b/plugins/inputs/procstat/procstat.go index 0a211de4195cb..bd9b4d8bd7d2c 100644 --- a/plugins/inputs/procstat/procstat.go +++ b/plugins/inputs/procstat/procstat.go @@ -10,6 +10,7 @@ import ( "os/exec" "path/filepath" "runtime" + "slices" "strconv" "strings" "time" @@ -30,9 +31,10 @@ var execCommand = exec.Command type PID int32 type collectionConfig struct { - solarisMode bool - tagging map[string]bool - features map[string]bool + solarisMode bool + tagging map[string]bool + features map[string]bool + socketProtos []string } type Procstat struct { @@ -53,6 +55,7 @@ type Procstat struct { WinService string `toml:"win_service"` Mode string `toml:"mode"` Properties []string `toml:"properties"` + SocketProtocols []string `toml:"socket_protocols"` TagWith []string `toml:"tag_with"` Filter []Filter `toml:"filter"` Log telegraf.Logger `toml:"-"` @@ -96,6 +99,10 @@ func (p *Procstat) Init() error { for _, tag := range p.TagWith { switch tag { case "cmdline", "pid", "ppid", "status", "user": + case "protocol", "state", "src", "src_port", "dest", "dest_port", "name": // socket only + if !slices.Contains(p.Properties, "sockets") { + return fmt.Errorf("socket tagging option %q specified without sockets enabled", tag) + } default: return fmt.Errorf("invalid 'tag_with' setting %q", tag) } @@ -107,6 +114,27 @@ func (p *Procstat) Init() error { for _, prop := range p.Properties { switch prop { case "cpu", "limits", "memory", "mmap": + case "sockets": + if len(p.SocketProtocols) == 0 { + p.SocketProtocols = []string{"all"} + } + protos := make(map[string]bool, len(p.SocketProtocols)) + for _, proto := range p.SocketProtocols { + switch proto { + case "all": + if len(protos) > 0 || len(p.SocketProtocols) > 1 { + return errors.New("additional 'socket_protocol' settings besides 'all' are not allowed") + } + case "tcp4", "tcp6", "udp4", "udp6", "unix": + default: + return fmt.Errorf("invalid 'socket_protocol' setting %q", proto) + } + if protos[proto] { + return fmt.Errorf("duplicate %q in 'socket_protocol' setting", proto) + } + protos[proto] = true + p.cfg.socketProtos = append(p.cfg.socketProtos, proto) + } default: return fmt.Errorf("invalid 'properties' setting %q", prop) } @@ -252,9 +280,15 @@ func (p *Procstat) gatherOld(acc telegraf.Accumulator) error { p.processes[pid] = proc } running[pid] = true - m := proc.Metric(p.Prefix, &p.cfg) - m.SetTime(now) - acc.AddMetric(m) + metrics, err := proc.Metrics(p.Prefix, &p.cfg, now) + if err != nil { + // Continue after logging an error as there might still be + // metrics available + acc.AddError(err) + } + for _, m := range metrics { + acc.AddMetric(m) + } } } @@ -351,9 +385,15 @@ func (p *Procstat) gatherNew(acc telegraf.Accumulator) error { p.processes[pid] = proc } running[pid] = true - m := proc.Metric(p.Prefix, &p.cfg) - m.SetTime(now) - acc.AddMetric(m) + metrics, err := proc.Metrics(p.Prefix, &p.cfg, now) + if err != nil { + // Continue after logging an error as there might still be + // metrics available + acc.AddError(err) + } + for _, m := range metrics { + acc.AddMetric(m) + } } } diff --git a/plugins/inputs/procstat/procstat_test.go b/plugins/inputs/procstat/procstat_test.go index 5221d831406bc..883ea5bb4629d 100644 --- a/plugins/inputs/procstat/procstat_test.go +++ b/plugins/inputs/procstat/procstat_test.go @@ -142,7 +142,7 @@ func (p *testProc) MemoryMaps(bool) (*[]process.MemoryMapsStat, error) { return &[]process.MemoryMapsStat{}, nil } -func (p *testProc) Metric(prefix string, cfg *collectionConfig) telegraf.Metric { +func (p *testProc) Metrics(prefix string, cfg *collectionConfig, t time.Time) ([]telegraf.Metric, error) { if prefix != "" { prefix += "_" } @@ -212,7 +212,7 @@ func (p *testProc) Metric(prefix string, cfg *collectionConfig) telegraf.Metric fields[prefix+"user"] = "testuser" } - return metric.New("procstat", tags, fields, time.Time{}) + return []telegraf.Metric{metric.New("procstat", tags, fields, t)}, nil } var pid = PID(42) diff --git a/plugins/inputs/procstat/sample.conf b/plugins/inputs/procstat/sample.conf index 0977412a82072..ea897691a0e32 100644 --- a/plugins/inputs/procstat/sample.conf +++ b/plugins/inputs/procstat/sample.conf @@ -35,17 +35,40 @@ ## result in a large number of series, especially with short-lived processes, ## creating high cardinality at the output. ## Available options are: - ## cmdline -- full commandline - ## pid -- ID of the process - ## ppid -- ID of the process' parent - ## status -- state of the process - ## user -- username owning the process + ## cmdline -- full commandline + ## pid -- ID of the process + ## ppid -- ID of the process' parent + ## status -- state of the process + ## user -- username owning the process + ## socket only options: + ## protocol -- protocol type of the process socket + ## state -- state of the process socket + ## src -- source address of the process socket (non-unix sockets) + ## src_port -- source port of the process socket (non-unix sockets) + ## dest -- destination address of the process socket (non-unix sockets) + ## dest_port -- destination port of the process socket (non-unix sockets) + ## name -- name of the process socket (unix sockets only) # tag_with = [] ## Properties to collect - ## Available options are "cpu", "limits", "memory", "mmap" + ## Available options are + ## cpu -- CPU usage statistics + ## limits -- set resource limits + ## memory -- memory usage statistics + ## mmap -- mapped memory usage statistics (caution: can cause high load) + ## sockets -- socket statistics for protocols in 'socket_protocols' # properties = ["cpu", "limits", "memory", "mmap"] + ## Protocol filter for the sockets property + ## Available options are + ## all -- all of the protocols below + ## tcp4 -- TCP socket statistics for IPv4 + ## tcp6 -- TCP socket statistics for IPv6 + ## udp4 -- UDP socket statistics for IPv4 + ## udp6 -- UDP socket statistics for IPv6 + ## unix -- Unix socket statistics + # socket_protocols = ["all"] + ## Method to use when finding process IDs. Can be one of 'pgrep', or ## 'native'. The pgrep finder calls the pgrep executable in the PATH while ## the native finder performs the search directly in a manor dependent on the