Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip unknown fields #182

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ The vFlow configuration contains the following keys
|ipfix-mirror-workers | 5 | IPFIX replicator concurrent packet generator |
|ipfix-tpl-cache-file | /tmp/vflow.templates | IPFIX templates cache file |
|ipfix-rpc-enabled | true | enable/disable IPFIX RPC |
|ipfix-skip-unknown | false | enable/disable skipping of unknown elements |
|sflow-enabled | true | enable/disable sFlow decoders |
|sflow-port | 6343 | server sFlow UDP port |
|sflow-workers | 200 | sFlow concurrent decoders |
Expand All @@ -50,6 +51,7 @@ The vFlow configuration contains the following keys
|netflow9-topic | vflow.netflow9 | netflow v9 message queue topic name |
|netflow9-udp-size | 1500 | maximum netflow v9 UDP packet size |
|netflow9-tpl-cache-file | /tmp/netflow9.templates | netflow v9 templates cache file |
|netflow9-skip-unknown | false | enable/disable skipping of unknown elements |
|dynamic-workers | true | enable/disable dynamic workers feature |
|stats-enabled | true | enable/disable web stats listener |
|stats-format | prometheus | set prometheus or restful format |
Expand Down
75 changes: 40 additions & 35 deletions ipfix/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ import (

// Decoder represents IPFIX payload and remote address
type Decoder struct {
raddr net.IP
reader *reader.Reader
raddr net.IP
reader *reader.Reader
skipUnknownElements bool
}

// MessageHeader represents IPFIX message header
Expand Down Expand Up @@ -97,8 +98,8 @@ type nonfatalError struct {
var rpcChan = make(chan RPCRequest, 1)

// NewDecoder constructs a decoder
func NewDecoder(raddr net.IP, b []byte) *Decoder {
return &Decoder{raddr, reader.NewReader(b)}
func NewDecoder(raddr net.IP, b []byte, skipUnknownElements bool) *Decoder {
return &Decoder{raddr, reader.NewReader(b), skipUnknownElements}
}

// Decode decodes the IPFIX raw data
Expand Down Expand Up @@ -479,15 +480,15 @@ func (tr *TemplateRecord) unmarshalOpts(r *reader.Reader) error {
return nil
}

func (d *Decoder) getDataLength(fieldSpecifierLen uint16, t FieldType) (uint16, error) {
func (d *Decoder) getDataLength(fieldSpecifierLen uint16) (uint16, error) {
var (
err error
readLength uint16
)

r := d.reader

if (t == String || t == OctetArray) && (fieldSpecifierLen == 65535) {
if fieldSpecifierLen == 65535 {
var len8 uint8
if len8, err = r.Uint8(); err != nil {
return 0, err
Expand Down Expand Up @@ -516,55 +517,59 @@ func (d *Decoder) decodeData(tr TemplateRecord) ([]DecodedField, error) {
r := d.reader

for i := 0; i < len(tr.ScopeFieldSpecifiers); i++ {
if readLength, err = d.getDataLength(tr.ScopeFieldSpecifiers[i].Length); err != nil {
return nil, err
}

if b, err = r.Read(int(readLength)); err != nil {
return nil, err
}

m, ok := InfoModel[ElementKey{
tr.ScopeFieldSpecifiers[i].EnterpriseNo,
tr.ScopeFieldSpecifiers[i].ElementID,
}]

if !ok {
return nil, nonfatalError{fmt.Errorf("IPFIX element key (%d) not exist (scope)",
tr.ScopeFieldSpecifiers[i].ElementID)}
if ok {
fields = append(fields, DecodedField{
ID: m.FieldID,
Value: Interpret(&b, m.Type),
EnterpriseNo: tr.ScopeFieldSpecifiers[i].EnterpriseNo,
})
} else {
if !d.skipUnknownElements {
return nil, nonfatalError{fmt.Errorf("IPFIX element key (%d) not exist (scope)",
tr.ScopeFieldSpecifiers[i].ElementID)}
}
}
}

if readLength, err = d.getDataLength(tr.ScopeFieldSpecifiers[i].Length, m.Type); err != nil {
for i := 0; i < len(tr.FieldSpecifiers); i++ {
if readLength, err = d.getDataLength(tr.FieldSpecifiers[i].Length); err != nil {
return nil, err
}

if b, err = r.Read(int(readLength)); err != nil {
return nil, err
}

fields = append(fields, DecodedField{
ID: m.FieldID,
Value: Interpret(&b, m.Type),
EnterpriseNo: tr.ScopeFieldSpecifiers[i].EnterpriseNo,
})
}

for i := 0; i < len(tr.FieldSpecifiers); i++ {
m, ok := InfoModel[ElementKey{
tr.FieldSpecifiers[i].EnterpriseNo,
tr.FieldSpecifiers[i].ElementID,
}]

if !ok {
return nil, nonfatalError{fmt.Errorf("IPFIX element key (%d) not exist",
tr.FieldSpecifiers[i].ElementID)}
}

if readLength, err = d.getDataLength(tr.FieldSpecifiers[i].Length, m.Type); err != nil {
return nil, err
}

if b, err = r.Read(int(readLength)); err != nil {
return nil, err
if ok {
fields = append(fields, DecodedField{
ID: m.FieldID,
Value: Interpret(&b, m.Type),
EnterpriseNo: tr.FieldSpecifiers[i].EnterpriseNo,
})
} else {
if !d.skipUnknownElements {
return nil, nonfatalError{fmt.Errorf("IPFIX element key (%d) not exist",
tr.FieldSpecifiers[i].ElementID)}
}
}

fields = append(fields, DecodedField{
ID: m.FieldID,
Value: Interpret(&b, m.Type),
EnterpriseNo: tr.FieldSpecifiers[i].EnterpriseNo,
})
}

if len(fields) == 0 {
Expand Down
58 changes: 49 additions & 9 deletions ipfix/decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestDecodeNoData(t *testing.T) {
ip := net.ParseIP("127.0.0.1")
mCache := GetCache("cache.file")
body := []byte{}
d := NewDecoder(ip, body)
d := NewDecoder(ip, body, false)
if _, err := d.Decode(mCache); err == nil {
t.Error("expected err but nothing")
}
Expand All @@ -143,7 +143,7 @@ func TestDecodeNoData(t *testing.T) {
func TestDecodeTemplate(t *testing.T) {
ip := net.ParseIP("127.0.0.1")
mCache := GetCache("cache.file")
d := NewDecoder(ip, tpl)
d := NewDecoder(ip, tpl, false)
_, err := d.Decode(mCache)
if err != nil {
t.Error("unexpected error happened:", err)
Expand All @@ -153,7 +153,7 @@ func TestDecodeTemplate(t *testing.T) {
func TestDecodeOptsTemplate(t *testing.T) {
ip := net.ParseIP("127.0.0.1")
mCache := GetCache("cache.file")
d := NewDecoder(ip, optsTpl)
d := NewDecoder(ip, optsTpl, false)
_, err := d.Decode(mCache)
if err != nil {
t.Error("unexpected error happened:", err)
Expand All @@ -164,7 +164,7 @@ func BenchmarkDecodeTemplate(b *testing.B) {
ip := net.ParseIP("127.0.0.1")
mCache := GetCache("cache.file")
for i := 0; i < b.N; i++ {
d := NewDecoder(ip, tpl)
d := NewDecoder(ip, tpl, false)
d.Decode(mCache)
}
}
Expand All @@ -173,15 +173,15 @@ func BenchmarkDecodeOptsTemplate(b *testing.B) {
ip := net.ParseIP("127.0.0.1")
mCache := GetCache("cache.file")
for i := 0; i < b.N; i++ {
d := NewDecoder(ip, optsTpl)
d := NewDecoder(ip, optsTpl, false)
d.Decode(mCache)
}
}

func TestMultiMessage(t *testing.T) {
ip := net.ParseIP("127.0.0.1")
mCache := GetCache("cache.file")
d := NewDecoder(ip, multiMessage)
d := NewDecoder(ip, multiMessage, false)
r, err := d.Decode(mCache)
if err != nil {
t.Error("unexpected error happened:", err)
Expand All @@ -199,7 +199,7 @@ func TestMultiMessage(t *testing.T) {
func TestUnknownDatasetsMessage(t *testing.T) {
ip := net.ParseIP("127.0.0.1")
mCache := GetCache("cache.file")
d := NewDecoder(ip, unknownDatasetMessage)
d := NewDecoder(ip, unknownDatasetMessage, false)
r, err := d.Decode(mCache)
if l := len(r.DataSets); l != 0 {
t.Error("Did not expect any result datasets, but got", l)
Expand All @@ -218,15 +218,55 @@ func TestDecodeDataTpl(t *testing.T) {

ip := net.ParseIP("127.0.0.1")
templates := GetCache("")
d := NewDecoder(ip, tpl)
d := NewDecoder(ip, tpl, false)
_, err := d.Decode(templates)
if err != nil {
t.Error(err)
}

d = NewDecoder(ip, payload)
d = NewDecoder(ip, payload, false)
_, err = d.Decode(templates)
if err != nil {
t.Error(err)
}
}

func TestUnknownElement(t *testing.T) {
// Single dataset, id 61166, 2 elements, element id 1, and element id 222, enterprise 7
var template = []byte{
0x00, 0x0a, 0x00, 0x20, 0x63, 0x75, 0x58, 0xb1, 0x19, 0x10, 0x70, 0x03, 0x00, 0x00, 0x00, 0x00,
0x00, 0x02, 0x00, 0x14, 0xee, 0xee, 0x00, 0x02, 0x00, 0x01, 0x00, 0x08, 0x80, 0xde, 0x00, 0x04,
0x00, 0x00, 0x00, 0x07}

var payload = []byte{
0x00, 0x0a, 0x00, 0x20, 0x63, 0x75, 0x58, 0xb1, 0x19, 0x10, 0x70, 0x04, 0x00, 0x00, 0x00, 0x00,
0xee, 0xee, 0x00, 0x10, 0x00, 0x00, 0x00, 0x11, 0x11, 0x11, 0x11, 0x11, 0x22, 0x22, 0x22, 0x22}

ip := net.ParseIP("127.0.0.1")
templates := GetCache("")
d := NewDecoder(ip, template, false)
_, err := d.Decode(templates)
if err != nil {
t.Error(err)
}

// Parse data with unknown element
d = NewDecoder(ip, payload, false)
m, err := d.Decode(templates)
if err == nil {
t.Error("Expected error due to unknown element, but got nil")
}
if len(m.DataSets) != 0 {
t.Error("Did not expect any result datasets, but got", m.DataSets)
}

// Now parse again, skip unknown element
d = NewDecoder(ip, payload, true)
m, err = d.Decode(templates)
if err != nil {
t.Error(err)
}
if len(m.DataSets) != 1 {
t.Error("Expected 1 dataset, but got", m.DataSets)
}
}
2 changes: 1 addition & 1 deletion ipfix/memcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func TestMemCacheRetrieve(t *testing.T) {
ip := net.ParseIP("127.0.0.1")
mCache := GetCache("cache.file")
d := NewDecoder(ip, tpl)
d := NewDecoder(ip, tpl, false)
d.Decode(mCache)
v, ok := mCache.retrieve(256, ip)
if !ok {
Expand Down
43 changes: 25 additions & 18 deletions netflow/v9/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ type DecodedField struct {

// Decoder represents Netflow payload and remote address
type Decoder struct {
raddr net.IP
reader *reader.Reader
raddr net.IP
reader *reader.Reader
skipUnknownElements bool
}

// Message represents Netflow decoded data
Expand Down Expand Up @@ -335,15 +336,18 @@ func (d *Decoder) decodeData(tr TemplateRecord) ([]DecodedField, error) {
tr.ScopeFieldSpecifiers[i].ElementID,
}]

if !ok {
return nil, nonfatalError(fmt.Errorf("Netflow element key (%d) not exist (scope)",
tr.ScopeFieldSpecifiers[i].ElementID))
if ok {
fields = append(fields, DecodedField{
ID: m.FieldID,
Value: ipfix.Interpret(&b, m.Type),
})
} else {
if !d.skipUnknownElements {
return nil, nonfatalError(fmt.Errorf("Netflow element key (%d) not exist (scope)",
tr.ScopeFieldSpecifiers[i].ElementID))
}
}

fields = append(fields, DecodedField{
ID: m.FieldID,
Value: ipfix.Interpret(&b, m.Type),
})
}

for i := 0; i < len(tr.FieldSpecifiers); i++ {
Expand All @@ -357,23 +361,26 @@ func (d *Decoder) decodeData(tr TemplateRecord) ([]DecodedField, error) {
tr.FieldSpecifiers[i].ElementID,
}]

if !ok {
return nil, nonfatalError(fmt.Errorf("Netflow element key (%d) not exist",
tr.FieldSpecifiers[i].ElementID))
if ok {
fields = append(fields, DecodedField{
ID: m.FieldID,
Value: ipfix.Interpret(&b, m.Type),
})
} else {
if !d.skipUnknownElements {
return nil, nonfatalError(fmt.Errorf("Netflow element key (%d) not exist",
tr.FieldSpecifiers[i].ElementID))
}
}

fields = append(fields, DecodedField{
ID: m.FieldID,
Value: ipfix.Interpret(&b, m.Type),
})
}

return fields, nil
}

// NewDecoder constructs a decoder
func NewDecoder(raddr net.IP, b []byte) *Decoder {
return &Decoder{raddr, reader.NewReader(b)}
func NewDecoder(raddr net.IP, b []byte, skipUnknownElements bool) *Decoder {
return &Decoder{raddr, reader.NewReader(b), skipUnknownElements}
}

// Decode decodes the flow records
Expand Down
Loading