
Commit f4cf00d
Merge branch 'master' of github.com:apache/spark into function-exists
Andrew Or committed Apr 6, 2016
2 parents: 6abe10b + 5abd02c
Showing 20 changed files with 388 additions and 189 deletions.
6 changes: 3 additions & 3 deletions build/mvn
@@ -72,7 +72,7 @@ install_mvn() {
   local MVN_VERSION="3.3.9"
 
   install_app \
-    "http://archive.apache.org/dist/maven/maven-3/${MVN_VERSION}/binaries" \
+    "https://archive.apache.org/dist/maven/maven-3/${MVN_VERSION}/binaries" \
     "apache-maven-${MVN_VERSION}-bin.tar.gz" \
     "apache-maven-${MVN_VERSION}/bin/mvn"

@@ -84,7 +84,7 @@ install_zinc() {
   local zinc_path="zinc-0.3.9/bin/zinc"
   [ ! -f "${_DIR}/${zinc_path}" ] && ZINC_INSTALL_FLAG=1
   install_app \
-    "http://downloads.typesafe.com/zinc/0.3.9" \
+    "https://downloads.typesafe.com/zinc/0.3.9" \
     "zinc-0.3.9.tgz" \
     "${zinc_path}"
   ZINC_BIN="${_DIR}/${zinc_path}"
@@ -100,7 +100,7 @@ install_scala() {
   local scala_bin="${_DIR}/scala-${scala_version}/bin/scala"
 
   install_app \
-    "http://downloads.typesafe.com/scala/${scala_version}" \
+    "https://downloads.typesafe.com/scala/${scala_version}" \
     "scala-${scala_version}.tgz" \
     "scala-${scala_version}/bin/scala"

2 changes: 1 addition & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -119,7 +119,7 @@ object Pregel extends Logging {
       mergeMsg: (A, A) => A)
     : Graph[VD, ED] =
   {
-    require(maxIterations > 0, s"Maximum of iterations must be greater than 0," +
+    require(maxIterations > 0, s"Maximum number of iterations must be greater than 0," +
       s" but got ${maxIterations}")
 
     var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
17 changes: 16 additions & 1 deletion python/pyspark/broadcast.py
@@ -99,11 +99,26 @@ def value(self):
 
     def unpersist(self, blocking=False):
         """
-        Delete cached copies of this broadcast on the executors.
+        Delete cached copies of this broadcast on the executors. If the
+        broadcast is used after this is called, it will need to be
+        re-sent to each executor.
+
         :param blocking: Whether to block until unpersisting has completed
         """
         if self._jbroadcast is None:
             raise Exception("Broadcast can only be unpersisted in driver")
         self._jbroadcast.unpersist(blocking)
 
+    def destroy(self):
+        """
+        Destroy all data and metadata related to this broadcast variable.
+        Use this with caution; once a broadcast variable has been destroyed,
+        it cannot be used again. This method blocks until destroy has
+        completed.
+        """
+        if self._jbroadcast is None:
+            raise Exception("Broadcast can only be destroyed in driver")
+        self._jbroadcast.destroy()
+        os.unlink(self._path)
+
     def __reduce__(self):
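The two methods differ in how final they are: unpersist() only drops the cached copies on the executors and the broadcast is re-sent on next use, while destroy() releases everything and blocks until done. A minimal pyspark sketch of the intended usage (assuming a live SparkContext named sc; the variable names are illustrative only):

    lookup = sc.broadcast({"a": 1, "b": 2})

    # Executors fetch the broadcast value when a job first uses it.
    total = sc.parallelize(["a", "b", "a"]).map(lambda k: lookup.value[k]).sum()

    # unpersist() deletes the cached copies on the executors; the variable
    # stays valid and is re-sent to each executor on the next job.
    lookup.unpersist()
    total = sc.parallelize(["a", "b"]).map(lambda k: lookup.value[k]).sum()

    # destroy() removes all data and metadata, blocking until it completes;
    # any job that touches the broadcast afterwards fails.
    lookup.destroy()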
15 changes: 15 additions & 0 deletions python/pyspark/tests.py
@@ -694,6 +694,21 @@ def test_large_broadcast(self):
         m = self.sc.parallelize(range(1), 1).map(lambda x: len(bdata.value)).sum()
         self.assertEqual(N, m)
 
+    def test_unpersist(self):
+        N = 1000
+        data = [[float(i) for i in range(300)] for i in range(N)]
+        bdata = self.sc.broadcast(data)  # 3MB
+        bdata.unpersist()
+        m = self.sc.parallelize(range(1), 1).map(lambda x: len(bdata.value)).sum()
+        self.assertEqual(N, m)
+        bdata.destroy()
+        try:
+            self.sc.parallelize(range(1), 1).map(lambda x: len(bdata.value)).sum()
+        except Exception as e:
+            pass
+        else:
+            raise Exception("job should fail after destroying the broadcast")
+
     def test_multiple_broadcasts(self):
         N = 1 << 21
         b1 = self.sc.broadcast(set(range(N)))  # multiple blocks in JVM
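The try/except/else block in the new test can be expressed more directly with unittest's own helper; an equivalent sketch (assuming, as the test above does, that the post-destroy failure reaches the driver as a plain Exception):

    with self.assertRaises(Exception):
        self.sc.parallelize(range(1), 1).map(lambda x: len(bdata.value)).sum()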
2 changes: 1 addition & 1 deletion scalastyle-config.xml
@@ -116,7 +116,7 @@ This file is divided into 3 sections:
 
   <check level="error" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
 
-  <check level="error" class="org.scalastyle.scalariform.NonASCIICharacterChecker" enabled="true"></check>
+  <check customId="nonascii" level="error" class="org.scalastyle.scalariform.NonASCIICharacterChecker" enabled="true"></check>
 
   <check level="error" class="org.scalastyle.scalariform.SpaceAfterCommentStartChecker" enabled="true"></check>


This file was deleted.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-
+import org.apache.spark.sql.catalyst.util.StringUtils
 
 /**
  * An in-memory (ephemeral) implementation of the system catalog.
@@ -47,11 +47,6 @@ class InMemoryCatalog extends ExternalCatalog {
   // Database name -> description
   private val catalog = new scala.collection.mutable.HashMap[String, DatabaseDesc]
 
-  private def filterPattern(names: Seq[String], pattern: String): Seq[String] = {
-    val regex = pattern.replaceAll("\\*", ".*").r
-    names.filter { funcName => regex.pattern.matcher(funcName).matches() }
-  }
-
   private def partitionExists(db: String, table: String, spec: TablePartitionSpec): Boolean = {
     requireTableExists(db, table)
     catalog(db).tables(table).partitions.contains(spec)
@@ -136,7 +131,7 @@ class InMemoryCatalog extends ExternalCatalog {
   }
 
   override def listDatabases(pattern: String): Seq[String] = synchronized {
-    filterPattern(listDatabases(), pattern)
+    StringUtils.filterPattern(listDatabases(), pattern)
   }
 
   override def setCurrentDatabase(db: String): Unit = { /* no-op */ }
@@ -203,7 +198,7 @@ class InMemoryCatalog extends ExternalCatalog {
   }
 
   override def listTables(db: String, pattern: String): Seq[String] = synchronized {
-    filterPattern(listTables(db), pattern)
+    StringUtils.filterPattern(listTables(db), pattern)
   }
 
   // --------------------------------------------------------------------------
@@ -322,7 +317,7 @@ class InMemoryCatalog extends ExternalCatalog {
 
   override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
     requireDbExists(db)
-    filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
+    StringUtils.filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
   }
 
 }
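The deleted helper spells out the matching rule the listing methods share: "*" is the only wildcard, it expands to the regex ".*", and the whole name must match. A Python rendering of that logic, for illustration only (the consolidated Scala implementation lives in StringUtils.filterPattern and may support more than this sketch shows):

    import re

    def filter_pattern(names, pattern):
        # Mirror of the removed Scala helper: expand "*" to ".*" and keep
        # only the names whose entire text matches the resulting regex.
        regex = re.compile(pattern.replace("*", ".*"))
        return [name for name in names if regex.fullmatch(name)]

    filter_pattern(["listTables", "listDatabases", "dropDatabase"], "list*")
    # -> ['listTables', 'listDatabases']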
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchFunctionE
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-
+import org.apache.spark.sql.catalyst.util.StringUtils
 
 /**
  * An internal catalog that is used by a Spark Session. This internal catalog serves as a
@@ -297,9 +297,7 @@ class SessionCatalog(
   def listTables(db: String, pattern: String): Seq[TableIdentifier] = {
     val dbTables =
       externalCatalog.listTables(db, pattern).map { t => TableIdentifier(t, Some(db)) }
-    val regex = pattern.replaceAll("\\*", ".*").r
-    val _tempTables = tempTables.keys.toSeq
-      .filter { t => regex.pattern.matcher(t).matches() }
+    val _tempTables = StringUtils.filterPattern(tempTables.keys.toSeq, pattern)
       .map { t => TableIdentifier(t) }
     dbTables ++ _tempTables
   }
@@ -610,9 +608,7 @@ class SessionCatalog(
   def listFunctions(db: String, pattern: String): Seq[FunctionIdentifier] = {
     val dbFunctions =
       externalCatalog.listFunctions(db, pattern).map { f => FunctionIdentifier(f, Some(db)) }
-    val regex = pattern.replaceAll("\\*", ".*").r
-    val loadedFunctions = functionRegistry.listFunction()
-      .filter { f => regex.pattern.matcher(f).matches() }
+    val loadedFunctions = StringUtils.filterPattern(functionRegistry.listFunction(), pattern)
       .map { f => FunctionIdentifier(f) }
     // TODO: Actually, there will be dbFunctions that have been loaded into the FunctionRegistry.
     // So, the returned list may have two entries for the same function.