Skip to content

Commit

Permalink
Merge pull request #526 from weaveworks/fix-natmapper-conntracker
Browse files Browse the repository at this point in the history
Refactor probe/endpoint for export and dependency cleanliness
  • Loading branch information
tomwilkie committed Oct 27, 2015
2 parents 5ed5fd1 + e3f8e14 commit 6a4c997
Show file tree
Hide file tree
Showing 16 changed files with 429 additions and 439 deletions.
4 changes: 2 additions & 2 deletions app/api_topologies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/weaveworks/scope/probe/kubernetes"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/test/fixture"
)

func TestAPITopology(t *testing.T) {
Expand Down Expand Up @@ -62,7 +62,7 @@ func TestAPITopologyAddsKubernetes(t *testing.T) {
// Enable the kubernetes topologies
rpt := report.MakeReport()
rpt.Pod = report.MakeTopology()
rpt.Pod.Nodes[test.ClientPodNodeID] = kubernetes.NewPod(&api.Pod{
rpt.Pod.Nodes[fixture.ClientPodNodeID] = kubernetes.NewPod(&api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pong-a",
Namespace: "ping",
Expand Down
3 changes: 2 additions & 1 deletion app/api_topology_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/weaveworks/scope/render/expected"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/test/fixture"
)

func TestAll(t *testing.T) {
Expand Down Expand Up @@ -78,7 +79,7 @@ func TestAPITopologyApplications(t *testing.T) {
}
equals(t, expected.ServerProcessID, node.Node.ID)
equals(t, "apache", node.Node.LabelMajor)
equals(t, fmt.Sprintf("%s (server:%s)", test.ServerHostID, test.ServerPID), node.Node.LabelMinor)
equals(t, fmt.Sprintf("%s (server:%s)", fixture.ServerHostID, fixture.ServerPID), node.Node.LabelMinor)
equals(t, false, node.Node.Pseudo)
// Let's not unit-test the specific content of the detail tables
}
Expand Down
4 changes: 2 additions & 2 deletions app/mock_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package main

import (
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/test/fixture"
)

// StaticReport is used as a fixture in tests. It emulates an xfer.Collector.
type StaticReport struct{}

func (s StaticReport) Report() report.Report { return test.Report }
func (s StaticReport) Report() report.Report { return fixture.Report }

func (s StaticReport) Add(report.Report) {}
4 changes: 2 additions & 2 deletions app/origin_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"net/http/httptest"
"testing"

"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/test/fixture"
)

func TestAPIOriginHost(t *testing.T) {
Expand All @@ -18,7 +18,7 @@ func TestAPIOriginHost(t *testing.T) {

{
// Origin
body := getRawJSON(t, ts, fmt.Sprintf("/api/origin/host/%s", test.ServerHostNodeID))
body := getRawJSON(t, ts, fmt.Sprintf("/api/origin/host/%s", fixture.ServerHostNodeID))
var o OriginHost
if err := json.Unmarshal(body, &o); err != nil {
t.Fatalf("JSON parse error: %s", err)
Expand Down
143 changes: 71 additions & 72 deletions probe/endpoint/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package endpoint
import (
"bufio"
"encoding/xml"
"fmt"
"io"
"log"
"os"
Expand All @@ -14,87 +13,90 @@ import (
"github.com/weaveworks/scope/common/exec"
)

// Constants exported for testing
const (
modules = "/proc/modules"
conntrackModule = "nf_conntrack"
XMLHeader = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"
ConntrackOpenTag = "<conntrack>\n"
TimeWait = "TIME_WAIT"
TCP = "tcp"
New = "new"
Update = "update"
Destroy = "destroy"
xmlHeader = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"
conntrackOpenTag = "<conntrack>\n"
timeWait = "TIME_WAIT"
tcpProto = "tcp"
newType = "new"
updateType = "update"
destroyType = "destroy"
)

// Layer3 - these structs are for the parsed conntrack output
type Layer3 struct {
type layer3 struct {
XMLName xml.Name `xml:"layer3"`
SrcIP string `xml:"src"`
DstIP string `xml:"dst"`
}

// Layer4 - these structs are for the parsed conntrack output
type Layer4 struct {
type layer4 struct {
XMLName xml.Name `xml:"layer4"`
SrcPort int `xml:"sport"`
DstPort int `xml:"dport"`
Proto string `xml:"protoname,attr"`
}

// Meta - these structs are for the parsed conntrack output
type Meta struct {
type meta struct {
XMLName xml.Name `xml:"meta"`
Direction string `xml:"direction,attr"`
Layer3 Layer3 `xml:"layer3"`
Layer4 Layer4 `xml:"layer4"`
Layer3 layer3 `xml:"layer3"`
Layer4 layer4 `xml:"layer4"`
ID int64 `xml:"id"`
State string `xml:"state"`
}

// Flow - these structs are for the parsed conntrack output
type Flow struct {
type flow struct {
XMLName xml.Name `xml:"flow"`
Metas []Meta `xml:"meta"`
Metas []meta `xml:"meta"`
Type string `xml:"type,attr"`

Original, Reply, Independent *Meta `xml:"-"`
Original, Reply, Independent *meta `xml:"-"`
}

type conntrack struct {
XMLName xml.Name `xml:"conntrack"`
Flows []Flow `xml:"flow"`
Flows []flow `xml:"flow"`
}

// Conntracker is something that tracks connections.
type Conntracker interface {
WalkFlows(f func(Flow))
Stop()
// flowWalker is something that maintains flows, and provides an accessor
// method to walk them.
type flowWalker interface {
// walkFlows invokes the callback f with flows held by the implementation.
walkFlows(f func(flow))
// stop asks the implementation to shut down.
stop()
}

// Conntracker uses the conntrack command to track network connections
type conntracker struct {
// nilFlowWalker is a flowWalker that holds no state; both of its methods
// have empty bodies.
type nilFlowWalker struct{}

// stop does nothing.
func (n nilFlowWalker) stop() {}
// walkFlows does nothing; f is never called.
func (n nilFlowWalker) walkFlows(f func(flow)) {}

// conntrackWalker uses the conntrack command to track network connections and
// implement flowWalker.
type conntrackWalker struct {
sync.Mutex
cmd exec.Cmd
activeFlows map[int64]Flow // active flows in state != TIME_WAIT
bufferedFlows []Flow // flows coming out of activeFlows spend 1 walk cycle here
existingConns bool
activeFlows map[int64]flow // active flows in state != TIME_WAIT
bufferedFlows []flow // flows coming out of activeFlows spend 1 walk cycle here
args []string
quit chan struct{}
}

// NewConntracker creates and starts a new Conntracter
func NewConntracker(existingConns bool, args ...string) (Conntracker, error) {
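// newConntrackFlowWalker creates and starts a new conntrackWalker. It returns a
// nilFlowWalker when the conntrack module is absent or useConntrack is false.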
func newConntrackFlowWalker(useConntrack bool, args ...string) flowWalker {
if !ConntrackModulePresent() {
return nil, fmt.Errorf("No conntrack module")
log.Printf("Not using conntrack: module not present")
return nilFlowWalker{}
} else if !useConntrack {
return nilFlowWalker{}
}
result := &conntracker{
activeFlows: map[int64]Flow{},
existingConns: existingConns,
args: args,
result := &conntrackWalker{
activeFlows: map[int64]flow{},
args: args,
}
go result.loop()
return result, nil
return result
}

// ConntrackModulePresent returns true if the kernel has the conntrack module
Expand All @@ -121,7 +123,7 @@ var ConntrackModulePresent = func() bool {
return false
}

func (c *conntracker) loop() {
func (c *conntrackWalker) loop() {
// conntrack can sometimes fail with ENOBUFS, when there is a particularly
// high connection rate. In these cases just retry in a loop, so we can
// survive the spike. For sustained loads this degrades nicely, as we
Expand All @@ -139,15 +141,15 @@ func (c *conntracker) loop() {
}
}

func (c *conntracker) clearFlows() {
func (c *conntrackWalker) clearFlows() {
c.Lock()
defer c.Unlock()

for _, f := range c.activeFlows {
c.bufferedFlows = append(c.bufferedFlows, f)
}

c.activeFlows = map[int64]Flow{}
c.activeFlows = map[int64]flow{}
}

func logPipe(prefix string, reader io.Reader) {
Expand All @@ -160,18 +162,16 @@ func logPipe(prefix string, reader io.Reader) {
}
}

func (c *conntracker) run() {
if c.existingConns {
// Fork another conntrack, just to capture existing connections
// for which we don't get events
existingFlows, err := c.existingConnections()
if err != nil {
log.Printf("conntrack existingConnections error: %v", err)
return
}
for _, flow := range existingFlows {
c.handleFlow(flow, true)
}
func (c *conntrackWalker) run() {
// Fork another conntrack, just to capture existing connections
// for which we don't get events
existingFlows, err := c.existingConnections()
if err != nil {
log.Printf("conntrack existingConnections error: %v", err)
return
}
for _, flow := range existingFlows {
c.handleFlow(flow, true)
}

args := append([]string{"-E", "-o", "xml", "-p", "tcp"}, c.args...)
Expand Down Expand Up @@ -217,14 +217,14 @@ func (c *conntracker) run() {
if line, err := reader.ReadString('\n'); err != nil {
log.Printf("conntrack error: %v", err)
return
} else if line != XMLHeader {
} else if line != xmlHeader {
log.Printf("conntrack invalid output: '%s'", line)
return
}
if line, err := reader.ReadString('\n'); err != nil {
log.Printf("conntrack error: %v", err)
return
} else if line != ConntrackOpenTag {
} else if line != conntrackOpenTag {
log.Printf("conntrack invalid output: '%s'", line)
return
}
Expand All @@ -234,7 +234,7 @@ func (c *conntracker) run() {
// Now loop on the output stream
decoder := xml.NewDecoder(reader)
for {
var f Flow
var f flow
if err := decoder.Decode(&f); err != nil {
log.Printf("conntrack error: %v", err)
return
Expand All @@ -243,15 +243,15 @@ func (c *conntracker) run() {
}
}

func (c *conntracker) existingConnections() ([]Flow, error) {
func (c *conntrackWalker) existingConnections() ([]flow, error) {
args := append([]string{"-L", "-o", "xml", "-p", "tcp"}, c.args...)
cmd := exec.Command("conntrack", args...)
stdout, err := cmd.StdoutPipe()
if err != nil {
return []Flow{}, err
return []flow{}, err
}
if err := cmd.Start(); err != nil {
return []Flow{}, err
return []flow{}, err
}
defer func() {
if err := cmd.Wait(); err != nil {
Expand All @@ -260,15 +260,14 @@ func (c *conntracker) existingConnections() ([]Flow, error) {
}()
var result conntrack
if err := xml.NewDecoder(stdout).Decode(&result); err == io.EOF {
return []Flow{}, nil
return []flow{}, nil
} else if err != nil {
return []Flow{}, err
return []flow{}, err
}
return result.Flows, nil
}

// Stop stop stop
func (c *conntracker) Stop() {
func (c *conntrackWalker) stop() {
c.Lock()
defer c.Unlock()
close(c.quit)
Expand All @@ -277,7 +276,7 @@ func (c *conntracker) Stop() {
}
}

func (c *conntracker) handleFlow(f Flow, forceAdd bool) {
func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) {
// A flow consists of 3 'metas' - the 'original' 4 tuple (as seen by this
// host) and the 'reply' 4 tuple, which is what it has been rewritten to.
// This code finds those metas, which are identified by a Direction
Expand All @@ -297,32 +296,32 @@ func (c *conntracker) handleFlow(f Flow, forceAdd bool) {
// For now, I'm only interested in tcp connections - there is too much udp
// traffic going on (every container talking to weave dns, for example) to
// render nicely. TODO: revisit this.
if f.Original.Layer4.Proto != TCP {
if f.Original.Layer4.Proto != tcpProto {
return
}

c.Lock()
defer c.Unlock()

switch {
case forceAdd || f.Type == New || f.Type == Update:
if f.Independent.State != TimeWait {
case forceAdd || f.Type == newType || f.Type == updateType:
if f.Independent.State != timeWait {
c.activeFlows[f.Independent.ID] = f
} else if _, ok := c.activeFlows[f.Independent.ID]; ok {
delete(c.activeFlows, f.Independent.ID)
c.bufferedFlows = append(c.bufferedFlows, f)
}
case f.Type == Destroy:
case f.Type == destroyType:
if _, ok := c.activeFlows[f.Independent.ID]; ok {
delete(c.activeFlows, f.Independent.ID)
c.bufferedFlows = append(c.bufferedFlows, f)
}
}
}

// WalkFlows calls f with all active flows and flows that have come and gone
// since the last call to WalkFlows
func (c *conntracker) WalkFlows(f func(Flow)) {
// walkFlows calls f with all active flows and flows that have come and gone
// since the last call to walkFlows
func (c *conntrackWalker) walkFlows(f func(flow)) {
c.Lock()
defer c.Unlock()
for _, flow := range c.activeFlows {
Expand Down
Loading

0 comments on commit 6a4c997

Please sign in to comment.