[HUDI-4418] Add support for ProtoKafkaSource (apache#6135)
- Adds PROTO to the Source.SourceType enum.
- Handles the PROTO type in SourceFormatAdapter by converting proto Message objects to Avro; conversion to Row currently goes Proto -> Avro -> Row.
- Adds ProtoClassBasedSchemaProvider, which generates an Avro schema for a generated proto class that is on the classpath.
- Adds ProtoKafkaSource, which parses byte[] payloads into a proto class that is on the classpath.
- Adds ProtoConversionUtil, which exposes methods for creating schemas and translating proto Messages to Avro GenericRecords.
- Adds KafkaSource, a base class shared by the Kafka sources (a configuration sketch for wiring the new classes together follows below).
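A minimal wiring sketch for a DeltaStreamer-style setup follows. Only the hoodie.deltastreamer.schemaprovider.proto.* keys are introduced by this commit; the proto class name, topic, and broker values are hypothetical, and the remaining keys and CLI flags are the usual DeltaStreamer settings shown for illustration only.

import org.apache.hudi.common.config.TypedProperties;

public class ProtoSourceWiringSketch {
  public static TypedProperties buildProps() {
    TypedProperties props = new TypedProperties();
    // Keys added by this commit: point the schema provider at a generated proto class.
    props.setProperty("hoodie.deltastreamer.schemaprovider.proto.className", "com.example.protos.OrderEvent"); // hypothetical class
    props.setProperty("hoodie.deltastreamer.schemaprovider.proto.flattenWrappers", "true");
    // Usual DeltaStreamer Kafka settings, with illustrative values.
    props.setProperty("hoodie.deltastreamer.source.kafka.topic", "orders");
    props.setProperty("bootstrap.servers", "localhost:9092");
    return props;
  }
  // Run HoodieDeltaStreamer with, e.g.:
  //   --source-class org.apache.hudi.utilities.sources.ProtoKafkaSource
  //   --schemaprovider-class org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider
}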
the-other-tim-brown authored and fengjian committed Apr 5, 2023
1 parent c13a165 commit 0d6268a
Showing 21 changed files with 1,794 additions and 361 deletions.
15 changes: 0 additions & 15 deletions hudi-kafka-connect/pom.xml
@@ -66,21 +66,6 @@
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>3.11.4</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<protocVersion>${protoc.version}</protocVersion>
<inputDirectories>
<include>src/main/resources</include>
</inputDirectories>
</configuration>
</execution>
</executions>
</plugin>
</plugins>

21 changes: 21 additions & 0 deletions hudi-utilities/pom.xml
@@ -60,6 +60,10 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
</plugin>
</plugins>

<resources>
@@ -73,6 +77,23 @@
</build>

<dependencies>
<!-- Protobuf -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- H2 database for JdbcbaseSchemaProvider -->
<dependency>
<groupId>com.h2database</groupId>
@@ -25,13 +25,11 @@
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.AvroSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonSource;
import org.apache.hudi.utilities.sources.RowSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;

import com.google.protobuf.Message;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
@@ -62,15 +60,15 @@ public SourceFormatAdapter(Source source) {
public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String> lastCkptStr, long sourceLimit) {
switch (source.getSourceType()) {
case AVRO:
return ((AvroSource) source).fetchNext(lastCkptStr, sourceLimit);
return ((Source<JavaRDD<GenericRecord>>) source).fetchNext(lastCkptStr, sourceLimit);
case JSON: {
InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>) source).fetchNext(lastCkptStr, sourceLimit);
AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
return new InputBatch<>(Option.ofNullable(r.getBatch().map(rdd -> rdd.map(convertor::fromJson)).orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
case ROW: {
InputBatch<Dataset<Row>> r = ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
InputBatch<Dataset<Row>> r = ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
return new InputBatch<>(Option.ofNullable(r.getBatch().map(
rdd -> {
SchemaProvider originalProvider = UtilHelpers.getOriginalSchemaProvider(r.getSchemaProvider());
@@ -85,6 +83,12 @@ public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String
})
.orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
case PROTO: {
InputBatch<JavaRDD<Message>> r = ((Source<JavaRDD<Message>>) source).fetchNext(lastCkptStr, sourceLimit);
AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
return new InputBatch<>(Option.ofNullable(r.getBatch().map(rdd -> rdd.map(convertor::fromProtoMessage)).orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
default:
throw new IllegalArgumentException("Unknown source type (" + source.getSourceType() + ")");
}
@@ -96,9 +100,9 @@ public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String
public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptStr, long sourceLimit) {
switch (source.getSourceType()) {
case ROW:
return ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
return ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
case AVRO: {
InputBatch<JavaRDD<GenericRecord>> r = ((AvroSource) source).fetchNext(lastCkptStr, sourceLimit);
InputBatch<JavaRDD<GenericRecord>> r = ((Source<JavaRDD<GenericRecord>>) source).fetchNext(lastCkptStr, sourceLimit);
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
return new InputBatch<>(
Option
@@ -111,14 +115,29 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptS
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
case JSON: {
InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>) source).fetchNext(lastCkptStr, sourceLimit);
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
return new InputBatch<>(
Option.ofNullable(
r.getBatch().map(rdd -> source.getSparkSession().read().schema(dataType).json(rdd)).orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
case PROTO: {
InputBatch<JavaRDD<Message>> r = ((Source<JavaRDD<Message>>) source).fetchNext(lastCkptStr, sourceLimit);
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
return new InputBatch<>(
Option
.ofNullable(
r.getBatch()
.map(rdd -> rdd.map(convertor::fromProtoMessage))
.map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
source.getSparkSession())
)
.orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
default:
throw new IllegalArgumentException("Unknown source type (" + source.getSourceType() + ")");
}
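A short usage sketch of the adapter with a proto-backed source, assuming the Source instance (e.g. a ProtoKafkaSource) is built elsewhere; import paths follow the surrounding diff.

import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

final class ProtoRowFetchSketch {
  // Pull one micro-batch from a proto-backed source as rows. Internally each
  // proto Message becomes an Avro GenericRecord (AvroConvertor#fromProtoMessage)
  // and the RDD is turned into a DataFrame via AvroConversionUtils.createDataFrame,
  // i.e. Proto -> Avro -> Row.
  static InputBatch<Dataset<Row>> fetchAsRows(Source protoSource, long sourceLimit) {
    SourceFormatAdapter adapter = new SourceFormatAdapter(protoSource);
    return adapter.fetchNewDataInRowFormat(Option.empty(), sourceLimit); // empty = no prior checkpoint
  }
}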
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.hudi.utilities.schema;

import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.sources.helpers.ProtoConversionUtil;

import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Collections;

/**
* A schema provider that takes in a class name for a generated protobuf class that is on the classpath.
*/
public class ProtoClassBasedSchemaProvider extends SchemaProvider {
/**
* Configs supported.
*/
public static class Config {
public static final String PROTO_SCHEMA_CLASS_NAME = "hoodie.deltastreamer.schemaprovider.proto.className";
public static final String PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES = "hoodie.deltastreamer.schemaprovider.proto.flattenWrappers";
}

private final String schemaString;

/**
* To be lazily inited on executors.
*/
private transient Schema schema;

public ProtoClassBasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(
Config.PROTO_SCHEMA_CLASS_NAME));
String className = config.getString(Config.PROTO_SCHEMA_CLASS_NAME);
boolean flattenWrappedPrimitives = props.getBoolean(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES, false);
try {
schemaString = ProtoConversionUtil.getAvroSchemaForMessageClass(ReflectionUtils.getClass(className), flattenWrappedPrimitives).toString();
} catch (Exception e) {
throw new HoodieException(String.format("Error reading proto source schema for class: %s", className), e);
}
}

@Override
public Schema getSourceSchema() {
if (schema == null) {
Schema.Parser parser = new Schema.Parser();
schema = parser.parse(schemaString);
}
return schema;
}

@Override
public Schema getTargetSchema() {
return getSourceSchema();
}
}
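A minimal sketch of using the new provider directly, assuming a hypothetical generated class com.example.protos.OrderEvent is on the classpath and the caller already has a JavaSparkContext.

import org.apache.avro.Schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider;
import org.apache.spark.api.java.JavaSparkContext;

final class ProtoSchemaProviderSketch {
  static Schema protoAvroSchema(JavaSparkContext jssc) {
    TypedProperties props = new TypedProperties();
    props.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME, "com.example.protos.OrderEvent"); // hypothetical class
    props.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES, "true");
    ProtoClassBasedSchemaProvider provider = new ProtoClassBasedSchemaProvider(props, jssc);
    return provider.getSourceSchema(); // Avro schema derived from the proto descriptor
  }
}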
@@ -20,16 +20,13 @@

import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -46,24 +43,17 @@
/**
* Reads avro serialized Kafka data, based on the confluent schema-registry.
*/
public class AvroKafkaSource extends AvroSource {
public class AvroKafkaSource extends KafkaSource<GenericRecord> {

private static final Logger LOG = LogManager.getLogger(AvroKafkaSource.class);
// these are native kafka's config. do not change the config names.
private static final String NATIVE_KAFKA_KEY_DESERIALIZER_PROP = "key.deserializer";
private static final String NATIVE_KAFKA_VALUE_DESERIALIZER_PROP = "value.deserializer";
// These are settings used to pass things to KafkaAvroDeserializer
public static final String KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX = "hoodie.deltastreamer.source.kafka.value.deserializer.";
public static final String KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA = KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX + "schema";

private final KafkaOffsetGen offsetGen;
private final HoodieDeltaStreamerMetrics metrics;
private final SchemaProvider schemaProvider;
private final String deserializerClassName;

public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
super(props, sparkContext, sparkSession, schemaProvider);
super(props, sparkContext, sparkSession, schemaProvider, SourceType.AVRO, metrics);

props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());
deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
@@ -82,29 +72,11 @@ public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Spa
LOG.error(error);
throw new HoodieException(error, e);
}

this.schemaProvider = schemaProvider;
this.metrics = metrics;
offsetGen = new KafkaOffsetGen(props);
this.offsetGen = new KafkaOffsetGen(props);
}

@Override
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
try {
OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
if (totalNewMsgs <= 0) {
return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
}
JavaRDD<GenericRecord> newDataRDD = toRDD(offsetRanges);
return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
} catch (org.apache.kafka.common.errors.TimeoutException e) {
throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
}
}

private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
if (deserializerClassName.equals(ByteArrayDeserializer.class.getName())) {
if (schemaProvider == null) {
throw new HoodieException("Please provide a valid schema provider class when use ByteArrayDeserializer!");
@@ -117,11 +89,4 @@ private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value());
}
}

@Override
public void onCommit(String lastCkptStr) {
if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(), KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) {
offsetGen.commitOffsetToKafka(lastCkptStr);
}
}
}
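The fetchNewData/onCommit bodies removed above are hoisted into the new KafkaSource base class, which is not included in this excerpt. Below is a rough reconstruction from the deleted code; the class name, field visibility, and generics are assumptions, not the committed implementation.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.OffsetRange;

abstract class KafkaSourceSketch<T> extends Source<JavaRDD<T>> {
  protected final HoodieDeltaStreamerMetrics metrics;
  protected final SchemaProvider schemaProvider;
  protected KafkaOffsetGen offsetGen; // subclasses assign after configuring their deserializers

  protected KafkaSourceSketch(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                              SchemaProvider schemaProvider, SourceType sourceType, HoodieDeltaStreamerMetrics metrics) {
    super(props, sparkContext, sparkSession, schemaProvider, sourceType); // assumes Source exposes this 5-arg constructor
    this.schemaProvider = schemaProvider;
    this.metrics = metrics;
  }

  @Override
  protected InputBatch<JavaRDD<T>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
    try {
      OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
      if (CheckpointUtils.totalNewMessages(offsetRanges) <= 0) {
        return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
      }
      return new InputBatch<>(Option.of(toRDD(offsetRanges)), CheckpointUtils.offsetsToStr(offsetRanges));
    } catch (org.apache.kafka.common.errors.TimeoutException e) {
      throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
    }
  }

  // Each format-specific subclass only needs to say how offset ranges become records.
  abstract JavaRDD<T> toRDD(OffsetRange[] offsetRanges);

  @Override
  public void onCommit(String lastCkptStr) {
    if (props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(),
        KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) {
      offsetGen.commitOffsetToKafka(lastCkptStr);
    }
  }
}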
@@ -19,20 +19,15 @@
package org.apache.hudi.utilities.sources;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
@@ -45,40 +40,18 @@
/**
* Read json kafka data.
*/
public class JsonKafkaSource extends JsonSource {

private static final Logger LOG = LogManager.getLogger(JsonKafkaSource.class);

private final KafkaOffsetGen offsetGen;

private final HoodieDeltaStreamerMetrics metrics;
public class JsonKafkaSource extends KafkaSource<String> {

public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
super(properties, sparkContext, sparkSession, schemaProvider);
this.metrics = metrics;
super(properties, sparkContext, sparkSession, schemaProvider, SourceType.JSON, metrics);
properties.put("key.deserializer", StringDeserializer.class.getName());
properties.put("value.deserializer", StringDeserializer.class.getName());
offsetGen = new KafkaOffsetGen(properties);
this.offsetGen = new KafkaOffsetGen(props);
}

@Override
protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
try {
OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
if (totalNewMsgs <= 0) {
return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
}
JavaRDD<String> newDataRDD = toRDD(offsetRanges);
return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
} catch (org.apache.kafka.common.errors.TimeoutException e) {
throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
}
}

private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
JavaRDD<String> jsonStringRDD = KafkaUtils.createRDD(sparkContext,
offsetGen.getKafkaParams(),
offsetRanges,
@@ -104,12 +77,4 @@ private JavaRDD<String> postProcess(JavaRDD<String> jsonStringRDD) {

return processor.process(jsonStringRDD);
}

@Override
public void onCommit(String lastCkptStr) {
if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(),
KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) {
offsetGen.commitOffsetToKafka(lastCkptStr);
}
}
}
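ProtoKafkaSource itself is not shown in this excerpt. Based on the commit description (parsing Kafka byte[] payloads into a proto class on the classpath), a hypothetical sketch of its RDD conversion might look like the following; the actual implementation may differ.

import com.google.protobuf.Message;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

import java.lang.reflect.Method;

// Assumes the enclosing source has the usual `sparkContext` and `offsetGen`
// fields plus a `className` field holding the configured proto class name,
// mirroring the Kafka sources above.
JavaRDD<Message> toRDD(OffsetRange[] offsetRanges) {
  String protoClassName = className; // copy to a local so the Spark closure does not capture `this`
  return KafkaUtils.<String, byte[]>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
          LocationStrategies.PreferConsistent())
      .map(consumerRecord -> parseBytes(consumerRecord.value(), protoClassName));
}

static Message parseBytes(byte[] bytes, String protoClassName) throws Exception {
  // Reflectively invoke the generated class's static parseFrom(byte[]) method.
  Method parseFrom = Class.forName(protoClassName).getMethod("parseFrom", byte[].class);
  return (Message) parseFrom.invoke(null, (Object) bytes);
}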