Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
yhuai committed Jul 16, 2014
2 parents c3f4a02 + efc452a commit 42d47a3
Showing 11 changed files with 173 additions and 73 deletions.
4 changes: 2 additions & 2 deletions python/pyspark/cloudpickle.py
@@ -243,10 +243,10 @@ def save_function(self, obj, name=None, pack=struct.pack):
# if func is lambda, def'ed at prompt, is in main, or is nested, then
# we'll pickle the actual function object rather than simply saving a
# reference (as is done in default pickler), via save_function_tuple.
if islambda(obj) or obj.func_code.co_filename == '<stdin>' or themodule == None:
if islambda(obj) or obj.func_code.co_filename == '<stdin>' or themodule is None:
#Force server to import modules that have been imported in main
modList = None
if themodule == None and not self.savedForceImports:
if themodule is None and not self.savedForceImports:
mainmod = sys.modules['__main__']
if useForcedImports and hasattr(mainmod,'___pyc_forcedImports__'):
modList = list(mainmod.___pyc_forcedImports__)
4 changes: 2 additions & 2 deletions python/pyspark/conf.py
@@ -30,7 +30,7 @@
u'local'
>>> sc.appName
u'My app'
>>> sc.sparkHome == None
>>> sc.sparkHome is None
True
>>> conf = SparkConf(loadDefaults=False)
@@ -116,7 +116,7 @@ def setSparkHome(self, value):

def setExecutorEnv(self, key=None, value=None, pairs=None):
"""Set an environment variable to be passed to executors."""
if (key != None and pairs != None) or (key == None and pairs == None):
if (key is not None and pairs is not None) or (key is None and pairs is None):
raise Exception("Either pass one key-value pair or a list of pairs")
elif key != None:
self._jconf.setExecutorEnv(key, value)
2 changes: 1 addition & 1 deletion python/pyspark/rddsampler.py
@@ -82,7 +82,7 @@ def getPoissonSample(self, split, mean):
return (num_arrivals - 1)

def shuffle(self, vals):
if self._random == None:
if self._random is None:
self.initRandomGenerator(0)  # this should only ever be called on the master so
# the split does not matter

4 changes: 2 additions & 2 deletions python/pyspark/shell.py
@@ -35,7 +35,7 @@
from pyspark.storagelevel import StorageLevel

# this is the equivalent of ADD_JARS
add_files = os.environ.get("ADD_FILES").split(',') if os.environ.get("ADD_FILES") != None else None
add_files = os.environ.get("ADD_FILES").split(',') if os.environ.get("ADD_FILES") is not None else None

if os.environ.get("SPARK_EXECUTOR_URI"):
SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"])
@@ -55,7 +55,7 @@
platform.python_build()[1]))
print("SparkContext available as sc.")

if add_files != None:
if add_files is not None:
print("Adding files: [%s]" % ", ".join(add_files))

# The ./bin/pyspark script stores the old PYTHONSTARTUP value in OLD_PYTHONSTARTUP,
@@ -215,19 +215,19 @@ case class EndsWith(left: Expression, right: Expression)
case class Substring(str: Expression, pos: Expression, len: Expression) extends Expression {

type EvaluatedType = Any
def nullable: Boolean = true

def nullable: Boolean = str.nullable || pos.nullable || len.nullable
def dataType: DataType = {
if (!resolved) {
throw new UnresolvedException(this, s"Cannot resolve since $children are not resolved")
}
if (str.dataType == BinaryType) str.dataType else StringType
}

def references = children.flatMap(_.references).toSet

override def children = str :: pos :: len :: Nil

@inline
def slice[T, C <: Any](str: C, startPos: Int, sliceLen: Int)
(implicit ev: (C=>IndexedSeqOptimized[T,_])): Any = {
@@ -237,40 +237,40 @@ case class Substring(str: Expression, pos: Expression, len: Expression) extends
// refers to element i-1 in the sequence. If a start index i is less than 0, it refers
// to the -ith element before the end of the sequence. If a start index i is 0, it
// refers to the first element.

val start = startPos match {
case pos if pos > 0 => pos - 1
case neg if neg < 0 => len + neg
case _ => 0
}

val end = sliceLen match {
case max if max == Integer.MAX_VALUE => max
case x => start + x
}

str.slice(start, end)
}

override def eval(input: Row): Any = {
val string = str.eval(input)

val po = pos.eval(input)
val ln = len.eval(input)

if ((string == null) || (po == null) || (ln == null)) {
null
} else {
val start = po.asInstanceOf[Int]
val length = ln.asInstanceOf[Int]

string match {
case ba: Array[Byte] => slice(ba, start, length)
case other => slice(other.toString, start, length)
}
}
}

override def toString = len match {
case max if max == Integer.MAX_VALUE => s"SUBSTR($str, $pos)"
case _ => s"SUBSTR($str, $pos, $len)"
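The comment inside `slice` above describes how SUBSTR's one-based (and negative) start positions are mapped onto Scala's zero-based `slice`. Below is a minimal standalone sketch of that mapping on plain strings, in ordinary Scala rather than Spark's `Expression` API; `substr` and `SubstringIndexDemo` are illustrative names, not Spark code. The expected values are consistent with the `checkEvaluation` cases in the test suite further down.

object SubstringIndexDemo {
  // Mirrors the index arithmetic in Substring.slice: a 1-based start position
  // (negative counts back from the end, 0 behaves like 1) plus a length are
  // turned into 0-based start/end offsets for Scala's slice.
  def substr(s: String, startPos: Int, sliceLen: Int = Integer.MAX_VALUE): String = {
    val start = startPos match {
      case pos if pos > 0 => pos - 1          // 1-based -> 0-based
      case neg if neg < 0 => s.length + neg   // count back from the end
      case _              => 0                // position 0 acts like position 1
    }
    val end = sliceLen match {
      case max if max == Integer.MAX_VALUE => max  // "rest of the string"
      case x                               => start + x
    }
    s.slice(start, end)
  }

  def main(args: Array[String]): Unit = {
    println(substr("example", 1, 2))   // ex
    println(substr("example", 0, 2))   // ex      (0 treated like 1)
    println(substr("example", -2, 2))  // le      (2nd-to-last char onwards)
    println(substr("example", 2))      // xample  (no length: rest of string)
  }
}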
@@ -171,6 +171,9 @@ object NullPropagation extends Rule[LogicalPlan] {
case Literal(candidate, _) if candidate == v => true
case _ => false
})) => Literal(true, BooleanType)
case e @ Substring(Literal(null, _), _, _) => Literal(null, e.dataType)
case e @ Substring(_, Literal(null, _), _) => Literal(null, e.dataType)
case e @ Substring(_, _, Literal(null, _)) => Literal(null, e.dataType)
// Put exceptional cases above if any
case e: BinaryArithmetic => e.children match {
case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
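The three new `Substring` cases extend NullPropagation's constant folding: if any argument is a literal null, the whole expression is rewritten to a null literal of the result type, so e.g. SUBSTR(NULL, 1, 2) never reaches evaluation. A rough sketch of the same rewrite on a toy expression tree follows; this is not Catalyst's actual `Expression`/`Rule` machinery, and all names are illustrative.

object NullFoldDemo {
  // Toy expression tree, just to show the shape of the rewrite.
  sealed trait Expr
  case class Lit(value: Any) extends Expr
  case class Substr(str: Expr, pos: Expr, len: Expr) extends Expr

  // Same idea as the new NullPropagation cases: a literal-null argument
  // anywhere makes the whole SUBSTR fold to a null literal.
  def foldNulls(e: Expr): Expr = e match {
    case Substr(Lit(null), _, _) => Lit(null)
    case Substr(_, Lit(null), _) => Lit(null)
    case Substr(_, _, Lit(null)) => Lit(null)
    case other                   => other
  }

  def main(args: Array[String]): Unit = {
    println(foldNulls(Substr(Lit(null), Lit(1), Lit(2))))   // Lit(null)
    println(foldNulls(Substr(Lit("abc"), Lit(1), Lit(2))))  // unchanged
  }
}

In Catalyst the folded literal also carries the expression's type, which is why the real cases produce `Literal(null, e.dataType)` rather than a bare null.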
@@ -469,9 +469,9 @@ class ExpressionEvaluationSuite extends FunSuite {

test("Substring") {
val row = new GenericRow(Array[Any]("example", "example".toArray.map(_.toByte)))

val s = 'a.string.at(0)

// substring from zero position with less-than-full length
checkEvaluation(Substring(s, Literal(0, IntegerType), Literal(2, IntegerType)), "ex", row)
checkEvaluation(Substring(s, Literal(1, IntegerType), Literal(2, IntegerType)), "ex", row)
@@ -501,7 +501,7 @@ class ExpressionEvaluationSuite extends FunSuite {

// substring(null, _, _) -> null
checkEvaluation(Substring(s, Literal(100, IntegerType), Literal(4, IntegerType)), null, new GenericRow(Array[Any](null)))

// substring(_, null, _) -> null
checkEvaluation(Substring(s, Literal(null, IntegerType), Literal(4, IntegerType)), null, row)

@@ -514,6 +514,12 @@

// 2-arg substring from nonzero position
checkEvaluation(Substring(s, Literal(2, IntegerType), Literal(Integer.MAX_VALUE, IntegerType)), "xample", row)

val s_notNull = 'a.string.notNull.at(0)

assert(Substring(s, Literal(0, IntegerType), Literal(2, IntegerType)).nullable === true)
assert(Substring(s_notNull, Literal(0, IntegerType), Literal(2, IntegerType)).nullable === false)
assert(Substring(s_notNull, Literal(null, IntegerType), Literal(2, IntegerType)).nullable === true)
assert(Substring(s_notNull, Literal(0, IntegerType), Literal(null, IntegerType)).nullable === true)
}
}

@@ -17,6 +17,8 @@

package org.apache.spark.sql.api.java

import java.util.{List => JList}

import org.apache.spark.Partitioner
import org.apache.spark.api.java.{JavaRDDLike, JavaRDD}
import org.apache.spark.api.java.function.{Function => JFunction}
@@ -96,6 +98,20 @@ class JavaSchemaRDD(
this
}

// Overridden actions from JavaRDDLike.

override def collect(): JList[Row] = {
import scala.collection.JavaConversions._
val arr: java.util.Collection[Row] = baseSchemaRDD.collect().toSeq.map(new Row(_))
new java.util.ArrayList(arr)
}

override def take(num: Int): JList[Row] = {
import scala.collection.JavaConversions._
val arr: java.util.Collection[Row] = baseSchemaRDD.take(num).toSeq.map(new Row(_))
new java.util.ArrayList(arr)
}

// Transformations (return a new RDD)

/**
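Both overrides above follow the same hand-off pattern: materialize the Scala-side result, wrap each Catalyst row in the Java-facing `Row`, and copy it into a concrete `java.util.ArrayList` through the `scala.collection.JavaConversions` implicits (standard in the Scala 2.10 era, removed in Scala 2.13). A minimal sketch of that pattern with stand-in types follows; `ScalaRow` and `JavaRow` are placeholders, not Spark classes.

import java.util.{List => JList}
import scala.collection.JavaConversions._

object ScalaToJavaListDemo {
  // Stand-ins for the Catalyst row and its Java API wrapper.
  case class ScalaRow(values: Seq[Any])
  class JavaRow(val underlying: ScalaRow)

  // Same shape as JavaSchemaRDD.collect(): map to the wrapper type, let the
  // JavaConversions implicit view the Seq as a java.util.Collection, then
  // copy into an ArrayList so Java callers get a plain mutable list.
  def collectAsJava(rows: Array[ScalaRow]): JList[JavaRow] = {
    val arr: java.util.Collection[JavaRow] = rows.toSeq.map(new JavaRow(_))
    new java.util.ArrayList(arr)
  }

  def main(args: Array[String]): Unit = {
    val jList = collectAsJava(Array(ScalaRow(Seq(1, "a")), ScalaRow(Seq(2, "b"))))
    println(jList.size())  // 2
  }
}

Copying into an `ArrayList` rather than handing back the implicit wrapper presumably gives Java callers an ordinary mutable list instead of a thin view over a Scala collection.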
@@ -17,27 +17,34 @@

package org.apache.spark.sql.parquet

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.util.Try

import java.io.IOException
import java.lang.{Long => JLong}
import java.text.SimpleDateFormat
import java.util.Date
import java.util.{Date, List => JList}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat, FileOutputCommitter}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

import parquet.hadoop.{ParquetRecordReader, ParquetInputFormat, ParquetOutputFormat}
import parquet.hadoop.api.ReadSupport
import parquet.hadoop._
import parquet.hadoop.api.{InitContext, ReadSupport}
import parquet.hadoop.metadata.GlobalMetaData
import parquet.hadoop.util.ContextUtil
import parquet.io.InvalidRecordException
import parquet.io.ParquetDecodingException
import parquet.schema.MessageType

import org.apache.spark.{Logging, SerializableWritable, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
import org.apache.spark.{Logging, SerializableWritable, TaskContext}

/**
* Parquet table scan operator. Imports the file that backs the given
Expand All @@ -55,16 +62,14 @@ case class ParquetTableScan(
override def execute(): RDD[Row] = {
val sc = sqlContext.sparkContext
val job = new Job(sc.hadoopConfiguration)
ParquetInputFormat.setReadSupportClass(
job,
classOf[org.apache.spark.sql.parquet.RowReadSupport])
ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])

val conf: Configuration = ContextUtil.getConfiguration(job)
val fileList = FileSystemHelper.listFiles(relation.path, conf)
// add all paths in the directory but skip "hidden" ones such
// as "_SUCCESS" and "_metadata"
for (path <- fileList if !path.getName.startsWith("_")) {
NewFileInputFormat.addInputPath(job, path)
val qualifiedPath = {
val path = new Path(relation.path)
path.getFileSystem(conf).makeQualified(path)
}
NewFileInputFormat.addInputPath(job, qualifiedPath)

// Store both requested and original schema in `Configuration`
conf.set(
@@ -87,7 +92,7 @@ case class ParquetTableScan(

sc.newAPIHadoopRDD(
conf,
classOf[org.apache.spark.sql.parquet.FilteringParquetRowInputFormat],
classOf[FilteringParquetRowInputFormat],
classOf[Void],
classOf[Row])
.map(_._2)
@@ -122,14 +127,7 @@ case class ParquetTableScan(
private def validateProjection(projection: Seq[Attribute]): Boolean = {
val original: MessageType = relation.parquetSchema
val candidate: MessageType = ParquetTypesConverter.convertFromAttributes(projection)
try {
original.checkContains(candidate)
true
} catch {
case e: InvalidRecordException => {
false
}
}
Try(original.checkContains(candidate)).isSuccess
}
}

@@ -302,6 +300,11 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
*/
private[parquet] class FilteringParquetRowInputFormat
extends parquet.hadoop.ParquetInputFormat[Row] with Logging {

private var footers: JList[Footer] = _

private var fileStatuses = Map.empty[Path, FileStatus]

override def createRecordReader(
inputSplit: InputSplit,
taskAttemptContext: TaskAttemptContext): RecordReader[Void, Row] = {
@@ -318,6 +321,70 @@ private[parquet] class FilteringParquetRowInputFormat
new ParquetRecordReader[Row](readSupport)
}
}

override def getFooters(jobContext: JobContext): JList[Footer] = {
if (footers eq null) {
val statuses = listStatus(jobContext)
fileStatuses = statuses.map(file => file.getPath -> file).toMap
footers = getFooters(ContextUtil.getConfiguration(jobContext), statuses)
}

footers
}

// TODO Remove this method and related code once PARQUET-16 is fixed
// This method together with the `getFooters` method and the `fileStatuses` field are just used
// to mimic this PR: https://github.com/apache/incubator-parquet-mr/pull/17
override def getSplits(
configuration: Configuration,
footers: JList[Footer]): JList[ParquetInputSplit] = {

val maxSplitSize: JLong = configuration.getLong("mapred.max.split.size", Long.MaxValue)
val minSplitSize: JLong =
Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L))
if (maxSplitSize < 0 || minSplitSize < 0) {
throw new ParquetDecodingException(
s"maxSplitSize or minSplitSie should not be negative: maxSplitSize = $maxSplitSize;" +
s" minSplitSize = $minSplitSize")
}

val getGlobalMetaData =
classOf[ParquetFileWriter].getDeclaredMethod("getGlobalMetaData", classOf[JList[Footer]])
getGlobalMetaData.setAccessible(true)
val globalMetaData = getGlobalMetaData.invoke(null, footers).asInstanceOf[GlobalMetaData]

val readContext = getReadSupport(configuration).init(
new InitContext(configuration,
globalMetaData.getKeyValueMetaData(),
globalMetaData.getSchema()))

val generateSplits =
classOf[ParquetInputFormat[_]].getDeclaredMethods.find(_.getName == "generateSplits").get
generateSplits.setAccessible(true)

val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
for (footer <- footers) {
val fs = footer.getFile.getFileSystem(configuration)
val file = footer.getFile
val fileStatus = fileStatuses.getOrElse(file, fs.getFileStatus(file))
val parquetMetaData = footer.getParquetMetadata
val blocks = parquetMetaData.getBlocks
val fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen)
splits.addAll(
generateSplits.invoke(
null,
blocks,
fileBlockLocations,
fileStatus,
parquetMetaData.getFileMetaData,
readContext.getRequestedSchema.toString,
readContext.getReadSupportMetadata,
minSplitSize,
maxSplitSize).asInstanceOf[JList[ParquetInputSplit]])
}

splits
}
}

private[parquet] object FileSystemHelper {
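The `getSplits` override above has to reach two pieces of Parquet that are not public, the static `ParquetFileWriter.getGlobalMetaData` and the private `generateSplits` helper, and does so via `getDeclaredMethod` plus `setAccessible(true)` until PARQUET-16 is fixed. A small self-contained sketch of that reflection pattern follows; the `Splitter` class and its `generate` method are made up for illustration and have nothing to do with Parquet's real classes.

object PrivateMethodReflectionDemo {
  // Stand-in for a library class that keeps a useful helper private,
  // the way ParquetInputFormat keeps generateSplits private.
  class Splitter {
    private def generate(chunks: Int, chunkSize: Long): Seq[Long] =
      (0 until chunks).map(_ * chunkSize)
  }

  def main(args: Array[String]): Unit = {
    // Look the method up by name and JVM parameter types, then force access,
    // mirroring the getDeclaredMethod/setAccessible calls in getSplits above.
    val m = classOf[Splitter].getDeclaredMethod("generate", classOf[Int], classOf[Long])
    m.setAccessible(true)

    // java.lang.reflect.Method.invoke takes boxed arguments.
    val offsets = m.invoke(new Splitter, Int.box(3), Long.box(128L)).asInstanceOf[Seq[Long]]
    println(offsets)  // Vector(0, 128, 256)
  }
}

As the TODO notes, this workaround is meant to be deleted once the upstream fix ships; reflective access compiles against nothing, so it also breaks silently if the private method's name or signature changes.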
