From 06ee46f414c96d23e6d0ff5d41fcc6befb01d6f7 Mon Sep 17 00:00:00 2001 From: gunso Date: Sat, 19 Sep 2020 19:47:55 +0900 Subject: [PATCH 1/2] Add pickle protocol option support for Spark --- luigi/contrib/spark.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/luigi/contrib/spark.py b/luigi/contrib/spark.py index 28950828ea..8777dd3628 100644 --- a/luigi/contrib/spark.py +++ b/luigi/contrib/spark.py @@ -292,6 +292,10 @@ def files(self): if self.deploy_mode == "cluster": return [self.run_pickle] + @property + def pickle_protocol(self): + return configuration.get_config().getint(self.spark_version, "pickle-protocol", pickle.DEFAULT_PROTOCOL) + def setup(self, conf): """ Called by the pyspark_runner with a SparkConf instance that will be used to instantiate the SparkContext @@ -335,12 +339,12 @@ def run(self): def _dump(self, fd): with self.no_unpicklable_properties(): if self.__module__ == '__main__': - d = pickle.dumps(self) + d = pickle.dumps(self, protocol=self.pickle_protocol) module_name = os.path.basename(sys.argv[0]).rsplit('.', 1)[0] d = d.replace(b'c__main__', b'c' + module_name.encode('ascii')) fd.write(d) else: - pickle.dump(self, fd) + pickle.dump(self, fd, protocol=self.pickle_protocol) def _setup_packages(self, sc): """ From a3498e17a84f0cc035f20b627602ea8d24e39d09 Mon Sep 17 00:00:00 2001 From: gunso Date: Sun, 27 Sep 2020 23:10:27 +0900 Subject: [PATCH 2/2] Pull from spark config section --- luigi/contrib/spark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/luigi/contrib/spark.py b/luigi/contrib/spark.py index 8777dd3628..20c7dddbb6 100644 --- a/luigi/contrib/spark.py +++ b/luigi/contrib/spark.py @@ -294,7 +294,7 @@ def files(self): @property def pickle_protocol(self): - return configuration.get_config().getint(self.spark_version, "pickle-protocol", pickle.DEFAULT_PROTOCOL) + return configuration.get_config().getint('spark', 'pickle-protocol', pickle.DEFAULT_PROTOCOL) def setup(self, conf): """