Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable XML in conntrack parsing #2095

Merged
merged 5 commits into from
Dec 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 182 additions & 77 deletions probe/endpoint/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package endpoint

import (
"bufio"
"encoding/xml"
"bytes"
"fmt"
"io"
"os"
"path/filepath"
Expand All @@ -18,49 +19,45 @@ import (
const (
// From https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt
// Check a tcp-related file for existence since we need tcp tracking
procFileToCheck = "sys/net/netfilter/nf_conntrack_tcp_timeout_close"
xmlHeader = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"
conntrackOpenTag = "<conntrack>\n"
timeWait = "TIME_WAIT"
tcpProto = "tcp"
newType = "new"
updateType = "update"
destroyType = "destroy"
procFileToCheck = "sys/net/netfilter/nf_conntrack_tcp_timeout_close"
timeWait = "TIME_WAIT"
tcpProto = "tcp"
newType = "[NEW]"
updateType = "[UPDATE]"
destroyType = "[DESTROY]"
)

var (
destroyTypeB = []byte(destroyType)
assured = []byte("[ASSURED] ")
unreplied = []byte("[UNREPLIED] ")
)

type layer3 struct {
XMLName xml.Name `xml:"layer3"`
SrcIP string `xml:"src"`
DstIP string `xml:"dst"`
SrcIP string
DstIP string
}

type layer4 struct {
XMLName xml.Name `xml:"layer4"`
SrcPort int `xml:"sport"`
DstPort int `xml:"dport"`
Proto string `xml:"protoname,attr"`
SrcPort int
DstPort int
Proto string
}

type meta struct {
XMLName xml.Name `xml:"meta"`
Direction string `xml:"direction,attr"`
Layer3 layer3 `xml:"layer3"`
Layer4 layer4 `xml:"layer4"`
ID int64 `xml:"id"`
State string `xml:"state"`
Layer3 layer3
Layer4 layer4
ID int64
State string
}

type flow struct {
XMLName xml.Name `xml:"flow"`
Metas []meta `xml:"meta"`
Type string `xml:"type,attr"`

Original, Reply, Independent *meta `xml:"-"`
Type string
Original, Reply, Independent meta
}

type conntrack struct {
XMLName xml.Name `xml:"conntrack"`
Flows []flow `xml:"flow"`
Flows []flow
}

// flowWalker is something that maintains flows, and provides an accessor
Expand Down Expand Up @@ -165,7 +162,7 @@ func (c *conntrackWalker) run() {

args := append([]string{
"--buffer-size", strconv.Itoa(c.bufferSize), "-E",
"-o", "xml", "-p", "tcp"}, c.args...,
"-o", "id", "-p", "tcp"}, c.args...,
)
cmd := exec.Command("conntrack", args...)
stdout, err := cmd.StdoutPipe()
Expand Down Expand Up @@ -204,39 +201,113 @@ func (c *conntrackWalker) run() {
c.cmd = cmd
c.Unlock()

// Swallow the first two lines
reader := bufio.NewReader(stdout)
if line, err := reader.ReadString('\n'); err != nil {
log.Errorf("conntrack error: %v", err)
return
} else if line != xmlHeader {
log.Errorf("conntrack invalid output: '%s'", line)
return
}
if line, err := reader.ReadString('\n'); err != nil {
log.Errorf("conntrack error: %v", err)
return
} else if line != conntrackOpenTag {
log.Errorf("conntrack invalid output: '%s'", line)
return
}

defer log.Infof("contrack exiting")
scanner := bufio.NewScanner(bufio.NewReader(stdout))
defer log.Infof("conntrack exiting")

// Now loop on the output stream
decoder := xml.NewDecoder(reader)
// Loop on the output stream
for {
var f flow
if err := decoder.Decode(&f); err != nil {
f, err := decodeStreamedFlow(scanner)
if err != nil {
log.Errorf("conntrack error: %v", err)
return
}
c.handleFlow(f, false)
}
}

// Get a line without [ASSURED]/[UNREPLIED] tags (it simplifies parsing)
func getUntaggedLine(scanner *bufio.Scanner) ([]byte, error) {
success := scanner.Scan()
if !success {
if err := scanner.Err(); err != nil {
return nil, err
}
return nil, io.EOF
}
line := scanner.Bytes()
// Remove [ASSURED]/[UNREPLIED] tags
line = removeInplace(line, assured)
line = removeInplace(line, unreplied)
return line, nil
}

func removeInplace(s, sep []byte) []byte {
// TODO: See if we can get better performance
// removing multiple substrings at once (with index/suffixarray New()+Lookup())
// Probably not worth it for only two substrings occurring once.
index := bytes.Index(s, sep)
if index < 0 {
return s
}
copy(s[index:], s[index+len(sep):])
return s[:len(s)-len(sep)]
}

func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) {
var (
// Use ints for parsing unused fields since their allocations
// are almost for free
unused [2]int
f flow
)

// Examples:
// " [UPDATE] udp 17 29 src=192.168.2.100 dst=192.168.2.1 sport=57767 dport=53 src=192.168.2.1 dst=192.168.2.100 sport=53 dport=57767"
// " [NEW] tcp 6 120 SYN_SENT src=127.0.0.1 dst=127.0.0.1 sport=58958 dport=6784 [UNREPLIED] src=127.0.0.1 dst=127.0.0.1 sport=6784 dport=58958 id=1595499776"
// " [UPDATE] tcp 6 120 TIME_WAIT src=10.0.2.15 dst=10.0.2.15 sport=51154 dport=4040 src=10.0.2.15 dst=10.0.2.15 sport=4040 dport=51154 [ASSURED] id=3663628160"
// " [DESTROY] tcp 6 src=172.17.0.1 dst=172.17.0.1 sport=34078 dport=53 src=172.17.0.1 dst=10.0.2.15 sport=53 dport=34078 id=3668417984" (note how the timeout and state field is missing)

// Remove tags since they are optional and make parsing harder
line, err := getUntaggedLine(scanner)
if err != nil {
return flow{}, err
}

line = bytes.TrimLeft(line, " ")
if bytes.HasPrefix(line, destroyTypeB) {
// Destroy events don't have a timeout or state field
_, err = fmt.Sscanf(string(line), "%s %s %d src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
&f.Type,
&f.Original.Layer4.Proto,
&unused[0],
&f.Original.Layer3.SrcIP,
&f.Original.Layer3.DstIP,
&f.Original.Layer4.SrcPort,
&f.Original.Layer4.DstPort,
&f.Reply.Layer3.SrcIP,
&f.Reply.Layer3.DstIP,
&f.Reply.Layer4.SrcPort,
&f.Reply.Layer4.DstPort,
&f.Independent.ID,
)
} else {
_, err = fmt.Sscanf(string(line), "%s %s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
&f.Type,
&f.Original.Layer4.Proto,
&unused[0],
&unused[1],
&f.Independent.State,
&f.Original.Layer3.SrcIP,
&f.Original.Layer3.DstIP,
&f.Original.Layer4.SrcPort,
&f.Original.Layer4.DstPort,
&f.Reply.Layer3.SrcIP,
&f.Reply.Layer3.DstIP,
&f.Reply.Layer4.SrcPort,
&f.Reply.Layer4.DstPort,
&f.Independent.ID,
)
}

if err != nil {
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
}
f.Reply.Layer4.Proto = f.Original.Layer4.Proto
return f, nil
}

func (c *conntrackWalker) existingConnections() ([]flow, error) {
args := append([]string{"-L", "-o", "xml", "-p", "tcp"}, c.args...)
args := append([]string{"-L", "-o", "id", "-p", "tcp"}, c.args...)
cmd := exec.Command("conntrack", args...)
stdout, err := cmd.StdoutPipe()
if err != nil {
Expand All @@ -250,13 +321,63 @@ func (c *conntrackWalker) existingConnections() ([]flow, error) {
log.Errorf("conntrack existingConnections exit error: %v", err)
}
}()
var result conntrack
if err := xml.NewDecoder(stdout).Decode(&result); err == io.EOF {
return []flow{}, nil
} else if err != nil {
return []flow{}, err

scanner := bufio.NewScanner(bufio.NewReader(stdout))
var result []flow
for {
f, err := decodeDumpedFlow(scanner)
if err != nil {
if err == io.EOF {
break
}
log.Errorf("conntrack error: %v", err)
return result, err
}
result = append(result, f)
}
return result.Flows, nil
return result, nil
}

func decodeDumpedFlow(scanner *bufio.Scanner) (flow, error) {
var (
// Use ints for parsing unused fields since allocations
// are almost for free
unused [4]int
f flow
)

// Example:
// " tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088c"
// remove tags since they are optional and make parsing harder
line, err := getUntaggedLine(scanner)
if err != nil {
return flow{}, err
}

_, err = fmt.Sscanf(string(line), "%s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d mark=%d use=%d id=%d",
&f.Original.Layer4.Proto,
&unused[0],
&unused[1],
&f.Independent.State,
&f.Original.Layer3.SrcIP,
&f.Original.Layer3.DstIP,
&f.Original.Layer4.SrcPort,
&f.Original.Layer4.DstPort,
&f.Reply.Layer3.SrcIP,
&f.Reply.Layer3.DstIP,
&f.Reply.Layer4.SrcPort,
&f.Reply.Layer4.DstPort,
&unused[2],
&unused[3],
&f.Independent.ID,
)

if err != nil {
return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err)
}

f.Reply.Layer4.Proto = f.Original.Layer4.Proto
return f, nil
}

func (c *conntrackWalker) stop() {
Expand All @@ -269,21 +390,8 @@ func (c *conntrackWalker) stop() {
}

func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) {
// A flow consists of 3 'metas' - the 'original' 4 tuple (as seen by this
// host) and the 'reply' 4 tuple, which is what it has been rewritten to.
// This code finds those metas, which are identified by a Direction
// attribute.
for i := range f.Metas {
meta := &f.Metas[i]
switch meta.Direction {
case "original":
f.Original = meta
case "reply":
f.Reply = meta
case "independent":
f.Independent = meta
}
}
c.Lock()
defer c.Unlock()

// For not, I'm only interested in tcp connections - there is too much udp
// traffic going on (every container talking to weave dns, for example) to
Expand All @@ -292,9 +400,6 @@ func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) {
return
}

c.Lock()
defer c.Unlock()

// Ignore flows for which we never saw an update; they are likely
// incomplete or wrong. See #1462.
switch {
Expand Down
Loading