diff --git a/probe/endpoint/conntrack.go b/probe/endpoint/conntrack.go index 1470081464..ff5875e857 100644 --- a/probe/endpoint/conntrack.go +++ b/probe/endpoint/conntrack.go @@ -2,7 +2,8 @@ package endpoint import ( "bufio" - "encoding/xml" + "bytes" + "fmt" "io" "os" "path/filepath" @@ -18,49 +19,45 @@ import ( const ( // From https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt // Check a tcp-related file for existence since we need tcp tracking - procFileToCheck = "sys/net/netfilter/nf_conntrack_tcp_timeout_close" - xmlHeader = "\n" - conntrackOpenTag = "\n" - timeWait = "TIME_WAIT" - tcpProto = "tcp" - newType = "new" - updateType = "update" - destroyType = "destroy" + procFileToCheck = "sys/net/netfilter/nf_conntrack_tcp_timeout_close" + timeWait = "TIME_WAIT" + tcpProto = "tcp" + newType = "[NEW]" + updateType = "[UPDATE]" + destroyType = "[DESTROY]" +) + +var ( + destroyTypeB = []byte(destroyType) + assured = []byte("[ASSURED] ") + unreplied = []byte("[UNREPLIED] ") ) type layer3 struct { - XMLName xml.Name `xml:"layer3"` - SrcIP string `xml:"src"` - DstIP string `xml:"dst"` + SrcIP string + DstIP string } type layer4 struct { - XMLName xml.Name `xml:"layer4"` - SrcPort int `xml:"sport"` - DstPort int `xml:"dport"` - Proto string `xml:"protoname,attr"` + SrcPort int + DstPort int + Proto string } type meta struct { - XMLName xml.Name `xml:"meta"` - Direction string `xml:"direction,attr"` - Layer3 layer3 `xml:"layer3"` - Layer4 layer4 `xml:"layer4"` - ID int64 `xml:"id"` - State string `xml:"state"` + Layer3 layer3 + Layer4 layer4 + ID int64 + State string } type flow struct { - XMLName xml.Name `xml:"flow"` - Metas []meta `xml:"meta"` - Type string `xml:"type,attr"` - - Original, Reply, Independent *meta `xml:"-"` + Type string + Original, Reply, Independent meta } type conntrack struct { - XMLName xml.Name `xml:"conntrack"` - Flows []flow `xml:"flow"` + Flows []flow } // flowWalker is something that maintains flows, and provides an accessor @@ -165,7 +162,7 @@ func (c *conntrackWalker) run() { args := append([]string{ "--buffer-size", strconv.Itoa(c.bufferSize), "-E", - "-o", "xml", "-p", "tcp"}, c.args..., + "-o", "id", "-p", "tcp"}, c.args..., ) cmd := exec.Command("conntrack", args...) stdout, err := cmd.StdoutPipe() @@ -204,30 +201,13 @@ func (c *conntrackWalker) run() { c.cmd = cmd c.Unlock() - // Swallow the first two lines - reader := bufio.NewReader(stdout) - if line, err := reader.ReadString('\n'); err != nil { - log.Errorf("conntrack error: %v", err) - return - } else if line != xmlHeader { - log.Errorf("conntrack invalid output: '%s'", line) - return - } - if line, err := reader.ReadString('\n'); err != nil { - log.Errorf("conntrack error: %v", err) - return - } else if line != conntrackOpenTag { - log.Errorf("conntrack invalid output: '%s'", line) - return - } - - defer log.Infof("contrack exiting") + scanner := bufio.NewScanner(bufio.NewReader(stdout)) + defer log.Infof("conntrack exiting") - // Now loop on the output stream - decoder := xml.NewDecoder(reader) + // Loop on the output stream for { - var f flow - if err := decoder.Decode(&f); err != nil { + f, err := decodeStreamedFlow(scanner) + if err != nil { log.Errorf("conntrack error: %v", err) return } @@ -235,8 +215,99 @@ func (c *conntrackWalker) run() { } } +// Get a line without [ASSURED]/[UNREPLIED] tags (it simplifies parsing) +func getUntaggedLine(scanner *bufio.Scanner) ([]byte, error) { + success := scanner.Scan() + if !success { + if err := scanner.Err(); err != nil { + return nil, err + } + return nil, io.EOF + } + line := scanner.Bytes() + // Remove [ASSURED]/[UNREPLIED] tags + line = removeInplace(line, assured) + line = removeInplace(line, unreplied) + return line, nil +} + +func removeInplace(s, sep []byte) []byte { + // TODO: See if we can get better performance + // removing multiple substrings at once (with index/suffixarray New()+Lookup()) + // Probably not worth it for only two substrings occurring once. + index := bytes.Index(s, sep) + if index < 0 { + return s + } + copy(s[index:], s[index+len(sep):]) + return s[:len(s)-len(sep)] +} + +func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) { + var ( + // Use ints for parsing unused fields since their allocations + // are almost for free + unused [2]int + f flow + ) + + // Examples: + // " [UPDATE] udp 17 29 src=192.168.2.100 dst=192.168.2.1 sport=57767 dport=53 src=192.168.2.1 dst=192.168.2.100 sport=53 dport=57767" + // " [NEW] tcp 6 120 SYN_SENT src=127.0.0.1 dst=127.0.0.1 sport=58958 dport=6784 [UNREPLIED] src=127.0.0.1 dst=127.0.0.1 sport=6784 dport=58958 id=1595499776" + // " [UPDATE] tcp 6 120 TIME_WAIT src=10.0.2.15 dst=10.0.2.15 sport=51154 dport=4040 src=10.0.2.15 dst=10.0.2.15 sport=4040 dport=51154 [ASSURED] id=3663628160" + // " [DESTROY] tcp 6 src=172.17.0.1 dst=172.17.0.1 sport=34078 dport=53 src=172.17.0.1 dst=10.0.2.15 sport=53 dport=34078 id=3668417984" (note how the timeout and state field is missing) + + // Remove tags since they are optional and make parsing harder + line, err := getUntaggedLine(scanner) + if err != nil { + return flow{}, err + } + + line = bytes.TrimLeft(line, " ") + if bytes.HasPrefix(line, destroyTypeB) { + // Destroy events don't have a timeout or state field + _, err = fmt.Sscanf(string(line), "%s %s %d src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d", + &f.Type, + &f.Original.Layer4.Proto, + &unused[0], + &f.Original.Layer3.SrcIP, + &f.Original.Layer3.DstIP, + &f.Original.Layer4.SrcPort, + &f.Original.Layer4.DstPort, + &f.Reply.Layer3.SrcIP, + &f.Reply.Layer3.DstIP, + &f.Reply.Layer4.SrcPort, + &f.Reply.Layer4.DstPort, + &f.Independent.ID, + ) + } else { + _, err = fmt.Sscanf(string(line), "%s %s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d", + &f.Type, + &f.Original.Layer4.Proto, + &unused[0], + &unused[1], + &f.Independent.State, + &f.Original.Layer3.SrcIP, + &f.Original.Layer3.DstIP, + &f.Original.Layer4.SrcPort, + &f.Original.Layer4.DstPort, + &f.Reply.Layer3.SrcIP, + &f.Reply.Layer3.DstIP, + &f.Reply.Layer4.SrcPort, + &f.Reply.Layer4.DstPort, + &f.Independent.ID, + ) + } + + if err != nil { + return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err) + } + f.Reply.Layer4.Proto = f.Original.Layer4.Proto + return f, nil +} + func (c *conntrackWalker) existingConnections() ([]flow, error) { - args := append([]string{"-L", "-o", "xml", "-p", "tcp"}, c.args...) + args := append([]string{"-L", "-o", "id", "-p", "tcp"}, c.args...) cmd := exec.Command("conntrack", args...) stdout, err := cmd.StdoutPipe() if err != nil { @@ -250,13 +321,63 @@ func (c *conntrackWalker) existingConnections() ([]flow, error) { log.Errorf("conntrack existingConnections exit error: %v", err) } }() - var result conntrack - if err := xml.NewDecoder(stdout).Decode(&result); err == io.EOF { - return []flow{}, nil - } else if err != nil { - return []flow{}, err + + scanner := bufio.NewScanner(bufio.NewReader(stdout)) + var result []flow + for { + f, err := decodeDumpedFlow(scanner) + if err != nil { + if err == io.EOF { + break + } + log.Errorf("conntrack error: %v", err) + return result, err + } + result = append(result, f) } - return result.Flows, nil + return result, nil +} + +func decodeDumpedFlow(scanner *bufio.Scanner) (flow, error) { + var ( + // Use ints for parsing unused fields since allocations + // are almost for free + unused [4]int + f flow + ) + + // Example: + // " tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088c" + // remove tags since they are optional and make parsing harder + line, err := getUntaggedLine(scanner) + if err != nil { + return flow{}, err + } + + _, err = fmt.Sscanf(string(line), "%s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d mark=%d use=%d id=%d", + &f.Original.Layer4.Proto, + &unused[0], + &unused[1], + &f.Independent.State, + &f.Original.Layer3.SrcIP, + &f.Original.Layer3.DstIP, + &f.Original.Layer4.SrcPort, + &f.Original.Layer4.DstPort, + &f.Reply.Layer3.SrcIP, + &f.Reply.Layer3.DstIP, + &f.Reply.Layer4.SrcPort, + &f.Reply.Layer4.DstPort, + &unused[2], + &unused[3], + &f.Independent.ID, + ) + + if err != nil { + return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err) + } + + f.Reply.Layer4.Proto = f.Original.Layer4.Proto + return f, nil } func (c *conntrackWalker) stop() { @@ -269,21 +390,8 @@ func (c *conntrackWalker) stop() { } func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) { - // A flow consists of 3 'metas' - the 'original' 4 tuple (as seen by this - // host) and the 'reply' 4 tuple, which is what it has been rewritten to. - // This code finds those metas, which are identified by a Direction - // attribute. - for i := range f.Metas { - meta := &f.Metas[i] - switch meta.Direction { - case "original": - f.Original = meta - case "reply": - f.Reply = meta - case "independent": - f.Independent = meta - } - } + c.Lock() + defer c.Unlock() // For not, I'm only interested in tcp connections - there is too much udp // traffic going on (every container talking to weave dns, for example) to @@ -292,9 +400,6 @@ func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) { return } - c.Lock() - defer c.Unlock() - // Ignore flows for which we never saw an update; they are likely // incomplete or wrong. See #1462. switch { diff --git a/probe/endpoint/conntrack_internal_test.go b/probe/endpoint/conntrack_internal_test.go index 96c0aa29f2..105dd9c16b 100644 --- a/probe/endpoint/conntrack_internal_test.go +++ b/probe/endpoint/conntrack_internal_test.go @@ -2,176 +2,192 @@ package endpoint import ( "bufio" - "encoding/xml" "io" + "strings" "testing" "time" - "github.com/weaveworks/common/exec" - testexec "github.com/weaveworks/common/test/exec" "github.com/weaveworks/scope/test" ) -const conntrackCloseTag = "\n" -const bufferSize = 1024 * 1024 - -func makeFlow(ty string) flow { - return flow{ - XMLName: xml.Name{ - Local: "flow", +// Obtained though conntrack -E -p tcp -o id and then tweaked +const streamedFlowsSource = `[DESTROY] tcp 6 src=10.0.0.1 dst=127.0.0.1 sport=36826 dport=28106 src=10.0.0.2 dst=127.0.0.2 sport=28107 dport=36826 [ASSURED] id=347275904 + [NEW] tcp 6 120 SYN_SENT src=10.0.0.1 dst=127.0.0.1 sport=36898 dport=28107 [UNREPLIED] src=10.0.0.2 dst=127.0.0.2 sport=28107 dport=36898 id=347275904 + [UPDATE] tcp 6 60 SYN_RECV src=10.0.0.1 dst=127.0.0.1 sport=36898 dport=28107 src=10.0.0.2 dst=127.0.0.2 sport=28107 dport=36898 id=347275904 + [UPDATE] tcp 6 432000 ESTABLISHED src=10.0.0.1 dst=127.0.0.1 sport=36898 dport=28107 src=10.0.0.2 dst=127.0.0.2 sport=28107 dport=36898 [ASSURED] id=347275904` + +var wantStreamedFlows = []flow{ + { + Type: destroyType, + Original: meta{ + Layer3: layer3{ + SrcIP: "10.0.0.1", + DstIP: "127.0.0.1", + }, + Layer4: layer4{ + SrcPort: 36826, + DstPort: 28106, + Proto: "tcp", + }, }, - Type: ty, - } -} - -func addMeta(f *flow, dir, srcIP, dstIP string, srcPort, dstPort int) *meta { - meta := meta{ - XMLName: xml.Name{ - Local: "meta", + Reply: meta{ + Layer3: layer3{ + SrcIP: "10.0.0.2", + DstIP: "127.0.0.2", + }, + Layer4: layer4{ + SrcPort: 28107, + DstPort: 36826, + Proto: "tcp", + }, + }, + Independent: meta{ + ID: 347275904, }, - Direction: dir, - Layer3: layer3{ - XMLName: xml.Name{ - Local: "layer3", + }, + { + Type: newType, + Original: meta{ + Layer3: layer3{ + SrcIP: "10.0.0.1", + DstIP: "127.0.0.1", + }, + Layer4: layer4{ + SrcPort: 36898, + DstPort: 28107, + Proto: "tcp", }, - SrcIP: srcIP, - DstIP: dstIP, }, - Layer4: layer4{ - XMLName: xml.Name{ - Local: "layer4", + Reply: meta{ + Layer3: layer3{ + SrcIP: "10.0.0.2", + DstIP: "127.0.0.2", + }, + Layer4: layer4{ + SrcPort: 28107, + DstPort: 36898, + Proto: "tcp", }, - SrcPort: srcPort, - DstPort: dstPort, - Proto: tcpProto, }, - } - f.Metas = append(f.Metas, meta) - return &meta -} - -func addIndependant(f *flow, id int64, state string) *meta { - meta := meta{ - XMLName: xml.Name{ - Local: "meta", + Independent: meta{ + ID: 347275904, + State: "SYN_SENT", }, - Direction: "independent", - ID: id, - State: state, - Layer3: layer3{ - XMLName: xml.Name{ - Local: "layer3", + }, + { + Type: updateType, + Original: meta{ + Layer3: layer3{ + SrcIP: "10.0.0.1", + DstIP: "127.0.0.1", + }, + Layer4: layer4{ + SrcPort: 36898, + DstPort: 28107, + Proto: "tcp", }, }, - Layer4: layer4{ - XMLName: xml.Name{ - Local: "layer4", + Reply: meta{ + Layer3: layer3{ + SrcIP: "10.0.0.2", + DstIP: "127.0.0.2", + }, + Layer4: layer4{ + SrcPort: 28107, + DstPort: 36898, + Proto: "tcp", }, }, - } - f.Metas = append(f.Metas, meta) - return &meta + Independent: meta{ + ID: 347275904, + State: "SYN_RECV", + }, + }, + { + Type: updateType, + Original: meta{ + Layer3: layer3{ + SrcIP: "10.0.0.1", + DstIP: "127.0.0.1", + }, + Layer4: layer4{ + SrcPort: 36898, + DstPort: 28107, + Proto: "tcp", + }, + }, + Reply: meta{ + Layer3: layer3{ + SrcIP: "10.0.0.2", + DstIP: "127.0.0.2", + }, + Layer4: layer4{ + SrcPort: 28107, + DstPort: 36898, + Proto: "tcp", + }, + }, + Independent: meta{ + ID: 347275904, + State: "ESTABLISHED", + }, + }, } -func TestConntracker(t *testing.T) { - oldExecCmd, oldIsConntrackSupported := exec.Command, IsConntrackSupported - defer func() { exec.Command, IsConntrackSupported = oldExecCmd, oldIsConntrackSupported }() - - IsConntrackSupported = func(_ string) error { - return nil - } - - first := true - existingConnectionsReader, existingConnectionsWriter := io.Pipe() - reader, writer := io.Pipe() - exec.Command = func(name string, args ...string) exec.Cmd { - if first { - first = false - return testexec.NewMockCmd(existingConnectionsReader) +func testFlowDecoding(t *testing.T, source string, want []flow, decoder func(scanner *bufio.Scanner) (flow, error)) { + scanner := bufio.NewScanner(strings.NewReader(source)) + d := time.Millisecond * 100 + for _, wantFlow := range want { + haveFlow, err := decoder(scanner) + if err != nil { + t.Fatalf("Unexpected decoding error: %v", err) } - return testexec.NewMockCmd(reader) - } - - flowWalker := newConntrackFlowWalker(true, "", bufferSize) - defer flowWalker.stop() - - // First write out some empty xml for the existing connections - ecbw := bufio.NewWriter(existingConnectionsWriter) - if _, err := ecbw.WriteString(xmlHeader); err != nil { - t.Fatal(err) - } - if _, err := ecbw.WriteString(conntrackOpenTag); err != nil { - t.Fatal(err) - } - if _, err := ecbw.WriteString(conntrackCloseTag); err != nil { - t.Fatal(err) - } - if err := ecbw.Flush(); err != nil { - t.Fatal(err) + test.Poll(t, d, wantFlow, func() interface{} { return haveFlow }) } - // Then write out eventa - bw := bufio.NewWriter(writer) - if _, err := bw.WriteString(xmlHeader); err != nil { - t.Fatal(err) - } - if _, err := bw.WriteString(conntrackOpenTag); err != nil { - t.Fatal(err) - } - if err := bw.Flush(); err != nil { - t.Fatal(err) - } - - have := func() interface{} { - result := []flow{} - flowWalker.walkFlows(func(f flow) { - f.Original = nil - f.Reply = nil - f.Independent = nil - result = append(result, f) - }) - return result - } - ts := 100 * time.Millisecond - - // First, assert we have no flows - test.Poll(t, ts, []flow{}, have) - - // Now add some flows - xmlEncoder := xml.NewEncoder(bw) - writeFlow := func(f flow) { - if err := xmlEncoder.Encode(f); err != nil { - t.Fatal(err) - } - if _, err := bw.WriteString("\n"); err != nil { - t.Fatal(err) - } - if err := bw.Flush(); err != nil { - t.Fatal(err) - } + if _, err := decodeStreamedFlow(scanner); err != io.EOF { + t.Fatalf("Unexpected error value on empty input: %v", err) } +} - flow1 := makeFlow(updateType) - addMeta(&flow1, "original", "1.2.3.4", "2.3.4.5", 2, 3) - addIndependant(&flow1, 1, "") - writeFlow(flow1) - test.Poll(t, ts, []flow{flow1}, have) +func TestStreamedFlowDecoding(t *testing.T) { + testFlowDecoding(t, streamedFlowsSource, wantStreamedFlows, decodeStreamedFlow) +} - // Now check when we remove the flow, we still get it in the next Walk - flow2 := makeFlow(destroyType) - addMeta(&flow2, "original", "1.2.3.4", "2.3.4.5", 2, 3) - addIndependant(&flow2, 1, "") - writeFlow(flow2) - test.Poll(t, ts, []flow{flow1}, have) - test.Poll(t, ts, []flow{}, have) +// Obtained through conntrack -L -p tcp -o id +const dumpedFlowsSource = `tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208` - // This time we're not going to remove it, but put it in state TIME_WAIT - flow1.Type = updateType - writeFlow(flow1) - test.Poll(t, ts, []flow{flow1}, have) +var wantDumpedFlows = []flow{ + { + Original: meta{ + Layer3: layer3{ + SrcIP: "10.0.2.2", + DstIP: "10.0.2.15", + }, + Layer4: layer4{ + SrcPort: 49911, + DstPort: 22, + Proto: "tcp", + }, + }, + Reply: meta{ + Layer3: layer3{ + SrcIP: "10.0.2.15", + DstIP: "10.0.2.2", + }, + Layer4: layer4{ + SrcPort: 22, + DstPort: 49911, + Proto: "tcp", + }, + }, + Independent: meta{ + ID: 2993966208, + State: "ESTABLISHED", + }, + }, +} - flow1.Metas[1].State = timeWait - writeFlow(flow1) - test.Poll(t, ts, []flow{flow1}, have) - test.Poll(t, ts, []flow{}, have) +func TestDumpedFlowDecoding(t *testing.T) { + testFlowDecoding(t, dumpedFlowsSource, wantDumpedFlows, decodeDumpedFlow) } diff --git a/probe/endpoint/nat_internal_test.go b/probe/endpoint/nat_internal_test.go index 67440126fc..2214be5f44 100644 --- a/probe/endpoint/nat_internal_test.go +++ b/probe/endpoint/nat_internal_test.go @@ -29,15 +29,40 @@ func TestNat(t *testing.T) { // correctly. // the setup is this: // - // container2 (10.0.47.2:222222), host2 (2.3.4.5:22223) -> + // container2 (10.0.47.2:22222), host2 (2.3.4.5:22223) -> // host1 (1.2.3.4:80), container1 (10.0.47.1:80) // from the PoV of host1 { - f := makeFlow(updateType) - addIndependant(&f, 1, "") - f.Original = addMeta(&f, "original", "2.3.4.5", "1.2.3.4", 222222, 80) - f.Reply = addMeta(&f, "reply", "10.0.47.1", "2.3.4.5", 80, 222222) + f := flow{ + Type: updateType, + Original: meta{ + Layer3: layer3{ + SrcIP: "2.3.4.5", + DstIP: "1.2.3.4", + }, + Layer4: layer4{ + SrcPort: 22222, + DstPort: 80, + Proto: "tcp", + }, + }, + Reply: meta{ + Layer3: layer3{ + SrcIP: "10.0.47.1", + DstIP: "2.3.4.5", + }, + Layer4: layer4{ + SrcPort: 80, + DstPort: 22222, + Proto: "tcp", + }, + }, + Independent: meta{ + ID: 1, + }, + } + ct := &mockFlowWalker{ flows: []flow{f}, } @@ -69,10 +94,34 @@ func TestNat(t *testing.T) { // form the PoV of host2 { - f := makeFlow(updateType) - addIndependant(&f, 2, "") - f.Original = addMeta(&f, "original", "10.0.47.2", "1.2.3.4", 22222, 80) - f.Reply = addMeta(&f, "reply", "1.2.3.4", "2.3.4.5", 80, 22223) + f := flow{ + Type: updateType, + Original: meta{ + Layer3: layer3{ + SrcIP: "10.0.47.2", + DstIP: "1.2.3.4", + }, + Layer4: layer4{ + SrcPort: 22222, + DstPort: 80, + Proto: "tcp", + }, + }, + Reply: meta{ + Layer3: layer3{ + SrcIP: "1.2.3.4", + DstIP: "2.3.4.5", + }, + Layer4: layer4{ + SrcPort: 80, + DstPort: 22223, + Proto: "tcp", + }, + }, + Independent: meta{ + ID: 2, + }, + } ct := &mockFlowWalker{ flows: []flow{f}, }