Commit:
add the SparkTachyonHdfsLR example and some comments
RongGu committed Mar 27, 2014
1 parent fd84156 commit e700d9c
Showing 2 changed files with 85 additions and 1 deletion.
examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
@@ -0,0 +1,82 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples

import java.util.Random
import scala.math.exp
import org.apache.spark.util.Vector
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.InputFormatInfo
import org.apache.spark.storage.StorageLevel

/**
 * Logistic regression based classification.
 * This example uses Tachyon to persist RDDs during computation.
 */
object SparkTachyonHdfsLR {
  val D = 10  // Number of dimensions
  val rand = new Random(42)

  case class DataPoint(x: Vector, y: Double)

  def parsePoint(line: String): DataPoint = {
    // Each input line is a label followed by D feature values, space-separated
    val tok = new java.util.StringTokenizer(line, " ")
    val y = tok.nextToken.toDouble
    val x = new Array[Double](D)
    var i = 0
    while (i < D) {
      x(i) = tok.nextToken.toDouble
      i += 1
    }
    DataPoint(new Vector(x), y)
  }

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: SparkTachyonHdfsLR <master> <file> <iters>")
      System.exit(1)
    }
    val inputPath = args(1)
    val conf = SparkHadoopUtil.get.newConfiguration()
    val sc = new SparkContext(args(0), "SparkTachyonHdfsLR",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass), Map(),
      InputFormatInfo.computePreferredLocations(
        Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))
      ))
    val lines = sc.textFile(inputPath)
    // Persist the parsed points off-heap in Tachyon rather than on the JVM heap
    val points = lines.map(parsePoint _).persist(StorageLevel.TACHYON)
    val ITERATIONS = args(2).toInt

    // Initialize w to a random value
    var w = Vector(D, _ => 2 * rand.nextDouble - 1)
    println("Initial w: " + w)

    for (i <- 1 to ITERATIONS) {
      println("On iteration " + i)
      val gradient = points.map { p =>
        (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
      }.reduce(_ + _)
      w -= gradient
    }

    println("Final w: " + w)
    System.exit(0)
  }
}
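
For reference, the map/reduce step in the loop above computes the full-batch gradient of the logistic loss over all n points, which the loop then applies with a fixed step size of 1:

\nabla L(w) = \sum_{i=1}^{n} \left( \frac{1}{1 + e^{-y_i (w \cdot x_i)}} - 1 \right) y_i x_i, \qquad w \leftarrow w - \nabla L(w)

Because points is persisted with StorageLevel.TACHYON, each iteration re-reads the parsed DataPoints from off-heap Tachyon storage instead of re-reading and re-parsing the HDFS input.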
examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala
@@ -22,7 +22,9 @@ import scala.math.random
 import org.apache.spark._
 import org.apache.spark.storage.StorageLevel
 
-/** Computes an approximation to pi */
+/** Computes an approximation to pi.
+ *  This example uses Tachyon to persist RDDs during computation.
+ */
 object SparkTachyonPi {
   def main(args: Array[String]) {
     if (args.length == 0) {
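
To make the new storage level concrete, here is a minimal, self-contained sketch of the persistence pattern both examples exercise. The object name and master URL are hypothetical; it assumes the experimental StorageLevel.TACHYON level introduced by this change and a Tachyon instance configured for the cluster.

package org.apache.spark.examples

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

object TachyonPersistSketch {
  def main(args: Array[String]) {
    // Hypothetical local master; a cluster URL works the same way
    val sc = new SparkContext("local[2]", "TachyonPersistSketch")
    // Build a small RDD and persist its blocks in Tachyon instead of the JVM heap
    val doubled = sc.parallelize(1 to 1000).map(_ * 2).persist(StorageLevel.TACHYON)
    // Actions after the first read the cached blocks rather than recomputing them
    println("count = " + doubled.count())
    println("first = " + doubled.first())
    sc.stop()
  }
}

The persist call is the only Tachyon-specific line; swapping the storage level is all it takes to move an existing cached RDD off-heap.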
