Skip to content

Commit

Permalink
[WIP] probe: conntrack: fix output parsing
Browse files Browse the repository at this point in the history
TODO:
- [ ] don't recompile the regexp for each line
- [ ] test under different configuration (SELinux, nf_conntrack_acct)
- [ ] check performances compared to sscanf and xml

Fixes #2117
  • Loading branch information
alban committed Jan 6, 2017
1 parent b4e1fc7 commit 8b82a9d
Showing 1 changed file with 105 additions and 63 deletions.
168 changes: 105 additions & 63 deletions probe/endpoint/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"io/ioutil"
"path/filepath"
"regexp"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -251,12 +252,7 @@ func removeInplace(s, sep []byte) []byte {
}

func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) {
var (
// Use ints for parsing unused fields since their allocations
// are almost for free
unused [2]int
f flow
)
var f flow

// Examples:
// " [UPDATE] udp 17 29 src=192.168.2.100 dst=192.168.2.1 sport=57767 dport=53 src=192.168.2.1 dst=192.168.2.100 sport=53 dport=57767"
Expand All @@ -271,44 +267,82 @@ func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) {
}

line = bytes.TrimLeft(line, " ")
var regExpBegin string
var regExpEnd string = `src=([^\s]+).*\sdst=([^\s]+).*\ssport=([^\s]+).*\sdport=([^\s]+).*\ssrc=([^\s]+).*\sdst=([^\s]+).*\ssport=([^\s]+).*\sdport=([^\s]+).*\sid=([^\s]+)`

// Destroy events don't have a timeout or state field
if bytes.HasPrefix(line, destroyTypeB) {
// Destroy events don't have a timeout or state field
_, err = fmt.Sscanf(string(line), "%s %s %d src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
&f.Type,
&f.Original.Layer4.Proto,
&unused[0],
&f.Original.Layer3.SrcIP,
&f.Original.Layer3.DstIP,
&f.Original.Layer4.SrcPort,
&f.Original.Layer4.DstPort,
&f.Reply.Layer3.SrcIP,
&f.Reply.Layer3.DstIP,
&f.Reply.Layer4.SrcPort,
&f.Reply.Layer4.DstPort,
&f.Independent.ID,
)
regExpBegin = `(\[[[:alpha:]]*\])\s+([[:alpha:]]*)\s+\d+\s+`
var lineMatch = regexp.MustCompile(regExpBegin + regExpEnd)
matches := lineMatch.FindSubmatch(line)
if len(matches) != 12 {
return flow{}, fmt.Errorf("Error parsing streamed flow %q: matches count: %v ", line, len(matches))
}

f.Type = string(matches[1])
f.Original.Layer4.Proto = string(matches[2])
f.Original.Layer3.SrcIP = string(matches[3])
f.Original.Layer3.DstIP = string(matches[4])
f.Original.Layer4.SrcPort, err = strconv.Atoi(string(matches[5]))
if err != nil {
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
}
f.Original.Layer4.DstPort, err = strconv.Atoi(string(matches[6]))
if err != nil {
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
}
f.Reply.Layer3.SrcIP = string(matches[7])
f.Reply.Layer3.DstIP = string(matches[8])
f.Reply.Layer4.SrcPort, err = strconv.Atoi(string(matches[9]))
if err != nil {
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
}
f.Reply.Layer4.DstPort, err = strconv.Atoi(string(matches[10]))
if err != nil {
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
}
f.Independent.ID, err = strconv.ParseInt(string(matches[11]), 10, 64)
if err != nil {
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
}

} else {
_, err = fmt.Sscanf(string(line), "%s %s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
&f.Type,
&f.Original.Layer4.Proto,
&unused[0],
&unused[1],
&f.Independent.State,
&f.Original.Layer3.SrcIP,
&f.Original.Layer3.DstIP,
&f.Original.Layer4.SrcPort,
&f.Original.Layer4.DstPort,
&f.Reply.Layer3.SrcIP,
&f.Reply.Layer3.DstIP,
&f.Reply.Layer4.SrcPort,
&f.Reply.Layer4.DstPort,
&f.Independent.ID,
)
}
regExpBegin = `(\[[[:alpha:]]*\])\s+([[:alpha:]]*)\s+\d+\s+\d+\s+([[:alpha:]_]*)\s+`
var lineMatch = regexp.MustCompile(regExpBegin + regExpEnd)
matches := lineMatch.FindSubmatch(line)
if len(matches) != 13 {
return flow{}, fmt.Errorf("Error parsing streamed flow %q: matches count: %v ", line, len(matches))
}

if err != nil {
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
f.Type = string(matches[1])
f.Original.Layer4.Proto = string(matches[2])
f.Independent.State = string(matches[3])
f.Original.Layer3.SrcIP = string(matches[4])
f.Original.Layer3.DstIP = string(matches[5])
f.Original.Layer4.SrcPort, err = strconv.Atoi(string(matches[6]))
if err != nil {
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
}
f.Original.Layer4.DstPort, err = strconv.Atoi(string(matches[7]))
if err != nil {
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
}
f.Reply.Layer3.SrcIP = string(matches[8])
f.Reply.Layer3.DstIP = string(matches[9])
f.Reply.Layer4.SrcPort, err = strconv.Atoi(string(matches[10]))
if err != nil {
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
}
f.Reply.Layer4.DstPort, err = strconv.Atoi(string(matches[11]))
if err != nil {
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
}
f.Independent.ID, err = strconv.ParseInt(string(matches[12]), 10, 64)
if err != nil {
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
}
}

f.Reply.Layer4.Proto = f.Original.Layer4.Proto
return f, nil
}
Expand Down Expand Up @@ -346,12 +380,7 @@ func (c *conntrackWalker) existingConnections() ([]flow, error) {
}

func decodeDumpedFlow(scanner *bufio.Scanner) (flow, error) {
var (
// Use ints for parsing unused fields since allocations
// are almost for free
unused [4]int
f flow
)
var f flow

// Example:
// " tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088c"
Expand All @@ -361,24 +390,37 @@ func decodeDumpedFlow(scanner *bufio.Scanner) (flow, error) {
return flow{}, err
}

_, err = fmt.Sscanf(string(line), "%s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d mark=%d use=%d id=%d",
&f.Original.Layer4.Proto,
&unused[0],
&unused[1],
&f.Independent.State,
&f.Original.Layer3.SrcIP,
&f.Original.Layer3.DstIP,
&f.Original.Layer4.SrcPort,
&f.Original.Layer4.DstPort,
&f.Reply.Layer3.SrcIP,
&f.Reply.Layer3.DstIP,
&f.Reply.Layer4.SrcPort,
&f.Reply.Layer4.DstPort,
&unused[2],
&unused[3],
&f.Independent.ID,
)
var regExpEnd string = `src=([^\s]+).*\sdst=([^\s]+).*\ssport=([^\s]+).*\sdport=([^\s]+).*\ssrc=([^\s]+).*\sdst=([^\s]+).*\ssport=([^\s]+).*\sdport=([^\s]+).*\sid=([^\s]+)`
var regExpBegin string = `([[:alpha:]]*)\s+\d+\s+\d+\s+([[:alpha:]_]*)\s+`
var lineMatch = regexp.MustCompile(regExpBegin + regExpEnd)
matches := lineMatch.FindSubmatch(line)
if len(matches) != 12 {
return flow{}, fmt.Errorf("Error parsing dumped flow %q: matches count: %v ", line, len(matches))
}

f.Original.Layer4.Proto = string(matches[1])
f.Independent.State = string(matches[2])
f.Original.Layer3.SrcIP = string(matches[3])
f.Original.Layer3.DstIP = string(matches[4])
f.Original.Layer4.SrcPort, err = strconv.Atoi(string(matches[5]))
if err != nil {
return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err)
}
f.Original.Layer4.DstPort, err = strconv.Atoi(string(matches[6]))
if err != nil {
return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err)
}
f.Reply.Layer3.SrcIP = string(matches[7])
f.Reply.Layer3.DstIP = string(matches[8])
f.Reply.Layer4.SrcPort, err = strconv.Atoi(string(matches[9]))
if err != nil {
return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err)
}
f.Reply.Layer4.DstPort, err = strconv.Atoi(string(matches[10]))
if err != nil {
return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err)
}
f.Independent.ID, err = strconv.ParseInt(string(matches[11]), 10, 64)
if err != nil {
return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err)
}
Expand Down

0 comments on commit 8b82a9d

Please sign in to comment.