This plugin is used to serialize Logstash events as Avro datums, as well as deserializing Avro datums into Logstash events.
Decode/encode Avro records as Logstash events using the associated Avro schema from a Confluent schema registry. (https://github.com/confluentinc/schema-registry)
logstash-plugin install logstash-codec-avro_schema_registry
When this codec is used to decode the input, you may pass the following options:
-
endpoint
- always required. -
username
- optional. -
password
- optional. -
tag_on_failure
- tag events with_avroparsefailure
when decode fails -
decorate_events
- will add Avro schema metadata to the event."@metadata" => { "avro" => { "name" => "schema", "namespace" => "namespace", "type" => "record", "schema_id" => 1799 } }
If the input stream is binary encoded, you should use the ByteArrayDeserializer
in the Kafka input config.
This codec uses the Confluent schema registry to register a schema and encode the data in Avro using schema_id lookups.
When this codec is used to encode, you may pass the following options:
endpoint
- always required.username
- optional.password
- optional.schema_id
- when provided, no other options are required.subject_name
- required when there is noschema_id
.schema_version
- when provided, the schema will be looked up in the registry.schema_uri
- when provided, JSON schema is loaded from URL or file.schema_string
- required when there is noschema_id
,schema_version
orschema_uri
check_compatibility
- will check schema compatibility before encoding.register_schema
- will register the JSON schema if it does not exist.binary_encoded
- will output the encoded event as a ByteArray. Requires theByteArraySerializer
to be set in the Kafka output config.base64_encoded
- will output the encoded event as a base64 string.client_certificate
- Client TLS certificate for mutual TLSclient_key
- Client TLS key for mutual TLSca_certificate
- CA Certificateverify_mode
- SSL Verify modes. Valid options areverify_none
,verify_peer
,verify_client_once
, andverify_fail_if_no_peer_cert
. Default isverify_peer
input {
kafka {
...
codec => avro_schema_registry {
endpoint => "http://schemas.example.com"
decorate_events => true
}
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
}
}
filter {
...
}
output {
kafka {
...
codec => avro_schema_registry {
endpoint => "http://schemas.example.com"
subject_name => "my_kafka_subject_name"
schema_uri => "/app/my_kafka_subject.avsc"
register_schema => true
}
value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
}
}
output {
kafka {
...
codec => avro_schema_registry {
endpoint => "http://schemas.example.com"
schema_id => 47
client_key => "./client.key"
client_certificate => "./client.crt"
ca_certificate => "./ca.pem"
verify_mode => "verify_peer"
}
value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
}
}
-
Install JRuby
-
Set environment variables:
JRUBY_HOME
andPATH=%JRUBY_HOME%\bin;%PATH%
-
Build the plug-in:
gem install bundler bundle install bundle exec rspec gem build logstash-codec-avro_schema_registry.gemspec
-
Publish the plug-in to Github
echo ":github: Bearer GH_TOKEN" >> C:\users\rahul\.gem\credentials gem push --key github --host https://rubygems.pkg.github.com/rahulsinghai logstash-codec-avro_schema_registry-1.2.3.gem
-
Install the plug-in
.\bin\logstash-plugin install logstash-codec-avro_schema_registry-1.2.3.gem
-
Prepare offline pack:
.\bin\logstash-plugin update .\bin\logstash-plugin prepare-offline-pack --output logstash-codec-avro_schema_registry-1.2.3.zip --overwrite logstash-codec-avro_schema_registry
-
Release a new version in Github and upload the
.gem
&.zip
files.