From a878660d2d7bb7ad9b5818a674e1e7c651077e78 Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Tue, 11 Nov 2014 14:29:18 -0800 Subject: [PATCH 01/66] SPARK-2269 Refactor mesos scheduler resourceOffers and add unit test Author: Timothy Chen Closes #1487 from tnachen/resource_offer_refactor and squashes the following commits: 4ea5dec [Timothy Chen] Rebase from master and address comments 9ccab09 [Timothy Chen] Address review comments e6494dc [Timothy Chen] Refactor class loading 8207428 [Timothy Chen] Refactor mesos scheduler resourceOffers and add unit test --- .../cluster/mesos/MesosSchedulerBackend.scala | 137 ++++++++---------- .../mesos/MesosSchedulerBackendSuite.scala | 94 ++++++++++++ 2 files changed, 152 insertions(+), 79 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index c5f3493477bc5..d13795186c48e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -166,29 +166,16 @@ private[spark] class MesosSchedulerBackend( execArgs } - private def setClassLoader(): ClassLoader = { - val oldClassLoader = Thread.currentThread.getContextClassLoader - Thread.currentThread.setContextClassLoader(classLoader) - oldClassLoader - } - - private def restoreClassLoader(oldClassLoader: ClassLoader) { - Thread.currentThread.setContextClassLoader(oldClassLoader) - } - override def offerRescinded(d: SchedulerDriver, o: OfferID) {} override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { - val oldClassLoader = setClassLoader() - try { + inClassLoader() { appId = frameworkId.getValue logInfo("Registered as framework ID " + appId) registeredLock.synchronized { isRegistered = true registeredLock.notifyAll() } - } finally { - restoreClassLoader(oldClassLoader) } } @@ -200,6 +187,16 @@ private[spark] class MesosSchedulerBackend( } } + private def inClassLoader()(fun: => Unit) = { + val oldClassLoader = Thread.currentThread.getContextClassLoader + Thread.currentThread.setContextClassLoader(classLoader) + try { + fun + } finally { + Thread.currentThread.setContextClassLoader(oldClassLoader) + } + } + override def disconnected(d: SchedulerDriver) {} override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {} @@ -210,66 +207,57 @@ private[spark] class MesosSchedulerBackend( * tasks are balanced across the cluster. */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { - val oldClassLoader = setClassLoader() - try { - synchronized { - // Build a big list of the offerable workers, and remember their indices so that we can - // figure out which Offer to reply to for each worker - val offerableWorkers = new ArrayBuffer[WorkerOffer] - val offerableIndices = new HashMap[String, Int] - - def sufficientOffer(o: Offer) = { - val mem = getResource(o.getResourcesList, "mem") - val cpus = getResource(o.getResourcesList, "cpus") - val slaveId = o.getSlaveId.getValue - (mem >= MemoryUtils.calculateTotalMemory(sc) && - // need at least 1 for executor, 1 for task - cpus >= 2 * scheduler.CPUS_PER_TASK) || - (slaveIdsWithExecutors.contains(slaveId) && - cpus >= scheduler.CPUS_PER_TASK) - } + inClassLoader() { + val (acceptedOffers, declinedOffers) = offers.partition { o => + val mem = getResource(o.getResourcesList, "mem") + val cpus = getResource(o.getResourcesList, "cpus") + val slaveId = o.getSlaveId.getValue + (mem >= MemoryUtils.calculateTotalMemory(sc) && + // need at least 1 for executor, 1 for task + cpus >= 2 * scheduler.CPUS_PER_TASK) || + (slaveIdsWithExecutors.contains(slaveId) && + cpus >= scheduler.CPUS_PER_TASK) + } - for ((offer, index) <- offers.zipWithIndex if sufficientOffer(offer)) { - val slaveId = offer.getSlaveId.getValue - offerableIndices.put(slaveId, index) - val cpus = if (slaveIdsWithExecutors.contains(slaveId)) { - getResource(offer.getResourcesList, "cpus").toInt - } else { - // If the executor doesn't exist yet, subtract CPU for executor - getResource(offer.getResourcesList, "cpus").toInt - - scheduler.CPUS_PER_TASK - } - offerableWorkers += new WorkerOffer( - offer.getSlaveId.getValue, - offer.getHostname, - cpus) + val offerableWorkers = acceptedOffers.map { o => + val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) { + getResource(o.getResourcesList, "cpus").toInt + } else { + // If the executor doesn't exist yet, subtract CPU for executor + getResource(o.getResourcesList, "cpus").toInt - + scheduler.CPUS_PER_TASK } + new WorkerOffer( + o.getSlaveId.getValue, + o.getHostname, + cpus) + } - // Call into the TaskSchedulerImpl - val taskLists = scheduler.resourceOffers(offerableWorkers) - - // Build a list of Mesos tasks for each slave - val mesosTasks = offers.map(o => new JArrayList[MesosTaskInfo]()) - for ((taskList, index) <- taskLists.zipWithIndex) { - if (!taskList.isEmpty) { - for (taskDesc <- taskList) { - val slaveId = taskDesc.executorId - val offerNum = offerableIndices(slaveId) - slaveIdsWithExecutors += slaveId - taskIdToSlaveId(taskDesc.taskId) = slaveId - mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId)) - } + val slaveIdToOffer = acceptedOffers.map(o => o.getSlaveId.getValue -> o).toMap + + val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]] + + // Call into the TaskSchedulerImpl + scheduler.resourceOffers(offerableWorkers) + .filter(!_.isEmpty) + .foreach { offer => + offer.foreach { taskDesc => + val slaveId = taskDesc.executorId + slaveIdsWithExecutors += slaveId + taskIdToSlaveId(taskDesc.taskId) = slaveId + mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo]) + .add(createMesosTask(taskDesc, slaveId)) } } - // Reply to the offers - val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? - for (i <- 0 until offers.size) { - d.launchTasks(Collections.singleton(offers(i).getId), mesosTasks(i), filters) - } + // Reply to the offers + val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? + + mesosTasks.foreach { case (slaveId, tasks) => + d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) } - } finally { - restoreClassLoader(oldClassLoader) + + declinedOffers.foreach(o => d.declineOffer(o.getId)) } } @@ -308,8 +296,7 @@ private[spark] class MesosSchedulerBackend( } override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { - val oldClassLoader = setClassLoader() - try { + inClassLoader() { val tid = status.getTaskId.getValue.toLong val state = TaskState.fromMesos(status.getState) synchronized { @@ -322,18 +309,13 @@ private[spark] class MesosSchedulerBackend( } } scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer) - } finally { - restoreClassLoader(oldClassLoader) } } override def error(d: SchedulerDriver, message: String) { - val oldClassLoader = setClassLoader() - try { + inClassLoader() { logError("Mesos error: " + message) scheduler.error(message) - } finally { - restoreClassLoader(oldClassLoader) } } @@ -350,15 +332,12 @@ private[spark] class MesosSchedulerBackend( override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) { - val oldClassLoader = setClassLoader() - try { + inClassLoader() { logInfo("Mesos slave lost: " + slaveId.getValue) synchronized { slaveIdsWithExecutors -= slaveId.getValue } scheduler.executorLost(slaveId.getValue, reason) - } finally { - restoreClassLoader(oldClassLoader) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala new file mode 100644 index 0000000000000..bef8d3a58ba63 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.mesos + +import org.scalatest.FunSuite +import org.apache.spark.{scheduler, SparkConf, SparkContext, LocalSparkContext} +import org.apache.spark.scheduler.{TaskDescription, WorkerOffer, TaskSchedulerImpl} +import org.apache.spark.scheduler.cluster.mesos.{MemoryUtils, MesosSchedulerBackend} +import org.apache.mesos.SchedulerDriver +import org.apache.mesos.Protos._ +import org.scalatest.mock.EasyMockSugar +import org.apache.mesos.Protos.Value.Scalar +import org.easymock.{Capture, EasyMock} +import java.nio.ByteBuffer +import java.util.Collections +import java.util +import scala.collection.mutable + +class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar { + test("mesos resource offer is launching tasks") { + def createOffer(id: Int, mem: Int, cpu: Int) = { + val builder = Offer.newBuilder() + builder.addResourcesBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(mem)) + builder.addResourcesBuilder() + .setName("cpus") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(cpu)) + builder.setId(OfferID.newBuilder().setValue(id.toString).build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1")) + .setSlaveId(SlaveID.newBuilder().setValue("s1")).setHostname("localhost").build() + } + + val driver = EasyMock.createMock(classOf[SchedulerDriver]) + val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) + + val sc = EasyMock.createMock(classOf[SparkContext]) + + EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes() + EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes() + EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes() + EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes() + EasyMock.replay(sc) + val minMem = MemoryUtils.calculateTotalMemory(sc).toInt + val minCpu = 4 + val offers = new java.util.ArrayList[Offer] + offers.add(createOffer(1, minMem, minCpu)) + offers.add(createOffer(1, minMem - 1, minCpu)) + val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") + val workerOffers = Seq(offers.get(0)).map(o => new WorkerOffer( + o.getSlaveId.getValue, + o.getHostname, + 2 + )) + val taskDesc = new TaskDescription(1L, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) + EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(workerOffers))).andReturn(Seq(Seq(taskDesc))) + EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes() + EasyMock.replay(taskScheduler) + val capture = new Capture[util.Collection[TaskInfo]] + EasyMock.expect( + driver.launchTasks( + EasyMock.eq(Collections.singleton(offers.get(0).getId)), + EasyMock.capture(capture), + EasyMock.anyObject(classOf[Filters]) + ) + ).andReturn(Status.valueOf(1)) + EasyMock.expect(driver.declineOffer(offers.get(1).getId)).andReturn(Status.valueOf(1)) + EasyMock.replay(driver) + backend.resourceOffers(driver, offers) + assert(capture.getValue.size() == 1) + val taskInfo = capture.getValue.iterator().next() + assert(taskInfo.getName.equals("n1")) + val cpus = taskInfo.getResourcesList.get(0) + assert(cpus.getName.equals("cpus")) + assert(cpus.getScalar.getValue.equals(2.0)) + assert(taskInfo.getSlaveId.getValue.equals("s1")) + } +} From 2ddb1415e2bea94004947506ded090c2e8ff8dad Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 11 Nov 2014 18:02:59 -0800 Subject: [PATCH 02/66] [Release] Log build output for each distribution --- dev/create-release/create-release.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dev/create-release/create-release.sh b/dev/create-release/create-release.sh index 281e8d4de6d71..50a9a2fa1cb9a 100755 --- a/dev/create-release/create-release.sh +++ b/dev/create-release/create-release.sh @@ -27,6 +27,7 @@ # Would be nice to add: # - Send output to stderr and have useful logging in stdout +# Note: The following variables must be set before use! GIT_USERNAME=${GIT_USERNAME:-pwendell} GIT_PASSWORD=${GIT_PASSWORD:-XXX} GPG_PASSPHRASE=${GPG_PASSPHRASE:-XXX} @@ -101,7 +102,7 @@ make_binary_release() { cp -r spark spark-$RELEASE_VERSION-bin-$NAME cd spark-$RELEASE_VERSION-bin-$NAME - ./make-distribution.sh --name $NAME --tgz $FLAGS + ./make-distribution.sh --name $NAME --tgz $FLAGS 2>&1 | tee binary-release-$NAME.log cd .. cp spark-$RELEASE_VERSION-bin-$NAME/spark-$RELEASE_VERSION-bin-$NAME.tgz . rm -rf spark-$RELEASE_VERSION-bin-$NAME From daaca14c16dc2c1abc98f15ab8c6f7c14761b627 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Tue, 11 Nov 2014 21:36:48 -0800 Subject: [PATCH 03/66] Support cross building for Scala 2.11 Let's give this another go using a version of Hive that shades its JLine dependency. Author: Prashant Sharma Author: Patrick Wendell Closes #3159 from pwendell/scala-2.11-prashant and squashes the following commits: e93aa3e [Patrick Wendell] Restoring -Phive-thriftserver profile and cleaning up build script. f65d17d [Patrick Wendell] Fixing build issue due to merge conflict a8c41eb [Patrick Wendell] Reverting dev/run-tests back to master state. 7a6eb18 [Patrick Wendell] Merge remote-tracking branch 'apache/master' into scala-2.11-prashant 583aa07 [Prashant Sharma] REVERT ME: removed hive thirftserver 3680e58 [Prashant Sharma] Revert "REVERT ME: Temporarily removing some Cli tests." 935fb47 [Prashant Sharma] Revert "Fixed by disabling a few tests temporarily." 925e90f [Prashant Sharma] Fixed by disabling a few tests temporarily. 2fffed3 [Prashant Sharma] Exclude groovy from sbt build, and also provide a way for such instances in future. 8bd4e40 [Prashant Sharma] Switched to gmaven plus, it fixes random failures observer with its predecessor gmaven. 5272ce5 [Prashant Sharma] SPARK_SCALA_VERSION related bugs. 2121071 [Patrick Wendell] Migrating version detection to PySpark b1ed44d [Patrick Wendell] REVERT ME: Temporarily removing some Cli tests. 1743a73 [Patrick Wendell] Removing decimal test that doesn't work with Scala 2.11 f5cad4e [Patrick Wendell] Add Scala 2.11 docs 210d7e1 [Patrick Wendell] Revert "Testing new Hive version with shaded jline" 48518ce [Patrick Wendell] Remove association of Hive and Thriftserver profiles. e9d0a06 [Patrick Wendell] Revert "Enable thritfserver for Scala 2.10 only" 67ec364 [Patrick Wendell] Guard building of thriftserver around Scala 2.10 check 8502c23 [Patrick Wendell] Enable thritfserver for Scala 2.10 only e22b104 [Patrick Wendell] Small fix in pom file ec402ab [Patrick Wendell] Various fixes 0be5a9d [Patrick Wendell] Testing new Hive version with shaded jline 4eaec65 [Prashant Sharma] Changed scripts to ignore target. 5167bea [Prashant Sharma] small correction a4fcac6 [Prashant Sharma] Run against scala 2.11 on jenkins. 80285f4 [Prashant Sharma] MAven equivalent of setting spark.executor.extraClasspath during tests. 034b369 [Prashant Sharma] Setting test jars on executor classpath during tests from sbt. d4874cb [Prashant Sharma] Fixed Python Runner suite. null check should be first case in scala 2.11. 6f50f13 [Prashant Sharma] Fixed build after rebasing with master. We should use ${scala.binary.version} instead of just 2.10 e56ca9d [Prashant Sharma] Print an error if build for 2.10 and 2.11 is spotted. 937c0b8 [Prashant Sharma] SCALA_VERSION -> SPARK_SCALA_VERSION cb059b0 [Prashant Sharma] Code review 0476e5e [Prashant Sharma] Scala 2.11 support with repl and all build changes. --- .rat-excludes | 1 + assembly/pom.xml | 13 +- bin/compute-classpath.sh | 46 +- bin/load-spark-env.sh | 20 + bin/pyspark | 6 +- bin/run-example | 8 +- bin/spark-class | 8 +- core/pom.xml | 57 +- .../apache/spark/deploy/PythonRunner.scala | 2 +- .../org/apache/spark/deploy/SparkSubmit.scala | 2 +- dev/change-version-to-2.10.sh | 20 + dev/change-version-to-2.11.sh | 21 + dev/create-release/create-release.sh | 12 +- dev/run-tests | 13 +- dev/scalastyle | 2 +- docs/building-spark.md | 31 +- docs/sql-programming-guide.md | 2 +- examples/pom.xml | 199 ++- .../streaming/JavaKafkaWordCount.java | 0 .../examples/streaming/KafkaWordCount.scala | 0 .../streaming/TwitterAlgebirdCMS.scala | 0 .../streaming/TwitterAlgebirdHLL.scala | 0 external/mqtt/pom.xml | 5 - make-distribution.sh | 2 +- network/shuffle/pom.xml | 4 +- network/yarn/pom.xml | 2 +- pom.xml | 178 ++- project/SparkBuild.scala | 36 +- project/project/SparkPluginBuild.scala | 2 +- repl/pom.xml | 90 +- .../scala/org/apache/spark/repl/Main.scala | 0 .../apache/spark/repl/SparkCommandLine.scala | 0 .../apache/spark/repl/SparkExprTyper.scala | 0 .../org/apache/spark/repl/SparkHelper.scala | 0 .../org/apache/spark/repl/SparkILoop.scala | 0 .../apache/spark/repl/SparkILoopInit.scala | 0 .../org/apache/spark/repl/SparkIMain.scala | 0 .../org/apache/spark/repl/SparkImports.scala | 0 .../spark/repl/SparkJLineCompletion.scala | 0 .../apache/spark/repl/SparkJLineReader.scala | 0 .../spark/repl/SparkMemberHandlers.scala | 0 .../spark/repl/SparkRunnerSettings.scala | 0 .../org/apache/spark/repl/ReplSuite.scala | 0 .../scala/org/apache/spark/repl/Main.scala | 85 ++ .../apache/spark/repl/SparkExprTyper.scala | 86 ++ .../org/apache/spark/repl/SparkILoop.scala | 966 ++++++++++++ .../org/apache/spark/repl/SparkIMain.scala | 1319 +++++++++++++++++ .../org/apache/spark/repl/SparkImports.scala | 201 +++ .../spark/repl/SparkJLineCompletion.scala | 350 +++++ .../spark/repl/SparkMemberHandlers.scala | 221 +++ .../apache/spark/repl/SparkReplReporter.scala | 53 + .../org/apache/spark/repl/ReplSuite.scala | 326 ++++ sql/catalyst/pom.xml | 29 +- .../catalyst/types/decimal/DecimalSuite.scala | 1 - 54 files changed, 4204 insertions(+), 215 deletions(-) create mode 100755 dev/change-version-to-2.10.sh create mode 100755 dev/change-version-to-2.11.sh rename examples/{ => scala-2.10}/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java (100%) rename examples/{ => scala-2.10}/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala (100%) rename examples/{ => scala-2.10}/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala (100%) rename examples/{ => scala-2.10}/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala (100%) rename repl/{ => scala-2.10}/src/main/scala/org/apache/spark/repl/Main.scala (100%) rename repl/{ => scala-2.10}/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala (100%) rename repl/{ => scala-2.10}/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala (100%) rename repl/{ => scala-2.10}/src/main/scala/org/apache/spark/repl/SparkHelper.scala (100%) rename repl/{ => scala-2.10}/src/main/scala/org/apache/spark/repl/SparkILoop.scala (100%) rename repl/{ => scala-2.10}/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala (100%) rename repl/{ => scala-2.10}/src/main/scala/org/apache/spark/repl/SparkIMain.scala (100%) rename repl/{ => scala-2.10}/src/main/scala/org/apache/spark/repl/SparkImports.scala (100%) rename repl/{ => scala-2.10}/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala (100%) rename repl/{ => scala-2.10}/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala (100%) rename repl/{ => scala-2.10}/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala (100%) rename repl/{ => scala-2.10}/src/main/scala/org/apache/spark/repl/SparkRunnerSettings.scala (100%) rename repl/{ => scala-2.10}/src/test/scala/org/apache/spark/repl/ReplSuite.scala (100%) create mode 100644 repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala create mode 100644 repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala create mode 100644 repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala create mode 100644 repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkIMain.scala create mode 100644 repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkImports.scala create mode 100644 repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala create mode 100644 repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala create mode 100644 repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkReplReporter.scala create mode 100644 repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala diff --git a/.rat-excludes b/.rat-excludes index 20e3372464386..d8bee1f8e49c9 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -44,6 +44,7 @@ SparkImports.scala SparkJLineCompletion.scala SparkJLineReader.scala SparkMemberHandlers.scala +SparkReplReporter.scala sbt sbt-launch-lib.bash plugins.sbt diff --git a/assembly/pom.xml b/assembly/pom.xml index 31a01e4d8e1de..c65192bde64c6 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -66,22 +66,22 @@ org.apache.spark - spark-repl_${scala.binary.version} + spark-streaming_${scala.binary.version} ${project.version} org.apache.spark - spark-streaming_${scala.binary.version} + spark-graphx_${scala.binary.version} ${project.version} org.apache.spark - spark-graphx_${scala.binary.version} + spark-sql_${scala.binary.version} ${project.version} org.apache.spark - spark-sql_${scala.binary.version} + spark-repl_${scala.binary.version} ${project.version} @@ -197,6 +197,11 @@ spark-hive_${scala.binary.version} ${project.version} + + + + hive-thriftserver + org.apache.spark spark-hive-thriftserver_${scala.binary.version} diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index 905bbaf99b374..298641f2684de 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -20,8 +20,6 @@ # This script computes Spark's classpath and prints it to stdout; it's used by both the "run" # script and the ExecutorRunner in standalone cluster mode. -SCALA_VERSION=2.10 - # Figure out where Spark is installed FWDIR="$(cd "`dirname "$0"`"/..; pwd)" @@ -36,7 +34,7 @@ else CLASSPATH="$CLASSPATH:$FWDIR/conf" fi -ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION" +ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SPARK_SCALA_VERSION" if [ -n "$JAVA_HOME" ]; then JAR_CMD="$JAVA_HOME/bin/jar" @@ -48,19 +46,19 @@ fi if [ -n "$SPARK_PREPEND_CLASSES" ]; then echo "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark"\ "classes ahead of assembly." >&2 - CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SPARK_SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$FWDIR/core/target/jars/*" - CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/tools/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SPARK_SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SPARK_SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SPARK_SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SPARK_SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SPARK_SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/tools/target/scala-$SPARK_SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SPARK_SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SPARK_SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SPARK_SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SPARK_SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SPARK_SCALA_VERSION/classes" fi # Use spark-assembly jar from either RELEASE or assembly directory @@ -123,15 +121,15 @@ fi # Add test classes if we're running from SBT or Maven with SPARK_TESTING set to 1 if [[ $SPARK_TESTING == 1 ]]; then - CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/test-classes" - CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/test-classes" - CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/test-classes" - CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/test-classes" - CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SCALA_VERSION/test-classes" - CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SCALA_VERSION/test-classes" - CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SCALA_VERSION/test-classes" - CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/test-classes" - CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SPARK_SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SPARK_SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SPARK_SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SPARK_SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SPARK_SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SPARK_SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SPARK_SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SPARK_SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SPARK_SCALA_VERSION/test-classes" fi # Add hadoop conf dir if given -- otherwise FileSystem.*, etc fail ! diff --git a/bin/load-spark-env.sh b/bin/load-spark-env.sh index 6d4231b204595..356b3d49b2ffe 100644 --- a/bin/load-spark-env.sh +++ b/bin/load-spark-env.sh @@ -36,3 +36,23 @@ if [ -z "$SPARK_ENV_LOADED" ]; then set +a fi fi + +# Setting SPARK_SCALA_VERSION if not already set. + +if [ -z "$SPARK_SCALA_VERSION" ]; then + + ASSEMBLY_DIR2="$FWDIR/assembly/target/scala-2.11" + ASSEMBLY_DIR1="$FWDIR/assembly/target/scala-2.10" + + if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then + echo -e "Presence of build for both scala versions(SCALA 2.10 and SCALA 2.11) detected." 1>&2 + echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION=2.11 in spark-env.sh.' 1>&2 + exit 1 + fi + + if [ -d "$ASSEMBLY_DIR2" ]; then + export SPARK_SCALA_VERSION="2.11" + else + export SPARK_SCALA_VERSION="2.10" + fi +fi diff --git a/bin/pyspark b/bin/pyspark index 96f30a260a09e..1d8c94d43d285 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -25,7 +25,7 @@ export SPARK_HOME="$FWDIR" source "$FWDIR/bin/utils.sh" -SCALA_VERSION=2.10 +source "$FWDIR"/bin/load-spark-env.sh function usage() { echo "Usage: ./bin/pyspark [options]" 1>&2 @@ -40,7 +40,7 @@ fi # Exit if the user hasn't compiled Spark if [ ! -f "$FWDIR/RELEASE" ]; then # Exit if the user hasn't compiled Spark - ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*.jar >& /dev/null + ls "$FWDIR"/assembly/target/scala-$SPARK_SCALA_VERSION/spark-assembly*hadoop*.jar >& /dev/null if [[ $? != 0 ]]; then echo "Failed to find Spark assembly in $FWDIR/assembly/target" 1>&2 echo "You need to build Spark before running this program" 1>&2 @@ -48,8 +48,6 @@ if [ ! -f "$FWDIR/RELEASE" ]; then fi fi -. "$FWDIR"/bin/load-spark-env.sh - # In Spark <= 1.1, setting IPYTHON=1 would cause the driver to be launched using the `ipython` # executable, while the worker would still be launched using PYSPARK_PYTHON. # diff --git a/bin/run-example b/bin/run-example index 34dd71c71880e..3d932509426fc 100755 --- a/bin/run-example +++ b/bin/run-example @@ -17,12 +17,12 @@ # limitations under the License. # -SCALA_VERSION=2.10 - FWDIR="$(cd "`dirname "$0"`"/..; pwd)" export SPARK_HOME="$FWDIR" EXAMPLES_DIR="$FWDIR"/examples +. "$FWDIR"/bin/load-spark-env.sh + if [ -n "$1" ]; then EXAMPLE_CLASS="$1" shift @@ -36,8 +36,8 @@ fi if [ -f "$FWDIR/RELEASE" ]; then export SPARK_EXAMPLES_JAR="`ls "$FWDIR"/lib/spark-examples-*hadoop*.jar`" -elif [ -e "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/spark-examples-*hadoop*.jar ]; then - export SPARK_EXAMPLES_JAR="`ls "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/spark-examples-*hadoop*.jar`" +elif [ -e "$EXAMPLES_DIR"/target/scala-$SPARK_SCALA_VERSION/spark-examples-*hadoop*.jar ]; then + export SPARK_EXAMPLES_JAR="`ls "$EXAMPLES_DIR"/target/scala-$SPARK_SCALA_VERSION/spark-examples-*hadoop*.jar`" fi if [[ -z "$SPARK_EXAMPLES_JAR" ]]; then diff --git a/bin/spark-class b/bin/spark-class index 925367b0dd187..0d58d95c1aee3 100755 --- a/bin/spark-class +++ b/bin/spark-class @@ -24,8 +24,6 @@ case "`uname`" in CYGWIN*) cygwin=true;; esac -SCALA_VERSION=2.10 - # Figure out where Spark is installed FWDIR="$(cd "`dirname "$0"`"/..; pwd)" @@ -128,9 +126,9 @@ fi TOOLS_DIR="$FWDIR"/tools SPARK_TOOLS_JAR="" -if [ -e "$TOOLS_DIR"/target/scala-$SCALA_VERSION/spark-tools*[0-9Tg].jar ]; then +if [ -e "$TOOLS_DIR"/target/scala-$SPARK_SCALA_VERSION/spark-tools*[0-9Tg].jar ]; then # Use the JAR from the SBT build - export SPARK_TOOLS_JAR="`ls "$TOOLS_DIR"/target/scala-$SCALA_VERSION/spark-tools*[0-9Tg].jar`" + export SPARK_TOOLS_JAR="`ls "$TOOLS_DIR"/target/scala-$SPARK_SCALA_VERSION/spark-tools*[0-9Tg].jar`" fi if [ -e "$TOOLS_DIR"/target/spark-tools*[0-9Tg].jar ]; then # Use the JAR from the Maven build @@ -149,7 +147,7 @@ fi if [[ "$1" =~ org.apache.spark.tools.* ]]; then if test -z "$SPARK_TOOLS_JAR"; then - echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SCALA_VERSION/" 1>&2 + echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SPARK_SCALA_VERSION/" 1>&2 echo "You need to build Spark before running $1." 1>&2 exit 1 fi diff --git a/core/pom.xml b/core/pom.xml index 41296e0eca330..492eddda744c2 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -34,6 +34,34 @@ Spark Project Core http://spark.apache.org/ + + com.twitter + chill_${scala.binary.version} + + + org.ow2.asm + asm + + + org.ow2.asm + asm-commons + + + + + com.twitter + chill-java + + + org.ow2.asm + asm + + + org.ow2.asm + asm-commons + + + org.apache.hadoop hadoop-client @@ -46,12 +74,12 @@ org.apache.spark - spark-network-common_2.10 + spark-network-common_${scala.binary.version} ${project.version} org.apache.spark - spark-network-shuffle_2.10 + spark-network-shuffle_${scala.binary.version} ${project.version} @@ -132,14 +160,6 @@ net.jpountz.lz4 lz4 - - com.twitter - chill_${scala.binary.version} - - - com.twitter - chill-java - org.roaringbitmap RoaringBitmap @@ -309,14 +329,16 @@ org.scalatest scalatest-maven-plugin - - - ${basedir}/.. - 1 - ${spark.classpath} - - + + + test + + test + + + + org.apache.maven.plugins @@ -424,4 +446,5 @@ + diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala index af94b05ce3847..039c8719e2867 100644 --- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala @@ -87,8 +87,8 @@ object PythonRunner { // Strip the URI scheme from the path formattedPath = new URI(formattedPath).getScheme match { - case Utils.windowsDrive(d) if windows => formattedPath case null => formattedPath + case Utils.windowsDrive(d) if windows => formattedPath case _ => new URI(formattedPath).getPath } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index b43e68e40f791..8a62519bd2315 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -340,7 +340,7 @@ object SparkSubmit { e.printStackTrace(printStream) if (childMainClass.contains("thriftserver")) { println(s"Failed to load main class $childMainClass.") - println("You need to build Spark with -Phive.") + println("You need to build Spark with -Phive and -Phive-thriftserver.") } System.exit(CLASS_NOT_FOUND_EXIT_STATUS) } diff --git a/dev/change-version-to-2.10.sh b/dev/change-version-to-2.10.sh new file mode 100755 index 0000000000000..7473c20d28e09 --- /dev/null +++ b/dev/change-version-to-2.10.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +find . -name 'pom.xml' | grep -v target \ + | xargs -I {} sed -i -e 's|\(artifactId.*\)_2.11|\1_2.10|g' {} diff --git a/dev/change-version-to-2.11.sh b/dev/change-version-to-2.11.sh new file mode 100755 index 0000000000000..3957a9f3ba258 --- /dev/null +++ b/dev/change-version-to-2.11.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +find . -name 'pom.xml' | grep -v target \ + | xargs -I {} sed -i -e 's|\(artifactId.*\)_2.10|\1_2.11|g' {} diff --git a/dev/create-release/create-release.sh b/dev/create-release/create-release.sh index 50a9a2fa1cb9a..db441b3e49790 100755 --- a/dev/create-release/create-release.sh +++ b/dev/create-release/create-release.sh @@ -118,13 +118,13 @@ make_binary_release() { spark-$RELEASE_VERSION-bin-$NAME.tgz.sha } -make_binary_release "hadoop1" "-Phive -Dhadoop.version=1.0.4" & -make_binary_release "cdh4" "-Phive -Dhadoop.version=2.0.0-mr1-cdh4.2.0" & -make_binary_release "hadoop2.3" "-Phadoop-2.3 -Phive -Pyarn" & -make_binary_release "hadoop2.4" "-Phadoop-2.4 -Phive -Pyarn" & +make_binary_release "hadoop1" "-Phive -Phive-thriftserver -Dhadoop.version=1.0.4" & +make_binary_release "cdh4" "-Phive -Phive-thriftserver -Dhadoop.version=2.0.0-mr1-cdh4.2.0" & +make_binary_release "hadoop2.3" "-Phadoop-2.3 -Phive -Phive-thriftserver -Pyarn" & +make_binary_release "hadoop2.4" "-Phadoop-2.4 -Phive -Phive-thriftserver -Pyarn" & make_binary_release "hadoop2.4-without-hive" "-Phadoop-2.4 -Pyarn" & -make_binary_release "mapr3" "-Pmapr3 -Phive" & -make_binary_release "mapr4" "-Pmapr4 -Pyarn -Phive" & +make_binary_release "mapr3" "-Pmapr3 -Phive -Phive-thriftserver" & +make_binary_release "mapr4" "-Pmapr4 -Pyarn -Phive -Phive-thriftserver" & wait # Copy data diff --git a/dev/run-tests b/dev/run-tests index de607e4344453..328a73bd8b26d 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -139,9 +139,6 @@ echo "=========================================================================" CURRENT_BLOCK=$BLOCK_BUILD { - # We always build with Hive because the PySpark Spark SQL tests need it. - BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0" - # NOTE: echo "q" is needed because sbt on encountering a build file with failure #+ (either resolution or compilation) prompts the user for input either q, r, etc @@ -151,15 +148,17 @@ CURRENT_BLOCK=$BLOCK_BUILD # QUESTION: Why doesn't 'yes "q"' work? # QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work? # First build with 0.12 to ensure patches do not break the hive 12 build + HIVE_12_BUILD_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver -Phive-0.12.0" echo "[info] Compile with hive 0.12" echo -e "q\n" \ - | sbt/sbt $BUILD_MVN_PROFILE_ARGS clean hive/compile hive-thriftserver/compile \ + | sbt/sbt $HIVE_12_BUILD_ARGS clean hive/compile hive-thriftserver/compile \ | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" # Then build with default version(0.13.1) because tests are based on this version - echo "[info] Building Spark with these arguments: $SBT_MAVEN_PROFILES_ARGS -Phive" + echo "[info] Building Spark with these arguments: $SBT_MAVEN_PROFILES_ARGS"\ + " -Phive -Phive-thriftserver" echo -e "q\n" \ - | sbt/sbt $SBT_MAVEN_PROFILES_ARGS -Phive package assembly/assembly \ + | sbt/sbt $SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver package assembly/assembly \ | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" } @@ -174,7 +173,7 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS # If the Spark SQL tests are enabled, run the tests with the Hive profiles enabled. # This must be a single argument, as it is. if [ -n "$_RUN_SQL_TESTS" ]; then - SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive" + SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver" fi if [ -n "$_SQL_TESTS_ONLY" ]; then diff --git a/dev/scalastyle b/dev/scalastyle index ed1b6b730af6e..c3c6012e74ffa 100755 --- a/dev/scalastyle +++ b/dev/scalastyle @@ -17,7 +17,7 @@ # limitations under the License. # -echo -e "q\n" | sbt/sbt -Phive scalastyle > scalastyle.txt +echo -e "q\n" | sbt/sbt -Phive -Phive-thriftserver scalastyle > scalastyle.txt # Check style with YARN alpha built too echo -e "q\n" | sbt/sbt -Pyarn-alpha -Phadoop-0.23 -Dhadoop.version=0.23.9 yarn-alpha/scalastyle \ >> scalastyle.txt diff --git a/docs/building-spark.md b/docs/building-spark.md index 238ddae15545e..20ba7da5d71ff 100644 --- a/docs/building-spark.md +++ b/docs/building-spark.md @@ -101,25 +101,34 @@ mvn -Pyarn-alpha -Phadoop-2.3 -Dhadoop.version=2.3.0 -Dyarn.version=0.23.7 -Dski # Building With Hive and JDBC Support To enable Hive integration for Spark SQL along with its JDBC server and CLI, -add the `-Phive` profile to your existing build options. By default Spark -will build with Hive 0.13.1 bindings. You can also build for Hive 0.12.0 using -the `-Phive-0.12.0` profile. +add the `-Phive` and `Phive-thriftserver` profiles to your existing build options. +By default Spark will build with Hive 0.13.1 bindings. You can also build for +Hive 0.12.0 using the `-Phive-0.12.0` profile. {% highlight bash %} # Apache Hadoop 2.4.X with Hive 13 support -mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -DskipTests clean package +mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package # Apache Hadoop 2.4.X with Hive 12 support -mvn -Pyarn -Phive-0.12.0 -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -DskipTests clean package +mvn -Pyarn -Phive -Phive-thriftserver-0.12.0 -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package {% endhighlight %} +# Building for Scala 2.11 +To produce a Spark package compiled with Scala 2.11, use the `-Pscala-2.11` profile: + + mvn -Pyarn -Phadoop-2.4 -Pscala-2.11 -DskipTests clean package + +Scala 2.11 support in Spark is experimental and does not support a few features. +Specifically, Spark's external Kafka library and JDBC component are not yet +supported in Scala 2.11 builds. + # Spark Tests in Maven Tests are run by default via the [ScalaTest Maven plugin](http://www.scalatest.org/user_guide/using_the_scalatest_maven_plugin). Some of the tests require Spark to be packaged first, so always run `mvn package` with `-DskipTests` the first time. The following is an example of a correct (build, test) sequence: - mvn -Pyarn -Phadoop-2.3 -DskipTests -Phive clean package - mvn -Pyarn -Phadoop-2.3 -Phive test + mvn -Pyarn -Phadoop-2.3 -DskipTests -Phive -Phive-thriftserver clean package + mvn -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver test The ScalaTest plugin also supports running only a specific test suite as follows: @@ -182,16 +191,16 @@ can be set to control the SBT build. For example: Some of the tests require Spark to be packaged first, so always run `sbt/sbt assembly` the first time. The following is an example of a correct (build, test) sequence: - sbt/sbt -Pyarn -Phadoop-2.3 -Phive assembly - sbt/sbt -Pyarn -Phadoop-2.3 -Phive test + sbt/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver assembly + sbt/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver test To run only a specific test suite as follows: - sbt/sbt -Pyarn -Phadoop-2.3 -Phive "test-only org.apache.spark.repl.ReplSuite" + sbt/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver "test-only org.apache.spark.repl.ReplSuite" To run test suites of a specific sub project as follows: - sbt/sbt -Pyarn -Phadoop-2.3 -Phive core/test + sbt/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver core/test # Speeding up Compilation with Zinc diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index ffcce2c588879..48e8267ac072c 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -728,7 +728,7 @@ anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD) Spark SQL also supports reading and writing data stored in [Apache Hive](http://hive.apache.org/). However, since Hive has a large number of dependencies, it is not included in the default Spark assembly. -In order to use Hive you must first run "`sbt/sbt -Phive assembly/assembly`" (or use `-Phive` for maven). +Hive support is enabled by adding the `-Phive` and `-Phive-thriftserver` flags to Spark's build. This command builds a new assembly jar that includes Hive. Note that this Hive assembly jar must also be present on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries (SerDes) in order to access data stored in Hive. diff --git a/examples/pom.xml b/examples/pom.xml index 910eb55308b9d..2ec5728154abf 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -34,48 +34,6 @@ Spark Project Examples http://spark.apache.org/ - - - kinesis-asl - - - org.apache.spark - spark-streaming-kinesis-asl_${scala.binary.version} - ${project.version} - - - org.apache.httpcomponents - httpclient - ${commons.httpclient.version} - - - - - hbase-hadoop2 - - - hbase.profile - hadoop2 - - - - 0.98.7-hadoop2 - - - - hbase-hadoop1 - - - !hbase.profile - - - - 0.98.7-hadoop1 - - - - - @@ -124,11 +82,6 @@ spark-streaming-twitter_${scala.binary.version} ${project.version} - - org.apache.spark - spark-streaming-kafka_${scala.binary.version} - ${project.version} - org.apache.spark spark-streaming-flume_${scala.binary.version} @@ -136,12 +89,12 @@ org.apache.spark - spark-streaming-zeromq_${scala.binary.version} + spark-streaming-mqtt_${scala.binary.version} ${project.version} org.apache.spark - spark-streaming-mqtt_${scala.binary.version} + spark-streaming-zeromq_${scala.binary.version} ${project.version} @@ -260,11 +213,6 @@ test-jar test - - com.twitter - algebird-core_${scala.binary.version} - 0.1.11 - org.apache.commons commons-math3 @@ -401,4 +349,147 @@ + + + kinesis-asl + + + org.apache.spark + spark-streaming-kinesis-asl_${scala.binary.version} + ${project.version} + + + org.apache.httpcomponents + httpclient + ${commons.httpclient.version} + + + + + hbase-hadoop2 + + + hbase.profile + hadoop2 + + + + 0.98.7-hadoop2 + + + + hbase-hadoop1 + + + !hbase.profile + + + + 0.98.7-hadoop1 + + + + + scala-2.10 + + true + + + + org.apache.spark + spark-streaming-kafka_${scala.binary.version} + ${project.version} + + + com.twitter + algebird-core_${scala.binary.version} + 0.1.11 + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-scala-sources + generate-sources + + add-source + + + + src/main/scala + scala-2.10/src/main/scala + scala-2.10/src/main/java + + + + + add-scala-test-sources + generate-test-sources + + add-test-source + + + + src/test/scala + scala-2.10/src/test/scala + scala-2.10/src/test/java + + + + + + + + + + scala-2.11 + + false + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-scala-sources + generate-sources + + add-source + + + + src/main/scala + scala-2.11/src/main/scala + + + + + add-scala-test-sources + generate-test-sources + + add-test-source + + + + src/test/scala + scala-2.11/src/test/scala + + + + + + + + + diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java similarity index 100% rename from examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java rename to examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala similarity index 100% rename from examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala rename to examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala similarity index 100% rename from examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala rename to examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala similarity index 100% rename from examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala rename to examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index 371f1f1e9d39a..362a76e515938 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -52,11 +52,6 @@ mqtt-client 0.4.0 - - ${akka.group} - akka-zeromq_${scala.binary.version} - ${akka.version} - org.scalatest scalatest_${scala.binary.version} diff --git a/make-distribution.sh b/make-distribution.sh index 0bc839e1dbe4d..d46edbc50d152 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -59,7 +59,7 @@ while (( "$#" )); do exit_with_usage ;; --with-hive) - echo "Error: '--with-hive' is no longer supported, use Maven option -Phive" + echo "Error: '--with-hive' is no longer supported, use Maven options -Phive and -Phive-thriftserver" exit_with_usage ;; --skip-java-test) diff --git a/network/shuffle/pom.xml b/network/shuffle/pom.xml index 27c8467687f10..a180a5e5f926e 100644 --- a/network/shuffle/pom.xml +++ b/network/shuffle/pom.xml @@ -39,7 +39,7 @@ org.apache.spark - spark-network-common_2.10 + spark-network-common_${scala.binary.version} ${project.version} @@ -58,7 +58,7 @@ org.apache.spark - spark-network-common_2.10 + spark-network-common_${scala.binary.version} ${project.version} test-jar test diff --git a/network/yarn/pom.xml b/network/yarn/pom.xml index 6e6f6f3e79296..85960eb85b482 100644 --- a/network/yarn/pom.xml +++ b/network/yarn/pom.xml @@ -39,7 +39,7 @@ org.apache.spark - spark-network-shuffle_2.10 + spark-network-shuffle_${scala.binary.version} ${project.version} diff --git a/pom.xml b/pom.xml index 4e0cd6c151d0b..7bbde31e572d9 100644 --- a/pom.xml +++ b/pom.xml @@ -97,30 +97,26 @@ sql/catalyst sql/core sql/hive - repl assembly external/twitter - external/kafka external/flume external/flume-sink - external/zeromq external/mqtt + external/zeromq examples + repl UTF-8 UTF-8 - + org.spark-project.akka + 2.3.4-spark 1.6 spark - 2.10.4 - 2.10 2.0.1 0.18.1 shaded-protobuf - org.spark-project.akka - 2.3.4-spark 1.7.5 1.2.17 1.0.4 @@ -137,7 +133,7 @@ 1.6.0rc3 1.2.3 8.1.14.v20131031 - 0.3.6 + 0.5.0 3.0.0 1.7.6 @@ -146,9 +142,13 @@ 1.1.0 4.2.6 3.1.1 - + ${project.build.directory}/spark-test-classpath.txt 64m 512m + 2.10.4 + 2.10 + ${scala.version} + org.scala-lang @@ -267,19 +267,66 @@ + + - org.spark-project.spark unused 1.0.0 + + + org.codehaus.groovy + groovy-all + 2.3.7 + provided + + + ${jline.groupid} + jline + ${jline.version} + + + com.twitter + chill_${scala.binary.version} + ${chill.version} + + + org.ow2.asm + asm + + + org.ow2.asm + asm-commons + + + + + com.twitter + chill-java + ${chill.version} + + + org.ow2.asm + asm + + + org.ow2.asm + asm-commons + + + org.eclipse.jetty jetty-util @@ -395,36 +442,6 @@ protobuf-java ${protobuf.version} - - com.twitter - chill_${scala.binary.version} - ${chill.version} - - - org.ow2.asm - asm - - - org.ow2.asm - asm-commons - - - - - com.twitter - chill-java - ${chill.version} - - - org.ow2.asm - asm - - - org.ow2.asm - asm-commons - - - ${akka.group} akka-actor_${scala.binary.version} @@ -512,11 +529,6 @@ scala-reflect ${scala.version} - - org.scala-lang - jline - ${scala.version} - org.scala-lang scala-library @@ -965,6 +977,7 @@ ${session.executionRootDirectory} 1 false + ${test_classpath} @@ -1026,6 +1039,47 @@ + + + org.apache.maven.plugins + maven-dependency-plugin + 2.9 + + + test-compile + + build-classpath + + + test + ${test_classpath_file} + + + + + + + + org.codehaus.gmavenplus + gmavenplus-plugin + 1.2 + + + process-test-classes + + execute + + + + + + + + + org.apache.maven.plugins @@ -1335,7 +1389,7 @@ - hive + hive-thriftserver false @@ -1365,5 +1419,35 @@ 10.10.1.1 + + + scala-2.10 + + true + + + 2.10.4 + 2.10 + ${scala.version} + org.scala-lang + + + external/kafka + + + + + scala-2.11 + + false + + + 2.11.2 + 2.11 + 2.12 + jline + + + diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 657e4b4432775..5eb3ed439cde9 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -31,8 +31,8 @@ object BuildCommons { private val buildLocation = file(".").getAbsoluteFile.getParentFile val allProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl, - sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingKafka, - streamingMqtt, streamingTwitter, streamingZeromq) = + sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingKafka, + streamingMqtt, streamingTwitter, streamingZeromq) = Seq("bagel", "catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl", "sql", "network-common", "network-shuffle", "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter", @@ -68,8 +68,8 @@ object SparkBuild extends PomBuild { profiles ++= Seq("spark-ganglia-lgpl") } if (Properties.envOrNone("SPARK_HIVE").isDefined) { - println("NOTE: SPARK_HIVE is deprecated, please use -Phive flag.") - profiles ++= Seq("hive") + println("NOTE: SPARK_HIVE is deprecated, please use -Phive and -Phive-thriftserver flags.") + profiles ++= Seq("hive", "hive-thriftserver") } Properties.envOrNone("SPARK_HADOOP_VERSION") match { case Some(v) => @@ -91,13 +91,21 @@ object SparkBuild extends PomBuild { profiles } - override val profiles = Properties.envOrNone("SBT_MAVEN_PROFILES") match { + override val profiles = { + val profiles = Properties.envOrNone("SBT_MAVEN_PROFILES") match { case None => backwardCompatibility case Some(v) => if (backwardCompatibility.nonEmpty) println("Note: We ignore environment variables, when use of profile is detected in " + "conjunction with environment variable.") v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", "")).toSeq + } + if (profiles.exists(_.contains("scala-"))) { + profiles + } else { + println("Enabled default scala profile") + profiles ++ Seq("scala-2.10") + } } Properties.envOrNone("SBT_MAVEN_PROPERTIES") match { @@ -136,7 +144,8 @@ object SparkBuild extends PomBuild { // Note ordering of these settings matter. /* Enable shared settings on all projects */ - (allProjects ++ optionallyEnabledProjects ++ assemblyProjects).foreach(enable(sharedSettings)) + (allProjects ++ optionallyEnabledProjects ++ assemblyProjects ++ Seq(spark, tools)) + .foreach(enable(sharedSettings ++ ExludedDependencies.settings)) /* Enable tests settings for all projects except examples, assembly and tools */ (allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings)) @@ -178,6 +187,16 @@ object Flume { lazy val settings = sbtavro.SbtAvro.avroSettings } +/** + This excludes library dependencies in sbt, which are specified in maven but are + not needed by sbt build. + */ +object ExludedDependencies { + lazy val settings = Seq( + libraryDependencies ~= { libs => libs.filterNot(_.name == "groovy-all") } + ) +} + /** * Following project only exists to pull previous artifacts of Spark for generating * Mima ignores. For more information see: SPARK 2071 @@ -353,8 +372,11 @@ object TestSettings { .map { case (k,v) => s"-D$k=$v" }.toSeq, javaOptions in Test ++= "-Xmx3g -XX:PermSize=128M -XX:MaxNewSize=256m -XX:MaxPermSize=1g" .split(" ").toSeq, + // This places test scope jars on the classpath of executors during tests. + javaOptions in Test += + "-Dspark.executor.extraClassPath=" + (fullClasspath in Test).value.files. + map(_.getAbsolutePath).mkString(":").stripSuffix(":"), javaOptions += "-Xmx3g", - // Show full stack trace and duration in test cases. testOptions in Test += Tests.Argument("-oDF"), testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"), diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala index 3ef2d5451da0d..8863f272da415 100644 --- a/project/project/SparkPluginBuild.scala +++ b/project/project/SparkPluginBuild.scala @@ -26,7 +26,7 @@ import sbt.Keys._ object SparkPluginDef extends Build { lazy val root = Project("plugins", file(".")) dependsOn(sparkStyle, sbtPomReader) lazy val sparkStyle = Project("spark-style", file("spark-style"), settings = styleSettings) - lazy val sbtPomReader = uri("https://github.com/ScrapCodes/sbt-pom-reader.git") + lazy val sbtPomReader = uri("https://github.com/ScrapCodes/sbt-pom-reader.git#ignore_artifact_id") // There is actually no need to publish this artifact. def styleSettings = Defaults.defaultSettings ++ Seq ( diff --git a/repl/pom.xml b/repl/pom.xml index af528c8914335..bd688c8c1e752 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -38,6 +38,11 @@ + + ${jline.groupid} + jline + ${jline.version} + org.apache.spark spark-core_${scala.binary.version} @@ -75,11 +80,6 @@ scala-reflect ${scala.version} - - org.scala-lang - jline - ${scala.version} - org.slf4j jul-to-slf4j @@ -124,4 +124,84 @@ + + + scala-2.10 + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-scala-sources + generate-sources + + add-source + + + + src/main/scala + scala-2.10/src/main/scala + + + + + add-scala-test-sources + generate-test-sources + + add-test-source + + + + src/test/scala + scala-2.10/src/test/scala + + + + + + + + + + scala-2.11 + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-scala-sources + generate-sources + + add-source + + + + src/main/scala + scala-2.11/src/main/scala + + + + + add-scala-test-sources + generate-test-sources + + add-test-source + + + + src/test/scala + scala-2.11/src/test/scala + + + + + + + + + diff --git a/repl/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala similarity index 100% rename from repl/src/main/scala/org/apache/spark/repl/Main.scala rename to repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala similarity index 100% rename from repl/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala rename to repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala similarity index 100% rename from repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala rename to repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkHelper.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala similarity index 100% rename from repl/src/main/scala/org/apache/spark/repl/SparkHelper.scala rename to repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala similarity index 100% rename from repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala rename to repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala similarity index 100% rename from repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala rename to repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala similarity index 100% rename from repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala rename to repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkImports.scala similarity index 100% rename from repl/src/main/scala/org/apache/spark/repl/SparkImports.scala rename to repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkImports.scala diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala similarity index 100% rename from repl/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala rename to repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala similarity index 100% rename from repl/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala rename to repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala similarity index 100% rename from repl/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala rename to repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkRunnerSettings.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkRunnerSettings.scala similarity index 100% rename from repl/src/main/scala/org/apache/spark/repl/SparkRunnerSettings.scala rename to repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkRunnerSettings.scala diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala similarity index 100% rename from repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala rename to repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala new file mode 100644 index 0000000000000..5e93a71995072 --- /dev/null +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.repl + +import org.apache.spark.util.Utils +import org.apache.spark._ + +import scala.tools.nsc.Settings +import scala.tools.nsc.interpreter.SparkILoop + +object Main extends Logging { + + val conf = new SparkConf() + val tmp = System.getProperty("java.io.tmpdir") + val rootDir = conf.get("spark.repl.classdir", tmp) + val outputDir = Utils.createTempDir(rootDir) + val s = new Settings() + s.processArguments(List("-Yrepl-class-based", + "-Yrepl-outdir", s"${outputDir.getAbsolutePath}", "-Yrepl-sync"), true) + val classServer = new HttpServer(outputDir, new SecurityManager(conf)) + var sparkContext: SparkContext = _ + var interp = new SparkILoop // this is a public var because tests reset it. + + def main(args: Array[String]) { + if (getMaster == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true") + // Start the classServer and store its URI in a spark system property + // (which will be passed to executors so that they can connect to it) + classServer.start() + interp.process(s) // Repl starts and goes in loop of R.E.P.L + classServer.stop() + Option(sparkContext).map(_.stop) + } + + + def getAddedJars: Array[String] = { + val envJars = sys.env.get("ADD_JARS") + val propJars = sys.props.get("spark.jars").flatMap { p => if (p == "") None else Some(p) } + val jars = propJars.orElse(envJars).getOrElse("") + Utils.resolveURIs(jars).split(",").filter(_.nonEmpty) + } + + def createSparkContext(): SparkContext = { + val execUri = System.getenv("SPARK_EXECUTOR_URI") + val jars = getAddedJars + val conf = new SparkConf() + .setMaster(getMaster) + .setAppName("Spark shell") + .setJars(jars) + .set("spark.repl.class.uri", classServer.uri) + logInfo("Spark class server started at " + classServer.uri) + if (execUri != null) { + conf.set("spark.executor.uri", execUri) + } + if (System.getenv("SPARK_HOME") != null) { + conf.setSparkHome(System.getenv("SPARK_HOME")) + } + sparkContext = new SparkContext(conf) + logInfo("Created spark context..") + sparkContext + } + + private def getMaster: String = { + val master = { + val envMaster = sys.env.get("MASTER") + val propMaster = sys.props.get("spark.master") + propMaster.orElse(envMaster).getOrElse("local[*]") + } + master + } +} diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala new file mode 100644 index 0000000000000..8e519fa67f649 --- /dev/null +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala @@ -0,0 +1,86 @@ +/* NSC -- new Scala compiler + * Copyright 2005-2013 LAMP/EPFL + * @author Paul Phillips + */ + +package scala.tools.nsc +package interpreter + +import scala.tools.nsc.ast.parser.Tokens.EOF + +trait SparkExprTyper { + val repl: SparkIMain + + import repl._ + import global.{ reporter => _, Import => _, _ } + import naming.freshInternalVarName + + def symbolOfLine(code: String): Symbol = { + def asExpr(): Symbol = { + val name = freshInternalVarName() + // Typing it with a lazy val would give us the right type, but runs + // into compiler bugs with things like existentials, so we compile it + // behind a def and strip the NullaryMethodType which wraps the expr. + val line = "def " + name + " = " + code + + interpretSynthetic(line) match { + case IR.Success => + val sym0 = symbolOfTerm(name) + // drop NullaryMethodType + sym0.cloneSymbol setInfo exitingTyper(sym0.tpe_*.finalResultType) + case _ => NoSymbol + } + } + def asDefn(): Symbol = { + val old = repl.definedSymbolList.toSet + + interpretSynthetic(code) match { + case IR.Success => + repl.definedSymbolList filterNot old match { + case Nil => NoSymbol + case sym :: Nil => sym + case syms => NoSymbol.newOverloaded(NoPrefix, syms) + } + case _ => NoSymbol + } + } + def asError(): Symbol = { + interpretSynthetic(code) + NoSymbol + } + beSilentDuring(asExpr()) orElse beSilentDuring(asDefn()) orElse asError() + } + + private var typeOfExpressionDepth = 0 + def typeOfExpression(expr: String, silent: Boolean = true): Type = { + if (typeOfExpressionDepth > 2) { + repldbg("Terminating typeOfExpression recursion for expression: " + expr) + return NoType + } + typeOfExpressionDepth += 1 + // Don't presently have a good way to suppress undesirable success output + // while letting errors through, so it is first trying it silently: if there + // is an error, and errors are desired, then it re-evaluates non-silently + // to induce the error message. + try beSilentDuring(symbolOfLine(expr).tpe) match { + case NoType if !silent => symbolOfLine(expr).tpe // generate error + case tpe => tpe + } + finally typeOfExpressionDepth -= 1 + } + + // This only works for proper types. + def typeOfTypeString(typeString: String): Type = { + def asProperType(): Option[Type] = { + val name = freshInternalVarName() + val line = "def %s: %s = ???" format (name, typeString) + interpretSynthetic(line) match { + case IR.Success => + val sym0 = symbolOfTerm(name) + Some(sym0.asMethod.returnType) + case _ => None + } + } + beSilentDuring(asProperType()) getOrElse NoType + } +} diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala new file mode 100644 index 0000000000000..a591e9fc4622b --- /dev/null +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -0,0 +1,966 @@ +/* NSC -- new Scala compiler + * Copyright 2005-2013 LAMP/EPFL + * @author Alexander Spoon + */ + +package scala +package tools.nsc +package interpreter + +import scala.language.{ implicitConversions, existentials } +import scala.annotation.tailrec +import Predef.{ println => _, _ } +import interpreter.session._ +import StdReplTags._ +import scala.reflect.api.{Mirror, Universe, TypeCreator} +import scala.util.Properties.{ jdkHome, javaVersion, versionString, javaVmName } +import scala.tools.nsc.util.{ ClassPath, Exceptional, stringFromWriter, stringFromStream } +import scala.reflect.{ClassTag, classTag} +import scala.reflect.internal.util.{ BatchSourceFile, ScalaClassLoader } +import ScalaClassLoader._ +import scala.reflect.io.{ File, Directory } +import scala.tools.util._ +import scala.collection.generic.Clearable +import scala.concurrent.{ ExecutionContext, Await, Future, future } +import ExecutionContext.Implicits._ +import java.io.{ BufferedReader, FileReader } + +/** The Scala interactive shell. It provides a read-eval-print loop + * around the Interpreter class. + * After instantiation, clients should call the main() method. + * + * If no in0 is specified, then input will come from the console, and + * the class will attempt to provide input editing feature such as + * input history. + * + * @author Moez A. Abdel-Gawad + * @author Lex Spoon + * @version 1.2 + */ +class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter) + extends AnyRef + with LoopCommands +{ + def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out) + def this() = this(None, new JPrintWriter(Console.out, true)) +// +// @deprecated("Use `intp` instead.", "2.9.0") def interpreter = intp +// @deprecated("Use `intp` instead.", "2.9.0") def interpreter_= (i: Interpreter): Unit = intp = i + + var in: InteractiveReader = _ // the input stream from which commands come + var settings: Settings = _ + var intp: SparkIMain = _ + + var globalFuture: Future[Boolean] = _ + + protected def asyncMessage(msg: String) { + if (isReplInfo || isReplPower) + echoAndRefresh(msg) + } + + def initializeSpark() { + intp.beQuietDuring { + command( """ + @transient val sc = org.apache.spark.repl.Main.createSparkContext(); + """) + command("import org.apache.spark.SparkContext._") + } + echo("Spark context available as sc.") + } + + /** Print a welcome message */ + def printWelcome() { + import org.apache.spark.SPARK_VERSION + echo("""Welcome to + ____ __ + / __/__ ___ _____/ /__ + _\ \/ _ \/ _ `/ __/ '_/ + /___/ .__/\_,_/_/ /_/\_\ version %s + /_/ + """.format(SPARK_VERSION)) + val welcomeMsg = "Using Scala %s (%s, Java %s)".format( + versionString, javaVmName, javaVersion) + echo(welcomeMsg) + echo("Type in expressions to have them evaluated.") + echo("Type :help for more information.") + } + + override def echoCommandMessage(msg: String) { + intp.reporter printUntruncatedMessage msg + } + + // lazy val power = new Power(intp, new StdReplVals(this))(tagOfStdReplVals, classTag[StdReplVals]) + def history = in.history + + // classpath entries added via :cp + var addedClasspath: String = "" + + /** A reverse list of commands to replay if the user requests a :replay */ + var replayCommandStack: List[String] = Nil + + /** A list of commands to replay if the user requests a :replay */ + def replayCommands = replayCommandStack.reverse + + /** Record a command for replay should the user request a :replay */ + def addReplay(cmd: String) = replayCommandStack ::= cmd + + def savingReplayStack[T](body: => T): T = { + val saved = replayCommandStack + try body + finally replayCommandStack = saved + } + def savingReader[T](body: => T): T = { + val saved = in + try body + finally in = saved + } + + /** Close the interpreter and set the var to null. */ + def closeInterpreter() { + if (intp ne null) { + intp.close() + intp = null + } + } + + class SparkILoopInterpreter extends SparkIMain(settings, out) { + outer => + + override lazy val formatting = new Formatting { + def prompt = SparkILoop.this.prompt + } + override protected def parentClassLoader = + settings.explicitParentLoader.getOrElse( classOf[SparkILoop].getClassLoader ) + } + + /** Create a new interpreter. */ + def createInterpreter() { + if (addedClasspath != "") + settings.classpath append addedClasspath + + intp = new SparkILoopInterpreter + } + + /** print a friendly help message */ + def helpCommand(line: String): Result = { + if (line == "") helpSummary() + else uniqueCommand(line) match { + case Some(lc) => echo("\n" + lc.help) + case _ => ambiguousError(line) + } + } + private def helpSummary() = { + val usageWidth = commands map (_.usageMsg.length) max + val formatStr = "%-" + usageWidth + "s %s" + + echo("All commands can be abbreviated, e.g. :he instead of :help.") + + commands foreach { cmd => + echo(formatStr.format(cmd.usageMsg, cmd.help)) + } + } + private def ambiguousError(cmd: String): Result = { + matchingCommands(cmd) match { + case Nil => echo(cmd + ": no such command. Type :help for help.") + case xs => echo(cmd + " is ambiguous: did you mean " + xs.map(":" + _.name).mkString(" or ") + "?") + } + Result(keepRunning = true, None) + } + private def matchingCommands(cmd: String) = commands filter (_.name startsWith cmd) + private def uniqueCommand(cmd: String): Option[LoopCommand] = { + // this lets us add commands willy-nilly and only requires enough command to disambiguate + matchingCommands(cmd) match { + case List(x) => Some(x) + // exact match OK even if otherwise appears ambiguous + case xs => xs find (_.name == cmd) + } + } + + /** Show the history */ + lazy val historyCommand = new LoopCommand("history", "show the history (optional num is commands to show)") { + override def usage = "[num]" + def defaultLines = 20 + + def apply(line: String): Result = { + if (history eq NoHistory) + return "No history available." + + val xs = words(line) + val current = history.index + val count = try xs.head.toInt catch { case _: Exception => defaultLines } + val lines = history.asStrings takeRight count + val offset = current - lines.size + 1 + + for ((line, index) <- lines.zipWithIndex) + echo("%3d %s".format(index + offset, line)) + } + } + + // When you know you are most likely breaking into the middle + // of a line being typed. This softens the blow. + protected def echoAndRefresh(msg: String) = { + echo("\n" + msg) + in.redrawLine() + } + protected def echo(msg: String) = { + out println msg + out.flush() + } + + /** Search the history */ + def searchHistory(_cmdline: String) { + val cmdline = _cmdline.toLowerCase + val offset = history.index - history.size + 1 + + for ((line, index) <- history.asStrings.zipWithIndex ; if line.toLowerCase contains cmdline) + echo("%d %s".format(index + offset, line)) + } + + private val currentPrompt = Properties.shellPromptString + + /** Prompt to print when awaiting input */ + def prompt = currentPrompt + + import LoopCommand.{ cmd, nullary } + + /** Standard commands **/ + lazy val standardCommands = List( + cmd("cp", "", "add a jar or directory to the classpath", addClasspath), + cmd("edit", "|", "edit history", editCommand), + cmd("help", "[command]", "print this summary or command-specific help", helpCommand), + historyCommand, + cmd("h?", "", "search the history", searchHistory), + cmd("imports", "[name name ...]", "show import history, identifying sources of names", importsCommand), + //cmd("implicits", "[-v]", "show the implicits in scope", intp.implicitsCommand), + cmd("javap", "", "disassemble a file or class name", javapCommand), + cmd("line", "|", "place line(s) at the end of history", lineCommand), + cmd("load", "", "interpret lines in a file", loadCommand), + cmd("paste", "[-raw] [path]", "enter paste mode or paste a file", pasteCommand), + // nullary("power", "enable power user mode", powerCmd), + nullary("quit", "exit the interpreter", () => Result(keepRunning = false, None)), + nullary("replay", "reset execution and replay all previous commands", replay), + nullary("reset", "reset the repl to its initial state, forgetting all session entries", resetCommand), + cmd("save", "", "save replayable session to a file", saveCommand), + shCommand, + cmd("settings", "[+|-]", "+enable/-disable flags, set compiler options", changeSettings), + nullary("silent", "disable/enable automatic printing of results", verbosity), +// cmd("type", "[-v] ", "display the type of an expression without evaluating it", typeCommand), +// cmd("kind", "[-v] ", "display the kind of expression's type", kindCommand), + nullary("warnings", "show the suppressed warnings from the most recent line which had any", warningsCommand) + ) + + /** Power user commands */ +// lazy val powerCommands: List[LoopCommand] = List( +// cmd("phase", "", "set the implicit phase for power commands", phaseCommand) +// ) + + private def importsCommand(line: String): Result = { + val tokens = words(line) + val handlers = intp.languageWildcardHandlers ++ intp.importHandlers + + handlers.filterNot(_.importedSymbols.isEmpty).zipWithIndex foreach { + case (handler, idx) => + val (types, terms) = handler.importedSymbols partition (_.name.isTypeName) + val imps = handler.implicitSymbols + val found = tokens filter (handler importsSymbolNamed _) + val typeMsg = if (types.isEmpty) "" else types.size + " types" + val termMsg = if (terms.isEmpty) "" else terms.size + " terms" + val implicitMsg = if (imps.isEmpty) "" else imps.size + " are implicit" + val foundMsg = if (found.isEmpty) "" else found.mkString(" // imports: ", ", ", "") + val statsMsg = List(typeMsg, termMsg, implicitMsg) filterNot (_ == "") mkString ("(", ", ", ")") + + intp.reporter.printMessage("%2d) %-30s %s%s".format( + idx + 1, + handler.importString, + statsMsg, + foundMsg + )) + } + } + + private def findToolsJar() = PathResolver.SupplementalLocations.platformTools + + private def addToolsJarToLoader() = { + val cl = findToolsJar() match { + case Some(tools) => ScalaClassLoader.fromURLs(Seq(tools.toURL), intp.classLoader) + case _ => intp.classLoader + } + if (Javap.isAvailable(cl)) { + repldbg(":javap available.") + cl + } + else { + repldbg(":javap unavailable: no tools.jar at " + jdkHome) + intp.classLoader + } + } +// +// protected def newJavap() = +// JavapClass(addToolsJarToLoader(), new IMain.ReplStrippingWriter(intp), Some(intp)) +// +// private lazy val javap = substituteAndLog[Javap]("javap", NoJavap)(newJavap()) + + // Still todo: modules. +// private def typeCommand(line0: String): Result = { +// line0.trim match { +// case "" => ":type [-v] " +// case s => intp.typeCommandInternal(s stripPrefix "-v " trim, verbose = s startsWith "-v ") +// } +// } + +// private def kindCommand(expr: String): Result = { +// expr.trim match { +// case "" => ":kind [-v] " +// case s => intp.kindCommandInternal(s stripPrefix "-v " trim, verbose = s startsWith "-v ") +// } +// } + + private def warningsCommand(): Result = { + if (intp.lastWarnings.isEmpty) + "Can't find any cached warnings." + else + intp.lastWarnings foreach { case (pos, msg) => intp.reporter.warning(pos, msg) } + } + + private def changeSettings(args: String): Result = { + def showSettings() = { + for (s <- settings.userSetSettings.toSeq.sorted) echo(s.toString) + } + def updateSettings() = { + // put aside +flag options + val (pluses, rest) = (args split "\\s+").toList partition (_.startsWith("+")) + val tmps = new Settings + val (ok, leftover) = tmps.processArguments(rest, processAll = true) + if (!ok) echo("Bad settings request.") + else if (leftover.nonEmpty) echo("Unprocessed settings.") + else { + // boolean flags set-by-user on tmp copy should be off, not on + val offs = tmps.userSetSettings filter (_.isInstanceOf[Settings#BooleanSetting]) + val (minuses, nonbools) = rest partition (arg => offs exists (_ respondsTo arg)) + // update non-flags + settings.processArguments(nonbools, processAll = true) + // also snag multi-value options for clearing, e.g. -Ylog: and -language: + for { + s <- settings.userSetSettings + if s.isInstanceOf[Settings#MultiStringSetting] || s.isInstanceOf[Settings#PhasesSetting] + if nonbools exists (arg => arg.head == '-' && arg.last == ':' && (s respondsTo arg.init)) + } s match { + case c: Clearable => c.clear() + case _ => + } + def update(bs: Seq[String], name: String=>String, setter: Settings#Setting=>Unit) = { + for (b <- bs) + settings.lookupSetting(name(b)) match { + case Some(s) => + if (s.isInstanceOf[Settings#BooleanSetting]) setter(s) + else echo(s"Not a boolean flag: $b") + case _ => + echo(s"Not an option: $b") + } + } + update(minuses, identity, _.tryToSetFromPropertyValue("false")) // turn off + update(pluses, "-" + _.drop(1), _.tryToSet(Nil)) // turn on + } + } + if (args.isEmpty) showSettings() else updateSettings() + } + + private def javapCommand(line: String): Result = { +// if (javap == null) +// ":javap unavailable, no tools.jar at %s. Set JDK_HOME.".format(jdkHome) +// else if (line == "") +// ":javap [-lcsvp] [path1 path2 ...]" +// else +// javap(words(line)) foreach { res => +// if (res.isError) return "Failed: " + res.value +// else res.show() +// } + } + + private def pathToPhaseWrapper = intp.originalPath("$r") + ".phased.atCurrent" + + private def phaseCommand(name: String): Result = { +// val phased: Phased = power.phased +// import phased.NoPhaseName +// +// if (name == "clear") { +// phased.set(NoPhaseName) +// intp.clearExecutionWrapper() +// "Cleared active phase." +// } +// else if (name == "") phased.get match { +// case NoPhaseName => "Usage: :phase (e.g. typer, erasure.next, erasure+3)" +// case ph => "Active phase is '%s'. (To clear, :phase clear)".format(phased.get) +// } +// else { +// val what = phased.parse(name) +// if (what.isEmpty || !phased.set(what)) +// "'" + name + "' does not appear to represent a valid phase." +// else { +// intp.setExecutionWrapper(pathToPhaseWrapper) +// val activeMessage = +// if (what.toString.length == name.length) "" + what +// else "%s (%s)".format(what, name) +// +// "Active phase is now: " + activeMessage +// } +// } + } + + /** Available commands */ + def commands: List[LoopCommand] = standardCommands ++ ( + // if (isReplPower) + // powerCommands + // else + Nil + ) + + val replayQuestionMessage = + """|That entry seems to have slain the compiler. Shall I replay + |your session? I can re-run each line except the last one. + |[y/n] + """.trim.stripMargin + + private val crashRecovery: PartialFunction[Throwable, Boolean] = { + case ex: Throwable => + val (err, explain) = ( + if (intp.isInitializeComplete) + (intp.global.throwableAsString(ex), "") + else + (ex.getMessage, "The compiler did not initialize.\n") + ) + echo(err) + + ex match { + case _: NoSuchMethodError | _: NoClassDefFoundError => + echo("\nUnrecoverable error.") + throw ex + case _ => + def fn(): Boolean = + try in.readYesOrNo(explain + replayQuestionMessage, { echo("\nYou must enter y or n.") ; fn() }) + catch { case _: RuntimeException => false } + + if (fn()) replay() + else echo("\nAbandoning crashed session.") + } + true + } + + // return false if repl should exit + def processLine(line: String): Boolean = { + import scala.concurrent.duration._ + Await.ready(globalFuture, 60.seconds) + + (line ne null) && (command(line) match { + case Result(false, _) => false + case Result(_, Some(line)) => addReplay(line) ; true + case _ => true + }) + } + + private def readOneLine() = { + out.flush() + in readLine prompt + } + + /** The main read-eval-print loop for the repl. It calls + * command() for each line of input, and stops when + * command() returns false. + */ + @tailrec final def loop() { + if ( try processLine(readOneLine()) catch crashRecovery ) + loop() + } + + /** interpret all lines from a specified file */ + def interpretAllFrom(file: File) { + savingReader { + savingReplayStack { + file applyReader { reader => + in = SimpleReader(reader, out, interactive = false) + echo("Loading " + file + "...") + loop() + } + } + } + } + + /** create a new interpreter and replay the given commands */ + def replay() { + reset() + if (replayCommandStack.isEmpty) + echo("Nothing to replay.") + else for (cmd <- replayCommands) { + echo("Replaying: " + cmd) // flush because maybe cmd will have its own output + command(cmd) + echo("") + } + } + def resetCommand() { + echo("Resetting interpreter state.") + if (replayCommandStack.nonEmpty) { + echo("Forgetting this session history:\n") + replayCommands foreach echo + echo("") + replayCommandStack = Nil + } + if (intp.namedDefinedTerms.nonEmpty) + echo("Forgetting all expression results and named terms: " + intp.namedDefinedTerms.mkString(", ")) + if (intp.definedTypes.nonEmpty) + echo("Forgetting defined types: " + intp.definedTypes.mkString(", ")) + + reset() + } + def reset() { + intp.reset() + unleashAndSetPhase() + } + + def lineCommand(what: String): Result = editCommand(what, None) + + // :edit id or :edit line + def editCommand(what: String): Result = editCommand(what, Properties.envOrNone("EDITOR")) + + def editCommand(what: String, editor: Option[String]): Result = { + def diagnose(code: String) = { + echo("The edited code is incomplete!\n") + val errless = intp compileSources new BatchSourceFile("", s"object pastel {\n$code\n}") + if (errless) echo("The compiler reports no errors.") + } + def historicize(text: String) = history match { + case jlh: JLineHistory => text.lines foreach jlh.add ; jlh.moveToEnd() ; true + case _ => false + } + def edit(text: String): Result = editor match { + case Some(ed) => + val tmp = File.makeTemp() + tmp.writeAll(text) + try { + val pr = new ProcessResult(s"$ed ${tmp.path}") + pr.exitCode match { + case 0 => + tmp.safeSlurp() match { + case Some(edited) if edited.trim.isEmpty => echo("Edited text is empty.") + case Some(edited) => + echo(edited.lines map ("+" + _) mkString "\n") + val res = intp interpret edited + if (res == IR.Incomplete) diagnose(edited) + else { + historicize(edited) + Result(lineToRecord = Some(edited), keepRunning = true) + } + case None => echo("Can't read edited text. Did you delete it?") + } + case x => echo(s"Error exit from $ed ($x), ignoring") + } + } finally { + tmp.delete() + } + case None => + if (historicize(text)) echo("Placing text in recent history.") + else echo(f"No EDITOR defined and you can't change history, echoing your text:%n$text") + } + + // if what is a number, use it as a line number or range in history + def isNum = what forall (c => c.isDigit || c == '-' || c == '+') + // except that "-" means last value + def isLast = (what == "-") + if (isLast || !isNum) { + val name = if (isLast) intp.mostRecentVar else what + val sym = intp.symbolOfIdent(name) + intp.prevRequestList collectFirst { case r if r.defines contains sym => r } match { + case Some(req) => edit(req.line) + case None => echo(s"No symbol in scope: $what") + } + } else try { + val s = what + // line 123, 120+3, -3, 120-123, 120-, note -3 is not 0-3 but (cur-3,cur) + val (start, len) = + if ((s indexOf '+') > 0) { + val (a,b) = s splitAt (s indexOf '+') + (a.toInt, b.drop(1).toInt) + } else { + (s indexOf '-') match { + case -1 => (s.toInt, 1) + case 0 => val n = s.drop(1).toInt ; (history.index - n, n) + case _ if s.last == '-' => val n = s.init.toInt ; (n, history.index - n) + case i => val n = s.take(i).toInt ; (n, s.drop(i+1).toInt - n) + } + } + import scala.collection.JavaConverters._ + val index = (start - 1) max 0 + val text = history match { + case jlh: JLineHistory => jlh.entries(index).asScala.take(len) map (_.value) mkString "\n" + case _ => history.asStrings.slice(index, index + len) mkString "\n" + } + edit(text) + } catch { + case _: NumberFormatException => echo(s"Bad range '$what'") + echo("Use line 123, 120+3, -3, 120-123, 120-, note -3 is not 0-3 but (cur-3,cur)") + } + } + + /** fork a shell and run a command */ + lazy val shCommand = new LoopCommand("sh", "run a shell command (result is implicitly => List[String])") { + override def usage = "" + def apply(line: String): Result = line match { + case "" => showUsage() + case _ => + val toRun = s"new ${classOf[ProcessResult].getName}(${string2codeQuoted(line)})" + intp interpret toRun + () + } + } + + def withFile[A](filename: String)(action: File => A): Option[A] = { + val res = Some(File(filename)) filter (_.exists) map action + if (res.isEmpty) echo("That file does not exist") // courtesy side-effect + res + } + + def loadCommand(arg: String) = { + var shouldReplay: Option[String] = None + withFile(arg)(f => { + interpretAllFrom(f) + shouldReplay = Some(":load " + arg) + }) + Result(keepRunning = true, shouldReplay) + } + + def saveCommand(filename: String): Result = ( + if (filename.isEmpty) echo("File name is required.") + else if (replayCommandStack.isEmpty) echo("No replay commands in session") + else File(filename).printlnAll(replayCommands: _*) + ) + + def addClasspath(arg: String): Unit = { + val f = File(arg).normalize + if (f.exists) { + addedClasspath = ClassPath.join(addedClasspath, f.path) + val totalClasspath = ClassPath.join(settings.classpath.value, addedClasspath) + echo("Added '%s'. Your new classpath is:\n\"%s\"".format(f.path, totalClasspath)) + replay() + } + else echo("The path '" + f + "' doesn't seem to exist.") + } + + def powerCmd(): Result = { + if (isReplPower) "Already in power mode." + else enablePowerMode(isDuringInit = false) + } + def enablePowerMode(isDuringInit: Boolean) = { + replProps.power setValue true + unleashAndSetPhase() + // asyncEcho(isDuringInit, power.banner) + } + private def unleashAndSetPhase() { + if (isReplPower) { + // power.unleash() + // Set the phase to "typer" + // intp beSilentDuring phaseCommand("typer") + } + } + + def asyncEcho(async: Boolean, msg: => String) { + if (async) asyncMessage(msg) + else echo(msg) + } + + def verbosity() = { + val old = intp.printResults + intp.printResults = !old + echo("Switched " + (if (old) "off" else "on") + " result printing.") + } + + /** Run one command submitted by the user. Two values are returned: + * (1) whether to keep running, (2) the line to record for replay, + * if any. */ + def command(line: String): Result = { + if (line startsWith ":") { + val cmd = line.tail takeWhile (x => !x.isWhitespace) + uniqueCommand(cmd) match { + case Some(lc) => lc(line.tail stripPrefix cmd dropWhile (_.isWhitespace)) + case _ => ambiguousError(cmd) + } + } + else if (intp.global == null) Result(keepRunning = false, None) // Notice failure to create compiler + else Result(keepRunning = true, interpretStartingWith(line)) + } + + private def readWhile(cond: String => Boolean) = { + Iterator continually in.readLine("") takeWhile (x => x != null && cond(x)) + } + + def pasteCommand(arg: String): Result = { + var shouldReplay: Option[String] = None + def result = Result(keepRunning = true, shouldReplay) + val (raw, file) = + if (arg.isEmpty) (false, None) + else { + val r = """(-raw)?(\s+)?([^\-]\S*)?""".r + arg match { + case r(flag, sep, name) => + if (flag != null && name != null && sep == null) + echo(s"""I assume you mean "$flag $name"?""") + (flag != null, Option(name)) + case _ => + echo("usage: :paste -raw file") + return result + } + } + val code = file match { + case Some(name) => + withFile(name)(f => { + shouldReplay = Some(s":paste $arg") + val s = f.slurp.trim + if (s.isEmpty) echo(s"File contains no code: $f") + else echo(s"Pasting file $f...") + s + }) getOrElse "" + case None => + echo("// Entering paste mode (ctrl-D to finish)\n") + val text = (readWhile(_ => true) mkString "\n").trim + if (text.isEmpty) echo("\n// Nothing pasted, nothing gained.\n") + else echo("\n// Exiting paste mode, now interpreting.\n") + text + } + def interpretCode() = { + val res = intp interpret code + // if input is incomplete, let the compiler try to say why + if (res == IR.Incomplete) { + echo("The pasted code is incomplete!\n") + // Remembrance of Things Pasted in an object + val errless = intp compileSources new BatchSourceFile("", s"object pastel {\n$code\n}") + if (errless) echo("...but compilation found no error? Good luck with that.") + } + } + def compileCode() = { + val errless = intp compileSources new BatchSourceFile("", code) + if (!errless) echo("There were compilation errors!") + } + if (code.nonEmpty) { + if (raw) compileCode() else interpretCode() + } + result + } + + private object paste extends Pasted { + val ContinueString = " | " + val PromptString = "scala> " + + def interpret(line: String): Unit = { + echo(line.trim) + intp interpret line + echo("") + } + + def transcript(start: String) = { + echo("\n// Detected repl transcript paste: ctrl-D to finish.\n") + apply(Iterator(start) ++ readWhile(_.trim != PromptString.trim)) + } + } + import paste.{ ContinueString, PromptString } + + /** Interpret expressions starting with the first line. + * Read lines until a complete compilation unit is available + * or until a syntax error has been seen. If a full unit is + * read, go ahead and interpret it. Return the full string + * to be recorded for replay, if any. + */ + def interpretStartingWith(code: String): Option[String] = { + // signal completion non-completion input has been received + in.completion.resetVerbosity() + + def reallyInterpret = { + val reallyResult = intp.interpret(code) + (reallyResult, reallyResult match { + case IR.Error => None + case IR.Success => Some(code) + case IR.Incomplete => + if (in.interactive && code.endsWith("\n\n")) { + echo("You typed two blank lines. Starting a new command.") + None + } + else in.readLine(ContinueString) match { + case null => + // we know compilation is going to fail since we're at EOF and the + // parser thinks the input is still incomplete, but since this is + // a file being read non-interactively we want to fail. So we send + // it straight to the compiler for the nice error message. + intp.compileString(code) + None + + case line => interpretStartingWith(code + "\n" + line) + } + }) + } + + /** Here we place ourselves between the user and the interpreter and examine + * the input they are ostensibly submitting. We intervene in several cases: + * + * 1) If the line starts with "scala> " it is assumed to be an interpreter paste. + * 2) If the line starts with "." (but not ".." or "./") it is treated as an invocation + * on the previous result. + * 3) If the Completion object's execute returns Some(_), we inject that value + * and avoid the interpreter, as it's likely not valid scala code. + */ + if (code == "") None + else if (!paste.running && code.trim.startsWith(PromptString)) { + paste.transcript(code) + None + } + else if (Completion.looksLikeInvocation(code) && intp.mostRecentVar != "") { + interpretStartingWith(intp.mostRecentVar + code) + } + else if (code.trim startsWith "//") { + // line comment, do nothing + None + } + else + reallyInterpret._2 + } + + // runs :load `file` on any files passed via -i + def loadFiles(settings: Settings) = settings match { + case settings: GenericRunnerSettings => + for (filename <- settings.loadfiles.value) { + val cmd = ":load " + filename + command(cmd) + addReplay(cmd) + echo("") + } + case _ => + } + + /** Tries to create a JLineReader, falling back to SimpleReader: + * unless settings or properties are such that it should start + * with SimpleReader. + */ + def chooseReader(settings: Settings): InteractiveReader = { + if (settings.Xnojline || Properties.isEmacsShell) + SimpleReader() + else try new JLineReader( + if (settings.noCompletion) NoCompletion + else new SparkJLineCompletion(intp) + ) + catch { + case ex @ (_: Exception | _: NoClassDefFoundError) => + echo("Failed to created JLineReader: " + ex + "\nFalling back to SimpleReader.") + SimpleReader() + } + } + protected def tagOfStaticClass[T: ClassTag]: u.TypeTag[T] = + u.TypeTag[T]( + m, + new TypeCreator { + def apply[U <: Universe with Singleton](m: Mirror[U]): U # Type = + m.staticClass(classTag[T].runtimeClass.getName).toTypeConstructor.asInstanceOf[U # Type] + }) + + private def loopPostInit() { + // Bind intp somewhere out of the regular namespace where + // we can get at it in generated code. + intp.quietBind(NamedParam[SparkIMain]("$intp", intp)(tagOfStaticClass[SparkIMain], classTag[SparkIMain])) + // Auto-run code via some setting. + ( replProps.replAutorunCode.option + flatMap (f => io.File(f).safeSlurp()) + foreach (intp quietRun _) + ) + // classloader and power mode setup + intp.setContextClassLoader() + if (isReplPower) { + // replProps.power setValue true + // unleashAndSetPhase() + // asyncMessage(power.banner) + } + // SI-7418 Now, and only now, can we enable TAB completion. + in match { + case x: JLineReader => x.consoleReader.postInit + case _ => + } + } + def process(settings: Settings): Boolean = savingContextLoader { + this.settings = settings + createInterpreter() + + // sets in to some kind of reader depending on environmental cues + in = in0.fold(chooseReader(settings))(r => SimpleReader(r, out, interactive = true)) + globalFuture = future { + intp.initializeSynchronous() + loopPostInit() + !intp.reporter.hasErrors + } + import scala.concurrent.duration._ + Await.ready(globalFuture, 10 seconds) + printWelcome() + initializeSpark() + loadFiles(settings) + + try loop() + catch AbstractOrMissingHandler() + finally closeInterpreter() + + true + } + + @deprecated("Use `process` instead", "2.9.0") + def main(settings: Settings): Unit = process(settings) //used by sbt +} + +object SparkILoop { + implicit def loopToInterpreter(repl: SparkILoop): SparkIMain = repl.intp + + // Designed primarily for use by test code: take a String with a + // bunch of code, and prints out a transcript of what it would look + // like if you'd just typed it into the repl. + def runForTranscript(code: String, settings: Settings): String = { + import java.io.{ BufferedReader, StringReader, OutputStreamWriter } + + stringFromStream { ostream => + Console.withOut(ostream) { + val output = new JPrintWriter(new OutputStreamWriter(ostream), true) { + override def write(str: String) = { + // completely skip continuation lines + if (str forall (ch => ch.isWhitespace || ch == '|')) () + else super.write(str) + } + } + val input = new BufferedReader(new StringReader(code.trim + "\n")) { + override def readLine(): String = { + val s = super.readLine() + // helping out by printing the line being interpreted. + if (s != null) + output.println(s) + s + } + } + val repl = new SparkILoop(input, output) + if (settings.classpath.isDefault) + settings.classpath.value = sys.props("java.class.path") + + repl process settings + } + } + } + + /** Creates an interpreter loop with default settings and feeds + * the given code to it as input. + */ + def run(code: String, sets: Settings = new Settings): String = { + import java.io.{ BufferedReader, StringReader, OutputStreamWriter } + + stringFromStream { ostream => + Console.withOut(ostream) { + val input = new BufferedReader(new StringReader(code)) + val output = new JPrintWriter(new OutputStreamWriter(ostream), true) + val repl = new SparkILoop(input, output) + + if (sets.classpath.isDefault) + sets.classpath.value = sys.props("java.class.path") + + repl process sets + } + } + } + def run(lines: List[String]): String = run(lines map (_ + "\n") mkString) +} diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkIMain.scala new file mode 100644 index 0000000000000..1bb62c84abddc --- /dev/null +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkIMain.scala @@ -0,0 +1,1319 @@ +/* NSC -- new Scala compiler + * Copyright 2005-2013 LAMP/EPFL + * @author Martin Odersky + */ + +package scala +package tools.nsc +package interpreter + +import PartialFunction.cond +import scala.language.implicitConversions +import scala.beans.BeanProperty +import scala.collection.mutable +import scala.concurrent.{ Future, ExecutionContext } +import scala.reflect.runtime.{ universe => ru } +import scala.reflect.{ ClassTag, classTag } +import scala.reflect.internal.util.{ BatchSourceFile, SourceFile } +import scala.tools.util.PathResolver +import scala.tools.nsc.io.AbstractFile +import scala.tools.nsc.typechecker.{ TypeStrings, StructuredTypeStrings } +import scala.tools.nsc.util.{ ScalaClassLoader, stringFromReader, stringFromWriter, StackTraceOps } +import scala.tools.nsc.util.Exceptional.unwrap +import javax.script.{AbstractScriptEngine, Bindings, ScriptContext, ScriptEngine, ScriptEngineFactory, ScriptException, CompiledScript, Compilable} + +/** An interpreter for Scala code. + * + * The main public entry points are compile(), interpret(), and bind(). + * The compile() method loads a complete Scala file. The interpret() method + * executes one line of Scala code at the request of the user. The bind() + * method binds an object to a variable that can then be used by later + * interpreted code. + * + * The overall approach is based on compiling the requested code and then + * using a Java classloader and Java reflection to run the code + * and access its results. + * + * In more detail, a single compiler instance is used + * to accumulate all successfully compiled or interpreted Scala code. To + * "interpret" a line of code, the compiler generates a fresh object that + * includes the line of code and which has public member(s) to export + * all variables defined by that code. To extract the result of an + * interpreted line to show the user, a second "result object" is created + * which imports the variables exported by the above object and then + * exports members called "$eval" and "$print". To accomodate user expressions + * that read from variables or methods defined in previous statements, "import" + * statements are used. + * + * This interpreter shares the strengths and weaknesses of using the + * full compiler-to-Java. The main strength is that interpreted code + * behaves exactly as does compiled code, including running at full speed. + * The main weakness is that redefining classes and methods is not handled + * properly, because rebinding at the Java level is technically difficult. + * + * @author Moez A. Abdel-Gawad + * @author Lex Spoon + */ +class SparkIMain(@BeanProperty val factory: ScriptEngineFactory, initialSettings: Settings, + protected val out: JPrintWriter) extends AbstractScriptEngine with Compilable with SparkImports { + imain => + + setBindings(createBindings, ScriptContext.ENGINE_SCOPE) + object replOutput extends ReplOutput(settings.Yreploutdir) { } + + @deprecated("Use replOutput.dir instead", "2.11.0") + def virtualDirectory = replOutput.dir + // Used in a test case. + def showDirectory() = replOutput.show(out) + + private[nsc] var printResults = true // whether to print result lines + private[nsc] var totalSilence = false // whether to print anything + private var _initializeComplete = false // compiler is initialized + private var _isInitialized: Future[Boolean] = null // set up initialization future + private var bindExceptions = true // whether to bind the lastException variable + private var _executionWrapper = "" // code to be wrapped around all lines + + /** We're going to go to some trouble to initialize the compiler asynchronously. + * It's critical that nothing call into it until it's been initialized or we will + * run into unrecoverable issues, but the perceived repl startup time goes + * through the roof if we wait for it. So we initialize it with a future and + * use a lazy val to ensure that any attempt to use the compiler object waits + * on the future. + */ + private var _classLoader: util.AbstractFileClassLoader = null // active classloader + private val _compiler: ReplGlobal = newCompiler(settings, reporter) // our private compiler + + def compilerClasspath: Seq[java.net.URL] = ( + if (isInitializeComplete) global.classPath.asURLs + else new PathResolver(settings).result.asURLs // the compiler's classpath + ) + def settings = initialSettings + // Run the code body with the given boolean settings flipped to true. + def withoutWarnings[T](body: => T): T = beQuietDuring { + val saved = settings.nowarn.value + if (!saved) + settings.nowarn.value = true + + try body + finally if (!saved) settings.nowarn.value = false + } + + /** construct an interpreter that reports to Console */ + def this(settings: Settings, out: JPrintWriter) = this(null, settings, out) + def this(factory: ScriptEngineFactory, settings: Settings) = this(factory, settings, new NewLinePrintWriter(new ConsoleWriter, true)) + def this(settings: Settings) = this(settings, new NewLinePrintWriter(new ConsoleWriter, true)) + def this(factory: ScriptEngineFactory) = this(factory, new Settings()) + def this() = this(new Settings()) + + lazy val formatting: Formatting = new Formatting { + val prompt = Properties.shellPromptString + } + lazy val reporter: SparkReplReporter = new SparkReplReporter(this) + + import formatting._ + import reporter.{ printMessage, printUntruncatedMessage } + + // This exists mostly because using the reporter too early leads to deadlock. + private def echo(msg: String) { Console println msg } + private def _initSources = List(new BatchSourceFile("", "class $repl_$init { }")) + private def _initialize() = { + try { + // if this crashes, REPL will hang its head in shame + val run = new _compiler.Run() + assert(run.typerPhase != NoPhase, "REPL requires a typer phase.") + run compileSources _initSources + _initializeComplete = true + true + } + catch AbstractOrMissingHandler() + } + private def tquoted(s: String) = "\"\"\"" + s + "\"\"\"" + private val logScope = scala.sys.props contains "scala.repl.scope" + private def scopelog(msg: String) = if (logScope) Console.err.println(msg) + + // argument is a thunk to execute after init is done + def initialize(postInitSignal: => Unit) { + synchronized { + if (_isInitialized == null) { + _isInitialized = + Future(try _initialize() finally postInitSignal)(ExecutionContext.global) + } + } + } + def initializeSynchronous(): Unit = { + if (!isInitializeComplete) { + _initialize() + assert(global != null, global) + } + } + def isInitializeComplete = _initializeComplete + + lazy val global: Global = { + if (!isInitializeComplete) _initialize() + _compiler + } + + import global._ + import definitions.{ ObjectClass, termMember, dropNullaryMethod} + + lazy val runtimeMirror = ru.runtimeMirror(classLoader) + + private def noFatal(body: => Symbol): Symbol = try body catch { case _: FatalError => NoSymbol } + + def getClassIfDefined(path: String) = ( + noFatal(runtimeMirror staticClass path) + orElse noFatal(rootMirror staticClass path) + ) + def getModuleIfDefined(path: String) = ( + noFatal(runtimeMirror staticModule path) + orElse noFatal(rootMirror staticModule path) + ) + + implicit class ReplTypeOps(tp: Type) { + def andAlso(fn: Type => Type): Type = if (tp eq NoType) tp else fn(tp) + } + + // TODO: If we try to make naming a lazy val, we run into big time + // scalac unhappiness with what look like cycles. It has not been easy to + // reduce, but name resolution clearly takes different paths. + object naming extends { + val global: imain.global.type = imain.global + } with Naming { + // make sure we don't overwrite their unwisely named res3 etc. + def freshUserTermName(): TermName = { + val name = newTermName(freshUserVarName()) + if (replScope containsName name) freshUserTermName() + else name + } + def isInternalTermName(name: Name) = isInternalVarName("" + name) + } + import naming._ + + object deconstruct extends { + val global: imain.global.type = imain.global + } with StructuredTypeStrings + + lazy val memberHandlers = new { + val intp: imain.type = imain + } with SparkMemberHandlers + import memberHandlers._ + + /** Temporarily be quiet */ + def beQuietDuring[T](body: => T): T = { + val saved = printResults + printResults = false + try body + finally printResults = saved + } + def beSilentDuring[T](operation: => T): T = { + val saved = totalSilence + totalSilence = true + try operation + finally totalSilence = saved + } + + def quietRun[T](code: String) = beQuietDuring(interpret(code)) + + /** takes AnyRef because it may be binding a Throwable or an Exceptional */ + private def withLastExceptionLock[T](body: => T, alt: => T): T = { + assert(bindExceptions, "withLastExceptionLock called incorrectly.") + bindExceptions = false + + try beQuietDuring(body) + catch logAndDiscard("withLastExceptionLock", alt) + finally bindExceptions = true + } + + def executionWrapper = _executionWrapper + def setExecutionWrapper(code: String) = _executionWrapper = code + def clearExecutionWrapper() = _executionWrapper = "" + + /** interpreter settings */ + lazy val isettings = new SparkISettings(this) + + /** Instantiate a compiler. Overridable. */ + protected def newCompiler(settings: Settings, reporter: reporters.Reporter): ReplGlobal = { + settings.outputDirs setSingleOutput replOutput.dir + settings.exposeEmptyPackage.value = true + new Global(settings, reporter) with ReplGlobal { override def toString: String = "" } + } + + /** Parent classloader. Overridable. */ + protected def parentClassLoader: ClassLoader = + settings.explicitParentLoader.getOrElse( this.getClass.getClassLoader() ) + + /* A single class loader is used for all commands interpreted by this Interpreter. + It would also be possible to create a new class loader for each command + to interpret. The advantages of the current approach are: + + - Expressions are only evaluated one time. This is especially + significant for I/O, e.g. "val x = Console.readLine" + + The main disadvantage is: + + - Objects, classes, and methods cannot be rebound. Instead, definitions + shadow the old ones, and old code objects refer to the old + definitions. + */ + def resetClassLoader() = { + repldbg("Setting new classloader: was " + _classLoader) + _classLoader = null + ensureClassLoader() + } + final def ensureClassLoader() { + if (_classLoader == null) + _classLoader = makeClassLoader() + } + def classLoader: util.AbstractFileClassLoader = { + ensureClassLoader() + _classLoader + } + + def backticked(s: String): String = ( + (s split '.').toList map { + case "_" => "_" + case s if nme.keywords(newTermName(s)) => s"`$s`" + case s => s + } mkString "." + ) + def readRootPath(readPath: String) = getModuleIfDefined(readPath) + + abstract class PhaseDependentOps { + def shift[T](op: => T): T + + def path(name: => Name): String = shift(path(symbolOfName(name))) + def path(sym: Symbol): String = backticked(shift(sym.fullName)) + def sig(sym: Symbol): String = shift(sym.defString) + } + object typerOp extends PhaseDependentOps { + def shift[T](op: => T): T = exitingTyper(op) + } + object flatOp extends PhaseDependentOps { + def shift[T](op: => T): T = exitingFlatten(op) + } + + def originalPath(name: String): String = originalPath(name: TermName) + def originalPath(name: Name): String = typerOp path name + def originalPath(sym: Symbol): String = typerOp path sym + def flatPath(sym: Symbol): String = flatOp shift sym.javaClassName + def translatePath(path: String) = { + val sym = if (path endsWith "$") symbolOfTerm(path.init) else symbolOfIdent(path) + sym.toOption map flatPath + } + def translateEnclosingClass(n: String) = symbolOfTerm(n).enclClass.toOption map flatPath + + private class TranslatingClassLoader(parent: ClassLoader) extends util.AbstractFileClassLoader(replOutput.dir, parent) { + /** Overridden here to try translating a simple name to the generated + * class name if the original attempt fails. This method is used by + * getResourceAsStream as well as findClass. + */ + override protected def findAbstractFile(name: String): AbstractFile = + super.findAbstractFile(name) match { + case null if _initializeComplete => translatePath(name) map (super.findAbstractFile(_)) orNull + case file => file + } + } + private def makeClassLoader(): util.AbstractFileClassLoader = + new TranslatingClassLoader(parentClassLoader match { + case null => ScalaClassLoader fromURLs compilerClasspath + case p => new ScalaClassLoader.URLClassLoader(compilerClasspath, p) + }) + + // Set the current Java "context" class loader to this interpreter's class loader + def setContextClassLoader() = classLoader.setAsContext() + + def allDefinedNames: List[Name] = exitingTyper(replScope.toList.map(_.name).sorted) + def unqualifiedIds: List[String] = allDefinedNames map (_.decode) sorted + + /** Most recent tree handled which wasn't wholly synthetic. */ + private def mostRecentlyHandledTree: Option[Tree] = { + prevRequests.reverse foreach { req => + req.handlers.reverse foreach { + case x: MemberDefHandler if x.definesValue && !isInternalTermName(x.name) => return Some(x.member) + case _ => () + } + } + None + } + + private def updateReplScope(sym: Symbol, isDefined: Boolean) { + def log(what: String) { + val mark = if (sym.isType) "t " else "v " + val name = exitingTyper(sym.nameString) + val info = cleanTypeAfterTyper(sym) + val defn = sym defStringSeenAs info + + scopelog(f"[$mark$what%6s] $name%-25s $defn%s") + } + if (ObjectClass isSubClass sym.owner) return + // unlink previous + replScope lookupAll sym.name foreach { sym => + log("unlink") + replScope unlink sym + } + val what = if (isDefined) "define" else "import" + log(what) + replScope enter sym + } + + def recordRequest(req: Request) { + if (req == null) + return + + prevRequests += req + + // warning about serially defining companions. It'd be easy + // enough to just redefine them together but that may not always + // be what people want so I'm waiting until I can do it better. + exitingTyper { + req.defines filterNot (s => req.defines contains s.companionSymbol) foreach { newSym => + val oldSym = replScope lookup newSym.name.companionName + if (Seq(oldSym, newSym).permutations exists { case Seq(s1, s2) => s1.isClass && s2.isModule }) { + replwarn(s"warning: previously defined $oldSym is not a companion to $newSym.") + replwarn("Companions must be defined together; you may wish to use :paste mode for this.") + } + } + } + exitingTyper { + req.imports foreach (sym => updateReplScope(sym, isDefined = false)) + req.defines foreach (sym => updateReplScope(sym, isDefined = true)) + } + } + + private[nsc] def replwarn(msg: => String) { + if (!settings.nowarnings) + printMessage(msg) + } + + def compileSourcesKeepingRun(sources: SourceFile*) = { + val run = new Run() + assert(run.typerPhase != NoPhase, "REPL requires a typer phase.") + reporter.reset() + run compileSources sources.toList + (!reporter.hasErrors, run) + } + + /** Compile an nsc SourceFile. Returns true if there are + * no compilation errors, or false otherwise. + */ + def compileSources(sources: SourceFile*): Boolean = + compileSourcesKeepingRun(sources: _*)._1 + + /** Compile a string. Returns true if there are no + * compilation errors, or false otherwise. + */ + def compileString(code: String): Boolean = + compileSources(new BatchSourceFile("