-
Notifications
You must be signed in to change notification settings - Fork 13
/
riemann.rb
180 lines (158 loc) · 5.57 KB
/
riemann.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# encoding: utf-8
require "logstash/outputs/base"
require "logstash/namespace"
# Riemann is a network event stream processing system.
#
# While Riemann is conceptually very similar to Logstash, it is geared
# much more towards being a monitoring system replacement.
#
# Riemann is used in Logstash much like statsd or other metric-related
# outputs
#
# You can learn about Riemann here:
#
# * http://riemann.io/
# You can see the author talk about it here:
# * http://vimeo.com/38377415
#
class LogStash::Outputs::Riemann < LogStash::Outputs::Base
config_name "riemann"
# The address of the Riemann server.
config :host, :validate => :string, :default => "localhost"
# The port to connect to on your Riemann server.
config :port, :validate => :number, :default => 5555
# The protocol to use
# UDP is non-blocking
# TCP is blocking
#
# Logstash's default output behaviour
# is to never lose events
# As such, we use tcp as default here
config :protocol, :validate => ["tcp", "udp"], :default => "tcp"
# The name of the sender.
# This sets the `host` value
# in the Riemann event
config :sender, :validate => :string, :default => "%{host}"
# A Hash to set Riemann event fields
# (http://riemann.io/concepts.html).
#
# The following event fields are supported:
# `description`, `state`, `metric`, `ttl`, `service`
#
# Tags found on the Logstash event will automatically be added to the
# Riemann event.
#
# Any other field set here will be passed to Riemann as an event attribute.
#
# Example:
# [source,ruby]
# riemann {
# riemann_event => {
# "metric" => "%{metric}"
# "service" => "%{service}"
# }
# }
#
# `metric` and `ttl` values will be coerced to a floating point value.
# Values which cannot be coerced will be zero (0.0).
#
# `description`, by default, will be set to the event message
# but can be overridden here.
config :riemann_event, :validate => :hash
# If set to true, automatically map all logstash defined fields to riemann event fields.
# All nested logstash fields will be mapped to riemann fields containing all parent keys
# separated by dots and the deepest value.
#
# As an example, the logstash event:
# [source,ruby]
# {
# "@timestamp":"2013-12-10T14:36:26.151+0000",
# "@version": 1,
# "message":"log message",
# "host": "host.domain.com",
# "nested_field": {
# "key": "value"
# }
# }
# Is mapped to this riemann event:
# [source,ruby]
# {
# :time 1386686186,
# :host host.domain.com,
# :message log message,
# :nested_field.key value
# }
#
# It can be used in conjunction with, or independently of, the riemann_event option.
# When used together with riemann_event, any duplicate keys receive their value
# from riemann_event instead of from the logstash event itself.
config :map_fields, :validate => :boolean, :default => false
#
# Enable debugging output?
config :debug, :validate => :boolean, :default => false
public
# Prepares the output for use: loads the Riemann client library on demand
# and builds the client, pointed at the configured host and port, that all
# later sends go through.
def register
  require 'riemann/client'
  @client = Riemann::Client.new(host: @host, port: @port)
end # def register
public
# Flattens a (possibly nested) hash of event fields into a single-level hash
# keyed by symbols.
#
# parent - dotted key prefix accumulated so far, or nil at the top level.
# fields - hash of field names to values; nested hashes are descended into.
#
# Keys beginning with "@" are skipped at every level. A nested key is joined
# to its ancestors with dots, e.g. {"a" => {"b" => 1}} yields {:"a.b" => 1}.
#
# Returns the flattened hash.
def map_fields(parent, fields)
  fields.each_with_object({}) do |(name, value), flattened|
    next if name.start_with?("@")

    path = parent.nil? ? name : "#{parent}.#{name}"
    if value.is_a?(Hash)
      # Recurse, carrying the dotted path down as the new prefix.
      flattened.update(map_fields(path, value))
    else
      flattened[path.to_sym] = value
    end
  end
end
public
# Entry point for a single Logstash event: converts it into a Riemann-style
# hash, logs that hash at debug level, and hands it to the sender.
#
# event - the Logstash event to forward.
#
# Returns whatever send_to_riemann returns.
def receive(event)
  formatted = build_riemann_formatted_event(event)
  @logger.debug("Riemann event: ", :riemann_event => formatted)
  send_to_riemann(formatted)
end # def receive
# Translates a Logstash event into the hash that is sent to Riemann.
#
# event - the Logstash event; read via get, to_hash, sprintf and timestamp.
#
# Later steps overwrite earlier ones on key collisions, in this order:
# "message" as :description, then flattened event fields (only when
# map_fields is enabled), then the riemann_event option, and finally the
# always-set :time, :tags and :host.
#
# Returns the symbol-keyed event hash.
def build_riemann_formatted_event(event)
  # The Logstash "message" always seeds Riemann's "description".
  payload = { :description => event.get("message") }

  # Optional direct mapping of every event field. "message" is mapped too,
  # so it ends up as a redundant copy of "description".
  payload.merge!(map_fields(nil, event.to_hash)) if @map_fields == true

  # Explicitly configured fields win over mapped ones; each value is run
  # through the event's sprintf so field references are expanded.
  if @riemann_event
    @riemann_event.each_pair do |name, template|
      payload[name.to_sym] = event.sprintf(template)
    end
  end

  # Riemann attributes are strings except for "ttl" and "metric", which
  # must be sent as floats.
  [:ttl, :metric].each do |numeric_key|
    payload[numeric_key] = payload[numeric_key].to_f if payload[numeric_key]
  end

  # Time is sent as an integer: Riemann versions earlier than 0.2.13 expect
  # that, even though newer ones accept sub-second floats.
  payload[:time] = event.timestamp.to_i

  tags = event.get("tags")
  payload[:tags] = tags if tags.is_a?(Array)

  payload[:host] = event.sprintf(@sender)
  payload
end
# Delivers an already-formatted event hash to Riemann over the configured
# protocol.
#
# riemann_formatted_event - the hash to push to the transport.
#
# The transport is looked up as the client's instance variable named after
# the protocol setting (e.g. "@tcp" when the protocol is "tcp").
#
# Delivery errors are logged rather than raised, so a failed send does not
# raise into the caller. Only StandardError is rescued: rescuing Exception
# would also swallow SignalException, SystemExit and NoMemoryError and
# could prevent a clean shutdown.
#
# Returns the result of the push, or of the error log call on failure.
def send_to_riemann(riemann_formatted_event)
  proto_client = @client.instance_variable_get("@#{@protocol}")
  @logger.debug("Riemann client proto: #{proto_client.to_s}")
  proto_client << riemann_formatted_event
rescue StandardError => e
  @logger.error("Unhandled exception", :error => e)
end # def send_to_riemann
end # class LogStash::Outputs::Riemann