Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
yhuai committed Jul 11, 2014
2 parents eca7d04 + b23e9c3 commit fc649d7
Showing 21 changed files with 235 additions and 49 deletions.
@@ -72,9 +72,7 @@ private[spark] class Master(
val waitingApps = new ArrayBuffer[ApplicationInfo]
val completedApps = new ArrayBuffer[ApplicationInfo]
var nextAppNumber = 0

val appIdToUI = new HashMap[String, SparkUI]
val fileSystemsUsed = new HashSet[FileSystem]

val drivers = new HashSet[DriverInfo]
val completedDrivers = new ArrayBuffer[DriverInfo]
@@ -159,7 +157,6 @@ private[spark] class Master(
recoveryCompletionTask.cancel()
}
webUi.stop()
fileSystemsUsed.foreach(_.close())
masterMetricsSystem.stop()
applicationMetricsSystem.stop()
persistenceEngine.close()
9 changes: 9 additions & 0 deletions docs/configuration.md
@@ -773,6 +773,15 @@ Apart from these, the following properties are also available, and may be useful
into blocks of data before storing them in Spark.
</td>
</tr>
<tr>
<td><code>spark.streaming.receiver.maxRate</code></td>
<td>infinite</td>
<td>
Maximum rate (per second) at which each receiver will push data into blocks. Effectively,
each stream will consume at most this number of records per second.
Setting this configuration to 0 or a negative number will put no limit on the rate.
</td>
</tr>
<tr>
<td><code>spark.streaming.unpersist</code></td>
<td>true</td>
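The `spark.streaming.receiver.maxRate` entry documented above is an ordinary SparkConf property. A minimal sketch of setting it from application code follows; the app name, source host/port, and the rate of 100 records per second are illustrative, not part of this commit.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MaxRateExample {
  def main(args: Array[String]): Unit = {
    // Cap each receiver at 100 records per second; 0 or a negative value removes the limit.
    val conf = new SparkConf()
      .setAppName("MaxRateExample")
      .set("spark.streaming.receiver.maxRate", "100")

    val ssc = new StreamingContext(conf, Seconds(1))

    // Hypothetical receiver-based source; the limit applies per receiver.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Like any other configuration property, it can also be set in `conf/spark-defaults.conf` instead of in code.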
@@ -31,12 +31,12 @@ object HBaseTest {
val conf = HBaseConfiguration.create()
// Other options for configuring scan behavior are available. More information available at
// http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html
conf.set(TableInputFormat.INPUT_TABLE, args(1))
conf.set(TableInputFormat.INPUT_TABLE, args(0))

// Initialize hBase table if necessary
val admin = new HBaseAdmin(conf)
if(!admin.isTableAvailable(args(1))) {
val tableDesc = new HTableDescriptor(args(1))
if (!admin.isTableAvailable(args(0))) {
val tableDesc = new HTableDescriptor(args(0))
admin.createTable(tableDesc)
}

10 changes: 8 additions & 2 deletions examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
@@ -19,16 +19,22 @@ package org.apache.spark.examples

import org.apache.spark._


object HdfsTest {

/** Usage: HdfsTest [file] */
def main(args: Array[String]) {
if (args.length < 1) {
System.err.println("Usage: HdfsTest <file>")
System.exit(1)
}
val sparkConf = new SparkConf().setAppName("HdfsTest")
val sc = new SparkContext(sparkConf)
val file = sc.textFile(args(1))
val file = sc.textFile(args(0))
val mapped = file.map(s => s.length).cache()
for (iter <- 1 to 10) {
val start = System.currentTimeMillis()
for (x <- mapped) { x + 2 }
// println("Processing: " + x)
val end = System.currentTimeMillis()
println("Iteration " + iter + " took " + (end-start) + " ms")
}
@@ -31,8 +31,12 @@ import org.apache.spark.{SparkConf, SparkContext}
*/
object SparkPageRank {
def main(args: Array[String]) {
if (args.length < 1) {
System.err.println("Usage: SparkPageRank <file> <iter>")
System.exit(1)
}
val sparkConf = new SparkConf().setAppName("PageRank")
var iters = args(1).toInt
val iters = if (args.length > 0) args(1).toInt else 10
val ctx = new SparkContext(sparkConf)
val lines = ctx.textFile(args(0), 1)
val links = lines.map{ s =>
6 changes: 4 additions & 2 deletions graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -107,14 +107,16 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializable
/**
* Repartitions the edges in the graph according to `partitionStrategy`.
*
* @param the partitioning strategy to use when partitioning the edges in the graph.
* @param partitionStrategy the partitioning strategy to use when partitioning the edges
* in the graph.
*/
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]

/**
* Repartitions the edges in the graph according to `partitionStrategy`.
*
* @param the partitioning strategy to use when partitioning the edges in the graph.
* @param partitionStrategy the partitioning strategy to use when partitioning the edges
* in the graph.
* @param numPartitions the number of edge partitions in the new graph.
*/
def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED]
4 changes: 2 additions & 2 deletions graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -182,8 +182,8 @@ class VertexRDD[@specialized VD: ClassTag](
/**
* Left joins this RDD with another VertexRDD with the same index. This function will fail if
* both VertexRDDs do not share the same index. The resulting vertex set contains an entry for
* each
* vertex in `this`. If `other` is missing any vertex in this VertexRDD, `f` is passed `None`.
* each vertex in `this`.
* If `other` is missing any vertex in this VertexRDD, `f` is passed `None`.
*
* @tparam VD2 the attribute type of the other VertexRDD
* @tparam VD3 the attribute type of the resulting VertexRDD
13 changes: 13 additions & 0 deletions mllib/pom.xml
@@ -78,6 +78,19 @@
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<profile>
<id>netlib-lgpl</id>
<dependencies>
<dependency>
<groupId>com.github.fommil.netlib</groupId>
<artifactId>all</artifactId>
<version>1.1.2</version>
<type>pom</type>
</dependency>
</dependencies>
</profile>
</profiles>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
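The new `netlib-lgpl` profile above is presumably activated with Maven's standard profile flag (e.g. `mvn -Pnetlib-lgpl ...`), pulling in the LGPL-licensed `com.github.fommil.netlib:all` native BLAS bindings that are left out of the default build.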
9 changes: 7 additions & 2 deletions project/SparkBuild.scala
@@ -86,16 +86,21 @@ object SparkBuild extends PomBuild {
profiles
}

override val profiles = Properties.envOrNone("MAVEN_PROFILES") match {
override val profiles = Properties.envOrNone("SBT_MAVEN_PROFILES") match {
case None => backwardCompatibility
// Rationale: If -P option exists no need to support backwardCompatibility.
case Some(v) =>
if (backwardCompatibility.nonEmpty)
println("Note: We ignore environment variables, when use of profile is detected in " +
"conjunction with environment variable.")
v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", "")).toSeq
}

Properties.envOrNone("SBT_MAVEN_PROPERTIES") match {
case Some(v) =>
v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.split("=")).foreach(x => System.setProperty(x(0), x(1)))
case _ =>
}

override val userPropertiesMap = System.getProperties.toMap

lazy val sharedSettings = graphSettings ++ ScalaStyleSettings ++ Seq (
2 changes: 1 addition & 1 deletion python/pyspark/java_gateway.py
@@ -56,7 +56,7 @@ def preexec_func():
(stdout, _) = proc.communicate()
exit_code = proc.poll()
error_msg = "Launching GatewayServer failed"
error_msg += " with exit code %d!" % exit_code if exit_code else "! "
error_msg += " with exit code %d! " % exit_code if exit_code else "! "
error_msg += "(Warning: unexpected output detected.)\n\n"
error_msg += gateway_port + stdout
raise Exception(error_msg)
2 changes: 1 addition & 1 deletion sbt/sbt-launch-lib.bash
@@ -92,7 +92,7 @@ addJava () {
enableProfile () {
dlog "[enableProfile] arg = '$1'"
maven_profiles=( "${maven_profiles[@]}" "$1" )
export MAVEN_PROFILES="${maven_profiles[@]}"
export SBT_MAVEN_PROFILES="${maven_profiles[@]}"
}

addSbt () {
@@ -23,7 +23,6 @@ import org.apache.spark.sql.catalyst.types.DataType
import org.apache.spark.sql.catalyst.types.StringType
import org.apache.spark.sql.catalyst.types.BooleanType


trait StringRegexExpression {
self: BinaryExpression =>

@@ -32,7 +31,7 @@ trait StringRegexExpression {
def escape(v: String): String
def matches(regex: Pattern, str: String): Boolean

def nullable: Boolean = true
def nullable: Boolean = left.nullable || right.nullable
def dataType: DataType = BooleanType

// try cache the pattern for Literal
@@ -157,19 +156,13 @@ case class Lower(child: Expression) extends UnaryExpression with CaseConversionExpression
override def toString() = s"Lower($child)"
}

/** A base class for functions that compare two strings, returning a boolean. */
abstract class StringComparison extends Expression {
self: Product =>
/** A base trait for functions that compare two strings, returning a boolean. */
trait StringComparison {
self: BinaryExpression =>

type EvaluatedType = Any

def left: Expression
def right: Expression

override def references = children.flatMap(_.references).toSet
override def children = left :: right :: Nil

override def nullable: Boolean = true
def nullable: Boolean = left.nullable || right.nullable
override def dataType: DataType = BooleanType

def compare(l: String, r: String): Boolean
@@ -184,26 +177,31 @@ abstract class StringComparison extends Expression {
}
}

def symbol: String = nodeName

override def toString() = s"$nodeName($left, $right)"
}

/**
* A function that returns true if the string `left` contains the string `right`.
*/
case class Contains(left: Expression, right: Expression) extends StringComparison {
case class Contains(left: Expression, right: Expression)
extends BinaryExpression with StringComparison {
override def compare(l: String, r: String) = l.contains(r)
}

/**
* A function that returns true if the string `left` starts with the string `right`.
*/
case class StartsWith(left: Expression, right: Expression) extends StringComparison {
case class StartsWith(left: Expression, right: Expression)
extends BinaryExpression with StringComparison {
def compare(l: String, r: String) = l.startsWith(r)
}

/**
* A function that returns true if the string `left` ends with the string `right`.
*/
case class EndsWith(left: Expression, right: Expression) extends StringComparison {
case class EndsWith(left: Expression, right: Expression)
extends BinaryExpression with StringComparison {
def compare(l: String, r: String) = l.endsWith(r)
}
@@ -123,6 +123,7 @@ object LikeSimplification extends Rule[LogicalPlan] {
val startsWith = "([^_%]+)%".r
val endsWith = "%([^_%]+)".r
val contains = "%([^_%]+)%".r
val equalTo = "([^_%]*)".r

def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
case Like(l, Literal(startsWith(pattern), StringType)) if !pattern.endsWith("\\") =>
@@ -131,6 +132,8 @@ object LikeSimplification extends Rule[LogicalPlan] {
EndsWith(l, Literal(pattern))
case Like(l, Literal(contains(pattern), StringType)) if !pattern.endsWith("\\") =>
Contains(l, Literal(pattern))
case Like(l, Literal(equalTo(pattern), StringType)) =>
EqualTo(l, Literal(pattern))
}
}

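To make the optimizer change above concrete, here is a standalone sketch, not the Catalyst rule itself, that classifies LIKE patterns with the same four regular expressions; the new `equalTo` case turns a pattern with no `%` or `_` wildcards into a plain equality comparison.

```scala
object LikePatternSketch {
  // Same shapes as in LikeSimplification: a single literal chunk plus optional '%' wildcards.
  val startsWith = "([^_%]+)%".r
  val endsWith = "%([^_%]+)".r
  val contains = "%([^_%]+)%".r
  val equalTo = "([^_%]*)".r

  // Describe which simpler expression a LIKE pattern would be rewritten to.
  def classify(pattern: String): String = pattern match {
    case startsWith(p) if !p.endsWith("\\") => s"StartsWith('$p')"
    case endsWith(p) if !p.endsWith("\\") => s"EndsWith('$p')"
    case contains(p) if !p.endsWith("\\") => s"Contains('$p')"
    case equalTo(p) => s"EqualTo('$p')"
    case _ => "Like(...)" // anything else keeps the general regex-based Like
  }

  def main(args: Array[String]): Unit = {
    Seq("abc%", "%abc", "%abc%", "abc", "a_c").foreach { p =>
      println(s"$p -> ${classify(p)}")
    }
  }
}
```

Running it prints, for example, `abc% -> StartsWith('abc')` and `abc -> EqualTo('abc')`, while `a_c` falls through to the general case.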
20 changes: 20 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -255,6 +255,26 @@ class SchemaRDD(
def unionAll(otherPlan: SchemaRDD) =
new SchemaRDD(sqlContext, Union(logicalPlan, otherPlan.logicalPlan))

/**
* Performs a relational except on two SchemaRDDs
*
* @param otherPlan the [[SchemaRDD]] that should be excepted from this one.
*
* @group Query
*/
def except(otherPlan: SchemaRDD): SchemaRDD =
new SchemaRDD(sqlContext, Except(logicalPlan, otherPlan.logicalPlan))

/**
* Performs a relational intersect on two SchemaRDDs
*
* @param otherPlan the [[SchemaRDD]] that should be intersected with this one.
*
* @group Query
*/
def intersect(otherPlan: SchemaRDD): SchemaRDD =
new SchemaRDD(sqlContext, Intersect(logicalPlan, otherPlan.logicalPlan))

/**
* Filters tuples using a function over the value of the specified column.
*
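A hypothetical end-to-end usage sketch of the two new SchemaRDD operations added above; the case class, data, and app name are made up for illustration, and the test additions further down exercise the same API.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Illustrative record type; not part of this commit.
case class Record(n: Int, s: String)

object SetOpsExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SetOpsExample").setMaster("local"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD // implicit RDD[Product] -> SchemaRDD conversion

    val left = createSchemaRDD(sc.parallelize(Seq(Record(1, "a"), Record(2, "b"), Record(3, "c"))))
    val right = createSchemaRDD(sc.parallelize(Seq(Record(2, "b"), Record(4, "d"))))

    // Rows of `left` that do not appear in `right`: the records (1,"a") and (3,"c").
    left.except(right).collect().foreach(println)

    // Rows that appear in both: the record (2,"b").
    left.intersect(right).collect().foreach(println)

    sc.stop()
  }
}
```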
@@ -229,9 +229,9 @@ private[parquet] class CatalystGroupConverter(
this(attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)), 0, null)

protected [parquet] val converters: Array[Converter] =
schema.map(field =>
CatalystConverter.createConverter(field, schema.indexOf(field), this))
.toArray
schema.zipWithIndex.map {
case (field, idx) => CatalystConverter.createConverter(field, idx, this)
}.toArray

override val size = schema.size

@@ -288,9 +288,9 @@ private[parquet] class CatalystPrimitiveRowConverter(
new ParquetRelation.RowType(attributes.length))

protected [parquet] val converters: Array[Converter] =
schema.map(field =>
CatalystConverter.createConverter(field, schema.indexOf(field), this))
.toArray
schema.zipWithIndex.map {
case (field, idx) => CatalystConverter.createConverter(field, idx, this)
}.toArray

override val size = schema.size

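A small standalone illustration, unrelated to Parquet itself, of why the converter change above replaces `schema.indexOf(field)` with `zipWithIndex`: `indexOf` rescans the sequence for every element and returns the first match when two entries are equal, while `zipWithIndex` gives each element its own position in a single pass.

```scala
object ZipWithIndexSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical schema with a repeated entry.
    val schema = Seq("a", "b", "a")

    // Old style: a linear search per element, and both "a" entries resolve to index 0.
    val viaIndexOf = schema.map(f => (f, schema.indexOf(f)))
    println(viaIndexOf) // List((a,0), (b,1), (a,0))

    // New style: one pass, each element paired with its actual position.
    val viaZip = schema.zipWithIndex
    println(viaZip) // List((a,0), (b,1), (a,2))
  }
}
```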
@@ -156,7 +156,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
writer.startMessage()
while(index < attributes.size) {
// null values indicate optional fields but we do not check currently
if (record(index) != null && record(index) != Nil) {
if (record(index) != null) {
writer.startField(attributes(index).name, index)
writeValue(attributes(index).dataType, record(index))
writer.endField(attributes(index).name, index)
@@ -167,7 +167,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
}

private[parquet] def writeValue(schema: DataType, value: Any): Unit = {
if (value != null && value != Nil) {
if (value != null) {
schema match {
case t @ ArrayType(_) => writeArray(
t,
@@ -184,7 +184,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
}

private[parquet] def writePrimitive(schema: PrimitiveType, value: Any): Unit = {
if (value != null && value != Nil) {
if (value != null) {
schema match {
case StringType => writer.addBinary(
Binary.fromByteArray(
@@ -206,12 +206,12 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
private[parquet] def writeStruct(
schema: StructType,
struct: CatalystConverter.StructScalaType[_]): Unit = {
if (struct != null && struct != Nil) {
if (struct != null) {
val fields = schema.fields.toArray
writer.startGroup()
var i = 0
while(i < fields.size) {
if (struct(i) != null && struct(i) != Nil) {
if (struct(i) != null) {
writer.startField(fields(i).name, i)
writeValue(fields(i).dataType, struct(i))
writer.endField(fields(i).name, i)
21 changes: 21 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
@@ -168,4 +168,25 @@ class DslQuerySuite extends QueryTest {
test("zero count") {
assert(emptyTableData.count() === 0)
}

test("except") {
checkAnswer(
lowerCaseData.except(upperCaseData),
(1, "a") ::
(2, "b") ::
(3, "c") ::
(4, "d") :: Nil)
checkAnswer(lowerCaseData.except(lowerCaseData), Nil)
checkAnswer(upperCaseData.except(upperCaseData), Nil)
}

test("intersect") {
checkAnswer(
lowerCaseData.intersect(lowerCaseData),
(1, "a") ::
(2, "b") ::
(3, "c") ::
(4, "d") :: Nil)
checkAnswer(lowerCaseData.intersect(upperCaseData), Nil)
}
}