require 'logstash/namespace'
require 'logstash/outputs/base'
require 'java'
require 'concurrent' # provides Concurrent::Hash, used in #register
require 'logstash-output-kafka_jars.rb'
java_import org.apache.kafka.clients.producer.ProducerRecord
# Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on
# the broker.
#
# Here's a compatibility matrix that shows the Kafka client versions that are compatible with each combination
# of Logstash and the Kafka output plugin:
#
# [options="header"]
# |==========================================================
# |Kafka Client Version |Logstash Version |Plugin Version |Why?
# |0.8 |2.0.0 - 2.x.x |<3.0.0 |Legacy, 0.8 is still popular
# |0.9 |2.0.0 - 2.3.x | 3.x.x |Works with the old Ruby Event API (`event['product']['price'] = 10`)
# |0.9 |2.4.x - 5.x.x | 4.x.x |Works with the new getter/setter APIs (`event.set('[product][price]', 10)`)
# |0.10.0.x |2.4.x - 5.x.x | 5.x.x |Not compatible with the <= 0.9 broker
# |0.10.1.x |2.4.x - 5.x.x | 6.x.x |
# |==========================================================
#
# NOTE: We recommend that you use matching Kafka client and broker versions. During upgrades, you should
# upgrade brokers before clients because brokers target backwards compatibility. For example, the 0.9 broker
# is compatible with both the 0.8 consumer and 0.9 consumer APIs, but not the other way around.
#
# This output supports connecting to Kafka over:
#
# * SSL (requires plugin version 3.0.0 or later)
# * Kerberos SASL (requires plugin version 5.1.0 or later)
#
# By default security is disabled but can be turned on as needed.
#
# The only required configuration is `topic_id`. The default codec is plain, so
# events will be persisted on the broker in plain format: Logstash will encode
# each event with not only its message but also a timestamp and hostname. If you
# want only the message itself to pass through, configure the output like this:
# [source,ruby]
# output {
# kafka {
# codec => plain {
# format => "%{message}"
# }
# topic_id => "mytopic"
# }
# }
# For more information see http://kafka.apache.org/documentation.html#theproducer
#
# Kafka producer configuration: http://kafka.apache.org/documentation.html#newproducerconfigs
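#
# As a fuller sketch, a production-leaning configuration might look like the
# following (the broker addresses and topic name are illustrative, not defaults):
# [source,ruby]
# output {
#   kafka {
#     bootstrap_servers => "kafka1:9092,kafka2:9092"
#     topic_id => "logstash-%{type}"
#     message_key => "%{host}"
#     compression_type => "gzip"
#   }
# }
# Because `topic_id` and `message_key` are sprintf-expanded per event, all events
# from the same host above share a key and therefore hash to the same partition.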
class LogStash::Outputs::Kafka < LogStash::Outputs::Base
declare_threadsafe!
config_name 'kafka'
default :codec, 'plain'
# The number of acknowledgments the producer requires the leader to have received
# before considering a request complete.
#
# acks=0: the producer will not wait for any acknowledgment from the server at all.
# acks=1: the leader will write the record to its local log, but will respond
# without waiting for full acknowledgement from all followers.
# acks=all: the leader will wait for the full set of in-sync replicas to acknowledge the record.
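#
# For example, `acks => "all"` trades some throughput for the strongest
# delivery guarantee this option offers.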
config :acks, :validate => ["0", "1", "all"], :default => "1"
# The producer will attempt to batch records together into fewer requests whenever multiple
# records are being sent to the same partition. This helps performance on both the client
# and the server. This configuration controls the default batch size in bytes.
config :batch_size, :validate => :number, :default => 16384
# This is for bootstrapping and the producer will only use it for getting metadata (topics,
# partitions and replicas). The socket connections for sending the actual data will be
# established based on the broker information returned in the metadata. The format is
# `host1:port1,host2:port2`, and the list can be a subset of brokers or a VIP pointing to a
# subset of brokers.
config :bootstrap_servers, :validate => :string, :default => 'localhost:9092'
# The total bytes of memory the producer can use to buffer records waiting to be sent to the server.
config :buffer_memory, :validate => :number, :default => 33554432
# The compression type for all data generated by the producer.
# The default is none (i.e. no compression). Valid values are none, gzip, snappy, or lz4.
config :compression_type, :validate => ["none", "gzip", "snappy", "lz4"], :default => "none"
# The id string to pass to the server when making requests. This lets you
# track the source of requests beyond just IP and port by including a logical
# application name with the request.
config :client_id, :validate => :string
# Serializer class for the key of the message
config :key_serializer, :validate => :string, :default => 'org.apache.kafka.common.serialization.StringSerializer'
# The producer groups together any records that arrive between request
# transmissions into a single batched request. Normally this occurs only under
# load, when records arrive faster than they can be sent out. In some circumstances,
# however, the client may want to reduce the number of requests even under moderate
# load. This setting adds a small amount of artificial delay: rather than
# immediately sending out a record, the producer will wait for up to the given
# delay so that other records can be sent together and the sends can be batched.
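#
# For example, `linger_ms => 5` accepts up to 5 ms of extra latency per record
# in exchange for fewer, larger requests.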
config :linger_ms, :validate => :number, :default => 0
# The maximum size of a request in bytes.
config :max_request_size, :validate => :number, :default => 1048576
# The key for the message. This supports `%{...}` sprintf references, resolved per event.
config :message_key, :validate => :string
# The timeout, in milliseconds, for the initial metadata request that fetches topic metadata.
config :metadata_fetch_timeout_ms, :validate => :number, :default => 60000
# The maximum time in milliseconds before a metadata refresh is forced.
config :metadata_max_age_ms, :validate => :number, :default => 300000
# The size of the TCP receive buffer to use when reading data.
config :receive_buffer_bytes, :validate => :number, :default => 32768
# The amount of time to wait before attempting to reconnect to a given host when a connection fails.
config :reconnect_backoff_ms, :validate => :number, :default => 10
# The configuration controls the maximum amount of time the client will wait
# for the response of a request. If the response is not received before the timeout
# elapses the client will resend the request if necessary or fail the request if
# retries are exhausted.
config :request_timeout_ms, :validate => :number
# The default retry behavior is to retry until successful. Because it can cause
# data loss, the use of this setting is discouraged.
#
# If you choose to set `retries`, a value greater than zero will cause the
# client to only retry a fixed number of times. This will result in data loss
# if a transient error outlasts your retry count.
#
# A value less than zero is a configuration error.
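#
# For example, `retries => 3` allows three retries after the initial attempt
# before the remaining events in the batch are dropped.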
config :retries, :validate => :number
# The amount of time to wait before attempting to retry a failed produce request to a given topic partition.
config :retry_backoff_ms, :validate => :number, :default => 100
# The size of the TCP send buffer to use when sending data.
config :send_buffer_bytes, :validate => :number, :default => 131072
# The truststore type.
config :ssl_truststore_type, :validate => :string
# The JKS truststore path to validate the Kafka broker's certificate.
config :ssl_truststore_location, :validate => :path
# The truststore password
config :ssl_truststore_password, :validate => :password
# The keystore type.
config :ssl_keystore_type, :validate => :string
# If client authentication is required, this setting stores the keystore path.
config :ssl_keystore_location, :validate => :path
# If client authentication is required, this setting stores the keystore password
config :ssl_keystore_password, :validate => :password
# The password of the private key in the key store file.
config :ssl_key_password, :validate => :password
# The algorithm used to validate the server hostname against its certificate.
# Set to an empty string ("") to disable hostname verification.
config :ssl_endpoint_identification_algorithm, :validate => :string, :default => 'https'
# Security protocol to use, which may be one of PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL.
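#
# For example, a minimal SSL setup is sketched below; the truststore path and
# password are placeholders:
# [source,ruby]
# output {
#   kafka {
#     topic_id => "mytopic"
#     security_protocol => "SSL"
#     ssl_truststore_location => "/path/to/truststore.jks"
#     ssl_truststore_password => "changeit"
#   }
# }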
config :security_protocol, :validate => ["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"], :default => "PLAINTEXT"
# http://kafka.apache.org/documentation.html#security_sasl[SASL mechanism] used for client connections.
# This may be any mechanism for which a security provider is available.
# GSSAPI is the default mechanism.
config :sasl_mechanism, :validate => :string, :default => "GSSAPI"
# The Kerberos principal name that the Kafka broker runs as.
# This can be defined either in Kafka's JAAS config or in Kafka's server config.
config :sasl_kerberos_service_name, :validate => :string
# The Java Authentication and Authorization Service (JAAS) API supplies user authentication and authorization
# services for Kafka. This setting provides the path to the JAAS file. Sample JAAS file for Kafka client:
# [source,java]
# ----------------------------------
# KafkaClient {
# com.sun.security.auth.module.Krb5LoginModule required
# useTicketCache=true
# renewTicket=true
# serviceName="kafka";
# };
# ----------------------------------
#
# Please note that specifying `jaas_path` and `kerberos_config` in the config file will add these
# to the global JVM system properties. This means that if you have multiple Kafka inputs or outputs,
# all of them will share the same `jaas_path` and `kerberos_config`. If this is not desirable,
# you would have to run separate Logstash instances on different JVMs.
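#
# A Kerberos-enabled output might then be sketched as follows; the paths and
# service name are placeholders:
# [source,ruby]
# output {
#   kafka {
#     topic_id => "mytopic"
#     security_protocol => "SASL_PLAINTEXT"
#     sasl_kerberos_service_name => "kafka"
#     jaas_path => "/etc/logstash/kafka_client_jaas.conf"
#     kerberos_config => "/etc/krb5.conf"
#   }
# }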
config :jaas_path, :validate => :path
# Optional path to kerberos config file. This is krb5.conf style as detailed in https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html
config :kerberos_config, :validate => :path
# The topic to produce messages to. This supports `%{...}` sprintf references, resolved per event.
config :topic_id, :validate => :string, :required => true
# Serializer class for the value of the message
config :value_serializer, :validate => :string, :default => 'org.apache.kafka.common.serialization.StringSerializer'
  public
  def register
    @thread_batch_map = Concurrent::Hash.new

    if !@retries.nil?
      if @retries < 0
        raise LogStash::ConfigurationError, "A negative retry count (#{@retries}) is not valid. Must be a value >= 0"
      end

      @logger.warn("Kafka output is configured with finite retry. This instructs Logstash to LOSE DATA after a set number of send attempts fail. If you do not want to lose data when Kafka is down, then you must remove the retry setting.", :retries => @retries)
    end

    @producer = create_producer
    if value_serializer == 'org.apache.kafka.common.serialization.StringSerializer'
      @codec.on_event do |event, data|
        write_to_kafka(event, data)
      end
    elsif value_serializer == 'org.apache.kafka.common.serialization.ByteArraySerializer'
      @codec.on_event do |event, data|
        write_to_kafka(event, data.to_java_bytes)
      end
    else
      raise LogStash::ConfigurationError, "'value_serializer' only supports org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer"
    end
  end # def register
def prepare(record)
# This output is threadsafe, so we need to keep a batch per thread.
@thread_batch_map[Thread.current].add(record)
end
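
  # Events arrive here in pipeline batches. Each worker thread accumulates its
  # ProducerRecords in its own list (via `prepare`), then flushes the whole
  # batch through `retrying_send` once every event has been encoded.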
def multi_receive(events)
t = Thread.current
if !@thread_batch_map.include?(t)
@thread_batch_map[t] = java.util.ArrayList.new(events.size)
end
events.each do |event|
break if event == LogStash::SHUTDOWN
@codec.encode(event)
end
batch = @thread_batch_map[t]
if batch.any?
retrying_send(batch)
batch.clear
end
end
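
  # Sends a batch and retries whatever fails. With the default `retries => nil`
  # this loops until every record is acknowledged; with a finite retry count,
  # records that keep failing are eventually dropped and logged.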
def retrying_send(batch)
remaining = @retries
while batch.any?
if !remaining.nil?
if remaining < 0
# TODO(sissel): Offer to DLQ? Then again, if it's a transient fault,
# DLQing would make things worse (you dlq data that would be successful
# after the fault is repaired)
logger.info("Exhausted user-configured retry count when sending to Kafka. Dropping these events.",
:max_retries => @retries, :drop_count => batch.count)
break
end
remaining -= 1
end
      failures = []

      # Pair each record with its future so that failures discovered when the
      # future resolves can be matched back to the correct record, even when
      # some sends raised before returning a future.
      futures = batch.collect do |record|
        begin
          # send() can throw an exception even before the future is created.
          [record, @producer.send(record)]
        rescue org.apache.kafka.common.errors.TimeoutException => e
          failures << record
          nil
        rescue org.apache.kafka.common.errors.InterruptException => e
          failures << record
          nil
        rescue org.apache.kafka.common.errors.SerializationException => e
          # TODO(sissel): Retrying will fail because the data itself has a problem serializing.
          # TODO(sissel): Let's add DLQ here.
          failures << record
          nil
        end
      end.compact

      futures.each do |record, future|
        begin
          future.get()
        rescue => e
          # TODO(sissel): Add metric to count failures, possibly by exception type.
          logger.warn("KafkaProducer.send() failed: #{e}", :exception => e)
          failures << record
        end
      end
# No failures? Cool. Let's move on.
break if failures.empty?
# Otherwise, retry with any failed transmissions
if remaining.nil? || remaining >= 0
delay = @retry_backoff_ms / 1000.0
logger.info("Sending batch to Kafka failed. Will retry after a delay.", :batch_size => batch.size,
:failures => failures.size,
:sleep => delay)
batch = failures
sleep(delay)
end
end
end
def close
@producer.close
end
private
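  # Builds a ProducerRecord for the event, sprintf-expanding `topic_id` (and
  # `message_key`, when set), and queues it on the current thread's batch.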
def write_to_kafka(event, serialized_data)
if @message_key.nil?
record = ProducerRecord.new(event.sprintf(@topic_id), serialized_data)
else
record = ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), serialized_data)
end
prepare(record)
rescue LogStash::ShutdownSignal
@logger.debug('Kafka producer got shutdown signal')
rescue => e
@logger.warn('kafka producer threw exception, restarting',
:exception => e)
end
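
  # Translates the plugin's options into Kafka ProducerConfig properties and
  # instantiates the Java KafkaProducer.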
def create_producer
begin
props = java.util.Properties.new
kafka = org.apache.kafka.clients.producer.ProducerConfig
props.put(kafka::ACKS_CONFIG, acks)
props.put(kafka::BATCH_SIZE_CONFIG, batch_size.to_s)
props.put(kafka::BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers)
props.put(kafka::BUFFER_MEMORY_CONFIG, buffer_memory.to_s)
props.put(kafka::COMPRESSION_TYPE_CONFIG, compression_type)
props.put(kafka::CLIENT_ID_CONFIG, client_id) unless client_id.nil?
props.put(kafka::KEY_SERIALIZER_CLASS_CONFIG, key_serializer)
props.put(kafka::LINGER_MS_CONFIG, linger_ms.to_s)
props.put(kafka::MAX_REQUEST_SIZE_CONFIG, max_request_size.to_s)
      # Numeric options are passed as strings so the Java client's config
      # parser handles them uniformly regardless of the underlying Ruby type.
      props.put(kafka::METADATA_MAX_AGE_CONFIG, metadata_max_age_ms.to_s) unless metadata_max_age_ms.nil?
      props.put(kafka::RECEIVE_BUFFER_CONFIG, receive_buffer_bytes.to_s) unless receive_buffer_bytes.nil?
      props.put(kafka::RECONNECT_BACKOFF_MS_CONFIG, reconnect_backoff_ms.to_s) unless reconnect_backoff_ms.nil?
      props.put(kafka::REQUEST_TIMEOUT_MS_CONFIG, request_timeout_ms.to_s) unless request_timeout_ms.nil?
props.put(kafka::RETRIES_CONFIG, retries.to_s) unless retries.nil?
props.put(kafka::RETRY_BACKOFF_MS_CONFIG, retry_backoff_ms.to_s)
props.put(kafka::SEND_BUFFER_CONFIG, send_buffer_bytes.to_s)
props.put(kafka::VALUE_SERIALIZER_CLASS_CONFIG, value_serializer)
props.put("security.protocol", security_protocol) unless security_protocol.nil?
if security_protocol == "SSL"
set_trustore_keystore_config(props)
elsif security_protocol == "SASL_PLAINTEXT"
set_sasl_config(props)
elsif security_protocol == "SASL_SSL"
set_trustore_keystore_config(props)
set_sasl_config(props)
end
org.apache.kafka.clients.producer.KafkaProducer.new(props)
rescue => e
logger.error("Unable to create Kafka producer from given configuration",
:kafka_error_message => e,
:cause => e.respond_to?(:getCause) ? e.getCause() : nil)
raise e
end
end
def set_trustore_keystore_config(props)
if ssl_truststore_location.nil?
raise LogStash::ConfigurationError, "ssl_truststore_location must be set when SSL is enabled"
end
props.put("ssl.truststore.type", ssl_truststore_type) unless ssl_truststore_type.nil?
props.put("ssl.truststore.location", ssl_truststore_location)
props.put("ssl.truststore.password", ssl_truststore_password.value) unless ssl_truststore_password.nil?
# Client auth stuff
props.put("ssl.keystore.type", ssl_keystore_type) unless ssl_keystore_type.nil?
props.put("ssl.key.password", ssl_key_password.value) unless ssl_key_password.nil?
props.put("ssl.keystore.location", ssl_keystore_location) unless ssl_keystore_location.nil?
props.put("ssl.keystore.password", ssl_keystore_password.value) unless ssl_keystore_password.nil?
props.put("ssl.endpoint.identification.algorithm", ssl_endpoint_identification_algorithm) unless ssl_endpoint_identification_algorithm.nil?
end
  def set_sasl_config(props)
    java.lang.System.setProperty("java.security.auth.login.config", jaas_path) unless jaas_path.nil?
    java.lang.System.setProperty("java.security.krb5.conf", kerberos_config) unless kerberos_config.nil?

    props.put("sasl.mechanism", sasl_mechanism)
    if sasl_mechanism == "GSSAPI" && sasl_kerberos_service_name.nil?
      raise LogStash::ConfigurationError, "sasl_kerberos_service_name must be specified when SASL mechanism is GSSAPI"
    end

    props.put("sasl.kerberos.service.name", sasl_kerberos_service_name) unless sasl_kerberos_service_name.nil?
  end
end #class LogStash::Outputs::Kafka