Skip to content

Commit

Permalink
Merge branch 'main' into 31767-filestream-id-already-exists
Browse files Browse the repository at this point in the history
  • Loading branch information
AndersonQ authored Nov 12, 2024
2 parents 0481528 + 587dc60 commit 95c0b33
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]

*Packetbeat*

- Expire source port mappings. {pull}41581[41581]

*Winlogbeat*

Expand Down
64 changes: 49 additions & 15 deletions packetbeat/procs/procs.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type portProcMapping struct {
endpoint endpoint // FIXME: This is never used.
pid int
proc *process
expires time.Time
}

// process describes an OS process.
Expand Down Expand Up @@ -185,8 +186,8 @@ func (proc *ProcessesWatcher) isLocalIP(ip net.IP) bool {

func (proc *ProcessesWatcher) findProc(address net.IP, port uint16, transport applayer.Transport) *process {
proc.mu.Lock()
defer proc.mu.Unlock()
procMap, ok := proc.portProcMap[transport]
proc.mu.Unlock()
if !ok {
return nil
}
Expand All @@ -206,24 +207,47 @@ func (proc *ProcessesWatcher) findProc(address net.IP, port uint16, transport ap
return nil
}

func lookupMapping(address net.IP, port uint16, procMap map[endpoint]portProcMapping) (p portProcMapping, found bool) {
// proc.mu must be locked
func lookupMapping(address net.IP, port uint16, procMap map[endpoint]portProcMapping) (portProcMapping, bool) {
now := time.Now()
key := endpoint{address.String(), port}
p, found := procMap[key]

// Precedence when one socket is bound to a specific IP:port and another one
// to INADDR_ANY and same port is not clear. Seems that the last one to bind
// takes precedence, and we don't have a way to tell.
// This function takes the naive approach of giving precedence to the more
// specific address and then to INADDR_ANY.
if p, found = procMap[endpoint{address.String(), port}]; found {
return p, found
if !found {
if address.To4() != nil {
key.address = anyIPv4
} else {
key.address = anyIPv6
}

p, found = procMap[key]
}

nullAddr := anyIPv4
if asIPv4 := address.To4(); asIPv4 == nil {
nullAddr = anyIPv6
// We can't guarantee `p` doesn't point to an old entry, since
// we never remove entries from `procMap`, we only overwrite
// them, but we only overwrite them once an unrelated packet
// that doesn't have an entry on `procMap` ends up rebuilding
// the whole map.
//
// We take a conservative approach by discarding the entry if
// it's old enough. When we fail the first time here, our caller
// updates all maps and calls us again.
if found && now.After(p.expires) {
logp.Debug("procs", "PID %d (%s) port %d is too old, discarding", p.pid, p.proc.name, port)
delete(procMap, key)
p = portProcMapping{}
found = false
}
p, found = procMap[endpoint{nullAddr, port}]

return p, found
}

// proc.mu must be locked
func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) {
if logp.HasSelector("procsdetailed") {
start := time.Now()
Expand All @@ -244,6 +268,7 @@ func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) {
}
}

// proc.mu must be locked
func (proc *ProcessesWatcher) expireProcessCache() {
now := time.Now()
for pid, info := range proc.processCache {
Expand All @@ -253,9 +278,8 @@ func (proc *ProcessesWatcher) expireProcessCache() {
}
}

// proc.mu must be locked
func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, e endpoint, pid int) {
proc.mu.Lock()
defer proc.mu.Unlock()
prev, ok := proc.portProcMap[transport][e]
if ok && prev.pid == pid {
// This port->pid mapping already exists
Expand All @@ -267,11 +291,21 @@ func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, e
return
}

// Simply overwrite old entries for now.
// We never expire entries from this map. Since there are 65k possible
// ports, the size of the dict can be max 1.5 MB, which we consider
// reasonable.
proc.portProcMap[transport][e] = portProcMapping{endpoint: e, pid: pid, proc: p}
// We overwrite previous entries here, and they expire in
// lookupMapping() if they are deemed old enough.
//
// Map size is bound by the number of ports: ~65k, so it's
// fine to have old entries lingering, as long as we don't
// trust them on subsequent connections.
//
// If the source port is re-used within the hardcoded 10
// seconds window, we might end up hitting an old mapping.
proc.portProcMap[transport][e] = portProcMapping{
endpoint: e,
pid: pid,
proc: p,
expires: time.Now().Add(10 * time.Second),
}

if logp.IsDebug("procsdetailed") {
logp.Debug("procsdetailed", "updateMappingEntry(): local=%s:%d/%s pid=%d process='%s'",
Expand Down

0 comments on commit 95c0b33

Please sign in to comment.