Skip to content

Commit

Permalink
Merge pull request #198 from andrewkroh/feature/udp
Browse files Browse the repository at this point in the history
Adding support for UDP based protocols.
  • Loading branch information
tsg committed Aug 12, 2015
2 parents 9c7f9ad + 6fc4326 commit 485f368
Show file tree
Hide file tree
Showing 10 changed files with 960 additions and 225 deletions.
138 changes: 138 additions & 0 deletions decoder/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package decoder

import (
"fmt"

"github.com/elastic/libbeat/logp"
"github.com/elastic/packetbeat/protos"
"github.com/elastic/packetbeat/protos/tcp"
"github.com/elastic/packetbeat/protos/udp"

"github.com/tsg/gopacket"
"github.com/tsg/gopacket/layers"
)

type DecoderStruct struct {
Parser *gopacket.DecodingLayerParser

sll layers.LinuxSLL
d1q layers.Dot1Q
lo layers.Loopback
eth layers.Ethernet
ip4 layers.IPv4
ip6 layers.IPv6
tcp layers.TCP
udp layers.UDP
payload gopacket.Payload
decoded []gopacket.LayerType

tcpProc tcp.Processor
udpProc udp.Processor
}

// Creates and returns a new DecoderStruct.
func NewDecoder(datalink layers.LinkType, tcp tcp.Processor, udp udp.Processor) (*DecoderStruct, error) {
d := DecoderStruct{tcpProc: tcp, udpProc: udp}

logp.Debug("pcapread", "Layer type: %s", datalink.String())

switch datalink {

case layers.LinkTypeLinuxSLL:
d.Parser = gopacket.NewDecodingLayerParser(
layers.LayerTypeLinuxSLL,
&d.sll, &d.d1q, &d.ip4, &d.ip6, &d.tcp, &d.udp, &d.payload)

case layers.LinkTypeEthernet:
d.Parser = gopacket.NewDecodingLayerParser(
layers.LayerTypeEthernet,
&d.eth, &d.d1q, &d.ip4, &d.ip6, &d.tcp, &d.udp, &d.payload)

case layers.LinkTypeNull: // loopback on OSx
d.Parser = gopacket.NewDecodingLayerParser(
layers.LayerTypeLoopback,
&d.lo, &d.d1q, &d.ip4, &d.ip6, &d.tcp, &d.udp, &d.payload)

default:
return nil, fmt.Errorf("Unsuported link type: %s", datalink.String())

}

d.decoded = []gopacket.LayerType{}

return &d, nil
}

func (decoder *DecoderStruct) DecodePacketData(data []byte, ci *gopacket.CaptureInfo) {

var err error
var packet protos.Packet

err = decoder.Parser.DecodeLayers(data, &decoder.decoded)
if err != nil {
// Ignore UnsupportedLayerType errors that can occur while parsing
// UDP packets.
lastLayer := decoder.decoded[len(decoder.decoded)-1]
_, unsupported := err.(gopacket.UnsupportedLayerType)
if !(unsupported && lastLayer == layers.LayerTypeUDP) {
logp.Debug("pcapread", "Decoding error: %s", err)
return
}
}

has_tcp := false
has_udp := false

for _, layerType := range decoder.decoded {
switch layerType {
case layers.LayerTypeIPv4:
logp.Debug("ip", "IPv4 packet")

packet.Tuple.Src_ip = decoder.ip4.SrcIP
packet.Tuple.Dst_ip = decoder.ip4.DstIP
packet.Tuple.Ip_length = 4

case layers.LayerTypeIPv6:
logp.Debug("ip", "IPv6 packet")

packet.Tuple.Src_ip = decoder.ip6.SrcIP
packet.Tuple.Dst_ip = decoder.ip6.DstIP
packet.Tuple.Ip_length = 16

case layers.LayerTypeTCP:
logp.Debug("ip", "TCP packet")

packet.Tuple.Src_port = uint16(decoder.tcp.SrcPort)
packet.Tuple.Dst_port = uint16(decoder.tcp.DstPort)

has_tcp = true

case layers.LayerTypeUDP:
logp.Debug("ip", "UDP packet")

packet.Tuple.Src_port = uint16(decoder.udp.SrcPort)
packet.Tuple.Dst_port = uint16(decoder.udp.DstPort)
packet.Payload = decoder.udp.Payload

has_udp = true

case gopacket.LayerTypePayload:
packet.Payload = decoder.payload
}
}

packet.Ts = ci.Timestamp
packet.Tuple.ComputeHashebles()

if has_udp {
decoder.udpProc.Process(&packet)
} else if has_tcp {
if len(packet.Payload) == 0 && !decoder.tcp.FIN {
// We have no use for this atm.
logp.Debug("pcapread", "Ignore empty non-FIN packet")
return
}

decoder.tcpProc.Process(&decoder.tcp, &packet)
}
}
166 changes: 166 additions & 0 deletions decoder/decoder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package decoder

import (
"strings"
"testing"

"github.com/elastic/packetbeat/protos"

"github.com/stretchr/testify/assert"
"github.com/tsg/gopacket"
"github.com/tsg/gopacket/layers"
)

type TestTcpProcessor struct {
tcphdr *layers.TCP
pkt *protos.Packet
}

func (l *TestTcpProcessor) Process(tcphdr *layers.TCP, pkt *protos.Packet) {
l.tcphdr = tcphdr
l.pkt = pkt
}

type TestUdpProcessor struct {
pkt *protos.Packet
}

func (l *TestUdpProcessor) Process(pkt *protos.Packet) {
l.pkt = pkt
}

// 172.16.16.164:1108 172.16.16.139:53 DNS 87 Standard query 0x0007 AXFR contoso.local
var ipv4TcpDns = []byte{
0x00, 0x0c, 0x29, 0xce, 0xd1, 0x9e, 0x00, 0x0c, 0x29, 0x7e, 0xec, 0xa4, 0x08, 0x00, 0x45, 0x00,
0x00, 0x49, 0x46, 0x54, 0x40, 0x00, 0x80, 0x06, 0x3b, 0x0b, 0xac, 0x10, 0x10, 0xa4, 0xac, 0x10,
0x10, 0x8b, 0x04, 0x54, 0x00, 0x35, 0x5d, 0x9f, 0x0c, 0x90, 0x1a, 0xef, 0x6f, 0x43, 0x50, 0x18,
0xfa, 0xf0, 0xbc, 0x3d, 0x00, 0x00, 0x00, 0x07, 0x01, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x6f, 0x73, 0x6f, 0x05, 0x6c, 0x6f, 0x63, 0x61, 0x6c,
0x00, 0x00, 0xfc, 0x00, 0x01, 0x4d, 0x53,
}

// Test that DecodePacket decodes and IPv4/TCP packet and invokes the TCP processor.
func TestDecodePacketData_ipv4Tcp(t *testing.T) {
p := gopacket.NewPacket(ipv4TcpDns, layers.LinkTypeEthernet, gopacket.Default)
if p.ErrorLayer() != nil {
t.Error("Failed to decode packet:", p.ErrorLayer().Error())
}
d, tcp, _ := newTestDecoder(t)
d.DecodePacketData(p.Data(), &p.Metadata().CaptureInfo)

assert.NotNil(t, tcp.pkt, "TCP packet not received")
assert.Equal(t, "172.16.16.164", tcp.pkt.Tuple.Src_ip.String())
assert.Equal(t, uint16(1108), tcp.pkt.Tuple.Src_port)
assert.Equal(t, "172.16.16.139", tcp.pkt.Tuple.Dst_ip.String())
assert.Equal(t, uint16(53), tcp.pkt.Tuple.Dst_port)
assert.NotEqual(t, -1, strings.Index(string(p.Data()), string(tcp.pkt.Payload)))
}

// 192.168.170.8:32795 192.168.170.20:53 DNS 74 Standard query 0x75c0 A www.netbsd.org
var ipv4UdpDns = []byte{
0x00, 0xc0, 0x9f, 0x32, 0x41, 0x8c, 0x00, 0xe0, 0x18, 0xb1, 0x0c, 0xad, 0x08, 0x00, 0x45, 0x00,
0x00, 0x3c, 0x00, 0x00, 0x40, 0x00, 0x40, 0x11, 0x65, 0x43, 0xc0, 0xa8, 0xaa, 0x08, 0xc0, 0xa8,
0xaa, 0x14, 0x80, 0x1b, 0x00, 0x35, 0x00, 0x28, 0xaf, 0x61, 0x75, 0xc0, 0x01, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x77, 0x77, 0x77, 0x06, 0x6e, 0x65, 0x74, 0x62, 0x73,
0x64, 0x03, 0x6f, 0x72, 0x67, 0x00, 0x00, 0x01, 0x00, 0x01,
}

// Test that DecodePacket decodes and IPv4/UDP packet and invokes the UDP processor.
func TestDecodePacketData_ipv4Udp(t *testing.T) {
p := gopacket.NewPacket(ipv4UdpDns, layers.LinkTypeEthernet, gopacket.Default)
if p.ErrorLayer() != nil {
t.Error("Failed to decode packet:", p.ErrorLayer().Error())
}
d, _, udp := newTestDecoder(t)
d.DecodePacketData(p.Data(), &p.Metadata().CaptureInfo)

assert.NotNil(t, udp.pkt, "UDP packet not received")
assert.Equal(t, "192.168.170.8", udp.pkt.Tuple.Src_ip.String())
assert.Equal(t, uint16(32795), udp.pkt.Tuple.Src_port)
assert.Equal(t, "192.168.170.20", udp.pkt.Tuple.Dst_ip.String())
assert.Equal(t, uint16(53), udp.pkt.Tuple.Dst_port)
assert.NotEqual(t, -1, strings.Index(string(p.Data()), string(udp.pkt.Payload)))
}

// IP6 2001:6f8:102d::2d0:9ff:fee3:e8de.59201 > 2001:6f8:900:7c0::2.80
var ipv6TcpHttpGet = []byte{
0x00, 0x11, 0x25, 0x82, 0x95, 0xb5, 0x00, 0xd0, 0x09, 0xe3, 0xe8, 0xde, 0x86, 0xdd, 0x60, 0x00,
0x00, 0x00, 0x01, 0x04, 0x06, 0x40, 0x20, 0x01, 0x06, 0xf8, 0x10, 0x2d, 0x00, 0x00, 0x02, 0xd0,
0x09, 0xff, 0xfe, 0xe3, 0xe8, 0xde, 0x20, 0x01, 0x06, 0xf8, 0x09, 0x00, 0x07, 0xc0, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xe7, 0x41, 0x00, 0x50, 0xab, 0xdc, 0xd6, 0x61, 0x01, 0x4a,
0x73, 0x9f, 0x50, 0x18, 0x16, 0x80, 0xf4, 0x48, 0x00, 0x00, 0x47, 0x45, 0x54, 0x20, 0x2f, 0x20,
0x48, 0x54, 0x54, 0x50, 0x2f, 0x31, 0x2e, 0x30, 0x0d, 0x0a, 0x48, 0x6f, 0x73, 0x74, 0x3a, 0x20,
0x63, 0x6c, 0x2d, 0x31, 0x39, 0x38, 0x35, 0x2e, 0x68, 0x61, 0x6d, 0x2d, 0x30, 0x31, 0x2e, 0x64,
0x65, 0x2e, 0x73, 0x69, 0x78, 0x78, 0x73, 0x2e, 0x6e, 0x65, 0x74, 0x0d, 0x0a, 0x41, 0x63, 0x63,
0x65, 0x70, 0x74, 0x3a, 0x20, 0x74, 0x65, 0x78, 0x74, 0x2f, 0x68, 0x74, 0x6d, 0x6c, 0x2c, 0x20,
0x74, 0x65, 0x78, 0x74, 0x2f, 0x70, 0x6c, 0x61, 0x69, 0x6e, 0x2c, 0x20, 0x74, 0x65, 0x78, 0x74,
0x2f, 0x63, 0x73, 0x73, 0x2c, 0x20, 0x74, 0x65, 0x78, 0x74, 0x2f, 0x73, 0x67, 0x6d, 0x6c, 0x2c,
0x20, 0x2a, 0x2f, 0x2a, 0x3b, 0x71, 0x3d, 0x30, 0x2e, 0x30, 0x31, 0x0d, 0x0a, 0x41, 0x63, 0x63,
0x65, 0x70, 0x74, 0x2d, 0x45, 0x6e, 0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x3a, 0x20, 0x67, 0x7a,
0x69, 0x70, 0x2c, 0x20, 0x62, 0x7a, 0x69, 0x70, 0x32, 0x0d, 0x0a, 0x41, 0x63, 0x63, 0x65, 0x70,
0x74, 0x2d, 0x4c, 0x61, 0x6e, 0x67, 0x75, 0x61, 0x67, 0x65, 0x3a, 0x20, 0x65, 0x6e, 0x0d, 0x0a,
0x55, 0x73, 0x65, 0x72, 0x2d, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x3a, 0x20, 0x4c, 0x79, 0x6e, 0x78,
0x2f, 0x32, 0x2e, 0x38, 0x2e, 0x36, 0x72, 0x65, 0x6c, 0x2e, 0x32, 0x20, 0x6c, 0x69, 0x62, 0x77,
0x77, 0x77, 0x2d, 0x46, 0x4d, 0x2f, 0x32, 0x2e, 0x31, 0x34, 0x20, 0x53, 0x53, 0x4c, 0x2d, 0x4d,
0x4d, 0x2f, 0x31, 0x2e, 0x34, 0x2e, 0x31, 0x20, 0x4f, 0x70, 0x65, 0x6e, 0x53, 0x53, 0x4c, 0x2f,
0x30, 0x2e, 0x39, 0x2e, 0x38, 0x62, 0x0d, 0x0a, 0x0d, 0x0a,
}

// Test that DecodePacket decodes and IPv6/TCP packet and invokes the TCP processor.
func TestDecodePacketData_ipv6Tcp(t *testing.T) {
p := gopacket.NewPacket(ipv6TcpHttpGet, layers.LinkTypeEthernet, gopacket.Default)
if p.ErrorLayer() != nil {
t.Error("Failed to decode packet: ", p.ErrorLayer().Error())
}
d, tcp, _ := newTestDecoder(t)
d.DecodePacketData(p.Data(), &p.Metadata().CaptureInfo)

assert.NotNil(t, tcp.pkt, "TCP packet not received")
assert.Equal(t, "2001:6f8:102d:0:2d0:9ff:fee3:e8de", tcp.pkt.Tuple.Src_ip.String())
assert.Equal(t, uint16(59201), tcp.pkt.Tuple.Src_port)
assert.Equal(t, "2001:6f8:900:7c0::2", tcp.pkt.Tuple.Dst_ip.String())
assert.Equal(t, uint16(80), tcp.pkt.Tuple.Dst_port)
assert.NotEqual(t, -1, strings.Index(string(p.Data()), string(tcp.pkt.Payload)))
}

// 3ffe:507:0:1:200:86ff:fe05:80da.2415 > 3ffe:501:4819::42.53
var ipv6UdpDns = []byte{
0x00, 0x60, 0x97, 0x07, 0x69, 0xea, 0x00, 0x00, 0x86, 0x05, 0x80, 0xda, 0x86, 0xdd, 0x60, 0x00,
0x00, 0x00, 0x00, 0x61, 0x11, 0x40, 0x3f, 0xfe, 0x05, 0x07, 0x00, 0x00, 0x00, 0x01, 0x02, 0x00,
0x86, 0xff, 0xfe, 0x05, 0x80, 0xda, 0x3f, 0xfe, 0x05, 0x01, 0x48, 0x19, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x42, 0x09, 0x6f, 0x00, 0x35, 0x00, 0x61, 0xa3, 0x35, 0x5c, 0x78,
0x01, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x61, 0x01, 0x65, 0x01, 0x39,
0x01, 0x36, 0x01, 0x37, 0x01, 0x30, 0x01, 0x65, 0x01, 0x66, 0x01, 0x66, 0x01, 0x66, 0x01, 0x37,
0x01, 0x39, 0x01, 0x30, 0x01, 0x36, 0x01, 0x32, 0x01, 0x30, 0x01, 0x31, 0x01, 0x30, 0x01, 0x30,
0x01, 0x30, 0x01, 0x30, 0x01, 0x30, 0x01, 0x30, 0x01, 0x30, 0x01, 0x37, 0x01, 0x30, 0x01, 0x35,
0x01, 0x30, 0x01, 0x65, 0x01, 0x66, 0x01, 0x66, 0x01, 0x33, 0x03, 0x69, 0x70, 0x36, 0x03, 0x69,
0x6e, 0x74, 0x00, 0x00, 0x0c, 0x00, 0x01,
}

// Test that DecodePacket decodes and IPv6/UDP packet and invokes the UDP processor.
func TestDecodePacketData_ipv6Udp(t *testing.T) {
p := gopacket.NewPacket(ipv6UdpDns, layers.LinkTypeEthernet, gopacket.Default)
if p.ErrorLayer() != nil {
t.Error("Failed to decode packet:", p.ErrorLayer().Error())
}
d, _, udp := newTestDecoder(t)
d.DecodePacketData(p.Data(), &p.Metadata().CaptureInfo)

assert.NotNil(t, udp.pkt, "UDP packet not received")
assert.Equal(t, "3ffe:507:0:1:200:86ff:fe05:80da", udp.pkt.Tuple.Src_ip.String())
assert.Equal(t, uint16(2415), udp.pkt.Tuple.Src_port)
assert.Equal(t, "3ffe:501:4819::42", udp.pkt.Tuple.Dst_ip.String())
assert.Equal(t, uint16(53), udp.pkt.Tuple.Dst_port)
assert.NotEqual(t, -1, strings.Index(string(p.Data()), string(udp.pkt.Payload)))
}

// Creates a new TestDecoder that handles ethernet packets.
func newTestDecoder(t *testing.T) (*DecoderStruct, *TestTcpProcessor, *TestUdpProcessor) {
tcpLayer := &TestTcpProcessor{}
udpLayer := &TestUdpProcessor{}
d, err := NewDecoder(layers.LinkTypeEthernet, tcpLayer, udpLayer)
if err != nil {
t.Fatalf("Error creating decoder %v", err)
}
return d, tcpLayer, udpLayer
}
13 changes: 10 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/packetbeat/protos/redis"
"github.com/elastic/packetbeat/protos/tcp"
"github.com/elastic/packetbeat/protos/thrift"
"github.com/elastic/packetbeat/protos/udp"
"github.com/elastic/packetbeat/sniffer"
)

Expand Down Expand Up @@ -111,7 +112,13 @@ func main() {
protos.Protos.Register(proto, plugin)
}

if err = tcp.TcpInit(); err != nil {
tcpProc, err := tcp.NewTcp(&protos.Protos)
if err != nil {
logp.Critical(err.Error())
os.Exit(1)
}
udpProc, err := udp.NewUdp(&protos.Protos)
if err != nil {
logp.Critical(err.Error())
os.Exit(1)
}
Expand All @@ -134,7 +141,7 @@ func main() {
}

logp.Debug("main", "Initializing sniffer")
err = sniff.Init(false, afterInputsQueue)
err = sniff.Init(false, afterInputsQueue, tcpProc, udpProc)
if err != nil {
logp.Critical("Initializing sniffer failed: %v", err)
os.Exit(1)
Expand Down Expand Up @@ -184,7 +191,7 @@ func main() {
if service.WithMemProfile() {
// wait for all TCP streams to expire
time.Sleep(tcp.TCP_STREAM_EXPIRY * 1.2)
tcp.PrintTcpMap()
tcpProc.PrintTcpMap()
}
service.Cleanup()
}
Loading

0 comments on commit 485f368

Please sign in to comment.