[YSPARK-1595] Move TestSparkDistributedCache to spark-starter (apache#21)

* Add spark distributed cache oozie example

* Delete script and update workflow

* Update distributed cache to access file from hdfs

* Add README and update sparkTag
Baohe Zhang authored and GitHub Enterprise committed Jun 2, 2020
1 parent 5d7f716 commit d20ba6c
Showing 15 changed files with 460 additions and 0 deletions.
Binary file added src/main/resources/data/firstarchive.tgz
Binary file not shown.
1 change: 1 addition & 0 deletions src/main/resources/data/firstfile.txt
@@ -0,0 +1 @@
20
Binary file added src/main/resources/data/secondarchive.tgz
Binary file not shown.
1 change: 1 addition & 0 deletions src/main/resources/data/secondfile.txt
@@ -0,0 +1 @@
30
Binary file added src/main/resources/data/singlearchive.tgz
Binary file not shown.
1 change: 1 addition & 0 deletions src/main/resources/data/singlefile.txt
@@ -0,0 +1 @@
100
Binary file added src/main/resources/data/thirdarchive.tgz
Binary file not shown.
1 change: 1 addition & 0 deletions src/main/resources/data/thirdfile.txt
@@ -0,0 +1 @@
50
17 changes: 17 additions & 0 deletions src/main/resources/oozie/spark_distributed_cache/README.md
@@ -0,0 +1,17 @@
Instructions for running this Oozie application (a consolidated shell sketch follows the list):

- create a directory `spark_distributed_cache/` in HDFS for the Oozie application.

- upload `workflow.xml` to `spark_distributed_cache/apps/spark/`.

- build the spark-starter jar with `mvn clean package` if you haven't already done so.

- upload the resulting jar `spark-starter/target/spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar` to `spark_distributed_cache/apps/lib/`.

- upload the resource files `singlefile.txt`, `firstfile.txt`, `secondfile.txt`, `thirdfile.txt`, `singlearchive.tgz`, `firstarchive.tgz`, `secondarchive.tgz`, and `thirdarchive.tgz` from `spark-starter/src/main/resources/data/` to `spark_distributed_cache/data/`.

- update `nameNode` and `jobTracker` in `job.properties` if you are running on a cluster other than AR.

- export `OOZIE_URL`, for example `export OOZIE_URL=https://axonitered-oozie.red.ygrid.yahoo.com:4443/oozie/`.

- submit the Oozie job with `oozie job -run -config job.properties -auth KERBEROS`.
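
Putting the steps above together, here is a minimal shell sketch, run from the spark-starter repository root. The glob patterns and the relative HDFS paths (which resolve under `/user/$USER/`) are assumptions, not part of the original instructions:

```bash
# Sketch only: adjust endpoints, paths, and auth for your cluster.
WF_ROOT=spark_distributed_cache

mvn clean package

hdfs dfs -mkdir -p ${WF_ROOT}/apps/spark ${WF_ROOT}/apps/lib ${WF_ROOT}/data
hdfs dfs -put -f src/main/resources/oozie/spark_distributed_cache/workflow.xml ${WF_ROOT}/apps/spark/
hdfs dfs -put -f target/spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar ${WF_ROOT}/apps/lib/
hdfs dfs -put -f src/main/resources/data/*.txt src/main/resources/data/*.tgz ${WF_ROOT}/data/

export OOZIE_URL=https://axonitered-oozie.red.ygrid.yahoo.com:4443/oozie/
oozie job -run -config src/main/resources/oozie/spark_distributed_cache/job.properties -auth KERBEROS
```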
6 changes: 6 additions & 0 deletions src/main/resources/oozie/spark_distributed_cache/job.properties
@@ -0,0 +1,6 @@
nameNode=hdfs://axonitered-nn1.red.ygrid.yahoo.com:8020
jobTracker=axonitered-jt1.red.ygrid.yahoo.com:8032
wfRoot=spark_distributed_cache
sparkTag=spark_latest
oozie.libpath=/user/${user.name}/${wfRoot}/apps/lib
oozie.wf.application.path=${nameNode}/user/${user.name}/${wfRoot}/apps/spark
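
Any property above can also be overridden per submission instead of editing the file; the Oozie CLI accepts `-D` definitions alongside `-config`. For example, to try a different sharelib tag (the tag value here is hypothetical):

```bash
# The -D definition overrides sparkTag from job.properties for this run only.
oozie job -run -config job.properties -DsparkTag=spark_3_latest -auth KERBEROS
```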
276 changes: 276 additions & 0 deletions src/main/resources/oozie/spark_distributed_cache/workflow.xml
@@ -0,0 +1,276 @@
<workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkDistributedCacheOozieTest'>
    <global>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
    </global>

    <start to='SparkDistributedCacheOneFile' />

    <action name='SparkDistributedCacheOneFile'>
        <spark xmlns="uri:oozie:spark-action:0.2">
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>${sparkTag}</value>
                </property>
            </configuration>
            <master>yarn</master>
            <mode>cluster</mode>
            <name>SparkDistributedCacheOneFile</name>
            <class>com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleFile</class>
            <jar>spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar</jar>
            <spark-opts>--num-executors 3 --executor-memory 2g --executor-cores 1 --queue default</spark-opts>
            <arg>singlefile.txt</arg>
            <file>hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/singlefile.txt</file>
        </spark>
        <ok to="SparkDistributedCacheOneFileWithHash" />
        <error to="fail" />
    </action>

    <action name='SparkDistributedCacheOneFileWithHash'>
        <spark xmlns="uri:oozie:spark-action:0.2">
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>${sparkTag}</value>
                </property>
            </configuration>
            <master>yarn</master>
            <mode>cluster</mode>
            <name>SparkDistributedCacheOneFileWithHash</name>
            <class>com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleFile</class>
            <jar>spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar</jar>
            <spark-opts>--num-executors 3 --executor-memory 2g --executor-cores 1 --queue default</spark-opts>
            <arg>renamed.txt</arg>
            <file>hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/singlefile.txt#renamed.txt</file>
        </spark>
        <ok to="SparkDistributedCacheThreeFiles" />
        <error to="fail" />
    </action>

    <action name='SparkDistributedCacheThreeFiles'>
        <spark xmlns="uri:oozie:spark-action:0.2">
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>${sparkTag}</value>
                </property>
            </configuration>
            <master>yarn</master>
            <mode>cluster</mode>
            <name>SparkDistributedCacheThreeFiles</name>
            <class>com.yahoo.spark.starter.distributedcache.SparkDistributedCacheThreeFiles</class>
            <jar>spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar</jar>
            <spark-opts>--num-executors 3 --executor-memory 2g --executor-cores 1 --queue default</spark-opts>
            <arg>firstfile.txt</arg>
            <arg>renamedfile.txt</arg>
            <arg>thirdfile.txt</arg>
            <file>hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/firstfile.txt</file>
            <file>hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/secondfile.txt#renamedfile.txt</file>
            <file>hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/thirdfile.txt</file>
        </spark>
        <ok to="SparkDistributedCacheOneFileHashBadFile" />
        <error to="fail" />
    </action>

    <action name='SparkDistributedCacheOneFileHashBadFile'>
        <spark xmlns="uri:oozie:spark-action:0.2">
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>${sparkTag}</value>
                </property>
            </configuration>
            <master>yarn</master>
            <mode>cluster</mode>
            <name>SparkDistributedCacheOneFileHashBadFile</name>
            <class>com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleFile</class>
            <jar>spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar</jar>
            <spark-opts>--num-executors 3 --executor-memory 2g --executor-cores 1 --queue default</spark-opts>
            <arg>singlefile.txt</arg>
            <file>hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/singlefile.txt#badfile.txt</file>
        </spark>
        <!-- Negative test: the file is localized only as badfile.txt, so this action is expected to fail. -->
        <ok to="fail" />
        <error to="SparkDistributedCacheNonExistFile" />
    </action>

    <action name='SparkDistributedCacheNonExistFile'>
        <spark xmlns="uri:oozie:spark-action:0.2">
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>${sparkTag}</value>
                </property>
            </configuration>
            <master>yarn</master>
            <mode>cluster</mode>
            <name>SparkDistributedCacheNonExistFile</name>
            <class>com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleFile</class>
            <jar>spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar</jar>
            <spark-opts>--num-executors 3 --executor-memory 2g --executor-cores 1 --queue default</spark-opts>
            <arg>nonexistentfile.txt</arg>
            <file>hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/nonexistentfile.txt</file>
        </spark>
        <!-- Negative test: the file does not exist, so localization is expected to fail. -->
        <ok to="fail" />
        <error to="SparkDistributedCacheOneFileFromHdfs" />
    </action>

    <action name='SparkDistributedCacheOneFileFromHdfs'>
        <spark xmlns="uri:oozie:spark-action:0.2">
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>${sparkTag}</value>
                </property>
            </configuration>
            <master>yarn</master>
            <mode>cluster</mode>
            <name>SparkDistributedCacheOneFileFromHdfs</name>
            <class>com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleFile</class>
            <jar>spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar</jar>
            <spark-opts>--num-executors 3 --executor-memory 2g --executor-cores 1 --queue default</spark-opts>
            <arg>singlefile.txt</arg>
            <file>hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/singlefile.txt</file>
        </spark>
        <ok to="SparkDistributedCacheOneArchive" />
        <error to="fail" />
    </action>

    <action name='SparkDistributedCacheOneArchive'>
        <spark xmlns="uri:oozie:spark-action:0.2">
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>${sparkTag}</value>
                </property>
            </configuration>
            <master>yarn</master>
            <mode>cluster</mode>
            <name>SparkDistributedCacheOneArchive</name>
            <class>com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleArchive</class>
            <jar>spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar</jar>
            <spark-opts>--num-executors 3 --executor-memory 2g --executor-cores 1 --queue default</spark-opts>
            <arg>singlearchive.tgz</arg>
            <archive>hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/singlearchive.tgz</archive>
        </spark>
        <ok to="SparkDistributedCacheOneArchiveWithHash" />
        <error to="fail" />
    </action>

    <action name='SparkDistributedCacheOneArchiveWithHash'>
        <spark xmlns="uri:oozie:spark-action:0.2">
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>${sparkTag}</value>
                </property>
            </configuration>
            <master>yarn</master>
            <mode>cluster</mode>
            <name>SparkDistributedCacheOneArchiveWithHash</name>
            <class>com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleArchive</class>
            <jar>spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar</jar>
            <spark-opts>--num-executors 3 --executor-memory 2g --executor-cores 1 --queue default</spark-opts>
            <arg>renamed.tgz</arg>
            <archive>hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/singlearchive.tgz#renamed.tgz</archive>
        </spark>
        <ok to="SparkDistributedCacheThreeArchives" />
        <error to="fail" />
    </action>

    <action name='SparkDistributedCacheThreeArchives'>
        <spark xmlns="uri:oozie:spark-action:0.2">
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>${sparkTag}</value>
                </property>
            </configuration>
            <master>yarn</master>
            <mode>cluster</mode>
            <name>SparkDistributedCacheThreeArchives</name>
            <class>com.yahoo.spark.starter.distributedcache.SparkDistributedCacheThreeArchives</class>
            <jar>spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar</jar>
            <spark-opts>--num-executors 3 --executor-memory 2g --executor-cores 1 --queue default</spark-opts>
            <arg>firstarchive.tgz</arg>
            <arg>renamedarchive.tgz</arg>
            <arg>thirdarchive.tgz</arg>
            <archive>hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/firstarchive.tgz</archive>
            <archive>hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/secondarchive.tgz#renamedarchive.tgz</archive>
            <archive>hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/thirdarchive.tgz</archive>
        </spark>
        <ok to="SparkDistributedCacheOneArchiveHashBad" />
        <error to="fail" />
    </action>

    <action name='SparkDistributedCacheOneArchiveHashBad'>
        <spark xmlns="uri:oozie:spark-action:0.2">
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>${sparkTag}</value>
                </property>
            </configuration>
            <master>yarn</master>
            <mode>cluster</mode>
            <name>SparkDistributedCacheOneArchiveHashBad</name>
            <class>com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleArchive</class>
            <jar>spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar</jar>
            <spark-opts>--num-executors 3 --executor-memory 2g --executor-cores 1 --queue default</spark-opts>
            <arg>singlearchive.tgz</arg>
            <archive>hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/singlearchive.tgz#badfile.tgz</archive>
        </spark>
        <!-- Negative test: the archive is localized only as badfile.tgz, so this action is expected to fail. -->
        <ok to="fail" />
        <error to="SparkDistributedCacheNonExistArchive" />
    </action>

    <action name='SparkDistributedCacheNonExistArchive'>
        <spark xmlns="uri:oozie:spark-action:0.2">
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>${sparkTag}</value>
                </property>
            </configuration>
            <master>yarn</master>
            <mode>cluster</mode>
            <name>SparkDistributedCacheNonExistArchive</name>
            <class>com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleArchive</class>
            <jar>spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar</jar>
            <spark-opts>--num-executors 3 --executor-memory 2g --executor-cores 1 --queue default</spark-opts>
            <arg>nonexistentarchive.tgz</arg>
            <archive>hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/nonexistentarchive.tgz</archive>
        </spark>
        <!-- Negative test: the archive does not exist, so localization is expected to fail. -->
        <ok to="fail" />
        <error to="SparkDistributedCacheOneArchiveFromHdfs" />
    </action>

    <action name='SparkDistributedCacheOneArchiveFromHdfs'>
        <spark xmlns="uri:oozie:spark-action:0.2">
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>${sparkTag}</value>
                </property>
            </configuration>
            <master>yarn</master>
            <mode>cluster</mode>
            <name>SparkDistributedCacheOneArchiveFromHdfs</name>
            <class>com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleArchive</class>
            <jar>spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar</jar>
            <spark-opts>--num-executors 3 --executor-memory 2g --executor-cores 1 --queue default</spark-opts>
            <arg>singlearchive.tgz</arg>
            <archive>hdfs:///user/${wf:conf('user.name')}/${wfRoot}/data/singlearchive.tgz</archive>
        </spark>
        <ok to="end" />
        <error to="fail" />
    </action>

    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

    <end name='end' />
</workflow-app>
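
Two things in this workflow deserve a note. The `#` fragment on a `<file>` or `<archive>` URI renames the localized copy in each container's working directory, which is why `SparkDistributedCacheOneFileWithHash` passes `renamed.txt` as its argument; and the negative tests wire `<ok>` to `fail` because a failed localization is the expected outcome there. The `<file>` element maps onto plain `spark-submit --files`, which on YARN supports the same `#` alias syntax. A sketch, with the HDFS path as an assumption:

```bash
# Rough spark-submit equivalent of the SparkDistributedCacheOneFileWithHash
# action: singlefile.txt is localized as renamed.txt in each container.
spark-submit \
  --master yarn --deploy-mode cluster \
  --num-executors 3 --executor-memory 2g --executor-cores 1 --queue default \
  --class com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleFile \
  --files "hdfs:///user/$USER/spark_distributed_cache/data/singlefile.txt#renamed.txt" \
  spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar renamed.txt
```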
37 changes: 37 additions & 0 deletions src/main/scala/com/yahoo/spark/starter/distributedcache/SparkDistributedCacheSingleArchive.scala
@@ -0,0 +1,37 @@
package com.yahoo.spark.starter.distributedcache

import org.apache.spark._
import java.io.{BufferedReader, FileReader}

object SparkDistributedCacheSingleArchive {
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      System.err.println("Usage: SparkDistributedCacheSingleArchive <inputarchive>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("SparkDistributedCacheSingleArchive")
    val spark = new SparkContext(conf)

    val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
    val result = spark.parallelize(testData).reduceByKey {
      // The localized archive (args(0)) is expected to unpack into a directory
      // containing a file named singlefile.txt that holds the single value 100.
      val in = new BufferedReader(new FileReader(args(0) + "/singlefile.txt"))
      val fileVal = in.readLine().toInt
      in.close()
      _ * fileVal + _ * fileVal
    }.collect()
    println("result is: " + result.mkString(","))
    val pass = (result.toSet == Set((1,200), (2,300), (3,500)))
    println("pass is: " + pass)

    if (!pass) {
      println("Error, set isn't as expected")
      spark.stop()
      // Throw so the Spark application master marks the app as failed.
      throw new Exception("Error, set isn't as expected")
    }
    spark.stop()
  }
}
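
For archives, YARN unpacks the localized `.tgz` into a directory named after the archive (or its `#` alias), so `args(0)` above resolves to a directory containing `singlefile.txt`. A hedged sketch of running this test standalone, outside Oozie, mirroring the `SparkDistributedCacheOneArchive` action (paths are assumptions):

```bash
# The archive unpacks to ./singlearchive.tgz/ in each container, so the job
# reads singlearchive.tgz/singlefile.txt.
spark-submit \
  --master yarn --deploy-mode cluster \
  --class com.yahoo.spark.starter.distributedcache.SparkDistributedCacheSingleArchive \
  --archives "hdfs:///user/$USER/spark_distributed_cache/data/singlearchive.tgz#singlearchive.tgz" \
  spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar singlearchive.tgz
```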
37 changes: 37 additions & 0 deletions src/main/scala/com/yahoo/spark/starter/distributedcache/SparkDistributedCacheSingleFile.scala
@@ -0,0 +1,37 @@
package com.yahoo.spark.starter.distributedcache

import org.apache.spark._
import java.io.{BufferedReader, FileReader}

object SparkDistributedCacheSingleFile {
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      System.err.println("Usage: SparkDistributedCacheSingleFile <inputfile>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("SparkDistributedCacheSingleFile")
    val spark = new SparkContext(conf)

    val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
    val result = spark.parallelize(testData).reduceByKey {
      // The localized file (args(0)) is expected to contain the single value 100.
      val in = new BufferedReader(new FileReader(args(0)))
      val fileVal = in.readLine().toInt
      in.close()
      _ * fileVal + _ * fileVal
    }.collect()
    println("result is: " + result.mkString(","))
    val pass = (result.toSet == Set((1,200), (2,300), (3,500)))
    println("pass is: " + pass)

    if (!pass) {
      println("Error, set isn't as expected")
      spark.stop()
      // Throw so the Spark application master marks the app as failed.
      throw new Exception("Error, set isn't as expected")
    }
    spark.stop()
  }
}