Skip to content

Commit

Permalink
Add pickle protocol option support for Spark (#3001)
Browse files Browse the repository at this point in the history
* Add pickle protocol option support for Spark

* Pull from spark config section

Co-authored-by: Dillon Stadther <dlstadther+github@gmail.com>
  • Loading branch information
gunnsoo and dlstadther authored Jan 11, 2022
1 parent 800f53f commit 42df63f
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions luigi/contrib/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,10 @@ def files(self):
if self.deploy_mode == "cluster":
return [self.run_pickle]

@property
def pickle_protocol(self):
return configuration.get_config().getint('spark', 'pickle-protocol', pickle.DEFAULT_PROTOCOL)

def setup(self, conf):
"""
Called by the pyspark_runner with a SparkConf instance that will be used to instantiate the SparkContext
Expand Down Expand Up @@ -335,12 +339,12 @@ def run(self):
def _dump(self, fd):
with self.no_unpicklable_properties():
if self.__module__ == '__main__':
d = pickle.dumps(self)
d = pickle.dumps(self, protocol=self.pickle_protocol)
module_name = os.path.basename(sys.argv[0]).rsplit('.', 1)[0]
d = d.replace(b'c__main__', b'c' + module_name.encode('ascii'))
fd.write(d)
else:
pickle.dump(self, fd)
pickle.dump(self, fd, protocol=self.pickle_protocol)

def _setup_packages(self, sc):
"""
Expand Down

0 comments on commit 42df63f

Please sign in to comment.