Skip to content

Commit

Permalink
refactoring endpoint probe
Browse files Browse the repository at this point in the history
  • Loading branch information
paulbellamy committed Feb 16, 2016
1 parent fbbf5ce commit 1d9e3ac
Showing 1 changed file with 80 additions and 77 deletions.
157 changes: 80 additions & 77 deletions probe/endpoint/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ func (r *Reporter) Stop() {
r.scanner.Stop()
}

type fourTuple struct {
fromAddr, toAddr string
fromPort, toPort uint16
}

// Report implements Reporter.
func (r *Reporter) Report() (report.Report, error) {
defer func(begin time.Time) {
Expand All @@ -82,7 +87,7 @@ func (r *Reporter) Report() (report.Report, error) {

hostNodeID := report.MakeHostNodeID(r.hostID)
rpt := report.MakeReport()
seenTuples := map[string]struct{}{}
seenTuples := map[string]fourTuple{}

// Consult the flowWalker for short-live connections
{
Expand All @@ -91,15 +96,16 @@ func (r *Reporter) Report() (report.Report, error) {
})
r.flowWalker.walkFlows(func(f flow) {
var (
localPort = uint16(f.Original.Layer4.SrcPort)
remotePort = uint16(f.Original.Layer4.DstPort)
localAddr = f.Original.Layer3.SrcIP
remoteAddr = f.Original.Layer3.DstIP
fromAddr, fromPort = f.Original.Layer3.SrcIP, uint16(f.Original.Layer4.SrcPort)
toAddr, toPort = f.Original.Layer3.DstIP, uint16(f.Original.Layer4.DstPort)
tuple = []string{
fmt.Sprintf("%s:%d", fromAddr, fromPort),
fmt.Sprintf("%s:%d", toAddr, toPort),
}
)
tuple := []string{fmt.Sprintf("%s:%d", localAddr, localPort), fmt.Sprintf("%s:%d", remoteAddr, remotePort)}
sort.Strings(tuple)
seenTuples[strings.Join(tuple, " ")] = struct{}{}
r.addConnection(&rpt, localAddr, remoteAddr, localPort, remotePort, &extraNodeInfo, &extraNodeInfo, true)
seenTuples[strings.Join(tuple, " ")] = fourTuple{fromAddr, toAddr, fromPort, toPort}
r.addConnection(&rpt, fromAddr, toAddr, fromPort, toPort, &extraNodeInfo, &extraNodeInfo)
})
}

Expand All @@ -108,124 +114,121 @@ func (r *Reporter) Report() (report.Report, error) {
if err != nil {
return rpt, err
}
commonNodeInfo := report.MakeNode().WithLatests(map[string]string{
toNodeInfo := report.MakeNode().WithLatests(map[string]string{
Procspied: "true",
})
for conn := conns.Next(); conn != nil; conn = conns.Next() {
var (
localPort = conn.LocalPort
remotePort = conn.RemotePort
localAddr = conn.LocalAddress.String()
remoteAddr = conn.RemoteAddress.String()
fromAddr, fromPort = conn.LocalAddress.String(), conn.LocalPort
toAddr, toPort = conn.RemoteAddress.String(), conn.RemotePort
tuple = []string{
fmt.Sprintf("%s:%d", fromAddr, fromPort),
fmt.Sprintf("%s:%d", toAddr, toPort),
}
)
extraNodeInfo := commonNodeInfo.Copy()
fromNodeInfo := toNodeInfo.Copy()
if conn.Proc.PID > 0 {
extraNodeInfo = extraNodeInfo.WithLatests(map[string]string{
fromNodeInfo = fromNodeInfo.WithLatests(map[string]string{
process.PID: strconv.FormatUint(uint64(conn.Proc.PID), 10),
report.HostNodeID: hostNodeID,
})
}
tuple := []string{fmt.Sprintf("%s:%d", localAddr, localPort), fmt.Sprintf("%s:%d", remoteAddr, remotePort)}
sort.Strings(tuple)
tupleKey := strings.Join(tuple, " ")

localIsClient := true
if _, ok := seenTuples[tupleKey]; !ok {
localIsClient = int(localPort) > int(remotePort)
// If we've already seen this connection, we should know the direction
// (or have already figured it out), so we normalize and use the
// canonical direction. Otherwise, we can use a port-heuristic to guess
// the direction.
if c, ok := seenTuples[strings.Join(tuple, " ")]; ok {
if fromAddr != c.fromAddr || fromPort != c.fromPort {
fromAddr, fromPort, toAddr, toPort = c.fromAddr, c.fromPort, c.toAddr, c.toPort
toNodeInfo, fromNodeInfo = fromNodeInfo, toNodeInfo
}
} else if int(fromPort) < int(toPort) {
fromAddr, fromPort, toAddr, toPort = toAddr, toPort, fromAddr, fromPort
toNodeInfo, fromNodeInfo = fromNodeInfo, toNodeInfo
}
r.addConnection(&rpt, localAddr, remoteAddr, localPort, remotePort, &extraNodeInfo, &commonNodeInfo, localIsClient)
seenTuples[strings.Join(tuple, " ")] = fourTuple{fromAddr, toAddr, fromPort, toPort}
// TODO(paulbellamy): Won't adding this from conntrack, and adding the
// edge again here result in double-counting? Also, if we've seen the
// same conn twice from proc (if both ends are on the same host).
r.addConnection(&rpt, fromAddr, toAddr, fromPort, toPort, &fromNodeInfo, &toNodeInfo)
}
}

r.natMapper.applyNAT(rpt, r.hostID)
return rpt, nil
}

func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr string, localPort, remotePort uint16, extraLocalNode, extraRemoteNode *report.Node, localIsClient bool) {
func (r *Reporter) addConnection(rpt *report.Report, fromAddr, toAddr string, fromPort, toPort uint16, extraFromNode, extraToNode *report.Node) {
// Update address topology
{
var (
localAddressNodeID = report.MakeAddressNodeID(r.hostID, localAddr)
remoteAddressNodeID = report.MakeAddressNodeID(r.hostID, remoteAddr)
localNode = report.MakeNodeWith(map[string]string{
fromAddressNodeID = report.MakeAddressNodeID(r.hostID, fromAddr)
toAddressNodeID = report.MakeAddressNodeID(r.hostID, toAddr)
fromNode = report.MakeNodeWith(map[string]string{
"name": r.hostName,
Addr: localAddr,
Addr: fromAddr,
}).WithEdge(toAddressNodeID, report.EdgeMetadata{
// New nodes are merged into the report so we don't need to do any
// counting here; the merge does it for us.
MaxConnCountTCP: newu64(1),
})
remoteNode = report.MakeNodeWith(map[string]string{
Addr: remoteAddr,
toNode = report.MakeNodeWith(map[string]string{
Addr: toAddr,
})
)

// In case we have a reverse resolution for the IP, we can use it for
// the name...
if remoteNames, err := r.reverseResolver.get(remoteAddr); err == nil {
remoteNode = remoteNode.WithSet("name", report.MakeStringSet(remoteNames...))
}

if localIsClient {
// New nodes are merged into the report so we don't need to do any
// counting here; the merge does it for us.
localNode = localNode.WithEdge(remoteAddressNodeID, report.EdgeMetadata{
MaxConnCountTCP: newu64(1),
})
} else {
remoteNode = remoteNode.WithEdge(localAddressNodeID, report.EdgeMetadata{
MaxConnCountTCP: newu64(1),
})
if toNames, err := r.reverseResolver.get(toAddr); err == nil {
toNode = toNode.WithSet("name", report.MakeStringSet(toNames...))
}

if extraLocalNode != nil {
localNode = localNode.Merge(*extraLocalNode)
if extraFromNode != nil {
fromNode = fromNode.Merge(*extraFromNode)
}
if extraRemoteNode != nil {
remoteNode = remoteNode.Merge(*extraRemoteNode)
if extraToNode != nil {
toNode = toNode.Merge(*extraToNode)
}
rpt.Address = rpt.Address.AddNode(localAddressNodeID, localNode)
rpt.Address = rpt.Address.AddNode(remoteAddressNodeID, remoteNode)
rpt.Address = rpt.Address.AddNode(fromAddressNodeID, fromNode)
rpt.Address = rpt.Address.AddNode(toAddressNodeID, toNode)
}

// Update endpoint topology
if r.includeProcesses {
var (
localEndpointNodeID = report.MakeEndpointNodeID(r.hostID, localAddr, strconv.Itoa(int(localPort)))
remoteEndpointNodeID = report.MakeEndpointNodeID(r.hostID, remoteAddr, strconv.Itoa(int(remotePort)))

localNode = report.MakeNodeWith(map[string]string{
Addr: localAddr,
Port: strconv.Itoa(int(localPort)),
fromEndpointNodeID = report.MakeEndpointNodeID(r.hostID, fromAddr, strconv.Itoa(int(fromPort)))
toEndpointNodeID = report.MakeEndpointNodeID(r.hostID, toAddr, strconv.Itoa(int(toPort)))

fromNode = report.MakeNodeWith(map[string]string{
Addr: fromAddr,
Port: strconv.Itoa(int(fromPort)),
}).WithEdge(toEndpointNodeID, report.EdgeMetadata{
// New nodes are merged into the report so we don't need to do any
// counting here; the merge does it for us.
MaxConnCountTCP: newu64(1),
})
remoteNode = report.MakeNodeWith(map[string]string{
Addr: remoteAddr,
Port: strconv.Itoa(int(remotePort)),
toNode = report.MakeNodeWith(map[string]string{
Addr: toAddr,
Port: strconv.Itoa(int(toPort)),
})
)

// In case we have a reverse resolution for the IP, we can use it for
// the name...
if remoteNames, err := r.reverseResolver.get(remoteAddr); err == nil {
remoteNode = remoteNode.WithSet("name", report.MakeStringSet(remoteNames...))
}

if localIsClient {
// New nodes are merged into the report so we don't need to do any
// counting here; the merge does it for us.
localNode = localNode.WithEdge(remoteEndpointNodeID, report.EdgeMetadata{
MaxConnCountTCP: newu64(1),
})
} else {
remoteNode = remoteNode.WithEdge(localEndpointNodeID, report.EdgeMetadata{
MaxConnCountTCP: newu64(1),
})
if toNames, err := r.reverseResolver.get(toAddr); err == nil {
toNode = toNode.WithSet("name", report.MakeStringSet(toNames...))
}

if extraLocalNode != nil {
localNode = localNode.Merge(*extraLocalNode)
if extraFromNode != nil {
fromNode = fromNode.Merge(*extraFromNode)
}
if extraRemoteNode != nil {
remoteNode = remoteNode.Merge(*extraRemoteNode)
if extraToNode != nil {
toNode = toNode.Merge(*extraToNode)
}
rpt.Endpoint = rpt.Endpoint.AddNode(localEndpointNodeID, localNode)
rpt.Endpoint = rpt.Endpoint.AddNode(remoteEndpointNodeID, remoteNode)
rpt.Endpoint = rpt.Endpoint.AddNode(fromEndpointNodeID, fromNode)
rpt.Endpoint = rpt.Endpoint.AddNode(toEndpointNodeID, toNode)
}
}

Expand Down

0 comments on commit 1d9e3ac

Please sign in to comment.