Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
* Remove XML traces
* Improve performance
* Fix tests
  • Loading branch information
Alfonso Acosta committed Dec 21, 2016
1 parent f19889f commit d22d64c
Show file tree
Hide file tree
Showing 3 changed files with 295 additions and 221 deletions.
145 changes: 77 additions & 68 deletions probe/endpoint/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ package endpoint

import (
"bufio"
"encoding/xml"
"bytes"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -20,48 +19,45 @@ import (
const (
// From https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt
// Check a tcp-related file for existence since we need tcp tracking
procFileToCheck = "sys/net/netfilter/nf_conntrack_tcp_timeout_close"
xmlHeader = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"
conntrackOpenTag = "<conntrack>\n"
timeWait = "TIME_WAIT"
tcpProto = "tcp"
newType = "new"
updateType = "update"
destroyType = "destroy"
procFileToCheck = "sys/net/netfilter/nf_conntrack_tcp_timeout_close"
timeWait = "TIME_WAIT"
tcpProto = "tcp"
newType = "[NEW]"
updateType = "[UPDATE]"
destroyType = "[DESTROY]"
)

// Byte-slice patterns matched against raw conntrack output lines.
var (
// destroyTypeB is the []byte form of destroyType, used for prefix checks on
// lines that are handled as []byte.
destroyTypeB = []byte(destroyType)
// assured and unreplied are optional tags stripped from a line before it is
// parsed. The trailing space is part of each pattern, so it is removed
// together with the tag.
assured = []byte("[ASSURED] ")
unreplied = []byte("[UNREPLIED] ")
)

type layer3 struct {
XMLName xml.Name `xml:"layer3"`
SrcIP string `xml:"src"`
DstIP string `xml:"dst"`
SrcIP string
DstIP string
}

type layer4 struct {
XMLName xml.Name `xml:"layer4"`
SrcPort int `xml:"sport"`
DstPort int `xml:"dport"`
Proto string `xml:"protoname,attr"`
SrcPort int
DstPort int
Proto string
}

type meta struct {
XMLName xml.Name `xml:"meta"`
Direction string `xml:"direction,attr"`
Layer3 layer3 `xml:"layer3"`
Layer4 layer4 `xml:"layer4"`
ID int64 `xml:"id"`
State string `xml:"state"`
Layer3 layer3
Layer4 layer4
ID int64
State string
}

type flow struct {
XMLName xml.Name `xml:"flow"`
Type string `xml:"type,attr"`

Original, Reply, Independent meta `xml:"-"`
Type string
Original, Reply, Independent meta
}

type conntrack struct {
XMLName xml.Name `xml:"conntrack"`
Flows []flow `xml:"flow"`
Flows []flow
}

// flowWalker is something that maintains flows, and provides an accessor
Expand Down Expand Up @@ -205,12 +201,12 @@ func (c *conntrackWalker) run() {
c.cmd = cmd
c.Unlock()

reader := bufio.NewReader(stdout)
scanner := bufio.NewScanner(bufio.NewReader(stdout))
defer log.Infof("conntrack exiting")

// Loop on the output stream
for {
f, err := decodeStreamedFlow(reader)
f, err := decodeStreamedFlow(scanner)
if err != nil {
log.Errorf("conntrack error: %v", err)
return
Expand All @@ -219,24 +215,40 @@ func (c *conntrackWalker) run() {
}
}

func getUntaggedLine(reader *bufio.Reader) (string, error) {
// TODO: read bytes?
line, err := reader.ReadString('\n')
if err != nil {
return "", err
// Get a line without [ASSURED]/[UNREPLIED] tags (it simplifies parsing)
func getUntaggedLine(scanner *bufio.Scanner) ([]byte, error) {
success := scanner.Scan()
if !success {
if err := scanner.Err(); err != nil {
return nil, err
}
return nil, io.EOF
}
// Remove [ASSURED]/[UNREPLIED] tags inplace
// TODO: replace in-place?
line = strings.Replace(line, "[ASSURED] ", "", -1)
line = strings.Replace(line, "[UNREPLIED] ", "", -1)
line := scanner.Bytes()
// Remove [ASSURED]/[UNREPLIED] tags
line = removeInplace(line, assured)
line = removeInplace(line, unreplied)
return line, nil
}

func decodeStreamedFlow(reader *bufio.Reader) (flow, error) {
// removeInplace cuts the first occurrence of sep out of s and returns the
// shortened slice. The bytes following the match are shifted left inside s's
// own backing array, so the caller's buffer is modified and nothing is
// allocated. If sep does not occur in s, s is returned unchanged.
//
// Only one occurrence is removed. Handling several separators in a single
// pass (e.g. with index/suffixarray) was considered, but is probably not
// worth it for two substrings that each occur once.
func removeInplace(s, sep []byte) []byte {
	start := bytes.Index(s, sep)
	if start >= 0 {
		// Slide the tail over the matched region; copy reports how many
		// bytes were moved, which fixes the new length.
		moved := copy(s[start:], s[start+len(sep):])
		s = s[:start+moved]
	}
	return s
}

func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) {
var (
// TODO: use []byte/int where possible?
omit [4]string
f flow
// Use ints for parsing unused fields since their allocations
// are almost for free
unused [2]int
f flow
)

// Examples:
Expand All @@ -246,18 +258,18 @@ func decodeStreamedFlow(reader *bufio.Reader) (flow, error) {
// " [DESTROY] tcp 6 src=172.17.0.1 dst=172.17.0.1 sport=34078 dport=53 src=172.17.0.1 dst=10.0.2.15 sport=53 dport=34078 id=3668417984" (note how the timeout and state fields are missing)

// Remove tags since they are optional and make parsing harder
line, err := getUntaggedLine(reader)
line, err := getUntaggedLine(scanner)
if err != nil {
return flow{}, err
}

// TODO: refactor and probably create a fully-fledged parser, this is just good enough for performance testing
if strings.Contains(line, "[DESTROY]") {
line = bytes.TrimLeft(line, " ")
if bytes.HasPrefix(line, destroyTypeB) {
// Destroy events don't have a timeout or state field
_, err = fmt.Sscanf(line, "%s %s %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%x\n",
_, err = fmt.Sscanf(string(line), "%s %s %d src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
&f.Type,
&f.Original.Layer4.Proto,
&omit[0],
&unused[0],
&f.Original.Layer3.SrcIP,
&f.Original.Layer3.DstIP,
&f.Original.Layer4.SrcPort,
Expand All @@ -269,11 +281,11 @@ func decodeStreamedFlow(reader *bufio.Reader) (flow, error) {
&f.Independent.ID,
)
} else {
_, err = fmt.Sscanf(line, "%s %s %s %s %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%x\n",
_, err = fmt.Sscanf(string(line), "%s %s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
&f.Type,
&f.Original.Layer4.Proto,
&omit[0],
&omit[1],
&unused[0],
&unused[1],
&f.Independent.State,
&f.Original.Layer3.SrcIP,
&f.Original.Layer3.DstIP,
Expand All @@ -290,10 +302,6 @@ func decodeStreamedFlow(reader *bufio.Reader) (flow, error) {
if err != nil {
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
}
if len(f.Type) < 3 || f.Type[0] != '[' || f.Type[len(f.Type)-1] != ']' {
return flow{}, fmt.Errorf("Unexpected type format: %q", f.Type)
}
f.Type = strings.ToLower(f.Type[1 : len(f.Type)-1])
f.Reply.Layer4.Proto = f.Original.Layer4.Proto
return f, nil
}
Expand All @@ -314,10 +322,10 @@ func (c *conntrackWalker) existingConnections() ([]flow, error) {
}
}()

reader := bufio.NewReader(stdout)
scanner := bufio.NewScanner(bufio.NewReader(stdout))
var result []flow
for {
f, err := decodeDumpedFlow(reader)
f, err := decodeDumpedFlow(scanner)
if err != nil {
if err == io.EOF {
break
Expand All @@ -330,25 +338,26 @@ func (c *conntrackWalker) existingConnections() ([]flow, error) {
return result, nil
}

func decodeDumpedFlow(reader *bufio.Reader) (flow, error) {
func decodeDumpedFlow(scanner *bufio.Scanner) (flow, error) {
var (
// TODO: use int/[]byte where possible?
omit [4]string
f flow
// Use ints for parsing unused fields since allocations
// are almost for free
unused [4]int
f flow
)

// Example:
// " tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088c"
// remove tags since they are optional and make parsing harder
line, err := getUntaggedLine(reader)
line, err := getUntaggedLine(scanner)
if err != nil {
return flow{}, err
}

_, err = fmt.Sscanf(line, "%s %s %s %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d %s %s id=%x\n",
_, err = fmt.Sscanf(string(line), "%s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d mark=%d use=%d id=%d",
&f.Original.Layer4.Proto,
&omit[0],
&omit[1],
&unused[0],
&unused[1],
&f.Independent.State,
&f.Original.Layer3.SrcIP,
&f.Original.Layer3.DstIP,
Expand All @@ -358,8 +367,8 @@ func decodeDumpedFlow(reader *bufio.Reader) (flow, error) {
&f.Reply.Layer3.DstIP,
&f.Reply.Layer4.SrcPort,
&f.Reply.Layer4.DstPort,
&omit[2],
&omit[3],
&unused[2],
&unused[3],
&f.Independent.ID,
)

Expand Down
Loading

0 comments on commit d22d64c

Please sign in to comment.