Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auditbeat: Fixes for system/socket dataset #19033

Merged
merged 7 commits into from
Jun 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- system/package: Fix parsing of Installed-Size field of DEB packages. {issue}16661[16661] {pull}17188[17188]
- system module: Fix panic during initialisation when /proc/stat can't be read. {pull}17569[17569]
- system/package: Fix an error that can occur while trying to persist package metadata. {issue}18536[18536] {pull}18887[18887]
- system/socket: Fix dataset using 100% CPU and becoming unresponsive in some scenarios. {pull}19033[19033]
- system/socket: Fixed tracking of long-running connections. {pull}19033[19033]

*Filebeat*

Expand Down
60 changes: 40 additions & 20 deletions x-pack/auditbeat/module/system/socket/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ type flow struct {
process *process
local, remote endpoint
complete bool

done bool
// these are automatically calculated by state from kernelTimes above
createdTime, lastSeenTime time.Time
}
Expand Down Expand Up @@ -253,7 +253,6 @@ type socket struct {
process *process
// This signals that the socket is in the closeTimeout list.
closing bool
closeTime time.Time
prev, next linkedElement

createdTime, lastSeenTime time.Time
Expand Down Expand Up @@ -281,7 +280,7 @@ func (s *socket) SetNext(e linkedElement) {

// Timestamp returns the time reference used to expire sockets.
func (s *socket) Timestamp() time.Time {
return s.closeTime
return s.lastSeenTime
}

type dnsTracker struct {
Expand Down Expand Up @@ -372,13 +371,16 @@ type state struct {
closing linkedList

dns dnsTracker

// Decouple time.Now()
clock func() time.Time
}

func (s *state) getSocket(sock uintptr) *socket {
if socket, found := s.socks[sock]; found {
return socket
}
now := time.Now()
now := s.clock()
socket := &socket{
sock: sock,
createdTime: now,
Expand All @@ -397,6 +399,7 @@ var kernelProcess = process{
func NewState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, socketTimeout, closeTimeout, clockMaxDrift time.Duration) *state {
s := makeState(r, log, inactiveTimeout, socketTimeout, closeTimeout, clockMaxDrift)
go s.reapLoop()
go s.logStateLoop()
return s
}

Expand All @@ -412,6 +415,7 @@ func makeState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, socketTi
closeTimeout: closeTimeout,
clockMaxDrift: clockMaxDrift,
dns: newDNSTracker(inactiveTimeout * 2),
clock: time.Now,
}
}

Expand All @@ -438,7 +442,7 @@ func (s *state) logState() {
events := atomic.LoadUint64(&eventCount)
s.Unlock()

now := time.Now()
now := s.clock()
took := now.Sub(lastTime)
newEvs := events - lastEvents
lastEvents = events
Expand All @@ -461,8 +465,6 @@ func (s *state) logState() {
func (s *state) reapLoop() {
reportTicker := time.NewTicker(reapInterval)
defer reportTicker.Stop()
logTicker := time.NewTicker(logInterval)
defer logTicker.Stop()
for {
select {
case <-s.reporter.Done():
Expand All @@ -489,6 +491,17 @@ func (s *state) reapLoop() {
return
}
}
}
}
}

func (s *state) logStateLoop() {
logTicker := time.NewTicker(logInterval)
defer logTicker.Stop()
for {
select {
case <-s.reporter.Done():
return
case <-logTicker.C:
s.logState()
}
Expand All @@ -498,7 +511,7 @@ func (s *state) reapLoop() {
func (s *state) ExpireOlder() {
s.Lock()
defer s.Unlock()
deadline := time.Now().Add(-s.inactiveTimeout)
deadline := s.clock().Add(-s.inactiveTimeout)
for item := s.flowLRU.peek(); item != nil && item.Timestamp().Before(deadline); {
if flow, ok := item.(*flow); ok {
s.onFlowTerminated(flow)
Expand All @@ -507,8 +520,7 @@ func (s *state) ExpireOlder() {
}
item = s.flowLRU.peek()
}

deadline = time.Now().Add(-s.socketTimeout)
deadline = s.clock().Add(-s.socketTimeout)
for item := s.socketLRU.peek(); item != nil && item.Timestamp().Before(deadline); {
if sock, ok := item.(*socket); ok {
s.onSockDestroyed(sock.sock, 0)
Expand All @@ -517,8 +529,7 @@ func (s *state) ExpireOlder() {
}
item = s.socketLRU.peek()
}

deadline = time.Now().Add(-s.closeTimeout)
deadline = s.clock().Add(-s.closeTimeout)
for item := s.closing.peek(); item != nil && item.Timestamp().Before(deadline); {
if sock, ok := item.(*socket); ok {
s.onSockTerminated(sock)
Expand Down Expand Up @@ -601,6 +612,8 @@ func (s *state) onSockTerminated(sock *socket) {
delete(s.socks, sock.sock)
if sock.closing {
s.closing.remove(sock)
} else {
s.moveToClosing(sock)
}
}

Expand Down Expand Up @@ -659,7 +672,7 @@ func (s *state) mutualEnrich(sock *socket, f *flow) {
f.process = sock.process
}
if !sock.closing {
sock.lastSeenTime = time.Now()
sock.lastSeenTime = s.clock()
s.socketLRU.remove(sock)
s.socketLRU.add(sock)
}
Expand Down Expand Up @@ -699,7 +712,6 @@ func (s *state) onSockDestroyed(ptr uintptr, pid uint32) error {
if !found {
return nil
}

// Enrich with pid
if sock.pid == 0 && pid != 0 {
sock.pid = pid
Expand All @@ -710,14 +722,18 @@ func (s *state) onSockDestroyed(ptr uintptr, pid uint32) error {
// Keep the sock around in case it's a connected TCP socket, as still some
// packets can be received shortly after/during inet_release.
if !sock.closing {
sock.closeTime = time.Now()
sock.closing = true
s.socketLRU.remove(sock)
s.closing.add(sock)
s.moveToClosing(sock)
}
return nil
}

func (s *state) moveToClosing(sock *socket) {
sock.lastSeenTime = s.clock()
sock.closing = true
s.socketLRU.remove(sock)
s.closing.add(sock)
}

// UpdateFlow receives a partial flow and creates or updates an existing flow.
func (s *state) UpdateFlow(ref flow) error {
return s.UpdateFlowWithCondition(ref, nil)
Expand Down Expand Up @@ -794,7 +810,11 @@ func (f *flow) updateWith(ref flow, s *state) {
}

func (s *state) onFlowTerminated(f *flow) {
if f.done {
return
}
s.flowLRU.remove(f)
f.done = true
// Unbind this flow from its parent
if parent, found := s.socks[f.sock]; found {
delete(parent.flows, f.remote.addr.String())
Expand Down Expand Up @@ -1011,8 +1031,8 @@ func (s *state) kernTimestampToTime(ts kernelTime) time.Time {
}
if s.kernelEpoch == (time.Time{}) {
// This is the first event and time sync hasn't happened yet.
// Take a temporary epoch relative to time.Now()
now := time.Now()
// Take a temporary epoch relative to current time.
now := s.clock()
s.kernelEpoch = now.Add(-time.Duration(ts))
return now
}
Expand Down
126 changes: 114 additions & 12 deletions x-pack/auditbeat/module/system/socket/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"encoding/binary"
"fmt"
"net"
"os"
"testing"
"time"

Expand Down Expand Up @@ -134,16 +135,24 @@ func TestTCPConnWithProcess(t *testing.T) {

func TestTCPConnWithProcessSocketTimeouts(t *testing.T) {
const (
localIP = "192.168.33.10"
remoteIP = "172.19.12.13"
localPort = 38842
remotePort = 443
sock uintptr = 0xff1234
localIP = "192.168.33.10"
remoteIP = "172.19.12.13"
localPort = 38842
remotePort = 443
sock uintptr = 0xff1234
flowTimeout = time.Hour
socketTimeout = time.Minute * 3
closeTimeout = time.Minute
)
st := makeState(nil, (*logWrapper)(t), time.Second, 0, 0, time.Second)
st := makeState(nil, (*logWrapper)(t), flowTimeout, socketTimeout, closeTimeout, time.Second)
now := time.Now()
st.clock = func() time.Time {
return now
}
lPort, rPort := be16(localPort), be16(remotePort)
lAddr, rAddr := ipv4(localIP), ipv4(remoteIP)
evs := []event{

callExecve(meta(1234, 1234, 1), []string{"/usr/bin/curl", "https://example.net/", "-o", "/tmp/site.html"}),
&commitCreds{Meta: meta(1234, 1234, 2), UID: 501, GID: 20, EUID: 501, EGID: 20},
&execveRet{Meta: meta(1234, 1234, 2), Retval: 1234},
Expand Down Expand Up @@ -174,7 +183,18 @@ func TestTCPConnWithProcessSocketTimeouts(t *testing.T) {
t.Fatal(err)
}
st.ExpireOlder()
// Nothing expired just yet.
flows, err := getFlows(st.DoneFlows(), all)
if err != nil {
t.Fatal(err)
}
assert.Empty(t, flows)

evs = []event{
&clockSyncCall{
Meta: meta(uint32(os.Getpid()), 1235, 0),
Ts: uint64(now.UnixNano()),
},
&inetReleaseCall{Meta: meta(0, 0, 15), Sock: sock},
&tcpV4DoRcv{
Meta: meta(0, 0, 17),
Expand All @@ -185,17 +205,31 @@ func TestTCPConnWithProcessSocketTimeouts(t *testing.T) {
RAddr: rAddr,
RPort: rPort,
},
&doExit{Meta: meta(1234, 1234, 18)},

&inetCreate{Meta: meta(1234, 1235, 18), Proto: 0},
&sockInitData{Meta: meta(1234, 1235, 19), Sock: sock + 1},
&tcpIPv4ConnectCall{Meta: meta(1234, 1235, 20), Sock: sock + 1, RAddr: rAddr, RPort: rPort},
&tcpV4DoRcv{
Meta: meta(0, 0, 21),
Sock: sock + 1,
Size: 12,
LAddr: lAddr,
LPort: lPort,
RAddr: rAddr,
RPort: rPort,
},
}
if err := feedEvents(evs, st, t); err != nil {
t.Fatal(err)
}
// Expire the first socket
now = now.Add(closeTimeout + 1)
st.ExpireOlder()
flows, err := getFlows(st.DoneFlows(), all)
flows, err = getFlows(st.DoneFlows(), all)
if err != nil {
t.Fatal(err)
}
assert.Len(t, flows, 2)
assert.Len(t, flows, 1)
flow := flows[0]
t.Log("read flow 0", flow)
for field, expected := range map[string]interface{}{
Expand All @@ -207,8 +241,8 @@ func TestTCPConnWithProcessSocketTimeouts(t *testing.T) {
"client.port": localPort,
"destination.ip": remoteIP,
"destination.port": remotePort,
"destination.packets": uint64(1),
"destination.bytes": uint64(12),
"destination.packets": uint64(2),
"destination.bytes": uint64(19),
"server.ip": remoteIP,
"server.port": remotePort,
"network.direction": "outbound",
Expand All @@ -224,10 +258,28 @@ func TestTCPConnWithProcessSocketTimeouts(t *testing.T) {
t.Fatal("expected value not found")
}
}
// Wait until sock+1 expires due to inactivity. It won't be available
// just yet.
now = now.Add(socketTimeout + 1)
st.ExpireOlder()
flows, err = getFlows(st.DoneFlows(), all)
if err != nil {
t.Fatal(err)
}
assert.Empty(t, flows)

// Wait until the sock is closed completely.
now = now.Add(closeTimeout + 1)
st.ExpireOlder()
flows, err = getFlows(st.DoneFlows(), all)
if err != nil {
t.Fatal(err)
}
assert.Len(t, flows, 1)
flow = flows[0]

// we have a truncated flow with no directionality,
// so just report what we can
flow = flows[1]
t.Log("read flow 1", flow)
for field, expected := range map[string]interface{}{
"source.ip": localIP,
Expand Down Expand Up @@ -706,5 +758,55 @@ func TestUDPSendMsgAltLogic(t *testing.T) {
ev.AltRAddrA, ev.AltRAddrB = ipv6("fddd::cafe")
assert.Equal(t, expectedIPv6, ev.String())
})
}

func TestSocketReuse(t *testing.T) {
const (
localIP = "192.168.33.10"
remoteIP = "172.19.12.13"
localPort = 38842
remotePort = 53
sock uintptr = 0xff1234
)
st := makeState(nil, (*logWrapper)(t), time.Hour, time.Hour, 0, time.Hour)
lPort, rPort := be16(localPort), be16(remotePort)
lAddr, rAddr := ipv4(localIP), ipv4(remoteIP)
evs := []event{
&clockSyncCall{
Meta: meta(uint32(os.Getpid()), 1235, 5),
Ts: uint64(time.Now().UnixNano()),
},
&inetCreate{Meta: meta(1234, 1235, 5), Proto: 0},
&sockInitData{Meta: meta(1234, 1235, 5), Sock: sock},
&udpSendMsgCall{
Meta: meta(1234, 1235, 6),
Sock: sock,
Size: 123,
LAddr: lAddr,
AltRAddr: rAddr,
LPort: lPort,
AltRPort: rPort,
},
// Asume inetRelease lost.
&inetCreate{Meta: meta(1234, 1235, 5), Proto: 0},
&sockInitData{Meta: meta(1234, 1235, 5), Sock: sock},
&udpSendMsgCall{
Meta: meta(1234, 1235, 6),
Sock: sock,
Size: 123,
LAddr: lAddr,
AltRAddr: rAddr,
LPort: lPort,
AltRPort: rPort,
},
}
if err := feedEvents(evs, st, t); err != nil {
t.Fatal(err)
}
st.ExpireOlder()
flows, err := getFlows(st.DoneFlows(), all)
if err != nil {
t.Fatal(err)
}
assert.Len(t, flows, 1)
}