
Kafka HDFS Ingestion

Ziyang Liu edited this page Aug 13, 2015 · 16 revisions

Job constructs

Source and Extractor

Gobblin provides two abstract classes, KafkaSource and KafkaExtractor. KafkaSource creates a workunit for each Kafka topic partition to be pulled, then merges and groups these workunits to match the desired number of workunits, which is set by the property mr.job.max.mappers (this property is used in both standalone and MR mode). KafkaExtractor extracts the partitions assigned to a workunit, based on the specified low watermark and high watermark.
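The grouping step can be pictured with a minimal sketch. It assumes a plain round-robin assignment, which is not necessarily the strategy KafkaSource uses; the class WorkUnitGroupingSketch and its group method are hypothetical names for illustration, and maxGroups stands in for the value of mr.job.max.mappers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: spreads topic partitions over a bounded number of groups. */
class WorkUnitGroupingSketch {

    /**
     * Assigns each partition name to one of at most maxGroups groups, round-robin.
     * This is an assumed strategy; the real packing logic may differ
     * (for example, it could weigh partitions by estimated size).
     */
    static List<List<String>> group(List<String> partitions, int maxGroups) {
        if (maxGroups < 1) {
            throw new IllegalArgumentException("maxGroups must be >= 1");
        }
        // Never create more groups than there are partitions.
        int groupCount = Math.min(maxGroups, partitions.size());
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < groupCount; i++) {
            groups.add(new ArrayList<>());
        }
        for (int i = 0; i < partitions.size(); i++) {
            groups.get(i % groupCount).add(partitions.get(i));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> partitions =
            Arrays.asList("topicA-0", "topicA-1", "topicB-0", "topicB-1", "topicB-2");
        // Five per-partition workunits packed into two groups.
        System.out.println(group(partitions, 2));
    }
}
```

With five partitions and a limit of two, every partition still gets pulled, but only two grouped workunits are handed to the runtime.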

To use them in a Kafka-HDFS ingestion job, one should subclass KafkaExtractor and implement the method decodeRecord(MessageAndOffset), which takes a MessageAndOffset object pulled from the Kafka broker and decodes it into the desired object. One should also subclass KafkaSource and implement getExtractor(WorkUnitState), which should return an instance of that KafkaExtractor subclass.

Gobblin currently provides two concrete implementations: KafkaSimpleSource/KafkaSimpleExtractor, and KafkaAvroSource/KafkaAvroExtractor.

KafkaSimpleExtractor simply returns the payload of the MessageAndOffset object as a byte array. A job that uses KafkaSimpleExtractor may use a Converter to convert the byte array to whatever format is desired. For example, if the desired output format is JSON, one may implement a ByteArrayToJsonConverter to convert the byte array to JSON. Alternatively, one may implement a KafkaJsonExtractor, which extends KafkaExtractor and converts the MessageAndOffset object into a JSON object in the decodeRecord method. Both approaches should work equally well.
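The two approaches can be contrasted in a small self-contained sketch. None of the types below are Gobblin or Kafka classes: JsonDecodingSketch and its methods are hypothetical stand-ins, the payload is assumed to be UTF-8 JSON text, and the shape check only marks the place where a real implementation would call a JSON parser.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-ins; these are not Gobblin or Kafka types. */
class JsonDecodingSketch {

    /** Approach 1, step 1: the extractor hands back the raw payload bytes. */
    static byte[] extractRaw(byte[] payload) {
        return payload.clone();
    }

    /** Approach 1, step 2: a separate converter turns the bytes into JSON text. */
    static String convertToJsonText(byte[] raw) {
        // Assumption: the payload is UTF-8 encoded JSON.
        String text = new String(raw, StandardCharsets.UTF_8).trim();
        // A real converter would pass the text to a JSON parser here;
        // this sketch only checks the outer shape of a JSON object.
        if (!(text.startsWith("{") && text.endsWith("}"))) {
            throw new IllegalArgumentException("payload is not a JSON object");
        }
        return text;
    }

    /** Approach 2: decode inside the extractor, as a decodeRecord method might. */
    static String decodeRecord(byte[] payload) {
        return convertToJsonText(payload);
    }

    public static void main(String[] args) {
        byte[] payload = "{\"id\": 1}".getBytes(StandardCharsets.UTF_8);
        String viaConverter = convertToJsonText(extractRaw(payload));
        String viaExtractor = decodeRecord(payload);
        // Both routes yield the same record.
        System.out.println(viaConverter.equals(viaExtractor));
    }
}
```

The difference is only where the decoding happens: in a reusable converter step, or inside the extractor itself.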

KafkaAvroExtractor decodes the payload of the MessageAndOffset object into an Avro GenericRecord object. It requires that byte 0 of the payload be 0, that bytes 1-16 be a 16-byte schema ID, and that the remaining bytes be the encoded Avro record. It also requires a schema registry that returns the Avro schema for a given schema ID; that schema is used to decode the byte array. Thus this class is mainly applicable to LinkedIn's internal Kafka clusters.
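The payload layout described above can be sketched as follows. This only splits the bytes; it does not look up a schema or decode the Avro record. The class AvroPayloadLayoutSketch and its members are hypothetical names, not part of Gobblin.

```java
import java.util.Arrays;

/** Illustrative only: splits a payload into schema ID and Avro bytes. */
class AvroPayloadLayoutSketch {
    static final byte MAGIC_BYTE = 0x0;      // byte 0 must be 0
    static final int SCHEMA_ID_LENGTH = 16;  // bytes 1-16 hold the schema ID

    /** Holder for the two parts of the payload that follow the leading byte. */
    static final class ParsedPayload {
        final byte[] schemaId;
        final byte[] avroBytes;

        ParsedPayload(byte[] schemaId, byte[] avroBytes) {
            this.schemaId = schemaId;
            this.avroBytes = avroBytes;
        }
    }

    static ParsedPayload parse(byte[] payload) {
        if (payload.length < 1 + SCHEMA_ID_LENGTH) {
            throw new IllegalArgumentException("payload too short for header");
        }
        if (payload[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("byte 0 of the payload is not 0");
        }
        byte[] schemaId = Arrays.copyOfRange(payload, 1, 1 + SCHEMA_ID_LENGTH);
        // Everything after the header is the encoded Avro record.
        byte[] avroBytes = Arrays.copyOfRange(payload, 1 + SCHEMA_ID_LENGTH, payload.length);
        return new ParsedPayload(schemaId, avroBytes);
    }

    public static void main(String[] args) {
        byte[] payload = new byte[1 + SCHEMA_ID_LENGTH + 3];
        Arrays.fill(payload, 1, 1 + SCHEMA_ID_LENGTH, (byte) 0x7f); // dummy schema ID
        payload[17] = 10; payload[18] = 20; payload[19] = 30;       // dummy record bytes
        ParsedPayload parsed = parse(payload);
        System.out.println(parsed.schemaId.length + " " + parsed.avroBytes.length);
    }
}
```

In a real job, the schema ID would then be sent to the schema registry, and the returned schema used to decode the remaining bytes.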

Writer and Publisher

For Writer and Publisher, one may use the AvroHdfsDataWriter and the BaseDataPublisher, as in the Wikipedia example job. They publish the records pulled in each task to a different folder as Avro files. Gobblin also has an AvroHdfsTimePartitionedWriter and a TimePartitionedDataPublisher. They publish records based on the timestamps of the records, which means records pulled in the same task may be published to different folders, and records pulled in different tasks may be published to the same folder.
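The effect of timestamp-based publishing can be sketched by deriving a folder from each record's timestamp. The hourly yyyy/MM/dd/HH layout in UTC and the names TimePartitionSketch and partitionFolder are assumptions made for illustration; the layout the time-partitioned writer actually uses may differ.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Illustrative only: maps a record timestamp to an output folder. */
class TimePartitionSketch {
    // Assumed layout: one folder per hour, in UTC.
    private static final DateTimeFormatter HOURLY =
        DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneOffset.UTC);

    /** Returns the folder for a record of the given topic and epoch-millisecond timestamp. */
    static String partitionFolder(String topic, long timestampMillis) {
        return topic + "/" + HOURLY.format(Instant.ofEpochMilli(timestampMillis));
    }

    public static void main(String[] args) {
        long firstHour = 0L;                 // 1970-01-01T00:00:00Z
        long sameHour = 59L * 60L * 1000L;   // 1970-01-01T00:59:00Z
        long nextHour = 60L * 60L * 1000L;   // 1970-01-01T01:00:00Z
        // Records from the same hour share a folder, whichever task pulled them.
        System.out.println(partitionFolder("events", firstHour));
        System.out.println(partitionFolder("events", sameHour));
        // A record from a later hour lands elsewhere, even within the same task.
        System.out.println(partitionFolder("events", nextHour));
    }
}
```

Because the folder depends only on the record timestamp, it is independent of which task pulled the record.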

Important Job config properties

Job Config

Below is a sample job config file

Launch Job

Launching the job in standalone mode involves steps similar to those for the Wikipedia example job. The job can also be launched in MR mode. See deployment for more details.
