
Kafka HDFS Ingestion


Job Constructs

Source and Extractor

Gobblin provides two abstract classes, KafkaSource and KafkaExtractor. KafkaSource creates a workunit for each Kafka topic partition to be pulled, then merges and groups the workunits based on the desired number of workunits specified by the property mr.job.max.mappers (this property is used in both standalone and MR mode). More details about how workunits are merged and grouped are available in the Merging and Grouping Workunits in KafkaSource section below. KafkaExtractor extracts the partitions assigned to a workunit, based on the specified low watermark and high watermark.

To use them in a Kafka-HDFS ingestion job, one should subclass KafkaExtractor and implement the method decodeRecord(MessageAndOffset), which takes a MessageAndOffset object pulled from the Kafka broker and decodes it into a desired object. One should also subclass KafkaSource and implement getExtractor(WorkUnitState), which should return an object of that Extractor class.
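For illustration, below is a minimal, untested sketch of such a pair that returns the raw payload as a byte array (essentially what KafkaSimpleSource/KafkaSimpleExtractor do). The package names, generic type parameters and constructor signature are assumptions based on the class names above; depending on the Gobblin version, additional abstract methods (e.g. one returning the schema) may also need to be implemented.

import java.nio.ByteBuffer;

import kafka.message.MessageAndOffset;

import gobblin.configuration.WorkUnitState;
import gobblin.source.extractor.Extractor;
import gobblin.source.extractor.extract.kafka.KafkaExtractor;
import gobblin.source.extractor.extract.kafka.KafkaSource;

// A source that simply hands each workunit to the extractor below.
public class KafkaByteArraySource extends KafkaSource<String, byte[]> {
  @Override
  public Extractor<String, byte[]> getExtractor(WorkUnitState state) {
    return new KafkaByteArrayExtractor(state);
  }
}

// An extractor that returns the raw Kafka payload as a byte array.
class KafkaByteArrayExtractor extends KafkaExtractor<String, byte[]> {

  public KafkaByteArrayExtractor(WorkUnitState state) {
    super(state);
  }

  @Override
  protected byte[] decodeRecord(MessageAndOffset messageAndOffset) {
    // Copy the message payload out of its ByteBuffer.
    ByteBuffer payload = messageAndOffset.message().payload();
    byte[] bytes = new byte[payload.remaining()];
    payload.get(bytes);
    return bytes;
  }
}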

Gobblin currently provides two concrete implementations: KafkaSimpleSource/KafkaSimpleExtractor, and KafkaAvroSource/KafkaAvroExtractor.

KafkaSimpleExtractor simply returns the payload of the MessageAndOffset object as a byte array. A job that uses KafkaSimpleExtractor may use a Converter to convert the byte array to whatever format is desired. For example, if the desired output format is JSON, one may implement a ByteArrayToJsonConverter to convert the byte array to JSON. Alternatively, one may implement a KafkaJsonExtractor, which extends KafkaExtractor and converts the MessageAndOffset object into a JSON object in the decodeRecord method. Both approaches should work equally well.
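As a rough illustration of the second approach, the decodeRecord method of a hypothetical KafkaJsonExtractor could parse the payload with Gson. The surrounding class would mirror the sketch above, with JsonObject as the record type; this class is not part of Gobblin.

// Hypothetical KafkaJsonExtractor.decodeRecord: decode the payload bytes as UTF-8 JSON.
// Requires com.google.gson.JsonObject, com.google.gson.JsonParser and java.nio.charset.StandardCharsets.
@Override
protected JsonObject decodeRecord(MessageAndOffset messageAndOffset) {
  ByteBuffer payload = messageAndOffset.message().payload();
  byte[] bytes = new byte[payload.remaining()];
  payload.get(bytes);
  return new JsonParser().parse(new String(bytes, StandardCharsets.UTF_8)).getAsJsonObject();
}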

KafkaAvroExtractor decodes the payload of the MessageAndOffset object into an Avro GenericRecord object. It requires that byte 0 of the payload be 0, bytes 1-16 of the payload be a 16-byte schema ID, and the remaining bytes be the encoded Avro record. It also requires the existence of a schema registry that returns the Avro schema given the schema ID; the schema is used to decode the byte array. Thus this class is mainly applicable to LinkedIn's internal Kafka clusters.
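The expected payload layout can be made concrete with a short sketch. Only the byte layout from the paragraph above is shown; the schema registry lookup and the Avro decoding itself are omitted, and the class and method names are illustrative.

import java.util.Arrays;

// Illustrative parsing of the payload layout expected by KafkaAvroExtractor:
// byte 0 is a magic byte (0x0), bytes 1-16 are the 16-byte schema ID,
// and the remaining bytes are the Avro-encoded record.
public class AvroPayloadLayout {

  public static void parse(byte[] payload) {
    if (payload[0] != 0x0) {
      throw new IllegalArgumentException("Unexpected magic byte: " + payload[0]);
    }
    byte[] schemaId = Arrays.copyOfRange(payload, 1, 17);
    byte[] avroBytes = Arrays.copyOfRange(payload, 17, payload.length);

    // A schema registry client (not shown) would map schemaId to an Avro schema,
    // which is then used to decode avroBytes into a GenericRecord.
    System.out.println("schema id: " + schemaId.length + " bytes, record: " + avroBytes.length + " bytes");
  }
}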

Writer and Publisher

For the Writer and Publisher, one may use the AvroHdfsDataWriter and the BaseDataPublisher, similar to the Wikipedia example job. They will publish the records pulled in each task to a different folder as Avro files. Gobblin also has an AvroHdfsTimePartitionedWriter and a TimePartitionedDataPublisher. They publish records based on the timestamps of the records, which means records pulled in the same task may be published to different folders, and records pulled in different tasks may be published to the same folder.

Job Config Properties

These are some of the job config properties used by KafkaSource and KafkaExtractor.

| Property Name | Semantics |
| --- | --- |
| topic.whitelist (case-insensitive regex) | Kafka topics to be pulled. Default value = .* |
| topic.blacklist (case-insensitive regex) | Kafka topics not to be pulled. Default value = empty |
| kafka.brokers | Comma-separated Kafka brokers to ingest data from. |
| mr.job.max.mappers | Number of tasks to launch. In MR mode, this will be the number of mappers launched. If the number of topic partitions to be pulled is larger than the number of tasks, KafkaSource will assign partitions to tasks in a balanced manner. |
| bootstrap.with.offset | For new topics / partitions, this property controls whether they start at the earliest offset or the latest offset. Possible values: earliest, latest, skip. Default: latest |
| reset.on.offset.out.of.range | This property controls what to do if a partition's previously persisted offset is out of the range of the currently available offsets. Possible values: earliest (always move to the earliest available offset), latest (always move to the latest available offset), nearest (move to the earliest offset if the previously persisted offset is smaller than the earliest offset, otherwise move to the latest), skip (skip this partition). Default: nearest |
| topics.move.to.latest.offset (case-insensitive, NO regex) | Topics in this list will always start from the latest offset (i.e., no records will be pulled). To move all topics to the latest offset, use "all". This property should rarely, if ever, be used. |

It is also possible to set a time limit for each task. For example, to set the time limit to 15 minutes, set the following properties:

extract.limit.enabled=true
# other possible values of extract.limit.type: rate, count, pool
extract.limit.type=time
extract.limit.time.limit=15
extract.limit.time.limit.timeunit=minutes

Sample Job Config File

Below is a sample job config file.

job.name=PullFromKafka
job.group=Kafka
job.description=Kafka Extractor for Gobblin
job.lock.enabled=false

source.class=gobblin.source.extractor.extract.kafka.KafkaAvroSource

extract.namespace=gobblin.extract.kafka

writer.destination.type=HDFS
writer.output.format=AVRO
writer.fs.uri=file://localhost/

data.publisher.type=gobblin.publisher.TimePartitionedDataPublisher

topic.whitelist=CaptchaEvent
bootstrap.with.offset=earliest

kafka.brokers=#Kafka brokers URI
kafka.schema.registry.url=#schema registry URI

writer.partition.level=hourly
writer.partition.pattern=YYYY/MM/dd/HH
writer.builder.class=gobblin.writer.AvroTimePartitionedWriterBuilder
writer.file.path.type=tablename
writer.partition.column.name=header.time

mr.job.max.mappers=20

extract.limit.enabled=true
# other possible values of extract.limit.type: rate, count, pool
extract.limit.type=time
extract.limit.time.limit=15
extract.limit.time.limit.timeunit=minutes

Launch Job

Launching the job in standalone mode involves steps similar to those of the Wikipedia example job. The job can also be launched in MR mode. See deployment for more details.

Metrics and Events

Task Level Metrics

Task level metrics can be created in Extractor, Converter and Writer by extending InstrumentedExtractor, InstrumentedConverter and InstrumentedDataWriter.

For example, KafkaExtractor extends InstrumentedExtractor, so you can do the following in subclasses of KafkaExtractor:

Counter decodingErrorCounter = this.getMetricContext().counter("num.of.decoding.errors");
decodingErrorCounter.inc();

Besides Counter, Meter and Histogram are also supported.
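For instance, a Meter and a Histogram can be created from the same MetricContext; the metric names and the recordSizeInBytes variable below are made up for this example.

// Illustrative Meter and Histogram usage inside an instrumented class.
Meter decodeMeter = this.getMetricContext().meter("num.of.records.decoded");
decodeMeter.mark();

Histogram recordSizeHistogram = this.getMetricContext().histogram("record.size.bytes");
recordSizeHistogram.update(recordSizeInBytes);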

Task Level Events

Task level events can be submitted by creating an EventSubmitter instance and using EventSubmitter.submit() or EventSubmitter.getTimingEvent().
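For example, assuming an EventSubmitter instance named eventSubmitter has already been created (its construction is covered in the Gobblin Metrics section), a plain event and a timing event could be submitted as follows; the event names are illustrative.

// Submit a one-off task-level event.
eventSubmitter.submit("kafka.decoding.error");

// A timing event measures the duration between its creation and stop().
TimingEvent fetchTimer = eventSubmitter.getTimingEvent("kafka.fetch.time");
// ... fetch and decode a batch of records ...
fetchTimer.stop();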

Job Level Metrics

To create job level metrics, one may extend AbstractJobLauncher and create metrics there. For example:

Optional<JobMetrics> jobMetrics = this.jobContext.getJobMetricsOptional();
if (!jobMetrics.isPresent()) {
  LOG.warn("job metrics is absent");
  return;
}
Counter recordsWrittenCounter = jobMetrics.get().getCounter("job.records.written");
recordsWrittenCounter.inc(value);

Job level metrics are often aggregations of task level metrics, such as the job.records.written counter above. Since AbstractJobLauncher doesn't have access to task-level metrics, one should set these counters in TaskStates, and override AbstractJobLauncher.postProcessTaskStates() to aggregate them. For example, in AvroHdfsTimePartitionedWriter.close(), property writer.records.written is set for the TaskState.
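A rough sketch of such an aggregation in a subclass of AbstractJobLauncher is shown below. The parameter type of postProcessTaskStates, the property accessor, and the property name writer.records.written follow the description above but should be treated as assumptions; the job-level counter is updated as in the earlier snippet.

@Override
protected void postProcessTaskStates(List<TaskState> taskStates) {
  // Sum the per-task counts that each writer stored in its TaskState.
  long totalRecordsWritten = 0;
  for (TaskState taskState : taskStates) {
    totalRecordsWritten += taskState.getPropAsLong("writer.records.written", 0L);
  }

  Optional<JobMetrics> jobMetrics = this.jobContext.getJobMetricsOptional();
  if (jobMetrics.isPresent()) {
    jobMetrics.get().getCounter("job.records.written").inc(totalRecordsWritten);
  }
}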

Job Level Events

Job level events can be created by extending AbstractJobLauncher and using this.eventSubmitter.submit() or this.eventSubmitter.getTimingEvent().

For more details about metrics, events and reporting them, please see the Gobblin Metrics section.

Merging and Grouping Workunits in KafkaSource

For each topic partition that should be ingested, KafkaSource first retrieves the last offset pulled by the previous run, which should be the first offset of the current run. It also retrieves the earliest and latest offsets currently available from the Kafka cluster and verifies that the first offset is between the earliest and the latest offsets. The latest offset is the last offset to be pulled by the current workunit. Since new records may be constantly published to Kafka and old records are deleted based on retention policies, the earliest and latest offsets of a partition may change constantly.
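The verification step can be sketched as follows for a single partition, using the nearest behavior of reset.on.offset.out.of.range described in the config table above; all variables are placeholders for values read from the previous run's state and from the Kafka cluster.

long firstOffset = previousRunLastOffset;   // where the previous run stopped
long lastOffset = latestAvailableOffset;    // the current workunit pulls up to here

if (firstOffset < earliestAvailableOffset) {
  // The persisted offset fell behind the retention window: move to the earliest available offset.
  firstOffset = earliestAvailableOffset;
} else if (firstOffset > latestAvailableOffset) {
  // The persisted offset is ahead of what the cluster reports: move to the latest available offset.
  firstOffset = latestAvailableOffset;
}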

For each partition, after the first and last offsets are determined, a workunit is created. If the number of Kafka partitions exceeds the desired number of workunits specified by property mr.job.max.mappers, KafkaSource will merge and group them into n MultiWorkUnits where n=mr.job.max.mappers.

KafkaSource employs a two-level bin packing algorithm with two goals: (1) balance the workload of the MultiWorkUnits, and (2) prefer assigning partitions of the same topic to the same workunit, so that these partitions can write to the same output file, which reduces the number of published files.

In the first level bin packing, all workunits are grouped into approximately 3n groups, each of which contains partitions of the same topic. The max group size is set as

maxGroupSize = totalWorkunitSize/3n

The best-fit-decreasing algorithm is run on all partitions of each topic. If an individual workunit’s size exceeds maxGroupSize, it is put in a separate group. For each group, a new workunit is created which will be responsible for extracting all partitions in the group.

The reason behind 3n is that if this number is too small (i.e., too close to n), it is difficult for the second level to pack these groups into n balanced MultiWorkUnits; if this number is too big, maxGroupSize will be small, which doesn't help group partitions of the same topic together. 3n is an empirically selected number.

The second-level bin packing uses the worst-fit-decreasing algorithm to pack the approximately 3n groups into n MultiWorkUnits: each group is assigned to the currently emptiest MultiWorkUnit.
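A compact sketch of worst-fit decreasing is shown below: groups are sorted by decreasing size and each one is assigned to the currently emptiest bin, tracked with a priority queue. Group and Bin are stand-ins for Gobblin's workunit-group and MultiWorkUnit types, not actual Gobblin classes.

import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class WorstFitDecreasing {

  static class Group { long size; }
  static class Bin { long totalSize; }

  // Pack the given groups into n bins, always placing the next group into the emptiest bin.
  public static Bin[] pack(List<Group> groups, int n) {
    Bin[] bins = new Bin[n];
    PriorityQueue<Bin> emptiest = new PriorityQueue<>(Comparator.comparingLong((Bin b) -> b.totalSize));
    for (int i = 0; i < n; i++) {
      bins[i] = new Bin();
      emptiest.add(bins[i]);
    }

    // Worst-fit decreasing: largest groups first, each into the currently emptiest bin.
    groups.sort(Comparator.comparingLong((Group g) -> g.size).reversed());
    for (Group group : groups) {
      Bin bin = emptiest.poll();
      bin.totalSize += group.size;
      emptiest.add(bin);   // reinsert with its updated size
    }
    return bins;
  }
}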
