SPARK-1446: Spark examples should not do a System.exit
Spark examples should exit cleanly using the SparkContext.stop() method rather than System.exit.
System.exit can cause issues such as the one reported in SPARK-1407.

Author: Sandeep <sandeep@techaddict.me>

Closes apache#370 from techaddict/1446 and squashes the following commits:

e9234cf [Sandeep] SPARK-1446: Spark examples should not do a System.exit Spark examples should exit nice using SparkContext.stop() method, rather than System.exit System.exit can cause issues like in SPARK-1407
techaddict authored and pwendell committed Apr 10, 2014
1 parent 8ca3b2b commit e55cc4b
Showing 35 changed files with 60 additions and 67 deletions.
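The change applied in every file below is the same: the example builds its SparkContext, runs its job, and then calls stop() on that context instead of killing the JVM with System.exit(0). Stopping the context gives Spark a chance to shut down its scheduler and listeners cleanly, which avoids the kind of problem referenced in SPARK-1407. A minimal sketch of the pattern, using a hypothetical StopExample app (not one of the 35 files in this commit) written against the same SparkContext constructor the examples use:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object StopExample {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: StopExample <master>")
      System.exit(1)  // exiting on a usage error is still fine
    }
    val sc = new SparkContext(args(0), "StopExample",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
    // Do some work on the cluster.
    val evens = sc.parallelize(1 to 1000).filter(_ % 2 == 0).count()
    println("Even numbers: " + evens)
    sc.stop()  // shut the SparkContext down cleanly instead of calling System.exit(0)
  }
}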
@@ -138,6 +138,6 @@ public static void main(String[] args) {

System.out.print("Final w: ");
printWeights(w);
- System.exit(0);
+ sc.stop();
}
}
@@ -126,6 +126,6 @@ public Stats call(Stats stats, Stats stats2) {
for (Tuple2<?,?> t : output) {
System.out.println(t._1() + "\t" + t._2());
}
- System.exit(0);
+ jsc.stop();
}
}
@@ -117,6 +117,6 @@ public Double call(Double sum) {
System.out.println(tuple._1() + " has rank: " + tuple._2() + ".");
}

- System.exit(0);
+ ctx.stop();
}
}
@@ -96,6 +96,6 @@ public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) {
} while (nextCount != oldCount);

System.out.println("TC has " + tc.count() + " edges.");
- System.exit(0);
+ sc.stop();
}
}
@@ -48,14 +48,14 @@ public Iterable<String> call(String s) {
return Arrays.asList(SPACE.split(s));
}
});

JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});

JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
@@ -67,6 +67,6 @@ public Integer call(Integer i1, Integer i2) {
for (Tuple2<?,?> tuple : output) {
System.out.println(tuple._1() + ": " + tuple._2());
}
- System.exit(0);
+ ctx.stop();
}
}
@@ -85,6 +85,6 @@ public static void main(String[] args) {
outputDir + "/productFeatures");
System.out.println("Final user/product features written to " + outputDir);

- System.exit(0);
+ sc.stop();
}
}
@@ -79,6 +79,6 @@ public static void main(String[] args) {
double cost = model.computeCost(points.rdd());
System.out.println("Cost: " + cost);

- System.exit(0);
+ sc.stop();
}
}
@@ -77,6 +77,6 @@ public static void main(String[] args) {

System.out.print("Final w: " + model.weights());

- System.exit(0);
+ sc.stop();
}
}
@@ -56,6 +56,6 @@ object BroadcastTest {
println("Iteration %d took %.0f milliseconds".format(i, (System.nanoTime - startTime) / 1E6))
}

- System.exit(0)
+ sc.stop()
}
}
@@ -58,7 +58,7 @@ import org.apache.spark.SparkContext._
prod_id,
quantity) VALUES ('charlie', 1385983649000, 'iphone', 2);
*/

/**
* This example demonstrates how to read and write to cassandra column family created using CQL3
* using Spark.
@@ -34,6 +34,6 @@ object ExceptionHandlingTest {
}
}

- System.exit(0)
+ sc.stop()
}
}
@@ -28,7 +28,7 @@ object GroupByTest {
"Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
System.exit(1)
}

var numMappers = if (args.length > 1) args(1).toInt else 2
var numKVPairs = if (args.length > 2) args(2).toInt else 1000
var valSize = if (args.length > 3) args(3).toInt else 1000
@@ -52,7 +52,6 @@ object GroupByTest {

println(pairs1.groupByKey(numReducers).count)

- System.exit(0)
+ sc.stop()
}
}

@@ -30,7 +30,7 @@ object HBaseTest {

val conf = HBaseConfiguration.create()

- // Other options for configuring scan behavior are available. More information available at
+ // Other options for configuring scan behavior are available. More information available at
// http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html
conf.set(TableInputFormat.INPUT_TABLE, args(1))

@@ -41,12 +41,12 @@ object HBaseTest {
admin.createTable(tableDesc)
}

- val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
+ val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])

hBaseRDD.count()

- System.exit(0)
+ sc.stop()
}
}
@@ -32,6 +32,6 @@ object HdfsTest {
val end = System.currentTimeMillis()
println("Iteration " + iter + " took " + (end-start) + " ms")
}
- System.exit(0)
+ sc.stop()
}
}
@@ -120,7 +120,7 @@ object LocalALS {
}
}
printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS)

val R = generateR()

// Initialize m and u randomly
@@ -51,6 +51,6 @@ object MultiBroadcastTest {
// Collect the small RDD so we can print the observed sizes locally.
observedSizes.collect().foreach(i => println(i))

- System.exit(0)
+ sc.stop()
}
}
@@ -27,7 +27,7 @@ object SimpleSkewedGroupByTest {
System.err.println("Usage: SimpleSkewedGroupByTest <master> " +
"[numMappers] [numKVPairs] [valSize] [numReducers] [ratio]")
System.exit(1)
- }
+ }

var numMappers = if (args.length > 1) args(1).toInt else 2
var numKVPairs = if (args.length > 2) args(2).toInt else 1000
@@ -58,14 +58,13 @@ object SimpleSkewedGroupByTest {
}.cache
// Enforce that everything has been calculated and in cache
pairs1.count

println("RESULT: " + pairs1.groupByKey(numReducers).count)
// Print how many keys each reducer got (for debugging)
// println("RESULT: " + pairs1.groupByKey(numReducers)
// .map{case (k,v) => (k, v.size)}
// .collectAsMap)

- System.exit(0)
+ sc.stop()
}
}

@@ -27,7 +27,7 @@ object SkewedGroupByTest {
System.err.println(
"Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
System.exit(1)
- }
+ }

var numMappers = if (args.length > 1) args(1).toInt else 2
var numKVPairs = if (args.length > 2) args(2).toInt else 1000
@@ -53,10 +53,9 @@ object SkewedGroupByTest {
}.cache()
// Enforce that everything has been calculated and in cache
pairs1.count()

println(pairs1.groupByKey(numReducers).count())

- System.exit(0)
+ sc.stop()
}
}

@@ -112,7 +112,7 @@ object SparkALS {

val sc = new SparkContext(host, "SparkALS",
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))

val R = generateR()

// Initialize m and u randomly
@@ -137,6 +137,6 @@ object SparkALS {
println()
}

- System.exit(0)
+ sc.stop()
}
}
@@ -52,7 +52,7 @@ object SparkHdfsLR {
val inputPath = args(1)
val conf = SparkHadoopUtil.get.newConfiguration()
val sc = new SparkContext(args(0), "SparkHdfsLR",
- System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass), Map(),
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass), Map(),
InputFormatInfo.computePreferredLocations(
Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))
))
20 changes: 10 additions & 10 deletions examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
@@ -28,24 +28,24 @@ import org.apache.spark.SparkContext._
object SparkKMeans {
val R = 1000 // Scaling factor
val rand = new Random(42)

def parseVector(line: String): Vector = {
new Vector(line.split(' ').map(_.toDouble))
}

def closestPoint(p: Vector, centers: Array[Vector]): Int = {
var index = 0
var bestIndex = 0
var closest = Double.PositiveInfinity

for (i <- 0 until centers.length) {
val tempDist = p.squaredDist(centers(i))
if (tempDist < closest) {
closest = tempDist
bestIndex = i
}
}

bestIndex
}

@@ -60,22 +60,22 @@ object SparkKMeans {
val data = lines.map(parseVector _).cache()
val K = args(2).toInt
val convergeDist = args(3).toDouble

val kPoints = data.takeSample(withReplacement = false, K, 42).toArray
var tempDist = 1.0

while(tempDist > convergeDist) {
val closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))

val pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)}

val newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap()

tempDist = 0.0
for (i <- 0 until K) {
tempDist += kPoints(i).squaredDist(newPoints(i))
}

for (newP <- newPoints) {
kPoints(newP._1) = newP._2
}
@@ -84,6 +84,6 @@ object SparkKMeans {

println("Final centers:")
kPoints.foreach(println)
- System.exit(0)
+ sc.stop()
}
}
@@ -66,6 +66,6 @@ object SparkLR {
}

println("Final w: " + w)
- System.exit(0)
+ sc.stop()
}
}
@@ -57,7 +57,6 @@ object SparkPageRank {
val output = ranks.collect()
output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + "."))

- System.exit(0)
+ ctx.stop()
}
}

@@ -70,6 +70,6 @@ object SparkTC {
} while (nextCount != oldCount)

println("TC has " + tc.count() + " edges.")
- System.exit(0)
+ spark.stop()
}
}
@@ -75,6 +75,6 @@ object SparkTachyonHdfsLR {
}

println("Final w: " + w)
- System.exit(0)
+ sc.stop()
}
}
@@ -34,10 +34,10 @@ object SparkTachyonPi {
}
val spark = new SparkContext(args(0), "SparkTachyonPi",
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))

val slices = if (args.length > 1) args(1).toInt else 2
val n = 100000 * slices

val rdd = spark.parallelize(1 to n, slices)
rdd.persist(StorageLevel.OFF_HEAP)
val count = rdd.map { i =>
@@ -46,7 +46,7 @@ object SparkTachyonPi {
if (x * x + y * y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / n)

spark.stop()
}
}
@@ -79,7 +79,7 @@ object WikipediaPageRankStandalone {
val time = (System.currentTimeMillis - startTime) / 1000.0
println("Completed %d iterations in %f seconds: %f seconds per iteration"
.format(numIterations, time, time / numIterations))
- System.exit(0)
+ sc.stop()
}

def parseArticle(line: String): (String, Array[String]) = {
@@ -16,7 +16,7 @@
*/

package org.apache.spark.examples.mllib

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.Vectors
@@ -16,7 +16,7 @@
*/

package org.apache.spark.examples.mllib

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.Vectors
@@ -53,4 +53,3 @@ object HdfsWordCount {
ssc.awaitTermination()
}
}
