Skip to content

Commit

Permalink
[WIP] probe: conntrack: fix output parsing
Browse files Browse the repository at this point in the history
TODO:
- [ ] test under different configuration (SELinux, nf_conntrack_acct)
- [ ] check performances compared to sscanf and xml

Fixes #2117
  • Loading branch information
alban committed Jan 11, 2017
1 parent b4e1fc7 commit a57360a
Showing 1 changed file with 95 additions and 37 deletions.
132 changes: 95 additions & 37 deletions probe/endpoint/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"io/ioutil"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"unicode"

log "github.com/Sirupsen/logrus"

Expand Down Expand Up @@ -273,42 +275,70 @@ func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) {
line = bytes.TrimLeft(line, " ")
if bytes.HasPrefix(line, destroyTypeB) {
// Destroy events don't have a timeout or state field
_, err = fmt.Sscanf(string(line), "%s %s %d src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
_, err = fmt.Sscanf(string(line), "%s %s %d",
&f.Type,
&f.Original.Layer4.Proto,
&unused[0],
&f.Original.Layer3.SrcIP,
&f.Original.Layer3.DstIP,
&f.Original.Layer4.SrcPort,
&f.Original.Layer4.DstPort,
&f.Reply.Layer3.SrcIP,
&f.Reply.Layer3.DstIP,
&f.Reply.Layer4.SrcPort,
&f.Reply.Layer4.DstPort,
&f.Independent.ID,
)
} else {
_, err = fmt.Sscanf(string(line), "%s %s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
_, err = fmt.Sscanf(string(line), "%s %s %d %d %s",
&f.Type,
&f.Original.Layer4.Proto,
&unused[0],
&unused[1],
&f.Independent.State,
&f.Original.Layer3.SrcIP,
&f.Original.Layer3.DstIP,
&f.Original.Layer4.SrcPort,
&f.Original.Layer4.DstPort,
&f.Reply.Layer3.SrcIP,
&f.Reply.Layer3.DstIP,
&f.Reply.Layer4.SrcPort,
&f.Reply.Layer4.DstPort,
&f.Independent.ID,
)
}
if err != nil {
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
}

for _, field := range strings.FieldsFunc(string(line), func(c rune) bool { return unicode.IsSpace(c) }) {
kv := strings.SplitN(field, "=", 2)
if len(kv) != 2 {
continue
}
key := kv[0]
value := kv[1]
firstTupleSet := f.Original.Layer4.DstPort != 0
switch {
case key == "src":
if !firstTupleSet {
f.Original.Layer3.SrcIP = value
} else {
f.Reply.Layer3.SrcIP = value
}

case key == "dst":
if !firstTupleSet {
f.Original.Layer3.DstIP = value
} else {
f.Reply.Layer3.DstIP = value
}

case key == "sport":
if !firstTupleSet {
f.Original.Layer4.SrcPort, err = strconv.Atoi(value)
} else {
f.Reply.Layer4.SrcPort, err = strconv.Atoi(value)
}

case key == "dport":
if !firstTupleSet {
f.Original.Layer4.DstPort, err = strconv.Atoi(value)
} else {
f.Reply.Layer4.DstPort, err = strconv.Atoi(value)
}

case key == "id":
f.Independent.ID, err = strconv.ParseInt(value, 10, 64)
}
}

if err != nil {
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
}

f.Reply.Layer4.Proto = f.Original.Layer4.Proto
return f, nil
}
Expand Down Expand Up @@ -361,24 +391,52 @@ func decodeDumpedFlow(scanner *bufio.Scanner) (flow, error) {
return flow{}, err
}

_, err = fmt.Sscanf(string(line), "%s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d mark=%d use=%d id=%d",
&f.Original.Layer4.Proto,
&unused[0],
&unused[1],
&f.Independent.State,
&f.Original.Layer3.SrcIP,
&f.Original.Layer3.DstIP,
&f.Original.Layer4.SrcPort,
&f.Original.Layer4.DstPort,
&f.Reply.Layer3.SrcIP,
&f.Reply.Layer3.DstIP,
&f.Reply.Layer4.SrcPort,
&f.Reply.Layer4.DstPort,
&unused[2],
&unused[3],
&f.Independent.ID,
)
_, err = fmt.Sscanf(string(line), "%s %d %d %s", &f.Original.Layer4.Proto, &unused[0], &unused[1], &f.Independent.State)
if err != nil {
return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err)
}

for _, field := range strings.FieldsFunc(string(line), func(c rune) bool { return unicode.IsSpace(c) }) {
kv := strings.SplitN(field, "=", 2)
if len(kv) != 2 {
continue
}
key := kv[0]
value := kv[1]
firstTupleSet := f.Original.Layer4.DstPort != 0
switch {
case key == "src":
if !firstTupleSet {
f.Original.Layer3.SrcIP = value
} else {
f.Reply.Layer3.SrcIP = value
}

case key == "dst":
if !firstTupleSet {
f.Original.Layer3.DstIP = value
} else {
f.Reply.Layer3.DstIP = value
}

case key == "sport":
if !firstTupleSet {
f.Original.Layer4.SrcPort, err = strconv.Atoi(value)
} else {
f.Reply.Layer4.SrcPort, err = strconv.Atoi(value)
}

case key == "dport":
if !firstTupleSet {
f.Original.Layer4.DstPort, err = strconv.Atoi(value)
} else {
f.Reply.Layer4.DstPort, err = strconv.Atoi(value)
}

case key == "id":
f.Independent.ID, err = strconv.ParseInt(value, 10, 64)
}
}
if err != nil {
return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err)
}
Expand Down

0 comments on commit a57360a

Please sign in to comment.