diff --git a/probe/endpoint/conntrack.go b/probe/endpoint/conntrack.go index da55323ead..3f4711ad73 100644 --- a/probe/endpoint/conntrack.go +++ b/probe/endpoint/conntrack.go @@ -8,8 +8,10 @@ import ( "io/ioutil" "path/filepath" "strconv" + "strings" "sync" "time" + "unicode" log "github.com/Sirupsen/logrus" @@ -250,6 +252,58 @@ func removeInplace(s, sep []byte) []byte { return s[:len(s)-len(sep)] } +// decodeFlowKeyValues parses the key-values from a conntrack line and updates the flow +// It only considers the following key-values: +// src=127.0.0.1 dst=127.0.0.1 sport=58958 dport=6784 src=127.0.0.1 dst=127.0.0.1 sport=6784 dport=58958 id=1595499776 +// Keys can be present twice, so the order is important. +// Conntrack could add other key-values such as secctx=, packets=, bytes=. Those are ignored. +func decodeFlowKeyValues(line []byte, f *flow) error { + var err error + for _, field := range strings.FieldsFunc(string(line), func(c rune) bool { return unicode.IsSpace(c) }) { + kv := strings.SplitN(field, "=", 2) + if len(kv) != 2 { + continue + } + key := kv[0] + value := kv[1] + firstTupleSet := f.Original.Layer4.DstPort != 0 + switch { + case key == "src": + if !firstTupleSet { + f.Original.Layer3.SrcIP = value + } else { + f.Reply.Layer3.SrcIP = value + } + + case key == "dst": + if !firstTupleSet { + f.Original.Layer3.DstIP = value + } else { + f.Reply.Layer3.DstIP = value + } + + case key == "sport": + if !firstTupleSet { + f.Original.Layer4.SrcPort, err = strconv.Atoi(value) + } else { + f.Reply.Layer4.SrcPort, err = strconv.Atoi(value) + } + + case key == "dport": + if !firstTupleSet { + f.Original.Layer4.DstPort, err = strconv.Atoi(value) + } else { + f.Reply.Layer4.DstPort, err = strconv.Atoi(value) + } + + case key == "id": + f.Independent.ID, err = strconv.ParseInt(value, 10, 64) + } + } + + return err +} + func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) { var ( // Use ints for parsing unused fields since their allocations @@ -273,42 +327,29 @@ func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) { line = bytes.TrimLeft(line, " ") if bytes.HasPrefix(line, destroyTypeB) { // Destroy events don't have a timeout or state field - _, err = fmt.Sscanf(string(line), "%s %s %d src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d", + _, err = fmt.Sscanf(string(line), "%s %s %d", &f.Type, &f.Original.Layer4.Proto, &unused[0], - &f.Original.Layer3.SrcIP, - &f.Original.Layer3.DstIP, - &f.Original.Layer4.SrcPort, - &f.Original.Layer4.DstPort, - &f.Reply.Layer3.SrcIP, - &f.Reply.Layer3.DstIP, - &f.Reply.Layer4.SrcPort, - &f.Reply.Layer4.DstPort, - &f.Independent.ID, ) } else { - _, err = fmt.Sscanf(string(line), "%s %s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d", + _, err = fmt.Sscanf(string(line), "%s %s %d %d %s", &f.Type, &f.Original.Layer4.Proto, &unused[0], &unused[1], &f.Independent.State, - &f.Original.Layer3.SrcIP, - &f.Original.Layer3.DstIP, - &f.Original.Layer4.SrcPort, - &f.Original.Layer4.DstPort, - &f.Reply.Layer3.SrcIP, - &f.Reply.Layer3.DstIP, - &f.Reply.Layer4.SrcPort, - &f.Reply.Layer4.DstPort, - &f.Independent.ID, ) } + if err != nil { + return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err) + } + err = decodeFlowKeyValues(line, &f) if err != nil { return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err) } + f.Reply.Layer4.Proto = f.Original.Layer4.Proto return f, nil } @@ -353,32 +394,27 @@ func decodeDumpedFlow(scanner *bufio.Scanner) (flow, error) { f flow ) - // Example: - // " tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088c" + // Examples with different formats: + // With SELinux, there is a "secctx=" + // After "sudo sysctl net.netfilter.nf_conntrack_acct=1", there is "packets=" and "bytes=" + // + // "tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088" + // "tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208" + // "tcp 6 108 ESTABLISHED src=172.17.0.5 dst=172.17.0.2 sport=47010 dport=80 src=172.17.0.2 dst=172.17.0.5 sport=80 dport=47010 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=4001098880" + // "tcp 6 431970 ESTABLISHED src=192.168.35.116 dst=216.58.213.227 sport=49862 dport=443 packets=11 bytes=1337 src=216.58.213.227 dst=192.168.35.116 sport=443 dport=49862 packets=8 bytes=716 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=943643840" + // remove tags since they are optional and make parsing harder line, err := getUntaggedLine(scanner) if err != nil { return flow{}, err } - _, err = fmt.Sscanf(string(line), "%s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d mark=%d use=%d id=%d", - &f.Original.Layer4.Proto, - &unused[0], - &unused[1], - &f.Independent.State, - &f.Original.Layer3.SrcIP, - &f.Original.Layer3.DstIP, - &f.Original.Layer4.SrcPort, - &f.Original.Layer4.DstPort, - &f.Reply.Layer3.SrcIP, - &f.Reply.Layer3.DstIP, - &f.Reply.Layer4.SrcPort, - &f.Reply.Layer4.DstPort, - &unused[2], - &unused[3], - &f.Independent.ID, - ) + _, err = fmt.Sscanf(string(line), "%s %d %d %s", &f.Original.Layer4.Proto, &unused[0], &unused[1], &f.Independent.State) + if err != nil { + return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err) + } + err = decodeFlowKeyValues(line, &f) if err != nil { return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err) } diff --git a/probe/endpoint/conntrack_internal_test.go b/probe/endpoint/conntrack_internal_test.go index 105dd9c16b..a2aa253973 100644 --- a/probe/endpoint/conntrack_internal_test.go +++ b/probe/endpoint/conntrack_internal_test.go @@ -155,7 +155,11 @@ func TestStreamedFlowDecoding(t *testing.T) { } // Obtained through conntrack -L -p tcp -o id -const dumpedFlowsSource = `tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208` +// With SELinux, there is a "secctx=" +// After "sudo sysctl net.netfilter.nf_conntrack_acct=1", there is "packets=" and "bytes=" +const dumpedFlowsSource = `tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208 +tcp 6 108 ESTABLISHED src=172.17.0.5 dst=172.17.0.2 sport=47010 dport=80 src=172.17.0.2 dst=172.17.0.5 sport=80 dport=47010 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=4001098880 +tcp 6 431970 ESTABLISHED src=192.168.35.116 dst=216.58.213.227 sport=49862 dport=443 packets=11 bytes=1337 src=216.58.213.227 dst=192.168.35.116 sport=443 dport=49862 packets=8 bytes=716 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=943643840` var wantDumpedFlows = []flow{ { @@ -186,6 +190,62 @@ var wantDumpedFlows = []flow{ State: "ESTABLISHED", }, }, + { + Original: meta{ + Layer3: layer3{ + SrcIP: "172.17.0.5", + DstIP: "172.17.0.2", + }, + Layer4: layer4{ + SrcPort: 47010, + DstPort: 80, + Proto: "tcp", + }, + }, + Reply: meta{ + Layer3: layer3{ + SrcIP: "172.17.0.2", + DstIP: "172.17.0.5", + }, + Layer4: layer4{ + SrcPort: 80, + DstPort: 47010, + Proto: "tcp", + }, + }, + Independent: meta{ + ID: 4001098880, + State: "ESTABLISHED", + }, + }, + { + Original: meta{ + Layer3: layer3{ + SrcIP: "192.168.35.116", + DstIP: "216.58.213.227", + }, + Layer4: layer4{ + SrcPort: 49862, + DstPort: 443, + Proto: "tcp", + }, + }, + Reply: meta{ + Layer3: layer3{ + SrcIP: "216.58.213.227", + DstIP: "192.168.35.116", + }, + Layer4: layer4{ + SrcPort: 443, + DstPort: 49862, + Proto: "tcp", + }, + }, + Independent: meta{ + ID: 943643840, + State: "ESTABLISHED", + }, + }, } func TestDumpedFlowDecoding(t *testing.T) {