Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use connection directions from conntrack where possible #967

Merged
merged 1 commit into from
Feb 23, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions experimental/_integration/test_single_report.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
"EdgeMetadatas": {
";192.168.1.1;10746|theinternet": {
"WithConnCountTCP": true,
"MaxConnCountTCP": 19
}
},
"NodeMetadatas": {
Expand Down Expand Up @@ -43,7 +42,6 @@
"EdgeMetadatas": {
";192.168.1.1|theinternet": {
"WithConnCountTCP": true,
"MaxConnCountTCP": 12
}
},
"NodeMetadatas": {
Expand Down
8 changes: 2 additions & 6 deletions experimental/demoprobe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,12 @@ func demoReport(nodeCount int) report.Report {
process.PID: "4000",
"name": c.srcProc,
"domain": "node-" + src,
}).WithEdge(dstPortID, report.EdgeMetadata{
MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)),
}))
}).WithEdge(dstPortID, report.EdgeMetadata{}))
r.Endpoint = r.Endpoint.AddNode(dstPortID, report.MakeNode().WithLatests(map[string]string{
process.PID: "4000",
"name": c.dstProc,
"domain": "node-" + dst,
}).WithEdge(srcPortID, report.EdgeMetadata{
MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)),
}))
}).WithEdge(srcPortID, report.EdgeMetadata{}))

// Address topology
r.Address = r.Address.AddNode(srcAddressID, report.MakeNode().WithLatests(map[string]string{
Expand Down
8 changes: 2 additions & 6 deletions experimental/genreport/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,12 @@ func DemoReport(nodeCount int) report.Report {
"pid": "4000",
"name": c.srcProc,
"domain": "node-" + src,
}).WithEdge(dstPortID, report.EdgeMetadata{
MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)),
}))
}).WithEdge(dstPortID, report.EdgeMetadata{}))
r.Endpoint = r.Endpoint.AddNode(dstPortID, report.MakeNode().WithLatests(map[string]string{
"pid": "4000",
"name": c.dstProc,
"domain": "node-" + dst,
}).WithEdge(srcPortID, report.EdgeMetadata{
MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)),
}))
}).WithEdge(srcPortID, report.EdgeMetadata{}))

// Address topology
r.Address = r.Address.AddNode(srcAddressID, report.MakeNode().WithLatests(map[string]string{
Expand Down
179 changes: 93 additions & 86 deletions probe/endpoint/reporter.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package endpoint

import (
"fmt"
"sort"
"strconv"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -71,6 +74,27 @@ func (r *Reporter) Stop() {
r.scanner.Stop()
}

type fourTuple struct {
fromAddr, toAddr string
fromPort, toPort uint16
}

This comment was marked as abuse.

This comment was marked as abuse.


// key is a sortable direction-independent key for tuples, used to look up a
// fourTuple, when you are unsure of it's direction.
func (t fourTuple) key() string {
key := []string{
fmt.Sprintf("%s:%d", t.fromAddr, t.fromPort),
fmt.Sprintf("%s:%d", t.toAddr, t.toPort),
}
sort.Strings(key)
return strings.Join(key, " ")
}

// reverse flips the direction of the tuple
func (t *fourTuple) reverse() {
t.fromAddr, t.fromPort, t.toAddr, t.toPort = t.toAddr, t.toPort, t.fromAddr, t.fromPort
}

// Report implements Reporter.
func (r *Reporter) Report() (report.Report, error) {
defer func(begin time.Time) {
Expand All @@ -79,140 +103,123 @@ func (r *Reporter) Report() (report.Report, error) {

hostNodeID := report.MakeHostNodeID(r.hostID)
rpt := report.MakeReport()
seenTuples := map[string]fourTuple{}

// Consult the flowWalker for short-live connections
{
extraNodeInfo := report.MakeNode().WithLatests(map[string]string{
Conntracked: "true",
})
r.flowWalker.walkFlows(func(f flow) {
tuple := fourTuple{
f.Original.Layer3.SrcIP,
f.Original.Layer3.DstIP,
uint16(f.Original.Layer4.SrcPort),
uint16(f.Original.Layer4.DstPort),
}
seenTuples[tuple.key()] = tuple
r.addConnection(&rpt, tuple, &extraNodeInfo, &extraNodeInfo)
})
}

{
conns, err := r.scanner.Connections(r.includeProcesses)
if err != nil {
return rpt, err
}
commonNodeInfo := report.MakeNode().WithLatests(map[string]string{
extraNodeInfo := report.MakeNode().WithLatests(map[string]string{
Procspied: "true",
})
for conn := conns.Next(); conn != nil; conn = conns.Next() {
var (
localPort = conn.LocalPort
remotePort = conn.RemotePort
localAddr = conn.LocalAddress.String()
remoteAddr = conn.RemoteAddress.String()
tuple = fourTuple{
conn.LocalAddress.String(),
conn.RemoteAddress.String(),
conn.LocalPort,
conn.RemotePort,
}
toNodeInfo, fromNodeInfo = extraNodeInfo.Copy(), extraNodeInfo.Copy()
)
extraNodeInfo := commonNodeInfo.Copy()
if conn.Proc.PID > 0 {
extraNodeInfo = extraNodeInfo.WithLatests(map[string]string{
fromNodeInfo = fromNodeInfo.WithLatests(map[string]string{
process.PID: strconv.FormatUint(uint64(conn.Proc.PID), 10),
report.HostNodeID: hostNodeID,
})
}
r.addConnection(&rpt, localAddr, remoteAddr, localPort, remotePort, &extraNodeInfo, &commonNodeInfo)
}
}

// Consult the flowWalker for short-live connections
{
extraNodeInfo := report.MakeNode().WithLatests(map[string]string{
Conntracked: "true",
})
r.flowWalker.walkFlows(func(f flow) {
var (
localPort = uint16(f.Original.Layer4.SrcPort)
remotePort = uint16(f.Original.Layer4.DstPort)
localAddr = f.Original.Layer3.SrcIP
remoteAddr = f.Original.Layer3.DstIP
)
r.addConnection(&rpt, localAddr, remoteAddr, localPort, remotePort, &extraNodeInfo, &extraNodeInfo)
})
// If we've already seen this connection, we should know the direction
// (or have already figured it out), so we normalize and use the
// canonical direction. Otherwise, we can use a port-heuristic to guess
// the direction.
canonical, ok := seenTuples[tuple.key()]
if (ok && canonical != tuple) || (!ok && tuple.fromPort < tuple.toPort) {
tuple.reverse()
toNodeInfo, fromNodeInfo = fromNodeInfo, toNodeInfo
}

This comment was marked as abuse.

r.addConnection(&rpt, tuple, &fromNodeInfo, &toNodeInfo)
}
}

r.natMapper.applyNAT(rpt, r.hostID)
return rpt, nil
}

func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr string, localPort, remotePort uint16, extraLocalNode, extraRemoteNode *report.Node) {
localIsClient := int(localPort) > int(remotePort)

func (r *Reporter) addConnection(rpt *report.Report, t fourTuple, extraFromNode, extraToNode *report.Node) {
// Update address topology
{
var (
localAddressNodeID = report.MakeAddressNodeID(r.hostID, localAddr)
remoteAddressNodeID = report.MakeAddressNodeID(r.hostID, remoteAddr)
localNode = report.MakeNodeWith(map[string]string{
"name": r.hostName,
Addr: localAddr,
})
remoteNode = report.MakeNodeWith(map[string]string{
Addr: remoteAddr,
})
fromAddressNodeID = report.MakeAddressNodeID(r.hostID, t.fromAddr)
toAddressNodeID = report.MakeAddressNodeID(r.hostID, t.toAddr)
fromNode = report.MakeNodeWith(map[string]string{Addr: t.fromAddr}).WithEdge(toAddressNodeID, report.EdgeMetadata{})
toNode = report.MakeNodeWith(map[string]string{Addr: t.toAddr})
)

// In case we have a reverse resolution for the IP, we can use it for
// the name...
if remoteNames, err := r.reverseResolver.get(remoteAddr); err == nil {
remoteNode = remoteNode.WithSet("name", report.MakeStringSet(remoteNames...))
}

if localIsClient {
// New nodes are merged into the report so we don't need to do any
// counting here; the merge does it for us.
localNode = localNode.WithEdge(remoteAddressNodeID, report.EdgeMetadata{
MaxConnCountTCP: newu64(1),
})
} else {
remoteNode = localNode.WithEdge(localAddressNodeID, report.EdgeMetadata{
MaxConnCountTCP: newu64(1),
})
if toNames, err := r.reverseResolver.get(t.toAddr); err == nil {
toNode = toNode.WithSet("name", report.MakeStringSet(toNames...))
}

if extraLocalNode != nil {
localNode = localNode.Merge(*extraLocalNode)
if extraFromNode != nil {
fromNode = fromNode.Merge(*extraFromNode)
}
if extraRemoteNode != nil {
remoteNode = remoteNode.Merge(*extraRemoteNode)
if extraToNode != nil {
toNode = toNode.Merge(*extraToNode)
}
rpt.Address = rpt.Address.AddNode(localAddressNodeID, localNode)
rpt.Address = rpt.Address.AddNode(remoteAddressNodeID, remoteNode)
rpt.Address = rpt.Address.AddNode(fromAddressNodeID, fromNode)
rpt.Address = rpt.Address.AddNode(toAddressNodeID, toNode)
}

// Update endpoint topology
if r.includeProcesses {
var (
localEndpointNodeID = report.MakeEndpointNodeID(r.hostID, localAddr, strconv.Itoa(int(localPort)))
remoteEndpointNodeID = report.MakeEndpointNodeID(r.hostID, remoteAddr, strconv.Itoa(int(remotePort)))

localNode = report.MakeNodeWith(map[string]string{
Addr: localAddr,
Port: strconv.Itoa(int(localPort)),
})
remoteNode = report.MakeNodeWith(map[string]string{
Addr: remoteAddr,
Port: strconv.Itoa(int(remotePort)),
fromEndpointNodeID = report.MakeEndpointNodeID(r.hostID, t.fromAddr, strconv.Itoa(int(t.fromPort)))
toEndpointNodeID = report.MakeEndpointNodeID(r.hostID, t.toAddr, strconv.Itoa(int(t.toPort)))

fromNode = report.MakeNodeWith(map[string]string{
Addr: t.fromAddr,
Port: strconv.Itoa(int(t.fromPort)),
}).WithEdge(toEndpointNodeID, report.EdgeMetadata{})
toNode = report.MakeNodeWith(map[string]string{
Addr: t.toAddr,
Port: strconv.Itoa(int(t.toPort)),
})
)

// In case we have a reverse resolution for the IP, we can use it for
// the name...
if remoteNames, err := r.reverseResolver.get(remoteAddr); err == nil {
remoteNode = remoteNode.WithSet("name", report.MakeStringSet(remoteNames...))
}

if localIsClient {
// New nodes are merged into the report so we don't need to do any
// counting here; the merge does it for us.
localNode = localNode.WithEdge(remoteEndpointNodeID, report.EdgeMetadata{
MaxConnCountTCP: newu64(1),
})
} else {
remoteNode = remoteNode.WithEdge(localEndpointNodeID, report.EdgeMetadata{
MaxConnCountTCP: newu64(1),
})
if toNames, err := r.reverseResolver.get(t.toAddr); err == nil {
toNode = toNode.WithSet("name", report.MakeStringSet(toNames...))
}

if extraLocalNode != nil {
localNode = localNode.Merge(*extraLocalNode)
if extraFromNode != nil {
fromNode = fromNode.Merge(*extraFromNode)
}
if extraRemoteNode != nil {
remoteNode = remoteNode.Merge(*extraRemoteNode)
if extraToNode != nil {
toNode = toNode.Merge(*extraToNode)
}
rpt.Endpoint = rpt.Endpoint.AddNode(localEndpointNodeID, localNode)
rpt.Endpoint = rpt.Endpoint.AddNode(remoteEndpointNodeID, remoteNode)
rpt.Endpoint = rpt.Endpoint.AddNode(fromEndpointNodeID, fromNode)
rpt.Endpoint = rpt.Endpoint.AddNode(toEndpointNodeID, toNode)
}
}

Expand Down
6 changes: 0 additions & 6 deletions probe/endpoint/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"strconv"
"testing"

"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/probe/endpoint"
"github.com/weaveworks/scope/probe/endpoint/procspy"
"github.com/weaveworks/scope/report"
Expand Down Expand Up @@ -85,11 +84,6 @@ func TestSpyNoProcesses(t *testing.T) {
scopedRemote = report.MakeAddressNodeID(nodeID, fixRemoteAddress.String())
)

have, _ := r.Address.Nodes[scopedLocal].Latest.Lookup(docker.Name)
if want, have := nodeName, have; want != have {
t.Fatalf("want %q, have %q", want, have)
}

if want, have := 1, len(r.Address.Nodes[scopedRemote].Adjacency); want != have {
t.Fatalf("want %d, have %d", want, have)
}
Expand Down
2 changes: 0 additions & 2 deletions render/expected/expected.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,6 @@ var (
EdgeMetadata: report.EdgeMetadata{
IngressPacketCount: newu64(210),
IngressByteCount: newu64(2100),
MaxConnCountTCP: newu64(3),
},
},
ClientHostRenderedID: {
Expand All @@ -331,7 +330,6 @@ var (
EdgeMetadata: report.EdgeMetadata{
EgressPacketCount: newu64(30),
EgressByteCount: newu64(300),
MaxConnCountTCP: newu64(3),
},
},
pseudoHostID1: {
Expand Down
7 changes: 0 additions & 7 deletions report/edge_metadatas.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ type EdgeMetadata struct {
IngressPacketCount *uint64 `json:"ingress_packet_count,omitempty"`
EgressByteCount *uint64 `json:"egress_byte_count,omitempty"` // Transport layer
IngressByteCount *uint64 `json:"ingress_byte_count,omitempty"` // Transport layer
MaxConnCountTCP *uint64 `json:"max_conn_count_tcp,omitempty"`
}

// Copy returns a value copy of the EdgeMetadata.
Expand All @@ -227,7 +226,6 @@ func (e EdgeMetadata) Copy() EdgeMetadata {
IngressPacketCount: cpu64ptr(e.IngressPacketCount),
EgressByteCount: cpu64ptr(e.EgressByteCount),
IngressByteCount: cpu64ptr(e.IngressByteCount),
MaxConnCountTCP: cpu64ptr(e.MaxConnCountTCP),
}
}

Expand All @@ -238,7 +236,6 @@ func (e EdgeMetadata) Reversed() EdgeMetadata {
IngressPacketCount: cpu64ptr(e.EgressPacketCount),
EgressByteCount: cpu64ptr(e.IngressByteCount),
IngressByteCount: cpu64ptr(e.EgressByteCount),
MaxConnCountTCP: cpu64ptr(e.MaxConnCountTCP),
}
}

Expand All @@ -259,7 +256,6 @@ func (e EdgeMetadata) Merge(other EdgeMetadata) EdgeMetadata {
cp.IngressPacketCount = merge(cp.IngressPacketCount, other.IngressPacketCount, sum)
cp.EgressByteCount = merge(cp.EgressByteCount, other.EgressByteCount, sum)
cp.IngressByteCount = merge(cp.IngressByteCount, other.IngressByteCount, sum)
cp.MaxConnCountTCP = merge(cp.MaxConnCountTCP, other.MaxConnCountTCP, max)
return cp
}

Expand All @@ -272,9 +268,6 @@ func (e EdgeMetadata) Flatten(other EdgeMetadata) EdgeMetadata {
cp.IngressPacketCount = merge(cp.IngressPacketCount, other.IngressPacketCount, sum)
cp.EgressByteCount = merge(cp.EgressByteCount, other.EgressByteCount, sum)
cp.IngressByteCount = merge(cp.IngressByteCount, other.IngressByteCount, sum)
// Note that summing of two maximums doesn't always give us the true
// maximum. But it's a best effort.
cp.MaxConnCountTCP = merge(cp.MaxConnCountTCP, other.MaxConnCountTCP, sum)
return cp
}

Expand Down
Loading