-
Notifications
You must be signed in to change notification settings - Fork 751
Kafka HDFS Ingestion
This section helps you set up a quick-start job for ingesting Kafka topics on a single machine. We provide quick start examples in both standalone and MapReduce mode.
-
Setup a single node Kafka broker by following the Kafka quick start guide. Suppose your broker URI is
localhost:9092
, and you've created a topic "test" with two events "This is a message" and "This is a another message". -
The remaining steps are the same as the Wikipedia example, except using the following job config properties:
job.name=GobblinKafkaQuickStart
job.group=GobblinKafka
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false
kafka.brokers=localhost:9092
source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka
writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
data.publisher.type=gobblin.publisher.BaseDataPublisher
mr.job.max.mappers=1
metrics.reporting.file.enabled=true
metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics
metrics.reporting.file.suffix=txt
bootstrap.with.offset=earliest
After the job finishes, the following messages should be in the job log:
INFO Pulling topic test
INFO Pulling partition test:0 from offset 0 to 2, range=2
INFO Finished pulling partition test:0
INFO Finished pulling topic test
INFO Extracted 2 data records
INFO Actual high watermark for partition test:0=2, expected=2
Task <task_id> completed in 31212ms with state SUCCESSFUL
The output file will be in GOBBLIN_WORK_DIR/job-output/test
, with the two messages you've just created in the Kafka broker. GOBBLIN_WORK_DIR/metrics
will contain metrics collected from this run.
- Setup a single node Kafka broker same as in standalone mode.
- Setup a single node Hadoop cluster by following the steps in Hadoop: Setting up a Single Node Cluster (link). Suppose your HDFS URI is
hdfs://localhost:9000
. - Create a job config file with the following properties:
job.name=GobblinKafkaQuickStart
job.group=GobblinKafka
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false
kafka.brokers=localhost:9092
source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka
writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
data.publisher.type=gobblin.publisher.BaseDataPublisher
mr.job.max.mappers=1
metrics.reporting.file.enabled=true
metrics.log.dir=/gobblin-kafka/metrics
metrics.reporting.file.suffix=txt
bootstrap.with.offset=earliest
fs.uri=hdfs://localhost:9000
writer.fs.uri=hdfs://localhost:9000
state.store.fs.uri=hdfs://localhost:9000
mr.job.root.dir=/gobblin-kafka/working
state.store.dir=/gobblin-kafka/state-store
task.data.root.dir=/jobs/kafkaetl/gobblin/gobblin-kafka/task-data
data.publisher.final.dir=/gobblintest/job-output
- Run
gobblin-mapreduce.sh
:
gobblin-mapreduce.sh --conf <path-to-job-config-file>
After the job finishes, the job output file will be in /gobblintest/job-output/test
, and the metrics will be in /gobblin-kafka/metrics
.
Source and Extractor
Gobblin provides two abstract classes, KafkaSource
and KafkaExtractor
. KafkaSource
creates a workunit for each Kafka topic partition to be pulled, then merges and groups the workunits based on the desired number of workunits specified by property mr.job.max.mappers
(this property is used in both standalone and MR mode). More details about how workunits are merged and grouped is available here. KafkaExtractor
extracts the partitions assigned to a workunit, based on the specified low watermark and high watermark.
To use them in a Kafka-HDFS ingestion job, one should subclass KafkaExtractor
and implement method decodeRecord(MessageAndOffset)
, which takes a MessageAndOffset
object pulled from the Kafka broker and decodes it into a desired object. One should also subclass KafkaSource
and implement getExtractor(WorkUnitState)
which should return an instance of the Extractor class.
Gobblin currently provides two concrete implementations: KafkaSimpleSource
/KafkaSimpleExtractor
, and KafkaAvroSource
/KafkaAvroExtractor
.
KafkaSimpleExtractor
simply returns the payload of the MessageAndOffset
object as a byte array. A job that uses KafkaSimpleExtractor
may use a Converter
to convert the byte array to whatever format desired. For example, if the desired output format is JSON, one may implement an ByteArrayToJsonConverter
to convert the byte array to JSON. Alternatively one may implement a KafkaJsonExtractor
, which extends KafkaExtractor
and convert the MessageAndOffset
object into a JSON object in the decodeRecord
method. Both approaches should work equally well.
KafkaAvroExtractor
decodes the payload of the MessageAndOffset
object into an Avro GenericRecord
object. It requires that the byte 0 of the payload be 0, bytes 1-16 of the payload be a 16-byte schema ID, and the remaining bytes be the encoded Avro record. It also requires the existence of a schema registry that returns the Avro schema given the schema ID, which is used to decode the byte array. Thus this class is mainly applicable to LinkedIn's internal Kafka clusters.
Writer and Publisher
Any desired writer and publisher can be used, e.g., one may use the AvroHdfsDataWriter
and the BaseDataPublisher
, similar as the Wikipedia example job. If plain text output file is desired, one may use SimpleDataWriter
.
These are some of the job config properties used by KafkaSource
and KafkaExtractor
.
Property Name | Semantics |
---|---|
topic.whitelist (regex) |
Kafka topics to be pulled. Default value = .* |
topic.blacklist (regex) |
Kafka topics not to be pulled. Default value = empty |
kafka.brokers |
Comma separated Kafka brokers to ingest data from. |
mr.job.max.mappers |
Number of tasks to launch. In MR mode, this will be the number of mappers launched. If the number of topic partitions to be pulled is larger than the number of tasks, KafkaSource will assign partitions to tasks in a balanced manner. |
bootstrap.with.offset |
For new topics / partitions, this property controls whether they start at the earliest offset or the latest offset. Possible values: earliest, latest, skip. Default: latest |
reset.on.offset.out.of.range |
This property controls what to do if a partition's previously persisted offset is out of the range of the currently available offsets. Possible values: earliest (always move to earliest available offset), latest (always move to latest available offset), nearest (move to earliest if the previously persisted offset is smaller than the earliest offset, otherwise move to latest), skip (skip this partition). Default: nearest |
topics.move.to.latest.offset (no regex) |
Topics in this list will always start from the latest offset (i.e., no records will be pulled). To move all topics to the latest offset, use "all". This property should rarely, if ever, be used. |
It is also possible to set a time limit for each task. For example, to set the time limit to 15 minutes, set the following properties:
extract.limit.enabled=true
extract.limit.type=time #(other possible values: rate, count, pool)
extract.limit.time.limit=15
extract.limit.time.limit.timeunit=minutes
Task Level Metrics
Task level metrics can be created in Extractor
, Converter
and Writer
by extending InstrumentedExtractor
, InstrumentedConverter
and InstrumentedDataWriter
.
For example, KafkaExtractor
extends InstrumentedExtractor
. So you can do the following in subclasses of KafkaExtractor
:
Counter decodingErrorCounter = this.getMetricContext().counter("num.of.decoding.errors");
decodingErrorCounter.inc();
Besides Counter, Meter and Histogram are also supported.
Task Level Events
Task level events can be submitted by creating an EventSubmitter
instance and using EventSubmitter.submit()
or EventSubmitter.getTimingEvent()
.
Job Level Metrics
To create job level metrics, one may extend AbstractJobLauncher
and create metrics there. For example:
Optional<JobMetrics> jobMetrics = this.jobContext.getJobMetricsOptional();
if (!jobMetrics.isPresent()) {
LOG.warn("job metrics is absent");
return;
}
Counter recordsWrittenCounter = jobMetrics.get().getCounter("job.records.written");
recordsWrittenCounter.inc(value);
Job level metrics are often aggregations of task level metrics, such as the job.records.written
counter above. Since AbstractJobLauncher
doesn't have access to task-level metrics, one should set these counters in TaskState
s, and override AbstractJobLauncher.postProcessTaskStates()
to aggregate them. For example, in AvroHdfsTimePartitionedWriter.close()
, property writer.records.written
is set for the TaskState
.
Job Level Events
Job level events can be created by extending AbstractJobLauncher
and use this.eventSubmitter.submit()
or this.eventSubmitter.getTimingEvent()
.
For more details about metrics, events and reporting them, please see Gobblin Metrics section.
For each topic partition that should be ingested, KafkaSource
first retrieves the last offset pulled by the previous run, which should be the first offset of the current run. It also retrieves the earliest and latest offsets currently available from the Kafka cluster and verifies that the first offset is between the earliest and the latest offsets. The latest offset is the last offset to be pulled by the current workunit. Since new records may be constantly published to Kafka and old records are deleted based on retention policies, the earliest and latest offsets of a partition may change constantly.
For each partition, after the first and last offsets are determined, a workunit is created. If the number of Kafka partitions exceeds the desired number of workunits specified by property mr.job.max.mappers
, KafkaSource
will merge and group them into n
MultiWorkUnit
s where n=mr.job.max.mappers
.
KafkaSource
employs a two-level bin packing algorithms with two goals: (1) balance the workload of the MultiWorkUnit
s, and (2) prefer assigning partitions of the same topic to the same workunit, so that these partitions can write to the same output file which reduces the number of files published.
In the first level bin packing, all workunits are grouped into approximately 3n
groups, each of which contains partitions of the same topic. The max group size is set as
maxGroupSize = totalWorkunitSize/3n
The best-fit-decreasing algorithm is run on all partitions of each topic. If an individual workunit’s size exceeds maxGroupSize
, it is put in a separate group. For each group, a new workunit is created which will be responsible for extracting all partitions in the group.
The reason behind 3n
is that if this number is too small (i.e., too close to n
), it is difficult for the second level to pack these groups into n balanced multiworkunits; if this number is too big, avgGroupSize
will be small which doesn’t help grouping partitions of the same topic together. 3n
is a number that is empirically selected.
The second level bin-packing uses the worst-fit-decreasing bin packing algorithm that packs the approximately 3n
groups into n
MultiWorkUnit
s. Each group is assigned to the emptiest MultiWorkUnit
.
- Home
- [Getting Started](Getting Started)
- Architecture
- User Guide
- Working with Job Configuration Files
- [Deployment](Gobblin Deployment)
- Gobblin on Yarn
- Compaction
- [State Management and Watermarks] (State-Management-and-Watermarks)
- Working with the ForkOperator
- [Configuration Glossary](Configuration Properties Glossary)
- [Partitioned Writers](Partitioned Writers)
- Monitoring
- Schedulers
- [Job Execution History Store](Job Execution History Store)
- Gobblin Build Options
- Troubleshooting
- [FAQs] (FAQs)
- Case Studies
- Gobblin Metrics
- [Quick Start](Gobblin Metrics)
- [Existing Reporters](Existing Reporters)
- [Metrics for Gobblin ETL](Metrics for Gobblin ETL)
- [Gobblin Metrics Architecture](Gobblin Metrics Architecture)
- [Implementing New Reporters](Implementing New Reporters)
- [Gobblin Metrics Performance](Gobblin Metrics Performance)
- Developer Guide
- [Customization: New Source](Customization for New Source)
- [Customization: Converter/Operator](Customization for Converter and Operator)
- Code Style Guide
- IDE setup
- Monitoring Design
- Project
- [Feature List](Feature List)
- Contributors/Team
- [Talks/Tech Blogs](Talks and Tech Blogs)
- News/Roadmap
- Posts
- Miscellaneous