Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(probe/ebpf): feed initial connections synchronously on restart #3712

Merged
merged 2 commits into from
Oct 22, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions probe/endpoint/connection_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func newConnectionTracker(conf ReporterConfig) connectionTracker {
et, err := newEbpfTracker()
if err == nil {
ct.ebpfTracker = et
go ct.getInitialState()
go feedEBPFInitialState(conf, et)
return ct
}
log.Warnf("Error setting up the eBPF tracker, falling back to proc scanning: %v", err)
Expand Down Expand Up @@ -86,7 +86,7 @@ func (t *connectionTracker) ReportConnections(rpt *report.Report) {
log.Warnf("ebpf tracker died, restarting it")
err := t.ebpfTracker.restart()
if err == nil {
go t.getInitialState()
feedEBPFInitialState(t.conf, t.ebpfTracker)
t.performEbpfTrack(rpt, hostNodeID)
return
}
Expand All @@ -108,13 +108,13 @@ func (t *connectionTracker) ReportConnections(rpt *report.Report) {
}
}

func (t *connectionTracker) existingFlows() map[string]fourTuple {
func existingFlowsFromConntrack(conf ReporterConfig) map[string]fourTuple {
seenTuples := map[string]fourTuple{}
if !t.conf.UseConntrack {
if !conf.UseConntrack {
// log.Warnf("Not using conntrack: disabled")
} else if err := IsConntrackSupported(t.conf.ProcRoot); err != nil {
} else if err := IsConntrackSupported(conf.ProcRoot); err != nil {
log.Warnf("Not using conntrack: not supported by the kernel: %s", err)
} else if existingFlows, err := conntrack.ConnectionsSize(t.conf.BufferSize); err != nil {
} else if existingFlows, err := conntrack.ConnectionsSize(conf.BufferSize); err != nil {
log.Errorf("conntrack existingConnections error: %v", err)
} else {
for _, f := range existingFlows {
Expand Down Expand Up @@ -147,18 +147,20 @@ func (t *connectionTracker) performWalkProc(rpt *report.Report, hostNodeID strin
return nil
}

// getInitialState runs conntrack and proc parsing synchronously only
// feedEBPFInitialState runs conntrack and proc parsing synchronously only
// once to initialize ebpfTracker
func (t *connectionTracker) getInitialState() {
// This is run on a background goroutine during initial setup, so does
// not take *connectionTracker which could change under it
func feedEBPFInitialState(conf ReporterConfig, ebpfTracker *EbpfTracker) {
var processCache *process.CachingWalker
walker := process.NewWalker(t.conf.ProcRoot, true)
walker := process.NewWalker(conf.ProcRoot, true)
processCache = process.NewCachingWalker(walker)
processCache.Tick()

scanner := procspy.NewSyncConnectionScanner(processCache, t.conf.SpyProcs)
scanner := procspy.NewSyncConnectionScanner(processCache, conf.SpyProcs)

// Consult conntrack to get the initial state
seenTuples := t.existingFlows()
seenTuples := existingFlowsFromConntrack(conf)

conns, err := scanner.Connections()
if err != nil {
Expand All @@ -173,7 +175,7 @@ func (t *connectionTracker) getInitialState() {
}
})

t.ebpfTracker.feedInitialConnections(conns, seenTuples, processesWaitingInAccept, report.MakeHostNodeID(t.conf.HostID))
ebpfTracker.feedInitialConnections(conns, seenTuples, processesWaitingInAccept, report.MakeHostNodeID(conf.HostID))
}

func (t *connectionTracker) performEbpfTrack(rpt *report.Report, hostNodeID string) error {
Expand Down