diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index aa1e261c250b..31328c1ba385 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -88,6 +88,13 @@ static class Config {
      * {@value #SOURCE_FILE_FORMAT} is passed to the reader while loading dataset. Default value is parquet.
      */
     static final String SOURCE_FILE_FORMAT = "hoodie.deltastreamer.source.hoodieincr.file.format";
+    /**
+     * {@value #SPARK_DATASOURCE_OPTIONS} is a JSON string of options passed to the reader while loading the dataset.
+     * Example delta streamer conf:
+     * --hoodie-conf hoodie.deltastreamer.source.hoodieincr.spark.datasource.options={"header":"true","encoding":"UTF-8"}
+     *
+     */
+    static final String SPARK_DATASOURCE_OPTIONS = "hoodie.deltastreamer.source.hoodieincr.spark.datasource.options";
 
     static final String DEFAULT_SOURCE_FILE_FORMAT = "parquet";
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 2f7d9898b95b..6b0698c788c0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -26,14 +26,18 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 
+import com.esotericsoftware.minlog.Log;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -42,6 +46,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
 import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
@@ -50,6 +55,7 @@
 import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
 import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
 import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SPARK_DATASOURCE_OPTIONS;
 
 /**
  * This source will use the S3 events meta information from hoodie table generate by {@link S3EventsSource}.
@@ -125,7 +131,7 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
           .filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
               queryTypeAndInstantEndpts.getRight().getLeft()));
     }
-    
+
     if (source.isEmpty()) {
       return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getRight().getRight());
     }
@@ -141,7 +147,7 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
       filter = filter + " and s3.object.key not like '%" + props.getString(Config.S3_IGNORE_KEY_SUBSTRING) + "%'";
     }
     // add file format filtering by default
-    filter = filter + " and s3.object.key like '%" + fileFormat + "%'"; 
+    filter = filter + " and s3.object.key like '%" + fileFormat + "%'";
 
     String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
     String s3Prefix = s3FS + "://";
@@ -174,7 +180,19 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
     }
     Option<Dataset<Row>> dataset = Option.empty();
     if (!cloudFiles.isEmpty()) {
-      dataset = Option.of(sparkSession.read().format(fileFormat).load(cloudFiles.toArray(new String[0])));
+      DataFrameReader dataFrameReader = sparkSession.read().format(fileFormat);
+      if (!StringUtils.isNullOrEmpty(props.getString(HoodieIncrSource.Config.SPARK_DATASOURCE_OPTIONS, null))) {
+        final ObjectMapper mapper = new ObjectMapper();
+        Map<String, String> sparkOptionsMap = null;
+        try {
+          sparkOptionsMap = mapper.readValue(props.getString(SPARK_DATASOURCE_OPTIONS), Map.class);
+        } catch (IOException e) {
+          throw new HoodieException(String.format("Failed to parse sparkOptions: %s", props.getString(SPARK_DATASOURCE_OPTIONS)), e);
+        }
+        Log.info(String.format("sparkOptions loaded: %s", sparkOptionsMap));
+        dataFrameReader = dataFrameReader.options(sparkOptionsMap);
+      }
+      dataset = Option.of(dataFrameReader.load(cloudFiles.toArray(new String[0])));
     }
     return Pair.of(dataset, queryTypeAndInstantEndpts.getRight().getRight());
   }
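
For reference, below is a minimal, self-contained sketch (not part of the patch) of how the new hoodie.deltastreamer.source.hoodieincr.spark.datasource.options value is consumed: the JSON string is parsed into a map with Jackson and handed to DataFrameReader#options before load() is called, mirroring the last hunk above. The class name, the csv format, and the input path are hypothetical and used only for illustration; the option values reuse the example from the Config javadoc.

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.IOException;
import java.util.Map;

public class SparkDatasourceOptionsSketch {

  @SuppressWarnings("unchecked")
  public static void main(String[] args) throws IOException {
    SparkSession spark = SparkSession.builder()
        .appName("spark-datasource-options-sketch")
        .master("local[*]")
        .getOrCreate();

    // Same JSON shape as the example in the HoodieIncrSource.Config javadoc.
    String optionsJson = "{\"header\":\"true\",\"encoding\":\"UTF-8\"}";

    // Parse the JSON string into a map, as the patch does with ObjectMapper#readValue.
    Map<String, String> sparkOptions = new ObjectMapper().readValue(optionsJson, Map.class);

    // Apply the parsed options to the reader before loading, as the patch does with DataFrameReader#options.
    DataFrameReader reader = spark.read().format("csv").options(sparkOptions);

    // Hypothetical input path; in S3EventsHoodieIncrSource the load() argument is the list of S3 object keys.
    Dataset<Row> df = reader.load("s3a://some-bucket/some/prefix/");
    df.show();

    spark.stop();
  }
}

In a deltastreamer run, the equivalent is simply passing the conf shown in the javadoc, e.g. --hoodie-conf hoodie.deltastreamer.source.hoodieincr.spark.datasource.options={"header":"true","encoding":"UTF-8"}.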