[SPARK-26019][PYSPARK] Allow insecure py4j gateways #23337

Closed -- wants to merge 5 commits.

Changes from 3 commits
@@ -43,12 +43,17 @@ private[spark] object PythonGatewayServer extends Logging {
     // with the same secret, in case the app needs callbacks from the JVM to the underlying
     // python processes.
     val localhost = InetAddress.getLoopbackAddress()
-    val gatewayServer: GatewayServer = new GatewayServer.GatewayServerBuilder()
-      .authToken(secret)
+    val builder = new GatewayServer.GatewayServerBuilder()
       .javaPort(0)
       .javaAddress(localhost)
       .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret)
-      .build()
+    if (sys.env.getOrElse("_PYSPARK_INSECURE_GATEWAY", "0") != "1") {
+      builder.authToken(secret)
+    } else {
+      assert(sys.env.getOrElse("SPARK_TESTING", "0") == "1",
+        "Creating insecure Java gateways only allowed for testing")
+    }
+    val gatewayServer: GatewayServer = builder.build()

     gatewayServer.start()
     val boundPort: Int = gatewayServer.getListeningPort
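For context, a small Python-side sketch (an illustration with placeholder values, not part of this diff) of why the builder's authToken matters: py4j authenticates only when both ends agree on a token, so a GatewayServer built without .authToken(secret) has to be paired with client GatewayParameters that omit auth_token -- exactly the insecure mode gated above.

    from py4j.java_gateway import JavaGateway, GatewayParameters

    # Client for a GatewayServer built WITH .authToken(secret): the same token must be
    # presented, otherwise py4j rejects the connection. Port and token are placeholders.
    secure = JavaGateway(gateway_parameters=GatewayParameters(
        port=25333, auth_token="secret", auto_convert=True))

    # Client for a GatewayServer built WITHOUT .authToken(...): no token is sent at all.
    insecure = JavaGateway(gateway_parameters=GatewayParameters(
        port=25333, auto_convert=True))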
@@ -616,8 +616,10 @@ private[spark] class PythonAccumulatorV2(
     if (socket == null || socket.isClosed) {
       socket = new Socket(serverHost, serverPort)
       logInfo(s"Connected to AccumulatorServer at host: $serverHost port: $serverPort")
-      // send the secret just for the initial authentication when opening a new connection
-      socket.getOutputStream.write(secretToken.getBytes(StandardCharsets.UTF_8))
+      if (secretToken != null) {
+        // send the secret just for the initial authentication when opening a new connection
+        socket.getOutputStream.write(secretToken.getBytes(StandardCharsets.UTF_8))
+      }
     }
     socket
   }
7 changes: 4 additions & 3 deletions python/pyspark/accumulators.py
@@ -262,9 +262,10 @@ def authenticate_and_accum_updates():
             raise Exception(
                 "The value of the provided token to the AccumulatorServer is not correct.")

-        # first we keep polling till we've received the authentication token
-        poll(authenticate_and_accum_updates)
-        # now we've authenticated, don't need to check for the token anymore
+        if auth_token:
+            # first we keep polling till we've received the authentication token
+            poll(authenticate_and_accum_updates)
+        # now we've authenticated if needed, don't need to check for the token anymore
         poll(accum_updates)

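To make the two accumulator hunks above easier to follow, here is a toy, self-contained sketch (an assumption for illustration, not Spark's actual server code) of the handshake they make optional: the JVM side (PythonAccumulatorV2) writes the secret once right after connecting, and the Python handler reads exactly that many bytes and compares them before handling updates; with a null/None token, both steps are skipped.

    import socket

    server_sock, client_sock = socket.socketpair()
    secret = "abc123"  # placeholder secret

    # "JVM" side: send the secret once, immediately after opening the connection.
    client_sock.sendall(secret.encode("utf-8"))

    # Python AccumulatorServer side: only check the token when one was configured.
    auth_token = secret  # would be None when the gateway is insecure
    if auth_token is not None:
        received = server_sock.recv(len(auth_token)).decode("utf-8")
        if received != auth_token:
            raise Exception(
                "The value of the provided token to the AccumulatorServer is not correct.")
    print("authenticated, ready for accumulator updates")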
12 changes: 12 additions & 0 deletions python/pyspark/context.py
@@ -112,6 +112,18 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         ValueError:...
         """
         self._callsite = first_spark_call() or CallSite(None, None, None)
+        if gateway is not None and gateway.gateway_parameters.auth_token is None:
+            if conf and conf.get("spark.python.allowInsecurePy4j", "false") == "true":

Contributor: This requires the config to be set in the SparkConf object, instead of allowing it to be in a config file, right? I don't think the config file has been read at this point in the code.

The issue I see with that is that it would require code changes to make this work, instead of being simply something to add to a config file in apps that currently pass in insecure gateways.

(But then, given that the config file is probably read by the JVM, you'd have a chicken-and-egg problem, so maybe this is ok...)

Member: Would it be better if we just controlled this via an environment variable instead? The gateway will already be on the driver side, so we could simply set it.

Contributor Author: Hmm, good point about not reading a file. I can switch to an environment variable -- is that really better for a use case like Zeppelin? Neither solution seems ideal, but I would really like to keep some check here that forces users to opt in.

Member: I think either way is possible, but I assume it's easier to handle an environment variable? No strong opinion on this.

Contributor Author: I thought about this more, and I realized that if we're requiring a code change from the end user, it sort of defeats the entire purpose of this change (e.g. users need this because they can't upgrade Zeppelin). One thing I could do is delay this check until we get the full Spark conf back from Java:

    # Reset the SparkConf to the one actually used by the SparkContext in JVM.
    self._conf = SparkConf(_jconf=self._jsc.sc().conf())

Then the check could be based on the real conf, which includes anything set in a file.
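(For illustration, a minimal sketch of the deferred check being floated here, assuming it runs only after the JVM-backed conf exists; the helper name is hypothetical and this is not what the PR ultimately does:)

    def _check_insecure_gateway_allowed(real_conf, gateway):
        # Hypothetical helper: called after self._conf = SparkConf(_jconf=self._jsc.sc().conf()),
        # so values coming from spark-defaults.conf would be visible to the check.
        if gateway is not None and gateway.gateway_parameters.auth_token is None:
            if real_conf.get("spark.python.allowInsecurePy4j", "false") != "true":
                raise Exception(
                    "Insecure Py4j gateways require spark.python.allowInsecurePy4j=true")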

Member (@HyukjinKwon, Dec 19, 2018): I just happened to know a bit due to some related work lately. Not an expert, but both are indeed feasible, and IMHO the env variable is easier and cleaner.

cc @zjffdu and maybe @Leemoonsoo as well, for sure.

The argument here is basically which one is better on the Zeppelin PySpark interpreter side:

  1. A config to control the behaviour -- but it has to be set on the SparkConf object (rather than in a config file).
  2. An environment variable to control the behaviour.

This conf or env would have to be set on the Zeppelin side to allow an insecure connection, if I am not mistaken. We're not sure which one is preferred and easier in Zeppelin. To me, env looks preferable, IMHO.

Member: I forgot to cc someone who's a committer on both projects :-) @felixcheung

Member (@felixcheung, Dec 19, 2018): Scratch that.

Zeppelin is setting SparkConf before creating the SparkContext, here: https://github.com/apache/zeppelin/blob/master/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java#L97 -- so I don't think it matters.

Either is fine, and perhaps SparkConf is more consistent with other config.

Contributor Author: Wait, let's be clear on the end goal here. The point is for an end user to be able to enable the opt-in without changing the code in Zeppelin at all. If we're going to change the code in Zeppelin, it should just be changed to do the right thing and create a secure gateway (as Zeppelin already has done in master, and I think even in v0.8.0 now that I look more closely).

So looking at an old version of Zeppelin, e.g.:

https://github.com/apache/zeppelin/blob/v0.7.3/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java#L205-L229

If I'm reading that correctly, it looks like environment variables the user has set when starting Zeppelin will get passed through to the python command (I think that is what EnvironmentUtils.getProcEnvironment() does). But there isn't any way for the user to add additional confs to that command. I assume things other than Zeppelin would work similarly.

Contributor Author: I updated this to require the env variable PYSPARK_ALLOW_INSECURE_GATEWAY to be set to 1 / true.
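(For illustration, a hedged sketch of that opt-in: an embedding application such as an old Zeppelin would export the variable in the environment it passes to the python command, with no code change; the exact name and accepted values follow the comment above.)

    import os

    # Hypothetical: set before the PySpark driver process is started.
    os.environ["PYSPARK_ALLOW_INSECURE_GATEWAY"] = "1"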

Contributor: I assume we are intentionally not documenting this config since we only expect Zeppelin to use it?

Contributor Author: Yeah, that was my thinking. I could go either way -- documenting it would give a spot to warn about the security hole it opens up.

+                warnings.warn(
+                    "You are passing in an insecure Py4j gateway. This "
+                    "presents a security risk, and will be completely forbidden in Spark 3.0")
+            else:
+                raise Exception(
+                    "You are trying to pass an insecure Py4j gateway to Spark. This"
+                    " presents a security risk. If you are sure you understand and accept this"
+                    " risk, you can add the conf 'spark.python.allowInsecurePy4j=true', but"
+                    " note this option will be removed in Spark 3.0")
Member: +1. Honestly, I still think insecure mode is a misuse of Spark and should be removed. I'm going to merge this as an effort to make upgrading Spark easier for other projects like Zeppelin.


         SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
         try:
             self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
22 changes: 18 additions & 4 deletions python/pyspark/java_gateway.py
@@ -41,8 +41,19 @@ def launch_gateway(conf=None):
     """
     launch jvm gateway
     :param conf: spark configuration passed to spark-submit
-    :return:
+    :return: a JVM gateway
     """
+    return _launch_gateway(conf)
+
+
+def _launch_gateway(conf=None, insecure=False):
+    """
+    launch jvm gateway
+    :param conf: spark configuration passed to spark-submit
+    :return: a JVM gateway
+    """
+    if insecure and not os.environ.get("SPARK_TESTING", "0") == "1":
+        raise Exception("creating insecure gateways is only for testing")
     if "PYSPARK_GATEWAY_PORT" in os.environ:
         gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
         gateway_secret = os.environ["PYSPARK_GATEWAY_SECRET"]
@@ -74,6 +85,8 @@ def launch_gateway(conf=None):

         env = dict(os.environ)
         env["_PYSPARK_DRIVER_CONN_INFO_PATH"] = conn_info_file
+        if insecure:
+            env["_PYSPARK_INSECURE_GATEWAY"] = "1"

         # Launch the Java gateway.
         # We open a pipe to stdin so that the Java gateway can die when the pipe is broken
@@ -116,9 +129,10 @@ def killChild():
             atexit.register(killChild)

     # Connect to the gateway
-    gateway = JavaGateway(
-        gateway_parameters=GatewayParameters(port=gateway_port, auth_token=gateway_secret,
-                                             auto_convert=True))
+    gateway_params = GatewayParameters(port=gateway_port, auto_convert=True)
+    if not insecure:
+        gateway_params.auth_token = gateway_secret
+    gateway = JavaGateway(gateway_parameters=gateway_params)

     # Import the classes used by PySpark
     java_import(gateway.jvm, "org.apache.spark.SparkConf")
25 changes: 25 additions & 0 deletions python/pyspark/tests.py
@@ -61,6 +61,7 @@
 from pyspark import keyword_only
 from pyspark.conf import SparkConf
 from pyspark.context import SparkContext
+from pyspark.java_gateway import _launch_gateway
 from pyspark.rdd import RDD
 from pyspark.files import SparkFiles
 from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer, PickleSerializer, \
@@ -2381,6 +2382,30 @@ def test_startTime(self):
         with SparkContext() as sc:
             self.assertGreater(sc.startTime, 0)

+    def test_forbid_insecure_gateway(self):
+        # By default, we fail immediately if you try to create a SparkContext
+        # with an insecure gateway
+        gateway = _launch_gateway(insecure=True)
+        with self.assertRaises(Exception) as context:
+            SparkContext(gateway=gateway)
+        self.assertIn("insecure Py4j gateway", str(context.exception))
+        self.assertIn("spark.python.allowInsecurePy4j", str(context.exception))
+        self.assertIn("removed in Spark 3.0", str(context.exception))
+
+    def test_allow_insecure_gateway_with_conf(self):
+        with SparkContext._lock:
+            SparkContext._gateway = None
+            SparkContext._jvm = None

Contributor Author: This part of the test really bothers me, so I'd like to explain it to reviewers. Without this, the test passes -- but it passes even without the changes to the main code! Or rather, it only passes when it's run as part of the entire suite; it would fail when run individually.

What's happening is that SparkContext._gateway and SparkContext._jvm don't get reset by most tests (e.g., they are not reset in sc.stop()), so a test running before this one will set those variables, and then this test will end up holding on to a gateway which does have the auth_token set, and so the accumulator server would still work.

Now that in itself sounds crazy to me, and seems like a problem for things like Zeppelin. I tried just adding these two lines into sc.stop(), but then when I ran all the tests, I got a lot of java.io.IOException: error=23, Too many open files in system. So maybe something else is not getting cleaned up properly in the pyspark tests?

I was hoping somebody else might have some ideas about what is going on or if there is a better way to do this.

Member: [comment content not preserved in this extract]
Contributor Author: I don't think that's really answering my question. I don't have a problem calling start & stop; I'm wondering why SparkContext._gateway and SparkContext._jvm don't get reset in sc.stop(). This means that if you have multiple Spark contexts in one python session (as we do in our tests), they all reuse the same gateway:

    if not SparkContext._gateway:
        SparkContext._gateway = gateway or launch_gateway(conf)
        SparkContext._jvm = SparkContext._gateway.jvm

For normal use of Spark that's not a problem, but it seems like it would be a problem (a) in our tests and (b) for systems like Zeppelin that might have multiple Spark contexts over the lifetime of the python session (I assume, anyway ...)
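(A short sketch of the reset being discussed, to make it concrete: this is what the test below does explicitly, and what the author says caused "Too many open files" errors when moved into sc.stop(); shown for illustration only.)

    from pyspark import SparkContext

    # Clear the cached gateway/JVM so the next SparkContext launches a fresh gateway.
    with SparkContext._lock:
        SparkContext._gateway = None
        SparkContext._jvm = None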

Member: Ah, gotcha. In that case we could consider simply moving the test to the top in a separate class, but yes, I suspect tests depending on their order isn't a great idea either. I'm okay as long as the tests pass. I can take a separate look at this later.

Contributor: Yeah, this logic is sort of rough. In spark-testing-base, for example, between tests where folks do not intend to reuse the same SparkContext we also clear some extra properties (although we do reuse the gateway). I think for environments where folks want multiple SparkContexts from Python on the same machine, they end up using multiple Python processes anyway.

Contributor Author: OK, it's good to get some confirmation of this weird behavior... but I feel like I still don't understand why we don't reset SparkContext._gateway and SparkContext._jvm in sc.stop(), and why, when I tried to make that change, I hit all those errors. If nothing else, any chance this is related to general flakiness in the pyspark tests?

+        gateway = _launch_gateway(insecure=True)
+        conf = SparkConf()
+        conf.set("spark.python.allowInsecurePy4j", "true")
+        with SparkContext(conf=conf, gateway=gateway) as sc:
+            print("sc created, about to create accum")
+            a = sc.accumulator(1)
+            rdd = sc.parallelize([1, 2, 3])
+            rdd.foreach(lambda x: a.add(x))
+            self.assertEqual(7, a.value)
+

 class ConfTests(unittest.TestCase):
     def test_memory_conf(self):