diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b3e908b5715..16dfe0e6c79 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -127,6 +127,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - system/package: Fix parsing of Installed-Size field of DEB packages. {issue}16661[16661] {pull}17188[17188] - system module: Fix panic during initialisation when /proc/stat can't be read. {pull}17569[17569] - system/package: Fix an error that can occur while trying to persist package metadata. {issue}18536[18536] {pull}18887[18887] +- system/socket: Fix dataset using 100% CPU and becoming unresponsive in some scenarios. {pull}19033[19033] +- system/socket: Fixed tracking of long-running connections. {pull}19033[19033] *Filebeat* diff --git a/x-pack/auditbeat/module/system/socket/state.go b/x-pack/auditbeat/module/system/socket/state.go index f9e4d6f9f18..afd72e853fc 100644 --- a/x-pack/auditbeat/module/system/socket/state.go +++ b/x-pack/auditbeat/module/system/socket/state.go @@ -178,7 +178,7 @@ type flow struct { process *process local, remote endpoint complete bool - + done bool // these are automatically calculated by state from kernelTimes above createdTime, lastSeenTime time.Time } @@ -253,7 +253,6 @@ type socket struct { process *process // This signals that the socket is in the closeTimeout list. closing bool - closeTime time.Time prev, next linkedElement createdTime, lastSeenTime time.Time @@ -281,7 +280,7 @@ func (s *socket) SetNext(e linkedElement) { // Timestamp returns the time reference used to expire sockets. func (s *socket) Timestamp() time.Time { - return s.closeTime + return s.lastSeenTime } type dnsTracker struct { @@ -372,13 +371,16 @@ type state struct { closing linkedList dns dnsTracker + + // Decouple time.Now() + clock func() time.Time } func (s *state) getSocket(sock uintptr) *socket { if socket, found := s.socks[sock]; found { return socket } - now := time.Now() + now := s.clock() socket := &socket{ sock: sock, createdTime: now, @@ -397,6 +399,7 @@ var kernelProcess = process{ func NewState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, socketTimeout, closeTimeout, clockMaxDrift time.Duration) *state { s := makeState(r, log, inactiveTimeout, socketTimeout, closeTimeout, clockMaxDrift) go s.reapLoop() + go s.logStateLoop() return s } @@ -412,6 +415,7 @@ func makeState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, socketTi closeTimeout: closeTimeout, clockMaxDrift: clockMaxDrift, dns: newDNSTracker(inactiveTimeout * 2), + clock: time.Now, } } @@ -438,7 +442,7 @@ func (s *state) logState() { events := atomic.LoadUint64(&eventCount) s.Unlock() - now := time.Now() + now := s.clock() took := now.Sub(lastTime) newEvs := events - lastEvents lastEvents = events @@ -461,8 +465,6 @@ func (s *state) logState() { func (s *state) reapLoop() { reportTicker := time.NewTicker(reapInterval) defer reportTicker.Stop() - logTicker := time.NewTicker(logInterval) - defer logTicker.Stop() for { select { case <-s.reporter.Done(): @@ -489,6 +491,17 @@ func (s *state) reapLoop() { return } } + } + } +} + +func (s *state) logStateLoop() { + logTicker := time.NewTicker(logInterval) + defer logTicker.Stop() + for { + select { + case <-s.reporter.Done(): + return case <-logTicker.C: s.logState() } @@ -498,7 +511,7 @@ func (s *state) reapLoop() { func (s *state) ExpireOlder() { s.Lock() defer s.Unlock() - deadline := time.Now().Add(-s.inactiveTimeout) + deadline := s.clock().Add(-s.inactiveTimeout) for item := s.flowLRU.peek(); item != nil && item.Timestamp().Before(deadline); { if flow, ok := item.(*flow); ok { s.onFlowTerminated(flow) @@ -507,8 +520,7 @@ func (s *state) ExpireOlder() { } item = s.flowLRU.peek() } - - deadline = time.Now().Add(-s.socketTimeout) + deadline = s.clock().Add(-s.socketTimeout) for item := s.socketLRU.peek(); item != nil && item.Timestamp().Before(deadline); { if sock, ok := item.(*socket); ok { s.onSockDestroyed(sock.sock, 0) @@ -517,8 +529,7 @@ func (s *state) ExpireOlder() { } item = s.socketLRU.peek() } - - deadline = time.Now().Add(-s.closeTimeout) + deadline = s.clock().Add(-s.closeTimeout) for item := s.closing.peek(); item != nil && item.Timestamp().Before(deadline); { if sock, ok := item.(*socket); ok { s.onSockTerminated(sock) @@ -601,6 +612,8 @@ func (s *state) onSockTerminated(sock *socket) { delete(s.socks, sock.sock) if sock.closing { s.closing.remove(sock) + } else { + s.moveToClosing(sock) } } @@ -659,7 +672,7 @@ func (s *state) mutualEnrich(sock *socket, f *flow) { f.process = sock.process } if !sock.closing { - sock.lastSeenTime = time.Now() + sock.lastSeenTime = s.clock() s.socketLRU.remove(sock) s.socketLRU.add(sock) } @@ -699,7 +712,6 @@ func (s *state) onSockDestroyed(ptr uintptr, pid uint32) error { if !found { return nil } - // Enrich with pid if sock.pid == 0 && pid != 0 { sock.pid = pid @@ -710,14 +722,18 @@ func (s *state) onSockDestroyed(ptr uintptr, pid uint32) error { // Keep the sock around in case it's a connected TCP socket, as still some // packets can be received shortly after/during inet_release. if !sock.closing { - sock.closeTime = time.Now() - sock.closing = true - s.socketLRU.remove(sock) - s.closing.add(sock) + s.moveToClosing(sock) } return nil } +func (s *state) moveToClosing(sock *socket) { + sock.lastSeenTime = s.clock() + sock.closing = true + s.socketLRU.remove(sock) + s.closing.add(sock) +} + // UpdateFlow receives a partial flow and creates or updates an existing flow. func (s *state) UpdateFlow(ref flow) error { return s.UpdateFlowWithCondition(ref, nil) @@ -794,7 +810,11 @@ func (f *flow) updateWith(ref flow, s *state) { } func (s *state) onFlowTerminated(f *flow) { + if f.done { + return + } s.flowLRU.remove(f) + f.done = true // Unbind this flow from its parent if parent, found := s.socks[f.sock]; found { delete(parent.flows, f.remote.addr.String()) @@ -1011,8 +1031,8 @@ func (s *state) kernTimestampToTime(ts kernelTime) time.Time { } if s.kernelEpoch == (time.Time{}) { // This is the first event and time sync hasn't happened yet. - // Take a temporary epoch relative to time.Now() - now := time.Now() + // Take a temporary epoch relative to current time. + now := s.clock() s.kernelEpoch = now.Add(-time.Duration(ts)) return now } diff --git a/x-pack/auditbeat/module/system/socket/state_test.go b/x-pack/auditbeat/module/system/socket/state_test.go index 1fcaeb78abf..9b36c8b5dd4 100644 --- a/x-pack/auditbeat/module/system/socket/state_test.go +++ b/x-pack/auditbeat/module/system/socket/state_test.go @@ -10,6 +10,7 @@ import ( "encoding/binary" "fmt" "net" + "os" "testing" "time" @@ -134,16 +135,24 @@ func TestTCPConnWithProcess(t *testing.T) { func TestTCPConnWithProcessSocketTimeouts(t *testing.T) { const ( - localIP = "192.168.33.10" - remoteIP = "172.19.12.13" - localPort = 38842 - remotePort = 443 - sock uintptr = 0xff1234 + localIP = "192.168.33.10" + remoteIP = "172.19.12.13" + localPort = 38842 + remotePort = 443 + sock uintptr = 0xff1234 + flowTimeout = time.Hour + socketTimeout = time.Minute * 3 + closeTimeout = time.Minute ) - st := makeState(nil, (*logWrapper)(t), time.Second, 0, 0, time.Second) + st := makeState(nil, (*logWrapper)(t), flowTimeout, socketTimeout, closeTimeout, time.Second) + now := time.Now() + st.clock = func() time.Time { + return now + } lPort, rPort := be16(localPort), be16(remotePort) lAddr, rAddr := ipv4(localIP), ipv4(remoteIP) evs := []event{ + callExecve(meta(1234, 1234, 1), []string{"/usr/bin/curl", "https://example.net/", "-o", "/tmp/site.html"}), &commitCreds{Meta: meta(1234, 1234, 2), UID: 501, GID: 20, EUID: 501, EGID: 20}, &execveRet{Meta: meta(1234, 1234, 2), Retval: 1234}, @@ -174,7 +183,18 @@ func TestTCPConnWithProcessSocketTimeouts(t *testing.T) { t.Fatal(err) } st.ExpireOlder() + // Nothing expired just yet. + flows, err := getFlows(st.DoneFlows(), all) + if err != nil { + t.Fatal(err) + } + assert.Empty(t, flows) + evs = []event{ + &clockSyncCall{ + Meta: meta(uint32(os.Getpid()), 1235, 0), + Ts: uint64(now.UnixNano()), + }, &inetReleaseCall{Meta: meta(0, 0, 15), Sock: sock}, &tcpV4DoRcv{ Meta: meta(0, 0, 17), @@ -185,17 +205,31 @@ func TestTCPConnWithProcessSocketTimeouts(t *testing.T) { RAddr: rAddr, RPort: rPort, }, - &doExit{Meta: meta(1234, 1234, 18)}, + + &inetCreate{Meta: meta(1234, 1235, 18), Proto: 0}, + &sockInitData{Meta: meta(1234, 1235, 19), Sock: sock + 1}, + &tcpIPv4ConnectCall{Meta: meta(1234, 1235, 20), Sock: sock + 1, RAddr: rAddr, RPort: rPort}, + &tcpV4DoRcv{ + Meta: meta(0, 0, 21), + Sock: sock + 1, + Size: 12, + LAddr: lAddr, + LPort: lPort, + RAddr: rAddr, + RPort: rPort, + }, } if err := feedEvents(evs, st, t); err != nil { t.Fatal(err) } + // Expire the first socket + now = now.Add(closeTimeout + 1) st.ExpireOlder() - flows, err := getFlows(st.DoneFlows(), all) + flows, err = getFlows(st.DoneFlows(), all) if err != nil { t.Fatal(err) } - assert.Len(t, flows, 2) + assert.Len(t, flows, 1) flow := flows[0] t.Log("read flow 0", flow) for field, expected := range map[string]interface{}{ @@ -207,8 +241,8 @@ func TestTCPConnWithProcessSocketTimeouts(t *testing.T) { "client.port": localPort, "destination.ip": remoteIP, "destination.port": remotePort, - "destination.packets": uint64(1), - "destination.bytes": uint64(12), + "destination.packets": uint64(2), + "destination.bytes": uint64(19), "server.ip": remoteIP, "server.port": remotePort, "network.direction": "outbound", @@ -224,10 +258,28 @@ func TestTCPConnWithProcessSocketTimeouts(t *testing.T) { t.Fatal("expected value not found") } } + // Wait until sock+1 expires due to inactivity. It won't be available + // just yet. + now = now.Add(socketTimeout + 1) + st.ExpireOlder() + flows, err = getFlows(st.DoneFlows(), all) + if err != nil { + t.Fatal(err) + } + assert.Empty(t, flows) + + // Wait until the sock is closed completely. + now = now.Add(closeTimeout + 1) + st.ExpireOlder() + flows, err = getFlows(st.DoneFlows(), all) + if err != nil { + t.Fatal(err) + } + assert.Len(t, flows, 1) + flow = flows[0] // we have a truncated flow with no directionality, // so just report what we can - flow = flows[1] t.Log("read flow 1", flow) for field, expected := range map[string]interface{}{ "source.ip": localIP, @@ -706,5 +758,55 @@ func TestUDPSendMsgAltLogic(t *testing.T) { ev.AltRAddrA, ev.AltRAddrB = ipv6("fddd::cafe") assert.Equal(t, expectedIPv6, ev.String()) }) +} +func TestSocketReuse(t *testing.T) { + const ( + localIP = "192.168.33.10" + remoteIP = "172.19.12.13" + localPort = 38842 + remotePort = 53 + sock uintptr = 0xff1234 + ) + st := makeState(nil, (*logWrapper)(t), time.Hour, time.Hour, 0, time.Hour) + lPort, rPort := be16(localPort), be16(remotePort) + lAddr, rAddr := ipv4(localIP), ipv4(remoteIP) + evs := []event{ + &clockSyncCall{ + Meta: meta(uint32(os.Getpid()), 1235, 5), + Ts: uint64(time.Now().UnixNano()), + }, + &inetCreate{Meta: meta(1234, 1235, 5), Proto: 0}, + &sockInitData{Meta: meta(1234, 1235, 5), Sock: sock}, + &udpSendMsgCall{ + Meta: meta(1234, 1235, 6), + Sock: sock, + Size: 123, + LAddr: lAddr, + AltRAddr: rAddr, + LPort: lPort, + AltRPort: rPort, + }, + // Asume inetRelease lost. + &inetCreate{Meta: meta(1234, 1235, 5), Proto: 0}, + &sockInitData{Meta: meta(1234, 1235, 5), Sock: sock}, + &udpSendMsgCall{ + Meta: meta(1234, 1235, 6), + Sock: sock, + Size: 123, + LAddr: lAddr, + AltRAddr: rAddr, + LPort: lPort, + AltRPort: rPort, + }, + } + if err := feedEvents(evs, st, t); err != nil { + t.Fatal(err) + } + st.ExpireOlder() + flows, err := getFlows(st.DoneFlows(), all) + if err != nil { + t.Fatal(err) + } + assert.Len(t, flows, 1) }