From 84b05c8473aec8ded4fcb2a45f87dd32284b7229 Mon Sep 17 00:00:00 2001 From: Jon Vexler Date: Wed, 21 Sep 2022 10:52:08 -0400 Subject: [PATCH] [HUDI-4758] Add validations to java spark examples (#6615) --- .../common/HoodieExampleDataGenerator.java | 59 +++++++++++++--- .../quickstart/HoodieSparkQuickstart.java | 67 +++++++++++++------ 2 files changed, 96 insertions(+), 30 deletions(-) diff --git a/hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java b/hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java index 4ce11acfa0a92..004271a329d36 100644 --- a/hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java +++ b/hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java @@ -55,11 +55,11 @@ public class HoodieExampleDataGenerator> { public static final String[] DEFAULT_PARTITION_PATHS = {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH}; public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ " - + "{\"name\": \"ts\",\"type\": \"long\"},{\"name\": \"uuid\", \"type\": \"string\"}," - + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"}," - + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"}," - + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"}," - + "{\"name\":\"fare\",\"type\": \"double\"}]}"; + + "{\"name\": \"ts\",\"type\": \"long\"},{\"name\": \"uuid\", \"type\": \"string\"}," + + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"}," + + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"}," + + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"}," + + "{\"name\":\"fare\",\"type\": \"double\"}]}"; public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA); private static final Random RAND = new Random(46474747); @@ -130,12 +130,36 @@ public Stream> generateInsertsStream(String commitTime, Integer }); } + /** + * Generates new inserts, across a single partition path. It also updates the list of existing keys. + */ + public List> generateInsertsOnPartition(String commitTime, Integer n, String partitionPath) { + return generateInsertsStreamOnPartition(commitTime, n, partitionPath).collect(Collectors.toList()); + } + + /** + * Generates new inserts, across a single partition path. It also updates the list of existing keys. + */ + public Stream> generateInsertsStreamOnPartition(String commitTime, Integer n, String partitionPath) { + int currSize = getNumExistingKeys(); + + return IntStream.range(0, n).boxed().map(i -> { + HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath); + KeyPartition kp = new KeyPartition(); + kp.key = key; + kp.partitionPath = partitionPath; + existingKeys.put(currSize + i, kp); + numExistingKeys++; + return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime)); + }); + } + /** * Generates new updates, randomly distributed across the keys above. There can be duplicates within the returned * list * * @param commitTime Commit Timestamp - * @param n Number of updates (including dups) + * @param n Number of updates (including dups) * @return list of hoodie record updates */ public List> generateUpdates(String commitTime, Integer n) { @@ -148,6 +172,23 @@ public List> generateUpdates(String commitTime, Integer n) { return updates; } + /** + * Generates new updates, one for each of the keys above + * list + * + * @param commitTime Commit Timestamp + * @return list of hoodie record updates + */ + public List> generateUniqueUpdates(String commitTime) { + List> updates = new ArrayList<>(); + for (int i = 0; i < numExistingKeys; i++) { + KeyPartition kp = existingKeys.get(i); + HoodieRecord record = generateUpdateRecord(kp.key, commitTime); + updates.add(record); + } + return updates; + } + public HoodieRecord generateUpdateRecord(HoodieKey key, String commitTime) { return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime)); } @@ -155,8 +196,8 @@ public HoodieRecord generateUpdateRecord(HoodieKey key, String commitTime) { private Option convertToString(HoodieRecord record) { try { String str = HoodieAvroUtils - .bytesToAvro(((HoodieAvroPayload)record.getData()).getRecordBytes(), avroSchema) - .toString(); + .bytesToAvro(((HoodieAvroPayload) record.getData()).getRecordBytes(), avroSchema) + .toString(); str = "{" + str.substring(str.indexOf("\"ts\":")); return Option.of(str.replaceAll("}", ", \"partitionpath\": \"" + record.getPartitionPath() + "\"}")); } catch (IOException e) { @@ -166,7 +207,7 @@ private Option convertToString(HoodieRecord record) { public List convertToStringList(List> records) { return records.stream().map(this::convertToString).filter(Option::isPresent).map(Option::get) - .collect(Collectors.toList()); + .collect(Collectors.toList()); } public int getNumExistingKeys() { diff --git a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java index 5a6db78f882e3..9c6293fe4471e 100644 --- a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java +++ b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java @@ -25,6 +25,7 @@ import org.apache.hudi.examples.common.HoodieExampleDataGenerator; import org.apache.hudi.examples.common.HoodieExampleSparkUtils; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; @@ -65,30 +66,51 @@ public static void main(String[] args) { public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) { final HoodieExampleDataGenerator dataGen = new HoodieExampleDataGenerator<>(); - insertData(spark, jsc, tablePath, tableName, dataGen); + String snapshotQuery = "SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table"; + + Dataset insertDf = insertData(spark, jsc, tablePath, tableName, dataGen); queryData(spark, jsc, tablePath, tableName, dataGen); + assert insertDf.except(spark.sql(snapshotQuery)).count() == 0; - updateData(spark, jsc, tablePath, tableName, dataGen); + Dataset snapshotBeforeUpdate = spark.sql(snapshotQuery); + Dataset updateDf = updateData(spark, jsc, tablePath, tableName, dataGen); queryData(spark, jsc, tablePath, tableName, dataGen); + Dataset snapshotAfterUpdate = spark.sql(snapshotQuery); + assert snapshotAfterUpdate.intersect(updateDf).count() == updateDf.count(); + assert snapshotAfterUpdate.except(updateDf).except(snapshotBeforeUpdate).count() == 0; incrementalQuery(spark, tablePath, tableName); pointInTimeQuery(spark, tablePath, tableName); - delete(spark, tablePath, tableName); + Dataset snapshotBeforeDelete = snapshotAfterUpdate; + Dataset deleteDf = delete(spark, tablePath, tableName); queryData(spark, jsc, tablePath, tableName, dataGen); + Dataset snapshotAfterDelete = spark.sql(snapshotQuery); + assert snapshotAfterDelete.intersect(deleteDf).count() == 0; + assert snapshotBeforeDelete.except(deleteDf).except(snapshotAfterDelete).count() == 0; - insertOverwriteData(spark, jsc, tablePath, tableName, dataGen); + Dataset snapshotBeforeOverwrite = snapshotAfterDelete; + Dataset overwriteDf = insertOverwriteData(spark, jsc, tablePath, tableName, dataGen); queryData(spark, jsc, tablePath, tableName, dataGen); + Dataset withoutThirdPartitionDf = snapshotBeforeOverwrite.filter("partitionpath != '" + HoodieExampleDataGenerator.DEFAULT_THIRD_PARTITION_PATH + "'"); + Dataset expectedDf = withoutThirdPartitionDf.union(overwriteDf); + Dataset snapshotAfterOverwrite = spark.sql(snapshotQuery); + assert snapshotAfterOverwrite.except(expectedDf).count() == 0; + + Dataset snapshotBeforeDeleteByPartition = snapshotAfterOverwrite; deleteByPartition(spark, tablePath, tableName); queryData(spark, jsc, tablePath, tableName, dataGen); + Dataset snapshotAfterDeleteByPartition = spark.sql(snapshotQuery); + assert snapshotAfterDeleteByPartition.intersect(snapshotBeforeDeleteByPartition.filter("partitionpath == '" + HoodieExampleDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "'")).count() == 0; + assert snapshotAfterDeleteByPartition.count() == snapshotBeforeDeleteByPartition.filter("partitionpath != '" + HoodieExampleDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "'").count(); } /** * Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi dataset as below. */ - public static void insertData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName, - HoodieExampleDataGenerator dataGen) { + public static Dataset insertData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName, + HoodieExampleDataGenerator dataGen) { String commitTime = Long.toString(System.currentTimeMillis()); List inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20)); Dataset df = spark.read().json(jsc.parallelize(inserts, 1)); @@ -101,15 +123,16 @@ public static void insertData(SparkSession spark, JavaSparkContext jsc, String t .option(TBL_NAME.key(), tableName) .mode(Overwrite) .save(tablePath); + return df; } /** * Generate new records, load them into a {@link Dataset} and insert-overwrite it into the Hudi dataset */ - public static void insertOverwriteData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName, - HoodieExampleDataGenerator dataGen) { + public static Dataset insertOverwriteData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName, + HoodieExampleDataGenerator dataGen) { String commitTime = Long.toString(System.currentTimeMillis()); - List inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20)); + List inserts = dataGen.convertToStringList(dataGen.generateInsertsOnPartition(commitTime, 20, HoodieExampleDataGenerator.DEFAULT_THIRD_PARTITION_PATH)); Dataset df = spark.read().json(jsc.parallelize(inserts, 1)); df.write().format("org.apache.hudi") @@ -121,9 +144,9 @@ public static void insertOverwriteData(SparkSession spark, JavaSparkContext jsc, .option(TBL_NAME.key(), tableName) .mode(Append) .save(tablePath); + return df; } - /** * Load the data files into a DataFrame. */ @@ -157,11 +180,11 @@ public static void queryData(SparkSession spark, JavaSparkContext jsc, String ta * This is similar to inserting new data. Generate updates to existing trips using the data generator, * load into a DataFrame and write DataFrame into the hudi dataset. */ - public static void updateData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName, - HoodieExampleDataGenerator dataGen) { + public static Dataset updateData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName, + HoodieExampleDataGenerator dataGen) { String commitTime = Long.toString(System.currentTimeMillis()); - List updates = dataGen.convertToStringList(dataGen.generateUpdates(commitTime, 10)); + List updates = dataGen.convertToStringList(dataGen.generateUniqueUpdates(commitTime)); Dataset df = spark.read().json(jsc.parallelize(updates, 1)); df.write().format("org.apache.hudi") .options(QuickstartUtils.getQuickstartWriteConfigs()) @@ -171,16 +194,18 @@ public static void updateData(SparkSession spark, JavaSparkContext jsc, String t .option(TBL_NAME.key(), tableName) .mode(Append) .save(tablePath); + return df; } /** * Deleta data based in data information. */ - public static void delete(SparkSession spark, String tablePath, String tableName) { + public static Dataset delete(SparkSession spark, String tablePath, String tableName) { Dataset roViewDF = spark.read().format("org.apache.hudi").load(tablePath + "/*/*/*/*"); roViewDF.createOrReplaceTempView("hudi_ro_table"); - Dataset df = spark.sql("select uuid, partitionpath, ts from hudi_ro_table limit 2"); + Dataset toBeDeletedDf = spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table limit 2"); + Dataset df = toBeDeletedDf.select("uuid", "partitionpath", "ts"); df.write().format("org.apache.hudi") .options(QuickstartUtils.getQuickstartWriteConfigs()) @@ -191,10 +216,11 @@ public static void delete(SparkSession spark, String tablePath, String tableName .option("hoodie.datasource.write.operation", WriteOperationType.DELETE.value()) .mode(Append) .save(tablePath); + return toBeDeletedDf; } /** - * Delete the data of a single or multiple partitions. + * Delete the data of the first partition. */ public static void deleteByPartition(SparkSession spark, String tablePath, String tableName) { Dataset df = spark.emptyDataFrame(); @@ -204,9 +230,8 @@ public static void deleteByPartition(SparkSession spark, String tablePath, Strin .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid") .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath") .option(TBL_NAME.key(), tableName) - .option("hoodie.datasource.write.operation", WriteOperationType.DELETE.value()) - .option("hoodie.datasource.write.partitions.to.delete", - String.join(", ", HoodieExampleDataGenerator.DEFAULT_PARTITION_PATHS)) + .option("hoodie.datasource.write.operation", WriteOperationType.DELETE_PARTITION.value()) + .option("hoodie.datasource.write.partitions.to.delete", HoodieExampleDataGenerator.DEFAULT_FIRST_PARTITION_PATH) .mode(Append) .save(tablePath); } @@ -223,7 +248,7 @@ public static void incrementalQuery(SparkSession spark, String tablePath, String .map((Function) row -> row.getString(0)) .take(50); - String beginTime = commits.get(commits.size() - 2); // commit time we are interested in + String beginTime = commits.get(commits.size() - 1); // commit time we are interested in // incrementally query data Dataset incViewDF = spark @@ -250,7 +275,7 @@ public static void pointInTimeQuery(SparkSession spark, String tablePath, String .map((Function) row -> row.getString(0)) .take(50); String beginTime = "000"; // Represents all commits > this time. - String endTime = commits.get(commits.size() - 2); // commit time we are interested in + String endTime = commits.get(commits.size() - 1); // commit time we are interested in //incrementally query data Dataset incViewDF = spark.read().format("org.apache.hudi")