diff --git a/hudi-kafka-connect/pom.xml b/hudi-kafka-connect/pom.xml
index 304afddd27272..d32d0fffe70f6 100644
--- a/hudi-kafka-connect/pom.xml
+++ b/hudi-kafka-connect/pom.xml
@@ -66,21 +66,6 @@
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
- <version>3.11.4</version>
- <executions>
- <execution>
- <phase>generate-sources</phase>
- <goals>
- <goal>run</goal>
- </goals>
- <configuration>
- <protocVersion>${protoc.version}</protocVersion>
- <inputDirectories>
- <include>src/main/resources</include>
- </inputDirectories>
- </configuration>
- </execution>
- </executions>
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 1fd9e55a01b57..b6a5495e15bfa 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -60,6 +60,10 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
+ <plugin>
+ <groupId>com.github.os72</groupId>
+ <artifactId>protoc-jar-maven-plugin</artifactId>
+ </plugin>
@@ -73,6 +77,23 @@
+
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java-util</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+
+
<groupId>com.h2database</groupId>
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index 3514ace829ab5..9fbb11fd82867 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -25,13 +25,11 @@
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.hudi.utilities.sources.AvroSource;
import org.apache.hudi.utilities.sources.InputBatch;
-import org.apache.hudi.utilities.sources.JsonSource;
-import org.apache.hudi.utilities.sources.RowSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
+import com.google.protobuf.Message;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
@@ -62,15 +60,15 @@ public SourceFormatAdapter(Source source) {
public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String> lastCkptStr, long sourceLimit) {
switch (source.getSourceType()) {
case AVRO:
- return ((AvroSource) source).fetchNext(lastCkptStr, sourceLimit);
+ return ((Source<JavaRDD<GenericRecord>>) source).fetchNext(lastCkptStr, sourceLimit);
case JSON: {
- InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
+ InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>) source).fetchNext(lastCkptStr, sourceLimit);
AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
return new InputBatch<>(Option.ofNullable(r.getBatch().map(rdd -> rdd.map(convertor::fromJson)).orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
case ROW: {
- InputBatch<Dataset<Row>> r = ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
+ InputBatch<Dataset<Row>> r = ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
return new InputBatch<>(Option.ofNullable(r.getBatch().map(
rdd -> {
SchemaProvider originalProvider = UtilHelpers.getOriginalSchemaProvider(r.getSchemaProvider());
@@ -85,6 +83,12 @@ public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<Strin
+ case PROTO: {
+ InputBatch<JavaRDD<Message>> r = ((Source<JavaRDD<Message>>) source).fetchNext(lastCkptStr, sourceLimit);
+ AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
+ return new InputBatch<>(Option.ofNullable(r.getBatch().map(rdd -> rdd.map(convertor::fromProtoMessage)).orElse(null)),
+ r.getCheckpointForNextBatch(), r.getSchemaProvider());
+ }
default:
throw new IllegalArgumentException("Unknown source type (" + source.getSourceType() + ")");
}
@@ -96,9 +100,9 @@ public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<Strin
public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptStr, long sourceLimit) {
switch (source.getSourceType()) {
case ROW:
- return ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
+ return ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
case AVRO: {
- InputBatch<JavaRDD<GenericRecord>> r = ((AvroSource) source).fetchNext(lastCkptStr, sourceLimit);
+ InputBatch<JavaRDD<GenericRecord>> r = ((Source<JavaRDD<GenericRecord>>) source).fetchNext(lastCkptStr, sourceLimit);
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
return new InputBatch<>(
Option
@@ -111,7 +115,7 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptS
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
case JSON: {
- InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
+ InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>) source).fetchNext(lastCkptStr, sourceLimit);
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
return new InputBatch<>(
@@ -119,6 +123,21 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptS
r.getBatch().map(rdd -> source.getSparkSession().read().schema(dataType).json(rdd)).orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
+ case PROTO: {
+ InputBatch<JavaRDD<Message>> r = ((Source<JavaRDD<Message>>) source).fetchNext(lastCkptStr, sourceLimit);
+ Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
+ AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
+ return new InputBatch<>(
+ Option
+ .ofNullable(
+ r.getBatch()
+ .map(rdd -> rdd.map(convertor::fromProtoMessage))
+ .map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
+ source.getSparkSession())
+ )
+ .orElse(null)),
+ r.getCheckpointForNextBatch(), r.getSchemaProvider());
+ }
default:
throw new IllegalArgumentException("Unknown source type (" + source.getSourceType() + ")");
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
new file mode 100644
index 0000000000000..360b8de3c51ed
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.sources.helpers.ProtoConversionUtil;
+
+import org.apache.avro.Schema;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.Collections;
+
+/**
+ * A schema provider that takes in a class name for a generated protobuf class that is on the classpath.
+ */
+public class ProtoClassBasedSchemaProvider extends SchemaProvider {
+ /**
+ * Configs supported.
+ */
+ public static class Config {
+ public static final String PROTO_SCHEMA_CLASS_NAME = "hoodie.deltastreamer.schemaprovider.proto.className";
+ public static final String PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES = "hoodie.deltastreamer.schemaprovider.proto.flattenWrappers";
+ }
+
+ private final String schemaString;
+
+ /**
+ * To be lazily inited on executors.
+ */
+ private transient Schema schema;
+
+ public ProtoClassBasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
+ super(props, jssc);
+ DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(
+ Config.PROTO_SCHEMA_CLASS_NAME));
+ String className = config.getString(Config.PROTO_SCHEMA_CLASS_NAME);
+ boolean flattenWrappedPrimitives = props.getBoolean(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES, false);
+ try {
+ schemaString = ProtoConversionUtil.getAvroSchemaForMessageClass(ReflectionUtils.getClass(className), flattenWrappedPrimitives).toString();
+ } catch (Exception e) {
+ throw new HoodieException(String.format("Error reading proto source schema for class: %s", className), e);
+ }
+ }
+
+ @Override
+ public Schema getSourceSchema() {
+ if (schema == null) {
+ Schema.Parser parser = new Schema.Parser();
+ schema = parser.parse(schemaString);
+ }
+ return schema;
+ }
+
+ @Override
+ public Schema getTargetSchema() {
+ return getSourceSchema();
+ }
+}
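
For illustration only (not part of this diff), a minimal sketch of how the new provider is configured, assuming a hypothetical generated protobuf class com.example.proto.OrderEvent on the classpath; the package and class below are placeholders:

package org.apache.hudi.examples;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider;

public class ProtoSchemaProviderExample {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Required: a generated protobuf class that is available on the classpath.
    props.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME, "com.example.proto.OrderEvent");
    // Optional: flatten google.protobuf wrapper types into nullable primitives (defaults to false).
    props.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES, "true");
    // The JavaSparkContext is not used for schema generation, so null is enough for a local check.
    ProtoClassBasedSchemaProvider provider = new ProtoClassBasedSchemaProvider(props, null);
    System.out.println(provider.getSourceSchema().toString(true));
  }
}

In a real DeltaStreamer run these two properties would typically come from the job's properties file rather than be built up in code.
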
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 84c6fd815e838..a20ecbdfbf0e3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -20,16 +20,13 @@
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
-import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
-import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -46,24 +43,17 @@
/**
* Reads avro serialized Kafka data, based on the confluent schema-registry.
*/
-public class AvroKafkaSource extends AvroSource {
+public class AvroKafkaSource extends KafkaSource<GenericRecord> {
private static final Logger LOG = LogManager.getLogger(AvroKafkaSource.class);
- // these are native kafka's config. do not change the config names.
- private static final String NATIVE_KAFKA_KEY_DESERIALIZER_PROP = "key.deserializer";
- private static final String NATIVE_KAFKA_VALUE_DESERIALIZER_PROP = "value.deserializer";
// These are settings used to pass things to KafkaAvroDeserializer
public static final String KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX = "hoodie.deltastreamer.source.kafka.value.deserializer.";
public static final String KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA = KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX + "schema";
-
- private final KafkaOffsetGen offsetGen;
- private final HoodieDeltaStreamerMetrics metrics;
- private final SchemaProvider schemaProvider;
private final String deserializerClassName;
public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
- super(props, sparkContext, sparkSession, schemaProvider);
+ super(props, sparkContext, sparkSession, schemaProvider, SourceType.AVRO, metrics);
props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());
deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
@@ -82,29 +72,11 @@ public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Spa
LOG.error(error);
throw new HoodieException(error, e);
}
-
- this.schemaProvider = schemaProvider;
- this.metrics = metrics;
- offsetGen = new KafkaOffsetGen(props);
+ this.offsetGen = new KafkaOffsetGen(props);
}
@Override
- protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
- try {
- OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
- long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
- LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
- if (totalNewMsgs <= 0) {
- return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
- }
- JavaRDD<GenericRecord> newDataRDD = toRDD(offsetRanges);
- return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
- } catch (org.apache.kafka.common.errors.TimeoutException e) {
- throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
- }
- }
-
- private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
+ JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
if (deserializerClassName.equals(ByteArrayDeserializer.class.getName())) {
if (schemaProvider == null) {
throw new HoodieException("Please provide a valid schema provider class when use ByteArrayDeserializer!");
@@ -117,11 +89,4 @@ private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value());
}
}
-
- @Override
- public void onCommit(String lastCkptStr) {
- if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(), KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) {
- offsetGen.commitOffsetToKafka(lastCkptStr);
- }
- }
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index 0b06d986bbf1a..26849d499e92a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -19,20 +19,15 @@
package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
-import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
-import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
@@ -45,40 +40,18 @@
/**
* Read json kafka data.
*/
-public class JsonKafkaSource extends JsonSource {
-
- private static final Logger LOG = LogManager.getLogger(JsonKafkaSource.class);
-
- private final KafkaOffsetGen offsetGen;
-
- private final HoodieDeltaStreamerMetrics metrics;
+public class JsonKafkaSource extends KafkaSource<String> {
public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
- super(properties, sparkContext, sparkSession, schemaProvider);
- this.metrics = metrics;
+ super(properties, sparkContext, sparkSession, schemaProvider, SourceType.JSON, metrics);
properties.put("key.deserializer", StringDeserializer.class.getName());
properties.put("value.deserializer", StringDeserializer.class.getName());
- offsetGen = new KafkaOffsetGen(properties);
+ this.offsetGen = new KafkaOffsetGen(props);
}
@Override
- protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
- try {
- OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
- long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
- LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
- if (totalNewMsgs <= 0) {
- return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
- }
- JavaRDD<String> newDataRDD = toRDD(offsetRanges);
- return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
- } catch (org.apache.kafka.common.errors.TimeoutException e) {
- throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
- }
- }
-
- private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
+ JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
JavaRDD<String> jsonStringRDD = KafkaUtils.createRDD(sparkContext,
offsetGen.getKafkaParams(),
offsetRanges,
@@ -104,12 +77,4 @@ private JavaRDD<String> postProcess(JavaRDD<String> jsonStringRDD) {
return processor.process(jsonStringRDD);
}
-
- @Override
- public void onCommit(String lastCkptStr) {
- if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(),
- KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) {
- offsetGen.commitOffsetToKafka(lastCkptStr);
- }
- }
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
new file mode 100644
index 0000000000000..6f2377fc7ce93
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
+import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.streaming.kafka010.OffsetRange;
+
+abstract class KafkaSource<T> extends Source<JavaRDD<T>> {
+ private static final Logger LOG = LogManager.getLogger(KafkaSource.class);
+ // these are native kafka's config. do not change the config names.
+ protected static final String NATIVE_KAFKA_KEY_DESERIALIZER_PROP = "key.deserializer";
+ protected static final String NATIVE_KAFKA_VALUE_DESERIALIZER_PROP = "value.deserializer";
+
+ protected final HoodieDeltaStreamerMetrics metrics;
+ protected final SchemaProvider schemaProvider;
+ protected KafkaOffsetGen offsetGen;
+
+ protected KafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+ SchemaProvider schemaProvider, SourceType sourceType, HoodieDeltaStreamerMetrics metrics) {
+ super(props, sparkContext, sparkSession, schemaProvider, sourceType);
+
+ this.schemaProvider = schemaProvider;
+ this.metrics = metrics;
+ }
+
+ @Override
+ protected InputBatch<JavaRDD<T>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
+ try {
+ OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
+ long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
+ LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
+ if (totalNewMsgs <= 0) {
+ return new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
+ }
+ JavaRDD<T> newDataRDD = toRDD(offsetRanges);
+ return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
+ } catch (org.apache.kafka.common.errors.TimeoutException e) {
+ throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
+ }
+ }
+
+ abstract JavaRDD<T> toRDD(OffsetRange[] offsetRanges);
+
+ @Override
+ public void onCommit(String lastCkptStr) {
+ if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(), KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) {
+ offsetGen.commitOffsetToKafka(lastCkptStr);
+ }
+ }
+}
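
To make the template-method contract concrete, here is a hypothetical minimal subclass (not part of this diff), which is roughly JsonKafkaSource above without its post-processor: a concrete source picks a SourceType, configures the Kafka deserializers, creates the KafkaOffsetGen, and implements toRDD, while offset-range resolution, empty-batch handling, timeout translation and offset commits are inherited from KafkaSource.

package org.apache.hudi.utilities.sources;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

public class RawStringKafkaSource extends KafkaSource<String> {

  public RawStringKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                              SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
    super(props, sparkContext, sparkSession, schemaProvider, SourceType.JSON, metrics);
    props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());
    props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, StringDeserializer.class.getName());
    this.offsetGen = new KafkaOffsetGen(props);
  }

  @Override
  JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
    // Read the resolved offset ranges and keep only the record values.
    return KafkaUtils.<String, String>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
        LocationStrategies.PreferConsistent()).map(record -> record.value());
  }
}
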
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
new file mode 100644
index 0000000000000..ae37273be6b99
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
+import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
+
+import com.google.protobuf.Message;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.streaming.kafka010.KafkaUtils;
+import org.apache.spark.streaming.kafka010.LocationStrategies;
+import org.apache.spark.streaming.kafka010.OffsetRange;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+
+/**
+ * Reads protobuf serialized Kafka data, based on a provided class name.
+ */
+public class ProtoKafkaSource extends KafkaSource<Message> {
+
+ private final String className;
+
+ public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext,
+ SparkSession sparkSession, SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
+ super(props, sparkContext, sparkSession, schemaProvider, SourceType.PROTO, metrics);
+ DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(
+ ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME));
+ props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
+ props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, ByteArrayDeserializer.class);
+ className = props.getString(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME);
+ this.offsetGen = new KafkaOffsetGen(props);
+ }
+
+ @Override
+ JavaRDD<Message> toRDD(OffsetRange[] offsetRanges) {
+ ProtoDeserializer deserializer = new ProtoDeserializer(className);
+ return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
+ LocationStrategies.PreferConsistent()).map(obj -> deserializer.parse(obj.value()));
+ }
+
+ private static class ProtoDeserializer implements Serializable {
+ private final String className;
+ private transient Class protoClass;
+ private transient Method parseMethod;
+
+ public ProtoDeserializer(String className) {
+ this.className = className;
+ }
+
+ public Message parse(byte[] bytes) {
+ try {
+ return (Message) getParseMethod().invoke(getClass(), bytes);
+ } catch (IllegalAccessException | InvocationTargetException ex) {
+ throw new HoodieException("Failed to parse proto message from kafka", ex);
+ }
+ }
+
+ private Class getProtoClass() {
+ if (protoClass == null) {
+ protoClass = ReflectionUtils.getClass(className);
+ }
+ return protoClass;
+ }
+
+ private Method getParseMethod() {
+ if (parseMethod == null) {
+ try {
+ parseMethod = getProtoClass().getMethod("parseFrom", byte[].class);
+ } catch (NoSuchMethodException ex) {
+ throw new HoodieException("Unable to get proto parsing method from specified class: " + className, ex);
+ }
+ }
+ return parseMethod;
+ }
+ }
+}
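
The reflection in ProtoDeserializer exists because only the class name, a plain String, is serialized and shipped to executors; the Class and Method handles are transient and rebuilt lazily there. Assuming a hypothetical generated class com.example.proto.OrderEvent (not part of this diff), the reflective lookup resolves to the direct call sketched below:

package org.apache.hudi.examples;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;

public class DirectParseEquivalent {
  // Every generated protobuf message class exposes a static parseFrom(byte[]) factory; that is
  // exactly the method ProtoDeserializer looks up via getMethod("parseFrom", byte[].class),
  // so its whole round trip boils down to this one call.
  public static Message parse(byte[] bytes) throws InvalidProtocolBufferException {
    return com.example.proto.OrderEvent.parseFrom(bytes);
  }
}
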
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index 6d610d5c8cbdc..2e858167e05d4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -38,7 +38,7 @@
public abstract class Source<T> implements SourceCommitCallback, Serializable {
public enum SourceType {
- JSON, AVRO, ROW
+ JSON, AVRO, ROW, PROTO
}
protected transient TypedProperties props;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
index 449db10b2d523..860c67f4e2add 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
@@ -20,6 +20,7 @@
import org.apache.hudi.avro.MercifulJsonConverter;
+import com.google.protobuf.Message;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
@@ -95,4 +96,9 @@ public GenericRecord fromAvroBinary(byte[] avroBinary) {
initInjection();
return recordInjection.invert(avroBinary).get();
}
+
+ public GenericRecord fromProtoMessage(Message message) {
+ initSchema();
+ return ProtoConversionUtil.convertToAvro(schema, message);
+ }
}
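
A sketch of how the new fromProtoMessage hook is consumed, mirroring the PROTO branch added to SourceFormatAdapter above; here JavaRDD<Message> stands for the output of ProtoKafkaSource and the wrapper class is illustrative only:

package org.apache.hudi.examples;

import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;

import com.google.protobuf.Message;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;

public class ProtoToAvroExample {
  // One serializable AvroConvertor is built on the driver from the provider's source schema and
  // then mapped over the RDD of parsed protobuf messages on the executors.
  static JavaRDD<GenericRecord> toAvro(JavaRDD<Message> protoMessages, SchemaProvider schemaProvider) {
    AvroConvertor convertor = new AvroConvertor(schemaProvider.getSourceSchema());
    return protoMessages.map(convertor::fromProtoMessage);
  }
}
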
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java
new file mode 100644
index 0000000000000..240f0a4bd9127
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+
+import com.google.protobuf.BoolValue;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DoubleValue;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.Int64Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.StringValue;
+import com.google.protobuf.UInt32Value;
+import com.google.protobuf.UInt64Value;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+/**
+ * A utility class to help translate from Proto to Avro.
+ */
+public class ProtoConversionUtil {
+
+ /**
+ * Creates an Avro {@link Schema} for the provided class. Assumes that the class is a protobuf {@link Message}.
+ * @param clazz The protobuf class
+ * @param flattenWrappedPrimitives set to true to treat wrapped primitives like nullable fields instead of nested messages.
+ * @return An Avro schema
+ */
+ public static Schema getAvroSchemaForMessageClass(Class clazz, boolean flattenWrappedPrimitives) {
+ return AvroSupport.get().getSchema(clazz, flattenWrappedPrimitives);
+ }
+
+ /**
+ * Converts the provided {@link Message} into an avro {@link GenericRecord} with the provided schema.
+ * @param schema target schema to convert into
+ * @param message the source message to convert
+ * @return an Avro GenericRecord
+ */
+ public static GenericRecord convertToAvro(Schema schema, Message message) {
+ return AvroSupport.get().convert(schema, message);
+ }
+
+ /**
+ * This class provides support for generating schemas and converting from proto to avro. We don't directly use Avro's ProtobufData class so we can:
+ * 1. Customize how schemas are generated for protobufs. We treat Enums as strings and provide an option to treat wrapped primitives like {@link Int32Value} and {@link StringValue} as messages
+ * (default behavior) or as nullable versions of those primitives.
+ * 2. Convert directly from a protobuf {@link Message} to a {@link GenericRecord} while properly handling enums and wrapped primitives mentioned above.
+ */
+ private static class AvroSupport {
+ private static final AvroSupport INSTANCE = new AvroSupport();
+ // Caches the generated Avro schema, keyed by the proto class paired with the flatten-wrapped-primitives flag.
+ private static final Map<Pair<Class, Boolean>, Schema> SCHEMA_CACHE = new ConcurrentHashMap<>();
+ // Caches the proto field descriptors, ordered to match the Avro field ordering, keyed by the target Avro schema paired with the source proto descriptor.
+ // This lets conversion iterate over the proto fields in the same order as the fields of the Avro schema.
+ private static final Map<Pair<Schema, Descriptors.Descriptor>, Descriptors.FieldDescriptor[]> FIELD_CACHE = new ConcurrentHashMap<>();
+
+
+ private static final Schema STRINGS = Schema.create(Schema.Type.STRING);
+
+ private static final Schema NULL = Schema.create(Schema.Type.NULL);
+ private static final Map<Descriptors.Descriptor, Schema.Type> WRAPPER_DESCRIPTORS_TO_TYPE = getWrapperDescriptorsToType();
+
+ private static Map<Descriptors.Descriptor, Schema.Type> getWrapperDescriptorsToType() {
+ Map<Descriptors.Descriptor, Schema.Type> wrapperDescriptorsToType = new HashMap<>();
+ wrapperDescriptorsToType.put(StringValue.getDescriptor(), Schema.Type.STRING);
+ wrapperDescriptorsToType.put(Int32Value.getDescriptor(), Schema.Type.INT);
+ wrapperDescriptorsToType.put(UInt32Value.getDescriptor(), Schema.Type.INT);
+ wrapperDescriptorsToType.put(Int64Value.getDescriptor(), Schema.Type.LONG);
+ wrapperDescriptorsToType.put(UInt64Value.getDescriptor(), Schema.Type.LONG);
+ wrapperDescriptorsToType.put(BoolValue.getDescriptor(), Schema.Type.BOOLEAN);
+ wrapperDescriptorsToType.put(BytesValue.getDescriptor(), Schema.Type.BYTES);
+ wrapperDescriptorsToType.put(DoubleValue.getDescriptor(), Schema.Type.DOUBLE);
+ wrapperDescriptorsToType.put(FloatValue.getDescriptor(), Schema.Type.FLOAT);
+ return wrapperDescriptorsToType;
+ }
+
+ private AvroSupport() {
+ }
+
+ public static AvroSupport get() {
+ return INSTANCE;
+ }
+
+ public GenericRecord convert(Schema schema, Message message) {
+ return (GenericRecord) convertObject(schema, message);
+ }
+
+ public Schema getSchema(Class c, boolean flattenWrappedPrimitives) {
+ return SCHEMA_CACHE.computeIfAbsent(Pair.of(c, flattenWrappedPrimitives), key -> {
+ try {
+ Object descriptor = c.getMethod("getDescriptor").invoke(null);
+ if (c.isEnum()) {
+ return getEnumSchema((Descriptors.EnumDescriptor) descriptor);
+ } else {
+ return getMessageSchema((Descriptors.Descriptor) descriptor, new HashMap<>(), flattenWrappedPrimitives);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ private Schema getEnumSchema(Descriptors.EnumDescriptor enumDescriptor) {
+ List<String> symbols = new ArrayList<>(enumDescriptor.getValues().size());
+ for (Descriptors.EnumValueDescriptor valueDescriptor : enumDescriptor.getValues()) {
+ symbols.add(valueDescriptor.getName());
+ }
+ return Schema.createEnum(enumDescriptor.getName(), null, getNamespace(enumDescriptor.getFullName()), symbols);
+ }
+
+ private Schema getMessageSchema(Descriptors.Descriptor descriptor, Map<Descriptors.Descriptor, Schema> seen, boolean flattenWrappedPrimitives) {
+ if (seen.containsKey(descriptor)) {
+ return seen.get(descriptor);
+ }
+ Schema result = Schema.createRecord(descriptor.getName(), null,
+ getNamespace(descriptor.getFullName()), false);
+
+ seen.put(descriptor, result);
+
+ List<Schema.Field> fields = new ArrayList<>(descriptor.getFields().size());
+ for (Descriptors.FieldDescriptor f : descriptor.getFields()) {
+ fields.add(new Schema.Field(f.getName(), getFieldSchema(f, seen, flattenWrappedPrimitives), null, getDefault(f)));
+ }
+ result.setFields(fields);
+ return result;
+ }
+
+ private Schema getFieldSchema(Descriptors.FieldDescriptor f, Map<Descriptors.Descriptor, Schema> seen, boolean flattenWrappedPrimitives) {
+ Function<Schema, Schema> schemaFinalizer = f.isRepeated() ? Schema::createArray : Function.identity();
+ switch (f.getType()) {
+ case BOOL:
+ return schemaFinalizer.apply(Schema.create(Schema.Type.BOOLEAN));
+ case FLOAT:
+ return schemaFinalizer.apply(Schema.create(Schema.Type.FLOAT));
+ case DOUBLE:
+ return schemaFinalizer.apply(Schema.create(Schema.Type.DOUBLE));
+ case ENUM:
+ return schemaFinalizer.apply(getEnumSchema(f.getEnumType()));
+ case STRING:
+ Schema s = Schema.create(Schema.Type.STRING);
+ GenericData.setStringType(s, GenericData.StringType.String);
+ return schemaFinalizer.apply(s);
+ case BYTES:
+ return schemaFinalizer.apply(Schema.create(Schema.Type.BYTES));
+ case INT32:
+ case SINT32:
+ case FIXED32:
+ case SFIXED32:
+ return schemaFinalizer.apply(Schema.create(Schema.Type.INT));
+ case UINT32:
+ case INT64:
+ case UINT64:
+ case SINT64:
+ case FIXED64:
+ case SFIXED64:
+ return schemaFinalizer.apply(Schema.create(Schema.Type.LONG));
+ case MESSAGE:
+ if (flattenWrappedPrimitives && WRAPPER_DESCRIPTORS_TO_TYPE.containsKey(f.getMessageType())) {
+ // all wrapper types have a single field, so we can get the first field in the message's schema
+ return schemaFinalizer.apply(Schema.createUnion(Arrays.asList(NULL, getFieldSchema(f.getMessageType().getFields().get(0), seen, flattenWrappedPrimitives))));
+ }
+ // if message field is repeated (like a list), elements are non-null
+ if (f.isRepeated()) {
+ return schemaFinalizer.apply(getMessageSchema(f.getMessageType(), seen, flattenWrappedPrimitives));
+ }
+ // otherwise we create a nullable field schema
+ return schemaFinalizer.apply(Schema.createUnion(Arrays.asList(NULL, getMessageSchema(f.getMessageType(), seen, flattenWrappedPrimitives))));
+ case GROUP: // groups are deprecated
+ default:
+ throw new RuntimeException("Unexpected type: " + f.getType());
+ }
+ }
+
+ private Object getDefault(Descriptors.FieldDescriptor f) {
+ if (f.isRepeated()) { // empty array as repeated fields' default value
+ return Collections.emptyList();
+ }
+
+ switch (f.getType()) { // generate default for type
+ case BOOL:
+ return false;
+ case FLOAT:
+ return 0.0F;
+ case DOUBLE:
+ return 0.0D;
+ case INT32:
+ case UINT32:
+ case SINT32:
+ case FIXED32:
+ case SFIXED32:
+ case INT64:
+ case UINT64:
+ case SINT64:
+ case FIXED64:
+ case SFIXED64:
+ return 0;
+ case STRING:
+ case BYTES:
+ return "";
+ case ENUM:
+ return f.getEnumType().getValues().get(0).getName();
+ case MESSAGE:
+ return Schema.Field.NULL_VALUE;
+ case GROUP: // groups are deprecated
+ default:
+ throw new RuntimeException("Unexpected type: " + f.getType());
+ }
+ }
+
+ private Descriptors.FieldDescriptor[] getOrderedFields(Schema schema, Message message) {
+ Descriptors.Descriptor descriptor = message.getDescriptorForType();
+ return FIELD_CACHE.computeIfAbsent(Pair.of(schema, descriptor), key -> {
+ Descriptors.FieldDescriptor[] fields = new Descriptors.FieldDescriptor[key.getLeft().getFields().size()];
+ for (Schema.Field f : key.getLeft().getFields()) {
+ fields[f.pos()] = key.getRight().findFieldByName(f.name());
+ }
+ return fields;
+ });
+ }
+
+ private Object convertObject(Schema schema, Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ switch (schema.getType()) {
+ case ARRAY:
+ List
+
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${proto.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java-util</artifactId>
+ <version>${proto.version}</version>
+ </dependency>
+
<groupId>org.junit.jupiter</groupId>
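
Finally, to show what the flattenWrappers option in ProtoConversionUtil and ProtoClassBasedSchemaProvider changes, assume a hypothetical message definition message Order { google.protobuf.Int32Value quantity = 1; } compiled to com.example.proto.Order (none of which is part of this diff); the expected shapes of the generated field schema are sketched in the comments:

package org.apache.hudi.examples;

import org.apache.avro.Schema;
import org.apache.hudi.utilities.sources.helpers.ProtoConversionUtil;

public class WrapperFlatteningExample {
  public static void main(String[] args) {
    // Default behavior: the wrapper stays a nested, nullable record, roughly
    //   "quantity": ["null", {"type": "record", "name": "Int32Value", "namespace": "google.protobuf",
    //                         "fields": [{"name": "value", "type": "int"}]}]
    Schema nested = ProtoConversionUtil.getAvroSchemaForMessageClass(com.example.proto.Order.class, false);

    // With flattening enabled, the wrapper collapses into a nullable primitive:
    //   "quantity": ["null", "int"]
    Schema flattened = ProtoConversionUtil.getAvroSchemaForMessageClass(com.example.proto.Order.class, true);

    System.out.println(nested.toString(true));
    System.out.println(flattened.toString(true));
  }
}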