diff --git a/core/src/main/scala/org/apache/spark/api/conda/CondaEnvironmentManager.scala b/core/src/main/scala/org/apache/spark/api/conda/CondaEnvironmentManager.scala index 9fa8415b370ad..2b5c2d0b00643 100644 --- a/core/src/main/scala/org/apache/spark/api/conda/CondaEnvironmentManager.scala +++ b/core/src/main/scala/org/apache/spark/api/conda/CondaEnvironmentManager.scala @@ -26,7 +26,6 @@ import scala.sys.process.BasicIO import scala.sys.process.Process import scala.sys.process.ProcessBuilder import scala.sys.process.ProcessIO -import scala.sys.process.ProcessLogger import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.dataformat.yaml.YAMLFactory @@ -55,19 +54,10 @@ final class CondaEnvironmentManager(condaBinaryPath: String, logInfo("Retrieving the conda installation's info") val command = Process(List(condaBinaryPath, "info", "--json"), None) - val buffer = new StringBuffer - val io = BasicIO(withIn = false, - buffer, - Some(ProcessLogger(line => logDebug(s" $line")))) - - val exitCode = command.run(io).exitValue() - if (exitCode != 0) { - throw new SparkException(s"Attempt to retrieve initial conda info exited with code: " - + f"$exitCode%nCommand was: $command%nOutput was:%n${buffer.toString}") - } + val out = runOrFail(command, "retrieving the conda installation's info") implicit val format = org.json4s.DefaultFormats - JsonMethods.parse(buffer.toString).extract[Map[String, JValue]] + JsonMethods.parse(out).extract[Map[String, JValue]] } lazy val defaultPkgsDirs: List[String] = { @@ -165,17 +155,27 @@ final class CondaEnvironmentManager(condaBinaryPath: String, logInfo(s"Successfully executed $command with environment $extraEnv") } - private[this] def runOrFail(command: ProcessBuilder, description: String): Unit = { - val buffer = new StringBuffer + /** + * Attempts to run a command and return its stdout, + * @return the stdout of the process + */ + private[this] def runOrFail(command: ProcessBuilder, description: String): String = { + val out = new StringBuffer + val err = new StringBuffer val collectErrOutToBuffer = new ProcessIO( BasicIO.input(false), - BasicIO.processFully(buffer), - BasicIO.processFully(buffer)) + BasicIO.processFully(out), + BasicIO.processFully(line => { + err append line + err append BasicIO.Newline + log.info(s" $line") + })) val exitCode = command.run(collectErrOutToBuffer).exitValue() if (exitCode != 0) { throw new SparkException(s"Attempt to $description exited with code: " - + f"$exitCode%nCommand was: $command%nOutput was:%n${buffer.toString}") + + f"$exitCode%nCommand was: $command%nStdout was:%n$out%nStderr was:%n$err") } + out.toString } }