From d20ba6c0f19148a44a5a18a1c07fdaf915d706e1 Mon Sep 17 00:00:00 2001
From: Baohe Zhang
Date: Tue, 2 Jun 2020 12:45:06 -0500
Subject: [PATCH] [YSPARK-1595] Move TestSparkDistributedCache to spark-starter
 (#21)

* Add spark distributed cache oozie example
* Delete script and update workflow
* Update distributed cache to access file from hdfs
* Add README and update sparkTag
---
 src/main/resources/data/firstarchive.tgz     | Bin 0 -> 137 bytes
 src/main/resources/data/firstfile.txt        |   1 +
 src/main/resources/data/secondarchive.tgz    | Bin 0 -> 138 bytes
 src/main/resources/data/secondfile.txt       |   1 +
 src/main/resources/data/singlearchive.tgz    | Bin 0 -> 139 bytes
 src/main/resources/data/singlefile.txt       |   1 +
 src/main/resources/data/thirdarchive.tgz     | Bin 0 -> 137 bytes
 src/main/resources/data/thirdfile.txt        |   1 +
 .../oozie/spark_distributed_cache/README.md  |  17 ++
 .../spark_distributed_cache/job.properties   |   6 +
 .../spark_distributed_cache/workflow.xml     | 276 ++++++++++++++++++
 .../SparkDistributedCacheSingleArchive.scala |  37 +++
 .../SparkDistributedCacheSingleFile.scala    |  37 +++
 .../SparkDistributedCacheThreeArchives.scala |  42 +++
 .../SparkDistributedCacheThreeFiles.scala    |  41 +++
 15 files changed, 460 insertions(+)
 create mode 100644 src/main/resources/data/firstarchive.tgz
 create mode 100644 src/main/resources/data/firstfile.txt
 create mode 100644 src/main/resources/data/secondarchive.tgz
 create mode 100644 src/main/resources/data/secondfile.txt
 create mode 100644 src/main/resources/data/singlearchive.tgz
 create mode 100644 src/main/resources/data/singlefile.txt
 create mode 100644 src/main/resources/data/thirdarchive.tgz
 create mode 100644 src/main/resources/data/thirdfile.txt
 create mode 100644 src/main/resources/oozie/spark_distributed_cache/README.md
 create mode 100644 src/main/resources/oozie/spark_distributed_cache/job.properties
 create mode 100644 src/main/resources/oozie/spark_distributed_cache/workflow.xml
 create mode 100644 src/main/scala/com/yahoo/spark/starter/distributedcache/SparkDistributedCacheSingleArchive.scala
 create mode 100644 src/main/scala/com/yahoo/spark/starter/distributedcache/SparkDistributedCacheSingleFile.scala
 create mode 100644 src/main/scala/com/yahoo/spark/starter/distributedcache/SparkDistributedCacheThreeArchives.scala
 create mode 100644 src/main/scala/com/yahoo/spark/starter/distributedcache/SparkDistributedCacheThreeFiles.scala

diff --git a/src/main/resources/data/firstarchive.tgz b/src/main/resources/data/firstarchive.tgz
new file mode 100644
index 0000000000000000000000000000000000000000..e37a48afe109ba103ad79ff391ba786fa7981636
GIT binary patch
literal 137
[base85 payload garbled in extraction; omitted]
literal 0
HcmV?d00001

diff --git a/src/main/resources/data/firstfile.txt b/src/main/resources/data/firstfile.txt
new file mode 100644
--- /dev/null
+++ b/src/main/resources/data/firstfile.txt
@@ -0,0 +1 @@
+20

diff --git a/src/main/resources/data/secondarchive.tgz b/src/main/resources/data/secondarchive.tgz
new file mode 100644
GIT binary patch
literal 138
[base85 payload garbled in extraction; omitted]
literal 0
HcmV?d00001

diff --git a/src/main/resources/data/secondfile.txt b/src/main/resources/data/secondfile.txt
new file mode 100644
index 0000000000000..64bb6b746dcea
--- /dev/null
+++ b/src/main/resources/data/secondfile.txt
@@ -0,0 +1 @@
+30

diff --git a/src/main/resources/data/singlearchive.tgz b/src/main/resources/data/singlearchive.tgz
new file mode 100644
index 0000000000000000000000000000000000000000..1ecf02129b1aa34c145e0e07eb04bdd98a8e7206
GIT binary patch
literal 139
[base85 payload garbled in extraction; omitted]
literal 0
HcmV?d00001

diff --git a/src/main/resources/data/singlefile.txt b/src/main/resources/data/singlefile.txt
new file mode 100644
index 0000000000000..29d6383b52c13
--- /dev/null
+++ b/src/main/resources/data/singlefile.txt
@@ -0,0 +1 @@
+100

diff --git a/src/main/resources/data/thirdarchive.tgz b/src/main/resources/data/thirdarchive.tgz
new file mode 100644
index 0000000000000000000000000000000000000000..6284a890bbf267aa2dbcc690da2294ae05f0cc60
GIT binary patch
literal 137
[base85 payload garbled in extraction; omitted]
literal 0
HcmV?d00001

diff --git a/src/main/resources/data/thirdfile.txt b/src/main/resources/data/thirdfile.txt
new file mode 100644
--- /dev/null
+++ b/src/main/resources/data/thirdfile.txt
@@ -0,0 +1 @@
+50
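Background for the data files above (an aside, not part of the patch): each .tgz archive packs the correspondingly named one-line .txt file, and the test jobs read them after YARN localizes the distributed-cache entries into each container's working directory. A programmatic alternative for shipping a single file is SparkContext.addFile plus SparkFiles.get; a minimal sketch under that assumption (the HDFS path and object name are hypothetical):

    import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

    object AddFileSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("AddFileSketch"))
        // Ship the file to the driver and every executor.
        sc.addFile("hdfs:///tmp/data/singlefile.txt")
        val values = sc.parallelize(1 to 3).map { _ =>
          // Each task resolves its local copy of the shipped file by name.
          val src = scala.io.Source.fromFile(SparkFiles.get("singlefile.txt"))
          try src.getLines().next().trim.toInt finally src.close()
        }.collect()
        println(values.mkString(","))
        sc.stop()
      }
    }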
[The diffs for README.md and job.properties did not survive extraction; only the file list above records them.]

diff --git a/src/main/resources/oozie/spark_distributed_cache/workflow.xml b/src/main/resources/oozie/spark_distributed_cache/workflow.xml
new file mode 100644
--- /dev/null
+++ b/src/main/resources/oozie/spark_distributed_cache/workflow.xml
[The XML markup of the 276-line workflow was stripped in extraction; the recoverable content follows.]

Workflow-wide settings: ${jobTracker}, ${nameNode}.

Every spark action sets:
  configuration : oozie.action.sharelib.for.spark = ${sparkTag}
  master / mode : yarn / cluster
  jar           : spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar
  spark-opts    : --num-executors 3 --executor-memory 2g --executor-cores 1 --queue default

Actions, in file order (name, class, then arguments):

 1. SparkDistributedCacheOneFile
    com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleFile
    singlefile.txt
    hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/singlefile.txt

 2. SparkDistributedCacheOneFileWithHash
    com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleFile
    renamed.txt
    hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/singlefile.txt#renamed.txt

 3. SparkDistributedCacheThreeFiles
    com.yahoo.spark.starter.distributedcache.SparkDistributedCacheThreeFiles
    firstfile.txt
    renamedfile.txt
    thirdfile.txt
    hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/firstfile.txt
    hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/secondfile.txt#renamedfile.txt
    hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/thirdfile.txt

 4. SparkDistributedCacheOneFileHashBadFile
    com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleFile
    singlefile.txt
    hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/singlefile.txt#badfile.txt

 5. SparkDistributedCacheNonExistFile
    com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleFile
    nonexistentfile.txt
    hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/nonexistentfile.txt

 6. SparkDistributedCacheOneFileFromHdfs
    com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleFile
    singlefile.txt
    hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/singlefile.txt

 7. SparkDistributedCacheOneArchive
    com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleArchive
    singlearchive.tgz
    hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/singlearchive.tgz

 8. SparkDistributedCacheOneArchiveWithHash
    com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleArchive
    renamed.tgz
    hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/singlearchive.tgz#renamed.tgz

 9. SparkDistributedCacheThreeArchives
    com.yahoo.spark.starter.distributedcache.SparkDistributedCacheThreeArchives
    firstarchive.tgz
    renamedarchive.tgz
    thirdarchive.tgz
    hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/firstarchive.tgz
    hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/secondarchive.tgz#renamedarchive.tgz
    hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/thirdarchive.tgz

10. SparkDistributedCacheOneArchiveHashBad
    com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleArchive
    singlearchive.tgz
    hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/singlearchive.tgz#badfile.tgz

11. SparkDistributedCacheNonExistArchive
    com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleArchive
    nonexistentarchive.tgz
    hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/nonexistentarchive.tgz

12. SparkDistributedCacheOneArchiveFromHdfs
    com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleArchive
    singlearchive.tgz
    hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/singlearchive.tgz

On error, every action transitions to the kill node, which reports:
  Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
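A note on the #renamed.txt and #renamed.tgz suffixes above (an aside, not part of the patch): Hadoop's distributed cache treats the URI fragment as the local link name, so the container sees the file under the renamed path while HDFS keeps the original name. A tiny sketch of the parsing, using java.net.URI (the user path is hypothetical):

    import java.net.URI

    object FragmentSketch {
      def main(args: Array[String]): Unit = {
        val uri = new URI("hdfs:///user/me/data/singlefile.txt#renamed.txt")
        // The path identifies the file on HDFS; the fragment is the local link name.
        println(uri.getPath)     // /user/me/data/singlefile.txt
        println(uri.getFragment) // renamed.txt
      }
    }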
diff --git a/src/main/scala/com/yahoo/spark/starter/distributedcache/SparkDistributedCacheSingleArchive.scala b/src/main/scala/com/yahoo/spark/starter/distributedcache/SparkDistributedCacheSingleArchive.scala
new file mode 100644
index 0000000000000..f142827491834
--- /dev/null
+++ b/src/main/scala/com/yahoo/spark/starter/distributedcache/SparkDistributedCacheSingleArchive.scala
@@ -0,0 +1,37 @@
+package com.yahoo.spark.starter.distributedcache
+
+import org.apache.spark._
+import java.io.{BufferedReader, FileReader}
+
+object SparkDistributedCacheSingleArchive {
+  def main(args: Array[String]): Unit = {
+    if (args.length < 1) {
+      System.err.println("Usage: SparkDistributedCacheSingleArchive <archive>")
+      System.exit(1)
+    }
+    val conf = new SparkConf().setAppName("SparkDistributedCacheSingleArchive")
+    val spark = new SparkContext(conf)
+
+    val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
+    val result = spark.parallelize(testData).reduceByKey {
+      // The archive is expected to contain a file named singlefile.txt
+      // holding the single value 100.
+      val in = new BufferedReader(new FileReader(args(0) + "/singlefile.txt"))
+      val fileVal = in.readLine().toInt
+      in.close()
+      _ * fileVal + _ * fileVal
+    }.collect()
+    println("result is: " + result.mkString(","))
+    val pass = result.toSet == Set((1, 200), (2, 300), (3, 500))
+    println("pass is: " + pass)
+
+    if (!pass) {
+      println("Error, set isn't as expected")
+      spark.stop()
+      // We have to throw for the Spark application master to mark the app as failed.
+      throw new Exception("Error, set isn't as expected")
+    }
+    spark.stop()
+  }
+}
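A note on the reduce function above (an aside, not part of the patch): the block passed to reduceByKey runs its statements once, on the driver, when the argument expression is evaluated; in yarn cluster mode the driver container also receives the localized files, so the read succeeds there. The final expression uses Scala placeholder syntax and expands to a two-argument function. A desugared equivalent:

    object PlaceholderSketch {
      def main(args: Array[String]): Unit = {
        val fileVal = 100 // stands in for the value read from the cached file
        // `_ * fileVal + _ * fileVal` desugars to this explicit two-arg function:
        val reduce: (Int, Int) => Int = (x, y) => x * fileVal + y * fileVal
        assert(reduce(1, 1) == 200)
      }
    }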
diff --git a/src/main/scala/com/yahoo/spark/starter/distributedcache/SparkDistributedCacheSingleFile.scala b/src/main/scala/com/yahoo/spark/starter/distributedcache/SparkDistributedCacheSingleFile.scala
new file mode 100644
index 0000000000000..a1d971b0c9935
--- /dev/null
+++ b/src/main/scala/com/yahoo/spark/starter/distributedcache/SparkDistributedCacheSingleFile.scala
@@ -0,0 +1,37 @@
+package com.yahoo.spark.starter.distributedcache
+
+import org.apache.spark._
+import java.io.{BufferedReader, FileReader}
+
+object SparkDistributedCacheSingleFile {
+  def main(args: Array[String]): Unit = {
+    if (args.length < 1) {
+      System.err.println("Usage: SparkDistributedCacheSingleFile <file>")
+      System.exit(1)
+    }
+    val conf = new SparkConf().setAppName("SparkDistributedCacheSingleFile")
+    val spark = new SparkContext(conf)
+
+    val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
+    val result = spark.parallelize(testData).reduceByKey {
+      // The file is expected to contain the single value 100.
+      val in = new BufferedReader(new FileReader(args(0)))
+      val fileVal = in.readLine().toInt
+      in.close()
+      _ * fileVal + _ * fileVal
+    }.collect()
+    println("result is: " + result.mkString(","))
+    val pass = result.toSet == Set((1, 200), (2, 300), (3, 500))
+    println("pass is: " + pass)
+
+    if (!pass) {
+      println("Error, set isn't as expected")
+      spark.stop()
+      // We have to throw for the Spark application master to mark the app as failed.
+      throw new Exception("Error, set isn't as expected")
+    }
+    spark.stop()
+  }
+}

diff --git a/src/main/scala/com/yahoo/spark/starter/distributedcache/SparkDistributedCacheThreeArchives.scala b/src/main/scala/com/yahoo/spark/starter/distributedcache/SparkDistributedCacheThreeArchives.scala
new file mode 100644
index 0000000000000..3abfc86cdd7b4
--- /dev/null
+++ b/src/main/scala/com/yahoo/spark/starter/distributedcache/SparkDistributedCacheThreeArchives.scala
@@ -0,0 +1,42 @@
+package com.yahoo.spark.starter.distributedcache
+
+import org.apache.spark._
+import java.io.{BufferedReader, FileReader}
+
+object SparkDistributedCacheThreeArchives {
+  def main(args: Array[String]): Unit = {
+    if (args.length < 3) {
+      System.err.println("Usage: SparkDistributedCacheThreeArchives <archive1> <archive2> <archive3>")
+      System.exit(1)
+    }
+    val conf = new SparkConf().setAppName("SparkDistributedCacheThreeArchives")
+    val spark = new SparkContext(conf)
+
+    val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
+    val result = spark.parallelize(testData).reduceByKey {
+      // Each archive is expected to contain a single file (firstfile.txt,
+      // secondfile.txt, thirdfile.txt) holding the values 20, 30, and 50.
+      val in = new BufferedReader(new FileReader(args(0) + "/firstfile.txt"))
+      val fileVal = in.readLine().toInt
+      val in2 = new BufferedReader(new FileReader(args(1) + "/secondfile.txt"))
+      val fileVal2 = in2.readLine().toInt
+      val in3 = new BufferedReader(new FileReader(args(2) + "/thirdfile.txt"))
+      val fileVal3 = in3.readLine().toInt
+      in.close()
+      in2.close()
+      in3.close()
+      _ * (fileVal + fileVal2 + fileVal3) + _ * (fileVal + fileVal2 + fileVal3)
+    }.collect()
+    println("result is: " + result.mkString(","))
+    val pass = result.toSet == Set((1, 200), (2, 300), (3, 500))
+    println("pass is: " + pass)
+
+    if (!pass) {
+      println("Error, set isn't as expected")
+      spark.stop()
+      // We have to throw for the Spark application master to mark the app as failed.
+      throw new Exception("Error, set isn't as expected")
+    }
+    spark.stop()
+  }
+}
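For a quick check of the expected results without a cluster (an aside, not part of the patch; the values 20, 30, and 50 come from the three data files, summing to 100): with f(x, y) = 100x + 100y, key 1 reduces to 1*100 + 1*100 = 200, key 2 to 1*100 + 2*100 = 300, and key 3 to 5*100 + 0*100 = 500. The same check in plain Scala collections:

    object ExpectedResultCheck {
      def main(args: Array[String]): Unit = {
        val sum = 20 + 30 + 50 // = 100, the combined value of the three files
        val testData = Seq((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
        // Group by key and apply the same reduce function the jobs use.
        val result = testData.groupBy(_._1).map { case (k, pairs) =>
          k -> pairs.map(_._2).reduce((x, y) => x * sum + y * sum)
        }.toSet
        assert(result == Set((1, 200), (2, 300), (3, 500)))
        println(result)
      }
    }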
diff --git a/src/main/scala/com/yahoo/spark/starter/distributedcache/SparkDistributedCacheThreeFiles.scala b/src/main/scala/com/yahoo/spark/starter/distributedcache/SparkDistributedCacheThreeFiles.scala
new file mode 100644
index 0000000000000..b5f11097c8010
--- /dev/null
+++ b/src/main/scala/com/yahoo/spark/starter/distributedcache/SparkDistributedCacheThreeFiles.scala
@@ -0,0 +1,41 @@
+package com.yahoo.spark.starter.distributedcache
+
+import org.apache.spark._
+import java.io.{BufferedReader, FileReader}
+
+object SparkDistributedCacheThreeFiles {
+  def main(args: Array[String]): Unit = {
+    if (args.length < 3) {
+      System.err.println("Usage: SparkDistributedCacheThreeFiles <file1> <file2> <file3>")
+      System.exit(1)
+    }
+    val conf = new SparkConf().setAppName("SparkDistributedCacheThreeFiles")
+    val spark = new SparkContext(conf)
+
+    val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
+    val result = spark.parallelize(testData).reduceByKey {
+      // The three files are expected to contain the single values 20, 30, and 50.
+      val in = new BufferedReader(new FileReader(args(0)))
+      val fileVal = in.readLine().toInt
+      val in2 = new BufferedReader(new FileReader(args(1)))
+      val fileVal2 = in2.readLine().toInt
+      val in3 = new BufferedReader(new FileReader(args(2)))
+      val fileVal3 = in3.readLine().toInt
+      in.close()
+      in2.close()
+      in3.close()
+      _ * (fileVal + fileVal2 + fileVal3) + _ * (fileVal + fileVal2 + fileVal3)
+    }.collect()
+    println("result is: " + result.mkString(","))
+    val pass = result.toSet == Set((1, 200), (2, 300), (3, 500))
+    println("pass is: " + pass)
+
+    if (!pass) {
+      println("Error, set isn't as expected")
+      spark.stop()
+      // We have to throw for the Spark application master to mark the app as failed.
+      throw new Exception("Error, set isn't as expected")
+    }
+    spark.stop()
+  }
+}
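One possible hardening of the reads above (an editorial suggestion, not part of the patch): the readers could be closed even when a read throws by using scala.util.Using, available from Scala 2.13. A minimal sketch:

    import java.io.{BufferedReader, FileReader}
    import scala.util.Using

    object SafeReadSketch {
      // Reads a single integer from a file, closing the reader even on failure.
      def readInt(path: String): Int =
        Using.resource(new BufferedReader(new FileReader(path))) { in =>
          in.readLine().trim.toInt
        }
    }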