From d9994b7f9b6aaaf9f87ff09b1d45f3c204f7b4d3 Mon Sep 17 00:00:00 2001 From: schintap Date: Wed, 28 Nov 2018 15:08:56 -0500 Subject: [PATCH] Move code block from load_from_path to value --- python/pyspark/broadcast.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py index 508d7300325e6..b8b349d593f05 100644 --- a/python/pyspark/broadcast.py +++ b/python/pyspark/broadcast.py @@ -121,16 +121,8 @@ def dump(self, value, f): f.close() def load_from_path(self, path): - # we only need to decrypt it here if its on the driver since executor - # decryption handled already - if self._sc is not None and self._sc._encryption_enabled: - port, auth_secret = self._python_broadcast.setupDecryptionServer() - (decrypted_sock_file, _) = local_connect_and_auth(port, auth_secret) - self._python_broadcast.waitTillBroadcastDataSent() - return self.load(decrypted_sock_file) - else: - with open(path, 'rb', 1 << 20) as f: - return self.load(f) + with open(path, 'rb', 1 << 20) as f: + return self.load(f) def load(self, file): # "file" could also be a socket @@ -145,7 +137,15 @@ def value(self): """ Return the broadcasted value """ if not hasattr(self, "_value") and self._path is not None: - self._value = self.load_from_path(self._path) + # we only need to decrypt it here when encryption is enabled and + # if its on the driver, since executor decryption is handled already + if self._sc._encryption_enabled: + port, auth_secret = self._python_broadcast.setupDecryptionServer() + (decrypted_sock_file, _) = local_connect_and_auth(port, auth_secret) + self._python_broadcast.waitTillBroadcastDataSent() + return self.load(decrypted_sock_file) + else: + self._value = self.load_from_path(self._path) return self._value def unpersist(self, blocking=False):