Skip to content

Commit

Permalink
Update Redis protocol to use ECS fields
Browse files Browse the repository at this point in the history
Here's a summary of what fields changed.

Part of #7968

Changed

- bytes_in -> source.bytes
- bytes_out -> destination.bytes
- responsetime -> event.duration (unit are now nanoseconds)
- redis.error -> error.message (alias added)

Added

- source
- destination
- event.dataset = redis
- event.end
- event.start
- network.community_id
- network.transport = tcp
- network.protocol = redis
- network.bytes
- network.type

Unchanged Packetbeat Fields

- method
- resource
- path
- query
- status
- type = redis (we might remove this since we have event.dataset)
  • Loading branch information
andrewkroh committed Jan 16, 2019
1 parent 1acbfdb commit c3c54d8
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Changed TLS protocol fields to align with ECS. {pull}9980[9980]
- Changed ICMP protocol fields to align with ECS. {pull}10062[10062]
- Changed AMQP protocol fields to align with ECS. {pull}10090[10090]
- Changed Redis protocol fields to align with ECS. {pull}10126[10126]
- Changed HTTP protocol fields to align with ECS. {pull}9976[9976]

*Winlogbeat*
Expand Down
4 changes: 4 additions & 0 deletions dev-tools/ecs-migration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,10 @@
to: network.forwarded_ip
alias: false

## Redis
- from: redis.error
to: error.message
alias: true

# Heartbeat

Expand Down
4 changes: 4 additions & 0 deletions packetbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -6517,6 +6517,10 @@ The return value of the Redis command in a human readable format.
*`redis.error`*::
+
--
type: alias
alias to: error.message
If the Redis command has resulted in an error, this field contains the error message returned by the Redis server.
Expand Down
3 changes: 3 additions & 0 deletions packetbeat/protos/redis/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
The return value of the Redis command in a human readable format.
- name: error
type: alias
path: error.message
migration: true
description: >
If the Redis command has resulted in an error, this field contains the
error message returned by the Redis server.
Expand Down
2 changes: 1 addition & 1 deletion packetbeat/protos/redis/fields.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 26 additions & 35 deletions packetbeat/protos/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"

"github.com/elastic/beats/packetbeat/pb"
"github.com/elastic/beats/packetbeat/procs"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/applayer"
Expand Down Expand Up @@ -275,55 +276,45 @@ func (redis *redisPlugin) correlate(conn *redisConnectionData) {
}

func (redis *redisPlugin) newTransaction(requ, resp *redisMessage) beat.Event {
error := common.OK_STATUS
if resp.isError {
error = common.ERROR_STATUS
}

var returnValue map[string]common.NetString
if resp.isError {
returnValue = map[string]common.NetString{
"error": resp.message,
}
} else {
returnValue = map[string]common.NetString{
"return_value": resp.message,
}
}

source, destination := common.MakeEndpointPair(requ.tcpTuple.BaseTuple, requ.cmdlineTuple)
src, dst := &source, &destination
if requ.direction == tcp.TCPDirectionReverse {
src, dst = dst, src
}

// resp_time in milliseconds
responseTime := int32(resp.ts.Sub(requ.ts).Nanoseconds() / 1e6)

fields := common.MapStr{
"type": "redis",
"status": error,
"responsetime": responseTime,
"redis": returnValue,
"method": common.NetString(bytes.ToUpper(requ.method)),
"resource": requ.path,
"query": requ.message,
"bytes_in": uint64(requ.size),
"bytes_out": uint64(resp.size),
"src": src,
"dst": dst,
evt, pbf := pb.NewBeatEvent(requ.ts)
pbf.SetSource(src)
pbf.SetDestination(dst)
pbf.Source.Bytes = int64(requ.size)
pbf.Destination.Bytes = int64(resp.size)
pbf.Event.Dataset = "redis"
pbf.Event.Start = requ.ts
pbf.Event.End = resp.ts
pbf.Network.Transport = "tcp"
pbf.Network.Protocol = pbf.Event.Dataset

fields := evt.Fields
fields["type"] = pbf.Event.Dataset
fields["method"] = common.NetString(bytes.ToUpper(requ.method))
fields["resource"] = requ.path
fields["query"] = requ.message

if resp.isError {
evt.PutValue("status", common.ERROR_STATUS)
evt.PutValue("error.message", resp.message)
} else {
evt.PutValue("status", common.OK_STATUS)
evt.PutValue("redis.return_value", resp.message)
}

if redis.sendRequest {
fields["request"] = requ.message
}
if redis.sendResponse {
fields["response"] = resp.message
}

return beat.Event{
Timestamp: requ.ts,
Fields: fields,
}
return evt
}

func (redis *redisPlugin) GapInStream(tcptuple *common.TCPTuple, dir uint8,
Expand Down
15 changes: 8 additions & 7 deletions packetbeat/tests/system/test_0013_redis_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def test_redis_session(self):

objs = self.read_output()
assert all([o["type"] == "redis" for o in objs])
assert all([o["event.dataset"] == "redis" for o in objs])

assert objs[0]["method"] == "SET"
assert objs[0]["resource"] == "key3"
Expand All @@ -35,7 +36,7 @@ def test_redis_session(self):

assert objs[2]["status"] == "Error"
assert objs[2]["method"] == "LLEN"
assert objs[2]["redis.error"] == "ERR Operation against a key " + \
assert objs[2]["error.message"] == "ERR Operation against a key " + \
"holding the wrong kind of value"

# the rest should be successful
Expand All @@ -45,8 +46,8 @@ def test_redis_session(self):
assert all([isinstance(o["resource"], six.string_types) for o in objs[3:]])
assert all([isinstance(o["query"], six.string_types) for o in objs[3:]])

assert all(["bytes_in" in o for o in objs])
assert all(["bytes_out" in o for o in objs])
assert all(["source.bytes" in o for o in objs])
assert all(["destination.bytes" in o for o in objs])

def test_byteout_bytein(self):
"""
Expand All @@ -60,7 +61,7 @@ def test_byteout_bytein(self):
objs = self.read_output()
assert all([o["type"] == "redis" for o in objs])

assert all([isinstance(o["bytes_out"], int) for o in objs])
assert all([isinstance(o["bytes_in"], int) for o in objs])
assert all([o["bytes_out"] > 0 for o in objs])
assert all([o["bytes_in"] > 0 for o in objs])
assert all([isinstance(o["source.bytes"], int) for o in objs])
assert all([isinstance(o["destination.bytes"], int) for o in objs])
assert all([o["source.bytes"] > 0 for o in objs])
assert all([o["destination.bytes"] > 0 for o in objs])

0 comments on commit c3c54d8

Please sign in to comment.