# Notes
This is a custom version of Flume-NG-Kafka-Sink (https://github.com/thilinamb/flume-ng-kafka-sink), with example code modifications and some structural changes.
This is a Flume Sink implementation that can publish data to a Kafka topic. The objective is to integrate Flume with Kafka so that pull-based processing systems such as Apache Storm can process the data coming through various Flume sources such as Syslog.
This is now a part of the official Flume distribution (from v1.6 onwards) along with significant improvements.
## News
- Aug 23, 2014 - This implementation was submitted as a new feature to Apache Flume. The associated JIRA issue is https://issues.apache.org/jira/browse/FLUME-2251.
- Aug 17, 2014 - Unit tests were added and linked with the build.
*Realtime Syslog processing architecture using Apache Flume, Apache Kafka and Apache Storm.*
## Dependent Versions
- Apache Flume - 1.5.0
- Apache Kafka - 0.8.1.1

## Prerequisites
- Java 1.6 or higher
- Apache Maven 3
- An Apache Flume installation (see the dependent versions above)
- An Apache Kafka installation (see the dependent versions above)
## How to Build
Apache Maven is used to build the project. The Apache Maven website contains download links and an installation guide for various operating systems.

Issue the command `mvn clean install` from the project root.

This will compile the project, and the binary distribution (flume-kafka-sink-dist-x.x.x-bin.zip) will be copied into the `${project_root}/dist/target` directory.
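For reference, a typical build session might look like the following sketch (the checkout directory name and the version number in the artifact name are placeholders):

```sh
cd flume-ng-kafka-sink   # your checkout directory; the name may differ
mvn clean install        # compiles the modules and assembles the distribution
ls dist/target           # should list flume-kafka-sink-dist-x.x.x-bin.zip
```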
## How to Deploy
- Build the project as per the instructions in the previous section.
- Unzip the binary distribution (flume-kafka-sink-dist-x.x.x-bin.zip) inside `${project_root}/dist/target`.
- There are two ways to include this custom sink in a Flume binary installation.
### Recommended Approach
- Create a new directory inside the `plugins.d` directory, which is located in `${FLUME_HOME}`. If the `plugins.d` directory is not there, go ahead and create it. We will call this new directory created inside `plugins.d` 'kafka-sink'. You can give it any name depending on the naming conventions you prefer.
- Inside this new directory (kafka-sink), create two subdirectories called `lib` and `libext`.
- You can find the jar files for this sink inside the `lib` directory of the extracted archive. Copy `flume-kafka-sink-impl-x.x.x.jar` into the `plugins.d/kafka-sink/lib` directory. Then copy the rest of the jars into the `plugins.d/kafka-sink/libext` directory.
This is how it will look at the end:
```
${FLUME_HOME}
 |-- plugins.d
      |-- kafka-sink
           |-- lib
                |-- flume-kafka-sink-impl-x.x.x.jar
           |-- libext
                |-- kafka_x.x-x.x.x.x.jar
                |-- metrics-core-x.x.x.jar
                |-- scala-library-x.x.x.jar
```
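As a rough sketch, the steps above could be scripted as follows. The Flume install path and the exact jar names are assumptions; match them to your installation and build output.

```sh
# Assumed install location; adjust to your environment.
FLUME_HOME=/opt/apache-flume-1.5.0-bin

# Create the plugin directory layout.
mkdir -p "$FLUME_HOME/plugins.d/kafka-sink/lib" "$FLUME_HOME/plugins.d/kafka-sink/libext"

# From the directory where the distribution archive was extracted:
cp lib/flume-kafka-sink-impl-*.jar "$FLUME_HOME/plugins.d/kafka-sink/lib/"
cp lib/kafka_*.jar lib/metrics-core-*.jar lib/scala-library-*.jar "$FLUME_HOME/plugins.d/kafka-sink/libext/"
```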
More details can be found in the Flume user guide.
OR
### Quick and Dirty Approach
- Copy the jar files inside the `lib` directory of the extracted archive into `${FLUME_HOME}/lib`.
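In shell terms, assuming `$FLUME_HOME` points at your Flume installation as in the sketch above, this amounts to something like:

```sh
# From the directory where the distribution archive was extracted:
cp lib/*.jar "$FLUME_HOME/lib/"
```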
## How to Configure
The following parameters are supported at the moment.
- `type` - The sink type. This should be set to `com.thilinamb.flume.sink.KafkaSink`.
- `topic` [optional] - The topic in Kafka to which the messages will be published. If this topic is mentioned, every message will be published to the same topic. If dynamic topics are required, it's possible to use a preprocessor instead of a static topic. Either the `topic` or the `preprocessor` parameter must be provided, because the topic cannot be null when publishing to Kafka. If neither parameter is provided, the messages will be published to a default topic called `default-flume-topic`.
- `preprocessor` [optional] - This is an extension point provided to support dynamic topics and keys. It can also be used to modify messages before they are published to Kafka. The fully qualified class name of the preprocessor implementation should be provided here. Refer to the next section to read more about preprocessors. If a preprocessor is not configured, a static topic should be used as explained above, and the messages will not be keyed. In a primitive setup, configuring a static topic would suffice.
- Kafka Producer Properties - These properties are used to configure the Kafka producer. Any producer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix `kafka.`. For instance, the `metadata.broker.list` property should be written as `kafka.metadata.broker.list`. Please take a look at the sample configuration provided in the `conf` directory of the distribution, and at the sketch after this list.
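As an illustration only, a minimal agent configuration with a static topic might look like the sketch below. The agent, channel, and sink names, the broker address, and the topic are placeholders rather than values required by the sink; the `kafka.*` entries shown are standard Kafka 0.8 producer properties.

```properties
# Hypothetical agent 'a1' with sink 'k1' bound to channel 'c1'.
a1.sinks.k1.type = com.thilinamb.flume.sink.KafkaSink
a1.sinks.k1.channel = c1

# Static topic; omit this and set 'preprocessor' instead for dynamic topics.
a1.sinks.k1.topic = syslog-events

# Kafka producer properties, prefixed with 'kafka.'.
a1.sinks.k1.kafka.metadata.broker.list = localhost:9092
a1.sinks.k1.kafka.serializer.class = kafka.serializer.StringEncoder
a1.sinks.k1.kafka.request.required.acks = 1
```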
## Custom Message Preprocessor
Implementing a custom preprocessor is useful to support dynamic topics and keys. Preprocessors also support message transformations. The requirement is to implement the interface `com.thilinamb.flume.sink.MessagePreprocessor`. The Javadocs of this interface provide a detailed description of the methods, parameters, etc. There are three methods that need to be implemented. The method names are self-explanatory.
```java
public String extractKey(Event event, Context context)
public String extractTopic(Event event, Context context)
public String transformMessage(Event event, Context context)
```
The class `com.thilinamb.flume.sink.example.SimpleMessagePreprocessor` inside the `example` module is an example implementation of a preprocessor.
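To make the contract concrete, here is a minimal sketch of what an implementation could look like. It assumes only the three methods shown above; the package name, class name, and the 'host' and 'facility' headers are illustrative, not part of the sink.

```java
package com.example.flume; // hypothetical package, for illustration only

import org.apache.flume.Context;
import org.apache.flume.Event;

import com.thilinamb.flume.sink.MessagePreprocessor;

// Sketch: key each message by its originating host, route it to a topic
// derived from an assumed 'facility' header, and publish the body unchanged.
public class HostKeyedPreprocessor implements MessagePreprocessor {

    @Override
    public String extractKey(Event event, Context context) {
        // Partition by source host if the header is present.
        String host = event.getHeaders().get("host");
        return host != null ? host : "unknown-host";
    }

    @Override
    public String extractTopic(Event event, Context context) {
        // One topic per syslog facility, with a static fallback.
        String facility = event.getHeaders().get("facility");
        return facility != null ? "syslog-" + facility : "syslog-default";
    }

    @Override
    public String transformMessage(Event event, Context context) {
        // No transformation; publish the event body as a string.
        return new String(event.getBody());
    }
}
```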
After implementing the preprocessor, compile it into a jar and add it to the Flume classpath with the rest of the jars (copy it to `plugins.d/kafka-sink/libext` if you are using the `plugins.d` directory, or to `${FLUME_HOME}/lib` if you are using the other approach), then configure the `preprocessor` parameter with its fully qualified class name. For instance:
```properties
a1.sinks.k1.preprocessor = com.thilinamb.flume.sink.example.SimpleMessagePreprocessor
```
## Contact
Please file a bug or contact me via email about any bug you encounter or with any other feedback.