Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

probe: conntrack: fix output parsing #2118

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 75 additions & 39 deletions probe/endpoint/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"io/ioutil"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"unicode"

log "github.com/Sirupsen/logrus"

Expand Down Expand Up @@ -250,6 +252,58 @@ func removeInplace(s, sep []byte) []byte {
return s[:len(s)-len(sep)]
}

// decodeFlowKeyValues parses the key-values from a conntrack line and updates the flow
// It only considers the following key-values:
// src=127.0.0.1 dst=127.0.0.1 sport=58958 dport=6784 src=127.0.0.1 dst=127.0.0.1 sport=6784 dport=58958 id=1595499776
// Keys can be present twice, so the order is important.
// Conntrack could add other key-values such as secctx=, packets=, bytes=. Those are ignored.
func decodeFlowKeyValues(line []byte, f *flow) error {
var err error
for _, field := range strings.FieldsFunc(string(line), func(c rune) bool { return unicode.IsSpace(c) }) {
kv := strings.SplitN(field, "=", 2)
if len(kv) != 2 {
continue
}
key := kv[0]
value := kv[1]
firstTupleSet := f.Original.Layer4.DstPort != 0
switch {
case key == "src":
if !firstTupleSet {
f.Original.Layer3.SrcIP = value
} else {
f.Reply.Layer3.SrcIP = value
}

case key == "dst":
if !firstTupleSet {
f.Original.Layer3.DstIP = value
} else {
f.Reply.Layer3.DstIP = value
}

case key == "sport":
if !firstTupleSet {
f.Original.Layer4.SrcPort, err = strconv.Atoi(value)
} else {
f.Reply.Layer4.SrcPort, err = strconv.Atoi(value)
}

case key == "dport":
if !firstTupleSet {
f.Original.Layer4.DstPort, err = strconv.Atoi(value)
} else {
f.Reply.Layer4.DstPort, err = strconv.Atoi(value)
}

case key == "id":
f.Independent.ID, err = strconv.ParseInt(value, 10, 64)
}
}

return err
}

func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) {
var (
// Use ints for parsing unused fields since their allocations
Expand All @@ -273,42 +327,29 @@ func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) {
line = bytes.TrimLeft(line, " ")
if bytes.HasPrefix(line, destroyTypeB) {
// Destroy events don't have a timeout or state field
_, err = fmt.Sscanf(string(line), "%s %s %d src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
_, err = fmt.Sscanf(string(line), "%s %s %d",
&f.Type,
&f.Original.Layer4.Proto,
&unused[0],
&f.Original.Layer3.SrcIP,
&f.Original.Layer3.DstIP,
&f.Original.Layer4.SrcPort,
&f.Original.Layer4.DstPort,
&f.Reply.Layer3.SrcIP,
&f.Reply.Layer3.DstIP,
&f.Reply.Layer4.SrcPort,
&f.Reply.Layer4.DstPort,
&f.Independent.ID,
)
} else {
_, err = fmt.Sscanf(string(line), "%s %s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
_, err = fmt.Sscanf(string(line), "%s %s %d %d %s",
&f.Type,
&f.Original.Layer4.Proto,
&unused[0],
&unused[1],
&f.Independent.State,
&f.Original.Layer3.SrcIP,
&f.Original.Layer3.DstIP,
&f.Original.Layer4.SrcPort,
&f.Original.Layer4.DstPort,
&f.Reply.Layer3.SrcIP,
&f.Reply.Layer3.DstIP,
&f.Reply.Layer4.SrcPort,
&f.Reply.Layer4.DstPort,
&f.Independent.ID,
)
}
if err != nil {
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
}

err = decodeFlowKeyValues(line, &f)
if err != nil {
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
}

f.Reply.Layer4.Proto = f.Original.Layer4.Proto
return f, nil
}
Expand Down Expand Up @@ -353,32 +394,27 @@ func decodeDumpedFlow(scanner *bufio.Scanner) (flow, error) {
f flow
)

// Example:
// " tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088c"
// Examples with different formats:
// With SELinux, there is a "secctx="
// After "sudo sysctl net.netfilter.nf_conntrack_acct=1", there is "packets=" and "bytes="
//
// "tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088"
// "tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208"
// "tcp 6 108 ESTABLISHED src=172.17.0.5 dst=172.17.0.2 sport=47010 dport=80 src=172.17.0.2 dst=172.17.0.5 sport=80 dport=47010 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=4001098880"
// "tcp 6 431970 ESTABLISHED src=192.168.35.116 dst=216.58.213.227 sport=49862 dport=443 packets=11 bytes=1337 src=216.58.213.227 dst=192.168.35.116 sport=443 dport=49862 packets=8 bytes=716 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=943643840"

// remove tags since they are optional and make parsing harder
line, err := getUntaggedLine(scanner)
if err != nil {
return flow{}, err
}

_, err = fmt.Sscanf(string(line), "%s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d mark=%d use=%d id=%d",
&f.Original.Layer4.Proto,
&unused[0],
&unused[1],
&f.Independent.State,
&f.Original.Layer3.SrcIP,
&f.Original.Layer3.DstIP,
&f.Original.Layer4.SrcPort,
&f.Original.Layer4.DstPort,
&f.Reply.Layer3.SrcIP,
&f.Reply.Layer3.DstIP,
&f.Reply.Layer4.SrcPort,
&f.Reply.Layer4.DstPort,
&unused[2],
&unused[3],
&f.Independent.ID,
)
_, err = fmt.Sscanf(string(line), "%s %d %d %s", &f.Original.Layer4.Proto, &unused[0], &unused[1], &f.Independent.State)
if err != nil {
return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err)
}

err = decodeFlowKeyValues(line, &f)
if err != nil {
return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err)
}
Expand Down
62 changes: 61 additions & 1 deletion probe/endpoint/conntrack_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,11 @@ func TestStreamedFlowDecoding(t *testing.T) {
}

// Obtained through conntrack -L -p tcp -o id
const dumpedFlowsSource = `tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208`
// With SELinux, there is a "secctx="
// After "sudo sysctl net.netfilter.nf_conntrack_acct=1", there is "packets=" and "bytes="
const dumpedFlowsSource = `tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208
tcp 6 108 ESTABLISHED src=172.17.0.5 dst=172.17.0.2 sport=47010 dport=80 src=172.17.0.2 dst=172.17.0.5 sport=80 dport=47010 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=4001098880
tcp 6 431970 ESTABLISHED src=192.168.35.116 dst=216.58.213.227 sport=49862 dport=443 packets=11 bytes=1337 src=216.58.213.227 dst=192.168.35.116 sport=443 dport=49862 packets=8 bytes=716 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=943643840`

var wantDumpedFlows = []flow{
{
Expand Down Expand Up @@ -186,6 +190,62 @@ var wantDumpedFlows = []flow{
State: "ESTABLISHED",
},
},
{
Original: meta{
Layer3: layer3{
SrcIP: "172.17.0.5",
DstIP: "172.17.0.2",
},
Layer4: layer4{
SrcPort: 47010,
DstPort: 80,
Proto: "tcp",
},
},
Reply: meta{
Layer3: layer3{
SrcIP: "172.17.0.2",
DstIP: "172.17.0.5",
},
Layer4: layer4{
SrcPort: 80,
DstPort: 47010,
Proto: "tcp",
},
},
Independent: meta{
ID: 4001098880,
State: "ESTABLISHED",
},
},
{
Original: meta{
Layer3: layer3{
SrcIP: "192.168.35.116",
DstIP: "216.58.213.227",
},
Layer4: layer4{
SrcPort: 49862,
DstPort: 443,
Proto: "tcp",
},
},
Reply: meta{
Layer3: layer3{
SrcIP: "216.58.213.227",
DstIP: "192.168.35.116",
},
Layer4: layer4{
SrcPort: 443,
DstPort: 49862,
Proto: "tcp",
},
},
Independent: meta{
ID: 943643840,
State: "ESTABLISHED",
},
},
}

func TestDumpedFlowDecoding(t *testing.T) {
Expand Down