Skip to content

Commit

Permalink
Move code block from load_from_path to value
Browse files Browse the repository at this point in the history
  • Loading branch information
schintap committed Nov 28, 2018
1 parent 605ed93 commit d9994b7
Showing 1 changed file with 11 additions and 11 deletions.
22 changes: 11 additions & 11 deletions python/pyspark/broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,8 @@ def dump(self, value, f):
f.close()

def load_from_path(self, path):
# we only need to decrypt it here if its on the driver since executor
# decryption handled already
if self._sc is not None and self._sc._encryption_enabled:
port, auth_secret = self._python_broadcast.setupDecryptionServer()
(decrypted_sock_file, _) = local_connect_and_auth(port, auth_secret)
self._python_broadcast.waitTillBroadcastDataSent()
return self.load(decrypted_sock_file)
else:
with open(path, 'rb', 1 << 20) as f:
return self.load(f)
with open(path, 'rb', 1 << 20) as f:
return self.load(f)

def load(self, file):
# "file" could also be a socket
Expand All @@ -145,7 +137,15 @@ def value(self):
""" Return the broadcasted value
"""
if not hasattr(self, "_value") and self._path is not None:
self._value = self.load_from_path(self._path)
# we only need to decrypt it here when encryption is enabled and
# if its on the driver, since executor decryption is handled already
if self._sc._encryption_enabled:
port, auth_secret = self._python_broadcast.setupDecryptionServer()
(decrypted_sock_file, _) = local_connect_and_auth(port, auth_secret)
self._python_broadcast.waitTillBroadcastDataSent()
return self.load(decrypted_sock_file)
else:
self._value = self.load_from_path(self._path)
return self._value

def unpersist(self, blocking=False):
Expand Down

0 comments on commit d9994b7

Please sign in to comment.