Skip to content

Commit

Permalink
PythonRDDSuite fix
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborgsomogyi committed Nov 17, 2020
1 parent 17d357b commit 424be64
Showing 1 changed file with 20 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,18 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.mockito.Mockito.mock

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.security.{SocketAuthHelper, SocketAuthServer}
import org.apache.spark.util.Utils

class PythonRDDSuite extends SparkFunSuite with LocalSparkContext {

private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*)

var tempDir: File = _

override def beforeAll(): Unit = {
Expand Down Expand Up @@ -76,12 +79,22 @@ class PythonRDDSuite extends SparkFunSuite with LocalSparkContext {
}

test("python server error handling") {
val authHelper = new SocketAuthHelper(new SparkConf())
val errorServer = new ExceptionPythonServer(authHelper)
val client = new Socket(InetAddress.getLoopbackAddress(), errorServer.port)
authHelper.authToServer(client)
val ex = intercept[Exception] { errorServer.getResult(Duration(1, "second")) }
assert(ex.getCause().getMessage().contains("exception within handleConnection"))
val savedSparkEnv = SparkEnv.get
try {
val conf = new SparkConf()
val env = mock(classOf[SparkEnv])
doReturn(conf).when(env).conf
SparkEnv.set(env)

val authHelper = new SocketAuthHelper(conf)
val errorServer = new ExceptionPythonServer(authHelper)
val client = new Socket(InetAddress.getLoopbackAddress(), errorServer.port)
authHelper.authToServer(client)
val ex = intercept[Exception] { errorServer.getResult(Duration(1, "second")) }
assert(ex.getCause().getMessage().contains("exception within handleConnection"))
} finally {
SparkEnv.set(savedSparkEnv)
}
}

class ExceptionPythonServer(authHelper: SocketAuthHelper)
Expand Down

0 comments on commit 424be64

Please sign in to comment.