diff --git a/packetbeat/flows/worker.go b/packetbeat/flows/worker.go index 9a90bc3ec6f..abdcffcff55 100644 --- a/packetbeat/flows/worker.go +++ b/packetbeat/flows/worker.go @@ -212,6 +212,9 @@ func createEvent( "end": common.Time(f.ts), "duration": f.ts.Sub(f.createTS), "dataset": "flow", + "kind": "event", + "category": "network_traffic", + "action": "network_flow", } flow := common.MapStr{ "id": common.NetString(f.id.Serialize()), diff --git a/packetbeat/pb/event.go b/packetbeat/pb/event.go index a9b184a88f3..ebed6ebb708 100644 --- a/packetbeat/pb/event.go +++ b/packetbeat/pb/event.go @@ -35,6 +35,13 @@ import ( // event at publish time. const FieldsKey = "_packetbeat" +// Network direction values. +const ( + Inbound = "inbound" + Outbound = "outbound" + Internal = "internal" +) + // Fields contains common fields used in Packetbeat events. Protocol // implementations can publish a Fields pointer in a beat.Event and it will // be marshaled into the event following the ECS schema where applicable. @@ -54,13 +61,23 @@ type Fields struct { DestinationProcess *ecs.Process `ecs:"destination.process"` Process *ecs.Process `ecs:"process"` + Error struct { + Message []string + } + ICMPType uint8 // ICMP message type for use in computing network.community_id. ICMPCode uint8 // ICMP message code for use in computing network.community_id. } // NewFields returns a new Fields value. func NewFields() *Fields { - return &Fields{Event: ecs.Event{Duration: -1}} + return &Fields{ + Event: ecs.Event{ + Duration: -1, + Kind: "event", + Category: "network_traffic", + }, + } } // NewBeatEvent creates a new beat.Event populated with a Fields value and @@ -181,18 +198,23 @@ func (f *Fields) ComputeValues(localIPs []net.IP) error { } // network.direction - if len(localIPs) > 0 { + if len(localIPs) > 0 && f.Network.Direction == "" { if flow.SourceIP != nil { for _, ip := range localIPs { if flow.SourceIP.Equal(ip) { - f.Network.Direction = "outbound" + f.Network.Direction = Outbound break } } - } else if flow.DestinationIP != nil { + } + if flow.DestinationIP != nil { for _, ip := range localIPs { if flow.DestinationIP.Equal(ip) { - f.Network.Direction = "inbound" + if f.Network.Direction == Outbound { + f.Network.Direction = Internal + } else { + f.Network.Direction = Inbound + } break } } @@ -256,6 +278,13 @@ func (f *Fields) MarshalMapStr(m common.MapStr) error { return err } } + + if len(f.Error.Message) == 1 { + m.Put("error.message", f.Error.Message[0]) + } else if len(f.Error.Message) > 1 { + m.Put("error.message", f.Error.Message) + } + return nil } diff --git a/packetbeat/pb/event_test.go b/packetbeat/pb/event_test.go index 661c3d12ae4..788003d7691 100644 --- a/packetbeat/pb/event_test.go +++ b/packetbeat/pb/event_test.go @@ -38,7 +38,13 @@ func TestMarshalMapStr(t *testing.T) { t.Fatal(err) } - assert.Equal(t, common.MapStr{"source": common.MapStr{"ip": "127.0.0.1"}}, m) + assert.Equal(t, common.MapStr{ + "event": common.MapStr{ + "kind": "event", + "category": "network_traffic", + }, + "source": common.MapStr{"ip": "127.0.0.1"}, + }, m) } func TestComputeValues(t *testing.T) { diff --git a/packetbeat/protos/amqp/amqp.go b/packetbeat/protos/amqp/amqp.go index fe3b33356e8..f2a2ca02601 100644 --- a/packetbeat/protos/amqp/amqp.go +++ b/packetbeat/protos/amqp/amqp.go @@ -437,6 +437,7 @@ func (amqp *amqpPlugin) publishTransaction(t *amqpTransaction) { pbf.Event.Dataset = "amqp" pbf.Network.Protocol = pbf.Event.Dataset pbf.Network.Transport = "tcp" + pbf.Error.Message = t.notes fields := evt.Fields fields["type"] = pbf.Event.Dataset @@ -490,12 +491,6 @@ func (amqp *amqpPlugin) publishTransaction(t *amqpTransaction) { } } - if len(t.notes) == 1 { - evt.PutValue("error.message", t.notes[0]) - } else if len(t.notes) > 1 { - evt.PutValue("error.message", t.notes) - } - amqp.results(evt) } diff --git a/packetbeat/protos/amqp/amqp_test.go b/packetbeat/protos/amqp/amqp_test.go index 829d2282f1a..6a17068ac4c 100644 --- a/packetbeat/protos/amqp/amqp_test.go +++ b/packetbeat/protos/amqp/amqp_test.go @@ -28,8 +28,8 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/packetbeat/pb" "github.com/elastic/beats/packetbeat/protos" + "github.com/elastic/beats/packetbeat/publish" ) type eventStore struct { @@ -37,18 +37,7 @@ type eventStore struct { } func (e *eventStore) publish(event beat.Event) { - pbf, err := pb.GetFields(event.Fields) - if err != nil || pbf == nil { - panic("_packetbeat not found") - } - delete(event.Fields, pb.FieldsKey) - if err = pbf.ComputeValues(nil); err != nil { - panic(err) - } - if err = pbf.MarshalMapStr(event.Fields); err != nil { - panic(err) - } - + publish.MarshalPacketbeatFields(&event, nil) e.events = append(e.events, event) } diff --git a/packetbeat/protos/applayer/applayer.go b/packetbeat/protos/applayer/applayer.go index 77ca08f4bba..21b740c73e1 100644 --- a/packetbeat/protos/applayer/applayer.go +++ b/packetbeat/protos/applayer/applayer.go @@ -223,7 +223,7 @@ func (t *Transaction) InitWithMsg( func (t *Transaction) Event(event *beat.Event) error { event.Timestamp = t.Ts.Ts - pbf := &pb.Fields{} + pbf := pb.NewFields() pbf.SetSource(&t.Src) pbf.SetDestination(&t.Dst) pbf.Source.Bytes = int64(t.BytesIn) @@ -233,16 +233,12 @@ func (t *Transaction) Event(event *beat.Event) error { pbf.Event.End = t.EndTime pbf.Network.Transport = t.Transport.String() pbf.Network.Protocol = pbf.Event.Dataset + pbf.Error.Message = t.Notes fields := event.Fields fields[pb.FieldsKey] = pbf fields["type"] = pbf.Event.Dataset fields["status"] = t.Status - if len(t.Notes) == 1 { - event.PutValue("error.message", t.Notes[0]) - } else if len(t.Notes) > 1 { - event.PutValue("error.message", t.Notes) - } return nil } diff --git a/packetbeat/protos/cassandra/pub.go b/packetbeat/protos/cassandra/pub.go index 8736138b5d8..2f79295ee03 100644 --- a/packetbeat/protos/cassandra/pub.go +++ b/packetbeat/protos/cassandra/pub.go @@ -79,13 +79,12 @@ func (pub *transPub) createEvent(requ, resp *message) beat.Event { cassandra := common.MapStr{} status := common.OK_STATUS - var notes []string //requ can be null, if the message is a PUSHed message if requ != nil { pbf.Source.Bytes = int64(requ.Size) pbf.Event.Start = requ.Ts - notes = append(notes, requ.Notes...) + pbf.Error.Message = requ.Notes if pub.sendRequest { if pub.sendRequestHeader { @@ -107,7 +106,7 @@ func (pub *transPub) createEvent(requ, resp *message) beat.Event { if resp != nil { pbf.Destination.Bytes = int64(resp.Size) pbf.Event.End = resp.Ts - notes = append(notes, resp.Notes...) + pbf.Error.Message = append(pbf.Error.Message, resp.Notes...) if resp.failed { status = common.ERROR_STATUS @@ -133,11 +132,5 @@ func (pub *transPub) createEvent(requ, resp *message) beat.Event { fields["cassandra"] = cassandra } - if len(notes) == 1 { - fields.Put("error.message", notes[0]) - } else if len(notes) > 1 { - fields.Put("error.message", notes) - } - return evt } diff --git a/packetbeat/protos/dhcpv4/dhcpv4_test.go b/packetbeat/protos/dhcpv4/dhcpv4_test.go index 9760d215fbd..56d9428ebe2 100644 --- a/packetbeat/protos/dhcpv4/dhcpv4_test.go +++ b/packetbeat/protos/dhcpv4/dhcpv4_test.go @@ -29,8 +29,8 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/packetbeat/pb" "github.com/elastic/beats/packetbeat/protos" + "github.com/elastic/beats/packetbeat/publish" ) var _ protos.UDPPlugin = &dhcpv4Plugin{} @@ -117,8 +117,10 @@ func TestParseDHCPRequest(t *testing.T) { "port": 67, }, "event": common.MapStr{ - "dataset": "dhcpv4", - "start": pkt.Ts, + "category": "network_traffic", + "dataset": "dhcpv4", + "kind": "event", + "start": pkt.Ts, }, "network": common.MapStr{ "type": "ipv4", @@ -150,8 +152,9 @@ func TestParseDHCPRequest(t *testing.T) { }, } - actual := marshalPacketbeatFields(t, p.parseDHCPv4(pkt)) + actual := p.parseDHCPv4(pkt) if assert.NotNil(t, actual) { + publish.MarshalPacketbeatFields(actual, nil) t.Logf("DHCP event: %+v", actual) assertEqual(t, expected, *actual) } @@ -194,8 +197,10 @@ func TestParseDHCPACK(t *testing.T) { "bytes": 300, }, "event": common.MapStr{ - "dataset": "dhcpv4", - "start": pkt.Ts, + "category": "network_traffic", + "dataset": "dhcpv4", + "kind": "event", + "start": pkt.Ts, }, "network": common.MapStr{ "type": "ipv4", @@ -226,8 +231,9 @@ func TestParseDHCPACK(t *testing.T) { }, } - actual := marshalPacketbeatFields(t, p.parseDHCPv4(pkt)) + actual := p.parseDHCPv4(pkt) if assert.NotNil(t, actual) { + publish.MarshalPacketbeatFields(actual, nil) t.Logf("DHCP event: %+v", actual) assertEqual(t, expected, *actual) } @@ -249,20 +255,3 @@ func normalizeEvent(t testing.TB, event beat.Event) interface{} { } return out } - -func marshalPacketbeatFields(t testing.TB, evt *beat.Event) *beat.Event { - pbf, err := pb.GetFields(evt.Fields) - if err != nil || pbf == nil { - t.Fatal("failed getting _packetbeat", err) - } - delete(evt.Fields, pb.FieldsKey) - - if err = pbf.ComputeValues(nil); err != nil { - t.Fatal(err) - } - - if err = pbf.MarshalMapStr(evt.Fields); err != nil { - t.Fatal(err) - } - return evt -} diff --git a/packetbeat/protos/dns/dns.go b/packetbeat/protos/dns/dns.go index 31670b6943d..1f650e17ed5 100644 --- a/packetbeat/protos/dns/dns.go +++ b/packetbeat/protos/dns/dns.go @@ -370,15 +370,11 @@ func (dns *dnsPlugin) publishTransaction(t *dnsTransaction) { pbf.SetDestination(&t.dst) pbf.Network.Transport = t.transport.String() pbf.Network.Protocol = "dns" + pbf.Error.Message = t.notes fields := evt.Fields fields["type"] = "dns" fields["status"] = common.ERROR_STATUS - if len(t.notes) == 1 { - fields.Put("error.message", t.notes[0]) - } else if len(t.notes) > 1 { - fields.Put("error.message", t.notes) - } dnsEvent := common.MapStr{} fields["dns"] = dnsEvent diff --git a/packetbeat/protos/dns/dns_test.go b/packetbeat/protos/dns/dns_test.go index ff083cfb4b6..f23f88e9186 100644 --- a/packetbeat/protos/dns/dns_test.go +++ b/packetbeat/protos/dns/dns_test.go @@ -34,8 +34,8 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/packetbeat/pb" "github.com/elastic/beats/packetbeat/protos" + "github.com/elastic/beats/packetbeat/publish" ) // Test Constants @@ -79,18 +79,7 @@ type eventStore struct { } func (e *eventStore) publish(event beat.Event) { - pbf, err := pb.GetFields(event.Fields) - if err != nil || pbf == nil { - panic("_packetbeat not found") - } - delete(event.Fields, pb.FieldsKey) - if err = pbf.ComputeValues(nil); err != nil { - panic(err) - } - if err = pbf.MarshalMapStr(event.Fields); err != nil { - panic(err) - } - + publish.MarshalPacketbeatFields(&event, nil) e.events = append(e.events, event) } diff --git a/packetbeat/protos/http/http.go b/packetbeat/protos/http/http.go index a2082811dfb..0a270642f2b 100644 --- a/packetbeat/protos/http/http.go +++ b/packetbeat/protos/http/http.go @@ -519,7 +519,6 @@ func (http *httpPlugin) newTransaction(requ, resp *message) beat.Event { fields["status"] = status var httpFields ProtocolFields - var notes []string if requ != nil { http.decodeBody(requ) path, params, err := http.extractParameters(requ) @@ -534,7 +533,7 @@ func (http *httpPlugin) newTransaction(requ, resp *message) beat.Event { } pbf.Event.Start = requ.ts pbf.Network.ForwardedIP = string(requ.realIP) - notes = append(notes, requ.notes...) + pbf.Error.Message = requ.notes // http httpFields.Version = requ.version.String() @@ -560,6 +559,7 @@ func (http *httpPlugin) newTransaction(requ, resp *message) beat.Event { if http.sendRequest { fields["request"] = string(http.makeRawMessage(requ)) } + fields["method"] = httpFields.RequestMethod fields["query"] = fmt.Sprintf("%s %s", requ.method, path) } @@ -568,7 +568,7 @@ func (http *httpPlugin) newTransaction(requ, resp *message) beat.Event { pbf.Destination.Bytes = int64(resp.size) pbf.Event.End = resp.ts - notes = append(notes, resp.notes...) + pbf.Error.Message = append(pbf.Error.Message, resp.notes...) // http httpFields.ResponseStatusCode = int64(resp.statusCode) @@ -587,12 +587,6 @@ func (http *httpPlugin) newTransaction(requ, resp *message) beat.Event { } } - if len(notes) == 1 { - fields.Put("error.message", notes[0]) - } else if len(notes) > 1 { - fields.Put("error.message", notes) - } - pb.MarshalStruct(evt.Fields, "http", httpFields) return evt } diff --git a/packetbeat/protos/http/http_test.go b/packetbeat/protos/http/http_test.go index 5e1bd41231b..a10456285b6 100644 --- a/packetbeat/protos/http/http_test.go +++ b/packetbeat/protos/http/http_test.go @@ -34,6 +34,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/packetbeat/protos" + "github.com/elastic/beats/packetbeat/publish" ) type testParser struct { @@ -49,6 +50,7 @@ type eventStore struct { } func (e *eventStore) publish(event beat.Event) { + publish.MarshalPacketbeatFields(&event, nil) e.events = append(e.events, event) } diff --git a/packetbeat/protos/icmp/icmp.go b/packetbeat/protos/icmp/icmp.go index 6d3c9bceef7..82b8481df92 100644 --- a/packetbeat/protos/icmp/icmp.go +++ b/packetbeat/protos/icmp/icmp.go @@ -288,6 +288,7 @@ func (icmp *icmpPlugin) publishTransaction(trans *icmpTransaction) { pbf.Source = &ecs.Source{IP: trans.tuple.srcIP.String()} pbf.Destination = &ecs.Destination{IP: trans.tuple.dstIP.String()} pbf.Event.Dataset = "icmp" + pbf.Error.Message = trans.notes // common fields - group "event" fields := evt.Fields @@ -341,11 +342,5 @@ func (icmp *icmpPlugin) publishTransaction(trans *icmpTransaction) { } } - if len(trans.notes) == 1 { - evt.PutValue("error.message", trans.notes[0]) - } else if len(trans.notes) > 1 { - evt.PutValue("error.message", trans.notes) - } - icmp.results(evt) } diff --git a/packetbeat/protos/mysql/mysql.go b/packetbeat/protos/mysql/mysql.go index d44088ce6c7..1e6fc5179de 100644 --- a/packetbeat/protos/mysql/mysql.go +++ b/packetbeat/protos/mysql/mysql.go @@ -1164,6 +1164,7 @@ func (mysql *mysqlPlugin) publishTransaction(t *mysqlTransaction) { pbf.Event.End = t.endTime pbf.Network.Transport = "tcp" pbf.Network.Protocol = "mysql" + pbf.Error.Message = t.notes fields := evt.Fields fields["type"] = pbf.Event.Dataset @@ -1190,12 +1191,6 @@ func (mysql *mysqlPlugin) publishTransaction(t *mysqlTransaction) { fields["response"] = t.responseRaw } - if len(t.notes) == 1 { - evt.PutValue("error.message", t.notes[0]) - } else if len(t.notes) > 1 { - evt.PutValue("error.message", t.notes) - } - mysql.results(evt) } diff --git a/packetbeat/protos/mysql/mysql_test.go b/packetbeat/protos/mysql/mysql_test.go index 195ec694632..20332a08cc1 100644 --- a/packetbeat/protos/mysql/mysql_test.go +++ b/packetbeat/protos/mysql/mysql_test.go @@ -33,6 +33,7 @@ import ( "github.com/elastic/beats/packetbeat/protos" "github.com/elastic/beats/packetbeat/protos/tcp" + "github.com/elastic/beats/packetbeat/publish" ) const serverPort = 3306 @@ -42,6 +43,7 @@ type eventStore struct { } func (e *eventStore) publish(event beat.Event) { + publish.MarshalPacketbeatFields(&event, nil) e.events = append(e.events, event) } diff --git a/packetbeat/protos/pgsql/pgsql.go b/packetbeat/protos/pgsql/pgsql.go index f009fe9712b..f48731fee72 100644 --- a/packetbeat/protos/pgsql/pgsql.go +++ b/packetbeat/protos/pgsql/pgsql.go @@ -467,6 +467,7 @@ func (pgsql *pgsqlPlugin) publishTransaction(t *pgsqlTransaction) { pbf.Event.Dataset = "pgsql" pbf.Network.Transport = "tcp" pbf.Network.Protocol = pbf.Event.Dataset + pbf.Error.Message = t.notes fields := evt.Fields fields["type"] = pbf.Event.Dataset @@ -486,12 +487,6 @@ func (pgsql *pgsqlPlugin) publishTransaction(t *pgsqlTransaction) { fields["response"] = t.responseRaw } - if len(t.notes) == 1 { - evt.PutValue("error.message", t.notes[0]) - } else if len(t.notes) > 1 { - evt.PutValue("error.message", t.notes) - } - pgsql.results(evt) } diff --git a/packetbeat/protos/pgsql/pgsql_test.go b/packetbeat/protos/pgsql/pgsql_test.go index 6359ab8d3d6..259f727adc4 100644 --- a/packetbeat/protos/pgsql/pgsql_test.go +++ b/packetbeat/protos/pgsql/pgsql_test.go @@ -31,8 +31,8 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/packetbeat/pb" "github.com/elastic/beats/packetbeat/protos" + "github.com/elastic/beats/packetbeat/publish" ) type eventStore struct { @@ -40,18 +40,7 @@ type eventStore struct { } func (e *eventStore) publish(event beat.Event) { - pbf, err := pb.GetFields(event.Fields) - if err != nil || pbf == nil { - panic("_packetbeat not found") - } - delete(event.Fields, pb.FieldsKey) - if err = pbf.ComputeValues(nil); err != nil { - panic(err) - } - if err = pbf.MarshalMapStr(event.Fields); err != nil { - panic(err) - } - + publish.MarshalPacketbeatFields(&event, nil) e.events = append(e.events, event) } diff --git a/packetbeat/protos/thrift/thrift.go b/packetbeat/protos/thrift/thrift.go index b5bff896bb6..8241f400108 100644 --- a/packetbeat/protos/thrift/thrift.go +++ b/packetbeat/protos/thrift/thrift.go @@ -1137,11 +1137,8 @@ func (thrift *thriftPlugin) publishTransactions() { t.reply.exceptions) } } - if len(t.reply.notes) == 1 { - evt.PutValue("error.message", t.reply.notes[0]) - } else if len(t.reply.notes) > 1 { - evt.PutValue("error.message", t.reply.notes) - } + + pbf.Error.Message = t.reply.notes } if thrift.results != nil { diff --git a/packetbeat/protos/tls/tls_test.go b/packetbeat/protos/tls/tls_test.go index 18e116dee53..1457bc87639 100644 --- a/packetbeat/protos/tls/tls_test.go +++ b/packetbeat/protos/tls/tls_test.go @@ -30,8 +30,8 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/packetbeat/pb" "github.com/elastic/beats/packetbeat/protos" + "github.com/elastic/beats/packetbeat/publish" ) type eventStore struct { @@ -39,7 +39,7 @@ type eventStore struct { } const ( - expectedClientHello = `{"client":{"ip":"192.168.0.1","port":6512},"destination":{"domain":"example.org","ip":"192.168.0.2","port":27017},"event":{"dataset":"tls"},"network":{"community_id":"1:jKfewJN/czjTuEpVvsKdYXXiMzs=","protocol":"tls","transport":"tcp","type":"ipv4"},"server":{"domain":"example.org","ip":"192.168.0.2","port":27017},"source":{"ip":"192.168.0.1","port":6512},"status":"Error","tls":{"client_certificate_requested":false,"client_hello":{"extensions":{"_unparsed_":["renegotiation_info","23","status_request","18","30032"],"application_layer_protocol_negotiation":["h2","http/1.1"],"ec_points_formats":["uncompressed"],"server_name_indication":["example.org"],"session_ticket":"","signature_algorithms":["ecdsa_secp256r1_sha256","rsa_pss_sha256","rsa_pkcs1_sha256","ecdsa_secp384r1_sha384","rsa_pss_sha384","rsa_pkcs1_sha384","rsa_pss_sha512","rsa_pkcs1_sha512","rsa_pkcs1_sha1"],"supported_groups":["x25519","secp256r1","secp384r1"]},"supported_ciphers":["TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256","TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256","TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384","TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384","TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256","TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256","TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA","TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA","TLS_RSA_WITH_AES_128_GCM_SHA256","TLS_RSA_WITH_AES_256_GCM_SHA384","TLS_RSA_WITH_AES_128_CBC_SHA","TLS_RSA_WITH_AES_256_CBC_SHA","TLS_RSA_WITH_3DES_EDE_CBC_SHA"],"supported_compression_methods":["NULL"],"version":"3.3"},"fingerprints":{"ja3":{"hash":"94c485bca29d5392be53f2b8cf7f4304","str":"771,49195-49199-49196-49200-52393-52392-49171-49172-156-157-47-53-10,65281-0-23-35-13-5-18-16-30032-11-10,29-23-24,0"}},"handshake_completed":false,"resumed":false},"type":"tls"}` + expectedClientHello = `{"client":{"ip":"192.168.0.1","port":6512},"destination":{"domain":"example.org","ip":"192.168.0.2","port":27017},"event":{"category":"network_traffic","dataset":"tls","kind":"event"},"network":{"community_id":"1:jKfewJN/czjTuEpVvsKdYXXiMzs=","protocol":"tls","transport":"tcp","type":"ipv4"},"server":{"domain":"example.org","ip":"192.168.0.2","port":27017},"source":{"ip":"192.168.0.1","port":6512},"status":"Error","tls":{"client_certificate_requested":false,"client_hello":{"extensions":{"_unparsed_":["renegotiation_info","23","status_request","18","30032"],"application_layer_protocol_negotiation":["h2","http/1.1"],"ec_points_formats":["uncompressed"],"server_name_indication":["example.org"],"session_ticket":"","signature_algorithms":["ecdsa_secp256r1_sha256","rsa_pss_sha256","rsa_pkcs1_sha256","ecdsa_secp384r1_sha384","rsa_pss_sha384","rsa_pkcs1_sha384","rsa_pss_sha512","rsa_pkcs1_sha512","rsa_pkcs1_sha1"],"supported_groups":["x25519","secp256r1","secp384r1"]},"supported_ciphers":["TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256","TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256","TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384","TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384","TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256","TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256","TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA","TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA","TLS_RSA_WITH_AES_128_GCM_SHA256","TLS_RSA_WITH_AES_256_GCM_SHA384","TLS_RSA_WITH_AES_128_CBC_SHA","TLS_RSA_WITH_AES_256_CBC_SHA","TLS_RSA_WITH_3DES_EDE_CBC_SHA"],"supported_compression_methods":["NULL"],"version":"3.3"},"fingerprints":{"ja3":{"hash":"94c485bca29d5392be53f2b8cf7f4304","str":"771,49195-49199-49196-49200-52393-52392-49171-49172-156-157-47-53-10,65281-0-23-35-13-5-18-16-30032-11-10,29-23-24,0"}},"handshake_completed":false,"resumed":false},"type":"tls"}` expectedServerHello = `{"extensions":{"_unparsed_":["renegotiation_info","status_request"],"application_layer_protocol_negotiation":["h2"],"ec_points_formats":["uncompressed","ansiX962_compressed_prime","ansiX962_compressed_char2"],"session_ticket":""},"selected_cipher":"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256","selected_compression_method":"NULL","version":"3.3"}` rawClientHello = "16030100c2010000be03033367dfae0d46ec0651e49cca2ae47317e8989df710" + "ee7570a88b9a7d5d56b3af00001c3a3ac02bc02fc02cc030cca9cca8c013c014" + @@ -56,18 +56,7 @@ const ( ) func (e *eventStore) publish(event beat.Event) { - pbf, err := pb.GetFields(event.Fields) - if err != nil || pbf == nil { - panic("_packetbeat not found") - } - delete(event.Fields, pb.FieldsKey) - if err = pbf.ComputeValues(nil); err != nil { - panic(err) - } - if err = pbf.MarshalMapStr(event.Fields); err != nil { - panic(err) - } - + publish.MarshalPacketbeatFields(&event, nil) e.events = append(e.events, event) } diff --git a/packetbeat/publish/publish.go b/packetbeat/publish/publish.go index 6b8a52fb3c6..0b34715e1b9 100644 --- a/packetbeat/publish/publish.go +++ b/packetbeat/publish/publish.go @@ -39,7 +39,6 @@ type TransactionPublisher struct { type transProcessor struct { ignoreOutgoing bool localIPs []net.IP // TODO: Periodically update this list. - localIPStrings []string // Deprecated. Use localIPs. name string } @@ -56,11 +55,9 @@ func NewTransactionPublisher( return nil, err } var localIPs []net.IP - var localIPStrings []string for _, addr := range addrs { if !addr.IsLoopback() { localIPs = append(localIPs, addr) - localIPStrings = append(localIPStrings, addr.String()) } } @@ -70,7 +67,6 @@ func NewTransactionPublisher( canDrop: canDrop, processor: transProcessor{ localIPs: localIPs, - localIPStrings: localIPStrings, name: name, ignoreOutgoing: ignoreOutgoing, }, @@ -146,13 +142,19 @@ func (p *transProcessor) Run(event *beat.Event) (*beat.Event, error) { return nil, nil } - if !p.normalizeTransAddr(event.Fields) { - return nil, nil + fields, err := MarshalPacketbeatFields(event, p.localIPs) + if err != nil { + return nil, err } - if err := marshalPacketbeatFields(event, p.localIPs); err != nil { - return nil, err + if fields != nil { + if p.ignoreOutgoing && fields.Network.Direction == "outbound" { + debugf("Ignore outbound transaction on: %s -> %s", + fields.Source.IP, fields.Destination.IP) + return nil, nil + } } + return event, nil } @@ -183,110 +185,22 @@ func validateEvent(event *beat.Event) error { return nil } -func (p *transProcessor) normalizeTransAddr(event common.MapStr) bool { - debugf("normalize address for: %v", event) - - var srcServer, dstServer string - var process common.MapStr - src, ok := event["src"].(*common.Endpoint) - debugf("has src: %v", ok) - if ok { - delete(event, "src") - - // Check if it's outgoing transaction (as client). - if p.IsPublisherIP(src.IP) { - if p.ignoreOutgoing { - // Duplicated transaction -> ignore it. - debugf("Ignore duplicated transaction on: %s -> %s", srcServer, dstServer) - return false - } - - event.Put("network.direction", "outgoing") - } - - var client common.MapStr - client, process = makeEndpoint(p.name, src) - event.DeepUpdate(common.MapStr{"client": client}) - } - - dst, ok := event["dst"].(*common.Endpoint) - debugf("has dst: %v", ok) - if ok { - delete(event, "dst") - - var server common.MapStr - server, process = makeEndpoint(p.name, dst) - event.DeepUpdate(common.MapStr{"server": server}) - - // Check if it's incoming transaction (as server). - if p.IsPublisherIP(dst.IP) { - event.Put("network.direction", "incoming") - } - } - - if len(process) > 0 { - event.Put("process", process) - } - - return true -} - -func (p *transProcessor) IsPublisherIP(ip string) bool { - for _, myip := range p.localIPStrings { - if myip == ip { - return true - } - } - return false -} - -// makeEndpoint builds a map containing the endpoint information. As a -// convenience it returns a reference to the process map that is contained in -// the endpoint map (for use in populating the top-level process field). -func makeEndpoint(shipperName string, endpoint *common.Endpoint) (m common.MapStr, process common.MapStr) { - // address - m = common.MapStr{ - "ip": endpoint.IP, - "port": endpoint.Port, - } - if endpoint.Domain != "" { - m["domain"] = endpoint.Domain - } else if shipperName != "" { - if isLocal, err := common.IsLoopback(endpoint.IP); err == nil && isLocal { - m["domain"] = shipperName - } - } - - // process - if endpoint.PID > 0 { - process := common.MapStr{ - "pid": endpoint.PID, - "ppid": endpoint.PPID, - "name": endpoint.Name, - "args": endpoint.Args, - "executable": endpoint.Exe, - "start": endpoint.StartTime, - } - if endpoint.CWD != "" { - process["working_directory"] = endpoint.CWD - } - m["process"] = process - } - - return m, process -} - -func marshalPacketbeatFields(event *beat.Event, localIPs []net.IP) error { +// MarshalPacketbeatFields marshals data contained in the _packetbeat field +// into the event and removes the _packetbeat key. +func MarshalPacketbeatFields(event *beat.Event, localIPs []net.IP) (*pb.Fields, error) { defer delete(event.Fields, pb.FieldsKey) fields, err := pb.GetFields(event.Fields) if err != nil || fields == nil { - return err + return nil, err } - if err := fields.ComputeValues(localIPs); err != nil { - return err + if err = fields.ComputeValues(localIPs); err != nil { + return nil, err } - return fields.MarshalMapStr(event.Fields) + if err = fields.MarshalMapStr(event.Fields); err != nil { + return nil, err + } + return fields, nil } diff --git a/packetbeat/publish/publish_test.go b/packetbeat/publish/publish_test.go index 9ed3e7874d5..895d8247bda 100644 --- a/packetbeat/publish/publish_test.go +++ b/packetbeat/publish/publish_test.go @@ -20,6 +20,7 @@ package publish import ( + "net" "testing" "time" @@ -27,6 +28,8 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/packetbeat/pb" + "github.com/elastic/ecs/code/go/ecs" ) func testEvent() beat.Event { @@ -88,125 +91,98 @@ func TestFilterEvent(t *testing.T) { } } -func TestDirectionOut(t *testing.T) { - processor := transProcessor{ - localIPStrings: []string{"192.145.2.4"}, - ignoreOutgoing: false, - name: "test", - } - - event := beat.Event{ - Timestamp: time.Now(), - Fields: common.MapStr{ - "type": "test", - "src": &common.Endpoint{ - IP: "192.145.2.4", - Port: 3267, - Domain: "server1", - Process: common.Process{ - Args: []string{"proc1", "start"}, - Name: "proc1", +func TestPublish(t *testing.T) { + var srcIP, dstIP = "192.145.2.4", "192.145.2.5" + + event := func() *beat.Event { + return &beat.Event{ + Timestamp: time.Now(), + Fields: common.MapStr{ + "type": "test", + "_packetbeat": &pb.Fields{ + Source: &ecs.Source{ + IP: srcIP, + Port: 3267, + }, + Destination: &ecs.Destination{ + IP: dstIP, + Port: 32232, + }, }, }, - "dst": &common.Endpoint{ - IP: "192.145.2.5", - Port: 32232, - Domain: "server2", - Process: common.Process{ - Args: []string{"proc2", "start"}, - Name: "proc2", - }, - }, - }, - } - - if res, _ := processor.Run(&event); res == nil { - t.Fatalf("event has been filtered out") - } - clientIP, _ := event.GetValue("client.ip") - assert.Equal(t, "192.145.2.4", clientIP) - dir, _ := event.GetValue("network.direction") - assert.Equal(t, "outgoing", dir) -} - -func TestDirectionIn(t *testing.T) { - processor := transProcessor{ - localIPStrings: []string{"192.145.2.5"}, - ignoreOutgoing: false, - name: "test", + } } - event := beat.Event{ - Timestamp: time.Now(), - Fields: common.MapStr{ - "type": "test", - "src": &common.Endpoint{ - IP: "192.145.2.4", - Port: 3267, - Domain: "server1", - Process: common.Process{ - Args: []string{"proc1", "start"}, - Name: "proc1", - }, - }, - "dst": &common.Endpoint{ - IP: "192.145.2.5", - Port: 32232, - Domain: "server2", - Process: common.Process{ - Args: []string{"proc2", "start"}, - Name: "proc2", - }, - }, - }, - } - - if res, _ := processor.Run(&event); res == nil { - t.Fatalf("event has been filtered out") - } - clientIP, _ := event.GetValue("client.ip") - assert.Equal(t, "192.145.2.4", clientIP) - dir, _ := event.GetValue("network.direction") - assert.Equal(t, "incoming", dir) -} - -func TestNoDirection(t *testing.T) { - processor := transProcessor{ - localIPStrings: []string{"192.145.2.6"}, - ignoreOutgoing: false, - name: "test", - } - - event := beat.Event{ - Timestamp: time.Now(), - Fields: common.MapStr{ - "type": "test", - "src": &common.Endpoint{ - IP: "192.145.2.4", - Port: 3267, - Domain: "server1", - Process: common.Process{ - Args: []string{"proc1", "start"}, - Name: "proc1", - }, - }, - "dst": &common.Endpoint{ - IP: "192.145.2.5", - Port: 32232, - Domain: "server2", - Process: common.Process{ - Args: []string{"proc2", "start"}, - Name: "proc2", - }, - }, - }, - } - - if res, _ := processor.Run(&event); res == nil { - t.Fatalf("event has been filtered out") - } - clientIP, _ := event.GetValue("client.ip") - assert.Equal(t, "192.145.2.4", clientIP) - dir, _ := event.GetValue("network.direction") - assert.Nil(t, dir) + t.Run("direction/inbound", func(t *testing.T) { + processor := transProcessor{ + localIPs: []net.IP{net.ParseIP(dstIP)}, + name: "test", + } + + res, _ := processor.Run(event()) + if res == nil { + t.Fatalf("event has been filtered out") + } + + dir, _ := res.GetValue("network.direction") + assert.Equal(t, "inbound", dir) + }) + + t.Run("direction/outbound", func(t *testing.T) { + processor := transProcessor{ + localIPs: []net.IP{net.ParseIP(srcIP)}, + name: "test", + } + + res, _ := processor.Run(event()) + if res == nil { + t.Fatalf("event has been filtered out") + } + + dir, _ := res.GetValue("network.direction") + assert.Equal(t, "outbound", dir) + }) + + t.Run("direction/internal", func(t *testing.T) { + processor := transProcessor{ + localIPs: []net.IP{net.ParseIP(srcIP), net.ParseIP(dstIP)}, + name: "test", + } + + res, _ := processor.Run(event()) + if res == nil { + t.Fatalf("event has been filtered out") + } + + dir, _ := res.GetValue("network.direction") + assert.Equal(t, "internal", dir) + }) + + t.Run("direction/none", func(t *testing.T) { + processor := transProcessor{ + localIPs: []net.IP{net.ParseIP(dstIP + "1")}, + name: "test", + } + + res, _ := processor.Run(event()) + if res == nil { + t.Fatalf("event has been filtered out") + } + + dir, _ := res.GetValue("network.direction") + assert.Nil(t, dir) + }) + + t.Run("ignore_outgoing", func(t *testing.T) { + processor := transProcessor{ + localIPs: []net.IP{net.ParseIP(srcIP)}, + ignoreOutgoing: true, + name: "test", + } + + res, err := processor.Run(event()) + if assert.NoError(t, err) { + assert.Nil(t, res) + } + }) } diff --git a/packetbeat/tests/system/packetbeat.py b/packetbeat/tests/system/packetbeat.py index a20d9c92561..e6ec4b2559e 100644 --- a/packetbeat/tests/system/packetbeat.py +++ b/packetbeat/tests/system/packetbeat.py @@ -10,7 +10,7 @@ TRANS_REQUIRED_FIELDS = ["@timestamp", "type", "status", "agent.type", "agent.hostname", "agent.version", - "event.dataset", "event.start", + "event.kind", "event.category", "event.dataset", "event.start", "source.ip", "destination.ip", "client.ip", "server.ip", "network.type", "network.transport", "network.community_id", @@ -18,7 +18,7 @@ FLOWS_REQUIRED_FIELDS = ["@timestamp", "type", "agent.type", "agent.hostname", "agent.version", - "event.dataset", "event.start", "event.end", "event.duration", + "event.kind", "event.category", "event.dataset", "event.action", "event.start", "event.end", "event.duration", "source.ip", "destination.ip", "flow.id", "network.type", "network.transport", "network.community_id",