Skip to content

Commit

Permalink
Update DNS protocol to use ECS fields
Browse files Browse the repository at this point in the history
This updates the DNS protocol to have more closely follow ECS.
The DNS tunneling dashboard has been updated to work with the new
field names.

In order to better interoperate with other data sources the trailing dot
has been removed from domain names. For example, previously Packetbeat
would produce `dns.question.name:elastic.co.` and now it will simply produce
`dns.question.name:elastic.co`. It's a breaking change but it will be make it
easier to pivot with other data sources.

Part of elastic#7968

Here's a summary of what fields changed.

Changed

- bytes_in -> source.bytes
- bytes_out -> destination.bytes
- notes -> error.message
- responsetime -> event.duration (unit are now nanoseconds)
- transport -> network.transport

Added

- event.end
- event.start
- network.bytes
- network.community_id
- network.protocol = dns
- network.transport = udp/tcp
- network.type

Unchanged Packetbeat Fields

- method - dns opcode
- query = class {{ dns.question.class }}, type {{ dns.question.type }}, {{ dns.question.name }}
- request - text representation of the entire request
- response - text representation of the entire response
- resource - dns.question.name
- status
- type = dns (we might remove this since we have event.dataset)
  • Loading branch information
andrewkroh committed Jan 9, 2019
1 parent ce3f23f commit ecbb62d
Show file tree
Hide file tree
Showing 11 changed files with 1,236 additions and 659 deletions.
1,044 changes: 585 additions & 459 deletions packetbeat/_meta/kibana/6/dashboard/Packetbeat-dns-tunneling.json

Large diffs are not rendered by default.

300 changes: 300 additions & 0 deletions packetbeat/pb/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pb

import (
"net"
"reflect"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/flowhash"
"github.com/elastic/ecs/code/go/ecs"
)

// FieldsKey is the key under which a *pb.Fields value may be stored in a
// beat.Event. The Packetbeat publisher will marshal those fields into the
// event at publish time.
const FieldsKey = "_packetbeat"

// Fields contains common fields used in Packetbeat events. Protocol
// implementations can publish a Fields pointer in a beat.Event and it will
// be marshaled into the event following the ECS schema where applicable.
//
// If client and server are nil then they will be populated with the source and
// destination values, respectively. Other fields like event.duration and
// and network.bytes are automatically computed at publish time.
type Fields struct {
Source *ecs.Source `ecs:"source"`
Destination *ecs.Destination `ecs:"destination"`
Client *ecs.Client `ecs:"client"`
Server *ecs.Server `ecs:"server"`
Network ecs.Network `ecs:"network"`
Event ecs.Event `ecs:"event"`

SourceProcess *ecs.Process `ecs:"source.process"`
DestinationProcess *ecs.Process `ecs:"destination.process"`
Process *ecs.Process `ecs:"process"`
}

// NewBeatEvent creates a new beat.Event populated with a Fields value and
// returns both.
func NewBeatEvent(timestamp time.Time) (beat.Event, *Fields) {
pbFields := &Fields{}
return beat.Event{
Timestamp: timestamp,
Fields: common.MapStr{
FieldsKey: pbFields,
},
}, pbFields
}

// GetFields returns a pointer to a Fields object if one is stored within the
// given MapStr. It returns nil and no error if no Fields value is present.
func GetFields(m common.MapStr) (*Fields, error) {
v, found := m[FieldsKey]
if !found {
return nil, nil
}

fields, ok := v.(*Fields)
if !ok {
return nil, errors.Errorf("%v must be a *types.Fields, but is %T", FieldsKey, fields)
}
return fields, nil
}

// SetSource populates the source fields with the endpoint data.
func (f *Fields) SetSource(endpoint *common.Endpoint) {
if f.Source == nil {
f.Source = &ecs.Source{}
}
f.Source.IP = endpoint.IP
f.Source.Port = int64(endpoint.Port)
f.Source.Domain = endpoint.Domain

if endpoint.PID > 0 {
f.SourceProcess = makeProcess(&endpoint.Process)
}
}

// SetDestination populates the destination fields with the endpoint data.
func (f *Fields) SetDestination(endpoint *common.Endpoint) {
if f.Destination == nil {
f.Destination = &ecs.Destination{}
}
f.Destination.IP = endpoint.IP
f.Destination.Port = int64(endpoint.Port)
f.Destination.Domain = endpoint.Domain

if endpoint.PID > 0 {
f.DestinationProcess = makeProcess(&endpoint.Process)
}
}

func makeProcess(p *common.Process) *ecs.Process {
return &ecs.Process{
Name: p.Name,
Args: p.Args,
Executable: p.Exe,
PID: int64(p.PID),
PPID: int64(p.PPID),
Start: p.StartTime,
WorkingDirectory: p.CWD,
}
}

// ComputeValues computes derived values like network.bytes and writes them to f.
func (f *Fields) ComputeValues(localIPs []net.IP) error {
var flow flowhash.Flow

// network.bytes
if f.Source != nil {
flow.SourceIP = net.ParseIP(f.Source.IP)
flow.SourcePort = uint16(f.Source.Port)
f.Network.Bytes += f.Source.Bytes
}
if f.Destination != nil {
flow.DestinationIP = net.ParseIP(f.Destination.IP)
flow.DestinationPort = uint16(f.Destination.Port)
f.Network.Bytes += f.Destination.Bytes
}

// network.community_id
switch {
case f.Network.Transport == "udp":
flow.Protocol = 17
case f.Network.Transport == "tcp":
flow.Protocol = 6
case f.Network.Transport == "icmp":
flow.Protocol = 1
// TODO: Populate the ICMP type/code.
case f.Network.Transport == "ipv6-icmp":
flow.Protocol = 65
// TODO: Populate the ICMP type/code.
}
f.Network.CommunityID = flowhash.CommunityID.Hash(flow)

// network.type
if len(flow.SourceIP) > 0 {
if flow.SourceIP.To4() != nil {
f.Network.Type = "ipv4"
} else {
f.Network.Type = "ipv6"
}
} else if len(flow.DestinationIP) > 0 {
if flow.DestinationIP.To4() != nil {
f.Network.Type = "ipv4"
} else {
f.Network.Type = "ipv6"
}
}

// network.direction
if len(localIPs) > 0 {
if flow.SourceIP != nil {
for _, ip := range localIPs {
if flow.SourceIP.Equal(ip) {
f.Network.Direction = "outbound"
break
}
}
} else if flow.DestinationIP != nil {
for _, ip := range localIPs {
if flow.DestinationIP.Equal(ip) {
f.Network.Direction = "inbound"
break
}
}
}
}

// process (dest process will take priority)
if f.DestinationProcess != nil {
f.Process = f.DestinationProcess
} else if f.SourceProcess != nil {
f.Process = f.SourceProcess
}

// event.duration
if f.Event.Duration == 0 && !f.Event.Start.IsZero() && !f.Event.End.IsZero() {
f.Event.Duration = f.Event.End.Sub(f.Event.Start)
}

// client
if f.Client == nil && f.Source != nil {
client := ecs.Client(*f.Source)
f.Client = &client
}

// server
if f.Server == nil && f.Destination != nil {
server := ecs.Server(*f.Destination)
f.Server = &server
}

return nil
}

// MarshalMapStr marshals the fields into MapStr. It returns an error if there
// is a problem writing the keys to the given map (like if an intermediate key
// exists and is not a map).
func (f *Fields) MarshalMapStr(m common.MapStr) error {
typ := reflect.TypeOf(*f)
val := reflect.ValueOf(*f)

for i := 0; i < typ.NumField(); i++ {
structField := typ.Field(i)
tag := structField.Tag.Get("ecs")
if tag == "" {
panic(errors.Errorf("missing tag on field %v", structField.Name))
}

fieldValue := val.Field(i)
if !fieldValue.IsValid() || isEmptyValue(fieldValue) {
continue
}

if err := writeECSStruct(m, tag, fieldValue); err != nil {
return err
}
}
return nil
}

func writeECSStruct(m common.MapStr, key string, val reflect.Value) error {
// Dereference pointers.
if val.Type().Kind() == reflect.Ptr {
if val.IsNil() {
return nil
}

val = val.Elem()
}

// Ignore zero values.
if !val.IsValid() {
return nil
}

typ := val.Type()
for i := 0; i < typ.NumField(); i++ {
structField := typ.Field(i)
tag := structField.Tag.Get("ecs")
if tag == "" {
break
}

fieldValue := val.Field(i)
if !fieldValue.IsValid() || isEmptyValue(fieldValue) {
continue
}

if _, err := m.Put(key+"."+tag, fieldValue.Interface()); err != nil {
return err
}
}
return nil
}

// isEmptyValue returns true if the given value is empty.
func isEmptyValue(v reflect.Value) bool {
switch v.Kind() {
case reflect.Array, reflect.Map, reflect.Slice, reflect.String:
return v.Len() == 0
case reflect.Bool:
return !v.Bool()
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
return v.Int() == 0
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr:
return v.Uint() == 0
case reflect.Float32, reflect.Float64:
return v.Float() == 0
case reflect.Interface, reflect.Ptr:
return v.IsNil()
}

switch t := v.Interface().(type) {
case time.Time:
return t.IsZero()
}
return false
}
64 changes: 64 additions & 0 deletions packetbeat/pb/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pb

import (
"net"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/ecs/code/go/ecs"
)

func TestMarshalMapStr(t *testing.T) {
f := Fields{
Source: &ecs.Source{
IP: "127.0.0.1",
},
}

m := common.MapStr{}
if err := f.MarshalMapStr(m); err != nil {
t.Fatal(err)
}

assert.Equal(t, common.MapStr{"source": common.MapStr{"ip": "127.0.0.1"}}, m)
}

func TestComputeValues(t *testing.T) {
f := Fields{
Source: &ecs.Source{IP: "127.0.0.1", Port: 4000, Bytes: 100},
Destination: &ecs.Destination{IP: "127.0.0.2", Port: 80, Bytes: 200},
Network: ecs.Network{Transport: "tcp"},
}

localAddrs := []net.IP{net.ParseIP("127.0.0.1")}

if err := f.ComputeValues(localAddrs); err != nil {
t.Fatal(err)
}

assert.Equal(t, f.Source.IP, f.Client.IP)
assert.Equal(t, f.Destination.IP, f.Server.IP)
assert.EqualValues(t, f.Network.Bytes, 300)
assert.NotZero(t, f.Network.CommunityID)
assert.Equal(t, f.Network.Type, "ipv4")
assert.Equal(t, f.Network.Direction, "outbound")
}
Loading

0 comments on commit ecbb62d

Please sign in to comment.