diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 67833743f3a98..0d97506450a7f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -420,6 +420,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) /** * Group the values for each key in the RDD into a single sequence. Allows controlling the * partitioning of the resulting key-value pair RDD by passing a Partitioner. + * The ordering of elements within each group is not guaranteed, and may even differ + * each time the resulting RDD is evaluated. * * Note: This operation may be very expensive. If you are grouping in order to perform an * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] @@ -439,7 +441,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) /** * Group the values for each key in the RDD into a single sequence. Hash-partitions the - * resulting RDD with into `numPartitions` partitions. + * resulting RDD with into `numPartitions` partitions. The ordering of elements within + * each group is not guaranteed, and may even differ each time the resulting RDD is evaluated. * * Note: This operation may be very expensive. If you are grouping in order to perform an * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] @@ -535,7 +538,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) /** * Group the values for each key in the RDD into a single sequence. Hash-partitions the - * resulting RDD with the existing partitioner/parallelism level. + * resulting RDD with the existing partitioner/parallelism level. The ordering of elements + * within each group is not guaranteed, and may even differ each time the resulting RDD is + * evaluated. * * Note: This operation may be very expensive. If you are grouping in order to perform an * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] @@ -951,9 +956,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => { // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it // around by taking a mod. We expect that no task will be attempted 2 billion times. - val attemptNumber = (context.attemptId % Int.MaxValue).toInt + val attemptNumber = (context.getAttemptId % Int.MaxValue).toInt /* "reduce task" */ - val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId, + val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.getPartitionId, attemptNumber) val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId) val format = outfmt.newInstance @@ -1022,9 +1027,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => { // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it // around by taking a mod. We expect that no task will be attempted 2 billion times. - val attemptNumber = (context.attemptId % Int.MaxValue).toInt + val attemptNumber = (context.getAttemptId % Int.MaxValue).toInt - writer.setup(context.stageId, context.partitionId, attemptNumber) + writer.setup(context.getStageId, context.getPartitionId, attemptNumber) writer.open() try { var count = 0 diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index ba712c9d7776f..ab9e97c8fe409 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -509,7 +509,8 @@ abstract class RDD[T: ClassTag]( /** * Return an RDD of grouped items. Each group consists of a key and a sequence of elements - * mapping to that key. + * mapping to that key. The ordering of elements within each group is not guaranteed, and + * may even differ each time the resulting RDD is evaluated. * * Note: This operation may be very expensive. If you are grouping in order to perform an * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] @@ -520,7 +521,8 @@ abstract class RDD[T: ClassTag]( /** * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements - * mapping to that key. + * mapping to that key. The ordering of elements within each group is not guaranteed, and + * may even differ each time the resulting RDD is evaluated. * * Note: This operation may be very expensive. If you are grouping in order to perform an * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] @@ -531,7 +533,8 @@ abstract class RDD[T: ClassTag]( /** * Return an RDD of grouped items. Each group consists of a key and a sequence of elements - * mapping to that key. + * mapping to that key. The ordering of elements within each group is not guaranteed, and + * may even differ each time the resulting RDD is evaluated. * * Note: This operation may be very expensive. If you are grouping in order to perform an * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] @@ -1028,8 +1031,14 @@ abstract class RDD[T: ClassTag]( * Zips this RDD with its element indices. The ordering is first based on the partition index * and then the ordering of items within each partition. So the first item in the first * partition gets index 0, and the last item in the last partition receives the largest index. + * * This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type. * This method needs to trigger a spark job when this RDD contains more than one partitions. + * + * Note that some RDDs, such as those returned by groupBy(), do not guarantee order of + * elements in a partition. The index assigned to each element is therefore not guaranteed, + * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee + * the same index assignments, you should sort the RDD with sortByKey() or save it to a file. */ def zipWithIndex(): RDD[(T, Long)] = new ZippedWithIndexRDD(this) @@ -1037,6 +1046,11 @@ abstract class RDD[T: ClassTag]( * Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k, * 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method * won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]]. + * + * Note that some RDDs, such as those returned by groupBy(), do not guarantee order of + * elements in a partition. The unique ID assigned to each element is therefore not guaranteed, + * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee + * the same index assignments, you should sort the RDD with sortByKey() or save it to a file. */ def zipWithUniqueId(): RDD[(T, Long)] = { val n = this.partitions.size.toLong diff --git a/dev/check-license b/dev/check-license index 9ff0929e9a5e8..72b1013479964 100755 --- a/dev/check-license +++ b/dev/check-license @@ -20,11 +20,10 @@ acquire_rat_jar () { - URL1="http://search.maven.org/remotecontent?filepath=org/apache/rat/apache-rat/${RAT_VERSION}/apache-rat-${RAT_VERSION}.jar" - URL2="http://repo1.maven.org/maven2/org/apache/rat/apache-rat/${RAT_VERSION}/apache-rat-${RAT_VERSION}.jar" + URL="http://repo1.maven.org/maven2/org/apache/rat/apache-rat/${RAT_VERSION}/apache-rat-${RAT_VERSION}.jar" JAR="$rat_jar" - + if [[ ! -f "$rat_jar" ]]; then # Download rat launch jar if it hasn't been downloaded yet if [ ! -f "$JAR" ]; then @@ -32,15 +31,17 @@ acquire_rat_jar () { printf "Attempting to fetch rat\n" JAR_DL="${JAR}.part" if hash curl 2>/dev/null; then - (curl --silent "${URL1}" > "$JAR_DL" || curl --silent "${URL2}" > "$JAR_DL") && mv "$JAR_DL" "$JAR" + curl --silent "${URL}" > "$JAR_DL" && mv "$JAR_DL" "$JAR" elif hash wget 2>/dev/null; then - (wget --quiet ${URL1} -O "$JAR_DL" || wget --quiet ${URL2} -O "$JAR_DL") && mv "$JAR_DL" "$JAR" + wget --quiet ${URL} -O "$JAR_DL" && mv "$JAR_DL" "$JAR" else printf "You do not have curl or wget installed, please install rat manually.\n" exit -1 fi fi - if [ ! -f "$JAR" ]; then + + unzip -tq $JAR &> /dev/null + if [ $? -ne 0 ]; then # We failed to download printf "Our attempt to download rat locally to ${JAR} failed. Please install rat manually.\n" exit -1 @@ -55,7 +56,7 @@ cd "$FWDIR" if test -x "$JAVA_HOME/bin/java"; then declare java_cmd="$JAVA_HOME/bin/java" -else +else declare java_cmd=java fi diff --git a/dev/run-tests-jenkins b/dev/run-tests-jenkins index a6ecf3196d7d4..0b1e31b9413cf 100755 --- a/dev/run-tests-jenkins +++ b/dev/run-tests-jenkins @@ -141,8 +141,10 @@ function post_message () { test_result="$?" if [ "$test_result" -eq "124" ]; then - fail_message="**[Tests timed out](${BUILD_URL}consoleFull)** after \ - a configured wait of \`${TESTS_TIMEOUT}\`." + fail_message="**[Tests timed out](${BUILD_URL}consoleFull)** \ + for PR $ghprbPullId at commit [\`${SHORT_COMMIT_HASH}\`](${COMMIT_URL}) \ + after a configured wait of \`${TESTS_TIMEOUT}\`." + post_message "$fail_message" exit $test_result else diff --git a/docs/configuration.md b/docs/configuration.md index a6dd7245e1552..791b6f2aa3261 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -206,6 +206,25 @@ Apart from these, the following properties are also available, and may be useful used during aggregation goes above this amount, it will spill the data into disks. + + spark.python.profile + false + + Enable profiling in Python worker, the profile result will show up by `sc.show_profiles()`, + or it will be displayed before the driver exiting. It also can be dumped into disk by + `sc.dump_profiles(path)`. If some of the profile results had been displayed maually, + they will not be displayed automatically before driver exiting. + + + + spark.python.profile.dump + (none) + + The directory which is used to dump the profile result before driver exiting. + The results will be dumped as separated file for each RDD. They can be loaded + by ptats.Stats(). If this is specified, the profile result will not be displayed + automatically. + spark.python.worker.reuse true diff --git a/docs/programming-guide.md b/docs/programming-guide.md index 510b47a2aaad1..1d61a3c555eaf 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -883,7 +883,7 @@ for details. groupByKey([numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
- Note: If you are grouping in order to perform an aggregation (such as a sum or + Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or combineByKey will yield much better performance.
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index 6ee7ac974b4a0..33235d150b4a5 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -31,7 +31,7 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase} import org.apache.spark.streaming.util.ManualClock -import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.util.Utils import org.jboss.netty.channel.ChannelPipeline import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory @@ -41,21 +41,26 @@ import org.jboss.netty.handler.codec.compression._ class FlumeStreamSuite extends TestSuiteBase { test("flume input stream") { - runFlumeStreamTest(false, 9998) + runFlumeStreamTest(false) } test("flume input compressed stream") { - runFlumeStreamTest(true, 9997) + runFlumeStreamTest(true) } - def runFlumeStreamTest(enableDecompression: Boolean, testPort: Int) { + def runFlumeStreamTest(enableDecompression: Boolean) { // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) - val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] = - FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, enableDecompression) + val (flumeStream, testPort) = + Utils.startServiceOnPort(9997, (trialPort: Int) => { + val dstream = FlumeUtils.createStream( + ssc, "localhost", trialPort, StorageLevel.MEMORY_AND_DISK, enableDecompression) + (dstream, trialPort) + }) + val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] with SynchronizedBuffer[Seq[SparkFlumeEvent]] - val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer) + val outputStream = new TestOutputStream(flumeStream, outputBuffer) outputStream.register() ssc.start() @@ -63,13 +68,13 @@ class FlumeStreamSuite extends TestSuiteBase { val input = Seq(1, 2, 3, 4, 5) Thread.sleep(1000) val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)) - var client: AvroSourceProtocol = null; - + var client: AvroSourceProtocol = null + if (enableDecompression) { client = SpecificRequestor.getClient( classOf[AvroSourceProtocol], new NettyTransceiver(new InetSocketAddress("localhost", testPort), - new CompressionChannelFactory(6))); + new CompressionChannelFactory(6))) } else { client = SpecificRequestor.getClient( classOf[AvroSourceProtocol], transceiver) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index 4e87fe088ecc5..2cc52e94282ba 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -85,7 +85,7 @@ sealed trait Matrix extends Serializable { } /** - * Column-majored dense matrix. + * Column-major dense matrix. * The entry values are stored in a single array of doubles with columns listed in sequence. * For example, the following matrix * {{{ @@ -128,7 +128,7 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) } /** - * Column-majored sparse matrix. + * Column-major sparse matrix. * The entry values are stored in Compressed Sparse Column (CSC) format. * For example, the following matrix * {{{ @@ -207,7 +207,7 @@ class SparseMatrix( object Matrices { /** - * Creates a column-majored dense matrix. + * Creates a column-major dense matrix. * * @param numRows number of rows * @param numCols number of columns @@ -218,7 +218,7 @@ object Matrices { } /** - * Creates a column-majored sparse matrix in Compressed Sparse Column (CSC) format. + * Creates a column-major sparse matrix in Compressed Sparse Column (CSC) format. * * @param numRows number of rows * @param numCols number of columns diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index ccbca67656c8d..b8cdbbe3cf2b6 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -215,6 +215,21 @@ def addInPlace(self, value1, value2): COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j) +class PStatsParam(AccumulatorParam): + """PStatsParam is used to merge pstats.Stats""" + + @staticmethod + def zero(value): + return None + + @staticmethod + def addInPlace(value1, value2): + if value1 is None: + return value2 + value1.add(value2) + return value1 + + class _UpdateRequestHandler(SocketServer.StreamRequestHandler): """ diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 8e7b00469e246..e9418320ff781 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -20,6 +20,7 @@ import sys from threading import Lock from tempfile import NamedTemporaryFile +import atexit from pyspark import accumulators from pyspark.accumulators import Accumulator @@ -30,7 +31,6 @@ from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \ PairDeserializer, CompressedSerializer from pyspark.storagelevel import StorageLevel -from pyspark import rdd from pyspark.rdd import RDD from pyspark.traceback_utils import CallSite, first_spark_call @@ -192,6 +192,9 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, self._temp_dir = \ self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath() + # profiling stats collected for each PythonRDD + self._profile_stats = [] + def _initialize_context(self, jconf): """ Initialize SparkContext in function to allow subclass specific initialization @@ -792,6 +795,40 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False): it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) return list(mappedRDD._collect_iterator_through_file(it)) + def _add_profile(self, id, profileAcc): + if not self._profile_stats: + dump_path = self._conf.get("spark.python.profile.dump") + if dump_path: + atexit.register(self.dump_profiles, dump_path) + else: + atexit.register(self.show_profiles) + + self._profile_stats.append([id, profileAcc, False]) + + def show_profiles(self): + """ Print the profile stats to stdout """ + for i, (id, acc, showed) in enumerate(self._profile_stats): + stats = acc.value + if not showed and stats: + print "=" * 60 + print "Profile of RDD" % id + print "=" * 60 + stats.sort_stats("time", "cumulative").print_stats() + # mark it as showed + self._profile_stats[i][2] = True + + def dump_profiles(self, path): + """ Dump the profile stats into directory `path` + """ + if not os.path.exists(path): + os.makedirs(path) + for id, acc, _ in self._profile_stats: + stats = acc.value + if stats: + p = os.path.join(path, "rdd_%d.pstats" % id) + stats.dump_stats(p) + self._profile_stats = [] + def _test(): import atexit diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index 0a5dcaac55e46..51014a8ceb785 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -63,6 +63,41 @@ def _convert_to_vector(l): raise TypeError("Cannot convert type %s into Vector" % type(l)) +def _vector_size(v): + """ + Returns the size of the vector. + + >>> _vector_size([1., 2., 3.]) + 3 + >>> _vector_size((1., 2., 3.)) + 3 + >>> _vector_size(array.array('d', [1., 2., 3.])) + 3 + >>> _vector_size(np.zeros(3)) + 3 + >>> _vector_size(np.zeros((3, 1))) + 3 + >>> _vector_size(np.zeros((1, 3))) + Traceback (most recent call last): + ... + ValueError: Cannot treat an ndarray of shape (1, 3) as a vector + """ + if isinstance(v, Vector): + return len(v) + elif type(v) in (array.array, list, tuple): + return len(v) + elif type(v) == np.ndarray: + if v.ndim == 1 or (v.ndim == 2 and v.shape[1] == 1): + return len(v) + else: + raise ValueError("Cannot treat an ndarray of shape %s as a vector" % str(v.shape)) + elif _have_scipy and scipy.sparse.issparse(v): + assert v.shape[1] == 1, "Expected column vector" + return v.shape[0] + else: + raise TypeError("Cannot treat type %s as a vector" % type(v)) + + class Vector(object): """ Abstract class for DenseVector and SparseVector @@ -76,6 +111,9 @@ def toArray(self): class DenseVector(Vector): + """ + A dense vector represented by a value array. + """ def __init__(self, ar): if not isinstance(ar, array.array): ar = array.array('d', ar) @@ -100,15 +138,31 @@ def dot(self, other): 5.0 >>> dense.dot(np.array(range(1, 3))) 5.0 + >>> dense.dot([1.,]) + Traceback (most recent call last): + ... + AssertionError: dimension mismatch + >>> dense.dot(np.reshape([1., 2., 3., 4.], (2, 2), order='F')) + array([ 5., 11.]) + >>> dense.dot(np.reshape([1., 2., 3.], (3, 1), order='F')) + Traceback (most recent call last): + ... + AssertionError: dimension mismatch """ - if isinstance(other, SparseVector): - return other.dot(self) + if type(other) == np.ndarray and other.ndim > 1: + assert len(self) == other.shape[0], "dimension mismatch" + return np.dot(self.toArray(), other) elif _have_scipy and scipy.sparse.issparse(other): - return other.transpose().dot(self.toArray())[0] - elif isinstance(other, Vector): - return np.dot(self.toArray(), other.toArray()) + assert len(self) == other.shape[0], "dimension mismatch" + return other.transpose().dot(self.toArray()) else: - return np.dot(self.toArray(), other) + assert len(self) == _vector_size(other), "dimension mismatch" + if isinstance(other, SparseVector): + return other.dot(self) + elif isinstance(other, Vector): + return np.dot(self.toArray(), other.toArray()) + else: + return np.dot(self.toArray(), other) def squared_distance(self, other): """ @@ -126,7 +180,16 @@ def squared_distance(self, other): >>> sparse1 = SparseVector(2, [0, 1], [2., 1.]) >>> dense1.squared_distance(sparse1) 2.0 + >>> dense1.squared_distance([1.,]) + Traceback (most recent call last): + ... + AssertionError: dimension mismatch + >>> dense1.squared_distance(SparseVector(1, [0,], [1.,])) + Traceback (most recent call last): + ... + AssertionError: dimension mismatch """ + assert len(self) == _vector_size(other), "dimension mismatch" if isinstance(other, SparseVector): return other.squared_distance(self) elif _have_scipy and scipy.sparse.issparse(other): @@ -165,12 +228,10 @@ def __getattr__(self, item): class SparseVector(Vector): - """ A simple sparse vector class for passing data to MLlib. Users may alternatively pass SciPy's {scipy.sparse} data types. """ - def __init__(self, size, *args): """ Create a sparse vector, using either a dictionary, a list of @@ -222,20 +283,33 @@ def dot(self, other): 0.0 >>> a.dot(np.array([[1, 1], [2, 2], [3, 3], [4, 4]])) array([ 22., 22.]) + >>> a.dot([1., 2., 3.]) + Traceback (most recent call last): + ... + AssertionError: dimension mismatch + >>> a.dot(np.array([1., 2.])) + Traceback (most recent call last): + ... + AssertionError: dimension mismatch + >>> a.dot(DenseVector([1., 2.])) + Traceback (most recent call last): + ... + AssertionError: dimension mismatch + >>> a.dot(np.zeros((3, 2))) + Traceback (most recent call last): + ... + AssertionError: dimension mismatch """ if type(other) == np.ndarray: - if other.ndim == 1: - result = 0.0 - for i in xrange(len(self.indices)): - result += self.values[i] * other[self.indices[i]] - return result - elif other.ndim == 2: + if other.ndim == 2: results = [self.dot(other[:, i]) for i in xrange(other.shape[1])] return np.array(results) - else: - raise Exception("Cannot call dot with %d-dimensional array" % other.ndim) + elif other.ndim > 2: + raise ValueError("Cannot call dot with %d-dimensional array" % other.ndim) + + assert len(self) == _vector_size(other), "dimension mismatch" - elif type(other) in (array.array, DenseVector): + if type(other) in (np.ndarray, array.array, DenseVector): result = 0.0 for i in xrange(len(self.indices)): result += self.values[i] * other[self.indices[i]] @@ -254,6 +328,7 @@ def dot(self, other): else: j += 1 return result + else: return self.dot(_convert_to_vector(other)) @@ -273,7 +348,16 @@ def squared_distance(self, other): 30.0 >>> b.squared_distance(a) 30.0 + >>> b.squared_distance([1., 2.]) + Traceback (most recent call last): + ... + AssertionError: dimension mismatch + >>> b.squared_distance(SparseVector(3, [1,], [1.0,])) + Traceback (most recent call last): + ... + AssertionError: dimension mismatch """ + assert len(self) == _vector_size(other), "dimension mismatch" if type(other) in (list, array.array, DenseVector, np.array, np.ndarray): if type(other) is np.array and other.ndim != 1: raise Exception("Cannot call squared_distance with %d-dimensional array" % @@ -348,7 +432,6 @@ def __eq__(self, other): >>> v1 != v2 False """ - return (isinstance(other, self.__class__) and other.size == self.size and other.indices == self.indices @@ -414,23 +497,32 @@ def stringify(vector): class Matrix(object): - """ the Matrix """ - def __init__(self, nRow, nCol): - self.nRow = nRow - self.nCol = nCol + """ + Represents a local matrix. + """ + + def __init__(self, numRows, numCols): + self.numRows = numRows + self.numCols = numCols def toArray(self): + """ + Returns its elements in a NumPy ndarray. + """ raise NotImplementedError class DenseMatrix(Matrix): - def __init__(self, nRow, nCol, values): - Matrix.__init__(self, nRow, nCol) - assert len(values) == nRow * nCol + """ + Column-major dense matrix. + """ + def __init__(self, numRows, numCols, values): + Matrix.__init__(self, numRows, numCols) + assert len(values) == numRows * numCols self.values = values def __reduce__(self): - return DenseMatrix, (self.nRow, self.nCol, self.values) + return DenseMatrix, (self.numRows, self.numCols, self.values) def toArray(self): """ @@ -439,10 +531,10 @@ def toArray(self): >>> arr = array.array('d', [float(i) for i in range(4)]) >>> m = DenseMatrix(2, 2, arr) >>> m.toArray() - array([[ 0., 1.], - [ 2., 3.]]) + array([[ 0., 2.], + [ 1., 3.]]) """ - return np.ndarray((self.nRow, self.nCol), np.float64, buffer=self.values.tostring()) + return np.reshape(self.values, (self.numRows, self.numCols), order='F') def _test(): diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 680140d72d03c..8ed89e2f9769f 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -15,7 +15,6 @@ # limitations under the License. # -from base64 import standard_b64encode as b64enc import copy from collections import defaultdict from itertools import chain, ifilter, imap @@ -32,6 +31,7 @@ from random import Random from math import sqrt, log, isinf, isnan +from pyspark.accumulators import PStatsParam from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ BatchedSerializer, CloudPickleSerializer, PairDeserializer, \ PickleSerializer, pack_long, AutoBatchedSerializer @@ -2080,7 +2080,9 @@ def _jrdd(self): return self._jrdd_val if self._bypass_serializer: self._jrdd_deserializer = NoOpSerializer() - command = (self.func, self._prev_jrdd_deserializer, + enable_profile = self.ctx._conf.get("spark.python.profile", "false") == "true" + profileStats = self.ctx.accumulator(None, PStatsParam) if enable_profile else None + command = (self.func, profileStats, self._prev_jrdd_deserializer, self._jrdd_deserializer) # the serialized command will be compressed by broadcast ser = CloudPickleSerializer() @@ -2102,6 +2104,10 @@ def _jrdd(self): self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator) self._jrdd_val = python_rdd.asJavaRDD() + + if enable_profile: + self._id = self._jrdd_val.id() + self.ctx._add_profile(self._id, profileStats) return self._jrdd_val def id(self): diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index f71d24c470dc9..d8bdf22355ec8 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -960,7 +960,7 @@ def registerFunction(self, name, f, returnType=StringType()): [Row(c0=4)] """ func = lambda _, it: imap(lambda x: f(*x), it) - command = (func, + command = (func, None, BatchedSerializer(PickleSerializer(), 1024), BatchedSerializer(PickleSerializer(), 1024)) ser = CloudPickleSerializer() diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 29df754c6fd29..7e2bbc9cb617f 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -632,6 +632,36 @@ def test_distinct(self): self.assertEquals(result.count(), 3) +class TestProfiler(PySparkTestCase): + + def setUp(self): + self._old_sys_path = list(sys.path) + class_name = self.__class__.__name__ + conf = SparkConf().set("spark.python.profile", "true") + self.sc = SparkContext('local[4]', class_name, batchSize=2, conf=conf) + + def test_profiler(self): + + def heavy_foo(x): + for i in range(1 << 20): + x = 1 + rdd = self.sc.parallelize(range(100)) + rdd.foreach(heavy_foo) + profiles = self.sc._profile_stats + self.assertEqual(1, len(profiles)) + id, acc, _ = profiles[0] + stats = acc.value + self.assertTrue(stats is not None) + width, stat_list = stats.get_print_list([]) + func_names = [func_name for fname, n, func_name in stat_list] + self.assertTrue("heavy_foo" in func_names) + + self.sc.show_profiles() + d = tempfile.gettempdir() + self.sc.dump_profiles(d) + self.assertTrue("rdd_%d.pstats" % id in os.listdir(d)) + + class TestSQL(PySparkTestCase): def setUp(self): diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index c1f6e3e4a1f40..8257dddfee1c3 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -23,6 +23,8 @@ import time import socket import traceback +import cProfile +import pstats from pyspark.accumulators import _accumulatorRegistry from pyspark.broadcast import Broadcast, _broadcastRegistry @@ -90,10 +92,21 @@ def main(infile, outfile): command = pickleSer._read_with_length(infile) if isinstance(command, Broadcast): command = pickleSer.loads(command.value) - (func, deserializer, serializer) = command + (func, stats, deserializer, serializer) = command init_time = time.time() - iterator = deserializer.load_stream(infile) - serializer.dump_stream(func(split_index, iterator), outfile) + + def process(): + iterator = deserializer.load_stream(infile) + serializer.dump_stream(func(split_index, iterator), outfile) + + if stats: + p = cProfile.Profile() + p.runcall(process) + st = pstats.Stats(p) + st.stream = None # make it picklable + stats.add(st.strip_dirs()) + else: + process() except Exception: try: write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile) diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 35e9c9939d4b7..556c984ad392b 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -220,23 +220,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { */ override def whiteList = Seq( "add_part_exist", - "dynamic_partition_skip_default", - "infer_bucket_sort_dyn_part", - "load_dyn_part1", - "load_dyn_part2", - "load_dyn_part3", - "load_dyn_part4", - "load_dyn_part5", - "load_dyn_part6", - "load_dyn_part7", - "load_dyn_part8", - "load_dyn_part9", - "load_dyn_part10", - "load_dyn_part11", - "load_dyn_part12", - "load_dyn_part13", - "load_dyn_part14", - "load_dyn_part14_win", "add_part_multiple", "add_partition_no_whitelist", "add_partition_with_whitelist", diff --git a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala new file mode 100644 index 0000000000000..ab7862f4f9e06 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.io.IOException +import java.text.NumberFormat +import java.util.Date + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities} +import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat} +import org.apache.hadoop.hive.ql.plan.FileSinkDesc +import org.apache.hadoop.mapred._ +import org.apache.hadoop.io.Writable + +import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter} + +/** + * Internal helper class that saves an RDD using a Hive OutputFormat. + * It is based on [[SparkHadoopWriter]]. + */ +private[hive] class SparkHiveHadoopWriter( + @transient jobConf: JobConf, + fileSinkConf: FileSinkDesc) + extends Logging + with SparkHadoopMapRedUtil + with Serializable { + + private val now = new Date() + private val conf = new SerializableWritable(jobConf) + + private var jobID = 0 + private var splitID = 0 + private var attemptID = 0 + private var jID: SerializableWritable[JobID] = null + private var taID: SerializableWritable[TaskAttemptID] = null + + @transient private var writer: FileSinkOperator.RecordWriter = null + @transient private var format: HiveOutputFormat[AnyRef, Writable] = null + @transient private var committer: OutputCommitter = null + @transient private var jobContext: JobContext = null + @transient private var taskContext: TaskAttemptContext = null + + def preSetup() { + setIDs(0, 0, 0) + setConfParams() + + val jCtxt = getJobContext() + getOutputCommitter().setupJob(jCtxt) + } + + + def setup(jobid: Int, splitid: Int, attemptid: Int) { + setIDs(jobid, splitid, attemptid) + setConfParams() + } + + def open() { + val numfmt = NumberFormat.getInstance() + numfmt.setMinimumIntegerDigits(5) + numfmt.setGroupingUsed(false) + + val extension = Utilities.getFileExtension( + conf.value, + fileSinkConf.getCompressed, + getOutputFormat()) + + val outputName = "part-" + numfmt.format(splitID) + extension + val path = FileOutputFormat.getTaskOutputPath(conf.value, outputName) + + getOutputCommitter().setupTask(getTaskContext()) + writer = HiveFileFormatUtils.getHiveRecordWriter( + conf.value, + fileSinkConf.getTableInfo, + conf.value.getOutputValueClass.asInstanceOf[Class[Writable]], + fileSinkConf, + path, + null) + } + + def write(value: Writable) { + if (writer != null) { + writer.write(value) + } else { + throw new IOException("Writer is null, open() has not been called") + } + } + + def close() { + // Seems the boolean value passed into close does not matter. + writer.close(false) + } + + def commit() { + val taCtxt = getTaskContext() + val cmtr = getOutputCommitter() + if (cmtr.needsTaskCommit(taCtxt)) { + try { + cmtr.commitTask(taCtxt) + logInfo (taID + ": Committed") + } catch { + case e: IOException => + logError("Error committing the output of task: " + taID.value, e) + cmtr.abortTask(taCtxt) + throw e + } + } else { + logWarning ("No need to commit output of task: " + taID.value) + } + } + + def commitJob() { + // always ? Or if cmtr.needsTaskCommit ? + val cmtr = getOutputCommitter() + cmtr.commitJob(getJobContext()) + } + + // ********* Private Functions ********* + + private def getOutputFormat(): HiveOutputFormat[AnyRef,Writable] = { + if (format == null) { + format = conf.value.getOutputFormat() + .asInstanceOf[HiveOutputFormat[AnyRef,Writable]] + } + format + } + + private def getOutputCommitter(): OutputCommitter = { + if (committer == null) { + committer = conf.value.getOutputCommitter + } + committer + } + + private def getJobContext(): JobContext = { + if (jobContext == null) { + jobContext = newJobContext(conf.value, jID.value) + } + jobContext + } + + private def getTaskContext(): TaskAttemptContext = { + if (taskContext == null) { + taskContext = newTaskAttemptContext(conf.value, taID.value) + } + taskContext + } + + private def setIDs(jobId: Int, splitId: Int, attemptId: Int) { + jobID = jobId + splitID = splitId + attemptID = attemptId + + jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobId)) + taID = new SerializableWritable[TaskAttemptID]( + new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID)) + } + + private def setConfParams() { + conf.value.set("mapred.job.id", jID.value.toString) + conf.value.set("mapred.tip.id", taID.value.getTaskID.toString) + conf.value.set("mapred.task.id", taID.value.toString) + conf.value.setBoolean("mapred.task.is.map", true) + conf.value.setInt("mapred.task.partition", splitID) + } +} + +private[hive] object SparkHiveHadoopWriter { + def createPathFromString(path: String, conf: JobConf): Path = { + if (path == null) { + throw new IllegalArgumentException("Output path is null") + } + val outputPath = new Path(path) + val fs = outputPath.getFileSystem(conf) + if (outputPath == null || fs == null) { + throw new IllegalArgumentException("Incorrectly formatted output path") + } + outputPath.makeQualified(fs) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 4e30e6e06fe21..0aa6292c0184e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -837,6 +837,11 @@ private[hive] object HiveQl { cleanIdentifier(key.toLowerCase) -> None }.toMap).getOrElse(Map.empty) + if (partitionKeys.values.exists(p => p.isEmpty)) { + throw new NotImplementedError(s"Do not support INSERT INTO/OVERWRITE with" + + s"dynamic partitioning.") + } + InsertIntoTable(UnresolvedRelation(db, tableName, None), partitionKeys, query, overwrite) case a: ASTNode => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 3d2ee010696f6..a284a91a91e31 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -19,25 +19,27 @@ package org.apache.spark.sql.hive.execution import scala.collection.JavaConversions._ +import java.util.{HashMap => JHashMap} + import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} -import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.metastore.MetaStoreUtils +import org.apache.hadoop.hive.ql.Context import org.apache.hadoop.hive.ql.metadata.Hive import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc} -import org.apache.hadoop.hive.ql.{Context, ErrorMsg} import org.apache.hadoop.hive.serde2.Serializer -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption import org.apache.hadoop.hive.serde2.objectinspector._ -import org.apache.hadoop.hive.serde2.objectinspector.primitive.{JavaHiveDecimalObjectInspector, JavaHiveVarcharObjectInspector} +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector +import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf} +import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Row -import org.apache.spark.sql.execution.{Command, SparkPlan, UnaryNode} -import org.apache.spark.sql.hive._ -import org.apache.spark.{SerializableWritable, SparkException, TaskContext} +import org.apache.spark.sql.execution.{SparkPlan, UnaryNode} +import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, SparkHiveHadoopWriter} /** * :: DeveloperApi :: @@ -49,7 +51,7 @@ case class InsertIntoHiveTable( child: SparkPlan, overwrite: Boolean) (@transient sc: HiveContext) - extends UnaryNode with Command { + extends UnaryNode { @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass @transient private lazy val hiveContext = new Context(sc.hiveconf) @@ -99,74 +101,66 @@ case class InsertIntoHiveTable( } def saveAsHiveFile( - rdd: RDD[Row], + rdd: RDD[Writable], valueClass: Class[_], fileSinkConf: FileSinkDesc, - conf: SerializableWritable[JobConf], - writerContainer: SparkHiveWriterContainer) { - assert(valueClass != null, "Output value class not set") - conf.value.setOutputValueClass(valueClass) - - val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName - assert(outputFileFormatClassName != null, "Output format class not set") - conf.value.set("mapred.output.format.class", outputFileFormatClassName) - - val isCompressed = conf.value.getBoolean( - ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal) - + conf: JobConf, + isCompressed: Boolean) { + if (valueClass == null) { + throw new SparkException("Output value class not set") + } + conf.setOutputValueClass(valueClass) + if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) { + throw new SparkException("Output format class not set") + } + // Doesn't work in Scala 2.9 due to what may be a generics bug + // TODO: Should we uncomment this for Scala 2.10? + // conf.setOutputFormat(outputFormatClass) + conf.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName) if (isCompressed) { // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec", // and "mapred.output.compression.type" have no impact on ORC because it uses table properties // to store compression information. - conf.value.set("mapred.output.compress", "true") + conf.set("mapred.output.compress", "true") fileSinkConf.setCompressed(true) - fileSinkConf.setCompressCodec(conf.value.get("mapred.output.compression.codec")) - fileSinkConf.setCompressType(conf.value.get("mapred.output.compression.type")) + fileSinkConf.setCompressCodec(conf.get("mapred.output.compression.codec")) + fileSinkConf.setCompressType(conf.get("mapred.output.compression.type")) } - conf.value.setOutputCommitter(classOf[FileOutputCommitter]) - + conf.setOutputCommitter(classOf[FileOutputCommitter]) FileOutputFormat.setOutputPath( - conf.value, - SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value)) - log.debug("Saving as hadoop file of type " + valueClass.getSimpleName) + conf, + SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf)) - writerContainer.driverSideSetup() - sc.sparkContext.runJob(rdd, writeToFile _) - writerContainer.commitJob() - - // Note that this function is executed on executor side - def writeToFile(context: TaskContext, iterator: Iterator[Row]) { - val serializer = newSerializer(fileSinkConf.getTableInfo) - val standardOI = ObjectInspectorUtils - .getStandardObjectInspector( - fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, - ObjectInspectorCopyOption.JAVA) - .asInstanceOf[StructObjectInspector] + log.debug("Saving as hadoop file of type " + valueClass.getSimpleName) - val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray - val outputData = new Array[Any](fieldOIs.length) + val writer = new SparkHiveHadoopWriter(conf, fileSinkConf) + writer.preSetup() + def writeToFile(context: TaskContext, iter: Iterator[Writable]) { // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it // around by taking a mod. We expect that no task will be attempted 2 billion times. val attemptNumber = (context.attemptId % Int.MaxValue).toInt - writerContainer.executorSideSetup(context.stageId, context.partitionId, attemptNumber) - iterator.foreach { row => - var i = 0 - while (i < fieldOIs.length) { - // TODO (lian) avoid per row dynamic dispatching and pattern matching cost in `wrap` - outputData(i) = wrap(row(i), fieldOIs(i)) - i += 1 - } + writer.setup(context.stageId, context.partitionId, attemptNumber) + writer.open() - val writer = writerContainer.getLocalFileWriter(row) - writer.write(serializer.serialize(outputData, standardOI)) + var count = 0 + while(iter.hasNext) { + val record = iter.next() + count += 1 + writer.write(record) } - writerContainer.close() + writer.close() + writer.commit() } + + sc.sparkContext.runJob(rdd, writeToFile _) + writer.commitJob() } + override def execute() = result + /** * Inserts all the rows in the table into Hive. Row objects are properly serialized with the * `org.apache.hadoop.hive.serde2.SerDe` and the @@ -174,57 +168,50 @@ case class InsertIntoHiveTable( * * Note: this is run once and then kept to avoid double insertions. */ - override protected[sql] lazy val sideEffectResult: Seq[Row] = { + private lazy val result: RDD[Row] = { + val childRdd = child.execute() + assert(childRdd != null) + // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer // instances within the closure, since Serializer is not serializable while TableDesc is. val tableDesc = table.tableDesc val tableLocation = table.hiveQlTable.getDataLocation val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) + val rdd = childRdd.mapPartitions { iter => + val serializer = newSerializer(fileSinkConf.getTableInfo) + val standardOI = ObjectInspectorUtils + .getStandardObjectInspector( + fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] - val numDynamicPartitions = partition.values.count(_.isEmpty) - val numStaticPartitions = partition.values.count(_.nonEmpty) - val partitionSpec = partition.map { - case (key, Some(value)) => key -> value - case (key, None) => key -> "" - } - - // All partition column names in the format of "//..." - val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") - val partitionColumnNames = Option(partitionColumns).map(_.split("/")).orNull - - // Validate partition spec if there exist any dynamic partitions - if (numDynamicPartitions > 0) { - // Report error if dynamic partitioning is not enabled - if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) { - throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg) - } - // Report error if dynamic partition strict mode is on but no static partition is found - if (numStaticPartitions == 0 && - sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) { - throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg) - } + val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray + val outputData = new Array[Any](fieldOIs.length) + iter.map { row => + var i = 0 + while (i < row.length) { + // Casts Strings to HiveVarchars when necessary. + outputData(i) = wrap(row(i), fieldOIs(i)) + i += 1 + } - // Report error if any static partition appears after a dynamic partition - val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty) - isDynamic.init.zip(isDynamic.tail).find(_ == (true, false)).foreach { _ => - throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg) + serializer.serialize(outputData, standardOI) } } + // ORC stores compression information in table properties. While, there are other formats + // (e.g. RCFile) that rely on hadoop configurations to store compression information. val jobConf = new JobConf(sc.hiveconf) - val jobConfSer = new SerializableWritable(jobConf) - - val writerContainer = if (numDynamicPartitions > 0) { - val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions) - new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames) - } else { - new SparkHiveWriterContainer(jobConf, fileSinkConf) - } - - saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer) - + saveAsHiveFile( + rdd, + outputClass, + fileSinkConf, + jobConf, + sc.hiveconf.getBoolean("hive.exec.compress.output", false)) + + // TODO: Handle dynamic partitioning. val outputPath = FileOutputFormat.getOutputPath(jobConf) // Have to construct the format of dbname.tablename. val qualifiedTableName = s"${table.databaseName}.${table.tableName}" @@ -233,6 +220,10 @@ case class InsertIntoHiveTable( // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint. val holdDDLTime = false if (partition.nonEmpty) { + val partitionSpec = partition.map { + case (key, Some(value)) => key -> value + case (key, None) => key -> "" // Should not reach here right now. + } val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec) db.validatePartitionNameCharacters(partVals) // inheritTableSpecs is set to true. It should be set to false for a IMPORT query @@ -240,26 +231,14 @@ case class InsertIntoHiveTable( val inheritTableSpecs = true // TODO: Correctly set isSkewedStoreAsSubdir. val isSkewedStoreAsSubdir = false - if (numDynamicPartitions > 0) { - db.loadDynamicPartitions( - outputPath, - qualifiedTableName, - partitionSpec, - overwrite, - numDynamicPartitions, - holdDDLTime, - isSkewedStoreAsSubdir - ) - } else { - db.loadPartition( - outputPath, - qualifiedTableName, - partitionSpec, - overwrite, - holdDDLTime, - inheritTableSpecs, - isSkewedStoreAsSubdir) - } + db.loadPartition( + outputPath, + qualifiedTableName, + partitionSpec, + overwrite, + holdDDLTime, + inheritTableSpecs, + isSkewedStoreAsSubdir) } else { db.loadTable( outputPath, @@ -272,6 +251,6 @@ case class InsertIntoHiveTable( // however for now we return an empty list to simplify compatibility checks with hive, which // does not return anything for insert operations. // TODO: implement hive compatibility as rules. - Seq.empty[Row] + sc.sparkContext.makeRDD(Nil, 1) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala deleted file mode 100644 index a667188fa53bd..0000000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import java.io.IOException -import java.text.NumberFormat -import java.util.Date - -import scala.collection.mutable - -import org.apache.hadoop.fs.Path -import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities} -import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat} -import org.apache.hadoop.hive.ql.plan.FileSinkDesc -import org.apache.hadoop.io.Writable -import org.apache.hadoop.mapred._ - -import org.apache.spark.sql.Row -import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter} - -/** - * Internal helper class that saves an RDD using a Hive OutputFormat. - * It is based on [[SparkHadoopWriter]]. - */ -private[hive] class SparkHiveWriterContainer( - @transient jobConf: JobConf, - fileSinkConf: FileSinkDesc) - extends Logging - with SparkHadoopMapRedUtil - with Serializable { - - private val now = new Date() - protected val conf = new SerializableWritable(jobConf) - - private var jobID = 0 - private var splitID = 0 - private var attemptID = 0 - private var jID: SerializableWritable[JobID] = null - private var taID: SerializableWritable[TaskAttemptID] = null - - @transient private var writer: FileSinkOperator.RecordWriter = null - @transient private lazy val committer = conf.value.getOutputCommitter - @transient private lazy val jobContext = newJobContext(conf.value, jID.value) - @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value) - @transient private lazy val outputFormat = - conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef,Writable]] - - def driverSideSetup() { - setIDs(0, 0, 0) - setConfParams() - committer.setupJob(jobContext) - } - - def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int) { - setIDs(jobId, splitId, attemptId) - setConfParams() - committer.setupTask(taskContext) - initWriters() - } - - protected def getOutputName: String = { - val numberFormat = NumberFormat.getInstance() - numberFormat.setMinimumIntegerDigits(5) - numberFormat.setGroupingUsed(false) - val extension = Utilities.getFileExtension(conf.value, fileSinkConf.getCompressed, outputFormat) - "part-" + numberFormat.format(splitID) + extension - } - - def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = writer - - def close() { - // Seems the boolean value passed into close does not matter. - writer.close(false) - commit() - } - - def commitJob() { - committer.commitJob(jobContext) - } - - protected def initWriters() { - // NOTE this method is executed at the executor side. - // For Hive tables without partitions or with only static partitions, only 1 writer is needed. - writer = HiveFileFormatUtils.getHiveRecordWriter( - conf.value, - fileSinkConf.getTableInfo, - conf.value.getOutputValueClass.asInstanceOf[Class[Writable]], - fileSinkConf, - FileOutputFormat.getTaskOutputPath(conf.value, getOutputName), - Reporter.NULL) - } - - protected def commit() { - if (committer.needsTaskCommit(taskContext)) { - try { - committer.commitTask(taskContext) - logInfo (taID + ": Committed") - } catch { - case e: IOException => - logError("Error committing the output of task: " + taID.value, e) - committer.abortTask(taskContext) - throw e - } - } else { - logInfo("No need to commit output of task: " + taID.value) - } - } - - // ********* Private Functions ********* - - private def setIDs(jobId: Int, splitId: Int, attemptId: Int) { - jobID = jobId - splitID = splitId - attemptID = attemptId - - jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobId)) - taID = new SerializableWritable[TaskAttemptID]( - new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID)) - } - - private def setConfParams() { - conf.value.set("mapred.job.id", jID.value.toString) - conf.value.set("mapred.tip.id", taID.value.getTaskID.toString) - conf.value.set("mapred.task.id", taID.value.toString) - conf.value.setBoolean("mapred.task.is.map", true) - conf.value.setInt("mapred.task.partition", splitID) - } -} - -private[hive] object SparkHiveWriterContainer { - def createPathFromString(path: String, conf: JobConf): Path = { - if (path == null) { - throw new IllegalArgumentException("Output path is null") - } - val outputPath = new Path(path) - val fs = outputPath.getFileSystem(conf) - if (outputPath == null || fs == null) { - throw new IllegalArgumentException("Incorrectly formatted output path") - } - outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - } -} - -private[spark] class SparkHiveDynamicPartitionWriterContainer( - @transient jobConf: JobConf, - fileSinkConf: FileSinkDesc, - dynamicPartColNames: Array[String]) - extends SparkHiveWriterContainer(jobConf, fileSinkConf) { - - private val defaultPartName = jobConf.get( - ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal) - - @transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _ - - override protected def initWriters(): Unit = { - // NOTE: This method is executed at the executor side. - // Actual writers are created for each dynamic partition on the fly. - writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter] - } - - override def close(): Unit = { - writers.values.foreach(_.close(false)) - commit() - } - - override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = { - val dynamicPartPath = dynamicPartColNames - .zip(row.takeRight(dynamicPartColNames.length)) - .map { case (col, rawVal) => - val string = String.valueOf(rawVal) - s"/$col=${if (rawVal == null || string.isEmpty) defaultPartName else string}" - } - .mkString - - def newWriter = { - val newFileSinkDesc = new FileSinkDesc( - fileSinkConf.getDirName + dynamicPartPath, - fileSinkConf.getTableInfo, - fileSinkConf.getCompressed) - newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec) - newFileSinkDesc.setCompressType(fileSinkConf.getCompressType) - - val path = { - val outputPath = FileOutputFormat.getOutputPath(conf.value) - assert(outputPath != null, "Undefined job output-path") - val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/")) - new Path(workPath, getOutputName) - } - - HiveFileFormatUtils.getHiveRecordWriter( - conf.value, - fileSinkConf.getTableInfo, - conf.value.getOutputValueClass.asInstanceOf[Class[Writable]], - newFileSinkDesc, - path, - Reporter.NULL) - } - - writers.getOrElseUpdate(dynamicPartPath, newWriter) - } -} diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340 b/sql/hive/src/test/resources/golden/dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340 deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-1-640552dd462707563fd255a713f83b41 b/sql/hive/src/test/resources/golden/dynamic_partition-1-640552dd462707563fd255a713f83b41 deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493 b/sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493 deleted file mode 100644 index 573541ac9702d..0000000000000 --- a/sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493 +++ /dev/null @@ -1 +0,0 @@ -0 diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f b/sql/hive/src/test/resources/golden/dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a b/sql/hive/src/test/resources/golden/dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e b/sql/hive/src/test/resources/golden/dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf b/sql/hive/src/test/resources/golden/dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340 b/sql/hive/src/test/resources/golden/dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340 deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 5d743a51b47c5..2da8a6fac3d99 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -19,9 +19,6 @@ package org.apache.spark.sql.hive.execution import scala.util.Try -import org.apache.hadoop.hive.conf.HiveConf.ConfVars - -import org.apache.spark.SparkException import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ @@ -383,7 +380,7 @@ class HiveQuerySuite extends HiveComparisonTest { def isExplanation(result: SchemaRDD) = { val explanation = result.select('plan).collect().map { case Row(plan: String) => plan } - explanation.contains("== Physical Plan ==") + explanation.exists(_ == "== Physical Plan ==") } test("SPARK-1704: Explain commands as a SchemaRDD") { @@ -571,91 +568,6 @@ class HiveQuerySuite extends HiveComparisonTest { case class LogEntry(filename: String, message: String) case class LogFile(name: String) - createQueryTest("dynamic_partition", - """ - |DROP TABLE IF EXISTS dynamic_part_table; - |CREATE TABLE dynamic_part_table(intcol INT) PARTITIONED BY (partcol1 INT, partcol2 INT); - | - |SET hive.exec.dynamic.partition.mode=nonstrict; - | - |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2) - |SELECT 1, 1, 1 FROM src WHERE key=150; - | - |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2) - |SELECT 1, NULL, 1 FROM src WHERE key=150; - | - |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2) - |SELECT 1, 1, NULL FROM src WHERE key=150; - | - |INSERT INTO TABLe dynamic_part_table PARTITION(partcol1, partcol2) - |SELECT 1, NULL, NULL FROM src WHERE key=150; - | - |DROP TABLE IF EXISTS dynamic_part_table; - """.stripMargin) - - test("Dynamic partition folder layout") { - sql("DROP TABLE IF EXISTS dynamic_part_table") - sql("CREATE TABLE dynamic_part_table(intcol INT) PARTITIONED BY (partcol1 INT, partcol2 INT)") - sql("SET hive.exec.dynamic.partition.mode=nonstrict") - - val data = Map( - Seq("1", "1") -> 1, - Seq("1", "NULL") -> 2, - Seq("NULL", "1") -> 3, - Seq("NULL", "NULL") -> 4) - - data.foreach { case (parts, value) => - sql( - s"""INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2) - |SELECT $value, ${parts.mkString(", ")} FROM src WHERE key=150 - """.stripMargin) - - val partFolder = Seq("partcol1", "partcol2") - .zip(parts) - .map { case (k, v) => - if (v == "NULL") { - s"$k=${ConfVars.DEFAULTPARTITIONNAME.defaultVal}" - } else { - s"$k=$v" - } - } - .mkString("/") - - // Loads partition data to a temporary table to verify contents - val path = s"$warehousePath/dynamic_part_table/$partFolder/part-00000" - - sql("DROP TABLE IF EXISTS dp_verify") - sql("CREATE TABLE dp_verify(intcol INT)") - sql(s"LOAD DATA LOCAL INPATH '$path' INTO TABLE dp_verify") - - assert(sql("SELECT * FROM dp_verify").collect() === Array(Row(value))) - } - } - - test("Partition spec validation") { - sql("DROP TABLE IF EXISTS dp_test") - sql("CREATE TABLE dp_test(key INT, value STRING) PARTITIONED BY (dp INT, sp INT)") - sql("SET hive.exec.dynamic.partition.mode=strict") - - // Should throw when using strict dynamic partition mode without any static partition - intercept[SparkException] { - sql( - """INSERT INTO TABLE dp_test PARTITION(dp) - |SELECT key, value, key % 5 FROM src - """.stripMargin) - } - - sql("SET hive.exec.dynamic.partition.mode=nonstrict") - - // Should throw when a static partition appears after a dynamic partition - intercept[SparkException] { - sql( - """INSERT INTO TABLE dp_test PARTITION(dp, sp = 1) - |SELECT key, value, key % 5 FROM src - """.stripMargin) - } - } - test("SPARK-3414 regression: should store analyzed logical plan when registering a temp table") { sparkContext.makeRDD(Seq.empty[LogEntry]).registerTempTable("rawLogs") sparkContext.makeRDD(Seq.empty[LogFile]).registerTempTable("logFiles") @@ -713,27 +625,27 @@ class HiveQuerySuite extends HiveComparisonTest { assert(sql("SET").collect().size == 0) assertResult(Set(testKey -> testVal)) { - collectResults(sql(s"SET $testKey=$testVal")) + collectResults(hql(s"SET $testKey=$testVal")) } assert(hiveconf.get(testKey, "") == testVal) assertResult(Set(testKey -> testVal)) { - collectResults(sql("SET")) + collectResults(hql("SET")) } sql(s"SET ${testKey + testKey}=${testVal + testVal}") assert(hiveconf.get(testKey + testKey, "") == testVal + testVal) assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) { - collectResults(sql("SET")) + collectResults(hql("SET")) } // "set key" assertResult(Set(testKey -> testVal)) { - collectResults(sql(s"SET $testKey")) + collectResults(hql(s"SET $testKey")) } assertResult(Set(nonexistentKey -> "")) { - collectResults(sql(s"SET $nonexistentKey")) + collectResults(hql(s"SET $nonexistentKey")) } // Assert that sql() should have the same effects as sql() by repeating the above using sql(). diff --git a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 857a4447dd738..4b6635679f053 100644 --- a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -86,13 +86,13 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers { super.afterAll() } - test("run Spark in yarn-client mode") { + ignore("run Spark in yarn-client mode") { var result = File.createTempFile("result", null, tempDir) YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath())) checkResult(result) } - test("run Spark in yarn-cluster mode") { + ignore("run Spark in yarn-cluster mode") { val main = YarnClusterDriver.getClass.getName().stripSuffix("$") var result = File.createTempFile("result", null, tempDir)